#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Aug 25 20:29:36 2019
@author: wanxiang.shen@u.nus.edu
matrix operation
"""
import numpy as np
from lapjv import lapjv
from scipy.signal import convolve2d
from scipy.spatial.distance import cdist
[docs]class Scatter2Grid:
def __init__(self):
"""assign x,y coords to gird numpy array"""
self.fmap_shape = None
self.indices = None
self.indices_list = None
[docs] def fit(self, df, split_channels = True, channel_col = 'Channels'):
"""
parameters
------------------
df: dataframe with x, y columns
split_channels: bool, if True, will apply split by group
channel_col: column in df.columns, split to groups by this col
"""
df['idx'] = range(len(df))
embedding_2d = df[['x','y']].values
N = len(df)
size1 = int(np.ceil(np.sqrt(N)))
size2 = int(np.ceil(N/size1))
grid_size = (size1, size2)
grid = np.dstack(np.meshgrid(np.linspace(0, 1, size2),
np.linspace(0, 1, size1))).reshape(-1, 2)
grid_map = grid[:N]
cost_matrix = cdist(grid_map, embedding_2d, "sqeuclidean").astype(float)
cost_matrix = cost_matrix * (100000 / cost_matrix.max())
row_asses, col_asses, _ = lapjv(cost_matrix)
self.row_asses = row_asses
self.col_asses = col_asses
self.fmap_shape = grid_size
self.indices = col_asses
self.channel_col = channel_col
self.split_channels = split_channels
df['indices'] = self.indices
self.df = df
if self.split_channels:
def _apply_split(x):
return x[['idx', 'indices']].to_dict('list')
sidx = df.groupby(channel_col).apply(_apply_split)
channels = sidx.index.tolist()
indices_list = sidx.tolist()
self.channels = channels
self.indices_list = indices_list
[docs] def refit_c(self, df):
"""
parameters
------------------
df: dataframe with x, y columns
"""
df['idx'] = range(len(df))
df['indices'] = self.indices
self.df = df
if self.split_channels:
def _apply_split(x):
return x[['idx', 'indices']].to_dict('list')
sidx = df.groupby(self.channel_col).apply(_apply_split)
channels = sidx.index.tolist()
indices_list = sidx.tolist()
self.channels = channels
self.indices_list = indices_list
[docs]class Scatter2Array:
def __init__(self, fmap_shape = (128,128)):
"""convert x,y coords to numpy array"""
self.fmap_shape = fmap_shape
self.indices = None
self.indices_list = None
def _fit(self, df):
"""df: dataframe with x, y columns"""
M, N = self.fmap_shape
self.X = np.linspace(df.x.min(), df.x.max(), M)
self.Y = np.linspace(df.y.min(), df.y.max(), N)
def _transform(self, dfnew):
"""dfnew: dataframe with x, y columns
in case we need to split channels
"""
x = dfnew.x.values
y = dfnew.y.values
M, N = self.fmap_shape
indices = []
for i in range(len(dfnew)):
#perform a l1 distance
idx = np.argmin(abs(self.X-x[i]))
idy = np.argmin(abs(self.Y-y[i]))
indice = N*idy + idx
indices.append(indice)
return indices
[docs] def fit(self, df, split_channels = True, channel_col = 'Channels'):
"""
parameters
---------------
df: embedding_df, dataframe
split_channels: bool, if True, will apply split by group
channel_col: column in df.columns, split to groups by this col
"""
df['idx'] = range(len(df))
self.df = df
self.channel_col = channel_col
self.split_channels = split_channels
_ = self._fit(df)
if self.split_channels:
g = df.groupby(channel_col)
sidx = g.apply(self._transform)
self.channels = sidx.index.tolist()
self.indices_list = sidx.tolist()
else:
self.indices = self._transform(df)
[docs]def smartpadding(array, target_size, mode='constant', constant_values=0):
"""
array: 2d array to be padded
target_size: tuple of target array's shape
"""
X, Y = array.shape
M, N = target_size
top = int(np.ceil((M-X)/2))
bottom = int(M - X - top)
right = int(np.ceil((N-Y)/2))
left = int(N - Y - right)
array_pad = np.pad(array, pad_width=[(top, bottom),
(left, right)],
mode=mode,
constant_values=constant_values)
return array_pad
[docs]def fspecial_gauss(size = 31, sigma = 2):
"""Function to mimic the 'fspecial' gaussian MATLAB function
size should be odd value
"""
x, y = np.mgrid[-size//2 + 1:size//2 + 1, -size//2 + 1:size//2 + 1]
g = np.exp(-((x**2 + y**2)/(2.0*sigma**2)))
return g/g.sum()
[docs]def conv2(array, kernel_size = 31, sigma = 2, mode='same', fillvalue = 0):
kernel = fspecial_gauss(kernel_size, sigma)
return np.rot90(convolve2d(np.rot90(array, 2), np.rot90(kernel, 2),
mode=mode,
fillvalue = fillvalue), 2)