# [docs]  (Sphinx source-link artifact; commented out so the module imports cleanly)
def random_feature_map_model(u_obs, Dr, w=0.005, b=4.0, beta=4e-5, seed=None):
    """
    Generate a random feature map dynamical system model based on the method
    presented in https://doi.org/10.1016/j.physd.2021.132911.

    Args:
        u_obs (array): Array of observations (D x N). D is the dimension, N is the number of training points.
        Dr (int): Reservoir dimension.
        w (float): Random feature weight matrix distribution width parameter.
        b (float): Random feature bias vector distribution parameter.
        beta (float): Ridge regression regularization parameter.
        seed (int): Random seed (optional).

    Returns:
        W_LR (array): Optimal model weights (D x Dr).
        W_in (array): Random weight matrix (Dr x D).
        b_in (array): Random bias vector (Dr x 1).
    """
    import numpy as np
    import time
    D = np.shape(u_obs)[0]
    # Bug fix: test `seed is not None` rather than truthiness so that seed=0
    # is honored instead of silently falling back to time-based seeding.
    if seed is not None:
        np.random.seed(seed=seed)
    else:
        np.random.seed(seed=int(time.time()))
    # Fix internal weights and bias
    W_in = np.random.uniform(-w, w, size=(Dr, D))
    b_in = np.random.uniform(-b, b, size=(Dr, 1))
    # Random features of the inputs u_0..u_{N-2}; targets are u_1..u_{N-1}.
    phi_mat = np.tanh(W_in @ u_obs[:, :-1] + b_in)
    # Ridge-regression solution W_LR = U1 Phi^T (Phi Phi^T + beta I)^{-1}.
    # Solving the symmetric normal equations is numerically more stable than
    # forming an explicit inverse.
    A = phi_mat @ phi_mat.T + beta * np.eye(Dr)
    W_LR = np.linalg.solve(A, phi_mat @ u_obs[:, 1:].T).T
    return W_LR, W_in, b_in
# [docs]  (Sphinx source-link artifact; commented out so the module imports cleanly)
def G_rfm(xn, W_LR, mu, auto_diff=False):
    """
    Random Feature Map Model Forecast Function with TensorFlow differentiation support.

    Args:
        xn (array): Input state vector (D x 1).
        W_LR (array): Matrix of model coefficients.
        mu (list): List of internal model parameters (W_in, b_in).
        auto_diff (bool): Toggle automatic differentiation for tensorflow usage with TADA.

    Returns:
        x_new (array): Forecasted state vector (D x 1).

    Raises:
        ImportError: If auto_diff is requested but TensorFlow is not installed.
    """
    import numpy as np
    W_in, b_in = mu
    if auto_diff:
        # TADA passes the trainable weights wrapped in a single-element list.
        W_LR = W_LR[0]
        try:
            import tensorflow as tf
            from tensorflow.python.ops.numpy_ops import np_config
            np_config.enable_numpy_behavior()
        except ImportError:
            # Bug fix: was a bare `except:` that swallowed *every* exception
            # (including KeyboardInterrupt) and relabeled it as an ImportError.
            raise ImportError("TensorFlow is required for auto_diff functionality.")
        x_new = tf.matmul(W_LR, tf.tanh(tf.matmul(W_in, xn) + b_in))
    else:
        x_new = (W_LR @ np.tanh(W_in @ xn + b_in)).reshape(-1, 1)
    return x_new
# [docs]  (Sphinx source-link artifact; commented out so the module imports cleanly)
def lstm_model(u_obs, units=500, p=1, epochs=50, batch_size=50):
    """
    Generate an LSTM forecast model.

    Args:
        u_obs (array): Array of observations (D x N). D is the dimension, N is the number of training points.
        units (int): Number of LSTM units.
        p (int): Number of past points to use for predicting the next point.
        epochs (int): Number of LSTM training epochs.
        batch_size (int): Training batch size for LSTM model.

    Returns:
        model (keras Sequential): Trained LSTM model.
    """
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import LSTM, Dense, Reshape
    import numpy as np
    D = u_obs.shape[0]
    N = u_obs.shape[1]
    # Prepare input/output pairs: predict point i+p from the p preceding points.
    X_train, Y_train = [], []
    for i in range(N - p):
        X_train.append(u_obs[:, i:i + p].T)
        Y_train.append(u_obs[:, i + p].reshape(1, -1))
    # Shapes for the LSTM: X is (samples, time steps, features) = (N-p, p, D),
    # Y is (N-p, 1, D) to match the model's Reshape((1, D)) output.
    X_train = np.array(X_train)
    Y_train = np.array(Y_train)
    # Bug fix: removed `X_train = X_train[..., np.newaxis]`, which produced a
    # 4-D tensor (samples, p, D, 1) incompatible with input_shape=(p, D).
    # Build the LSTM model
    model = Sequential([
        LSTM(units, return_sequences=False, input_shape=(p, D)),
        Dense(D),
        Reshape((1, D))
    ])
    model.compile(optimizer='adam', loss='mse')
    # Train the model
    model.fit(X_train, Y_train, epochs=epochs, batch_size=batch_size)
    return model
