Chapter 4: Linear Kalman Filter Algorithm Implementation

Learning Objectives
  • Master the five core equations of the standard Kalman filter algorithm
  • Understand covariance matrix propagation and update
  • Learn numerical implementation and programming techniques

Five Core Equations

The Kalman filter algorithm consists of two main phases with five core equations:

Prediction Phase (Time Update)

Equation 1 - State Prediction: \hat{x}_{k|k-1} = F_k \hat{x}_{k-1|k-1} + B_k u_k

Equation 2 - Covariance Prediction: P_{k|k-1} = F_k P_{k-1|k-1} F_k^T + Q_k

Update Phase (Measurement Update)

Equation 3 - Kalman Gain: K_k = P_{k|k-1} H_k^T (H_k P_{k|k-1} H_k^T + R_k)^{-1}

Equation 4 - State Update: \hat{x}_{k|k} = \hat{x}_{k|k-1} + K_k (z_k - H_k \hat{x}_{k|k-1})

Equation 5 - Covariance Update: P_{k|k} = (I - K_k H_k) P_{k|k-1}
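
To make the notation concrete, here is a minimal sketch of a single predict/update cycle that applies Equations 1-5 directly with NumPy (the constant velocity model and all numerical values are purely illustrative):

import numpy as np

# Illustrative 1D constant velocity model: state = [position, velocity]
F = np.array([[1.0, 1.0],        # state transition (dt = 1)
              [0.0, 1.0]])
H = np.array([[1.0, 0.0]])       # observe position only
Q = 0.01 * np.eye(2)             # process noise covariance
R = np.array([[1.0]])            # observation noise covariance

x = np.array([[0.0], [1.0]])     # initial state estimate
P = np.eye(2)                    # initial covariance

z = np.array([[1.2]])            # one (made-up) position measurement

# Prediction phase
x_pred = F @ x                               # Equation 1 (no control input)
P_pred = F @ P @ F.T + Q                     # Equation 2

# Update phase
S = H @ P_pred @ H.T + R                     # innovation covariance
K = P_pred @ H.T @ np.linalg.inv(S)          # Equation 3
x_new = x_pred + K @ (z - H @ x_pred)        # Equation 4
P_new = (np.eye(2) - K @ H) @ P_pred         # Equation 5

print(x_new.ravel(), np.diag(P_new))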

Algorithm Flow Chart

(Flow chart omitted. The cycle is: initialize the state estimate and covariance, run the prediction step with Equations 1-2, run the update step with Equations 3-5 when a measurement arrives, then advance to the next time step and repeat.)

Complete Algorithm Implementation

Basic Kalman Filter Class

import numpy as np
from typing import Optional, Tuple
import matplotlib.pyplot as plt

class KalmanFilter:
    """
    Linear Kalman filter implementation
    """

    def __init__(self, dim_x: int, dim_z: int, dim_u: int = 0):
        """
        Initialize Kalman filter

        Parameters:
        dim_x: State vector dimension
        dim_z: Observation vector dimension
        dim_u: Control input dimension
        """
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.dim_u = dim_u

        # State vector and covariance matrix
        self.x = np.zeros((dim_x, 1))  # State estimate
        self.P = np.eye(dim_x)         # Covariance matrix

        # System matrices
        self.F = np.eye(dim_x)         # State transition matrix
        self.H = np.zeros((dim_z, dim_x))  # Observation matrix
        self.Q = np.eye(dim_x)         # Process noise covariance
        self.R = np.eye(dim_z)         # Observation noise covariance

        if dim_u > 0:
            self.B = np.zeros((dim_x, dim_u))  # Control input matrix
        else:
            self.B = None

        # Auxiliary variables
        self.x_prior = self.x.copy()   # Prior state estimate
        self.P_prior = self.P.copy()   # Prior covariance
        self.K = np.zeros((dim_x, dim_z))  # Kalman gain
        self.y = np.zeros((dim_z, 1))  # Residual (innovation)
        self.S = np.zeros((dim_z, dim_z))  # Innovation covariance

        # Likelihood and log-likelihood
        self.log_likelihood = 0.0
        self.likelihood = 0.0

    def predict(self, u: Optional[np.ndarray] = None,
                B: Optional[np.ndarray] = None,
                F: Optional[np.ndarray] = None,
                Q: Optional[np.ndarray] = None):
        """
        Prediction step (time update)

        Parameters:
        u: Control input
        B: Control input matrix (optional, overrides self.B)
        F: State transition matrix (optional, overrides self.F)
        Q: Process noise covariance (optional, overrides self.Q)
        """

        if B is not None:
            self.B = B
        if F is not None:
            self.F = F
        if Q is not None:
            self.Q = Q

        # Equation 1: State prediction
        if self.B is not None and u is not None:
            self.x = self.F @ self.x + self.B @ u
        else:
            self.x = self.F @ self.x

        # Equation 2: Covariance prediction
        self.P = self.F @ self.P @ self.F.T + self.Q

        # Save prior estimate
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

    def update(self, z: np.ndarray,
               R: Optional[np.ndarray] = None,
               H: Optional[np.ndarray] = None):
        """
        Update step (measurement update)

        Parameters:
        z: Observation vector
        R: Observation noise covariance (optional, overrides self.R)
        H: Observation matrix (optional, overrides self.H)
        """

        if R is not None:
            self.R = R
        if H is not None:
            self.H = H

        # Ensure z is a column vector
        if z.ndim == 1:
            z = z.reshape((-1, 1))

        # Compute residual (innovation)
        self.y = z - self.H @ self.x

        # Compute innovation covariance
        self.S = self.H @ self.P @ self.H.T + self.R

        # Equation 3: Compute Kalman gain
        self.K = self.P @ self.H.T @ np.linalg.inv(self.S)

        # Equation 4: State update
        self.x = self.x + self.K @ self.y

        # Equation 5: Covariance update (Joseph form, numerically stable)
        I_KH = np.eye(self.dim_x) - self.K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + self.K @ self.R @ self.K.T

        # Compute likelihood
        self._update_likelihood()

    def _update_likelihood(self):
        """Compute likelihood of current observation"""
        det_S = np.linalg.det(self.S)
        if det_S <= 0:
            self.log_likelihood = -np.inf
            self.likelihood = 0.0
        else:
            self.log_likelihood = (-0.5 *
                                 (self.y.T @ np.linalg.inv(self.S) @ self.y +
                                  np.log(det_S) +
                                  self.dim_z * np.log(2 * np.pi))).item()
            self.likelihood = np.exp(self.log_likelihood)

Numerically Stable Forms of Covariance Update

In practical applications the Kalman filter can suffer from numerical instability, especially in the covariance matrix update:

def joseph_form_update(self):
    """
    Joseph form covariance update, ensures numerical stability and positive definiteness

    Standard form: P = (I - KH)P
    Joseph form: P = (I - KH)P(I - KH)ᵀ + KRKᵀ
    """
    I_KH = np.eye(self.dim_x) - self.K @ self.H
    self.P = I_KH @ self.P @ I_KH.T + self.K @ self.R @ self.K.T

def square_root_update(self):
    """
    Square root form update, ensures positive definiteness of covariance matrix
    """
    # Use Cholesky decomposition
    try:
        # P = LLᵀ form
        L = np.linalg.cholesky(self.P)
        # Update square root factor
        # Specific square root update algorithm omitted here
        pass
    except np.linalg.LinAlgError:
        # If Cholesky decomposition fails, use regularization
        self.P += 1e-9 * np.eye(self.dim_x)
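
The square_root_update above is only a placeholder. One concrete option is the array (QR-based) form of the measurement update, which propagates a Cholesky factor of P rather than P itself. The standalone sketch below is one possible implementation of that idea; the function and argument names are ours, and it is not part of the KalmanFilter class above:

import numpy as np

def sqrt_measurement_update(x, S, H, R_chol, z):
    """
    Array (QR-based) square-root measurement update.

    x:      prior state estimate (n x 1)
    S:      Cholesky factor of the prior covariance, P = S @ S.T
    H:      observation matrix (m x n)
    R_chol: Cholesky factor of the observation noise, R = R_chol @ R_chol.T
    z:      measurement (m x 1)

    Returns the updated state and the updated covariance factor S_post,
    with P_post = S_post @ S_post.T.
    """
    n = S.shape[0]
    m = R_chol.shape[0]

    # Pre-array whose outer product reproduces the joint covariance blocks
    pre = np.zeros((m + n, m + n))
    pre[:m, :m] = R_chol
    pre[:m, m:] = H @ S
    pre[m:, m:] = S

    # QR of the transpose triangularizes the pre-array: pre = T.T @ Q.T,
    # so post = T.T is lower triangular and post @ post.T == pre @ pre.T
    # (column sign ambiguities from QR do not affect the result).
    _, T = np.linalg.qr(pre.T)
    post = T.T

    S_y = post[:m, :m]      # factor of the innovation covariance H P Hᵀ + R
    K_bar = post[m:, :m]    # equals P Hᵀ S_y⁻ᵀ
    S_post = post[m:, m:]   # factor of the updated covariance

    # State update: x + K @ innovation, with K = K_bar @ inv(S_y)
    innovation = z - H @ x
    x_post = x + K_bar @ np.linalg.solve(S_y, innovation)
    return x_post, S_post

Propagating the factor S instead of P keeps the reconstructed covariance S_post @ S_post.T symmetric and positive semi-definite by construction, which is the main motivation for square-root filtering.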

1D Example: Tracking Constant Velocity Motion

def track_1d_constant_velocity():
    """
    1D constant velocity motion tracking example
    """

    # System parameters
    dt = 1.0  # Time step

    # Create Kalman filter: state [position, velocity]
    kf = KalmanFilter(dim_x=2, dim_z=1)

    # State transition matrix (constant velocity model)
    kf.F = np.array([[1, dt],
                     [0, 1]])

    # Observation matrix (observe position only)
    kf.H = np.array([[1, 0]])

    # Process noise covariance
    q = 0.1  # Process noise intensity
    kf.Q = q * np.array([[dt**4/4, dt**3/2],
                         [dt**3/2, dt**2]])

    # Observation noise covariance
    kf.R = np.array([[1.0]])  # Observation noise variance

    # Initial state and covariance
    kf.x = np.array([[0], [1]])  # Initial position 0, velocity 1
    kf.P = np.array([[1000, 0],
                     [0, 1000]])  # Large initial uncertainty

    # Simulate true motion
    true_positions = []
    observations = []
    estimates = []

    true_pos = 0
    true_vel = 1

    for k in range(50):
        # True motion
        true_pos += true_vel * dt
        true_positions.append(true_pos)

        # Generate noisy observation
        obs = true_pos + np.random.normal(0, 1)
        observations.append(obs)

        # Kalman filtering
        kf.predict()
        kf.update(np.array([obs]))

        estimates.append(kf.x[0, 0])

    # Plot results
    plt.figure(figsize=(12, 8))

    plt.subplot(2, 1, 1)
    plt.plot(true_positions, 'g-', label='True Position', linewidth=2)
    plt.plot(observations, 'r.', label='Observations', alpha=0.7)
    plt.plot(estimates, 'b-', label='Kalman Estimate', linewidth=2)
    plt.legend()
    plt.title('1D Constant Velocity Motion Tracking')
    plt.ylabel('Position')

    plt.subplot(2, 1, 2)
    errors = np.array(estimates) - np.array(true_positions)
    plt.plot(errors, 'r-', label='Estimation Error')
    plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
    plt.legend()
    plt.xlabel('Time Step')
    plt.ylabel('Error')

    plt.tight_layout()
    plt.show()

    return true_positions, observations, estimates

# Run example
# track_1d_constant_velocity()

2D Example: Tracking Constant Acceleration Motion

def track_2d_constant_acceleration():
    """
    2D constant acceleration motion tracking example
    """

    dt = 0.1  # Time step

    # State vector: [x, vx, ax, y, vy, ay]
    kf = KalmanFilter(dim_x=6, dim_z=2)

    # State transition matrix
    kf.F = np.array([
        [1, dt, dt**2/2, 0, 0, 0],
        [0, 1, dt, 0, 0, 0],
        [0, 0, 1, 0, 0, 0],
        [0, 0, 0, 1, dt, dt**2/2],
        [0, 0, 0, 0, 1, dt],
        [0, 0, 0, 0, 0, 1]
    ])

    # Observation matrix (observe position)
    kf.H = np.array([
        [1, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0]
    ])

    # Process noise covariance (add noise only to acceleration)
    q = 0.1
    kf.Q = np.zeros((6, 6))
    kf.Q[2, 2] = q  # x-direction acceleration noise
    kf.Q[5, 5] = q  # y-direction acceleration noise

    # Observation noise covariance
    kf.R = np.array([[0.1, 0],
                     [0, 0.1]])

    # Initialization
    kf.x = np.array([[0], [1], [0.1], [0], [1], [0.1]])  # Initial state
    kf.P = 100 * np.eye(6)  # Initial covariance

    # Simulation and filtering
    positions = []
    estimates = []

    for k in range(100):
        # Predict and update
        kf.predict()

        # Simulate observation at time t = (k + 1) * dt so it matches the
        # state the filter just predicted (initial velocity 1, acceleration 0.1)
        t = (k + 1) * dt
        true_x = 1.0 * t + 0.5 * 0.1 * t**2
        true_y = 1.0 * t + 0.5 * 0.1 * t**2
        obs = np.array([true_x + np.random.normal(0, 0.1),
                       true_y + np.random.normal(0, 0.1)])

        kf.update(obs)

        positions.append([true_x, true_y])
        estimates.append([kf.x[0, 0], kf.x[3, 0]])

    # Plot trajectory
    positions = np.array(positions)
    estimates = np.array(estimates)

    plt.figure(figsize=(10, 8))
    plt.plot(positions[:, 0], positions[:, 1], 'g-',
             label='True Trajectory', linewidth=2)
    plt.plot(estimates[:, 0], estimates[:, 1], 'b-',
             label='Kalman Estimate', linewidth=2)
    plt.legend()
    plt.title('2D Constant Acceleration Motion Tracking')
    plt.xlabel('X Position')
    plt.ylabel('Y Position')
    plt.grid(True)
    plt.axis('equal')
    plt.show()

# Run example
# track_2d_constant_acceleration()

Algorithm Performance Analysis

Computational Complexity

Time complexity analysis of Kalman filtering:

Operation               Complexity          Description
State Prediction        O(n^2)              Matrix-vector product F·x
Covariance Prediction   O(n^3)              Matrix products F·P·Fᵀ
Kalman Gain             O(m^3 + n·m^2)      Matrix inversion and multiplication
State Update            O(n·m)              Matrix-vector products
Covariance Update       O(n^2·m + n^3)      Joseph form

Total complexity: O(n^3 + m^3 + n^2·m), where n is the state dimension and m is the observation dimension.
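
A small implementation note: the explicit matrix inverse in Equation 3 is both the costly and the numerically sensitive step. It can be replaced by a linear solve; a minimal sketch of that alternative (the helper name kalman_gain is ours):

import numpy as np

def kalman_gain(P, H, S):
    """Compute K = P Hᵀ S⁻¹ without forming S⁻¹ explicitly.

    From K S = P Hᵀ, transposing (P and S are symmetric) gives Sᵀ Kᵀ = H P,
    which np.linalg.solve handles directly and is typically better
    conditioned than multiplying by an explicit inverse.
    """
    return np.linalg.solve(S.T, H @ P).T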

Numerical Stability Techniques

class StableKalmanFilter(KalmanFilter):
    """
    Numerically stable Kalman filter implementation
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.regularization = 1e-12

    def _regularize_covariance(self):
        """Regularize covariance matrix to ensure positive definiteness"""
        # eigvalsh assumes a symmetric matrix and returns real eigenvalues
        eigenvals = np.linalg.eigvalsh(self.P)
        if np.any(eigenvals <= 0):
            self.P += self.regularization * np.eye(self.dim_x)

    def _check_condition_number(self):
        """Check condition number, handle if too large"""
        cond_P = np.linalg.cond(self.P)
        cond_S = np.linalg.cond(self.S)

        if cond_P > 1e12 or cond_S > 1e12:
            print(f"Warning: Condition number too large P:{cond_P:.2e}, S:{cond_S:.2e}")
            self._regularize_covariance()

    def update(self, z, R=None, H=None):
        """Numerically stable update"""
        super().update(z, R, H)
        self._regularize_covariance()
        self._check_condition_number()

Parameter Tuning Guide

Noise Covariance Matrix Settings

Parameter Tuning Principles
  • Process noise Q: Reflects model uncertainty; too small a Q makes the filter overconfident and sluggish, while too large a Q makes the estimate noisy because it follows the measurements too closely
  • Observation noise R: Reflects measurement uncertainty; should be set based on sensor specifications
  • Initial covariance P₀: Reflects initial estimate uncertainty; typically set to large values

def parameter_tuning_example():
    """
    Parameter tuning example
    """

    # Comparison of different Q values
    q_values = [0.01, 0.1, 1.0, 10.0]

    for q in q_values:
        kf = KalmanFilter(dim_x=2, dim_z=1)

        # Set system matrices
        dt = 1.0
        kf.F = np.array([[1, dt], [0, 1]])
        kf.H = np.array([[1, 0]])
        kf.Q = q * np.array([[dt**4/4, dt**3/2],
                             [dt**3/2, dt**2]])
        kf.R = np.array([[1.0]])

        # Run the filter on a simulated constant velocity target and report
        # the position RMSE as one simple performance metric
        # (same scenario as the 1D example above)
        kf.P = 1000 * np.eye(2)      # large initial uncertainty
        np.random.seed(0)            # same noise sequence for every q
        true_pos, true_vel = 0.0, 1.0
        errors = []
        for k in range(100):
            true_pos += true_vel * dt
            obs = true_pos + np.random.normal(0, 1)
            kf.predict()
            kf.update(np.array([obs]))
            errors.append(kf.x[0, 0] - true_pos)

        rmse = np.sqrt(np.mean(np.square(errors)))
        print(f"Q = {q}: position RMSE = {rmse:.3f}")

# Run parameter tuning
# parameter_tuning_example()

Common Issues and Solutions

1. Filter Divergence

Symptoms: Estimation error grows over time.

Causes:

  • Q matrix set too small
  • Model mismatch
  • Numerical instability

Solution:

def divergence_detection(kf, threshold=1e6):
    """Detect filter divergence"""
    trace_P = np.trace(kf.P)
    if trace_P > threshold:
        print(f"Warning: Covariance trace too large {trace_P:.2e}")
        return True
    return False
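
In a filtering loop this check might be applied after each update. The recovery step below is only one possible choice and is our assumption, not a general prescription (measurements is an illustrative iterable of observations):

for z in measurements:
    kf.predict()
    kf.update(z)
    if divergence_detection(kf):
        # Crude recovery (assumption): reset the covariance to a moderate
        # value and continue from the current state estimate
        kf.P = 10.0 * np.eye(kf.dim_x)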

2. Non-Positive Definite Covariance Matrix

Solution:

def ensure_positive_definite(P, min_eigenval=1e-12):
    """Ensure matrix is positive definite"""
    eigenvals, eigenvecs = np.linalg.eigh(P)
    eigenvals = np.maximum(eigenvals, min_eigenval)
    return eigenvecs @ np.diag(eigenvals) @ eigenvecs.T
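
One place this helper might be applied (a usage assumption, not part of the class above) is right after each covariance update:

kf.update(z)
kf.P = ensure_positive_definite(kf.P)  # clip any non-positive eigenvalues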

Preview of Next Chapter

In the next chapter, we will learn about the Extended Kalman Filter (EKF) and how to handle state estimation problems for nonlinear systems.