Source code for aggmap.utils.matrixopt

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Aug 25 20:29:36 2019

@author: wanxiang.shen@u.nus.edu

Matrix operations: mapping 2D scatter coordinates onto grids/arrays, padding, and Gaussian smoothing.

"""

import numpy as np
from lapjv import lapjv
from scipy.signal import convolve2d
from scipy.spatial.distance import cdist


class Scatter2Grid:
    """Place scattered 2D points onto the cells of a compact regular grid.

    ``fit`` solves a linear assignment problem between the points of an
    embedding and the cells of a near-square grid, so that every point gets
    its own cell. ``transform`` then writes a 1D feature vector into that
    grid, optionally producing one grid per channel.
    """

    def __init__(self):
        """assign x,y coords to grid numpy array"""
        self.fmap_shape = None
        self.indices = None
        self.indices_list = None

    def _split_by_channel(self, df):
        """Cache, per group of ``self.channel_col``, the ``idx``/``indices`` lists.

        Sets ``self.channels`` (group keys) and ``self.indices_list`` (one
        ``{'idx': [...], 'indices': [...]}`` dict per group, same order).
        """
        # Select the two needed columns before applying so the grouping
        # column itself is never handed to the applied function.
        per_channel = df.groupby(self.channel_col)[['idx', 'indices']].apply(
            lambda part: part.to_dict('list')
        )
        self.channels = per_channel.index.tolist()
        self.indices_list = per_channel.tolist()

    def fit(self, df, split_channels = True, channel_col = 'Channels'):
        """
        Compute the point-to-cell assignment for an embedding.

        parameters
        ------------------
        df: dataframe with x, y columns (and ``channel_col`` when
            ``split_channels`` is True). It is modified in place: the
            columns ``idx`` (row position) and ``indices`` (assigned flat
            grid cell) are added, and the frame is kept as ``self.df``.
        split_channels: bool, if True, will apply split by group
        channel_col: column in df.columns, split to groups by this col

        raises
        ------------------
        ValueError: if ``df`` has no rows.
        """
        n_points = len(df)
        if n_points == 0:
            # The grid size below would otherwise divide by zero.
            raise ValueError("Scatter2Grid.fit requires a non-empty dataframe")

        df['idx'] = range(n_points)
        embedding_2d = df[['x', 'y']].values

        # Smallest near-square grid with at least n_points cells.
        size1 = int(np.ceil(np.sqrt(n_points)))
        size2 = int(np.ceil(n_points / size1))
        grid_size = (size1, size2)

        # Cell centres on the unit square, listed in row-major order; only
        # the first n_points cells take part in the assignment.
        # NOTE: the embedding coordinates are used as given (not rescaled).
        grid = np.dstack(
            np.meshgrid(np.linspace(0, 1, size2), np.linspace(0, 1, size1))
        ).reshape(-1, 2)
        grid_map = grid[:n_points]

        cost_matrix = cdist(grid_map, embedding_2d, "sqeuclidean").astype(float)
        max_cost = cost_matrix.max()
        if max_cost > 0:
            # Rescale so the largest cost is 100000; skipped when every cost
            # is zero, where the division would turn the matrix into NaNs.
            cost_matrix = cost_matrix * (100000 / max_cost)
        row_asses, col_asses, _ = lapjv(cost_matrix)

        self.row_asses = row_asses
        self.col_asses = col_asses
        self.fmap_shape = grid_size
        # col_asses is used as the flat grid-cell index of each row of df.
        self.indices = col_asses

        self.channel_col = channel_col
        self.split_channels = split_channels
        df['indices'] = self.indices
        self.df = df

        if self.split_channels:
            self._split_by_channel(df)

    def refit_c(self, df):
        """
        Re-derive the per-channel split for a new dataframe while keeping
        the cell assignment computed by a previous ``fit``.

        parameters
        ------------------
        df: dataframe with x, y columns; modified in place (``idx`` and
            ``indices`` columns are written) and stored as ``self.df``.
        """
        df['idx'] = range(len(df))
        df['indices'] = self.indices
        self.df = df
        if self.split_channels:
            self._split_by_channel(df)

    def transform(self, vector_1d):
        """
        Write a feature vector into the fitted grid.

        parameters
        ------------------
        vector_1d: extracted features, one value per row of the fitted
            dataframe; any array-like is accepted.

        returns
        ------------------
        Float array of shape (M, N, C): C is the number of channels when
        ``split_channels`` is True, otherwise 1. Unassigned cells are 0.
        """
        ### linear assignment map ###
        M, N = self.fmap_shape
        # Coerce once so that list/tuple inputs support index-list lookups.
        values = np.asarray(vector_1d)

        if self.split_channels:
            arr_res = []
            for idict in self.indices_list:
                arr_1d = np.zeros(M * N)
                arr_1d[idict['indices']] = values[idict['idx']]
                arr_res.append(arr_1d.reshape(M, N))
            return np.stack(arr_res, axis=-1)

        arr_1d = np.zeros(M * N)
        arr_1d[self.indices] = values
        return arr_1d.reshape(M, N, 1)
class Scatter2Array:
    """Rasterise scattered 2D points onto a fixed-size array.

    Each point is snapped to the nearest bin along x and along y. The x bin
    selects the column and the y bin selects the row of an array of shape
    ``fmap_shape``. Points that land in the same cell overwrite each other
    when ``transform`` fills the array.
    """

    def __init__(self, fmap_shape = (128,128)):
        """convert x,y coords to numpy array"""
        self.fmap_shape = fmap_shape
        self.indices = None
        self.indices_list = None

    def _fit(self, df):
        """df: dataframe with x, y columns

        Builds the bin centres along each axis from the data range.
        """
        M, N = self.fmap_shape
        # The flat index below is ``N * row + col`` into an (M, N) array, so
        # x (columns) needs N bins and y (rows) needs M bins. For square
        # shapes this is identical to using M and N the other way round.
        self.X = np.linspace(df.x.min(), df.x.max(), N)
        self.Y = np.linspace(df.y.min(), df.y.max(), M)

    def _transform(self, dfnew):
        """dfnew: dataframe with x, y columns
        in case we need to split channels

        Returns a list with the flat (row-major) cell index of every row.
        """
        x = dfnew.x.values
        y = dfnew.y.values
        M, N = self.fmap_shape
        indices = []
        for xi, yi in zip(x, y):
            # nearest bin along each axis (L1 distance)
            col = int(np.argmin(np.abs(self.X - xi)))
            row = int(np.argmin(np.abs(self.Y - yi)))
            indices.append(N * row + col)
        return indices

    def fit(self, df, split_channels = True, channel_col = 'Channels'):
        """
        Compute the cell index of every point of the embedding.

        parameters
        ---------------
        df: embedding_df, dataframe with x, y columns (and ``channel_col``
            when ``split_channels`` is True). Modified in place: an ``idx``
            column holding the row position is added, and the frame is kept
            as ``self.df``.
        split_channels: bool, if True, will apply split by group
        channel_col: column in df.columns, split to groups by this col
        """
        df['idx'] = range(len(df))
        self.df = df
        self.channel_col = channel_col
        self.split_channels = split_channels
        self._fit(df)

        if self.split_channels:
            # Only x and y are needed by _transform; selecting them keeps
            # the grouping column out of the applied frames.
            sidx = df.groupby(channel_col)[['x', 'y']].apply(self._transform)
            self.channels = sidx.index.tolist()
            self.indices_list = sidx.tolist()
        else:
            self.indices = self._transform(df)

    def transform(self, vector_1d):
        """
        Write a feature vector into the fitted array.

        parameters
        ---------------
        vector_1d: feature values 1d array (any array-like), one value per
            row of the fitted dataframe.

        returns
        ---------------
        Float array of shape (M, N, C): C is the number of channels when
        ``split_channels`` is True, otherwise 1. Empty cells are 0.
        """
        M, N = self.fmap_shape
        # Coerce once so that list/tuple inputs support index-list lookups.
        values = np.asarray(vector_1d)

        if self.split_channels:
            df = self.df
            arr_res = []
            for indices, channel in zip(self.indices_list, self.channels):
                # Row positions of this channel, in the same order in which
                # the channel's cell indices were computed during fit.
                idx = df.loc[df[self.channel_col] == channel, 'idx'].tolist()
                arr_1d = np.zeros(M * N)
                arr_1d[indices] = values[idx]
                arr_res.append(arr_1d.reshape(M, N))
            return np.stack(arr_res, axis=-1)

        arr_1d = np.zeros(M * N)
        arr_1d[self.indices] = values
        return arr_1d.reshape(M, N, 1)
def smartpadding(array, target_size, mode='constant', constant_values=0):
    """
    Pad a 2D array up to ``target_size``, keeping it (nearly) centred.

    When the amount of padding along an axis is odd, the extra row goes on
    top and the extra column goes on the right.

    array: 2d array to be padded
    target_size: tuple of target array's shape
    mode: padding mode forwarded to ``np.pad``
    constant_values: fill value; only forwarded when ``mode`` is
        ``'constant'`` or a callable, because ``np.pad`` rejects this
        keyword for its other string modes

    returns: the padded array of shape ``target_size``
    raises: ValueError if ``target_size`` is smaller than ``array.shape``
    """
    X, Y = array.shape
    M, N = target_size
    if M < X or N < Y:
        raise ValueError(
            "target_size %s is smaller than the array shape %s"
            % ((M, N), (X, Y))
        )

    top = int(np.ceil((M - X) / 2))
    bottom = int(M - X - top)
    right = int(np.ceil((N - Y) / 2))
    left = int(N - Y - right)

    pad_kwargs = {}
    if mode == 'constant' or callable(mode):
        pad_kwargs['constant_values'] = constant_values

    return np.pad(array,
                  pad_width=[(top, bottom), (left, right)],
                  mode=mode,
                  **pad_kwargs)
def fspecial_gauss(size = 31, sigma = 2):
    """Build a normalized 2D Gaussian kernel, mimicking MATLAB's
    'fspecial' gaussian function.

    size: side length of the square kernel; should be an odd value so the
        kernel is centred on its middle element
    sigma: standard deviation of the Gaussian

    returns: (size, size) array whose entries sum to 1
    """
    # Integer offsets from the kernel centre along one axis.
    first = (-size) // 2 + 1
    stop = size // 2 + 1
    offsets = np.arange(first, stop)

    # Squared distance of every cell from the centre.
    squared = offsets ** 2
    radius_sq = squared[:, np.newaxis] + squared[np.newaxis, :]

    kernel = np.exp(-(radius_sq / (2.0 * sigma ** 2)))
    total = kernel.sum()
    return kernel / total
def conv2(array, kernel_size = 31, sigma = 2, mode='same', fillvalue = 0):
    """Smooth a 2D array with a normalized Gaussian kernel.

    The input and the kernel are both rotated by 180 degrees before being
    passed to ``convolve2d``, and the result is rotated back.

    array: 2d array to be filtered
    kernel_size: side length of the Gaussian kernel (see fspecial_gauss)
    sigma: standard deviation of the Gaussian kernel
    mode: output-size mode forwarded to ``convolve2d``
    fillvalue: value used outside the input, forwarded to ``convolve2d``

    returns: the filtered 2d array
    """
    gauss = fspecial_gauss(kernel_size, sigma)

    rotated_input = np.rot90(array, k=2)
    rotated_kernel = np.rot90(gauss, k=2)
    filtered = convolve2d(rotated_input,
                          rotated_kernel,
                          mode=mode,
                          fillvalue=fillvalue)

    return np.rot90(filtered, k=2)