Source code for teaspoon.TDA.SLSP

# ~O(n) function that interleaves (alternates values from) two lists/arrays
def interleave(list1, list2):
    newlist = []
    a1, a2 = len(list1), len(list2)
    for i in range(max(a1, a2)):
        if i < a1:
            newlist.append(list1[i])
        if i < a2:
            newlist.append(list2[i])
    return newlist


def initialize_Q_M(sample_data):
    #import packages
    import numpy as np
    from scipy.signal import find_peaks
    from sortedcontainers import SortedList

    slope_o, slope_f = sample_data[0] - \
        sample_data[1], sample_data[-1]-sample_data[-2]

    # assumes trend at edges continues to infinity
    NegEnd, PosEnd = -float('inf'), float('inf')
    if slope_o < 0:
        sample_data[0] = NegEnd
    else:
        sample_data[0] = PosEnd
    if slope_f < 0:
        sample_data[-1] = NegEnd
    else:
        sample_data[-1] = PosEnd

    # get extrema locations
    max_locs, _ = find_peaks(sample_data)
    min_locs, _ = find_peaks(-sample_data)

    # add outside borders as infinity extrema
    if slope_o < 0:
        min_locs = np.insert(min_locs, 0, 0, axis=0)
    else:
        max_locs = np.insert(max_locs, 0, 0, axis=0)
    if slope_f < 0:
        min_locs = np.insert(min_locs, len(min_locs), -1, axis=0)
    else:
        max_locs = np.insert(max_locs, len(max_locs), -1, axis=0)

    # get extrema values
    max_vals, min_vals = sample_data[max_locs], sample_data[min_locs]

    if max_locs[0] < min_locs[0]:  # if peak first
        # chronologically ordered peaks and  valleys
        PV = interleave(max_vals, min_vals)
        # indices of ordered peaks and valleys
        I_PV = interleave(max_locs, min_locs)
    else:  # if valley first
        # chronologically ordered peaks and  valleys
        PV = interleave(min_vals, max_vals)
        # indices of ordered peaks and valleys
        I_PV = interleave(min_locs, max_locs)

    # get priority value array as chronological pairwise difference
    v = abs(np.diff(PV))

    # get pointer to M array before sorting
    ptr = np.arange(len(v))

    # generate priority matrix Q by stacking together v, ptr, PV and I_PV
    Q = (np.array([v, ptr, PV[:-1], PV[1:], I_PV[:-1], I_PV[1:]]).T).tolist()

    # generate dictionary for quick looking up indices in Q
    M = Q
    # M is defined as the unordered Q because it allows for pointers back to Q by a value lookup O(log(n)),
    # which is a lot faster than trying to update all the pointers in M to Q for each deleted pair.

    # created sorted list from Q
    Q = SortedList(Q)

    # get indice array for knowing which element of Q have been deleted.
    I = np.arange(len(Q)).tolist()
    I = SortedList(I)

    return Q, M, I


def update_Q_M(Q, M, I):

    # get indices to delete from priority value by searching indices left in I
    # index of I where correct index is found using binary search ~ O(log(n))
    i_q = I.bisect_left(int(Q[0][1]))
    i_prev, i_next = i_q-1, i_q+1  # get previous and next index ~ O(1)

    # get related indices in Q ~ O(log(n))
    I_next, I_q, I_prev = Q.index(M[I[i_next]]), 0, Q.index(M[I[i_prev]])

    # get new difference value after peak/vall diff removed ~ O(1)
    v_new = Q[I_next][0] + Q[I_prev][0] - Q[I_q][0]

    # define new element of Q ~ O(1)
    v1, v2, I_v1, I_v2 = Q[I_prev][2], Q[I_next][3], Q[I_prev][4], Q[I_next][5]
    q_new = [v_new, I[i_prev], v1, v2, I_v1, I_v2]

    # get current minimum difference array from priority queue ~ O(1)
    q = Q[I_q]

    # remove old rows of Q that were combined into one using dictionary values
    Q.pop(0)  # ~ O(1)
    Q.remove(M[I[i_prev]])  # ~ O(log(n))
    Q.remove(M[I[i_next]])  # ~ O(log(n))

    # add new element to Q from combining removed elements
    Q.add(q_new)  # ~ O(log(n))

    # update dictionary (don't need to remove dictionary values of other used elements). ~ O(1)
    M[I[i_prev]] = q_new

    # update index array by removing elements indices where elements of Q were removed. ~ O(2log(n))
    I.pop(i_next)
    I.pop(i_q)
    # Why use I? :
    # I is a list that keeps track of which element that is "left" in M.
    # The reason we are not directly deleting elements from M is that we chose to use M = Q.unsorted(),
    # which allows for quick lookup by index. Otherwise we would need to update multiple elements of M
    # for each iteration, slowing everything down.

    return Q, M, I, q


def get_persistence_pair(q):
    # get persistence values from priority element
    b, d, I_b, I_d = q[2], q[3], q[4], q[5]

    # sort persistence pair values so birth < death value
    if b > d:
        b, d, I_b, I_d = d, b, I_d, I_b  # ~ O(1)

    return b, d, I_b, I_d


[docs]def Persistence0D(ts): """This function calculates the zero-dimensional sublevel set persistence over a closed time domain using a recursively updated priority Q as a SortedList data structure. The algorithm is approximately O(log(n)). Args: ts (1-D array): time series. Returns: [px2 array]: peristence diagram with p persistence pairs. """ # import needed packages import numpy as np # change data to array sample_data = np.array(ts).astype(float) # initialize minmax matrix M and priority Q as a sorted list # auxillary data dictionary (D) and removal indices I Q, M, I = initialize_Q_M(sample_data) # Initialize data for results birth_indices, death_indices, persistenceDgm = [], [], [] while len(Q) >= 3: # while there is still values left in the matrix # update Q with auxilary data D (dictionary), I (indices from M), i (indices for peak/valley) Q, M, I, q = update_Q_M(Q, M, I) # get persistence pair from priority element b, d, I_b, I_d = get_persistence_pair(q) # record time series indices and birth and deaths and store persistence diagram point birth_indices.append(I_b) death_indices.append(I_d) persistenceDgm.append([b, d]) return np.array(birth_indices), np.array(death_indices), np.array(persistenceDgm)
# In[ ]: if __name__ == "__main__": # ___________________example_________________________ from teaspoon.TDA.SLSP import Persistence0D import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec import numpy as np fs, T = 100, 10 t = np.linspace(-0.2, T, fs*T+1) A = 20 ts = A*np.sin(np.pi*t) + A*np.sin(1*t) feature_ind_1, feature_ind_2, persistenceDgm = Persistence0D(ts) D = persistenceDgm print(' Persistence Diagram Pairs: ', D) gs = gridspec.GridSpec(1, 2) plt.figure(figsize=(11, 5)) ax = plt.subplot(gs[0, 0]) plt.title('Time Series') plt.xlabel('$t$') plt.ylabel('$x(t)$') plt.plot(t, ts) ax = plt.subplot(gs[0, 1]) plt.title('Persistence Diagram') plt.xlabel('Birth') plt.ylabel('Death') plt.plot(D.T[0], D.T[1], 'ro') plt.plot([min(ts), max(ts)], [min(ts), max(ts)], 'k--') plt.show()