Detecting Change in Time Series Data

abdulrehman2050

Abdul Rehman

Posted on January 27, 2023

Detecting Change in Time Series Data

There are several ways to detect a major change in time series data or in a 1D dataset, some of the most common methods are:

Statistical Tests:

You can use statistical tests, such as the CUSUM test or the Page-Hinkley test, to detect changes in the mean or variance of the data.

Change Point Detection:

You can use change point detection algorithms, such as the Binary Segmentation or the Bayesian Change Point Detection, to identify the point at which the data distribution changes.

Machine Learning:

You can use machine learning methods, such as anomaly detection or novelty detection, to identify patterns or behaviors that deviate from the norm.

Visual Inspection:

You can also visually inspect the data, by plotting it and looking for sudden changes in the trend or pattern.

Point to remember

It is important to note that the best method to detect major changes in 1D dataset depends on the characteristics of the data and the specific requirements of the task.

It is also important to note that the 1D data might not have any meaningful change to detect, for example, if the data is random noise or constant data.

Python Implementations

Let's check out few python implementations of the change detection in the time series data.

Statistical Methods

First of all let's check simple methods which are statistical approach

The CUSUM Test Python code

The CUSUM test, or the Cumulative Sum test, is a statistical test used to detect a change in the mean of a time series data. Here is an example of how to implement the CUSUM test in Python on a time series dataset:

import numpy as np

# Create sample time series data
data = np.random.normal(0, 1, 100)

# Insert a change in the mean at index 50
data[50:] += 3

# Define the threshold for the test
threshold = 2

# Initialize the cumulative sum and the test statistics
cusum = 0
test_statistics = []

# Iterate over the data
for i in range(len(data)):
    # Update the cumulative sum
    cusum += data[i] - 0  # assuming the change is from 0 to a positive value
    test_statistics.append(cusum)
    # Check if the cumulative sum exceeds the threshold
    if cusum > threshold:
        print("Change detected at index", i)
        break
Enter fullscreen mode Exit fullscreen mode

In this example, the CUSUM test is used to detect a change in the mean of the time series data from 0 to 3 at index 50. The threshold for the test is set to 2 and the cumulative sum is initialized to 0. The test statistics are calculated by iterating over the data and updating the cumulative sum at each step. If the cumulative sum exceeds the threshold, the change is detected and the index at which it occurred is printed.

It is important to note that the CUSUM test assumes that the change is from a known value to another, in this case, the change is assumed to be from 0 to a positive value.
Also, the threshold value should be set based on the specific requirements and the characteristics of the data.

Page-Hinklay Test

The Page-Hinkley test, also known as the Page test, is a statistical test used to detect a change in the mean of a time series data. Here is an example of how to implement the Page-Hinkley test in Python on a time series dataset:


import numpy as np

# Create sample time series data
data = np.random.normal(0, 1, 100)

# Insert a change in the mean at index 50
data[50:] += 3

# Define the parameters for the test
alpha = 0.01  # significance level
lambda_ = 0.1  # decay rate
delta = 3  # change point

# Initialize the test statistics
cumulative_sum = 0
test_statistics = []

# Iterate over the data
for i in range(len(data)):
    # Update the cumulative sum
    cumulative_sum += data[i] - delta
    test_statistics.append(cumulative_sum)
    # Update the test statistics
    if cumulative_sum > 0:
        cumulative_sum = cumulative_sum * lambda_
    else:
        cumulative_sum = 0
    # Check if the test statistics exceed the threshold
    if cumulative_sum > -np.log(alpha):
        print("Change detected at index", i)
        break
Enter fullscreen mode Exit fullscreen mode

In this example, the Page-Hinkley test is used to detect a change in the mean of the time series data from 0 to 3 at index 50. The significance level is set to 0.01 and the decay rate is set to 0.1. The cumulative sum is initialized to 0. The test statistics are calculated by iterating over the data, updating the cumulative sum and decay it at each step. If the test statistics exceed the threshold, the change is detected and the index at which it occurred is printed.

The Page-Hinkley test is similar to the CUSUM test, but it has the ability to adapt to the variance of the data, which means it is more robust to false alarms. The parameters of the test should be set based on the specific requirements and the characteristics of the data, especially the significance level and the decay rate.

Bayesian Change Point Detection

There are several libraries and packages available in Python for Bayesian Change Point Detection. One popular library is the pystruct library, which provides a simple and efficient implementation of Bayesian Change Point Detection. Here's an example of how you can use it to detect change points in a time series data:

from pystruct.models import ChainCRF
from pystruct.learners import OneSlackSSVM
from pystruct.datasets import load_letters

# load the data
X, y, _, _ = load_letters()

# define the model
model = ChainCRF()

# define the learner
learner = OneSlackSSVM(model=model, C=0.1, max_iter=100)

# fit the model to the data
learner.fit(X, y)

# predict change points
change_points = learner.predict(X)
Enter fullscreen mode Exit fullscreen mode

Another package for Bayesian Change Point Detection is changepoint package. It has the functionality for detecting change points in a variety of different statistical models.

from changepoint import changepoint

# Define data
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]

# Detect changepoints
changepoints = changepoint.detect(data, method='PELT')
Enter fullscreen mode Exit fullscreen mode

In both examples above, the libraries are general purpose, It's up to you to define your own data and the specific model you want to use.

Anomaly Detection

