
# -*- coding: utf-8 -*-
"""
Methods for partitioning the birth-lifetime plane of persistence diagrams. These
are used for the adaptive partitioning version of template function featurization.

"""

# Created on Tue Aug 14 09:35:30 2018
#
# @author: khasawn3

import numpy as np
from scipy.stats import binned_statistic_2d, chisquare, rankdata
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from copy import deepcopy
from sklearn.cluster import KMeans, MiniBatchKMeans


class Partitions:
    '''
    A data structure for storing a partition coming from an adaptive meshing scheme.

    Parameters:
        data (np.array): A many by 2 numpy array.
        convertToOrd (bool): Boolean variable to decide if you want to use ordinals
            for partitioning. Ordinals make things faster, but the partitions are
            not as nice.
        meshingScheme (str): The type of meshing scheme you want to use. Options include:

            - 'DV' method is based on (mention paper here). For more details see
              the function return_partition_DV.
            - 'clustering' uses a clustering algorithm to find clusters in the data,
              then takes the bounding box of all points assigned to each cluster.
              For more details see the function return_partition_clustering.

        partitionParams (dict): Dictionary of parameters needed for the particular
            meshing scheme selected. For an explanation of the parameters, see the
            function for the specific meshingScheme (i.e. return_partition_DV or
            return_partition_clustering).

            - For 'DV' the adjustable parameters are 'alpha', 'c', 'nmin',
              'numParts', 'split'.
            - For 'clustering' the adjustable parameters are 'numClusters',
              'clusterAlg', 'weights', 'boxOption', 'boxSize', 'pad', 'split'.

        kwargs: Any leftover inputs are stored as attributes.
    '''

    def __init__(self, data=None, convertToOrd=False, meshingScheme=None,
                 partitionParams={}, **kwargs):

        self.convertToOrd = convertToOrd
        self.meshingScheme = meshingScheme
        self.__dict__.update(kwargs)

        if data is not None:

            # The DV scheme partitions ordinal (rank) coordinates, so force the
            # conversion; clustering schemes such as kmeans run on the raw floats.
            if meshingScheme == 'DV':
                convertToOrd = True

            # Check if we want to convert to ordinals;
            # we may not want to for certain partitioning schemes.
            if convertToOrd:
                # Check that the data is in ordinal coordinates.
                # If not, it is converted to ordinal and stored locally.
                if not self.isOrdinal(data):
                    print("Converting the data to ordinal...")

                    # perform ordinal sampling (ranking) transformation
                    xRanked = rankdata(data[:, 0], method='ordinal')
                    yRanked = rankdata(data[:, 1], method='ordinal')

                    # Copy and sort the original coordinates so ordinals can be
                    # converted back to floats later.
                    xFloats = np.copy(data[:, 0])
                    xFloats.sort()
                    yFloats = np.copy(data[:, 1])
                    yFloats.sort()

                    self.xFloats = xFloats
                    self.yFloats = yFloats

                    data = np.column_stack((xRanked, yRanked))

            # Set the bounding box to be the max and min of the data.
            xmin = data[:, 0].min()
            xmax = data[:, 0].max()
            ymin = data[:, 1].min()
            ymax = data[:, 1].max()

            # self.borders stores the x and y min and max of the overall bounding
            # box in 'nodes', and the number of points in the bounding box in 'npts'.
            self.borders = {}
            self.borders['nodes'] = np.array([xmin, xmax, ymin, ymax])
            self.borders['npts'] = data.shape[0]

            # set parameters for the partitioning algorithm
            self.setParameters(partitionParams=partitionParams)

            # Use the chosen meshing scheme to build the partitions.
            if meshingScheme == 'DV':
                self.partitionBucket = self.return_partition_DV(
                    data=data, borders=self.borders, r=self.numParts,
                    alpha=self.alpha, c=self.c, nmin=self.nmin)
            elif meshingScheme == 'clustering':
                self.partitionBucket = self.return_partition_clustering(
                    data=data, clusterAlg=self.clusterAlg,
                    num_clusters=self.numClusters, weights=self.weights,
                    boxOption=self.boxOption, boxSize=self.boxSize)
            else:
                # For any other meshingScheme (including None), just use the
                # bounding box as the single partition.
                self.partitionBucket = [self.borders]

        else:
            # no data, so return an empty partition bucket
            self.partitionBucket = []
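    # A minimal usage sketch (hedged; `dgm` is a hypothetical many by 2 array
    # of (birth, lifetime) pairs, not part of this module):
    #
    #   >>> import numpy as np
    #   >>> dgm = np.random.rand(200, 2)
    #   >>> partDV = Partitions(dgm, meshingScheme='DV',
    #   ...                     partitionParams={'numParts': 2, 'alpha': 0.05})
    #   >>> partKM = Partitions(dgm, meshingScheme='clustering',
    #   ...                     partitionParams={'numClusters': 4})
    #   >>> len(partDV)  # number of partitions found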
    def convertOrdToFloat(self, partitionEntry):
        '''
        Converts the nodes of a partition entry from ordinal back to floats.

        Parameters:
            partitionEntry (dict): The partition that you want to convert.

        Returns:
            Partition entry with converted nodes. Also sets the dictionary
            element to the converted version.
        '''
        bdyList = partitionEntry['nodes'].copy()

        # Need to subtract one to deal with counting from 0
        # vs counting from 1 problems.
        xLow = int(bdyList[0]) - 1
        xHigh = int(bdyList[1]) - 1
        yLow = int(bdyList[2]) - 1
        yHigh = int(bdyList[3]) - 1

        if hasattr(self, 'xFloats'):
            xLowFloat = self.xFloats[xLow]
            xHighFloat = self.xFloats[xHigh]
            yLowFloat = self.yFloats[yLow]
            yHighFloat = self.yFloats[yHigh]
            convertedBdyList = [xLowFloat, xHighFloat, yLowFloat, yHighFloat]
            partitionEntry['nodes'] = convertedBdyList
            return partitionEntry
        else:
            print("You're trying to convert your ordinal data")
            print("back to floats, but your data must have been ordinal")
            print("to begin with, so there are no floats to convert to. Exiting...")
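    # Hedged worked example of the conversion above: with
    # self.xFloats = self.yFloats = [0.1, 0.5, 0.9] (the sorted original
    # coordinates), an ordinal entry {'nodes': np.array([1, 3, 1, 2])}
    # converts to {'nodes': [0.1, 0.9, 0.1, 0.5]}, since rank k indexes
    # element k-1 of the sorted floats.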
    def __len__(self):
        return len(self.partitionBucket)

    def __getitem__(self, key):
        if hasattr(self, 'xFloats'):  # if the original data wasn't ordinal
            entry = self.partitionBucket[key].copy()
            entry = self.convertOrdToFloat(entry)
            return entry
        else:
            return self.partitionBucket[key]
    def getOrdinal(self, key):
        '''
        Overrides the builtin magic method in the case where you had
        non-ordinal data but still want the ordinal stuff back.
        If the data wasn't ordinal, this has the exact same effect as self[key].
        '''
        return self.partitionBucket[key]
    def __iter__(self):
        # iterates over the (converted, if applicable) entries in the partition bucket
        if hasattr(self, 'xFloats'):
            return map(self.convertOrdToFloat, deepcopy(self.partitionBucket))
        else:
            return iter(self.partitionBucket)
    def iterOrdinal(self):
        '''
        Functions just like the __iter__ magic method, but without converting
        each entry back to floats.
        '''
        return iter(self.partitionBucket)
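    # Indexing/iteration sketch (hedged; `part` is a hypothetical Partitions
    # instance built from float data): self[key] and iteration convert entries
    # back to float coordinates, while the *Ordinal variants return raw ranks.
    #
    #   >>> part[0]['nodes']             # float bounding box
    #   >>> part.getOrdinal(0)['nodes']  # ordinal (rank) bounding box
    #   >>> [p['npts'] for p in part]    # iterate over converted entries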
    def __str__(self):
        '''
        Nicely prints all currently set values in the partition bucket.
        '''
        attrs = vars(self)
        output = ''
        output += 'Variables in partition bucket\n'
        output += '---\n'
        for key in attrs.keys():
            output += str(key) + ' : '
            output += str(attrs[key]) + '\n'
        output += '---\n'
        return output
    def plot(self):
        '''
        Plot the partitions. Can plot in ordinal or float coordinates,
        whichever is in the partition bucket when it's called.
        '''
        fig1, ax1 = plt.subplots()
        for binNode in self:
            # get the bottom left corner
            corner = (binNode['nodes'][0], binNode['nodes'][2])

            # get the width and height
            width = binNode['nodes'][1] - binNode['nodes'][0]
            height = binNode['nodes'][3] - binNode['nodes'][2]

            # add the corresponding rectangle
            ax1.add_patch(patches.Rectangle(corner, width, height, fill=False))

        # the rectangles don't show unless we rescale the axes
        plt.axis('tight')
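    # Typical plotting usage (a sketch, assuming an interactive matplotlib
    # backend; the data here is arbitrary):
    #
    #   >>> part = Partitions(np.random.rand(100, 2), meshingScheme='DV')
    #   >>> part.plot()   # draw the partition rectangles
    #   >>> plt.show()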
    def isOrdinal(self, dd):
        '''
        Helper function for error checking. Used to make sure input is in
        ordinal coordinates. It checks that when the two data columns are
        sorted, they are each equal to an ordered vector with the same number
        of rows.

        :param dd: Data in a many by 2 numpy array
        '''
        return np.all(np.equal(np.sort(dd, axis=0),
                               np.reshape(np.repeat(np.arange(start=1, stop=dd.shape[0]+1), 2),
                                          (dd.shape[0], 2))))
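    # Worked example of the ordinal check: for
    #   dd = np.array([[2, 1],
    #                  [1, 3],
    #                  [3, 2]])
    # each column sorted is [1, 2, 3], which equals np.arange(1, 4), so
    # isOrdinal(dd) returns True; repeated or non-integer coordinates
    # make it return False.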
    def return_partition_DV(self, data, borders, r=2, alpha=0.05, c=0, nmin=5):
        '''
        Recursive method that partitions the data based on the DV method.

        Parameters:
            data (np.array): A many by 2 numpy array that contains all the data
                in ordinal format.
            borders (dict): A dictionary that contains 'nodes' with a numpy
                array of Xmin, Xmax, Ymin, Ymax.
            r (int): The number of partitions to split in each direction
                (i.e. r=2 means each partition is recursively split into a
                2 by 2 grid of partitions).
            alpha (float): The required significance level for independence
                to stop partitioning.
            c (float): Threshold for an exit criterion: partitioning stops if
                min(width of partition, height of partition) < c. When called
                through the constructor, setParameters computes this threshold
                from the integer parameter 'c' as
                max(width of bounding box, height of bounding box) / c.
            nmin (int): Minimum average number of points per partition to keep
                the recursion going. The default is 5 because the chi-square
                test breaks down with fewer than 5 points per partition, thus
                we recommend choosing nmin >= 5.

        Returns:
            List of dictionaries. Each dictionary corresponds to a partition
            and contains 'nodes', a numpy array of Xmin, Xmax, Ymin, Ymax of
            the partition, and 'npts', the number of points in the partition.
        '''

        # extract the bin boundaries
        Xmin = borders['nodes'][0]
        Xmax = borders['nodes'][1]
        Ymin = borders['nodes'][2]
        Ymax = borders['nodes'][3]

        # find the points inside the bin
        idx = np.where((data[:, 0] >= Xmin) & (data[:, 0] <= Xmax)
                       & (data[:, 1] >= Ymin) & (data[:, 1] <= Ymax))

        partitions = []

        # Exit criterion: if either the height or width of the bin (in float
        # coordinates) is less than the threshold c, stop partitioning.
        width = self.xFloats[int(Xmax - 1)] - self.xFloats[int(Xmin - 1)]
        height = self.yFloats[int(Ymax - 1)] - self.yFloats[int(Ymin - 1)]
        if (c != 0) and (min(width, height) < c):
            # reject further partitions, and return the original bin
            partitions.insert(0, {'nodes': np.array([Xmin, Xmax, Ymin, Ymax]),
                                  'npts': len(idx[0])})
            return partitions

        # extract the points in the bin
        Xsub = data[idx, 0]
        Ysub = data[idx, 1]

        # find the indices of the points in the x- and y-patches
        idx_x = np.where((data[:, 0] >= Xmin) & (data[:, 0] <= Xmax))
        idx_y = np.where((data[:, 1] >= Ymin) & (data[:, 1] <= Ymax))

        # get the subpartitions
        ai = np.floor(np.percentile(data[idx_x, 0], 1/r * np.arange(1, r) * 100))
        bj = np.floor(np.percentile(data[idx_y, 1], 1/r * np.arange(1, r) * 100))

        # get the bin edges
        edges1 = np.concatenate(([Xmin], ai, [Xmax]))
        edges2 = np.concatenate(([Ymin], bj, [Ymax]))

        # Exit criterion: if we cannot split into unique boundaries any more,
        # reject further partitions and return the original bin.
        if (len(np.unique(edges1, return_counts=True)[1]) < r + 1
                or len(np.unique(edges2, return_counts=True)[1]) < r + 1):
            partitions.insert(0, {'nodes': np.array([Xmin, Xmax, Ymin, Ymax]),
                                  'npts': len(idx[0])})
            return partitions

        # figure out the shift in the edges so that boundaries do not overlap
        xShift = np.zeros((2 * r, 2 * r))
        xShift[:, 1:-1] = np.tile(np.array([[-1, 0]]), (2 * r, r - 1))
        yShift = xShift.T

        # Find the boundaries for each bin:
        # duplicate inner nodes for the x mesh
        dupMidNodesX = np.append(np.insert(np.repeat((edges1[1:-1]), 2, axis=0),
                                           0, edges1[0]), edges1[-1])

        # duplicate inner nodes for the y mesh
        dupMidNodesY = np.append(np.insert(np.repeat((edges2[1:-1]), 2, axis=0),
                                           0, edges2[0]), edges2[-1])

        # reshape the y mesh nodes into a column vector
        dupMidNodesY = np.reshape(dupMidNodesY, (-1, 1))

        # now find the nodes for each bin
        xBinBound = dupMidNodesX + xShift
        yBinBound = dupMidNodesY + yShift

        # find the number of points in each bin, and put this info into an array
        binned_data = binned_statistic_2d(Xsub.flatten(), Ysub.flatten(), None,
                                          'count', bins=[edges1, edges2])

        # Get the counts. Flatten columnwise to match the bin definition in
        # the loop that creates the dictionaries below.
        binCounts = binned_data.statistic.flatten('F')

        # Exit criterion: check if the sum of the bin counts is less than the
        # threshold of nmin points per bin. nmin is necessary because chisquare
        # breaks down if you have fewer than 5 points in each bin.
        if nmin != 0:
            if np.sum(binCounts) < nmin * (r ** 2):
                partitions.insert(0, {'nodes': np.array([Xmin, Xmax, Ymin, Ymax]),
                                      'npts': len(idx[0])})
                return partitions

        # define an empty list to hold the dictionaries of the fresh partitions
        bins = []

        # Create dictionaries for each bin. Start with the loop over y; note
        # how the loop counts were obtained above to match the convention here.
        for yInd in np.arange(r):
            # this is the loop over x
            for xInd in np.arange(r):
                # get the bin number
                binNo = yInd * r + xInd
                xLow, xHigh = xBinBound[yInd, 2 * xInd + np.arange(2)]
                yLow, yHigh = yBinBound[2 * yInd + np.arange(2), xInd]
                bins.append({'nodes': np.array([xLow, xHigh, yLow, yHigh]),
                             'npts': binCounts[binNo]})

        # calculate the chi-square statistic
        chi2 = chisquare(binCounts)

        # Check for independence and recurse:
        # if the chi-square test fails, do further partitioning.
        if chi2.pvalue < alpha and Xmax != Xmin and Ymax != Ymin:
            for binInfo in bins:
                if binInfo['npts'] != 0:  # if the bin is not empty:
                    # recurse, passing the exit-criterion parameters down so
                    # they apply at every level
                    partitions.extend(self.return_partition_DV(
                        data=data, borders=binInfo, r=r, alpha=alpha,
                        c=c, nmin=nmin))

        # Exit criterion: if the partitions are independent, reject further
        # partitioning and save the original, unpartitioned bin.
        elif len(idx[0]) != 0:
            partitions.insert(0, {'nodes': np.array([Xmin, Xmax, Ymin, Ymax]),
                                  'npts': len(idx[0])})

        return partitions
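    # Hedged numerical sketch of the chi-square exit criterion above: with
    # r=2, binCounts = [10, 12, 9, 11] is close to uniform, so
    # chisquare(binCounts).pvalue is large (> alpha) and the recursion stops,
    # whereas binCounts = [30, 2, 3, 7] gives a very small p-value and
    # triggers further splitting of each non-empty sub-bin.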
    def return_partition_clustering(self, data, clusterAlg=KMeans, num_clusters=5,
                                    weights=None, boxOption="boundPoints", boxSize=2):
        '''
        Partitioning method based on clustering algorithms. First clusters the
        data, then uses the cluster centers and labels to determine the partitions.

        Parameters:
            data (np.array): A many by 2 numpy array that contains all the
                original data (not ordinals).
            clusterAlg (class): Clustering algorithm you want to use. The only
                options right now are KMeans and MiniBatchKMeans from scikit-learn.
            num_clusters (int): The number of clusters you want. This is the
                number of partitions you want to divide your space into.
            weights (int or None): Option for weighted clustering. Use 0 to
                weight each point by its first (birth) coordinate, 1 to weight
                by its second (lifetime) coordinate, or None for unweighted
                clustering.
            boxOption (str): Specifies how to choose the boxes based on cluster
                centers. The only option right now is "boundPoints", which takes
                the bounding box of all data points assigned to that cluster
                center. Additional options may be added in the future.
            boxSize (int): This input is not used as of now.

        Returns:
            List of dictionaries. Each dictionary corresponds to a partition
            and contains 'nodes', a numpy array of Xmin, Xmax, Ymin, Ymax of
            the partition, and 'center', the center of the cluster for that
            partition.
        '''

        # Choose the sample weights: 0 weights by the first coordinate,
        # 1 weights by the second coordinate, anything else is unweighted.
        if weights == 0:
            sample_weights = data[:, 0]
        elif weights == 1:
            sample_weights = data[:, 1]
        else:
            sample_weights = None

        # fit using whatever the chosen cluster algorithm is
        kmeans = clusterAlg(n_clusters=num_clusters).fit(
            data, sample_weight=sample_weights)

        # get the centers of each cluster and the labels for the data points
        centers = kmeans.cluster_centers_
        labels = kmeans.labels_

        bins = []

        # Using this option, take the bounding box of the points assigned to
        # each cluster center. These bounding boxes are the partitions.
        if boxOption == "boundPoints":
            for l in np.unique(labels):
                cluster = data[labels == l]

                xmin = min(cluster[:, 0])
                xmax = max(cluster[:, 0])
                ymin = min(cluster[:, 1])
                ymax = max(cluster[:, 1])

                # Note: there are issues if the bounding box touches the x-axis
                # (points with zero lifetime), so a warning could be added here.

                bins.insert(0, {'nodes': [xmin, xmax, ymin, ymax],
                                'center': centers[l]})

        # A second option, "equalSize", which would place an equal-size box
        # centered at each cluster center and ignore anything outside it, is
        # sketched below but has not been debugged: there is no error checking,
        # so boxes could cross the x-axis, which we can't have. It may be
        # available later. (Don't delete; it is a starting point.)
        # elif boxOption == "equalSize":
        #     if isinstance(boxSize, int):
        #         boxSize = list([boxSize, boxSize])
        #     for l in np.unique(labels):
        #         center = centers[l]
        #         xmin = center[0] - boxSize[0]/2
        #         xmax = center[0] + boxSize[0]/2
        #         ymin = center[1] - boxSize[1]/2
        #         ymax = center[1] + boxSize[1]/2
        #         bins.insert(0, {'nodes': [xmin, xmax, ymin, ymax],
        #                         'center': centers[l]})

        return bins
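    # Usage sketch (hedged; `dgm` is a hypothetical many by 2 float array):
    #
    #   >>> part = Partitions(dgm, meshingScheme='clustering',
    #   ...                   partitionParams={'numClusters': 3,
    #   ...                                    'clusterAlg': MiniBatchKMeans})
    #   >>> part.partitionBucket[0]['nodes']  # bounding box of one cluster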
    def setParameters(self, partitionParams):
        '''
        Helper function to set the parameters depending on the meshing scheme.
        If any are not specified, they are set to default values.

        Parameters:
            partitionParams (dict): Dictionary containing parameters needed for
                the partitioning algorithm.
        '''

        if self.meshingScheme == 'DV':

            xmin, xmax, ymin, ymax = self.borders['nodes']

            if 'c' in partitionParams:
                c = partitionParams['c']
                if c != 0:
                    # convert c from an integer to the corresponding width/height
                    width = (self.xFloats[xmax - 1] - self.xFloats[xmin - 1]) / c
                    height = (self.yFloats[ymax - 1] - self.yFloats[ymin - 1]) / c
                    self.c = max(width, height)
                else:
                    # c=0 means we don't use this parameter as an exit criterion
                    self.c = 0
            else:
                c = 10
                # convert c from an integer to the corresponding width/height
                width = (self.xFloats[xmax - 1] - self.xFloats[xmin - 1]) / c
                height = (self.yFloats[ymax - 1] - self.yFloats[ymin - 1]) / c
                self.c = max(width, height)

            if 'alpha' in partitionParams:
                self.alpha = partitionParams['alpha']
            else:
                self.alpha = 0.05

            if 'nmin' in partitionParams:
                self.nmin = partitionParams['nmin']
            else:
                self.nmin = 5

            if 'numParts' in partitionParams:
                self.numParts = partitionParams['numParts']
            else:
                self.numParts = 2

        elif self.meshingScheme == 'clustering':

            if 'clusterAlg' in partitionParams:
                self.clusterAlg = partitionParams['clusterAlg']
            else:
                self.clusterAlg = KMeans

            if 'numClusters' in partitionParams:
                self.numClusters = partitionParams['numClusters']
            else:
                self.numClusters = 5

            if 'weights' in partitionParams:
                self.weights = partitionParams['weights']
            else:
                self.weights = None

            if 'pad' in partitionParams:
                self.pad = partitionParams['pad']
            else:
                self.pad = 0.1

            if 'boxOption' in partitionParams:
                self.boxOption = partitionParams['boxOption']
            else:
                self.boxOption = "boundPoints"

            if 'boxSize' in partitionParams:
                self.boxSize = partitionParams['boxSize']
            else:
                self.boxSize = 2

        if 'split' in partitionParams:
            self.split = partitionParams['split']
        else:
            self.split = False
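    # For example (a sketch; `dgm` is hypothetical data), these two calls are
    # equivalent under the defaults set above:
    #
    #   >>> Partitions(dgm, meshingScheme='DV', partitionParams={})
    #   >>> Partitions(dgm, meshingScheme='DV',
    #   ...            partitionParams={'c': 10, 'alpha': 0.05, 'nmin': 5,
    #   ...                             'numParts': 2, 'split': False})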
# --------------------------------------------------
# this part tests the adaptive meshing algorithm
if __name__ == "__main__":

    # fix the seed for reproducibility
    np.random.seed(48824)

    # create a bivariate Gaussian
    mu = np.array([0, 0])  # the means
    cov = 0.7  # covariance
    sigma = np.array([[1, cov], [cov, 1]])  # covariance matrix

    # sample the multivariate random variable
    nsamples = 100  # number of random samples
    x, y = np.random.multivariate_normal(mu, sigma, nsamples).T

    # number of splits in each direction for the adaptive mesh
    numParts = 2

    # get the adaptive partition of the data
    partitionList = Partitions(np.column_stack((x, y)),
                               meshingScheme="DV",
                               partitionParams={'numParts': numParts},
                               convertToOrd=True)

    # plot the partitions
    partitionList.plot()

    # overlay the data
    plt.plot(x, y, 'r*')

    # add formatting
    plt.axis('tight')

    # show the figure
    plt.show()
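    # A second, hedged demo of the 'clustering' scheme (an illustration added
    # here, not part of the original test; it runs on raw floats, and we shift
    # y to be positive since persistence lifetimes must be positive).
    dataClust = np.column_stack((x, y - y.min() + 0.01))
    partitionsClust = Partitions(dataClust,
                                 meshingScheme='clustering',
                                 partitionParams={'numClusters': 4})

    # plot the cluster bounding boxes and overlay the shifted data
    partitionsClust.plot()
    plt.plot(dataClust[:, 0], dataClust[:, 1], 'b.')
    plt.axis('tight')
    plt.show()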