Chapter 6: Unscented Kalman Filter (UKF)

Haiyue


Learning Objectives
  • Understand the basic principles of the unscented transform
  • Master sigma point selection and weight calculation
  • Compare advantages and disadvantages of UKF vs EKF

Unscented Transform Principle

The Unscented Kalman Filter (UKF) handles nonlinear systems through the Unscented Transform, avoiding the Jacobian computations required by the EKF.

Core Idea

Philosophy of the Unscented Transform

“It is easier to approximate a probability distribution than to approximate a nonlinear function”

  • Represent probability distribution with deterministic sampling points (sigma points)
  • Propagate sampling points through nonlinear functions
  • Reconstruct statistical properties of transformed distribution
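
The three steps above can be sketched on a one-dimensional toy problem. The helper below is a minimal illustration (the function name `unscented_transform_1d` and the parameter choices are ours, separate from the UKF class implemented later in this chapter): it pushes a Gaussian through $\sin$ and compares the unscented estimate against a brute-force Monte Carlo estimate.

```python
import numpy as np

def unscented_transform_1d(mean, var, f, alpha=1.0, beta=2.0, kappa=2.0):
    """Propagate a 1-D Gaussian N(mean, var) through f via the unscented transform."""
    n = 1
    lam = alpha**2 * (n + kappa) - n
    spread = np.sqrt((n + lam) * var)
    # Step 1: deterministic sigma points around the mean
    points = np.array([mean, mean + spread, mean - spread])
    Wm = np.array([lam / (n + lam), 0.5 / (n + lam), 0.5 / (n + lam)])
    Wc = Wm.copy()
    Wc[0] += 1 - alpha**2 + beta
    # Step 2: propagate each sigma point through the nonlinear function
    y = f(points)
    # Step 3: reconstruct the mean and variance of the transformed distribution
    y_mean = np.dot(Wm, y)
    y_var = np.dot(Wc, (y - y_mean)**2)
    return y_mean, y_var

# Compare against a brute-force Monte Carlo estimate
rng = np.random.default_rng(0)
samples = np.sin(rng.normal(0.5, np.sqrt(0.2), 1_000_000))
ut_mean, ut_var = unscented_transform_1d(0.5, 0.2, np.sin)
print(f"UT:  mean={ut_mean:.4f}, var={ut_var:.4f}")
print(f"MC:  mean={samples.mean():.4f}, var={samples.var():.4f}")
```

With only three deterministic points, the unscented estimate closely matches the million-sample Monte Carlo result.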

Sigma Point Generation

For an $n$-dimensional state vector $x$, generate $2n+1$ sigma points:

Mean point: $\chi_0 = \bar{x}$

Positive points: $\chi_i = \bar{x} + (\sqrt{(n+\lambda)P})_i, \quad i=1,2,...,n$

Negative points: $\chi_{i+n} = \bar{x} - (\sqrt{(n+\lambda)P})_i, \quad i=1,2,...,n$

Where:

  • $\lambda = \alpha^2(n+\kappa) - n$: scaling parameter
  • $\alpha \in [10^{-4}, 1]$: controls the spread of the sigma points around the mean
  • $\kappa$: auxiliary scaling parameter, usually set to $3-n$ or $0$
  • $(\sqrt{(n+\lambda)P})_i$: the $i$-th column of the matrix square root
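
A minimal standalone sketch of this construction (the helper `sigma_points` and the example numbers are illustrative; the full UKF class later in the chapter has its own version):

```python
import numpy as np

def sigma_points(x_bar, P, alpha=1e-3, kappa=0.0):
    """Generate the 2n+1 sigma points chi_i for mean x_bar and covariance P."""
    n = len(x_bar)
    lam = alpha**2 * (n + kappa) - n
    # numpy's cholesky returns lower-triangular L with L L^T = A; its transpose U
    # satisfies U^T U = A, so the rows of U are the columns of the square root we need
    U = np.linalg.cholesky((n + lam) * P).T
    chi = np.vstack([x_bar, x_bar + U, x_bar - U])  # shapes: (1,n), (n,n), (n,n)
    return chi  # (2n+1, n)

x_bar = np.array([1.0, 2.0])
P = np.array([[0.5, 0.1],
              [0.1, 0.3]])
chi = sigma_points(x_bar, P)
print(chi.shape)  # (5, 2) — 2n+1 = 5 points
print(chi[0])     # [1. 2.] — chi_0 is the mean itself
```

Note the symmetric placement: each positive point $\chi_i$ and its negative partner $\chi_{i+n}$ average back to the mean.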

UKF Algorithm Flow

  • Predict: generate sigma points from $(x, P)$ → propagate through $f$ → compute the weighted mean and covariance to obtain the prior $(x^-, P^-)$ → add process noise $Q$
  • Update: regenerate sigma points from the prior → propagate through $h$ → compute the predicted observation mean, innovation covariance $P_{zz}$ (plus $R$), and cross covariance $P_{xz}$ → Kalman gain $K = P_{xz} P_{zz}^{-1}$ → correct the state and covariance

Weight Calculation

Mean weights: $W_0^{(m)} = \frac{\lambda}{n+\lambda}$, $\quad W_i^{(m)} = \frac{1}{2(n+\lambda)}, \quad i=1,2,...,2n$

Covariance weights: $W_0^{(c)} = \frac{\lambda}{n+\lambda} + (1-\alpha^2+\beta)$, $\quad W_i^{(c)} = \frac{1}{2(n+\lambda)}, \quad i=1,2,...,2n$

where $\beta$ encodes prior knowledge of the distribution's higher-order moments; $\beta=2$ is optimal for Gaussian distributions.

UKF Implementation

import numpy as np
from scipy.linalg import cholesky
from typing import Callable, Optional

class UnscentedKalmanFilter:
    """
    Unscented Kalman filter implementation
    """

    def __init__(self, dim_x: int, dim_z: int,
                 f: Callable, h: Callable,
                 alpha: float = 1e-3,
                 beta: float = 2.0,
                 kappa: Optional[float] = None):
        """
        Initialize UKF

        Parameters:
        dim_x: State dimension
        dim_z: Observation dimension
        f: State transition function
        h: Observation function
        alpha: Sigma point distribution parameter
        beta: Higher-order moment parameter
        kappa: Auxiliary scaling parameter
        """
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.f = f  # State transition function
        self.h = h  # Observation function

        # UKF parameters
        self.alpha = alpha
        self.beta = beta
        self.kappa = kappa if kappa is not None else 3 - dim_x

        # Compute sigma point parameters
        self.lambda_ = alpha**2 * (dim_x + self.kappa) - dim_x
        self.num_sigma = 2 * dim_x + 1

        # Compute weights
        self._compute_weights()

        # State and covariance
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)

        # Noise covariances
        self.Q = np.eye(dim_x)
        self.R = np.eye(dim_z)

        # Sigma point storage
        self.sigma_points_x = np.zeros((self.num_sigma, dim_x))
        self.sigma_points_f = np.zeros((self.num_sigma, dim_x))
        self.sigma_points_h = np.zeros((self.num_sigma, dim_z))

        # Intermediate results
        self.x_prior = None
        self.P_prior = None
        self.K = None
        self.y = None
        self.S = None

    def _compute_weights(self):
        """Compute sigma point weights"""
        # Mean weights
        self.Wm = np.full(self.num_sigma, 1. / (2 * (self.dim_x + self.lambda_)))
        self.Wm[0] = self.lambda_ / (self.dim_x + self.lambda_)

        # Covariance weights
        self.Wc = self.Wm.copy()
        self.Wc[0] = self.lambda_ / (self.dim_x + self.lambda_) + (1 - self.alpha**2 + self.beta)

    def _generate_sigma_points(self, x: np.ndarray, P: np.ndarray):
        """
        Generate sigma points

        Parameters:
        x: State mean
        P: State covariance

        Returns:
        sigma_points: (2n+1, n) sigma point matrix
        """
        n = len(x)
        sigma_points = np.zeros((self.num_sigma, n))

        # Compute matrix square root: rows of sqrt satisfy sum_i sqrt[i] sqrt[i]^T = (n+λ)P
        try:
            # scipy's cholesky returns the upper-triangular factor U with U^T U = (n+λ)P,
            # so the rows of U are the columns of the square root we need
            sqrt = cholesky((n + self.lambda_) * P)
        except np.linalg.LinAlgError:
            # If Cholesky decomposition fails (P not positive definite),
            # fall back to a symmetric square root via eigendecomposition
            eigenvals, eigenvecs = np.linalg.eigh((n + self.lambda_) * P)
            sqrt = eigenvecs @ np.diag(np.sqrt(np.maximum(eigenvals, 0))) @ eigenvecs.T

        # Generate sigma points
        sigma_points[0] = x.flatten()

        for i in range(n):
            sigma_points[i + 1] = x.flatten() + sqrt[i]
            sigma_points[n + i + 1] = x.flatten() - sqrt[i]

        return sigma_points

    def predict(self, u: Optional[np.ndarray] = None):
        """
        Prediction step
        """
        # Generate sigma points
        self.sigma_points_x = self._generate_sigma_points(self.x, self.P)

        # Propagate sigma points through state transition function
        for i in range(self.num_sigma):
            x_sigma = self.sigma_points_x[i].reshape(-1, 1)
            self.sigma_points_f[i] = self.f(x_sigma, u).flatten()

        # Compute predicted state mean
        self.x = np.zeros((self.dim_x, 1))
        for i in range(self.num_sigma):
            self.x += self.Wm[i] * self.sigma_points_f[i].reshape(-1, 1)

        # Compute predicted state covariance
        self.P = np.zeros((self.dim_x, self.dim_x))
        for i in range(self.num_sigma):
            y = self.sigma_points_f[i].reshape(-1, 1) - self.x
            self.P += self.Wc[i] * (y @ y.T)

        # Add process noise
        self.P += self.Q

        # Save prior estimate
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

    def update(self, z: np.ndarray):
        """
        Update step
        """
        # Ensure z is a column vector
        if z.ndim == 1:
            z = z.reshape(-1, 1)

        # Generate sigma points for current state
        sigma_points = self._generate_sigma_points(self.x, self.P)

        # Propagate sigma points through observation function
        for i in range(self.num_sigma):
            x_sigma = sigma_points[i].reshape(-1, 1)
            self.sigma_points_h[i] = self.h(x_sigma).flatten()

        # Compute predicted observation mean
        z_pred = np.zeros((self.dim_z, 1))
        for i in range(self.num_sigma):
            z_pred += self.Wm[i] * self.sigma_points_h[i].reshape(-1, 1)

        # Compute observation covariance
        Pzz = np.zeros((self.dim_z, self.dim_z))
        for i in range(self.num_sigma):
            y = self.sigma_points_h[i].reshape(-1, 1) - z_pred
            Pzz += self.Wc[i] * (y @ y.T)

        # Add observation noise
        Pzz += self.R

        # Compute cross covariance
        Pxz = np.zeros((self.dim_x, self.dim_z))
        for i in range(self.num_sigma):
            dx = sigma_points[i].reshape(-1, 1) - self.x
            dz = self.sigma_points_h[i].reshape(-1, 1) - z_pred
            Pxz += self.Wc[i] * (dx @ dz.T)

        # Compute Kalman gain
        self.K = Pxz @ np.linalg.inv(Pzz)

        # Compute residual
        self.y = z - z_pred

        # State update
        self.x = self.x + self.K @ self.y

        # Covariance update
        self.P = self.P - self.K @ Pzz @ self.K.T

        # Save intermediate results
        self.S = Pzz

UKF Application Examples

Example 1: Nonlinear Oscillator Tracking

def nonlinear_oscillator_tracking():
    """
    Nonlinear oscillator tracking example
    """

    # State: [position, velocity]
    # Nonlinear dynamics: position = position + velocity*dt, velocity = velocity - ω²*sin(position)*dt

    def f(x, u, dt=0.1):
        """Nonlinear state transition"""
        omega = 1.0  # Oscillator frequency
        pos, vel = x[0, 0], x[1, 0]

        new_pos = pos + vel * dt
        new_vel = vel - omega**2 * np.sin(pos) * dt

        return np.array([[new_pos], [new_vel]])

    def h(x):
        """Observation function (observe position)"""
        return np.array([[x[0, 0]]])

    # Create UKF
    ukf = UnscentedKalmanFilter(dim_x=2, dim_z=1, f=f, h=h)

    # Initialize
    ukf.x = np.array([[0.1], [0.0]])  # Initial state
    ukf.P = np.array([[0.1, 0], [0, 0.1]])  # Initial covariance

    ukf.Q = np.array([[0.001, 0], [0, 0.001]])  # Process noise
    ukf.R = np.array([[0.1]])  # Observation noise

    # Simulation and filtering
    true_states = []
    observations = []
    estimates = []

    # True system initial state
    true_state = np.array([0.1, 0.0])

    for k in range(200):
        # True system evolution
        dt = 0.1
        omega = 1.0
        pos, vel = true_state[0], true_state[1]
        new_pos = pos + vel * dt
        new_vel = vel - omega**2 * np.sin(pos) * dt
        true_state = np.array([new_pos, new_vel])

        true_states.append(true_state.copy())

        # Generate observation
        true_obs = true_state[0]
        noisy_obs = true_obs + np.random.normal(0, np.sqrt(0.1))
        observations.append(noisy_obs)

        # UKF steps
        ukf.predict()
        ukf.update(np.array([noisy_obs]))

        estimates.append(ukf.x.flatten())

    # Plot results
    import matplotlib.pyplot as plt

    true_states = np.array(true_states)
    estimates = np.array(estimates)
    observations = np.array(observations)

    plt.figure(figsize=(14, 10))

    plt.subplot(3, 1, 1)
    plt.plot(true_states[:, 0], 'g-', label='True Position', linewidth=2)
    plt.plot(observations, 'r.', label='Observations', alpha=0.7, markersize=3)
    plt.plot(estimates[:, 0], 'b-', label='UKF Estimate', linewidth=2)
    plt.legend()
    plt.title('Nonlinear Oscillator Position Tracking')
    plt.ylabel('Position')

    plt.subplot(3, 1, 2)
    plt.plot(true_states[:, 1], 'g-', label='True Velocity', linewidth=2)
    plt.plot(estimates[:, 1], 'b-', label='UKF Estimate', linewidth=2)
    plt.legend()
    plt.ylabel('Velocity')

    plt.subplot(3, 1, 3)
    pos_error = true_states[:, 0] - estimates[:, 0]
    vel_error = true_states[:, 1] - estimates[:, 1]
    plt.plot(pos_error, 'r-', label='Position Error')
    plt.plot(vel_error, 'b-', label='Velocity Error')
    plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
    plt.legend()
    plt.xlabel('Time Step')
    plt.ylabel('Error')

    plt.tight_layout()
    plt.show()

    return true_states, observations, estimates

# Run example
# nonlinear_oscillator_tracking()

Example 2: Heston Stochastic Volatility Model in Finance

def heston_volatility_ukf():
    """
    UKF estimation for Heston stochastic volatility model
    """

    # State: [log_price, variance]
    # Heston model:
    # dS = μS dt + √V S dW₁
    # dV = κ(θ - V) dt + σ√V dW₂

    def f(x, u, dt=1/252):  # Daily frequency data
        """Heston model state transition (Euler discretization)"""
        log_S, V = x[0, 0], x[1, 0]

        # Heston parameters
        mu = 0.05      # Drift rate
        kappa = 2.0    # Mean reversion speed
        theta = 0.04   # Long-term variance
        sigma = 0.3    # Vol of vol

        # Ensure non-negative variance
        V = max(V, 1e-8)

        # Euler discretization
        new_log_S = log_S + (mu - 0.5 * V) * dt
        new_V = V + kappa * (theta - V) * dt

        # Ensure new variance is non-negative
        new_V = max(new_V, 1e-8)

        return np.array([[new_log_S], [new_V]])

    def h(x):
        """Observation function: observe stock price"""
        log_S = x[0, 0]
        return np.array([[np.exp(log_S)]])

    # Create UKF
    ukf = UnscentedKalmanFilter(dim_x=2, dim_z=1, f=f, h=h,
                               alpha=1e-3, beta=2.0, kappa=1.0)

    # Initialize
    ukf.x = np.array([[np.log(100)], [0.04]])  # log(S₀), V₀
    ukf.P = np.array([[0.01, 0], [0, 0.001]])

    # Noise covariances
    dt = 1/252
    ukf.Q = np.array([[0.01*dt, 0], [0, 0.001*dt]])
    ukf.R = np.array([[1.0]])

    # Simulate data
    prices = []
    volatilities = []
    estimated_vols = []

    # Simulate Heston process to generate true data
    np.random.seed(42)
    S = 100.0
    V = 0.04

    for k in range(252):  # One year of data
        # Simulate true price
        dW1 = np.random.normal(0, np.sqrt(dt))
        dW2 = np.random.normal(0, np.sqrt(dt))

        # Heston parameters
        mu, kappa, theta, sigma = 0.05, 2.0, 0.04, 0.3
        rho = -0.5  # Correlation

        # Correlated random numbers
        dW2 = rho * dW1 + np.sqrt(1 - rho**2) * dW2

        # Update price and volatility
        V = max(V + kappa * (theta - V) * dt + sigma * np.sqrt(max(V, 0)) * dW2, 1e-8)
        S = S * np.exp((mu - 0.5 * V) * dt + np.sqrt(V) * dW1)

        # Add observation noise
        observed_price = S + np.random.normal(0, 1.0)

        prices.append(observed_price)
        volatilities.append(V)

        # UKF steps
        ukf.predict()
        ukf.update(np.array([observed_price]))

        estimated_vols.append(ukf.x[1, 0])

    # Plot results
    import matplotlib.pyplot as plt

    plt.figure(figsize=(14, 8))

    plt.subplot(2, 1, 1)
    plt.plot(prices, 'b-', label='Observed Prices', linewidth=1)
    plt.legend()
    plt.title('Heston Model: Price and Volatility Estimation')
    plt.ylabel('Price')

    plt.subplot(2, 1, 2)
    plt.plot(volatilities, 'g-', label='True Volatility', linewidth=2)
    plt.plot(estimated_vols, 'r--', label='UKF Estimated Volatility', linewidth=2)
    plt.legend()
    plt.xlabel('Trading Days')
    plt.ylabel('Volatility')

    plt.tight_layout()
    plt.show()

    return prices, volatilities, estimated_vols

# Run example
# heston_volatility_ukf()

UKF vs EKF Comparison

Advantages Comparison

| Feature                   | UKF                      | EKF                                              |
|---------------------------|--------------------------|--------------------------------------------------|
| Jacobian computation      | Not needed               | Requires analytical or numerical differentiation |
| Implementation complexity | Medium                   | High (requires derivatives)                      |
| Numerical stability       | Better                   | Depends on Jacobian accuracy                     |
| Higher-order terms        | Captures up to 3rd order | Only 1st order                                   |
| Computational complexity  | O(n³)                    | O(n³)                                            |
| Parameter tuning          | Need to tune α, β, κ     | Fewer parameters                                 |

Performance Comparison Experiment

def compare_ukf_ekf():
    """
    UKF vs EKF performance comparison
    """

    # Highly nonlinear system
    def f_nonlinear(x, u):
        """Strongly nonlinear state transition"""
        return np.array([[x[0, 0] + 0.1 * np.sin(x[0, 0])],
                        [x[1, 0] + 0.1 * np.cos(x[0, 0])]])

    def h_nonlinear(x):
        """Nonlinear observation"""
        return np.array([[x[0, 0]**2 + x[1, 0]**2]])

    # Jacobian functions needed for EKF
    def f_jac(x, u):
        return np.array([[1 + 0.1 * np.cos(x[0, 0]), 0],
                        [-0.1 * np.sin(x[0, 0]), 1]])

    def h_jac(x):
        return np.array([[2 * x[0, 0], 2 * x[1, 0]]])

    # Create filters (ExtendedKalmanFilter is the EKF class implemented in the previous chapter)
    ukf = UnscentedKalmanFilter(2, 1, f_nonlinear, h_nonlinear)
    ekf = ExtendedKalmanFilter(2, 1, f_nonlinear, h_nonlinear, f_jac, h_jac)

    # Same initialization
    for filt in [ukf, ekf]:
        filt.x = np.array([[1.0], [1.0]])
        filt.P = np.eye(2)
        filt.Q = 0.01 * np.eye(2)
        filt.R = np.array([[0.1]])

    # Run comparison
    true_states = []
    ukf_estimates = []
    ekf_estimates = []

    true_x = np.array([1.0, 1.0])

    for k in range(100):
        # True state evolution
        true_x = f_nonlinear(true_x.reshape(-1, 1), None).flatten()
        true_x += np.random.multivariate_normal([0, 0], 0.01 * np.eye(2))
        true_states.append(true_x.copy())

        # Observation
        obs = h_nonlinear(true_x.reshape(-1, 1)) + np.random.normal(0, np.sqrt(0.1))

        # UKF
        ukf.predict()
        ukf.update(obs)
        ukf_estimates.append(ukf.x.flatten())

        # EKF
        ekf.predict()
        ekf.update(obs)
        ekf_estimates.append(ekf.x.flatten())

    # Compute errors
    true_states = np.array(true_states)
    ukf_estimates = np.array(ukf_estimates)
    ekf_estimates = np.array(ekf_estimates)

    ukf_rmse = np.sqrt(np.mean((true_states - ukf_estimates)**2, axis=0))
    ekf_rmse = np.sqrt(np.mean((true_states - ekf_estimates)**2, axis=0))

    print(f"UKF RMSE: {ukf_rmse}")
    print(f"EKF RMSE: {ekf_rmse}")

    return true_states, ukf_estimates, ekf_estimates

# Run comparison
# compare_ukf_ekf()

Parameter Tuning Guide

Alpha Parameter

Role of Alpha
  • Alpha → 0: Sigma points concentrate near mean, suitable for linear systems
  • Alpha → 1: Sigma points spread wider, suitable for strongly nonlinear systems
  • Recommended values: 1e-4 to 1e-1

Beta Parameter

Meaning of Beta
  • Beta = 2: Optimal value for Gaussian distributions
  • Beta > 2: Suitable for heavy-tailed distributions
  • Beta < 2: Suitable for light-tailed distributions
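
The effect of $\alpha$ on the sigma-point spread can be seen directly from the formulas: the distance of the points from the mean scales with $\sqrt{n+\lambda} = \alpha\sqrt{n+\kappa}$. A quick standalone sketch (assuming $n=2$, $\kappa=0$):

```python
import numpy as np

# The sigma-point spread sqrt(n + lambda) reduces to alpha * sqrt(n + kappa),
# so alpha directly scales how far the points sit from the mean (unit covariance).
n, kappa = 2, 0.0
spreads = []
for alpha in (1e-3, 1e-1, 1.0):
    lam = alpha**2 * (n + kappa) - n
    spread = np.sqrt(n + lam)
    spreads.append(spread)
    print(f"alpha={alpha:<6} lambda={lam:+.6f} spread={spread:.6f}")
```

Small $\alpha$ keeps the points tightly clustered near the mean (a nearly local linearization), while $\alpha = 1$ places them a full standard deviation away.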

Adaptive Parameter Tuning

class AdaptiveUKF(UnscentedKalmanFilter):
    """
    UKF with adaptive parameter tuning
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.innovation_history = []

    def adapt_parameters(self, window_size=20):
        """Adaptive parameter tuning based on innovation sequence"""
        if len(self.innovation_history) < window_size:
            return

        # Compute innovation statistics
        innovations = np.array(self.innovation_history[-window_size:])
        innovation_norm = np.linalg.norm(innovations, axis=1)

        # If innovations too large, increase alpha
        if np.mean(innovation_norm) > np.std(innovation_norm) * 2:
            self.alpha = min(self.alpha * 1.1, 1.0)
            self._compute_weights()

        # If innovations too small, decrease alpha
        elif np.mean(innovation_norm) < np.std(innovation_norm) * 0.5:
            self.alpha = max(self.alpha * 0.9, 1e-4)
            self._compute_weights()

    def update(self, z):
        """Update with adaptation"""
        super().update(z)
        self.innovation_history.append(self.y.flatten())
        self.adapt_parameters()

Preview of Next Chapter

In the next chapter, we will learn about particle filtering, a powerful method for highly nonlinear and non-Gaussian systems that is not restricted to Gaussian assumptions.