canari.skf#

Switching Kalman Filter (SKF) for detecting regime changes in time series data. It takes as inputs two instances of model, one model is used to model a normal regime, the other is used to model an abnormal one. At each time step, SKF estimates the probability of each model.

On time series data, this model can:

  • Train its Bayesian LSTM network component from the normal model.

  • Detect regime changes (anomalies) and provide probabilities of regime switch.

  • Decompose orginal time serires data into unobserved hidden states. Provide mean values and the associate uncertainties for these hidden states.

class canari.skf.SKF(norm_model: Model, abnorm_model: Model, std_transition_error: float | None = 0.0, norm_to_abnorm_prob: float | None = 0.0001, abnorm_to_norm_prob: float | None = 0.1, norm_model_prior_prob: float | None = 0.99, conditional_likelihood: bool | None = False)[source]#

Bases: object

SKF class for Switching Kalman Filter.

Parameters:
  • norm_model (Model) – Model representing normal behavior.

  • abnorm_model (Model) – Model representing abnormal behavior.

  • std_transition_error (float) – Std deviation of transition error.

  • norm_to_abnorm_prob (float) – Transition probability from normal to abnormal.

  • abnorm_to_norm_prob (float) – Transition probability from abnormal to normal.

  • norm_model_prior_prob (float) – Prior probability of the normal model.

  • conditional_likelihood (bool) – Whether to use conditional log-likelihood. Defaults to False.

Examples

>>> from canari.component import LocalTrend, LocalAcceleration, LstmNetwork, WhiteNoise
>>> from canari import Model, SKF
>>> # Components
>>> local_trend = LocalTrend()
>>> local_acceleration = LocalAcceleration()
>>> lstm_network = LstmNetwork(
        look_back_len=10,
        num_features=2,
        num_layer=1,
        num_hidden_unit=50,
        device="cpu",
        manual_seed=1,
    )
>>> residual = WhiteNoise(std_error=0.05)
>>> # Define model
>>> normal_model = Model(local_trend, lstm_network, residual)
>>> abnormal_model = Model(local_acceleration, lstm_network, residual)
>>> skf = SKF(
        norm_model=normal_model,
        abnorm_model=abnormal_model,
        std_transition_error=1e-4,
        norm_to_abnorm_prob=1e-4,
    )
model#

Dictionary containing 4 instances of model, i.e., 4 transition models including ‘norm_norm’: transition from normal to normal; ‘norm_abnorm’: transition from normal to abnormal; ‘abnorm_norm’: transition from abnormal to normal; ‘abnorm_abnorm’: transition from abnormal to abnormal;

Type:

dict

num_states#

Number of hidden states.

Type:

int

states_names#

Names of hidden states.

Type:

list[str]

mu_states_init#

Mean vector for the hidden states \(X_0\) at the time step t=0.

Type:

np.ndarray

var_states_init#

Covariance matrix for the hidden states \(X_0\) at the time step t=0.

Type:

np.ndarray

mu_states_prior#

Prior mean vector for the marginal hidden states \(X_{t+1|t}\) at the time step t+1.

Type:

np.ndarray

var_states_prior#

Prior covariance matrix for the marginal hidden states \(X_{t+1|t}\) at the time step t+1.

Type:

np.ndarray

mu_states_posterior#

Posteriror mean vector for the marginal hidden states \(X_{t+1|t+1}\) at the time step t+1.

Type:

np.ndarray

var_states_posterior#

Posteriror covariance matrix for the marginal hidden states \(X_{t+1|t+1}\) at the time step t+1.

Type:

np.ndarray

states#

Container for storing prior, posterior, and smoothed values of marginal hidden states over time.

Type:

StatesHistory

transition_prob#

Transition probability matrix: ‘norm_norm’, ‘norm_abnorm’, ‘abnorm_norm’, ‘abnorm_abnorm’.

Type:

dict

marginal_prob#

Current marginal probability for ‘normal’ and ‘abnormal’ at time t.

Type:

dict

filter_marginal_prob_history#

Filter marginal probability history for ‘normal’ and ‘abnormal’ over time.

Type:

dict

smooth_marginal_prob_history#

Smoother marginal probability history for ‘normal’ and ‘abnormal’ over time.

Type:

dict

norm_to_abnorm_prob#

Transition probability from normal to abnormal.

Type:

float

abnorm_to_norm_prob#

Transition probability from abnormal to normal.

Type:

float

norm_model_prior_prob#

Prior probability of the normal model.

Type:

float

conditional_likelihood#

Whether to use conditional log-likelihood. Defaults to False.

Type:

bool

# LSTM-related attributes

only being used when a LstmNetwork component is found.

lstm_net#

It is a pytagi.Sequential instance. LSTM neural network linked from ‘norm_norm’ of model, if LSTM component presents.

Type:

pytagi.Sequential

lstm_output_history#

