Chapter 5: Extended Kalman Filter (EKF)

Learning Objectives
  • Understand the challenges of nonlinear systems and solution approaches
  • Master Jacobian matrix calculation and linearization methods
  • Learn EKF algorithm implementation and application scenarios

Challenges of Nonlinear Systems

The standard Kalman filter applies only to linear Gaussian systems, but many real-world systems are nonlinear. The Extended Kalman Filter (EKF) handles such systems by linearizing the model locally around the current state estimate.

Nonlinear State-Space Model

Nonlinear state transition: $x_k = f(x_{k-1}, u_k) + w_k$

Nonlinear observation equation: $z_k = h(x_k) + v_k$

Where:

  • $f(\cdot)$: Nonlinear state transition function
  • $h(\cdot)$: Nonlinear observation function
  • $w_k \sim \mathcal{N}(0, Q_k)$: Process noise
  • $v_k \sim \mathcal{N}(0, R_k)$: Observation noise
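
As a concrete instance (a minimal sketch of the range-bearing tracker developed in Example 1 later in this chapter), the pair f and h might look like this:

import numpy as np

def f(x, u=None, dt=1.0):
    """Constant-velocity state transition for x = [px, py, vx, vy]."""
    px, py, vx, vy = x
    return np.array([px + vx * dt, py + vy * dt, vx, vy])

def h(x):
    """Polar observation: range and bearing measured from the origin."""
    px, py = x[0], x[1]
    return np.array([np.hypot(px, py), np.arctan2(py, px)])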

Linearization Principle

The core idea of the EKF is to Taylor-expand the nonlinear functions around the current estimate and retain only the first-order terms, yielding a local linear model.
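
Concretely, expanding $f$ around the previous estimate and $h$ around the predicted state gives the linear approximations used throughout this chapter:

$f(x_{k-1}, u_k) \approx f(\hat{x}_{k-1|k-1}, u_k) + F_k (x_{k-1} - \hat{x}_{k-1|k-1})$

$h(x_k) \approx h(\hat{x}_{k|k-1}) + H_k (x_k - \hat{x}_{k|k-1})$

where $F_k$ and $H_k$ are the Jacobian matrices defined next.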

Jacobian Matrix

State transition Jacobian matrix: $F_k = \frac{\partial f}{\partial x}\bigg|_{x=\hat{x}_{k-1|k-1},\, u=u_k}$

Observation Jacobian matrix: $H_k = \frac{\partial h}{\partial x}\bigg|_{x=\hat{x}_{k|k-1}}$
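
As a worked example, take the range-bearing observation used in Example 1 below, $h(x) = [\sqrt{x^2 + y^2},\ \operatorname{atan2}(y, x)]^T$ with state $[x, y, v_x, v_y]^T$. Differentiating each component gives

$H = \begin{pmatrix} x/r & y/r & 0 & 0 \\ -y/r^2 & x/r^2 & 0 & 0 \end{pmatrix}, \qquad r = \sqrt{x^2 + y^2}$

which is exactly the analytic h_jacobian implemented in the code later in this chapter.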

Linearization Process

At each step the filter re-linearizes around its most recent estimate: evaluate $F_k$ at $\hat{x}_{k-1|k-1}$ and predict, then evaluate $H_k$ at $\hat{x}_{k|k-1}$ and update, and repeat with the refreshed estimate at the next time step.

EKF Algorithm Steps

Prediction Step

State prediction: $\hat{x}_{k|k-1} = f(\hat{x}_{k-1|k-1}, u_k)$

Covariance prediction: $P_{k|k-1} = F_k P_{k-1|k-1} F_k^T + Q_k$

where $F_k$ is the Jacobian matrix evaluated at $\hat{x}_{k-1|k-1}$.

Update Step

Kalman gain: $K_k = P_{k|k-1} H_k^T (H_k P_{k|k-1} H_k^T + R_k)^{-1}$

State update: $\hat{x}_{k|k} = \hat{x}_{k|k-1} + K_k (z_k - h(\hat{x}_{k|k-1}))$

Covariance update: $P_{k|k} = (I - K_k H_k) P_{k|k-1}$

where $H_k$ is the Jacobian matrix evaluated at $\hat{x}_{k|k-1}$.

EKF Implementation

import numpy as np
from typing import Callable, Optional

class ExtendedKalmanFilter:
    """
    Extended Kalman filter implementation
    """

    def __init__(self, dim_x: int, dim_z: int,
                 f: Callable, h: Callable,
                 f_jacobian: Callable, h_jacobian: Callable):
        """
        Initialize Extended Kalman filter

        Parameters:
        dim_x: State dimension
        dim_z: Observation dimension
        f: State transition function
        h: Observation function
        f_jacobian: State transition Jacobian function
        h_jacobian: Observation Jacobian function
        """
        self.dim_x = dim_x
        self.dim_z = dim_z

        # Nonlinear functions
        self.f = f  # State transition function
        self.h = h  # Observation function
        self.f_jacobian = f_jacobian  # F Jacobian
        self.h_jacobian = h_jacobian  # H Jacobian

        # State and covariance
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)

        # Noise covariances
        self.Q = np.eye(dim_x)
        self.R = np.eye(dim_z)

        # Store intermediate results
        self.x_prior = None
        self.P_prior = None
        self.F = None
        self.H = None
        self.K = None
        self.y = None
        self.S = None

    def predict(self, u: Optional[np.ndarray] = None):
        """
        Prediction step
        """
        # Compute Jacobian matrix
        self.F = self.f_jacobian(self.x, u)

        # Nonlinear state prediction
        self.x = self.f(self.x, u)

        # Covariance prediction
        self.P = self.F @ self.P @ self.F.T + self.Q

        # Save prior estimate
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

    def update(self, z: np.ndarray):
        """
        Update step
        """
        # Ensure z is a column vector
        if z.ndim == 1:
            z = z.reshape((-1, 1))

        # Compute observation Jacobian matrix
        self.H = self.h_jacobian(self.x)

        # Compute residual
        self.y = z - self.h(self.x)

        # Compute innovation covariance
        self.S = self.H @ self.P @ self.H.T + self.R

        # Compute Kalman gain
        self.K = self.P @ self.H.T @ np.linalg.inv(self.S)

        # State update
        self.x = self.x + self.K @ self.y

        # Covariance update (Joseph form)
        I_KH = np.eye(self.dim_x) - self.K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + self.K @ self.R @ self.K.T
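
A quick smoke test (a hypothetical check, not part of the original example set): with identity dynamics and direct observation, the EKF collapses to a plain linear Kalman filter, so one predict/update cycle should pull the state toward the measurement.

ekf = ExtendedKalmanFilter(
    dim_x=1, dim_z=1,
    f=lambda x, u: x,
    h=lambda x: x,
    f_jacobian=lambda x, u: np.eye(1),
    h_jacobian=lambda x: np.eye(1),
)
ekf.predict()
ekf.update(np.array([1.0]))
print(ekf.x)  # moved from 0 toward the observation 1.0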

Automatic Jacobian Computation

Symbolic Differentiation Method

import sympy as sp

def symbolic_jacobian(func_expr, vars_expr):
    """
    Automatically generate Jacobian matrix using symbolic computation

    Parameters:
    func_expr: List of symbolic expressions for functions
    vars_expr: List of symbolic expressions for variables
    """
    jacobian_matrix = []
    for f in func_expr:
        row = []
        for var in vars_expr:
            row.append(sp.diff(f, var))
        jacobian_matrix.append(row)

    return sp.Matrix(jacobian_matrix)

# Example: Define symbolic variables and functions
def define_nonlinear_system():
    """
    Define symbolic expressions for nonlinear system
    """
    # State variables [x, y, vx, vy]
    x, y, vx, vy = sp.symbols('x y vx vy')
    dt = sp.Symbol('dt')

    # State transition (constant velocity; linear in this example)
    f_expr = [
        x + vx * dt,
        y + vy * dt,
        vx,
        vy
    ]

    # Nonlinear observation (range and bearing)
    h_expr = [
        sp.sqrt(x**2 + y**2),    # Range
        sp.atan2(y, x)           # Bearing
    ]

    # Compute Jacobian matrices
    states = [x, y, vx, vy]
    F_symbolic = symbolic_jacobian(f_expr, states)
    H_symbolic = symbolic_jacobian(h_expr, states)

    print("State transition Jacobian matrix F:")
    print(F_symbolic)
    print("\nObservation Jacobian matrix H:")
    print(H_symbolic)

    return F_symbolic, H_symbolic

# Generate numerical functions
def create_numerical_functions():
    """
    Convert symbolic Jacobians to numerical functions
    """
    F_sym, H_sym = define_nonlinear_system()

    # Convert to numpy functions
    F_func = sp.lambdify(sp.symbols('x y vx vy dt'), F_sym, 'numpy')
    H_func = sp.lambdify(sp.symbols('x y vx vy'), H_sym, 'numpy')

    return F_func, H_func
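
A usage sketch for the generated functions (evaluated at a hypothetical state (1.0, 2.0, 0.5, 0.5) with dt = 1.0; the numbers are illustrative only):

F_func, H_func = create_numerical_functions()
F_k = np.array(F_func(1.0, 2.0, 0.5, 0.5, 1.0), dtype=float)
H_k = np.array(H_func(1.0, 2.0, 0.5, 0.5), dtype=float)
print(F_k.shape, H_k.shape)  # (4, 4) and (2, 4)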

Numerical Differentiation Method

def numerical_jacobian(func, x, epsilon=1e-7):
    """
    Numerically compute the Jacobian matrix by central differences
    """
    x = np.atleast_1d(x).astype(float)
    n = len(x)
    f0 = np.ravel(func(x))
    m = f0.size

    jacobian = np.zeros((m, n))

    for j in range(n):
        x_plus = x.copy()
        x_minus = x.copy()
        x_plus[j] += epsilon
        x_minus[j] -= epsilon

        # Flatten outputs so column vectors and 1-D arrays are both handled
        f_plus = np.ravel(func(x_plus))
        f_minus = np.ravel(func(x_minus))

        jacobian[:, j] = (f_plus - f_minus) / (2 * epsilon)

    return jacobian

class AutoDiffEKF(ExtendedKalmanFilter):
    """
    EKF whose Jacobians are obtained by numerical differentiation,
    so only f and h need to be supplied
    """

    def __init__(self, dim_x, dim_z, f, h):
        # Wrap f and h so the differentiated functions receive column
        # vectors, which is what the base-class f and h expect
        def f_jac(x, u):
            return numerical_jacobian(
                lambda v: f(v.reshape(-1, 1), u), x.flatten()
            ).reshape((dim_x, dim_x))

        def h_jac(x):
            return numerical_jacobian(
                lambda v: h(v.reshape(-1, 1)), x.flatten()
            ).reshape((dim_z, dim_x))

        super().__init__(dim_x, dim_z, f, h, f_jac, h_jac)
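
A minimal sanity check for numerical_jacobian (hypothetical test values): the central-difference result for a simple map should match the analytical derivative to well within the step size.

g = lambda v: np.array([np.sin(v[0]), v[0] ** 2])
J = numerical_jacobian(g, np.array([0.5]))
# Analytical Jacobian at 0.5 is [[cos(0.5)], [2 * 0.5]]
assert np.allclose(J, [[np.cos(0.5)], [1.0]], atol=1e-5)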

Concrete Application Examples

Example 1: Target Tracking with Polar Observations

def polar_tracking_example():
    """
    Target tracking example with polar observations
    """

    # State: [x, y, vx, vy] - position and velocity
    # Observation: [r, θ] - range and bearing

    def f(x, u, dt=1.0):
        """State transition function (constant velocity)"""
        F = np.array([
            [1, 0, dt, 0],
            [0, 1, 0, dt],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])
        return F @ x

    def h(x):
        """Observation function (polar coordinates)"""
        px, py = x[0, 0], x[1, 0]
        r = np.sqrt(px**2 + py**2)
        theta = np.arctan2(py, px)
        return np.array([[r], [theta]])

    def f_jacobian(x, u, dt=1.0):
        """State transition Jacobian"""
        return np.array([
            [1, 0, dt, 0],
            [0, 1, 0, dt],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])

    def h_jacobian(x):
        """Observation Jacobian"""
        px, py = x[0, 0], x[1, 0]
        r_sq = px**2 + py**2
        r = np.sqrt(r_sq)

        if r < 1e-6:  # Avoid division by zero
            r = 1e-6
            r_sq = r**2

        H = np.array([
            [px/r, py/r, 0, 0],
            [-py/r_sq, px/r_sq, 0, 0]
        ])
        return H

    # Create EKF
    ekf = ExtendedKalmanFilter(4, 2, f, h, f_jacobian, h_jacobian)

    # Initialize
    ekf.x = np.array([[0], [0], [1], [1]])  # Initial state
    ekf.P = 10 * np.eye(4)  # Initial covariance

    ekf.Q = np.array([  # Process noise
        [0.1, 0, 0, 0],
        [0, 0.1, 0, 0],
        [0, 0, 0.1, 0],
        [0, 0, 0, 0.1]
    ])

    ekf.R = np.array([  # Observation noise (variances for std 0.5 and 0.1 used below)
        [0.25, 0],
        [0, 0.01]
    ])

    # Simulate tracking
    true_states = []
    observations = []
    estimates = []

    true_state = np.array([0, 0, 1, 1])  # [x, y, vx, vy]

    for k in range(50):
        # True state evolution
        true_state = f(true_state.reshape(-1, 1), None).flatten()
        true_states.append(true_state.copy())

        # Generate observation
        true_obs = h(true_state.reshape(-1, 1))
        noisy_obs = true_obs + np.array([[np.random.normal(0, 0.5)],
                                       [np.random.normal(0, 0.1)]])
        observations.append(noisy_obs.flatten())

        # EKF steps
        ekf.predict()
        ekf.update(noisy_obs)

        estimates.append(ekf.x.flatten())

    # Plot results
    import matplotlib.pyplot as plt

    true_states = np.array(true_states)
    estimates = np.array(estimates)

    plt.figure(figsize=(12, 8))

    plt.subplot(2, 2, 1)
    plt.plot(true_states[:, 0], true_states[:, 1], 'g-',
             label='True Trajectory', linewidth=2)
    plt.plot(estimates[:, 0], estimates[:, 1], 'b--',
             label='EKF Estimate', linewidth=2)
    plt.legend()
    plt.title('Trajectory Comparison')
    plt.xlabel('X Position')
    plt.ylabel('Y Position')
    plt.grid(True)

    plt.subplot(2, 2, 2)
    plt.plot(true_states[:, 0] - estimates[:, 0], 'r-', label='X Error')
    plt.plot(true_states[:, 1] - estimates[:, 1], 'b-', label='Y Error')
    plt.legend()
    plt.title('Position Estimation Error')
    plt.xlabel('Time Step')
    plt.ylabel('Error')
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    return true_states, observations, estimates

# Run example
# polar_tracking_example()
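
One practical caveat this example glosses over: the bearing residual $z_k - h(\hat{x}_{k|k-1})$ can jump by nearly $2\pi$ when the target crosses the $\pm\pi$ boundary. A common fix (a sketch, not part of the original code) is to wrap the angular component of the innovation before applying the gain:

class PolarEKF(ExtendedKalmanFilter):
    """EKF that wraps the bearing innovation into [-pi, pi)."""

    def update(self, z):
        if z.ndim == 1:
            z = z.reshape((-1, 1))

        self.H = self.h_jacobian(self.x)
        self.y = z - self.h(self.x)
        # Wrap the bearing residual (second observation component)
        self.y[1, 0] = (self.y[1, 0] + np.pi) % (2 * np.pi) - np.pi

        self.S = self.H @ self.P @ self.H.T + self.R
        self.K = self.P @ self.H.T @ np.linalg.inv(self.S)
        self.x = self.x + self.K @ self.y

        I_KH = np.eye(self.dim_x) - self.K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + self.K @ self.R @ self.K.T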

Example 2: Nonlinear Volatility Model in Finance

def stochastic_volatility_ekf():
    """
    EKF estimation for stochastic volatility model
    """

    # State: [log_price, log_volatility]
    # Observation: [price]

    def f(x, u, dt=1.0):
        """
        State transition:
        log(S_t) = log(S_{t-1}) + μdt + σ_{t-1}dW_1
        log(σ_t²) = log(σ_{t-1}²) + κ(θ - log(σ_{t-1}²))dt + ξdW_2
        """
        log_price, log_vol = x[0, 0], x[1, 0]

        # Parameters
        mu = 0.05      # Expected return
        kappa = 0.1    # Mean reversion speed
        theta = -2.0   # Long-term variance log
        xi = 0.1       # Vol of vol

        # Simplified Euler discretization
        new_log_price = log_price + mu * dt
        new_log_vol = log_vol + kappa * (theta - log_vol) * dt

        return np.array([[new_log_price], [new_log_vol]])

    def h(x):
        """Observation function: price = exp(log_price)"""
        log_price = x[0, 0]
        return np.array([[np.exp(log_price)]])

    def f_jacobian(x, u, dt=1.0):
        """State transition Jacobian"""
        kappa = 0.1
        return np.array([
            [1, 0],
            [0, 1 - kappa * dt]
        ])

    def h_jacobian(x):
        """Observation Jacobian"""
        log_price = x[0, 0]
        return np.array([[np.exp(log_price), 0]])

    # Create EKF
    ekf = ExtendedKalmanFilter(2, 1, f, h, f_jacobian, h_jacobian)

    # Initialize
    ekf.x = np.array([[4.6], [-2.0]])  # log(100), log(0.135)
    ekf.P = np.array([[0.1, 0], [0, 0.5]])

    ekf.Q = np.array([[0.01, 0], [0, 0.02]])  # Process noise
    ekf.R = np.array([[1.0]])  # Observation noise

    # Simulate data and run filter
    prices = []
    estimated_log_vols = []

    for k in range(100):
        # Predict and update
        ekf.predict()

        # Simulate observation (using simulated price data here)
        true_price = 100 * np.exp(0.001 * k + 0.1 * np.random.normal())
        noisy_price = true_price + np.random.normal(0, 1)

        ekf.update(np.array([noisy_price]))

        prices.append(noisy_price)
        estimated_log_vols.append(ekf.x[1, 0])

    # Plot results
    import matplotlib.pyplot as plt

    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(prices, 'b-', label='Observed Prices')
    plt.legend()
    plt.title('Price Series')
    plt.xlabel('Time')
    plt.ylabel('Price')

    plt.subplot(1, 2, 2)
    plt.plot(np.exp(estimated_log_vols), 'r-', label='Estimated Volatility')
    plt.legend()
    plt.title('Estimated Volatility')
    plt.xlabel('Time')
    plt.ylabel('Volatility')

    plt.tight_layout()
    plt.show()

    return prices, estimated_log_vols

# Run example
# stochastic_volatility_ekf()

EKF Limitations and Improvements

Main Limitations

EKF Issues
  1. Linearization error: First-order Taylor expansion may not be accurate enough
  2. Jacobian computation: Requires analytical derivatives or numerical differentiation
  3. Divergence risk: Strongly nonlinear systems prone to divergence
  4. Local optimum: May get stuck in local minima

Improvement Methods

1. Iterated Extended Kalman Filter (IEKF):

class IteratedEKF(ExtendedKalmanFilter):
    """
    Iterated Extended Kalman Filter: re-linearize h around successively
    refined estimates, always anchoring the update at the same prior
    (repeatedly calling the standard update would re-apply the same
    measurement and overcount its information)
    """

    def update(self, z, max_iter=5, tolerance=1e-6):
        """Iterated measurement update (Gauss-Newton refinement)"""
        if z.ndim == 1:
            z = z.reshape((-1, 1))

        x_prior = self.x.copy()
        P_prior = self.P.copy()
        x_i = x_prior.copy()

        for _ in range(max_iter):
            H = self.h_jacobian(x_i)
            S = H @ P_prior @ H.T + self.R
            K = P_prior @ H.T @ np.linalg.inv(S)
            # Re-linearized update around x_i, anchored at the prior
            x_next = x_prior + K @ (z - self.h(x_i) - H @ (x_prior - x_i))
            converged = np.linalg.norm(x_next - x_i) < tolerance
            x_i = x_next
            if converged:
                break

        self.x = x_i
        self.H, self.K, self.S = H, K, S
        I_KH = np.eye(self.dim_x) - K @ H
        self.P = I_KH @ P_prior @ I_KH.T + K @ self.R @ K.T

2. Adaptive EKF:

def adaptive_noise_estimation(ekf, window_size=10):
    """
    Adaptive observation-noise estimation from the innovation sequence.
    Assumes the caller records innovations in ekf.innovation_history
    (see the sketch below)
    """
    # Simple scheme: set R to the sample covariance of recent innovations.
    # More careful estimators also subtract the predicted part H P H^T.
    if hasattr(ekf, 'innovation_history'):
        if len(ekf.innovation_history) >= window_size:
            recent_innovations = np.array(ekf.innovation_history[-window_size:])
            ekf.R = np.atleast_2d(np.cov(recent_innovations, rowvar=False))
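
The base class does not maintain innovation_history itself; a minimal way to record it (an illustrative subclass, not part of the original code) is:

class RecordingEKF(ExtendedKalmanFilter):
    """EKF that logs each innovation for adaptive noise estimation."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.innovation_history = []

    def update(self, z):
        super().update(z)
        self.innovation_history.append(self.y.flatten())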

Performance Evaluation

Evaluation Metrics

def ekf_performance_metrics(true_states, estimates, covariances=None):
    """
    Compute EKF performance metrics.

    covariances: optional sequence of posterior covariances P_k,
    required only for the NEES consistency check
    """
    errors = true_states - estimates

    # Root mean square error (per state component)
    rmse = np.sqrt(np.mean(errors**2, axis=0))

    # Mean absolute error (per state component)
    mae = np.mean(np.abs(errors), axis=0)

    # Normalized estimation error squared (filter consistency check)
    nees = None
    if covariances is not None:
        nees = np.mean([err @ np.linalg.inv(P) @ err
                        for err, P in zip(errors, covariances)])

    return {
        'RMSE': rmse,
        'MAE': mae,
        'NEES': nees
    }
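
A quick usage sketch with synthetic arrays (hypothetical data, purely to show the call):

rng = np.random.default_rng(0)
truth = rng.normal(size=(50, 4))
noisy_estimates = truth + 0.1 * rng.normal(size=(50, 4))
print(ekf_performance_metrics(truth, noisy_estimates))
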
Preview of Next Chapter

In the next chapter, we will learn about the Unscented Kalman Filter (UKF), another important method for handling nonlinear systems that avoids the complexity of Jacobian computation through sigma point sampling.