Chapter 6: Unscented Kalman Filter (UKF)
Learning Objectives
- Understand the basic principles of the unscented transform
- Master sigma point selection and weight calculation
- Compare advantages and disadvantages of UKF vs EKF
Unscented Transform Principle
The Unscented Kalman Filter (UKF) handles nonlinear systems through the Unscented Transform, avoiding the complex Jacobian calculations required by the EKF.
Core Idea
Philosophy of the Unscented Transform
“It is easier to approximate a probability distribution than to approximate a nonlinear function”
- Represent probability distribution with deterministic sampling points (sigma points)
- Propagate sampling points through nonlinear functions
- Reconstruct statistical properties of the transformed distribution (illustrated in the sketch below)
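To see why this philosophy pays off, here is a minimal sketch (illustrative only: the nonlinearity sin and all numbers are chosen for demonstration) comparing the naive approach of pushing the mean through a nonlinear function, which is what linearization builds on, against the true transformed mean estimated by Monte Carlo:

import numpy as np

# Gaussian prior: x ~ N(0.8, 0.5^2)
mean_x, std_x = 0.8, 0.5

# Naive approximation: push the mean through the nonlinearity
# (this is the quantity linearization-based methods are built around)
naive_mean = np.sin(mean_x)

# Monte Carlo estimate of the true transformed mean E[sin(x)]
samples = np.random.default_rng(0).normal(mean_x, std_x, 100_000)
mc_mean = np.sin(samples).mean()

print(f"sin(mean):            {naive_mean:.4f}")  # approximately 0.717
print(f"E[sin(x)] (MC truth): {mc_mean:.4f}")     # approximately 0.633

The gap between the two values is exactly what sampling-based approximations such as the unscented transform are designed to capture.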
Sigma Point Generation
For an $n$-dimensional state vector $x$ with mean $\bar{x}$ and covariance $P$, generate $2n + 1$ sigma points:

Mean point: $\mathcal{X}_0 = \bar{x}$

Positive points: $\mathcal{X}_i = \bar{x} + \left(\sqrt{(n + \lambda)P}\right)_i, \quad i = 1, \dots, n$

Negative points: $\mathcal{X}_{n+i} = \bar{x} - \left(\sqrt{(n + \lambda)P}\right)_i, \quad i = 1, \dots, n$

Where:
- $\lambda = \alpha^2(n + \kappa) - n$: Scaling parameter
- $\alpha$: Sigma point distribution control parameter
- $\kappa$: Auxiliary scaling parameter, usually set to $3 - n$ or 0
- $\left(\sqrt{(n + \lambda)P}\right)_i$: $i$-th column of the matrix square root
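These formulas map directly to code. A minimal sketch of sigma point generation for a 2-dimensional state (parameter values illustrative):

import numpy as np

n = 2                        # state dimension
alpha, kappa = 1.0, 3 - n    # illustrative parameter choices
lambda_ = alpha**2 * (n + kappa) - n

x = np.array([1.0, 0.5])                    # state mean
P = np.array([[0.10, 0.02], [0.02, 0.05]])  # state covariance

# Matrix square root S with S @ S.T = (n + lambda) * P
S = np.linalg.cholesky((n + lambda_) * P)

sigma = np.zeros((2 * n + 1, n))
sigma[0] = x                          # mean point
for i in range(n):
    sigma[i + 1] = x + S[:, i]        # positive points
    sigma[n + i + 1] = x - S[:, i]    # negative points

print(sigma)  # 5 points arranged symmetrically around the mean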
UKF Algorithm Flow
The filter alternates between a prediction step and an update step, mirroring the implementation below:
- Predict: generate sigma points from the current $(x, P)$, propagate them through the state transition function $f$, and reconstruct the predicted mean and covariance (adding process noise $Q$)
- Update: propagate sigma points through the observation function $h$, compute the predicted observation, innovation covariance (adding observation noise $R$), and cross covariance, then apply the Kalman gain to correct $x$ and $P$
Weight Calculation
Mean weights:

$$W_0^{(m)} = \frac{\lambda}{n + \lambda}, \qquad W_i^{(m)} = \frac{1}{2(n + \lambda)}, \quad i = 1, \dots, 2n$$

Covariance weights:

$$W_0^{(c)} = \frac{\lambda}{n + \lambda} + \left(1 - \alpha^2 + \beta\right), \qquad W_i^{(c)} = \frac{1}{2(n + \lambda)}, \quad i = 1, \dots, 2n$$

where $\beta$ encodes prior knowledge of higher-order moments; $\beta = 2$ is optimal for Gaussian distributions.
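Combining sigma points and weights gives the complete unscented transform. The sketch below (1-dimensional state; illustrative parameter values $\alpha = 1$, $\beta = 2$, $\kappa = 2$) reconstructs the mean and variance of sin(x) and checks that the mean weights sum to 1:

import numpy as np

n, alpha, beta = 1, 1.0, 2.0   # illustrative parameter values
kappa = 3 - n
lambda_ = alpha**2 * (n + kappa) - n

mean_x, var_x = 0.8, 0.25

# Sigma points for a 1-D Gaussian
s = np.sqrt((n + lambda_) * var_x)
X = np.array([mean_x, mean_x + s, mean_x - s])

# Mean and covariance weights
Wm = np.full(2 * n + 1, 1.0 / (2 * (n + lambda_)))
Wm[0] = lambda_ / (n + lambda_)
Wc = Wm.copy()
Wc[0] += 1 - alpha**2 + beta

assert np.isclose(Wm.sum(), 1.0)  # mean weights must sum to 1

# Propagate through the nonlinearity and reconstruct the statistics
Y = np.sin(X)
mean_y = Wm @ Y
var_y = Wc @ (Y - mean_y)**2

print(f"UT mean: {mean_y:.4f}, UT variance: {var_y:.4f}")

For these values the UT mean comes out near 0.633, matching the Monte Carlo estimate from the earlier sketch far better than the naive sin(0.8) ≈ 0.717.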
UKF Implementation
import numpy as np
from scipy.linalg import cholesky
from typing import Callable, Optional
class UnscentedKalmanFilter:
"""
Unscented Kalman filter implementation
"""
def __init__(self, dim_x: int, dim_z: int,
f: Callable, h: Callable,
alpha: float = 1e-3,
beta: float = 2.0,
kappa: Optional[float] = None):
"""
Initialize UKF
Parameters:
dim_x: State dimension
dim_z: Observation dimension
f: State transition function
h: Observation function
alpha: Sigma point distribution parameter
beta: Higher-order moment parameter
kappa: Auxiliary scaling parameter
"""
self.dim_x = dim_x
self.dim_z = dim_z
self.f = f # State transition function
self.h = h # Observation function
# UKF parameters
self.alpha = alpha
self.beta = beta
self.kappa = kappa if kappa is not None else 3 - dim_x
# Compute sigma point parameters
self.lambda_ = alpha**2 * (dim_x + self.kappa) - dim_x
self.num_sigma = 2 * dim_x + 1
# Compute weights
self._compute_weights()
# State and covariance
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# Noise covariances
self.Q = np.eye(dim_x)
self.R = np.eye(dim_z)
# Sigma point storage
self.sigma_points_x = np.zeros((self.num_sigma, dim_x))
self.sigma_points_f = np.zeros((self.num_sigma, dim_x))
self.sigma_points_h = np.zeros((self.num_sigma, dim_z))
# Intermediate results
self.x_prior = None
self.P_prior = None
self.K = None
self.y = None
self.S = None
def _compute_weights(self):
"""Compute sigma point weights"""
# Mean weights
self.Wm = np.full(self.num_sigma, 1. / (2 * (self.dim_x + self.lambda_)))
self.Wm[0] = self.lambda_ / (self.dim_x + self.lambda_)
# Covariance weights
self.Wc = self.Wm.copy()
self.Wc[0] = self.lambda_ / (self.dim_x + self.lambda_) + (1 - self.alpha**2 + self.beta)
def _generate_sigma_points(self, x: np.ndarray, P: np.ndarray):
"""
Generate sigma points
Parameters:
x: State mean
P: State covariance
Returns:
sigma_points: (2n+1, n) sigma point matrix
"""
n = len(x)
sigma_points = np.zeros((self.num_sigma, n))
        # Compute a matrix square root S such that S @ S.T = (n + lambda) * P
        try:
            sqrt = cholesky((n + self.lambda_) * P).T  # lower-triangular factor
        except np.linalg.LinAlgError:
            # If Cholesky decomposition fails (P not positive definite),
            # fall back to eigenvalue decomposition
            eigenvals, eigenvecs = np.linalg.eigh((n + self.lambda_) * P)
            sqrt = eigenvecs @ np.diag(np.sqrt(np.maximum(eigenvals, 0)))
        # Generate sigma points: the mean, then +/- each column of the square root
        sigma_points[0] = x.flatten()
        for i in range(n):
            sigma_points[i + 1] = x.flatten() + sqrt[:, i]
            sigma_points[n + i + 1] = x.flatten() - sqrt[:, i]
return sigma_points
def predict(self, u: Optional[np.ndarray] = None):
"""
Prediction step
"""
# Generate sigma points
self.sigma_points_x = self._generate_sigma_points(self.x, self.P)
# Propagate sigma points through state transition function
for i in range(self.num_sigma):
x_sigma = self.sigma_points_x[i].reshape(-1, 1)
self.sigma_points_f[i] = self.f(x_sigma, u).flatten()
# Compute predicted state mean
self.x = np.zeros((self.dim_x, 1))
for i in range(self.num_sigma):
self.x += self.Wm[i] * self.sigma_points_f[i].reshape(-1, 1)
# Compute predicted state covariance
self.P = np.zeros((self.dim_x, self.dim_x))
for i in range(self.num_sigma):
y = self.sigma_points_f[i].reshape(-1, 1) - self.x
self.P += self.Wc[i] * (y @ y.T)
# Add process noise
self.P += self.Q
# Save prior estimate
self.x_prior = self.x.copy()
self.P_prior = self.P.copy()
def update(self, z: np.ndarray):
"""
Update step
"""
# Ensure z is a column vector
if z.ndim == 1:
z = z.reshape(-1, 1)
# Generate sigma points for current state
sigma_points = self._generate_sigma_points(self.x, self.P)
# Propagate sigma points through observation function
for i in range(self.num_sigma):
x_sigma = sigma_points[i].reshape(-1, 1)
self.sigma_points_h[i] = self.h(x_sigma).flatten()
# Compute predicted observation mean
z_pred = np.zeros((self.dim_z, 1))
for i in range(self.num_sigma):
z_pred += self.Wm[i] * self.sigma_points_h[i].reshape(-1, 1)
# Compute observation covariance
Pzz = np.zeros((self.dim_z, self.dim_z))
for i in range(self.num_sigma):
y = self.sigma_points_h[i].reshape(-1, 1) - z_pred
Pzz += self.Wc[i] * (y @ y.T)
# Add observation noise
Pzz += self.R
# Compute cross covariance
Pxz = np.zeros((self.dim_x, self.dim_z))
for i in range(self.num_sigma):
dx = sigma_points[i].reshape(-1, 1) - self.x
dz = self.sigma_points_h[i].reshape(-1, 1) - z_pred
Pxz += self.Wc[i] * (dx @ dz.T)
# Compute Kalman gain
self.K = Pxz @ np.linalg.inv(Pzz)
# Compute residual
self.y = z - z_pred
# State update
self.x = self.x + self.K @ self.y
# Covariance update
self.P = self.P - self.K @ Pzz @ self.K.T
# Save intermediate results
self.S = Pzz
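Before moving to the nonlinear examples, a quick smoke test of the class (model and numbers illustrative): on a purely linear system, the UKF should behave like a standard Kalman filter.

import numpy as np

# Illustrative linear model: constant-velocity state, position observed
F = np.array([[1.0, 1.0], [0.0, 1.0]])
H = np.array([[1.0, 0.0]])

ukf = UnscentedKalmanFilter(dim_x=2, dim_z=1,
                            f=lambda x, u: F @ x,
                            h=lambda x: H @ x)
ukf.Q = 0.01 * np.eye(2)
ukf.R = np.array([[0.5]])

for z in [0.9, 2.1, 2.9, 4.2]:  # illustrative position measurements
    ukf.predict()
    ukf.update(np.array([z]))

print(ukf.x.flatten())  # estimated [position, velocity]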
UKF Application Examples
Example 1: Nonlinear Oscillator Tracking
def nonlinear_oscillator_tracking():
"""
Nonlinear oscillator tracking example
"""
# State: [position, velocity]
# Nonlinear dynamics: position = position + velocity*dt, velocity = velocity - ω²*sin(position)*dt
def f(x, u, dt=0.1):
"""Nonlinear state transition"""
omega = 1.0 # Oscillator frequency
pos, vel = x[0, 0], x[1, 0]
new_pos = pos + vel * dt
new_vel = vel - omega**2 * np.sin(pos) * dt
return np.array([[new_pos], [new_vel]])
def h(x):
"""Observation function (observe position)"""
return np.array([[x[0, 0]]])
# Create UKF
ukf = UnscentedKalmanFilter(dim_x=2, dim_z=1, f=f, h=h)
# Initialize
ukf.x = np.array([[0.1], [0.0]]) # Initial state
ukf.P = np.array([[0.1, 0], [0, 0.1]]) # Initial covariance
ukf.Q = np.array([[0.001, 0], [0, 0.001]]) # Process noise
ukf.R = np.array([[0.1]]) # Observation noise
# Simulation and filtering
true_states = []
observations = []
estimates = []
# True system initial state
true_state = np.array([0.1, 0.0])
for k in range(200):
# True system evolution
dt = 0.1
omega = 1.0
pos, vel = true_state[0], true_state[1]
new_pos = pos + vel * dt
new_vel = vel - omega**2 * np.sin(pos) * dt
true_state = np.array([new_pos, new_vel])
true_states.append(true_state.copy())
# Generate observation
true_obs = true_state[0]
noisy_obs = true_obs + np.random.normal(0, np.sqrt(0.1))
observations.append(noisy_obs)
# UKF steps
ukf.predict()
ukf.update(np.array([noisy_obs]))
estimates.append(ukf.x.flatten())
# Plot results
import matplotlib.pyplot as plt
true_states = np.array(true_states)
estimates = np.array(estimates)
observations = np.array(observations)
plt.figure(figsize=(14, 10))
plt.subplot(3, 1, 1)
plt.plot(true_states[:, 0], 'g-', label='True Position', linewidth=2)
plt.plot(observations, 'r.', label='Observations', alpha=0.7, markersize=3)
plt.plot(estimates[:, 0], 'b-', label='UKF Estimate', linewidth=2)
plt.legend()
plt.title('Nonlinear Oscillator Position Tracking')
plt.ylabel('Position')
plt.subplot(3, 1, 2)
plt.plot(true_states[:, 1], 'g-', label='True Velocity', linewidth=2)
plt.plot(estimates[:, 1], 'b-', label='UKF Estimate', linewidth=2)
plt.legend()
plt.ylabel('Velocity')
plt.subplot(3, 1, 3)
pos_error = true_states[:, 0] - estimates[:, 0]
vel_error = true_states[:, 1] - estimates[:, 1]
plt.plot(pos_error, 'r-', label='Position Error')
plt.plot(vel_error, 'b-', label='Velocity Error')
plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
plt.legend()
plt.xlabel('Time Step')
plt.ylabel('Error')
plt.tight_layout()
plt.show()
return true_states, observations, estimates
# Run example
# nonlinear_oscillator_tracking()
Example 2: Heston Stochastic Volatility Model in Finance
def heston_volatility_ukf():
"""
UKF estimation for Heston stochastic volatility model
"""
# State: [log_price, variance]
# Heston model:
# dS = μS dt + √V S dW₁
# dV = κ(θ - V) dt + σ√V dW₂
def f(x, u, dt=1/252): # Daily frequency data
"""Heston model state transition (Euler discretization)"""
log_S, V = x[0, 0], x[1, 0]
# Heston parameters
mu = 0.05 # Drift rate
kappa = 2.0 # Mean reversion speed
theta = 0.04 # Long-term variance
sigma = 0.3 # Vol of vol
# Ensure non-negative variance
V = max(V, 1e-8)
# Euler discretization
new_log_S = log_S + (mu - 0.5 * V) * dt
new_V = V + kappa * (theta - V) * dt
# Ensure new variance is non-negative
new_V = max(new_V, 1e-8)
return np.array([[new_log_S], [new_V]])
def h(x):
"""Observation function: observe stock price"""
log_S = x[0, 0]
return np.array([[np.exp(log_S)]])
# Create UKF
ukf = UnscentedKalmanFilter(dim_x=2, dim_z=1, f=f, h=h,
alpha=1e-3, beta=2.0, kappa=1.0)
# Initialize
ukf.x = np.array([[np.log(100)], [0.04]]) # log(S₀), V₀
ukf.P = np.array([[0.01, 0], [0, 0.001]])
# Noise covariances
dt = 1/252
ukf.Q = np.array([[0.01*dt, 0], [0, 0.001*dt]])
ukf.R = np.array([[1.0]])
# Simulate data
prices = []
volatilities = []
estimated_vols = []
# Simulate Heston process to generate true data
np.random.seed(42)
S = 100.0
V = 0.04
for k in range(252): # One year of data
# Simulate true price
dW1 = np.random.normal(0, np.sqrt(dt))
dW2 = np.random.normal(0, np.sqrt(dt))
# Heston parameters
mu, kappa, theta, sigma = 0.05, 2.0, 0.04, 0.3
rho = -0.5 # Correlation
# Correlated random numbers
dW2 = rho * dW1 + np.sqrt(1 - rho**2) * dW2
# Update price and volatility
V = max(V + kappa * (theta - V) * dt + sigma * np.sqrt(max(V, 0)) * dW2, 1e-8)
S = S * np.exp((mu - 0.5 * V) * dt + np.sqrt(V) * dW1)
# Add observation noise
observed_price = S + np.random.normal(0, 1.0)
prices.append(observed_price)
volatilities.append(V)
# UKF steps
ukf.predict()
ukf.update(np.array([observed_price]))
estimated_vols.append(ukf.x[1, 0])
    # Plot results
    import matplotlib.pyplot as plt  # needed here: Example 1 imports it only locally
    plt.figure(figsize=(14, 8))
plt.subplot(2, 1, 1)
plt.plot(prices, 'b-', label='Observed Prices', linewidth=1)
plt.legend()
plt.title('Heston Model: Price and Volatility Estimation')
plt.ylabel('Price')
plt.subplot(2, 1, 2)
plt.plot(volatilities, 'g-', label='True Volatility', linewidth=2)
plt.plot(estimated_vols, 'r--', label='UKF Estimated Volatility', linewidth=2)
plt.legend()
plt.xlabel('Trading Days')
plt.ylabel('Volatility')
plt.tight_layout()
plt.show()
return prices, volatilities, estimated_vols
# Run example
# heston_volatility_ukf()
UKF vs EKF Comparison
Advantages Comparison
| Feature | UKF | EKF |
|---|---|---|
| Jacobian Computation | Not needed | Requires analytical or numerical differentiation |
| Implementation Complexity | Medium | High (requires derivatives) |
| Numerical Stability | Better | Depends on Jacobian accuracy |
| Higher-Order Terms | Mean accurate to 3rd order (Gaussian inputs) | 1st order only |
| Computational Complexity | O(n³) | O(n³) |
| Parameter Tuning | Need to tune α,β,κ | Fewer parameters |
Performance Comparison Experiment
def compare_ukf_ekf():
"""
UKF vs EKF performance comparison
"""
# Highly nonlinear system
def f_nonlinear(x, u):
"""Strongly nonlinear state transition"""
return np.array([[x[0, 0] + 0.1 * np.sin(x[0, 0])],
[x[1, 0] + 0.1 * np.cos(x[0, 0])]])
def h_nonlinear(x):
"""Nonlinear observation"""
return np.array([[x[0, 0]**2 + x[1, 0]**2]])
# Jacobian functions needed for EKF
def f_jac(x, u):
return np.array([[1 + 0.1 * np.cos(x[0, 0]), 0],
[-0.1 * np.sin(x[0, 0]), 1]])
def h_jac(x):
return np.array([[2 * x[0, 0], 2 * x[1, 0]]])
# Create filters
ukf = UnscentedKalmanFilter(2, 1, f_nonlinear, h_nonlinear)
    # ExtendedKalmanFilter is assumed to be the implementation from the previous chapter
    ekf = ExtendedKalmanFilter(2, 1, f_nonlinear, h_nonlinear, f_jac, h_jac)
# Same initialization
for filt in [ukf, ekf]:
filt.x = np.array([[1.0], [1.0]])
filt.P = np.eye(2)
filt.Q = 0.01 * np.eye(2)
filt.R = np.array([[0.1]])
# Run comparison
true_states = []
ukf_estimates = []
ekf_estimates = []
true_x = np.array([1.0, 1.0])
for k in range(100):
# True state evolution
true_x = f_nonlinear(true_x.reshape(-1, 1), None).flatten()
true_x += np.random.multivariate_normal([0, 0], 0.01 * np.eye(2))
true_states.append(true_x.copy())
# Observation
obs = h_nonlinear(true_x.reshape(-1, 1)) + np.random.normal(0, np.sqrt(0.1))
# UKF
ukf.predict()
ukf.update(obs)
ukf_estimates.append(ukf.x.flatten())
# EKF
ekf.predict()
ekf.update(obs)
ekf_estimates.append(ekf.x.flatten())
# Compute errors
true_states = np.array(true_states)
ukf_estimates = np.array(ukf_estimates)
ekf_estimates = np.array(ekf_estimates)
ukf_rmse = np.sqrt(np.mean((true_states - ukf_estimates)**2, axis=0))
ekf_rmse = np.sqrt(np.mean((true_states - ekf_estimates)**2, axis=0))
print(f"UKF RMSE: {ukf_rmse}")
print(f"EKF RMSE: {ekf_rmse}")
return true_states, ukf_estimates, ekf_estimates
# Run comparison
# compare_ukf_ekf()
Parameter Tuning Guide
Alpha Parameter
Role of Alpha
- Alpha → 0: Sigma points concentrate near mean, suitable for linear systems
- Alpha → 1: Sigma points spread wider, suitable for strongly nonlinear systems
- Recommended values: 1e-4 to 1e-1 (see the sketch below)
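A minimal sketch of alpha's effect on sigma point spread (assuming a 2-dimensional state with $P = I$ and $\kappa = 1$; values illustrative):

import numpy as np

def sigma_point_spread(alpha, n=2, kappa=1.0):
    """Per-axis offset of the outer sigma points from the mean when P = I"""
    lambda_ = alpha**2 * (n + kappa) - n
    return np.sqrt(n + lambda_)  # columns of sqrt((n + lambda) * I) have this length

for a in [1e-3, 1e-2, 1e-1, 1.0]:
    print(f"alpha = {a:>6}: spread = {sigma_point_spread(a):.4f}")

The printed spread grows linearly with alpha, confirming that small alpha concentrates sigma points near the mean while larger alpha spreads them out.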
Beta Parameter
Meaning of Beta
- Beta = 2: Optimal value for Gaussian distributions
- Beta > 2: Suitable for heavy-tailed distributions
- Beta < 2: Suitable for light-tailed distributions
Adaptive Parameter Tuning
class AdaptiveUKF(UnscentedKalmanFilter):
"""
UKF with adaptive parameter tuning
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.innovation_history = []
def adapt_parameters(self, window_size=20):
"""Adaptive parameter tuning based on innovation sequence"""
if len(self.innovation_history) < window_size:
return
# Compute innovation statistics
innovations = np.array(self.innovation_history[-window_size:])
innovation_norm = np.linalg.norm(innovations, axis=1)
        # If innovations are too large, increase alpha (spread sigma points wider)
        if np.mean(innovation_norm) > np.std(innovation_norm) * 2:
            self.alpha = min(self.alpha * 1.1, 1.0)
            self._recompute_lambda_and_weights()
        # If innovations are too small, decrease alpha
        elif np.mean(innovation_norm) < np.std(innovation_norm) * 0.5:
            self.alpha = max(self.alpha * 0.9, 1e-4)
            self._recompute_lambda_and_weights()

    def _recompute_lambda_and_weights(self):
        """lambda depends on alpha, so it must be updated before the weights"""
        self.lambda_ = self.alpha**2 * (self.dim_x + self.kappa) - self.dim_x
        self._compute_weights()
def update(self, z):
"""Update with adaptation"""
super().update(z)
self.innovation_history.append(self.y.flatten())
self.adapt_parameters()
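A usage sketch for the adaptive filter (the f and h here are assumed to be the oscillator functions from Example 1; the measurement values are illustrative):

import numpy as np

# f and h are assumed to be the oscillator functions from Example 1
aukf = AdaptiveUKF(dim_x=2, dim_z=1, f=f, h=h)
aukf.x = np.array([[0.1], [0.0]])
aukf.P = 0.1 * np.eye(2)
aukf.Q = 0.001 * np.eye(2)
aukf.R = np.array([[0.1]])

for z in np.random.default_rng(0).normal(0.0, 0.3, size=50):  # illustrative measurements
    aukf.predict()
    aukf.update(np.array([z]))  # alpha adapts once enough innovations accumulate

print(f"final alpha: {aukf.alpha:.4f}")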
Preview of Next Chapter
In the next chapter, we will study particle filtering, a powerful method for highly nonlinear systems that drops the Gaussian assumption entirely.