Container for saving a rolling history of LSTM output over a fixed look-back window. Linked from ‘norm_norm’ of model, if LSTM component presents.

Type:

LstmOutputHistory

# Early stopping attributes

only being used when training a LstmNetwork component.

early_stop_metric#

Best value of the metric being monitored.

Type:

float

early_stop_metric_history#

Logged history of metric values across epochs.

Type:

List[float]

optimal_epoch#

Epoch at which the monitored metric was best.

Type:

int

stop_training#

Flag indicating whether training has been stopped due to early stopping or reaching the maximum number of epochs.

Type:

bool

auto_initialize_baseline_states(y: ndarray)[source]#

Automatically assign initial means and variances for baseline hidden states (level, trend, and acceleration) from input data using time series decomposition defined in decompose_data().

Parameters:

data (np.ndarray) – Time series data.

Examples

>>> skf.auto_initialize_baseline_states(train_set["y"][0:23])
backward(obs: float) Tuple[ndarray, ndarray][source]#

Update step for Swithching Kalman filter. Recall backward() for all transition models in model.

This function is used at the one-time-step level.

Parameters:

obs (float) – Observation at the current time step.

Returns:

Posterior state estimates.

Return type:

Tuple(mu_states_posterior, var_states_posterior)

detect_synthetic_anomaly(data: Dict[str, ndarray], threshold: float | None = 0.5, max_timestep_to_detect: int | None = None, num_anomaly: int | None = None, slope_anomaly: float | None = None, anomaly_start: float | None = 0.33, anomaly_end: float | None = 0.66) Tuple[float, float][source]#

Add synthetic anomalies to orginal data, use Switching Kalman filter to detect those synthetic anomalies, and compute the detection/false-alarm rates.

Parameters:
  • data (Dict[str, np.ndarray]) – Original time series data.

  • threshold (float) – Threshold for the maximal target anomaly detection rate. Defauls to 0.5.

  • max_timestep_to_detect (int) – Maximum number of timesteps to allow detection. Defauls to None (to the end of time series).

  • num_anomaly (int) – Number of synthetic anomalies to add. This will create as many time series, because one time series contains only one anomaly.

  • slope_anomaly (float) – Magnitude of the anomaly slope.

  • anomaly_start (float) – Fractional start position of anomaly.

  • anomaly_end (float) – Fractional end position of anomaly.

Returns:

detection_rate (float): # time series where anomalies detected / # total synthetic time series with anomalies added. false_rate (float): # time series where anomalies NOT detected / # total synthetic time series with anomalies added. false_alarm_train (str): ‘Yes’ if any alarm during training data.

Return type:

Tuple(detection_rate, false_rate, false_alarm_train)

early_stopping(evaluate_metric: float, current_epoch: int, max_epoch: int, mode: str | None = 'min', patience: int | None = 20, skip_epoch: int | None = 5) Tuple[bool, int, float, list][source]#

Apply early stopping based on a monitored metric when training a LSTM neural network.

Recalling early_stopping() for ‘norm_norm’ in model.

Parameters:
  • current_epoch (int) – Current epoch

  • max_epoch (int) – Maximum number of epoch

  • evaluate_metric (float) – Current metric value for this epoch.

  • mode (Optional[str]) – Direction for early stopping: ‘min’ (default).

  • patience (Optional[int]) – Number of epochs without improvement before stopping. Defaults to 20.

  • skip_epoch (Optional[int]) – Number of initial epochs to ignore when looking for improvements. Defaults to 5.

Returns:

  • stop_training: True if training stops.

  • optimal_epoch: Epoch index of when having best metric.

  • early_stop_metric: Best evaluate_metric. .

  • early_stop_metric_history: History of evaluate_metric at all epochs.

Return type:

Tuple[bool, int, float, List[float]]

Examples

>>> skf.early_stopping(evaluate_metric=mse, current_epoch=1, max_epoch=50)
filter(data: Dict[str, ndarray]) Tuple[ndarray, StatesHistory][source]#

Run the Kalman filter over an entire dataset.

This function is used at the entire-dataset-level. Recall repeatedly the function forward() and backward() at one-time-step level from SKF.

Parameters:

data (Dict[str, np.ndarray]) – Includes ‘x’ and ‘y’.

Returns:

A tuple containing:

  • filter_marginal_prob_abnorm (np.ndarray):

    A history of filtering marginal probability for the abnorm model.

  • states:

    The history of marginal hidden states over time.

Return type:

Tuple[np.ndarray, StatesHistory]

Examples

>>> anomaly_prob, states = skf.filter(data=train_set)
forward(obs: float, input_covariates: ndarray | None = None) Tuple[ndarray, ndarray][source]#

Prediction step in the Switching Kalman filter. This makes a one-step-ahead prediction. It is a mixture prediction from all transition models in model.

Recall forward() for all transition models.

This function is used at the one-time-step level.

Parameters:
  • obs (float) – Current observation.

  • input_covariates (Optional[np.ndarray]) – Input covariates for LSTM at time t.

