Chapter 5: Extended Kalman Filter (EKF)
Learning Objectives
- Understand the challenges of nonlinear systems and solution approaches
- Master Jacobian matrix calculation and linearization methods
- Learn EKF algorithm implementation and application scenarios
Challenges of Nonlinear Systems
Standard Kalman filtering applies to linear Gaussian systems, but many real-world systems are nonlinear; for example, a radar measures range and bearing, which are nonlinear functions of a target's Cartesian position. The Extended Kalman Filter (EKF) handles such systems through local linearization.
Nonlinear State-Space Model
Nonlinear state transition:

$$x_k = f(x_{k-1}, u_{k-1}) + w_{k-1}$$

Nonlinear observation equation:

$$z_k = h(x_k) + v_k$$

Where:
- $f(\cdot)$: Nonlinear state transition function
- $h(\cdot)$: Nonlinear observation function
- $w_k \sim \mathcal{N}(0, Q_k)$: Process noise
- $v_k \sim \mathcal{N}(0, R_k)$: Observation noise
Linearization Principle
The core idea of the EKF is to Taylor-expand the nonlinear functions around the current estimate and retain only the first-order terms, yielding a local linearization.
Jacobian Matrix
State transition Jacobian matrix:

$$F_k = \left.\frac{\partial f}{\partial x}\right|_{\hat{x}_{k-1|k-1},\, u_{k-1}}$$

Observation Jacobian matrix:

$$H_k = \left.\frac{\partial h}{\partial x}\right|_{\hat{x}_{k|k-1}}$$
Linearization Process
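Expanding $f$ and $h$ to first order around the latest estimates gives the local linear model that the EKF actually filters:

$$f(x_{k-1}) \approx f(\hat{x}_{k-1|k-1}) + F_k\,(x_{k-1} - \hat{x}_{k-1|k-1})$$

$$h(x_k) \approx h(\hat{x}_{k|k-1}) + H_k\,(x_k - \hat{x}_{k|k-1})$$

With these approximations in place, the standard Kalman filter recursions apply locally, with $F_k$ and $H_k$ taking the role of the constant system matrices.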
EKF Algorithm Steps
Prediction Step
State prediction:

$$\hat{x}_{k|k-1} = f(\hat{x}_{k-1|k-1}, u_{k-1})$$

Covariance prediction:

$$P_{k|k-1} = F_k P_{k-1|k-1} F_k^T + Q_k$$

where $F_k$ is the Jacobian matrix computed at $\hat{x}_{k-1|k-1}$.
Update Step
Kalman gain:

$$K_k = P_{k|k-1} H_k^T \left(H_k P_{k|k-1} H_k^T + R_k\right)^{-1}$$

State update:

$$\hat{x}_{k|k} = \hat{x}_{k|k-1} + K_k\left(z_k - h(\hat{x}_{k|k-1})\right)$$

Covariance update:

$$P_{k|k} = (I - K_k H_k) P_{k|k-1}$$

where $H_k$ is the Jacobian matrix computed at $\hat{x}_{k|k-1}$.
EKF Implementation
import numpy as np
import sympy as sp
import matplotlib.pyplot as plt
from typing import Callable, Optional
class ExtendedKalmanFilter:
"""
Extended Kalman filter implementation
"""
def __init__(self, dim_x: int, dim_z: int,
f: Callable, h: Callable,
f_jacobian: Callable, h_jacobian: Callable):
"""
Initialize Extended Kalman filter
Parameters:
dim_x: State dimension
dim_z: Observation dimension
f: State transition function
h: Observation function
f_jacobian: State transition Jacobian function
h_jacobian: Observation Jacobian function
"""
self.dim_x = dim_x
self.dim_z = dim_z
# Nonlinear functions
self.f = f # State transition function
self.h = h # Observation function
self.f_jacobian = f_jacobian # F Jacobian
self.h_jacobian = h_jacobian # H Jacobian
# State and covariance
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# Noise covariances
self.Q = np.eye(dim_x)
self.R = np.eye(dim_z)
# Store intermediate results
self.x_prior = None
self.P_prior = None
self.F = None
self.H = None
self.K = None
self.y = None
self.S = None
def predict(self, u: Optional[np.ndarray] = None):
"""
Prediction step
"""
# Compute Jacobian matrix
self.F = self.f_jacobian(self.x, u)
# Nonlinear state prediction
self.x = self.f(self.x, u)
# Covariance prediction
self.P = self.F @ self.P @ self.F.T + self.Q
# Save prior estimate
self.x_prior = self.x.copy()
self.P_prior = self.P.copy()
def update(self, z: np.ndarray):
"""
Update step
"""
# Ensure z is a column vector
if z.ndim == 1:
z = z.reshape((-1, 1))
# Compute observation Jacobian matrix
self.H = self.h_jacobian(self.x)
# Compute residual
self.y = z - self.h(self.x)
# Compute innovation covariance
self.S = self.H @ self.P @ self.H.T + self.R
# Compute Kalman gain
self.K = self.P @ self.H.T @ np.linalg.inv(self.S)
# State update
self.x = self.x + self.K @ self.y
# Covariance update (Joseph form)
I_KH = np.eye(self.dim_x) - self.K @ self.H
self.P = I_KH @ self.P @ I_KH.T + self.K @ self.R @ self.K.T
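A note on the covariance update: the Joseph form $(I - K_k H_k)P(I - K_k H_k)^T + K_k R K_k^T$ is algebraically equivalent to the simpler $(I - K_k H_k)P$, but it preserves the symmetry and positive semi-definiteness of $P$ under floating-point rounding, making the filter more robust numerically.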
Automatic Jacobian Computation
Symbolic Differentiation Method
def symbolic_jacobian(func_expr, vars_expr):
"""
Automatically generate Jacobian matrix using symbolic computation
Parameters:
func_expr: List of symbolic expressions for functions
vars_expr: List of symbolic expressions for variables
"""
jacobian_matrix = []
for f in func_expr:
row = []
for var in vars_expr:
row.append(sp.diff(f, var))
jacobian_matrix.append(row)
return sp.Matrix(jacobian_matrix)
# Example: Define symbolic variables and functions
def define_nonlinear_system():
"""
Define symbolic expressions for nonlinear system
"""
# State variables [x, y, vx, vy]
x, y, vx, vy = sp.symbols('x y vx vy')
dt = sp.Symbol('dt')
# Nonlinear state transition (constant velocity)
f_expr = [
x + vx * dt,
y + vy * dt,
vx,
vy
]
# Nonlinear observation (range and bearing)
h_expr = [
sp.sqrt(x**2 + y**2), # Range
sp.atan2(y, x) # Bearing
]
# Compute Jacobian matrices
states = [x, y, vx, vy]
F_symbolic = symbolic_jacobian(f_expr, states)
H_symbolic = symbolic_jacobian(h_expr, states)
print("State transition Jacobian matrix F:")
print(F_symbolic)
print("\nObservation Jacobian matrix H:")
print(H_symbolic)
return F_symbolic, H_symbolic
# Generate numerical functions
def create_numerical_functions():
"""
Convert symbolic Jacobians to numerical functions
"""
F_sym, H_sym = define_nonlinear_system()
    # Re-declare the symbols so lambdify binds each one to a separate
    # positional argument
    x, y, vx, vy = sp.symbols('x y vx vy')
    dt = sp.Symbol('dt')
    F_func = sp.lambdify((x, y, vx, vy, dt), F_sym, 'numpy')
    H_func = sp.lambdify((x, y, vx, vy), H_sym, 'numpy')
return F_func, H_func
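For example, the generated functions can be evaluated at a numeric state (a sketch matching the argument order of the lambdify calls above):

F_func, H_func = create_numerical_functions()
F_val = np.asarray(F_func(1.0, 2.0, 0.5, 0.5, 1.0), dtype=float)   # dt = 1
H_val = np.asarray(H_func(1.0, 2.0, 0.5, 0.5), dtype=float)
print(F_val)   # constant-velocity transition matrix
print(H_val)   # range/bearing Jacobian evaluated at (x, y) = (1, 2)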
Numerical Differentiation Method
def numerical_jacobian(func, x, epsilon=1e-7):
    """
    Numerically compute Jacobian matrix via central differences
    """
    x = np.atleast_1d(x).astype(float)
    n = len(x)
    # Flatten the output so column-vector-valued functions also work
    f0 = np.atleast_1d(func(x)).flatten()
    m = len(f0)
    jacobian = np.zeros((m, n))
    for j in range(n):
        x_plus = x.copy()
        x_minus = x.copy()
        x_plus[j] += epsilon
        x_minus[j] -= epsilon
        f_plus = np.atleast_1d(func(x_plus)).flatten()
        f_minus = np.atleast_1d(func(x_minus)).flatten()
        # Central difference: O(epsilon^2) truncation error
        jacobian[:, j] = (f_plus - f_minus) / (2 * epsilon)
    return jacobian
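As a quick sanity check (a toy function of our own, not part of the filter), compare the result against a hand-derived Jacobian:

g = lambda v: np.array([v[0]**2 + v[1], np.sin(v[1])])
J_num = numerical_jacobian(g, np.array([1.0, 0.5]))
# Analytic Jacobian: [[2*v0, 1], [0, cos(v1)]] = [[2, 1], [0, 0.8776...]]
print(J_num)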
class AutoDiffEKF(ExtendedKalmanFilter):
    """
    EKF implementation with automatic (finite-difference) Jacobians.
    Note: f and h are called both with column-vector states (by
    predict/update) and with flattened 1-D states (by the
    finite-difference routine), so they should accept either shape.
    """
    def __init__(self, dim_x, dim_z, f, h):
        # Build Jacobian callbacks from central finite differences
        def f_jac(x, u):
            return numerical_jacobian(lambda s: f(s, u), x.flatten()).reshape((dim_x, dim_x))
        def h_jac(x):
            return numerical_jacobian(lambda s: h(s), x.flatten()).reshape((dim_z, dim_x))
        super().__init__(dim_x, dim_z, f, h, f_jac, h_jac)
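A minimal usage sketch (our own toy setup): a static 2-D state observed through its range, with f and h written to accept either a flattened or a column-vector state:

f = lambda x, u: x                       # static dynamics, shape-agnostic
h = lambda x: np.array([np.hypot(float(np.ravel(x)[0]), float(np.ravel(x)[1]))])

ekf = AutoDiffEKF(2, 1, f, h)
ekf.x = np.array([[3.0], [4.0]])
ekf.predict()
ekf.update(np.array([5.2]))              # noisy range measurement
print(ekf.x.ravel())                     # estimate pulled toward range 5.2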
Concrete Application Examples
Example 1: Target Tracking with Polar Observations
def polar_tracking_example():
"""
Target tracking example with polar observations
"""
# State: [x, y, vx, vy] - position and velocity
# Observation: [r, θ] - range and bearing
def f(x, u, dt=1.0):
"""State transition function (constant velocity)"""
F = np.array([
[1, 0, dt, 0],
[0, 1, 0, dt],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
return F @ x
def h(x):
"""Observation function (polar coordinates)"""
px, py = x[0, 0], x[1, 0]
r = np.sqrt(px**2 + py**2)
theta = np.arctan2(py, px)
return np.array([[r], [theta]])
def f_jacobian(x, u, dt=1.0):
"""State transition Jacobian"""
return np.array([
[1, 0, dt, 0],
[0, 1, 0, dt],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
def h_jacobian(x):
"""Observation Jacobian"""
px, py = x[0, 0], x[1, 0]
r_sq = px**2 + py**2
r = np.sqrt(r_sq)
if r < 1e-6: # Avoid division by zero
r = 1e-6
r_sq = r**2
H = np.array([
[px/r, py/r, 0, 0],
[-py/r_sq, px/r_sq, 0, 0]
])
return H
# Create EKF
ekf = ExtendedKalmanFilter(4, 2, f, h, f_jacobian, h_jacobian)
# Initialize
ekf.x = np.array([[0], [0], [1], [1]]) # Initial state
ekf.P = 10 * np.eye(4) # Initial covariance
ekf.Q = np.array([ # Process noise
[0.1, 0, 0, 0],
[0, 0.1, 0, 0],
[0, 0, 0.1, 0],
[0, 0, 0, 0.1]
])
ekf.R = np.array([ # Observation noise
[0.5, 0],
[0, 0.1]
])
# Simulate tracking
true_states = []
observations = []
estimates = []
true_state = np.array([0, 0, 1, 1]) # [x, y, vx, vy]
for k in range(50):
# True state evolution
true_state = f(true_state.reshape(-1, 1), None).flatten()
true_states.append(true_state.copy())
# Generate observation
true_obs = h(true_state.reshape(-1, 1))
noisy_obs = true_obs + np.array([[np.random.normal(0, 0.5)],
[np.random.normal(0, 0.1)]])
observations.append(noisy_obs.flatten())
# EKF steps
ekf.predict()
ekf.update(noisy_obs)
estimates.append(ekf.x.flatten())
# Plot results
import matplotlib.pyplot as plt
true_states = np.array(true_states)
estimates = np.array(estimates)
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(true_states[:, 0], true_states[:, 1], 'g-',
label='True Trajectory', linewidth=2)
plt.plot(estimates[:, 0], estimates[:, 1], 'b--',
label='EKF Estimate', linewidth=2)
plt.legend()
plt.title('Trajectory Comparison')
plt.xlabel('X Position')
plt.ylabel('Y Position')
plt.grid(True)
plt.subplot(2, 2, 2)
plt.plot(true_states[:, 0] - estimates[:, 0], 'r-', label='X Error')
plt.plot(true_states[:, 1] - estimates[:, 1], 'b-', label='Y Error')
plt.legend()
plt.title('Position Estimation Error')
plt.xlabel('Time Step')
plt.ylabel('Error')
plt.grid(True)
plt.tight_layout()
plt.show()
return true_states, observations, estimates
# Run example
# polar_tracking_example()
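One caveat this example does not handle: the bearing residual $z - h(x)$ can jump by $2\pi$ when the angle wraps at $\pm\pi$. Below is a minimal sketch of a subclass that normalizes the bearing component of the innovation, assuming (as here) that bearing is the second observation component:

class PolarEKF(ExtendedKalmanFilter):
    """EKF whose update wraps the bearing innovation into [-pi, pi]"""
    def update(self, z):
        if z.ndim == 1:
            z = z.reshape((-1, 1))
        self.H = self.h_jacobian(self.x)
        self.y = z - self.h(self.x)
        # Wrap the bearing residual so a crossing at +/-pi does not
        # produce a spurious ~2*pi innovation
        self.y[1, 0] = (self.y[1, 0] + np.pi) % (2 * np.pi) - np.pi
        self.S = self.H @ self.P @ self.H.T + self.R
        self.K = self.P @ self.H.T @ np.linalg.inv(self.S)
        self.x = self.x + self.K @ self.y
        I_KH = np.eye(self.dim_x) - self.K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + self.K @ self.R @ self.K.T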
Example 2: Nonlinear Volatility Model in Finance
def stochastic_volatility_ekf():
"""
EKF estimation for stochastic volatility model
"""
# State: [log_price, log_volatility]
# Observation: [price]
def f(x, u, dt=1.0):
"""
State transition:
log(S_t) = log(S_{t-1}) + μdt + σ_{t-1}dW_1
log(σ_t²) = log(σ_{t-1}²) + κ(θ - log(σ_{t-1}²))dt + ξdW_2
"""
log_price, log_vol = x[0, 0], x[1, 0]
# Parameters
mu = 0.05 # Expected return
kappa = 0.1 # Mean reversion speed
theta = -2.0 # Long-term variance log
xi = 0.1 # Vol of vol
# Simplified Euler discretization
new_log_price = log_price + mu * dt
new_log_vol = log_vol + kappa * (theta - log_vol) * dt
return np.array([[new_log_price], [new_log_vol]])
def h(x):
"""Observation function: price = exp(log_price)"""
log_price = x[0, 0]
return np.array([[np.exp(log_price)]])
def f_jacobian(x, u, dt=1.0):
"""State transition Jacobian"""
kappa = 0.1
return np.array([
[1, 0],
[0, 1 - kappa * dt]
])
def h_jacobian(x):
"""Observation Jacobian"""
log_price = x[0, 0]
return np.array([[np.exp(log_price), 0]])
# Create EKF
ekf = ExtendedKalmanFilter(2, 1, f, h, f_jacobian, h_jacobian)
# Initialize
ekf.x = np.array([[4.6], [-2.0]]) # log(100), log(0.135)
ekf.P = np.array([[0.1, 0], [0, 0.5]])
ekf.Q = np.array([[0.01, 0], [0, 0.02]]) # Process noise
ekf.R = np.array([[1.0]]) # Observation noise
# Simulate data and run filter
    prices = []
    estimated_log_vols = []
for k in range(100):
# Predict and update
ekf.predict()
# Simulate observation (using simulated price data here)
true_price = 100 * np.exp(0.001 * k + 0.1 * np.random.normal())
noisy_price = true_price + np.random.normal(0, 1)
ekf.update(np.array([noisy_price]))
prices.append(noisy_price)
        estimated_log_vols.append(ekf.x[1, 0])
# Plot results
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(prices, 'b-', label='Observed Prices')
plt.legend()
plt.title('Price Series')
plt.xlabel('Time')
plt.ylabel('Price')
plt.subplot(1, 2, 2)
plt.plot(np.exp(estimated_log_vols), 'r-', label='Estimated Volatility')
plt.legend()
plt.title('Estimated Volatility')
plt.xlabel('Time')
plt.ylabel('Volatility')
plt.tight_layout()
plt.show()
return prices, estimated_log_vols
# Run example
# stochastic_volatility_ekf()
EKF Limitations and Improvements
Main Limitations
- Linearization error: the first-order Taylor expansion may be inaccurate for strongly curved f or h (see the sketch after this list)
- Jacobian computation: requires analytical derivatives or numerical differentiation
- Divergence risk: strongly nonlinear systems are prone to filter divergence
- Local optimum: a poor linearization point can bias all subsequent updates, much like getting stuck in a local minimum
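To make the first limitation concrete, the short sketch below (our own illustration, using numpy only) propagates a Gaussian through the range function $h(x) = \sqrt{x_1^2 + x_2^2}$ and compares the EKF's first-order mean $h(\hat{x})$ with a Monte Carlo estimate of the true mean; the gap is the linearization bias:

import numpy as np

rng = np.random.default_rng(0)
mean = np.array([1.0, 1.0])
cov = 0.5 * np.eye(2)

h_range = lambda p: np.sqrt(p[..., 0]**2 + p[..., 1]**2)

# EKF-style first-order propagation: E[h(x)] ~ h(E[x])
linearized_mean = h_range(mean)

# Monte Carlo reference
samples = rng.multivariate_normal(mean, cov, size=100_000)
mc_mean = h_range(samples).mean()

print(f"first-order mean: {linearized_mean:.4f}, Monte Carlo mean: {mc_mean:.4f}")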
Improvement Methods
1. Iterated Extended Kalman Filter (IEKF):
class IteratedEKF(ExtendedKalmanFilter):
    """
    Iterated Extended Kalman Filter: re-linearizes the observation
    function around successively refined estimates while reusing the
    prior covariance in every iteration
    """
    def update(self, z, max_iter=5, tolerance=1e-6):
        """Iterated measurement update"""
        if z.ndim == 1:
            z = z.reshape((-1, 1))
        x_prior = self.x.copy()
        P_prior = self.P.copy()
        x_i = x_prior.copy()
        for _ in range(max_iter):
            # Re-linearize around the current iterate
            H = self.h_jacobian(x_i)
            S = H @ P_prior @ H.T + self.R
            K = P_prior @ H.T @ np.linalg.inv(S)
            # IEKF residual: measurement residual corrected for the
            # shift of the linearization point away from the prior
            y = z - self.h(x_i) - H @ (x_prior - x_i)
            x_new = x_prior + K @ y
            converged = np.linalg.norm(x_new - x_i) < tolerance
            x_i = x_new
            if converged:
                break
        # Covariance update with the final linearization (Joseph form)
        self.H, self.K, self.x = H, K, x_i
        I_KH = np.eye(self.dim_x) - K @ H
        self.P = I_KH @ P_prior @ I_KH.T + K @ self.R @ K.T
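The iteration amounts to a Gauss-Newton refinement of the measurement update; it typically pays off when $h$ is strongly nonlinear over the span of the prior uncertainty, and with max_iter=1 it reduces exactly to the standard EKF update.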
2. Adaptive EKF:
def adaptive_noise_estimation(ekf, window_size=10):
    """
    Simple adaptive observation-noise estimation based on the
    innovation sequence. Assumes the caller maintains a list
    ekf.innovation_history of flattened innovations (the base
    class does not create this attribute).
    """
    if hasattr(ekf, 'innovation_history'):
        if len(ekf.innovation_history) >= window_size:
            recent_innovations = np.array(ekf.innovation_history[-window_size:])
            # Note: the innovation covariance is H P H^T + R, so using it
            # directly as R overestimates the observation noise; this is a
            # deliberately crude scheme
            estimated_R = np.atleast_2d(np.cov(recent_innovations, rowvar=False))
            ekf.R = estimated_R
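A hypothetical usage sketch: since the base class does not record innovations, the caller maintains ekf.innovation_history itself. Here we use a toy 1-D random-walk filter (our own setup) whose true observation noise is larger than the assumed R:

f = lambda x, u: x
h = lambda x: x
ekf = ExtendedKalmanFilter(1, 1, f, h,
                           lambda x, u: np.eye(1), lambda x: np.eye(1))
ekf.innovation_history = []          # attribute added by the caller

rng = np.random.default_rng(1)
for k in range(50):
    ekf.predict()
    ekf.update(np.array([rng.normal(0.0, 2.0)]))   # true noise std = 2
    ekf.innovation_history.append(ekf.y.flatten())
    adaptive_noise_estimation(ekf)

print("adapted R:", ekf.R)   # inflated relative to the initial R = 1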
Performance Evaluation
Evaluation Metrics
def ekf_performance_metrics(true_states, estimates, covariances=None):
    """
    Compute EKF performance metrics
    Parameters:
        true_states: (T, n) array of true states
        estimates: (T, n) array of state estimates
        covariances: optional sequence of (n, n) covariances P_k,
                     required for the NEES metric
    """
    errors = true_states - estimates
    # Root mean square error per state component
    rmse = np.sqrt(np.mean(errors**2, axis=0))
    # Mean absolute error per state component
    mae = np.mean(np.abs(errors), axis=0)
    metrics = {'RMSE': rmse, 'MAE': mae}
    # Normalized estimation error squared: for a consistent filter,
    # the average NEES should be close to the state dimension n
    if covariances is not None:
        metrics['NEES'] = np.mean([err @ np.linalg.inv(P) @ err
                                   for err, P in zip(errors, covariances)])
    return metrics
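For instance, applied to the polar tracking example above (RMSE and MAE only, since that example does not record the covariance history needed for NEES):

true_states, observations, estimates = polar_tracking_example()
metrics = ekf_performance_metrics(np.array(true_states), np.array(estimates))
print(metrics)   # per-component errors for [x, y, vx, vy]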
In the next chapter, we will learn about the Unscented Kalman Filter (UKF), another important method for handling nonlinear systems that avoids the complexity of Jacobian computation through sigma point sampling.