'''
Machine learning featurization method

If you make use of this code, please cite the following paper:
J.A. Perea, E. Munch, and F. Khasawneh. "Approximating Continuous Functions On Persistence Diagrams." Preprint, 2017.
'''
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from persim import PersistenceImager
import math
from math import pi
from numpy.linalg import norm as lnorm
from sympy.abc import t
from sympy import Piecewise
from sympy import diff, integrate
from itertools import combinations
from numba import guvectorize
from sklearn import mixture
import copy
# -------------------------------------------- #
# -------------------------------------------- #
# ----------Featurization--------------------- #
# -------------------------------------------- #
# -------------------------------------------- #
"""
.. module:: feature_functions
"""
def tent(Dgm, params, dgm_type='BirthDeath'):
    '''
    Applies the tent function to a diagram.

    Parameters:

    :Dgm:
        A persistence diagram, given as a :math:`K \\times 2` numpy array
    :params:
        A tents.ParameterBucket object. Really, we need d, delta, and epsilon from that.
    :dgm_type:
        This code accepts diagrams either
        1. in (birth, death) coordinates, in which case `dgm_type = 'BirthDeath'`, or
        2. in (birth, lifetime) = (birth, death-birth) coordinates, in which case `dgm_type = 'BirthLifetime'`

    :Returns:
        :math:`\\sum_{(x,y) \\in \\text{Dgm}} g_{i,j}(x,y)` where

        .. math::

            g_{i,j}(x,y) = \\bigg| 1 - \\max\\left\\{ \\left|\\frac{x}{\\delta} - i\\right|, \\left|\\frac{y-x}{\\delta} - j\\right|\\right\\} \\bigg|_+

        and :math:`| * |_+` is the positive part, i.e., :math:`\\max\\{*, 0\\}`.

    .. note:: This code does not take care of the maxPower polynomial stuff. The build_G() function does it after all the rows have been calculated.
    '''
d = params.d
delta = params.delta
epsilon = params.epsilon
# print(Dgm[:3])
# Move to birth,lifetime plane
if dgm_type == 'BirthDeath':
T = np.array(((1, -1), (0, 1)))
A = np.dot(Dgm, T)
elif dgm_type == 'BirthLifetime':
A = Dgm
    else:
        raise ValueError('Your choices for dgm_type are "BirthDeath" or "BirthLifetime".')
I, J = np.meshgrid(range(d + 1), range(1, d + 1))
Iflat = delta * I.reshape(np.prod(I.shape))
Jflat = delta * J.reshape(np.prod(I.shape)) + epsilon
Irepeated = Iflat.repeat(Dgm.shape[0])
Jrepeated = Jflat.repeat(Dgm.shape[0])
DgmRepeated = np.tile(A, (len(Iflat), 1))
BigIJ = np.array((Irepeated, Jrepeated)).T
B = DgmRepeated - BigIJ
B = np.abs(B)
B = np.max(B, axis=1)
B = delta - B
B = np.where(B >= 0, B, 0)
B = B.reshape((Iflat.shape[0], Dgm.shape[0]))
out = np.sum(B, axis=1)
out = out.reshape((d, d + 1)).T.flatten()
out = out / delta
return out
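# A minimal usage sketch for tent(), not part of the library API. The
# tents.ParameterBucket is stood in by a SimpleNamespace carrying only the
# fields tent() actually reads (d, delta, epsilon); the values are illustrative.
def _demo_tent():
    from types import SimpleNamespace
    params = SimpleNamespace(d=3, delta=0.5, epsilon=0.0)
    dgm = np.array([[0.1, 0.9], [0.3, 0.5]])  # toy (birth, death) diagram
    feats = tent(dgm, params, dgm_type='BirthDeath')
    assert feats.shape == (3 * (3 + 1),)  # one feature per tent center
    return feats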
def sub2ind(array_shape, rows, cols):
'''
Converts subscripts to indices
'''
ind = rows * array_shape[1] + cols
ind[ind < 0] = -1
ind[ind >= array_shape[0] * array_shape[1]] = -1
return ind
# convert indices to subscripts
def ind2sub(array_shape, ind):
'''
Converts indices to subscripts
'''
ind[ind < 0] = -1
ind[ind >= array_shape[0] * array_shape[1]] = -1
    rows = ind.astype('int') // array_shape[1]
cols = ind % array_shape[1]
return rows, cols
# Chebyshev-Gauss points of the first kind
def quad_cheb1(npts=10):
    '''
    Find the quadrature/interpolation weights for common orthogonal functions:
    Chebyshev-Gauss points of the first kind.
    '''
# get the Chebyshev nodes of the first kind
x = np.cos(np.pi * np.arange(0, npts + 1) / npts)
# get the corresponding quadrature weights
w = np.pi / (1 + np.repeat(npts, npts + 1))
return x, w
# Legendre-Gauss-Lobatto nodes and quadrature weights
def quad_legendre(npts=10):
    '''
    Computes the Legendre-Gauss-Lobatto nodes and their quadrature weights.
    The LGL nodes are the zeros of :math:`(1-x^2) P'_N(x)`, where :math:`P_N` is the Nth Legendre polynomial.
    '''
# Truncation + 1
nptsPlusOne = npts + 1
eps = np.spacing(1) # find epsilon, the machine precision
# Use the Chebyshev-Gauss-Lobatto nodes as the first guess
x = np.cos(np.pi * np.arange(0, npts + 1) / npts)
# Get the Legendre Vandermonde Matrix
P = np.zeros((nptsPlusOne, nptsPlusOne))
# Compute P_(N) using the recursion relation
# Compute its first and second derivatives and
# update x using the Newton-Raphson method.
xold = 2
while np.max(np.abs(x - xold)) > eps:
xold = x
P[:, 0] = 1
P[:, 1] = x
for k in range(1, npts):
P[:, k + 1] = ((2 * k + 1) * x * P[:, k] -
k * P[:, k - 1]) / (k + 1)
x = xold - (x * P[:, npts] - P[:, npts - 1]) \
/ (nptsPlusOne * P[:, npts])
# get the corresponding quadrature weights
w = 2 / (npts * nptsPlusOne * P[:, npts] ** 2)
return x, w
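# A small sanity sketch, assuming only what quad_legendre() promises: an LGL
# rule with npts + 1 nodes on [-1, 1] integrates polynomials up to degree
# 2*npts - 1 exactly, so the integral of x**2 over [-1, 1] should be 2/3.
def _demo_quad_legendre():
    x, w = quad_legendre(npts=6)
    approx = np.sum(w * x ** 2)
    assert abs(approx - 2.0 / 3.0) < 1e-10
    return approx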
# Map the inputs to the function blocks.
# You can invoke the desired function block using:
#     quad_pts_and_weights['name_of_function'](npts)
quad_pts_and_weights = {'cheb1': quad_cheb1,
                        'legendre': quad_legendre
                        }
# find the barycentric interpolation weights
def bary_weights(x):
    '''
    Calculate the barycentric interpolation weights for the base points x.

    .. note:: this algorithm may be numerically unstable for high degree
        polynomials (e.g. 15+). If using linear or Chebyshev spacing of base
        points then explicit formulae should be used. See Berrut and
        Trefethen, SIAM Review, 2004.
    '''
# find the length of x
n = x.size
# Rescale for numerical stability
eps = np.spacing(1) # find epsilon, the machine precision
x = x * (1 / np.prod(x[x > -1 + eps] + 1)) ** (1 / n)
# find the weights
w = np.zeros((1, n))
for i in np.arange(n):
w[0, i] = np.prod(x[i] - np.delete(x, i))
return 1 / w
def bary_diff_matrix(xnew, xbase, w=None):
'''
Calculate both the derivative and plain Lagrange interpolation matrix
using Barycentric formulae from Berrut and Trefethen, SIAM Review, 2004.
Parameters:
:Parameter xnew:
Interpolation points
:Parameter xbase:
Base points for interpolation
:Parameter w:
Weights calculated for base points (optional)
'''
# if w is not set, set it
if w is None:
w = bary_weights(xbase)
# get the length of the base points
n = xbase.size
# get the length of the requested points
nn = xnew.size
# replicate the weights vector into a matrix
wex = np.tile(w, (nn, 1))
# Barycentric Lagrange interpolation (from Berrut and Trefethen, SIAM Review, 2004)
xdiff = np.tile(xnew[np.newaxis].T, (1, n)) - np.tile(xbase, (nn, 1))
M = wex / xdiff
divisor = np.tile(np.sum(M, axis=1)[np.newaxis].T, (1, n))
divisor[np.isinf(divisor)] = np.inf
M[np.isinf(M)] = np.inf
M = M / divisor
# M[np.isnan(M)] = 0
M[xdiff == 0] = 1
# # Construct the derivative (Section 9.3 of Berrut and Trefethen)
# xdiff2 = xdiff ** 2
#
# frac1 = wex / xdiff
# frac1[np.isinf(frac1)] = float("inf")
#
# frac2 = wex / xdiff2
#
# DM = (M * np.tile(np.sum(frac2, axis=1)[np.newaxis].T, (1, n)) - frac2) / np.tile(np.sum(frac1, axis=1)[np.newaxis].T, (1, n))
# row, col = np.where(xdiff == 0)
#
#
# if np.all(row == 0): # or, row.size == 0:
# DM[row, ] = (wex[row, ] / np.tile(w[col].T, (1, n))) / xdiff[row, ]
# idx = sub2ind(DM.shape, row, col)
# DM[idx] = 0
# DM[idx] = -np.sum(DM[row, ], axis=1)
return M
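# A quick interpolation check (assumed usage, not library API): the matrix
# returned by bary_diff_matrix() maps values at the base points to values at
# the query points, so a cubic sampled on Chebyshev-type nodes is reproduced
# essentially to machine precision.
def _demo_bary_interp():
    xbase = np.sort(quad_cheb1(npts=8)[0])
    xnew = np.linspace(-0.95, 0.95, 25)
    M = bary_diff_matrix(xnew, xbase)
    f = lambda z: z ** 3 - 2 * z
    err = np.max(np.abs(M @ f(xbase) - f(xnew)))
    assert err < 1e-10
    return err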
def interp_polynomial(Dgm, params, dgm_type='BirthDeath'):
    '''
    Extracts the weights on the interpolation mesh using barycentric Lagrange interpolation.

    Parameters:

    :Dgm:
        A persistence diagram, given as a :math:`K \\times 2` numpy array
    :params:
        A tents.ParameterBucket object. Really, we need d, delta, and epsilon from that.
    :dgm_type:
        This code accepts diagrams either
        1. in (birth, death) coordinates, in which case `dgm_type = 'BirthDeath'`, or
        2. in (birth, lifetime) = (birth, death-birth) coordinates, in which case `dgm_type = 'BirthLifetime'`

    :Returns:
        interp_weight
            A matrix with each entry representing the weight of an interpolation
            function on the base mesh. This matrix assumes that on a 2D mesh the functions are ordered row-wise.
    '''
# jacobi_func = params.jacobi_func
    # check if we asked for a square mesh or not
if isinstance(params.d, int):
nx = params.d
ny = params.d
else:
nx, ny = params.d
# check if the Dgm is empty. If it is, pass back zeros
if Dgm.size == 0:
return np.zeros((nx + 1) * (ny + 1))
# Move to birth,lifetime plane
if dgm_type == 'BirthDeath':
T = np.array(((1, -1), (0, 1)))
A = np.dot(Dgm, T)
elif dgm_type == 'BirthLifetime':
A = Dgm
    else:
        raise ValueError('Your choices for dgm_type are "BirthDeath" or "BirthLifetime".')
    if params.partitions is None:
xmin = A[:, 0].min()
xmax = A[:, 0].max()
ymin = A[:, 1].min()
ymax = A[:, 1].max()
box = {'nodes': np.array([xmin, xmax, ymin, ymax]), 'npts': A.shape[0]}
params.partitions = [box]
# first, get the entries in Dgm that are within each partition
for partition in params.partitions:
query_Dgm_pts = getSubset(A, partition)
# get the number of query points
num_query_pts = len(query_Dgm_pts)
# check if the intersection of the Dgm and the partition is empty.
# If it is, pass back zeros
if num_query_pts == 0:
return np.zeros((nx + 1) * (ny + 1))
        # get the query points. xq are the birth times, yq are the death times.
xq, yq = query_Dgm_pts[:, 0], query_Dgm_pts[:, 1]
# xq, yq = np.sort(query_Dgm_pts[:, 0]), np.sort(query_Dgm_pts[:, 1])
# 1) Get the base nodes:
# get the 1D base nodes in x and y
xmesh, w = quad_pts_and_weights[params.jacobi_poly](nx)
ymesh, w = quad_pts_and_weights[params.jacobi_poly](ny)
xmesh = np.sort(xmesh)
ymesh = np.sort(ymesh)
# shift the base mesh points to the interval of interpolation [ax, bx], and
# [ay, by]
        ax, bx, ay, by = partition['nodes']
        xmesh = (bx - ax) / 2 * xmesh + (bx + ax) / 2
        ymesh = (by - ay) / 2 * ymesh + (by + ay) / 2
# define a mesh on the base points
x_base, y_base = np.meshgrid(xmesh, ymesh, sparse=False, indexing='ij')
# get the x and y interpolation matrices
# get the 1D interpolation matrix for x
x_interp_mat = bary_diff_matrix(xnew=xq, xbase=xmesh)
x_interp_mat = x_interp_mat.T # transpose the x-interplation matrix
# get the 1D interpolation matrix for y
y_interp_mat = bary_diff_matrix(xnew=yq, xbase=ymesh)
# replicate each column in the x-interpolation matrix n times
Gamma = np.repeat(x_interp_mat, ny + 1, axis=1)
# unravel, then replicate each row in the y-interpolation matrix m times
y_interp_mat.shape = (1, y_interp_mat.size)
Phi = np.repeat(y_interp_mat, nx + 1, axis=0)
# element-wise multiply Gamma and Phi
Psi = Gamma * Phi
# split column-wise, then concatenate row-wise
# if Psi.size > 0: # check that Psi is not empty
Psi = np.concatenate(np.split(Psi, num_query_pts, axis=1), axis=0)
# now reshape Psi so that each row corresponds to the weights of one query pt
Psi = np.reshape(Psi, (num_query_pts, -1))
# get the weights for each interpolation function/base-point
interp_weights = np.sum(np.abs(Psi), axis=0)
return np.abs(interp_weights)
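# A usage sketch for interp_polynomial(), again standing in for the parameter
# bucket with a SimpleNamespace; the fields read here are d, jacobi_poly, and
# partitions (None lets the function build a bounding box from the diagram).
def _demo_interp_polynomial():
    from types import SimpleNamespace
    params = SimpleNamespace(d=3, jacobi_poly='cheb1', partitions=None)
    dgm = np.array([[0.1, 0.9], [0.3, 0.5], [0.2, 0.7]])
    feats = interp_polynomial(dgm, params, dgm_type='BirthDeath')
    assert feats.shape == ((3 + 1) * (3 + 1),)  # one weight per mesh node
    return feats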
# this function returns the points from querySet that are within the baseRectangle
def getSubset(querySet, baseRectangle):
    """
    Parameters
    ----------
    querySet : ndarray
        Nx2 array of diagram points to filter.
    baseRectangle : dict
        Dictionary with key 'nodes' holding the rectangle corners as
        np.array([xmin, xmax, ymin, ymax]).

    Returns
    -------
    ndarray
        The rows of querySet that lie inside baseRectangle.
    """
    # get the rectangle corners
xmin = baseRectangle['nodes'][0]
xmax = baseRectangle['nodes'][1]
ymin = baseRectangle['nodes'][2]
ymax = baseRectangle['nodes'][3]
# subset
return querySet[(querySet[:, 0] >= xmin) &
(querySet[:, 0] <= xmax) &
(querySet[:, 1] >= ymin) &
(querySet[:, 1] <= ymax), :]
def PLandscapes(A, L_number=[]):
    """
    This function computes persistence landscapes for a given persistence diagram.
    The function has two inputs: a persistence diagram and the list of chosen landscape numbers.
    The algorithm computes all persistence landscapes for the persistence diagram.
    The output is a dictionary that includes all landscape functions, the total number of landscapes, and the desired landscapes (if the user specifies L_number).

    :param ndarray (A):
        Persistence diagram points--(Nx2) matrix.
    :param list (L_number):
        Landscape numbers the user wants to compute.

    :Returns:

        :output:
            (dict) Dictionary that includes the outputs

            * all: (DataFrame) Includes all landscape functions of the persistence diagram
            * LN : (int) Total number of landscapes for persistence diagram A
            * DesPL: (ndarray) Includes only the selected landscape functions given in L_number
    """
# sort persistence diagrams with respect to increasing order of birth time
A = np.array(sorted(A, key=lambda x: x[0]))
    # sort points that have the same birth time in descending order of death time:
    # group the points by birth time
compressed_data = np.split(A, np.unique(
A[:, 0], return_index=1)[1][1:], axis=0)
# convert list object to array
compressed_data = np.array(compressed_data, dtype=object)
# sorting each birth time arrays with respect to descending order of death time
N = np.arange(0, len(compressed_data), 1)
compressed_data = [np.array(
sorted(compressed_data[i], key=lambda x: x[1], reverse=True)) for i in N]
A = np.vstack(compressed_data)
# create p as zero matrix
p = np.zeros((1, 2))
infnt = float('inf') # define infinity value
l_n = 0
pl = np.zeros((1, 3))
Land = []
A1 = [0, 0]
while A1 != []:
# initialize L
L = []
        # take the first term (b,d) from A
b = A[0, 0]
d = A[0, 1]
# let next point to be p
length = len(A)
if length > 1:
p[0, 0] = A[1, 0]
p[0, 1] = A[1, 1]
else:
p[0, 0] = infnt
p[0, 1] = 0
# add (-inf,0),(b,0),((b+d)/2,(d-b)/2) to L
L.append([-infnt, 0])
L.append([b, 0])
L.append([0.5*(b+d), 0.5*(d-b)])
        # pop the first term (b,d) from A
A = np.delete(A, (0), axis=0)
while L[-1] != [infnt, 0]:
# convert death_time into list to find dmax
death_t = list(A[:, 1])
if death_t == []:
dmax = d
else:
dmax = np.amax(death_t)
            # if the maximum death time of the remaining terms in A is greater than or equal to d, add those two terms to L
if d >= dmax:
L.append([d, 0])
L.append([infnt, 0])
            else:  # the maximum death time of the remaining terms in A is less than d
for index, item in enumerate(death_t):
if item > d: # find the first item that is bigger than d
d_p = item # define new d_prime
b_p = A[index, 0] # define new b_prime
break
                # assign the next element as p if d_p is not the last element of matrix A
if d_p != A[-1, 1]:
# let next point to be p
p[0, 0] = A[index+1, 0]
p[0, 1] = A[index+1, 1]
else:
p[0, 0] = infnt
p[0, 1] = 0
                # delete (b_p,d_p) from A
A = np.delete(A, (index), axis=0)
if b_p > d:
L.append([d, 0]) # add (d,0) to L
if b_p >= d:
L.append([b_p, 0]) # add (b_p,0) to L
else:
L.append([(b_p+d)/2, (d-b_p)/2])
# push (b_p,d) in A in order of p
A = np.insert(A, 0, np.array((b_p, d)), axis=0)
# sort persistence diagrams with respect to increasing order of birth time
A = np.array(sorted(A, key=lambda x: x[0]))
                    # sort points that have the same birth time in descending order of death time:
                    # group the points by birth time
compressed_data = np.split(A, np.unique(
A[:, 0], return_index=1)[1][1:], axis=0)
# convert list object to array
                    compressed_data = np.array(compressed_data, dtype=object)
N = np.arange(0, len(compressed_data), 1)
compressed_data = [np.array(
sorted(compressed_data[i], key=lambda x: x[1], reverse=True)) for i in N]
A = np.vstack(compressed_data)
                    # find the index of point p; use min in case multiple rows of A have the same value
index_p = min(((np.where(A[:, 0] == b_p)))[0])
                    # assign the next element as p if b_p is not the last element of matrix A
if (len(A)-1) != index_p:
p[0, 0] = A[index_p+1, 0]
p[0, 1] = A[index_p+1, 1]
else:
p[0, 0] = infnt
p[0, 1] = 0
L.append([(b_p+d_p)/2, (d_p-b_p)/2])
b = b_p # b_prime becomes b
d = d_p # d_prime becomes d
l_n = l_n+1 # counter for persistence landscape
        # add tag information to the calculated landscape so that pandas groupby can be used
tag = np.full((len(L), 1), l_n)
Land = np.append(L, tag, axis=1)
# exclude the infinity terms inside the landscape
Land = Land[1:-1]
# add tagging information to list of landscapes point
pl = np.concatenate((pl, Land), axis=0)
# create list of last version of A to be able to check while condition above
A1 = list(A[:, 0])
# delete first zero row from pl
pl = np.delete(pl, 0, axis=0)
# convert landscape matrix into dataframe
pl = pd.DataFrame(pl)
# rename columns
pl = pl.rename(columns={0: "x", 1: "y", 2: "Landscape"})
# make group of points with respect to their landscape number
comp_pl = pl.groupby(['Landscape']).apply(
lambda x: np.transpose(np.array([x.x, x.y])))
comp_pl = comp_pl.reset_index()
comp_pl.columns = ['Landscape Number', 'Points']
PL = comp_pl.iloc[:, 1].values
Landscape_number = l_n
output = {}
output['all'] = comp_pl
output['LN'] = Landscape_number
# in the case of user wants specific landscape data points
    if len(L_number) != 0 and np.all(np.array(L_number) <= Landscape_number):
L_number[:] = [number - 1 for number in L_number]
output['DesPL'] = PL[L_number]
return output
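# A short usage sketch for PLandscapes() on a toy diagram. output['all'] is a
# DataFrame with one row per landscape; its 'Points' column holds the (x, y)
# vertices of each piecewise-linear landscape function.
def _demo_plandscapes():
    dgm = np.array([[0.0, 2.0], [0.5, 1.5], [1.0, 3.0]])
    out = PLandscapes(dgm)
    print('number of landscapes:', out['LN'])
    return out['all']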
class PLandscape():
# if user wants to see the plot and the points of the specific landscapes, L_number variable should be given as input.
def __init__(self, PD, L_number=[]):
"""
This class uses landscapes algorithm (:meth:`PD_Featurization.PLandscapes`) to compute persistence landscapes and plot them based on user preference.
The algorithm computes the persistence landscapes is written based on Ref. :cite:`1 <Bubenik2017>`.
Parameters
----------
PD : ndarray
Persistence diagram points--(Nx2) matrix..
L_number : list
Desired landscape numbers in a list form. If the list is empty, all landscapes will be plotted.
Returns
-------
PL_number : int
Total number of landscapes for given persistence diagram
DesPL : ndarray
Includes only selected landscapes functions given in L_number
AllPL : ndarray
Includes all landscape functions of the persistece diagram
"""
# L_number is a Nx1 matrix that includes the desired numbers of landscapes
out = PLandscapes(PD, L_number)
self.PL_number = out['LN']
if len(L_number) != 0:
# if user gives landscape number which does not exist:
for i in range(0, len(L_number)):
if L_number[i] > out['LN']:
                    print('Warning: Landscape number {} does not exist. The total number of landscapes is {}. Please enter desired landscape values less than or equal to {}.'.format(
                        L_number[i], out['LN'], out['LN']))
break
# Warning: If user enters a specific landscape number which does not exist for current persistence diagrams,
# class returns a warning message to user.
if np.all(np.array(L_number) < out['LN']):
self.DesPL = out['DesPL']
else:
                self.DesPL = 'Warning: Some desired landscape numbers exceed the total number of landscapes.'
self.AllPL = out['all']
def PLandscape_plot(self, PL, L_number=[], show_mesh=False):
"""
This function plots selected persistence landscapes or it plots all of them if user does not provide desired landscape functions.
Parameters
----------
PL : ndarray
Persistence diagram points--(Nx2) matrix.
L_number : list
Desired landscape numbers in a list form. If the list is empty, all landscapes will be plotted.
show_mesh : bool
Shows the mesh
Returns
-------
PL_plot : figure
The figure that includes chosen or all landsape functions.
"""
k = len(PL) # number of landscapes
        # if the user does not enter any landscape number, all landscapes will be plotted
if L_number == []:
L_number = np.linspace(1, k, k)
plt.figure()
plt.ioff()
for i in range(0, len(L_number)): # plotting the landscapes
index1 = int(L_number[i])
x = PL[index1-1][:, 0]
y = PL[index1-1][:, 1]
plt.plot(x, y, label=r'Landscape %s' % index1)
if show_mesh == True:
plt.scatter(x, np.zeros(len(x)), marker='.', c='Red')
plt.scatter(x, y, marker='.')
plt.vlines(x, ymin=0, ymax=y, colors='Red', linestyles='--', linewidth=0.5)
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
fig = plt.gcf()
PL_plot = fig
return PL_plot
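# A usage sketch for the PLandscape class on a toy diagram, plotting the first
# landscape; the 'Points' column of AllPL holds each landscape's vertices.
def _demo_plandscape_class():
    dgm = np.array([[0.0, 2.0], [0.5, 1.5], [1.0, 3.0]])
    pl = PLandscape(dgm, L_number=[1])
    fig = pl.PLandscape_plot(pl.AllPL['Points'].values, L_number=[1])
    return pl.PL_number, fig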
def F_Landscape(PL, params, max_l_number=None):
"""
This function computes the features for selected persistence landscape numbers.
There are three inputs to the function.
These are all landscape functions for each persistence diagram, parameter bucket object and the maximum level of landscape function.
If user does not specify the third input, algorithm will automatically compute it.
The second parameter includes the parameters needed to compute features and perform classification.
Please see :meth:`PD_ParameterBucket.LandscapesParameterBucket` for more details about parameters.
Parameters
----------
PL : ndarray
Object array that includes all landscape functions for each persistence diagram.
params : parameterbucket object
Parameterbucket object. We need landscape numbers defined by user to generate feature matrix.
max_l_number : int, optional
Maximum number of landscape functions for a given persistence diagram. The default is None.
Returns
-------
feature : ndarray
NxM matrix that includes the features for each persistence diagram, where N is the number of
persistence diagrams and M is the numbers of features which is equal to length of sorted
mesh of landscapes functions.
Sorted_mesh : list
It includes the sorted mesh for each landscape function chosen by user..
"""
a = PL
N = len(a)
# landscape number will be used to generate feature matrix
    PLN = params.PL_Number
    if PLN is None:
        PLN = [1]
# find mesh for given landscape number
l_number = []
    if max_l_number is None:  # find maximum number of landscapes in whole sets
[l_number.append(len(a[i])) for i in np.arange(0, N, 1)]
# maximum landscape number for given persistence diagram set
max_l_number = max(l_number)
# this list includes x points of specific landscapes for whole landscapes in increasing order and duplicates are removed
Sorted_mesh = []
interp_y = []
for j in range(len(PLN)):
X = []
for i in range(0, N):
k = a[i].iloc[:, 1].values # landscapes set
if len(k) >= PLN[j]:
m = k[PLN[j]-1][:, 0]
# combine whole x values of same landscapes into one vector
X.append(m)
X = np.hstack(X)
# sorting x values in increasing order
K = np.array(X)
Sorted_X = np.array(sorted(K))
Sorted_X_unique = np.unique(Sorted_X) # removing duplicates
Sorted_mesh.append(Sorted_X_unique)
desPL = PLN[j]
# taking mesh for nth landscapes
xvals = np.array(Sorted_mesh[j])
y_interp = np.zeros((len(xvals), N))
for i in range(0, N): # the loop which iterates for whole cases
# check if the landscape function exist for the current set of landscapes
if len(a[i]) >= PLN[j]:
L = a[i].iloc[:, 1].values # landscapes set
# x values of nth landscape
x = L[desPL-1][:, 0]
# y values of nth landscape
y = L[desPL-1][:, 1]
# piecewise linear interpolation
y_interp[:, i] = np.interp(xvals, x, y)
interp_y.append(y_interp[0:len(xvals), 0:N])
feature = np.zeros((N, 1))
for i in range(0, len(PLN)):
Piecewise_Linear = np.array(interp_y[i])
ftr = Piecewise_Linear.transpose()
feature = np.concatenate((feature, ftr), axis=1)
feature = feature[:, 1:]
return feature, Sorted_mesh
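# A sketch gluing PLandscape and F_Landscape together on two toy diagrams. The
# parameter bucket is stood in by a SimpleNamespace with the single field read
# here (PL_Number, the landscape numbers used to build the features).
def _demo_f_landscape():
    from types import SimpleNamespace
    dgms = [np.array([[0.0, 2.0], [0.5, 1.5]]), np.array([[0.2, 1.8], [1.0, 3.0]])]
    PL = np.empty(len(dgms), dtype=object)
    for i, d in enumerate(dgms):
        PL[i] = PLandscape(d).AllPL
    feature, Sorted_mesh = F_Landscape(PL, SimpleNamespace(PL_Number=[1]))
    return feature.shape  # (number of diagrams, length of the sorted mesh)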
def F_Image(PD1, PS, var, pers_imager=None, training=True, parallel=False):
"""
This function computes the persistence images of given persistence diagrams
using `Persim <https://persim.scikit-tda.org/en/latest/notebooks/Classification%20with%20persistence%20images.html>`_
package of Python. Then it provides user with the feature matrix for the diagrams.
Parameters
----------
PD1 : ndarray
Object array that includes all persistence diagrams.
PS : float
Pixel size.
var : float
Variance of the Gaussian distribution.
    pers_imager : persistence image object, optional
        Persistence image object fit to the training set diagrams. This object is only required when the feature
        matrix for the test set is computed. The default is None.
    training : boolean
        This flag tells the function whether the feature matrix is computed for the training set or the test set. The default is True.
    parallel : boolean
        This flag tells the function whether to run the computation in parallel. The default is False.
Returns
-------
output : dict
Includes feature matrix, persistence image object and persistence images created (for plotting)
"""
output = {}
# number of persistence diagrams
N1 = len(PD1)
if training == True:
# adjust the image parameters and compute images
pers_imager = PersistenceImager()
pers_imager.pixel_size = PS
pers_imager.kernel_params = {'sigma': var}
PDs = PD1.tolist()
pers_imager.fit(PDs, skew=True)
if parallel == True:
pers_img = pers_imager.transform(PD1, skew=True, n_jobs=-1)
else:
pers_img = pers_imager.transform(PD1, skew=True)
else:
if parallel == True:
pers_img = pers_imager.transform(PD1, skew=True, n_jobs=-1)
else:
pers_img = pers_imager.transform(PD1, skew=True)
# generate feature matrix
feature_PI = np.zeros((N1, len(pers_img[0][:, 0])*len(pers_img[0][0, :])))
for i in range(N1):
feature_PI[i, :] = pers_img[i].flatten()
output['F_Matrix'] = feature_PI
output['pers_imager'] = pers_imager
output['pers_images'] = pers_img
return output
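# A usage sketch for F_Image on two toy diagrams. PersistenceImager comes from
# the persim package imported at the top of this module; the pixel size and
# variance below are illustrative.
def _demo_f_image():
    dgms = np.empty(2, dtype=object)
    dgms[0] = np.array([[0.0, 1.0], [0.5, 2.0]])
    dgms[1] = np.array([[0.2, 1.5]])
    out = F_Image(dgms, PS=0.1, var=0.01, training=True)
    return out['F_Matrix'].shape  # (2, number of pixels per image)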
def F_CCoordinates(PD, FN):
"""
    This code generates a feature matrix to be used in machine learning applications using Carlsson Coordinates, which are composed of five different functions shown in Eq. :eq:`1st_coord` - :eq:`5th_coord`.
    The first four functions are taken from Ref. :cite:`2 <Adcock2016>` and the last one is obtained from Ref. :cite:`3 <Khasawneh2018>`.
    There are two inputs to the function. These are the persistence diagrams and the number of coordinates that the user wants to use in the feature matrix.
    The algorithm will return a feature matrix object array that includes feature matrices for the different combinations, and the total number of combinations will be :math:`\sum_{i=1}^{FN} {FN \choose i}`.
.. math:: f_{1}(PD) = \sum b_{i}(d_{i}-b_{i})
:label: 1st_coord
.. math:: f_{2}(PD) = \sum (d_{max}-d_{i})(d_{i}-b_{i})
:label: 2nd_coord
.. math:: f_{3}(PD) = \sum b_{i}^{2}(d_{i}-b_{i})^{4}
:label: 3rd_coord
.. math:: f_{4}(PD) = \sum (d_{max}-d_{i})^{2}(d_{i}-b_{i})^{4}
:label: 4th_coord
.. math:: f_{5}(PD) = max(d_{i}-b_{i})
:label: 5th_coord
Parameters
----------
PD : ndarray
Object array that includes all persistence diagrams.
FN : int
Number of features. It can take integer values between 1 and 5.
Returns
-------
FeatureMatrix : object array
Object array that contains the feature matrices of each feature combinations.
        Each feature matrix has a size of NxFN, where N is the number of persistence diagrams and FN is the number of features chosen.
TotalNumComb : int
Number of combinations.
CombList : list
List of combinations.
"""
N = len(PD)
# Create combinations for features with respect to user choice
Combinations = [] # define a list that includes all combinations
TotalNumComb = 0
for i in range(0, FN):
poss_comb = list(combinations(range(1, FN+1), i+1))
Combinations.append(poss_comb)
TotalNumComb = TotalNumComb+len(poss_comb)
# Generating feature matrix that includes whole features inside of it.
# Then this matrix will be used to create feature matrix for different combinations of features.
feature = np.zeros((N, 5))
for i in range(0, N):
PerDgm = PD[i]
birth_time = PerDgm[:, 0]
death_time = PerDgm[:, 1]
life_time = death_time-birth_time
max_death_time = max(death_time)
maxdtminusdt = max_death_time-death_time[:]
btsquare = np.square(birth_time)
# features
feature[i, 0] = np.sum(np.multiply(birth_time, life_time))
feature[i, 1] = np.sum(np.multiply(maxdtminusdt, life_time))
feature[i, 2] = np.sum(np.multiply(btsquare, np.power(life_time, 4)))
feature[i, 3] = np.sum(np.multiply(
np.power(maxdtminusdt, 2), np.power(life_time, 4)))
feature[i, 4] = max(life_time)
    # feature matrix will be modified depending on how many features the user wants
feature = feature[:, 0:FN]
# Create a matrix that includes feature matrix for different combinations
FeatureMatrix = np.ndarray(shape=(TotalNumComb), dtype=object)
# Create a matrix that includes whole possible combinations for given number of feature
CombList = np.zeros((TotalNumComb, 5))
increment = 0
for j in range(0, FN):
# combinations with n number inside total number of features
listofCombinations = Combinations[j]
# number of elements inside of the combination
numberincombination = len(listofCombinations[0])
# number of combinations
numberofcombination = len(listofCombinations)
# create feature matrices for all combinations
for i in range(0, numberofcombination):
# create an array will take columns from feature matrix with FN=5 with respect to combination
featmat = np.zeros((N, numberincombination))
combfeat = np.array(listofCombinations[i]) # combinations
CombList[increment, 0:len(combfeat)] = combfeat
for k in range(0, numberincombination):
featmat[:, k] = feature[:, combfeat[k]-1]
# storing feature matrix for each combination in a object type array
FeatureMatrix[increment] = featmat
increment = increment + 1 # increment
return FeatureMatrix, TotalNumComb, CombList
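# A usage sketch for F_CCoordinates with all five coordinates; the total
# number of combinations for FN = 5 is C(5,1) + ... + C(5,5) = 31.
def _demo_f_ccoordinates():
    dgms = np.empty(2, dtype=object)
    dgms[0] = np.array([[0.0, 1.0], [0.5, 2.0]])
    dgms[1] = np.array([[0.2, 1.5]])
    FeatureMatrix, TotalNumComb, CombList = F_CCoordinates(dgms, FN=5)
    assert TotalNumComb == 31
    return FeatureMatrix[-1].shape  # (2, 5): the all-five-features combination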
def F_PSignature(PL, L_Number=[]):
"""
    This function takes a set of persistence landscapes and returns the feature matrix, which is computed using path signatures :cite:`4 <Chevyrev2016,Chevyrev2020>`.
    The function takes two inputs: the persistence landscape sets in an object array, and the landscape numbers whose signatures the user wants to compute.
Parameters
----------
PL : ndarray
Object array that includes all landscape functions for each persistence diagram.
L_Number : list
Landscape numbers that user wants to use in feature matrix generation. If this parameter is not specified, algorithm will generate feature matrix using first landscapes.
Returns
-------
feature_PS : ndarray
Nx6 matrix that includes the features for each persistence diagram, where N is the number of persistence landscape sets.
"""
    N = len(PL)
    if L_Number == []:
        L_Number = [1]
    # generate feature matrix
    feature_PS = np.zeros((N, 6*len(L_Number)))
# the loop that computes the signature for selected landscape functions
for l in range(0, len(L_Number)):
# the loop that computes the signature for each case or persistence landscape set
for i in range(0, N):
Landscape = PL[i].iloc[L_Number[l]-1].values[1]
# node points of the landscape functions
x = Landscape[:, 0]
y = Landscape[:, 1]
# define first function of path
g = Piecewise((0, t < 0), (t, t >= 0))
for j in range(0, len(x)-1):
function_name = 'f_%d_%d_case%d' % (j+1, 1, i+1)
exec(
"%s = ((y[j+1]-y[j])/(x[j+1]-x[j]))*t+(y[j+1]*(x[j+1]-x[j])+x[j+1]*(y[j]-y[j+1]))/(x[j+1]-x[j])" % (function_name))
lower_bound = x[0]
upper_bound = x[-1]
# S(1)
S_1 = upper_bound - lower_bound
# S(1,1)
S_1_1 = (np.power(upper_bound, 2)/2) + \
(np.power(lower_bound, 2)/2)-(lower_bound*upper_bound)
S_2 = 0
S_1_2 = 0
S_2_1 = 0
S_2_2 = 0
for j in range(0, len(x)-1):
f_name = 'f_%d_%d_case%d' % (j+1, 1, i+1)
exec(
'global PL_function; PL_function = Piecewise((0,t<x[j]),(0,t>x[j+1]),(%s ,True))' % (f_name))
# S(2)
Signature_2 = integrate(diff(PL_function), (t, x[j], x[j+1]))
S_2 = S_2 + float(Signature_2)
# S(1,2)
Signature_1_2 = integrate(
(t-x[0])*diff(PL_function), (t, x[j], x[j+1]))
S_1_2 = S_1_2 + float(Signature_1_2)
# S(2,1)
Signature_2_1 = integrate(
(t-x[j])*diff(PL_function), (t, x[j], x[j+1]))
S_2_1 = S_2_1 + float(Signature_2_1)
# S(2,2)
Signature_2_2 = integrate(
(t-x[j])*np.power(diff(PL_function), 2), (t, x[j], x[j+1]))
S_2_2 = S_2_2 + float(Signature_2_2)
# storing the signatures in feature matrix
feature_PS[i, 6*l] = S_1
feature_PS[i, 6*l+1] = S_2
feature_PS[i, 6*l+2] = S_1_1
feature_PS[i, 6*l+3] = S_1_2
feature_PS[i, 6*l+4] = S_2_1
feature_PS[i, 6*l+5] = S_2_2
return feature_PS
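# A usage sketch for F_PSignature on the landscapes of two toy diagrams. The
# sympy-based integration makes this slow for landscapes with many vertices.
def _demo_f_psignature():
    dgms = [np.array([[0.0, 2.0], [0.5, 1.5]]), np.array([[0.2, 1.8]])]
    PL = np.empty(len(dgms), dtype=object)
    for i, d in enumerate(dgms):
        PL[i] = PLandscape(d).AllPL
    return F_PSignature(PL, L_Number=[1])  # shape (2, 6)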
def KernelMethod(perDgm1, perDgm2, sigma):
"""
    This function computes the kernel between two given persistence diagrams based on the formula provided in Ref. :cite:`5 <Reininghaus2015>`.
    There are three inputs: the two persistence diagrams and the kernel scale sigma.
Parameters
----------
perDgm1 : ndarray
Object array that includes first persistence diagram set.
perDgm2 : ndarray
Object array that includes second persistence diagram set.
sigma : float
Kernel scale.
Returns
-------
    Kernel : float
        The kernel value for the two given persistence diagrams.
"""
L1 = len(perDgm1)
L2 = len(perDgm2)
kernel = np.zeros((L2, L1))
Kernel = 0
for i in range(0, L1):
p = perDgm1[i]
p = np.reshape(p, (2, 1))
for j in range(0, L2):
q = perDgm2[j]
q = np.reshape(q, (2, 1))
q_bar = np.zeros((2, 1))
q_bar[0] = q[1]
q_bar[1] = q[0]
dist1 = lnorm(p-q)
dist2 = lnorm(p-q_bar)
kernel[j, i] = np.exp(-(math.pow(dist1, 2))/(8*sigma)) - \
np.exp(-(math.pow(dist2, 2))/(8*sigma))
Kernel = Kernel+kernel[j, i]
Kernel = Kernel*(1/(8*pi*sigma))
return Kernel
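# A usage sketch for KernelMethod on two toy diagrams: the multiscale heat
# kernel is symmetric in its two arguments.
def _demo_kernel_method():
    dgm1 = np.array([[0.0, 1.0], [0.5, 2.0]])
    dgm2 = np.array([[0.2, 1.5]])
    k12 = KernelMethod(dgm1, dgm2, sigma=0.25)
    k21 = KernelMethod(dgm2, dgm1, sigma=0.25)
    assert abs(k12 - k21) < 1e-12
    return k12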
def plot_F_Images(PI_features, num_plots=6, rows=2, cols=3, index=[], labels=[]):
"""
This function plots the persistence images given a number of plots or index
Parameters
----------
PI_features : ndarray
Object array that includes all persistence images from F_Image (output['pers_images'])
num_plots : int
Number of plots
rows : int
Number of rows in plot object
cols : int
Number of columns in plot objects
    index : list
        Optional indices of the observations to plot
    labels : list
        Titles for the plotted observations; required when index is provided
Returns
-------
Plot of selected images
"""
    import scipy.ndimage as ndimage
if num_plots != rows*cols:
raise Exception("Rows * Columns must be equal to the Number of Plots")
if index != [] and labels == []:
raise Exception("Labels for plots must be set")
pers_img = PI_features['pers_images']
fig, axs = plt.subplots(rows, cols, constrained_layout=True)
i = 0
if index == [] and labels == []:
id = np.arange(0, num_plots)
tid = ['Image ' + str(i) for i in range(num_plots)]
else:
id = index
tid = labels
for r in range(0, rows):
for c in range(0, cols):
img = ndimage.rotate(pers_img[id[i]], 90, reshape=True)
axs[r, c].imshow(img)
axs[r, c].set(xlabel='birth', ylabel='persistence',
xticks=([]), yticks=([]), title=tid[i])
i += 1
@guvectorize(["void(float64[:,:,:], float64[:,:], float64, float64[:])",], "(p,m,n), (m,n), ()->(p)", target='parallel', nopython=True)
def kernel_distance_parallel(dgms0, dgms1, sigma, result):
"""
    This function computes (in parallel) the multiscale heat kernel distance between an array of persistence diagrams and a single diagram, based on the formula provided in Ref. :cite:`5 <Reininghaus2015>`.
    The inputs are an array of padded persistence diagrams, a single padded diagram, and the kernel scale sigma (see :func:`reshape_to_array` for the padding). This function should be used for medium size datasets.

    Parameters
    ----------
    dgms0 : ndarray
        (p, m, n) array of padded persistence diagrams.
    dgms1 : ndarray
        A single (m, n) padded persistence diagram.
    sigma : float
        Kernel scale.

    Returns
    -------
    result : np array
        The vector of kernel distances between dgms1 and each diagram in dgms0.
"""
n_train = len(dgms0)
for i in range(n_train):
dgm0 = dgms0[i]
dgm1 = dgms1
kSigma0 = 0
kSigma1 = 0
kSigma2 = 0
for k in range(dgm0.shape[0]):
p = dgm0[k, 0:2]
if np.sum(p) == -2:
continue
for l in range(dgm0.shape[0]):
q = dgm0[l, 0:2]
if np.sum(q) == -2:
continue
qc = dgm0[l, 1::-1]
pq = (p[0] - q[0])**2 + (p[1] - q[1])**2
pqc = (p[0] - qc[0])**2 + (p[1] - qc[1])**2
kSigma0 += math.exp(-(pq) / (8 * sigma)) - \
math.exp(-(pqc) / (8 * sigma))
for k in range(dgm1.shape[0]):
p = dgm1[k, 0:2]
if np.sum(p) == -2:
continue
for l in range(dgm1.shape[0]):
q = dgm1[l, 0:2]
if np.sum(q) == -2:
continue
qc = dgm1[l, 1::-1]
pq = (p[0] - q[0])**2 + (p[1] - q[1])**2
pqc = (p[0] - qc[0])**2 + (p[1] - qc[1])**2
kSigma1 += math.exp(-(pq) / (8 * sigma)) - \
math.exp(-(pqc) / (8 * sigma))
for k in range(dgm0.shape[0]):
p = dgm0[k, 0:2]
if np.sum(p) == -2:
continue
for l in range(dgm1.shape[0]):
q = dgm1[l, 0:2]
if np.sum(q) == -2:
continue
qc = dgm1[l, 1::-1]
pq = (p[0] - q[0])**2 + (p[1] - q[1])**2
pqc = (p[0] - qc[0])**2 + (p[1] - qc[1])**2
kSigma2 += math.exp(-(pq) / (8 * sigma)) - \
math.exp(-(pqc) / (8 * sigma))
kSigma0 = kSigma0/(8 * np.pi * sigma)
kSigma1 = kSigma1/(8 * np.pi * sigma)
kSigma2 = kSigma2/(8 * np.pi * sigma)
result[i] = math.sqrt(kSigma1 + kSigma0-2*kSigma2)
@guvectorize(["void(float64[:,:,:], float64[:,:], float64, float64[:])",], "(p,m,n), (m,n), ()->(p)", target='cpu', nopython=True)
def kernel_distance(dgms0, dgms1, s, result):
"""
    This function computes the multiscale heat kernel distance between an array of persistence diagrams and a single diagram, based on the formula provided in Ref. :cite:`5 <Reininghaus2015>`.
    The inputs are an array of padded persistence diagrams, a single padded diagram, and the kernel scale s (see :func:`reshape_to_array` for the padding). This function should be used for small size datasets.

    Parameters
    ----------
    dgms0 : ndarray
        (p, m, n) array of padded persistence diagrams.
    dgms1 : ndarray
        A single (m, n) padded persistence diagram.
    s : float
        Kernel scale.

    Returns
    -------
    result : np array
        The vector of kernel distances between dgms1 and each diagram in dgms0.
"""
n_train = len(dgms0)
for i in range(n_train):
dgm0 = dgms0[i]
dgm1 = dgms1
kSigma0 = 0
kSigma1 = 0
kSigma2 = 0
sigma = s
for k in range(dgm0.shape[0]):
p = dgm0[k, 0:2]
if np.sum(p) == -2:
continue
for l in range(dgm0.shape[0]):
q = dgm0[l, 0:2]
if np.sum(q) == -2:
continue
qc = dgm0[l, 1::-1]
pq = (p[0] - q[0])**2 + (p[1] - q[1])**2
pqc = (p[0] - qc[0])**2 + (p[1] - qc[1])**2
kSigma0 += math.exp(-(pq) / (8 * sigma)) - \
math.exp(-(pqc) / (8 * sigma))
for k in range(dgm1.shape[0]):
p = dgm1[k, 0:2]
if np.sum(p) == -2:
continue
for l in range(dgm1.shape[0]):
q = dgm1[l, 0:2]
if np.sum(q) == -2:
continue
qc = dgm1[l, 1::-1]
pq = (p[0] - q[0])**2 + (p[1] - q[1])**2
pqc = (p[0] - qc[0])**2 + (p[1] - qc[1])**2
kSigma1 += math.exp(-(pq) / (8 * sigma)) - \
math.exp(-(pqc) / (8 * sigma))
for k in range(dgm0.shape[0]):
p = dgm0[k, 0:2]
if np.sum(p) == -2:
continue
for l in range(dgm1.shape[0]):
q = dgm1[l, 0:2]
if np.sum(q) == -2:
continue
qc = dgm1[l, 1::-1]
pq = (p[0] - q[0])**2 + (p[1] - q[1])**2
pqc = (p[0] - qc[0])**2 + (p[1] - qc[1])**2
kSigma2 += math.exp(-(pq) / (8 * sigma)) - \
math.exp(-(pqc) / (8 * sigma))
kSigma0 = kSigma0/(8 * np.pi * sigma)
kSigma1 = kSigma1/(8 * np.pi * sigma)
kSigma2 = kSigma2/(8 * np.pi * sigma)
result[i] = math.sqrt(kSigma1 + kSigma0-2*kSigma2)
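# A usage sketch for the guvectorized kernel distances: pad a list of diagrams
# to a common shape with reshape_to_array() (defined at the bottom of this
# module; padding rows are (-1, -1) and are skipped via the np.sum(p) == -2
# test), then assemble a pairwise distance matrix one column at a time.
def _demo_kernel_distance():
    dgms = [np.array([[0.0, 1.0], [0.5, 2.0]]), np.array([[0.2, 1.5]])]
    arr = reshape_to_array(dgms)  # shape (len(dgms), max points, 2)
    D = np.zeros((len(arr), len(arr)))
    for j in range(len(arr)):
        D[:, j] = kernel_distance(arr, arr[j], 0.25)
    return D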
def f_dgm(dgm, function, **keyargs):
    '''
    Given a persistence diagram :math:`D = (S,\mu)` and a compactly supported function in :math:`\mathbb{R}^2`, this function computes

    .. math::
        \\nu_{D}(f) = \sum_{x\in S} f(x)\mu(x)

    :param dgm: persistence diagram, array of points in :math:`\mathbb{R}^2`.
    :type dgm: Numpy array
    :param function: Compactly supported function in :math:`\mathbb{R}^2`.
    :type function: function
    :param keyargs: Additional arguments required by `function`
    :type keyargs: Dictionary
    :return: float -- value of :math:`\\nu_{D}(f)`.
    '''
temp = function(dgm, **keyargs)
return sum(temp)
def f_ellipse(x, center=np.array([0, 0]), axis=np.array([1, 1]), rotation=np.array([[1, 0], [0, 1]])):
    '''
    Computes a bump function with an ellipsoidal domain centered at `center`, rotated by `rotation`, and with axes given by `axis`. The bump function is computed using the following formula

    .. math::
        f_{A,c} (x) = \max \\left\{ 0, 1 - (x - c)^T A (x - c)\\right\}

    :param x: point at which to evaluate the function :math:`f_{A,c}`
    :type x: Numpy array
    :param center: center of the ellipse
    :type center: Numpy array
    :param axis: sizes of the major and minor axes of the ellipse
    :type axis: Numpy array
    :param rotation: rotation matrix for the ellipse
    :type rotation: Numpy array
    :return: float -- value of :math:`f_{A,c} (x)`.
    '''
sigma = np.diag(np.power(axis, -2))
x_centered = np.subtract(x, center)
temp = x_centered@rotation@sigma@np.transpose(
rotation)@np.transpose(x_centered)
temp = np.diag(temp)
return np.maximum(0, 1-temp)
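# A usage sketch for f_ellipse and f_dgm: evaluate the default unit bump at a
# few points, then sum it over a toy diagram.
def _demo_f_ellipse():
    pts = np.array([[0.0, 0.0], [0.5, 0.5], [2.0, 2.0]])
    vals = f_ellipse(pts)  # bump values: 1.0, 0.5, and 0.0 respectively
    total = f_dgm(pts, f_ellipse)  # sum of the bump over the diagram: 1.5
    return vals, total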
def adaptive_feature(list_dgms, function, **keyargs):
'''
Given a collection of persistent diagrams and a compactly supported function in :math:`\mathbb{R}^2`, computes :math:`\\nu_{D}(f)` for each diagram :math:`D` in the collection.
:param list_dgms: list of persistent diagrams
:type list_dgms: list
:param function: Compactly supported function in :math:`\mathbb{R}^2`.
:type function: function
    :param keyargs: Additional arguments required by `function`
    :type keyargs: Dictionary
:return: Numpy array -- Array of values :math:`\\nu_{D}(f)` for each diagram :math:`D` in the collection `list_dgms`.
'''
num_diagrams = len(list_dgms)
feat = np.zeros(num_diagrams)
for i in range(num_diagrams):
feat[i] = f_dgm(list_dgms[i], function, **keyargs)
return feat
def adaptive_template_functions(train_dgms, d=25, threshold=.05):
"""
This function computes adaptive template functions as in the paper :cite:`6 <polanco2019adaptive>` from a training set and returns the ellipses needed to fit features to a training and testing dataset.
Parameters
----------
train_dgms : ndarray
Object array that includes training diagrams
d : integer
maximum number of functions.
    threshold : float
        Cutoff for inclusion of a template function (not used in the current implementation).
Returns
-------
ellipses : list
list of ellipses that are the adaptive template functions
"""
# Combine all diagrams
X_train_temp = np.vstack(train_dgms)
# return means for scoring
gmm = mixture.BayesianGaussianMixture(
n_components=d, covariance_type='full', max_iter=int(10e4)).fit(X_train_temp)
ellipses = []
for i in range(len(gmm.means_)):
L, v = np.linalg.eig(gmm.covariances_[i])
temp = {'mean': gmm.means_[i], 'std': np.sqrt(L), 'rotation': v.transpose(
), 'radius': max(np.sqrt(L)), 'entropy': gmm.weights_[i]}
temp['std'] = 3*temp['std']
ellipses.append(temp)
return ellipses
def adaptive_template_features(list_dgms, list_ellipses, function=f_ellipse):
    '''
    This function computes all the features for a collection of persistence diagrams given a list of ellipses.

    :param list_dgms: list of persistence diagrams
    :type list_dgms: list
    :param list_ellipses: List of dictionaries. Each dictionary represents an ellipse and must have the following keys: `mean`, `std` and `rotation`.
    :type list_ellipses: list
    :param function: Compactly supported function in :math:`\mathbb{R}^2`.
    :type function: function
    :return: Numpy array -- feature matrix with one row per diagram and one column per ellipse.
    '''
features = np.zeros((len(list_dgms), len(list_ellipses)))
for i in range(len(list_ellipses)):
args = {key: list_ellipses[i][key]
for key in ['mean', 'std', 'rotation']}
args['center'] = args.pop('mean')
args['axis'] = args.pop('std')
features[:, i] = adaptive_feature(list_dgms, function, **args)
return features
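# An end-to-end sketch of the adaptive template pipeline: fit ellipses to a
# training set of (random, illustrative) diagrams, then featurize the same
# collection with them.
def _demo_adaptive_features():
    rng = np.random.default_rng(0)
    train = [rng.uniform(0, 1, size=(20, 2)) for _ in range(5)]
    ellipses = adaptive_template_functions(train, d=3)
    feats = adaptive_template_features(train, ellipses)
    return feats.shape  # (number of diagrams, number of ellipses)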
def reshape_to_array(diagrams):
    '''
    Pads a list of persistence diagrams with (-1, -1) rows so that they all
    have the same number of points, then stacks them into a single
    (d0, d1, 2) array suitable for the guvectorized kernel distance functions.
    '''
ndgms = copy.deepcopy(diagrams)
d0 = len(ndgms)
d1 = 0
d2 = 2
for i in range(0, d0):
d1 = max(d1, len(ndgms[i]))
for i in range(0, d0):
d = len(ndgms[i])
if d == d1:
continue
ndgms[i] = list(ndgms[i])
for k in range(0, d1-d):
ndgms[i].append([-1, -1])
dgms_array = []
for i in range(0, d0):
dgms_array.append(np.array(ndgms[i]))
return np.reshape(dgms_array, (d0, d1, d2))