Anomaly detection in time series analysis involves identifying unusual or unexpected patterns in the data that deviate significantly from the norm. There are several approaches to anomaly detection for time series data, including:

Statistical methods: These methods involve comparing the current data point to a statistical model of the normal behavior of the time series. For example, you can use a moving average or a Gaussian distribution to represent the normal behavior and then identify data points that fall outside of this model as anomalies.

Machine learning methods: These methods involve training a model on the normal behavior of the time series and then using this model to identify data points that deviate significantly from the norm. Popular machine learning models for anomaly detection include neural networks, decision trees, and clustering algorithms.

Time series decomposition: This method involves breaking down the time series into its constituent parts, such as trend, seasonal, and residual components, and then identifying anomalies in the residual component.

Spectral analysis: This method involves analyzing the frequency components of the time series data and identifying anomalies in the frequency domain.

There are also libraries in Python that can be used for anomaly detection such as anomalydetection package, pyculiarity package, and tsfresh package.

# Using anomalydetection package
from anomalydetection import detect_anoms

#define data
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30]

#detect anomalies
result = detect_anoms(data, k=2, alpha=0.05, direction='both')

Enter fullscreen mode Exit fullscreen mode

and similarly by using pyculiarity package

from pyculiarity import detect_ts

#define data
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30]

#detect anomalies
result = detect_ts(data)
Enter fullscreen mode Exit fullscreen mode

Gaussian Distribution for Anomaly Detection

Here's an example of how you can use a Gaussian distribution to detect anomalies in time series data using Python:


import numpy as np
from scipy.stats import norm

# Define the data
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30]

# Compute the mean and standard deviation of the data
mean = np.mean(data)
std = np.std(data)

# Define the threshold for identifying anomalies
threshold = 3

# Define the Gaussian distribution
gaussian = norm(mean, std)

# Identify the anomalies
anomalies = []
for i in range(len(data)):
    if abs(gaussian.pdf(data[i]) - gaussian.pdf(mean)) > threshold:
        anomalies.append(i)

print("Anomalies:", anomalies)
Enter fullscreen mode Exit fullscreen mode

This code first computes the mean and standard deviation of the time series data. Then, it defines a Gaussian distribution using these values. After that, it uses the abs(gaussian.pdf(data[i]) - gaussian.pdf(mean)) to compare the probability density function of each data point with the mean. The threshold is set to 3, if the difference is greater than the threshold value, it's considered as an anomaly and stored in the list.

You can adjust the threshold value to increase or decrease the sensitivity of the anomaly detection. Keep in mind that this is a simple example and a more robust implementation may need more advanced techniques such as using rolling window, dynamic threshold calculation and so on.

Moving Average for Anomaly Detection

Here's an example of how you can use a moving average to detect anomalies in real-time data using Python:

import numpy as np

# Define the data
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30]

# Define the window size for the moving average
window_size = 5

# Compute the moving average of the data
moving_average = np.convolve(data, np.ones(window_size)/window_size, mode='valid')

# Define the threshold for identifying anomalies
threshold = 0.5

# Identify the anomalies
anomalies = []
for i in range(len(data) - window_size + 1):
    if abs(data[i+window_size-1] - moving_average[i]) > threshold:
        anomalies.append(i)

print("Anomalies:", anomalies)
Enter fullscreen mode Exit fullscreen mode

This code first defines the window size for the moving average, in this case, it is 5. Then, it uses np.convolve method to compute the moving average of the data. The mode='valid' is used to ignore the data points that fall outside the window range. After that, it uses the abs(data[i+window_size-1] - moving_average[i]) to compare the each data point with the moving average. The threshold is set to 0.5, if the difference is greater than the threshold value, it's considered as an anomaly and stored in the list.

You can adjust the window size and threshold value to increase or decrease the sensitivity of the anomaly detection. Keep in mind that this is a simple example and a more robust implementation may need more advanced techniques such as using dynamic threshold calculation, data normalization and so on.

Novelty Detection using OCSVM

Novelty detection in time series data involves identifying new patterns or behaviors that deviate significantly from the previously observed data. One popular method for novelty detection in time series data is the One-Class Support Vector Machine (OCSVM) algorithm. Here's an example of how you can use the OCSVM algorithm to detect novelties in time series data using Python:

import numpy as np
from sklearn.svm import OneClassSVM

# Define the data
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30]

# Fit the OCSVM model to the data
model = OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
model.fit(data)

# Identify the novelties
novelties = model.predict(data)

# Print the index of the novelties
for i in range(len(novelties)):
    if novelties[i] == -1:
        print("Novelty detected at index:", i)

Enter fullscreen mode Exit fullscreen mode

This code first fits the OneClassSVM model to the time series data using the radial basis function (RBF) kernel. Then, it uses the predict() method to identify the novelties in the data. The nu parameter represents an upper bound on the fraction of training errors and a lower bound of the fraction of support vectors. The gamma parameter is used to define the width of the RBF kernel. A smaller value will result in a wider kernel, a larger value will result in a narrower kernel. The output will be the index of the novelty data point, if any.

The OCSVM algorithm is sensitive to the parameters, so you may need to experiment with different parameter values to achieve optimal results. Additionally, this is a simple example and a more robust implementation may need more advanced techniques such as using rolling window, dynamic threshold calculation and so on

💖 💪 🙅 🚩
abdulrehman2050
Abdul Rehman

Posted on January 27, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related