Chapter 7: Particle Filtering and Advanced Filtering Techniques

Learning Objectives
  • Understand the basic ideas and algorithm flow of particle filtering
  • Master resampling techniques and the particle degeneracy problem
  • Learn about other advanced filtering methods (Ensemble Kalman Filter, etc.)

Basic Principles of Particle Filtering

Particle Filtering (PF) is a non-parametric Bayesian filtering technique based on Monte Carlo methods, suitable for highly nonlinear and non-Gaussian systems.

Core Ideas

Particle Filtering Philosophy

Use a large number of random samples (particles) to approximate the posterior probability distribution, where each particle represents a possible state in the state space.

Probability Distribution Approximation: $p(x_k|z_{1:k}) \approx \sum_{i=1}^N w_k^{(i)} \delta(x_k - x_k^{(i)})$

Where:

  • $x_k^{(i)}$: State of the i-th particle
  • $w_k^{(i)}$: Weight of the i-th particle
  • $\delta(\cdot)$: Dirac delta function
  • $N$: Number of particles
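
As a concrete illustration, here is a minimal sketch (with made-up numbers, not taken from the chapter) of how a weighted particle set turns expectations under the posterior into weighted sums:

import numpy as np

# Minimal sketch: a weighted particle set standing in for a posterior (illustrative values)
rng = np.random.default_rng(42)
particles = rng.normal(1.0, 2.0, size=5000)       # hypothetical particle states x^(i)
weights = np.exp(-0.5 * (particles - 1.5)**2)     # stand-in unnormalized weights w^(i)
weights /= weights.sum()                          # normalize so the weights sum to 1

posterior_mean = np.sum(weights * particles)      # E[x] ~= sum_i w^(i) x^(i)
prob_positive = np.sum(weights[particles > 0.0])  # P(x > 0) ~= sum of w^(i) where x^(i) > 0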

Algorithm Flow

A particle filter cycles through the following steps:

  1. Initialization: draw $N$ particles from the initial state distribution and set uniform weights $1/N$
  2. Prediction: propagate each particle through the state transition model
  3. Update: reweight each particle by its observation likelihood and normalize the weights
  4. Resampling: if the effective sample size falls below a threshold, resample the particle set
  5. Estimation: form the weighted state estimate, then return to step 2 for the next time step

Sequential Importance Sampling (SIS)

Importance Sampling Principle

Basic Idea: Sample from an easy-to-sample proposal distribution $q(x)$ and use weights to compensate for the mismatch with the target distribution.

For a target distribution $p(x)$, with samples $x^{(i)} \sim q(x)$:

$E_p[f(x)] = \int f(x)p(x)dx = \int f(x)\frac{p(x)}{q(x)}q(x)dx \approx \frac{1}{N}\sum_{i=1}^N w^{(i)}f(x^{(i)})$

where the weight is $w^{(i)} = \frac{p(x^{(i)})}{q(x^{(i)})}$.
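
As a quick check that the weighted estimator works, here is a minimal sketch (distributions and numbers are illustrative, not from the chapter) that estimates $E_p[x^2] = 1$ for $p = \mathcal{N}(0, 1)$ while sampling only from the wider proposal $q = \mathcal{N}(0, 2^2)$:

import numpy as np
from scipy.stats import norm

rng = np.random.default_rng(0)
N = 100_000
x = rng.normal(0.0, 2.0, size=N)           # samples x^(i) drawn from q, not from p
w = norm.pdf(x, 0, 1) / norm.pdf(x, 0, 2)  # importance weights w^(i) = p(x^(i)) / q(x^(i))
estimate = np.mean(w * x**2)               # ~= E_p[x^2] = 1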

Sequential Importance Sampling Algorithm

Recursive Weight Update: $w_k^{(i)} = w_{k-1}^{(i)} \frac{p(z_k|x_k^{(i)})\,p(x_k^{(i)}|x_{k-1}^{(i)})}{q(x_k^{(i)}|x_{k-1}^{(i)}, z_k)}$

A common choice of proposal is the transition prior, $q(x_k|x_{k-1}, z_k) = p(x_k|x_{k-1})$, which reduces the update to $w_k^{(i)} = w_{k-1}^{(i)}\,p(z_k|x_k^{(i)})$; this "bootstrap" variant is exactly what the implementation below uses.

State Estimation: $\hat{x}_k = \sum_{i=1}^N w_k^{(i)} x_k^{(i)}$

Particle Filter Implementation

import numpy as np
from scipy.stats import multivariate_normal
import matplotlib.pyplot as plt
from typing import Callable, Optional

class ParticleFilter:
    """
    Particle filter implementation
    """

    def __init__(self, num_particles: int, dim_x: int, dim_z: int,
                 f: Callable, h: Callable,
                 initial_state_sampler: Callable):
        """
        Initialize particle filter

        Parameters:
        num_particles: Number of particles
        dim_x: State dimension
        dim_z: Observation dimension
        f: State transition function
        h: Observation function
        initial_state_sampler: Initial state sampling function
        """
        self.N = num_particles
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.f = f  # State transition function
        self.h = h  # Observation function

        # Particles and weights
        self.particles = np.zeros((num_particles, dim_x))
        self.weights = np.ones(num_particles) / num_particles

        # Noise covariances
        self.Q = np.eye(dim_x)  # Process noise
        self.R = np.eye(dim_z)  # Observation noise

        # Initialize particles
        for i in range(num_particles):
            self.particles[i] = initial_state_sampler()

        # Resampling parameters
        self.resample_threshold = 0.5  # Effective sample ratio threshold

    def predict(self, u: Optional[np.ndarray] = None):
        """
        Prediction step: propagate particles according to state transition model
        """
        for i in range(self.N):
            # State transition + process noise
            self.particles[i] = self.f(self.particles[i], u) + \
                              np.random.multivariate_normal(np.zeros(self.dim_x), self.Q)

    def update(self, z: np.ndarray):
        """
        Update step: update particle weights according to observations
        """
        # Compute likelihood for each particle
        for i in range(self.N):
            # Predicted observation
            z_pred = self.h(self.particles[i])

            # Compute likelihood p(z|x_i)
            likelihood = multivariate_normal.pdf(z, z_pred, self.R)

            # Update weight
            self.weights[i] *= likelihood

        # Normalize weights
        self.weights += 1e-300  # Avoid division by zero
        self.weights /= np.sum(self.weights)

    def resample(self):
        """
        Resampling: solve particle degeneracy problem
        """
        # Compute effective sample size
        N_eff = 1.0 / np.sum(self.weights**2)

        if N_eff < self.resample_threshold * self.N:
            # Systematic resampling
            indices = self._systematic_resample()

            # Update particles and weights
            self.particles = self.particles[indices]
            self.weights = np.ones(self.N) / self.N

    def _systematic_resample(self):
        """
        Systematic resampling algorithm
        """
        # Compute cumulative distribution function
        cumulative_sum = np.cumsum(self.weights)
        cumulative_sum[-1] = 1.0  # Ensure last value is 1

        # Generate uniformly distributed starting point
        u = np.random.random() / self.N

        indices = np.zeros(self.N, dtype=int)
        i = 0
        for j in range(self.N):
            while u > cumulative_sum[i]:
                i += 1
            indices[j] = i
            u += 1.0 / self.N

        return indices

    def estimate(self):
        """
        Compute state estimate
        """
        return np.average(self.particles, weights=self.weights, axis=0)

    def get_covariance(self):
        """
        Compute covariance estimate
        """
        mean = self.estimate()
        diff = self.particles - mean
        return np.average(diff[:, :, np.newaxis] * diff[:, np.newaxis, :],
                         weights=self.weights, axis=0)
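
A minimal usage sketch (the 1-D random-walk model and all numbers here are hypothetical, chosen only for illustration):

# Hypothetical example: 1-D random walk observed directly with noise
pf = ParticleFilter(
    num_particles=1000, dim_x=1, dim_z=1,
    f=lambda x, u: x,   # random-walk transition (process noise is added in predict)
    h=lambda x: x,      # direct observation of the state
    initial_state_sampler=lambda: np.random.normal(0.0, 1.0, size=1),
)
pf.Q = np.eye(1) * 0.1  # process noise covariance
pf.R = np.eye(1) * 0.5  # observation noise covariance

for z in np.random.normal(0.0, 0.5, size=(20, 1)):  # stand-in measurement stream
    pf.predict()
    pf.update(z)
    pf.resample()
    print(pf.estimate())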

Resampling Techniques

Necessity of Resampling

Particle Degeneracy Problem

As the filter runs, most particle weights decay toward zero while a few particles carry almost all of the probability mass, which leads to:

  1. Decreased computational efficiency, since most particles no longer contribute to the estimate
  2. Reduced estimation accuracy
  3. Loss of particle diversity

The standard degeneracy diagnostic is the effective sample size $N_{\text{eff}} = 1/\sum_{i=1}^N (w_k^{(i)})^2$, illustrated in the sketch below.
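
A minimal sketch of the diagnostic (the weight vectors are illustrative, not from the chapter): $N_{\text{eff}}$ equals $N$ for uniform weights and collapses to 1 when a single particle dominates.

import numpy as np

def effective_sample_size(weights: np.ndarray) -> float:
    """N_eff = 1 / sum(w_i^2) for normalized weights"""
    return 1.0 / np.sum(weights**2)

w_uniform = np.ones(1000) / 1000  # healthy: every particle contributes
w_degenerate = np.zeros(1000)
w_degenerate[0] = 1.0             # collapsed: one particle carries all weight

print(effective_sample_size(w_uniform))     # 1000.0
print(effective_sample_size(w_degenerate))  # 1.0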

Resampling Algorithm Comparison

class AdvancedParticleFilter(ParticleFilter):
    """
    Particle filter with multiple resampling algorithms
    """

    def multinomial_resample(self):
        """Multinomial resampling"""
        indices = np.random.choice(range(self.N), size=self.N,
                                 p=self.weights, replace=True)
        return indices

    def stratified_resample(self):
        """Stratified resampling: one uniform draw within each of N equal strata"""
        positions = (np.random.random(self.N) + np.arange(self.N)) / self.N
        indices = np.zeros(self.N, dtype=int)
        cumulative_sum = np.cumsum(self.weights)
        cumulative_sum[-1] = 1.0  # guard against floating-point round-off

        i, j = 0, 0
        while i < self.N:
            if positions[i] < cumulative_sum[j]:
                indices[i] = j
                i += 1
            else:
                j += 1

        return indices

    def residual_resample(self):
        """Residual resampling"""
        # Deterministic part
        N_copies = (self.N * self.weights).astype(int)
        N_residual = self.N - np.sum(N_copies)

        # Construct indices
        indices = []
        for i in range(self.N):
            indices.extend([i] * N_copies[i])

        # Residual part
        if N_residual > 0:
            residual_weights = self.N * self.weights - N_copies
            residual_weights /= np.sum(residual_weights)

            residual_indices = np.random.choice(range(self.N),
                                              size=N_residual,
                                              p=residual_weights,
                                              replace=True)
            indices.extend(residual_indices)

        return np.array(indices)

    def resample(self, method='systematic'):
        """
        Select resampling method
        """
        N_eff = 1.0 / np.sum(self.weights**2)

        if N_eff < self.resample_threshold * self.N:
            if method == 'multinomial':
                indices = self.multinomial_resample()
            elif method == 'stratified':
                indices = self.stratified_resample()
            elif method == 'residual':
                indices = self.residual_resample()
            else:  # systematic
                indices = self._systematic_resample()

            self.particles = self.particles[indices]
            self.weights = np.ones(self.N) / self.N
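
The methods trade variance against cost: multinomial resampling has the highest Monte Carlo variance; stratified and systematic resampling reduce it by spreading the draws evenly over $[0, 1)$; residual resampling removes part of the randomness by deterministically copying high-weight particles. Systematic resampling runs in $O(N)$ and is the usual default. A hypothetical call (model and numbers made up for illustration):

# Hypothetical example: choose the resampling scheme per filter step
apf = AdvancedParticleFilter(num_particles=500, dim_x=1, dim_z=1,
                             f=lambda x, u: x, h=lambda x: x,
                             initial_state_sampler=lambda: np.random.normal(size=1))
apf.predict()
apf.update(np.array([0.3]))
apf.resample(method='stratified')  # or 'multinomial' / 'residual' / 'systematic'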

[Content continues with application examples, advanced filtering techniques, etc…]

Preview of Next Chapter

In the next chapter, we will learn about basic applications of Kalman filtering in finance, including stock price modeling, volatility estimation, and interest rate term structure modeling.