Chapter 7: Particle Filtering and Advanced Filtering Techniques

Learning objectives:
- Understand the basic ideas and algorithm flow of particle filtering
- Master resampling techniques and the particle degeneracy problem
- Learn about other advanced filtering methods (Ensemble Kalman Filter, etc.)
Basic Principles of Particle Filtering
Particle Filtering (PF) is a non-parametric Bayesian filtering technique based on Monte Carlo methods, suitable for highly nonlinear and non-Gaussian systems.
Core Ideas
Use a large number of random samples (particles) to approximate the posterior probability distribution, where each particle represents a possible state in the state space.
Probability Distribution Approximation:

$$p(x_k \mid z_{1:k}) \approx \sum_{i=1}^{N} w_k^{(i)} \, \delta(x_k - x_k^{(i)})$$

Where:
- $x_k^{(i)}$: State of the i-th particle
- $w_k^{(i)}$: Weight of the i-th particle
- $\delta(\cdot)$: Dirac delta function
- $N$: Number of particles
Algorithm Flow
Sequential Importance Sampling (SIS)
Importance Sampling Principle
Basic Idea: Sample from an easy-to-sample proposal distribution $q(x)$, using importance weights to compensate for the difference from the target distribution.
For target distribution $p(x)$:

$$E_p[f(x)] = \int f(x)\,\frac{p(x)}{q(x)}\,q(x)\,dx \approx \sum_{i=1}^{N} w^{(i)} f(x^{(i)}), \qquad x^{(i)} \sim q(x)$$

where the weight is:

$$w^{(i)} \propto \frac{p(x^{(i)})}{q(x^{(i)})}$$
Sequential Importance Sampling Algorithm
Recursive Weight Update:

$$w_k^{(i)} \propto w_{k-1}^{(i)} \, \frac{p(z_k \mid x_k^{(i)}) \, p(x_k^{(i)} \mid x_{k-1}^{(i)})}{q(x_k^{(i)} \mid x_{k-1}^{(i)}, z_k)}$$
State Estimation:

$$\hat{x}_k \approx \sum_{i=1}^{N} w_k^{(i)} x_k^{(i)}$$
Particle Filter Implementation
import numpy as np
from scipy.stats import multivariate_normal
import matplotlib.pyplot as plt
from typing import Callable, Optional
class ParticleFilter:
    """
    Bootstrap particle filter (sequential importance resampling).

    The posterior p(x_k | z_1:k) is approximated by N weighted particles.
    The proposal is the state-transition prior, so the weight update
    reduces to multiplying each weight by the Gaussian observation
    likelihood p(z | x_i) with covariance R.
    """
    def __init__(self, num_particles: int, dim_x: int, dim_z: int,
                 f: Callable, h: Callable,
                 initial_state_sampler: Callable):
        """
        Initialize particle filter

        Parameters:
            num_particles: Number of particles
            dim_x: State dimension
            dim_z: Observation dimension
            f: State transition function, f(x, u) -> next state (noise-free part)
            h: Observation function, h(x) -> predicted observation
            initial_state_sampler: Callable returning one draw (length dim_x)
                from the initial state distribution
        """
        self.N = num_particles
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.f = f  # State transition function
        self.h = h  # Observation function
        # Particles (N x dim_x) and normalized importance weights (N,)
        self.particles = np.zeros((num_particles, dim_x))
        self.weights = np.ones(num_particles) / num_particles
        # Noise covariances; callers may overwrite these attributes
        self.Q = np.eye(dim_x)  # Process noise covariance
        self.R = np.eye(dim_z)  # Observation noise covariance
        # Draw the initial particle cloud
        for i in range(num_particles):
            self.particles[i] = initial_state_sampler()
        # Resample when N_eff drops below this fraction of N
        self.resample_threshold = 0.5

    def predict(self, u: Optional[np.ndarray] = None):
        """
        Prediction step: propagate each particle through the state
        transition model and add process noise.

        Parameters:
            u: Optional control input forwarded to f
        """
        # One batched draw factorizes Q once instead of once per particle.
        noise = np.random.multivariate_normal(
            np.zeros(self.dim_x), self.Q, size=self.N)
        # f may be an arbitrary non-vectorized callable, so it is still
        # applied per particle.
        for i in range(self.N):
            self.particles[i] = self.f(self.particles[i], u) + noise[i]

    def update(self, z: np.ndarray):
        """
        Update step: reweight particles by the observation likelihood.

        Parameters:
            z: Observation vector of length dim_z
        """
        # Predicted observation for every particle
        z_pred = np.asarray([self.h(p) for p in self.particles],
                            dtype=float).reshape(self.N, self.dim_z)
        # A zero-mean Gaussian evaluated at the innovation z - h(x_i)
        # equals N(z; h(x_i), R).  Freezing the distribution factorizes
        # R once instead of once per particle.
        innovations = np.asarray(z, dtype=float).reshape(1, self.dim_z) - z_pred
        likelihoods = multivariate_normal(
            mean=np.zeros(self.dim_z), cov=self.R).pdf(innovations)
        self.weights *= np.atleast_1d(likelihoods)
        # Normalize; the tiny epsilon guards against an all-zero weight
        # vector (division by zero).
        self.weights += 1e-300
        self.weights /= np.sum(self.weights)

    def resample(self):
        """
        Resampling: mitigate particle degeneracy by resampling when the
        effective sample size falls below the configured threshold.
        """
        # Effective sample size: 1 / sum(w_i^2), in [1, N]
        N_eff = 1.0 / np.sum(self.weights**2)
        if N_eff < self.resample_threshold * self.N:
            # Systematic resampling, then reset to uniform weights
            indices = self._systematic_resample()
            self.particles = self.particles[indices]
            self.weights = np.ones(self.N) / self.N

    def _systematic_resample(self):
        """
        Systematic resampling: a single uniform offset plus N evenly
        spaced points swept across the weight CDF.  Returns the selected
        particle indices (length N).
        """
        cumulative_sum = np.cumsum(self.weights)
        cumulative_sum[-1] = 1.0  # Guard against floating-point shortfall
        # Single random offset in [0, 1/N)
        u = np.random.random() / self.N
        indices = np.zeros(self.N, dtype=int)
        i = 0
        for j in range(self.N):
            # Advance through the CDF until the current point falls in bin i
            while u > cumulative_sum[i]:
                i += 1
            indices[j] = i
            u += 1.0 / self.N
        return indices

    def estimate(self):
        """
        Return the weighted-mean state estimate (length dim_x).
        """
        return np.average(self.particles, weights=self.weights, axis=0)

    def get_covariance(self):
        """
        Return the weighted sample covariance (dim_x x dim_x) of the
        particle cloud about the weighted mean.
        """
        mean = self.estimate()
        diff = self.particles - mean
        # Weighted average of the outer products diff_i diff_i^T
        return np.average(diff[:, :, np.newaxis] * diff[:, np.newaxis, :],
                          weights=self.weights, axis=0)