# [docs]  (Sphinx source-link artifact; commented out so the module imports cleanly)
def G_lstm(xp, W_lstm, mu, auto_diff=False):
    """
    LSTM Forecast Function with TensorFlow differentiation support.

    Args:
        xp (np.ndarray or tf.Tensor): Input state vector of shape (D, p)
        W_lstm (list): Model weight arrays to set for the LSTM model (not used here but required for compatibility with other models)
        mu (list): List containing internal model parameters (lstm_units, p, model)
        auto_diff (bool): Enable automatic differentiation functionality

    Returns:
        x_new: Forecasted state vector of shape (D, 1)
    """
    import tensorflow as tf
    import numpy as np
    model = mu[2]
    # Normalize the input to a float32 tensor regardless of what was passed in.
    if isinstance(xp, tf.Tensor):
        state = tf.cast(xp, dtype=tf.float32)
    else:
        state = tf.convert_to_tensor(xp, dtype=tf.float32)
    # (D, p) -> (p, D) -> (1, p, D): a single batch of p time steps.
    batch = tf.expand_dims(tf.transpose(state), axis=0)
    if auto_diff:
        # training=True keeps the forward pass differentiable for TADA.
        prediction = model(batch, training=True)
    else:
        prediction = model(batch)
    # (1, D) -> (D,) -> (D, 1), promoted to float64 for downstream numpy math.
    flat = tf.squeeze(prediction, axis=0)
    x_new = tf.reshape(flat, (-1, 1))
    return tf.cast(x_new, dtype=tf.float64)
# [docs]  (Sphinx source-link artifact; commented out so the module imports cleanly)
def get_forecast(Xp, W, mu, forecast_len, G=G_rfm, auto_diff=False):
    """
    Compute a forecast from a given forecast model.

    Args:
        Xp (array): Starting point(s) for the forecast (D x p) vector.
        W (array/object): Model weights or model object for making predictions.
        mu (list): List of internal model parameters (model specific).
        forecast_len (int): Number of points to forecast into the future.
        G (function): Forecast function. Defaults to random feature map (G_rfm).
        auto_diff (bool): Toggle automatic differentiation for tensorflow usage with TADA.

    Returns:
        forecast (array): Array of forecasted states; the first column is the
            last input point, followed by the model predictions.

    Raises:
        ImportError: If auto_diff is requested but TensorFlow is not installed.
    """
    import numpy as np
    D = np.shape(Xp)[0]
    p = np.shape(Xp)[1]
    if not auto_diff:
        # Bug fix: removed a dead `x_next = G(Xp, W, mu)` call here whose
        # result was never used (the loop recomputes it on its first pass).
        # Seed the forecast with the most recent p observed points.
        forecast = Xp[:, -p:].reshape(D, p)
        for i in range(1, forecast_len):
            x_next = G(forecast[:, -p:].reshape(D, p), W, mu)
            forecast = np.hstack((forecast, x_next))
    else:
        try:
            import tensorflow as tf
            from tensorflow.python.ops.numpy_ops import np_config
            np_config.enable_numpy_behavior()
        except ImportError:
            raise ImportError("TensorFlow is required for auto_diff functionality.")
        forecast = tf.reshape(tf.convert_to_tensor(Xp[:, -p:], dtype=tf.float64), (D, p))
        # NOTE(review): this branch appends forecast_len predictions, so the
        # returned tensor has one more column than the numpy branch (which
        # appends forecast_len - 1). Preserved as-is — confirm whether TADA
        # relies on this off-by-one before changing it.
        for i in range(forecast_len):
            x_next = G(forecast[:, -p:].reshape(D, p), W, mu, auto_diff=True)
            forecast = tf.concat([forecast, x_next], axis=1)
    # Drop the seed columns except the last input point.
    return forecast[:, p - 1:]
