Source code for teaspoon.parameter_selection.MI_delay

"""
Uses mutual information to find a suitable delay via the location
of the first minima in the mutual information vs delay plot, which is calculated using multiple
x(t) vs x(t+tau) plots. These plots have their individual mutual information calculated. Various methods
for partitioning the x(t) vs x(t+tau) plots for calculating the mutual information.
"""

import scipy.stats
import numpy as np
from copy import deepcopy
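
# A minimal sketch of the first-minimum idea used throughout this module
# (hypothetical helper, not part of the original code): given MI values for
# delays 1, 2, 3, ..., the suggested delay is the tau at which MI first stops
# decreasing.
def _first_minimum_sketch(I):
    # I: list of MI values for delays 1, 2, 3, ...
    for i in range(1, len(I)):
        if I[i] > I[i - 1]:  # MI started increasing, so the previous delay was a minimum
            return i         # delays are 1-indexed, so index i corresponds to delay i
    return len(I)            # no minimum found; fall back to the largest delay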


class Partitions:
    """Adaptive partitioning of 2D data, used by the Darbellay-Vajda (DV)
    mutual information estimator below."""

    def __init__(self, data=None,
                 meshingScheme=None,
                 numParts=3,
                 alpha=0.05):

        if data is not None:
            # check that the data is in ordinal coordinates
            if not self.isOrdinal(data):
                print("Converting the data to ordinal...")
                # perform ordinal sampling (ranking) transformation
                xRanked = scipy.stats.rankdata(data[:, 0], method='ordinal')
                yRanked = scipy.stats.rankdata(data[:, 1], method='ordinal')

                xFloats = np.copy(data[:, 0])
                xFloats.sort()
                yFloats = np.copy(data[:, 1])
                yFloats.sort()

                self.xFloats = xFloats
                self.yFloats = yFloats

                data = np.column_stack((xRanked, yRanked))


            # If there is data, set the bounding box to be the max and min in the data

            xmin = data[:, 0].min()
            xmax = data[:, 0].max()
            ymin = data[:, 1].min()
            ymax = data[:, 1].max()

            self.borders = {}
            self.borders['nodes'] = np.array([xmin, xmax, ymin, ymax])
            self.borders['npts'] = data.shape[0]
            self.numParts = numParts
            self.alpha = alpha

            # If there is data, use the chosen meshing scheme to build the partitions.
            if meshingScheme == 'DV' and self.isOrdinal(data):
                # recursively build the adaptive partitioning
                self.partitionBucket = self.return_partition_DV(data=data,
                                                                borders=self.borders,
                                                                r=self.numParts,
                                                                alpha=self.alpha)
            else:  # meshingScheme == None
                # any other meshing scheme falls back to a single partition:
                # the bounding box itself
                self.partitionBucket = [self.borders]

        else:
            self.partitionBucket = []

    def convertOrdToFloat(self, partitionEntry):
        bdyList = partitionEntry['nodes'].copy()
        # Need to subtract one to deal with counting from
        # 0 vs counting from 1 problems
        xLow = int(bdyList[0])-1
        xHigh = int(bdyList[1])-1
        yLow = int(bdyList[2])-1
        yHigh = int(bdyList[3])-1

        if hasattr(self, 'xFloats'):
            xLowFloat = self.xFloats[xLow]
            xHighFloat = self.xFloats[xHigh]
            yLowFloat = self.yFloats[yLow]
            yHighFloat = self.yFloats[yHigh]
            convertedBdyList = [xLowFloat, xHighFloat, yLowFloat, yHighFloat]
            partitionEntry['nodes'] = convertedBdyList
            return partitionEntry
        else:
            print("You're trying to convert your ordinal data")
            print("back to floats, but you must have had ordinal")
            print("to begin with so I can't.  Exiting...")

    def __len__(self):
        return len(self.partitionBucket)

    def __getitem__(self, key):
        if hasattr(self, 'xFloats'):  # if the data wasn't ordinal
            entry = self.partitionBucket[key].copy()
            entry = self.convertOrdToFloat(entry)
            return entry
        else:
            return self.partitionBucket[key]

    def getOrdinal(self, key):
        # companion to the __getitem__ magic method for the case
        # where you had non-ordinal data but still want the
        # ordinal version back.
        # If the data was ordinal to begin with, this has the
        # exact same effect as self[key]
        return self.partitionBucket[key]

    def __iter__(self):
        # iterates over the converted entries in the
        # parameter bucket
        if hasattr(self, 'xFloats'):
            return map(self.convertOrdToFloat, deepcopy(self.partitionBucket))
        else:
            return iter(self.partitionBucket)

    def iterOrdinal(self):
        # functions just like iter magic method without
        # converting each entry back to its float
        return iter(self.partitionBucket)

    def __str__(self):
        """!
        @brief Nicely prints all currently set values in the bucket.
        """
        attrs = vars(self)
        output = ''
        output += 'Variables in partition bucket\n'
        output += '---\n'
        for key in attrs.keys():
            output += str(key) + ' : '
            output += str(attrs[key]) + '\n'
            output += '---\n'
        return output

    def plot(self):
        import matplotlib.pyplot as plt
        import matplotlib
        # plot the partitions
        fig1, ax1 = plt.subplots()
        for binNode in self:
            # print(binNode)
            # get the bottom left corner
            corner = (binNode['nodes'][0], binNode['nodes'][2])

            # get the width and height
            width = binNode['nodes'][1] - binNode['nodes'][0]
            height = binNode['nodes'][3] - binNode['nodes'][2]

            # add the corresponding rectangle
            ax1.add_patch(matplotlib.patches.Rectangle(
                corner, width, height, fill=False))

        # Doesn't show unless we do this
        plt.axis('tight')

    # helper function for error checking. Used to make sure the input is in
    # ordinal coordinates. It checks that, when the two data columns are each
    # sorted, they both equal the vector (1, 2, ..., n) where n is the number
    # of rows.
    def isOrdinal(self, dd):
        return np.all(np.equal(np.sort(dd, axis=0),
                               np.reshape(np.repeat(np.arange(start=1, stop=dd.shape[0]+1),
                                                    2), (dd.shape[0], 2))))
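
    # A minimal sketch of what isOrdinal accepts (hypothetical helper, not part
    # of the original class): each column of an ordinal array sorts to
    # (1, 2, ..., n); raw floats are ranked first.
    @staticmethod
    def _isOrdinal_sketch():
        ordinal = np.array([[3, 1], [1, 3], [2, 2]])  # both columns rank to 1..3
        raw = np.array([[0.2, 5.1], [1.7, 0.3]])      # raw floats, not ordinal
        ranked = np.column_stack(
            (scipy.stats.rankdata(raw[:, 0], method='ordinal'),
             scipy.stats.rankdata(raw[:, 1], method='ordinal')))
        return ordinal, ranked  # isOrdinal is True for both of these arrays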

    # data: an n x 2 numpy array that contains all the original data
    # borders: a dictionary that contains 'nodes' with a numpy array of Xmin,
    # Xmax, Ymin, Ymax, and 'npts' which contains the number of points in the bin
    # r: the number of partitions
    # alpha: the significance level to test for independence

    def return_partition_DV(self, data, borders, r=2, alpha=0.05):
        # extract the bin boundaries
        Xmin = borders['nodes'][0]
        Xmax = borders['nodes'][1]
        Ymin = borders['nodes'][2]
        Ymax = borders['nodes'][3]

        # find the indices of the points inside the bin
        idx = np.where((data[:, 0] >= Xmin)
                       & (data[:, 0] <= Xmax)
                       & (data[:, 1] >= Ymin)
                       & (data[:, 1] <= Ymax))

        # extract the points in the bin
        Xsub = data[idx, 0]
        Ysub = data[idx, 1]


        # find the indices of the points in the x- and y-patches
        idx_x = np.where((data[:, 0] >= Xmin) & (data[:, 0] <= Xmax))
        idx_y = np.where((data[:, 1] >= Ymin) & (data[:, 1] <= Ymax))

        # get the subpartitions
        ai = np.floor(np.percentile(
            data[idx_x, 0], 1/r * np.arange(1, r) * 100))
        bj = np.floor(np.percentile(
            data[idx_y, 1], 1/r * np.arange(1, r) * 100))

        # get the bin edges
        edges1 = np.concatenate(([Xmin], ai, [Xmax]))
        edges2 = np.concatenate(([Ymin], bj, [Ymax]))

        # first exit criterion: we cannot split into unique boundaries any more
        # preallocate the partition list
        partitions = []
        if (len(np.unique(edges1, return_counts=True)[1]) < r + 1 or
                len(np.unique(edges2, return_counts=True)[1]) < r + 1):

            # reject further partitioning, and return the original bin
            partitions.insert(0, {'nodes': np.array([Xmin, Xmax, Ymin, Ymax]),
                                  'npts': len(idx[0])})
            return partitions

        # figure out the shift in the edges so that boundaries do not overlap
        xShift = np.zeros((2 * r, 2 * r))
        xShift[:, 1:-1] = np.tile(np.array([[-1, 0]]), (2 * r, r - 1))
        yShift = xShift.T

        # find the boundaries for each bin
        # duplicate inner nodes for x mesh
        dupMidNodesX = np.append(np.insert(np.repeat((edges1[1:-1]), 2, axis=0),
                                           0, edges1[0]), edges1[-1])

        # duplicate inner nodes for y mesh
        dupMidNodesY = np.append(np.insert(np.repeat((edges2[1:-1]), 2, axis=0),
                                           0, edges2[0]), edges2[-1])
        # reshape
        dupMidNodesY = np.reshape(dupMidNodesY, (-1, 1))

        # now find the nodes for each bin
        xBinBound = dupMidNodesX + xShift
        yBinBound = dupMidNodesY + yShift
        # find the number of points in each bin, and put this info into array
        binned_data = scipy.stats.binned_statistic_2d(Xsub.flatten(), Ysub.flatten(), None, 'count',
                                                      bins=[edges1, edges2])
        # get the counts. Flatten columnwise to match the bin definition in the
        # loop that creates the dictionaries below
        binCounts = binned_data.statistic.flatten('F')

        # define an empty list to hold the dictionaries of the fresh partitions
        bins = []
        # create dictionaries for each bin
        # start with the loop over y
        # note how the loop counts were obtained above to match the convention
        # here
        for yInd in np.arange(r):
            # this is the loop over x
            for xInd in np.arange(r):
                # get the bin number
                binNo = yInd * r + xInd
                xLow, xHigh = xBinBound[yInd, 2*xInd + np.arange(2)]
                yLow, yHigh = yBinBound[2*yInd + np.arange(2), xInd]
                bins.append({'nodes': np.array([xLow, xHigh, yLow, yHigh]),
                             'npts': binCounts[binNo]})

        # calculate the chi square statistic
        chi2 = scipy.stats.chisquare(binCounts)

        # check for independence and start recursion
        # if the chi2 test fails, do further partitioning:
        if chi2.pvalue < alpha and Xmax != Xmin and Ymax != Ymin:
            for binInfo in bins:
                if binInfo['npts'] != 0:  # if the bin is not empty:
                    # append entries to the tuple
                    partitions.extend(self.return_partition_DV(data=data,
                                                               borders=binInfo,
                                                               r=r, alpha=alpha))

        # second exit criterion:
        # if the partitions are independent, reject further partitioning and
        # save the original, unpartitioned bin
        elif len(idx[0]) != 0:
            partitions.insert(0, {'nodes': np.array([Xmin, Xmax, Ymin, Ymax]),
                                  'npts': len(idx[0])})

        return partitions
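
# A minimal usage sketch for the Partitions class (hypothetical helper, not
# part of the original module); the sample size and random seed are arbitrary
# assumptions.
def _demo_partitions():
    rng = np.random.default_rng(0)
    xy = rng.normal(size=(500, 2))          # 500 raw (x, y) samples
    p = Partitions(xy, meshingScheme='DV')  # adaptive DV partitioning
    print('number of partitions:', len(p))
    p.plot()                                # draw the partition rectangles
    return p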




def MI_DV(x, y):
    """This function calculates the mutual information between the time series x(t)
    and its delayed version x(t+tau) using adaptive partitioning of the plot of x(t)
    vs. x(t+tau). This method was developed by Georges Darbellay and Igor Vajda in
    1999 and was published as Estimation of the information by an adaptive
    partitioning of the observation space.

    Args:
        x (array): time series x(t)
        y (array): delayed time series x(t + tau)

    Returns:
        (float): I, mutual information between x(t) and x(t+tau).
    """
    from scipy.stats import rankdata

    # perform ordinal sampling (ranking) transformation
    xRanked = rankdata(x, method='ordinal')
    yRanked = rankdata(y, method='ordinal')

    # maximum number of parts in a single bin/partition
    numParts = 4

    # get the adaptive partition of the data
    partitionList = Partitions(np.column_stack((xRanked, yRanked)),
                               meshingScheme="DV", numParts=numParts)

    # the Partitions instance stores:
    # 1. borders: the overall bounds (xmin, xmax, ymin, ymax) with the total
    #    number of points
    # 2. numParts: the maximum number of parts in a single bin/partition
    # 3. alpha: the significance level for the independence test
    # 4. partitionBucket: every partition's borders and number of points

    # extract the bin counts
    binCounts = np.array([partitionList.partitionBucket[i].get("npts")
                          for i in range(len(partitionList))])

    # get the total number of points
    N = partitionList.borders.get("npts")

    # grab the joint probability of each bin
    Pn_AB = binCounts / N

    # grab the probability of the vertical and horizontal strips for each bin
    PC = partitionList  # partition cells
    Pn_AR = np.zeros(len(partitionList))
    Pn_RB = np.zeros(len(partitionList))
    for Bin in range(len(partitionList)):  # go through each bin
        # count the number of points between the x bounds of the bin
        Pn_AR[Bin] = len([i for i in range(N)
                          if xRanked[i] >= PC[Bin].get('nodes')[0]
                          and xRanked[i] <= PC[Bin].get('nodes')[1]])
        # count the number of points between the y bounds of the bin
        Pn_RB[Bin] = len([i for i in range(N)
                          if yRanked[i] >= PC[Bin].get('nodes')[2]
                          and yRanked[i] <= PC[Bin].get('nodes')[3]])
    Pn_AR = Pn_AR/N  # divide for probability
    Pn_RB = Pn_RB/N

    # find the approximation of the mutual information
    Iapprox = np.dot(Pn_AB, np.log(Pn_AB/(Pn_AR*Pn_RB)))

    return Iapprox
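
# A minimal usage sketch for MI_DV (hypothetical helper, not part of the
# original module); the toy signal and delay are assumptions for illustration.
def _demo_MI_DV():
    t = np.linspace(0, 50, 500)
    ts = np.sin(t)                     # toy signal
    tau = 5                            # assumed delay
    return MI_DV(ts[:-tau], ts[tau:])  # MI between x(t) and x(t + tau)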
# This function computes the mutual information function based on the
# algorithm described in:
# "Estimating Mutual Information," Alexander Kraskov, Harald Stoegbauer,
# Peter Grassberger, 2003.
def MI_kraskov(x, y, k=2, ranking=True):
    """This function estimates the mutual information between the time series x(t)
    and its delayed version x(t+tau) in two different ways. This method was developed
    by Alexander Kraskov, Harald Stoegbauer, and Peter Grassberger in 2003 and
    published as Estimating Mutual Information.

    Args:
        x (array): time series x(t)
        y (array): delayed time series x(t + tau)

    Kwargs:
        k (int): number of nearest neighbors used in MI estimation. Default is k = 2.
        ranking (bool): whether the ranked or unranked x and y inputs will be used.
            Default is ranking = True.

    Returns:
        (float): I1, first mutual information estimate between x(t) and x(t+tau).
        (float): I2, second mutual information estimate between x(t) and x(t+tau).
    """
    if ranking == True:
        # rank x and y ordinally
        from scipy.stats import rankdata
        x = rankdata(x, method='ordinal')
        y = rankdata(y, method='ordinal')

    # optionally perturb the data to improve uniqueness (disabled by default)
    x = x  # + (10**-5)*np.random.random_sample((len(x),))
    y = y  # + (10**-5)*np.random.random_sample((len(y),))
    lenZ = len(x)

    # find nearest neighbors in the x-y plane using the Chebyshev distance
    from sklearn.neighbors import NearestNeighbors
    vec = np.stack((x, y)).T
    nbrs = NearestNeighbors(n_neighbors=k+1, algorithm='ball_tree',
                            metric='chebyshev').fit(vec)
    distances, indZ = nbrs.kneighbors(vec)

    # from the subset of nearest neighbors, find the distance to the kth nearest
    # neighbor after projecting the points onto each axis
    cDx = abs(
        x[indZ] - np.tile(np.reshape((x[indZ].T[0]), (len(x[indZ].T[0]), 1)), k+1))
    cDx = np.sort(cDx, axis=-1).T[-1]
    cDy = abs(
        y[indZ] - np.tile(np.reshape((y[indZ].T[0]), (len(y[indZ].T[0]), 1)), k+1))
    cDy = np.sort(cDy, axis=-1).T[-1]

    # distance to the kth nearest neighbor for each node
    cD = distances.T[-1]

    # METHOD 1
    # find the number of points (in the x-y plane) that fall within strips whose
    # width is the kth nearest neighbor distance in the joint space
    nxI = []
    nyI = []
    # array of distances in x from point i
    x_d = abs((np.tile(x.reshape((len(x), 1)), len(x)) - x))
    # array of distances in y from point i
    y_d = abs((np.tile(y.reshape((len(y), 1)), len(y)) - y))
    for i in range(len(cDx)):
        nxI.append(len(x_d[i][x_d[i] <= cD[i]]))
        nyI.append(len(y_d[i][y_d[i] <= cD[i]]))
    nxI = np.array(nxI) - 1  # subtract one for self
    nyI = np.array(nyI) - 1  # subtract one for self

    # METHOD 2
    # find the number of points falling within epsilon_x/2 in x and epsilon_y/2 in y
    nxI2 = []
    nyI2 = []
    for i in range(len(cDx)):
        # append if distance is less than epsilon_x/2
        nxI2.append(len(x_d[i][x_d[i] <= (cDx[i])]))
        # append if distance is less than epsilon_y/2
        nyI2.append(len(y_d[i][y_d[i] <= (cDy[i])]))
    nxI2 = np.array(nxI2) - 1  # subtract one for self
    nyI2 = np.array(nyI2) - 1  # subtract one for self

    # compute the mutual information using the digamma function
    from scipy.special import digamma
    # algorithm 1 mutual information
    I1 = digamma(k) - np.mean(digamma(nxI + 1) + digamma(nyI + 1)) + digamma(lenZ)
    # algorithm 2 mutual information
    I2 = digamma(k) - (1/k) - np.mean(digamma(nxI2) + digamma(nyI2)) + digamma(lenZ)

    return I1, I2
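
# A minimal usage sketch for MI_kraskov (hypothetical helper, not part of the
# original module); the toy signal and delay are assumptions, and k=2 mirrors
# the function's default.
def _demo_MI_kraskov():
    t = np.linspace(0, 50, 500)
    ts = np.sin(t)   # toy signal
    tau = 5          # assumed delay
    I1, I2 = MI_kraskov(ts[:-tau], ts[tau:], k=2, ranking=True)
    return I1, I2    # the two Kraskov estimates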
# This function computes the mutual information function based on
# equal-sized bins for the x and y arrays (separately)
def MI_basic(x, y, h_method='sturge', ranking=True):
    """This function calculates the mutual information between the time series x(t)
    and its delayed version x(t+tau) using equi-spaced partitions. The size of the
    partition is based on a bin-count method commonly used for histograms.

    Args:
        x (array): time series x(t)
        y (array): delayed time series x(t + tau)

    Kwargs:
        h_method (string): bin size selection method. Methods are sturge, sqrt, or
            rice. Default is sturge.
        ranking (bool): whether the ranked or unranked x and y inputs will be used.
            Default is ranking = True.

    Returns:
        (float): I, mutual information between x(t) and x(t+tau).
    """
    from scipy.stats import rankdata

    # number of points
    Nx = len(x)
    Ny = len(y)
    if Nx == Ny:
        N = Nx
    else:
        N = max([Nx, Ny])

    # perform ordinal sampling (ranking) transformation
    if ranking == True:
        x = rankdata(x, method='ordinal')
        y = rankdata(y, method='ordinal')

    # first find the number of bins based on the given method.
    # Sturge's rule or the Rice rule provides the most accurate results so far;
    # Sturge's rule works better for smaller data sets.
    if h_method == 'sturge':
        Bx = np.log2(Nx) + 1
        By = np.log2(Ny) + 1
        B = [Bx, By]
    if h_method == 'sqrt':
        Bx = np.sqrt(Nx)
        By = np.sqrt(Ny)
        B = [Bx, By]
    if h_method == 'rice':
        Bx = 2*(Nx**(1/3))
        By = 2*(Ny**(1/3))
        B = [Bx, By]
    B = np.round(np.array(B)).astype(int)

    # calculate a 1D histogram for x and y separately
    Hx = np.histogram(x, bins=int(B[0]))[0]
    Hy = np.histogram(y, bins=int(B[1]))[0]
    # calculate the 2D histogram on the x(t) vs. x(t+tau) plane
    Hxy = np.histogram2d(x, y, bins=B)[0]

    Px = Hx/N
    Py = Hy/N
    Pxy = Hxy/N

    # ignore divide-by-zero warnings (NaNs are set to zero in the next line).
    # np.outer(Px, Py) forms the product P(x_i)P(y_j) for every bin pair so the
    # denominator matches the axes of Pxy.
    with np.errstate(divide='ignore', invalid='ignore'):
        I_matrix = Pxy*np.log(Pxy/np.outer(Px, Py))
    where_are_NaNs = np.isnan(I_matrix)
    I_matrix[where_are_NaNs] = 0

    I = np.sum(I_matrix)

    return I
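
# A minimal usage sketch for MI_basic (hypothetical helper, not part of the
# original module); the toy signal is an assumption, and 'rice' is one of the
# three bin-count rules named in the docstring.
def _demo_MI_basic():
    t = np.linspace(0, 50, 500)
    ts = np.sin(t)   # toy signal
    tau = 5          # assumed delay
    return MI_basic(ts[:-tau], ts[tau:], h_method='rice', ranking=True)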
def MI_for_delay(ts, plotting=False, method='basic', h_method='sturge', k=2,
                 ranking=True):
    """This function calculates the mutual information for increasing delays until
    a first minimum is reached, which is used as a suitable embedding delay for
    permutation entropy.

    Args:
        ts (array): Time series (1d).

    Kwargs:
        plotting (bool): Plotting for user interpretation. Default is False.
        method (string): Method for calculating MI. Options include basic,
            kraskov 1, kraskov 2, or adaptive partitions. Default is basic.
        h_method (string): Bin size selection method for basic method. Methods are
            sturge, sqrt, or rice. Default is sturge.
        ranking (bool): Whether the ranked or unranked x and y inputs will be used
            for kraskov and basic methods. Default is ranking = True.
        k (int): Number of nearest neighbors used in MI estimation for Kraskov
            methods. Default is k = 2.

    Returns:
        (int): tau, The embedding delay for permutation formation based on the
        first minimum of the mutual information.
    """
    delayMax = 250
    min_flag = False
    I = []  # initialize the mutual information array
    tau = []
    delay = 0
    method_flag = False
    while delay < delayMax and min_flag == False:
        delay = delay + 1

        if method == 'adaptive partitions':
            method_flag = True
            # take all terms from the time series besides the last (delay) terms
            x = ts[:-delay]
            # take all terms from the time series besides the first (delay) terms
            y = ts[delay:]
            I.append(MI_DV(x, y))
            tau.append(delay)

        if method == 'kraskov 1':
            method_flag = True
            x = ts[:-delay]
            y = ts[delay:]
            MI_k = MI_kraskov(x, y, k=k, ranking=ranking)
            I.append(MI_k[0])
            tau.append(delay)

        if method == 'kraskov 2':
            method_flag = True
            x = ts[:-delay]
            y = ts[delay:]
            MI_k = MI_kraskov(x, y, k=k, ranking=ranking)
            I.append(MI_k[1])
            tau.append(delay)

        if method == 'basic':
            method_flag = True
            x = ts[:-delay]
            y = ts[delay:]
            I.append(MI_basic(x, y, h_method=h_method, ranking=ranking))
            tau.append(delay)

        if delay > 1 or delay == delayMax-1:
            if I[delay-2]-I[delay-1] < 0 or delay == delayMax-1:  # if increasing
                delay_at_min = delay
                min_flag = True

        if method_flag == False:
            print('Warning: invalid method entered.')
            break

    if plotting == True:
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 6))
        TextSize = 18
        plt.plot(tau, I, label=method + ': ' + str(delay_at_min-1))
        plt.xlabel(r'$\tau$', size=TextSize)
        plt.ylabel('MI', size=TextSize)
        plt.xticks(size=TextSize)
        plt.yticks(size=TextSize)
        # plt.legend(loc='upper right', fontsize=TextSize)
        plt.ylim(0)
        plt.show()

    return int(delay_at_min - 1)
if __name__ == "__main__":

    from teaspoon.parameter_selection.MI_delay import MI_for_delay
    import numpy as np

    fs = 10
    t = np.linspace(0, 100, fs*100)
    ts = np.sin(t) + np.sin((1/np.pi)*t)

    tau = MI_for_delay(ts, plotting=True, method='basic',
                       h_method='sturge', k=2, ranking=True)
    print('Delay from MI: ', tau)