Resampling Techniques
Necessity of Resampling
Over time, most particle weights will tend toward zero, with only a few particles having significant weights, leading to:
- Decreased computational efficiency
- Reduced estimation accuracy
- Loss of particle diversity
Resampling Algorithm Comparison
class AdvancedParticleFilter(ParticleFilter):
    """
    Particle filter offering several interchangeable resampling schemes
    (multinomial, stratified, residual, systematic).
    """
    def multinomial_resample(self):
        """Multinomial resampling: N independent draws from the weight distribution."""
        return np.random.choice(range(self.N), size=self.N,
                                p=self.weights, replace=True)

    def stratified_resample(self):
        """Stratified resampling: one uniform draw inside each of N equal strata."""
        positions = (np.random.random(self.N) + np.arange(self.N)) / self.N
        cdf = np.cumsum(self.weights)
        chosen = np.zeros(self.N, dtype=int)
        sample_idx, bin_idx = 0, 0
        while sample_idx < self.N:
            if positions[sample_idx] < cdf[bin_idx]:
                # Current stratum point lands in this CDF bin
                chosen[sample_idx] = bin_idx
                sample_idx += 1
            else:
                bin_idx += 1
        return chosen

    def residual_resample(self):
        """Residual resampling: deterministic integer copies plus a random remainder."""
        scaled = self.N * self.weights
        copies = scaled.astype(int)
        leftover = self.N - np.sum(copies)
        # Deterministic part: floor(N * w_i) copies of particle i
        picked = []
        for idx in range(self.N):
            picked.extend([idx] * copies[idx])
        # Random part: draw the remaining slots from the fractional residuals
        if leftover > 0:
            frac = scaled - copies
            frac /= np.sum(frac)
            extra = np.random.choice(range(self.N),
                                     size=leftover,
                                     p=frac,
                                     replace=True)
            picked.extend(extra)
        return np.array(picked)

    def resample(self, method='systematic'):
        """
        Resample (only if the effective sample size is below threshold)
        using the scheme named by `method`: 'multinomial', 'stratified',
        'residual', or 'systematic' (default).
        """
        effective = 1.0 / np.sum(self.weights**2)
        if effective >= self.resample_threshold * self.N:
            return  # Degeneracy not severe enough; keep current particles
        dispatch = {
            'multinomial': self.multinomial_resample,
            'stratified': self.stratified_resample,
            'residual': self.residual_resample,
        }
        picker = dispatch.get(method, self._systematic_resample)
        idx = picker()
        self.particles = self.particles[idx]
        self.weights = np.ones(self.N) / self.N
[Content continues with application examples, advanced filtering techniques, etc…]
In the next chapter, we will learn about basic applications of Kalman filtering in finance, including stock price modeling, volatility estimation, and interest rate term structure modeling.