# [docs]  (Sphinx source-link artifact; commented out so the module imports cleanly)
def forecast_time(X_model_a, X_truth, dt=1.0, lambda_max=1.0, threshold=0.05):
    '''
    Compute the forecast time by comparing the relative forecast error between
    predictions and measurements against a threshold.

    Args:
        X_model_a (array): Array of forecast points
        X_truth (array): Array of measurements (ground truth)
        dt (float): Time step size (defaults to 1 to return number of points)
        lambda_max (float): Maximum lyapunov exponent (defaults to 1 to return number of points)
        threshold (float): Threshold to use for comparing forecast and measurements.

    Returns:
        (float): Forecast time for the given threshold.
    '''
    import numpy as np
    n_points = X_model_a.shape[1]
    # Grow the comparison window one point at a time; the first window where
    # any state dimension's relative squared error exceeds the threshold
    # determines the forecast time.
    for idx in range(1, n_points):
        diff_norms = np.linalg.norm(X_model_a[:, 0:idx] - X_truth[:, 0:idx], axis=1)
        truth_norms = np.linalg.norm(X_truth[:, 0:idx], axis=1)
        rel_error = (diff_norms ** 2) / (truth_norms ** 2)
        if rel_error.max() > threshold:
            return idx * dt * lambda_max
    raise Warning("Longer forecast required to measure forecast time.")
if __name__ == "__main__":
    import numpy as np
    from matplotlib import rc
    import matplotlib.pyplot as plt
    from matplotlib.gridspec import GridSpec
    from teaspoon.MakeData.DynSysLib.autonomous_dissipative_flows import lorenz
    from teaspoon.DAF.forecasting import random_feature_map_model
    from teaspoon.DAF.forecasting import get_forecast

    # Set font
    rc('font', **{'family': 'sans-serif', 'sans-serif': ['Helvetica']})
    rc('text', usetex=True)
    plt.rc('text', usetex=True)
    plt.rc('font', family='serif')
    plt.rcParams.update({'font.size': 16})

    # Set model parameters
    Dr = 300
    train_len = 4000
    forecast_len = 2000
    r_seed = 48824
    np.random.seed(r_seed)

    # Get training and testing data at a random initial condition
    ICs = list(np.random.normal(size=(3, 1)).reshape(-1,))
    t, ts = lorenz(L=500, fs=50, SampleSize=6001, parameters=[28, 10.0, 8.0/3.0], InitialConditions=ICs)
    ts = np.array(ts)

    # Add noise to signals
    noise = np.random.normal(scale=0.01, size=np.shape(ts[:, 0:train_len + forecast_len]))
    u_obs = ts[:, 0:train_len + forecast_len] + noise

    # Train model
    W_LR, W_in, b_in = random_feature_map_model(u_obs[:, 0:train_len], Dr, seed=r_seed)

    # Generate forecast (shorter horizon than the noisy data window above)
    forecast_len = 500
    X_model = get_forecast(u_obs[:, train_len].reshape(-1, 1), W_LR, mu=(W_in, b_in), forecast_len=forecast_len)
    X_meas = u_obs[:, train_len:train_len + forecast_len]

    # Plot measurements and forecast, one panel per Lorenz state variable.
    # Bug fix: all three panels were titled 'x'; label them x, y, z.
    fig = plt.figure(figsize=(15, 5))
    gs = GridSpec(1, 3)
    panels = [('x', (-30, 30)), ('y', (-30, 30)), ('z', (0, 60))]
    for idx, (title, ylims) in enumerate(panels):
        ax = fig.add_subplot(gs[0, idx])
        ax.plot(X_model[idx, :], 'r', label="Forecast")
        ax.plot(X_meas[idx, :], '.b', label="Measurement")
        ax.plot([], [])
        ax.set_title(title, fontsize='x-large')
        ax.tick_params(axis='both', which='major', labelsize='x-large')
        ax.set_ylim(ylims)
        if idx == 2:
            ax.legend(fontsize='large', loc='upper left')
    plt.tight_layout()
    plt.show()