Returns:

A tuple containing:

  • mu_obs_predict (np.ndarray):

    The predictive mean of the observation at t+1.

  • var_obs_predict (np.ndarray):

    The predictive variance of the observation at t+1.

Return type:

Tuple[np.ndarray, np.ndarray]

get_dict() dict[source]#

Export an SKF object into a dictionary.

Returns:

Serializable model dictionary containing neccessary attributes.

Return type:

dict

Examples

>>> saved_dict = skf.get_dict()
initialize_states_history()[source]#

Reinitialize prior, posterior, and smoothed values for marginal hidden states in states with empty lists, as well as for all transition models in model.

static load_dict(save_dict: dict)[source]#

Reconstruct an SKF instance from a saved serialized dictionary.

Parameters:

save_dict (dict) – Dictionary produced by get_dict().

Returns:

A new SKF object with loaded parameters and states.

Return type:

SKF

Examples

>>> saved_dict = skf.get_dict()
>>> loaded_skf = SKF.load_dict(saved_dict)
load_initial_states()[source]#

Restore saved initial states into the transition model ‘norm_norm’ stored in model.

lstm_train(train_data: Dict[str, ndarray], validation_data: Dict[str, ndarray], white_noise_decay: bool | None = True, white_noise_max_std: float | None = 5, white_noise_decay_factor: float | None = 0.9) Tuple[ndarray, ndarray, StatesHistory][source]#

Train the LstmNetwork component on the provided training set, then evaluate on the validation set.

Recalling lstm_train() for ‘norm_norm’ in model

Parameters:
  • train_data (Dict[str, np.ndarray]) – Dictionary with keys ‘x’ and ‘y’ for training inputs and targets.

  • validation_data (Dict[str, np.ndarray]) – Dictionary with keys ‘x’ and ‘y’ for validation inputs and targets.

  • white_noise_decay (bool, optional) – If True, apply an exponential decay on the white noise standard deviation over epochs, if a white noise component exists. Defaults to True.

  • white_noise_max_std (float, optional) – Upper bound on the white-noise standard deviation when decaying. Defaults to 5.

  • white_noise_decay_factor (float, optional) – Multiplicative decay factor applied to the white‐noise standard deviation each epoch. Defaults to 0.9.

Returns:

A tuple containing:

  • mu_obs_preds (np.ndarray):

    The means for multi-step-ahead predictions for the validation set.

  • std_obs_preds (np.ndarray):

    The standard deviations for multi-step-ahead predictions for the validation set.

  • states:

    The history of hidden states over time.

Return type:

Tuple[np.ndarray, np.ndarray, StatesHistory]

Examples

>>> mu_preds_val, std_preds_val, states = skf.lstm_train(train_data=train_set,validation_data=val_set)
rts_smoother(time_step: int, matrix_inversion_tol: float | None = 0.001, tol_type: str | None = 'relative')[source]#

Smoother for the Switching Kalman filter at a given time step.

Recall rts_smoother() for all transition models in model.

This function is used at the one-time-step level.

Parameters:
  • time_step (int) – Index at which to perform smoothing.

  • matrix_inversion_tol (float) – Numerical stability threshold for matrix pseudoinversion (pinv). Defaults to 1E-4.

Returns:

None

save_initial_states()[source]#

Save initial SKF hidden states (mean/variance) for reuse in subsequent runs.

Set mu_states_init and var_states_init using the mu_states and var_states from the transition model ‘norm_norm’ stored in model.

set_memory(states: StatesHistory, time_step: int)[source]#

Apply set_memory() for the transition model ‘norm_norm’ stored in model. If time_step=0, reset marginal_prob using norm_model_prior_prob.

Parameters:
  • states (StatesHistory) – Full history of hidden states over time.

  • time_step (int) – Index of timestep to restore.

Examples

>>> # If the next analysis starts from the beginning of the time series
>>> skf.set_memory(states=skf.states, time_step=0))
>>> # If the next analysis starts from t = 200
>>> skf.set_memory(states=skf.states, time_step=200))
set_states()[source]#

Set ‘mu_states’ and ‘var_states’ for each transition models in model using their posterior.

smoother(matrix_inversion_tol: float | None = 0.0001, tol_type: str | None = 'relative') Tuple[ndarray, StatesHistory][source]#

Run the Kalman smoother over an entire time series data.

This function is used at the entire-dataset-level. Recall repeatedly the function rts_smoother() at one-time-step level from SKF.

Parameters:

matrix_inversion_tol (float) – Numerical stability threshold for matrix pseudoinversion (pinv). Defaults to 1E-4.

Returns:

A tuple containing:

  • smooth_marginal_prob_abnorm (np.ndarray):

    A history of smoother marginal probability for the abnorm model.

  • states:

    The history of marginal hidden states over time.

Return type:

Tuple[np.ndarray, StatesHistory]