#import packages
from teaspoon.SP import network
from teaspoon.SP import tsa_tools
from teaspoon.SP import network_tools
from ripser import ripser
import numpy as np
import networkx as nx
# import sub modules
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
[docs]def DistanceMatrix(A, method='shortest_unweighted_path'):
"""This function calculates the distance matrix from a connected graph represented as an adjacency matrix using an available method."
Args:
A (2-D array): 2-D square adjacency matrix
method (str): Method for calculating distances between nodes. default is shortest_unweighted_path. Options are shortest_unweighted_path, shortest_weighted_path, weighted_shortest_path, and diffusion_distance.
Returns:
[2-D array]: Distance matrix between all node pairs.
"""
methods = ['shortest_unweighted_path', 'shortest_weighted_path',
'weighted_shortest_path', 'diffusion_distance']
if method not in methods:
print('Error: method listed for distance matrix not available.')
print('Defaulting to unweighted shortest path.')
method = 'shortest_unweighted_path'
A = network_tools.remove_zeros(A)
np.fill_diagonal(A, 0)
A = A + A.T
if method == 'shortest_unweighted_path':
D = network_tools.shortest_path(A, weighted_path_lengths=False)
if method == 'shortest_weighted_path':
D = network_tools.shortest_path(A, weighted_path_lengths=True)
if method == 'weighted_shortest_path':
D = network_tools.weighted_shortest_path(A)
if method == 'diffusion_distance':
G = nx.from_numpy_array(A)
diam = nx.algorithms.distance_measures.diameter(G)
walk_steps = int(2*diam)
# degree vector used later.
deg_vec = network_tools.degree_vector(np.copy(A))
D = network_tools.diffusion_distance(
A, d=deg_vec, t=walk_steps, lazy=True)
return D
[docs]def point_summaries(diagram, A):
"""This function calculates the persistent homology statistics for a graph from the paper "Persistent Homology of Complex Networks for Dynamic State Detection."
Args:
A (2-D array): 2-D square adjacency matrix
diagram (list): persistence diagram from ripser from a graph's distance matrix
Returns:
[array 1-D]: statistics (R, En M) as (maximum persistence ratio, persistent entropy normalized, homology class ratio). Returns NaNs if empty diagram.
"""
# assertion errors to check data types
assert (len(A[0]) == len(A.T[0])), "A is not square adjacency matrix."
assert (len(diagram) > 1), "Diagram should include atleast 0D and 1D persistene diagrams as a list of numpy.ndarrays."
assert (type(diagram) is list), "Diagram should include atleast 0D and 1D persistene diagrams as a list of numpy.ndarrays."
assert (type(
diagram[0]) is np.ndarray), "Diagram should include atleast 0D and 1D persistene diagrams as a list of numpy.ndarrays."
def persistentEntropy(lt):
if len(lt) > 1:
L = sum(lt)
p = lt/L
E = sum(-p*np.log2(p))
Emax = np.log2(sum(lt))
E = E/Emax
if len(lt) == 1:
E = 0
if len(lt) == 0:
E = 1
return E
if len(diagram[1]) > 0:
# ------------Entropy---------------
lt = np.array(diagram[1].T[1]-diagram[1].T[0])
D1 = np.array([diagram[1].T[0], diagram[1].T[1]])
D1 = D1
lt = lt[lt < 10**10]
num_lifetimes1 = len(lt)
num_unique = len(A[0])
En = persistentEntropy(lt)
# ------------maximum persistence ratio---------------
H1 = diagram[1].T
delta = 0.1
R = (1-np.nanmax(H1)/np.floor((num_unique/3)-delta))
# ------------homology class ratio---------------
M = num_lifetimes1/num_unique
statistics = [R, En, M]
else:
statistics = [np.nan, np.nan, np.nan]
return statistics
[docs]def PH_network(D, max_homology_dimension=1):
"""This function calculates the persistent homology of the graph represented by the adjacency matrix A using a distance algorithm defined by user.
Args:
D (2-D array): Distance matrix between all node pairs.
Other Parameters:
max_homology_dimension (Optional[int]): maximum dimension of the homology.
Returns:
[list]: list of lists where ech list is a persistence diagram (standard ripser format).
"""
from scipy import sparse
D_sparse = sparse.coo_matrix(D).tocsr()
result = ripser(D_sparse, distance_matrix=True,
maxdim=max_homology_dimension)
diagram = result['dgms']
return diagram
# In[ ]:
# Only runs if running from this file (This will show basic example)
if __name__ == "__main__":
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
# ---------------------------Complex Example---------------------------------------
# import needed packages
import numpy as np
from teaspoon.SP.network import ordinal_partition_graph
from teaspoon.TDA.PHN import PH_network
from teaspoon.SP.network_tools import make_network
from teaspoon.parameter_selection.MsPE import MsPE_tau
# generate a siple sinusoidal time series
t = np.linspace(0, 30, 300)
ts = np.sin(t) + np.sin(2*t)
# Get appropriate dimension and delay parameters for permutations
tau = int(MsPE_tau(ts))
n = 5
# create adjacency matrix, this
A = ordinal_partition_graph(ts, n, tau)
# get distance matrix
D = DistanceMatrix(A, method='shortest_unweighted_path')
# get networkx representation of network for plotting
G, pos = make_network(A, position_iterations=1000,
remove_deg_zero_nodes=True)
# calculate persistence diagram
diagram = PH_network(D)
print('1-D Persistent Homology (loops): ', diagram[1])
stats = point_summaries(diagram, A)
print('Persistent homology of network statistics: ', stats)
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import networkx as nx
TextSize = 14
plt.figure(2)
plt.figure(figsize=(8, 8))
gs = gridspec.GridSpec(4, 2)
ax = plt.subplot(gs[0:2, 0:2]) # plot time series
plt.title('Time Series', size=TextSize)
plt.plot(ts, 'k')
plt.xticks(size=TextSize)
plt.yticks(size=TextSize)
plt.xlabel('$t$', size=TextSize)
plt.ylabel('$x(t)$', size=TextSize)
plt.xlim(0, len(ts))
ax = plt.subplot(gs[2:4, 0])
plt.title('Network', size=TextSize)
nx.draw(G, pos, with_labels=False, font_weight='bold', node_color='blue',
width=1, font_size=10, node_size=30)
ax = plt.subplot(gs[2:4, 1])
plt.title('Persistence Diagram', size=TextSize)
MS = 3
if len(diagram[1]) > 0:
top = max(diagram[1].T[1])
else:
top = 1
plt.plot([0, top*1.25], [0, top*1.25], 'k--')
plt.yticks(size=TextSize)
plt.xticks(size=TextSize)
plt.xlabel('Birth', size=TextSize)
plt.ylabel('Death', size=TextSize)
plt.plot(diagram[1].T[0], diagram[1].T[1], 'go', markersize=MS+2)
plt.xlim(0, top*1.25)
plt.ylim(0, top*1.25)
plt.subplots_adjust(hspace=0.8)
plt.subplots_adjust(wspace=0.35)
plt.show()