Chapter 10: Limitations and Improvements of Kalman Filtering

Learning Objectives
  • Understand the assumptions and limitations of Kalman filtering
  • Master robust Kalman filtering and adaptive filtering methods
  • Learn about detection and handling of filter divergence

Basic Assumptions and Limitations of Kalman Filtering

Core Assumptions

Standard Kalman filtering is based on the following key assumptions:

Kalman Filter Assumptions
  1. Linearity: System dynamics and observations are linear
  2. Gaussianity: All noise follows Gaussian distributions
  3. White Noise: Process and measurement noise are uncorrelated white-noise sequences
  4. Known Model: System matrices F, H are completely known
  5. Known Covariance: Noise covariances Q, R are known and constant
  6. Perfect Initialization: Initial state estimate and covariance are correct

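For reference, here is a minimal sketch of the standard linear predict/update cycle that these assumptions underpin; the helper names kf_predict and kf_update are illustrative, and the matrices F, H, Q, R match the notation used throughout this chapter.

import numpy as np

def kf_predict(x, P, F, Q):
    """Standard KF prediction: x = F x, P = F P F^T + Q."""
    x = F @ x
    P = F @ P @ F.T + Q
    return x, P

def kf_update(x, P, z, H, R):
    """Standard KF update using the innovation y = z - H x."""
    y = z - H @ x                      # innovation
    S = H @ P @ H.T + R                # innovation covariance
    K = P @ H.T @ np.linalg.inv(S)     # Kalman gain
    x = x + K @ y
    P = (np.eye(P.shape[0]) - K @ H) @ P
    return x, P

Every assumption above enters somewhere in these few lines: linearity through F and H, Gaussian white noise through Q and R, and known, constant model parameters throughout.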
Real-World Challenges

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

class LimitationAnalyzer:
    """
    Kalman filter limitation analysis tool
    """

    def __init__(self):
        self.scenarios = {}

    def demonstrate_nonlinearity_issue(self):
        """
        Demonstrate impact of nonlinearity
        """
        # Highly nonlinear system
        def true_nonlinear_function(x):
            return x + 0.5 * np.sin(5 * x)

        def linear_approximation(x, x0):
            # First-order Taylor expansion about x0: f(x0) + f'(x0) * (x - x0)
            return true_nonlinear_function(x0) + (x - x0) * (1 + 2.5 * np.cos(5 * x0))

        # Generate data
        x_true = np.linspace(-2, 2, 100)
        y_true = [true_nonlinear_function(x) for x in x_true]

        # Linearization at different points
        linearization_points = [-1, 0, 1]

        plt.figure(figsize=(12, 8))
        plt.plot(x_true, y_true, 'b-', linewidth=3, label='True Nonlinear Function')

        for i, x0 in enumerate(linearization_points):
            y_linear = [linear_approximation(x, x0) for x in x_true]
            plt.plot(x_true, y_linear, '--', linewidth=2,
                    label=f'Linearized at x={x0}')
            plt.plot(x0, true_nonlinear_function(x0), 'ro', markersize=8)

        plt.xlabel('State x')
        plt.ylabel('Output y')
        plt.title('Linearization Error Demonstration')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

    def demonstrate_non_gaussian_noise(self):
        """
        Demonstrate impact of non-Gaussian noise
        """
        np.random.seed(42)
        n_samples = 1000

        # Different types of noise
        gaussian_noise = np.random.normal(0, 1, n_samples)
        heavy_tail_noise = stats.t.rvs(df=3, size=n_samples)  # t-distribution (heavy tail)
        skewed_noise = stats.skewnorm.rvs(a=5, size=n_samples)  # Skewed distribution
        bimodal_noise = np.concatenate([
            np.random.normal(-2, 0.5, n_samples//2),
            np.random.normal(2, 0.5, n_samples//2)
        ])

        noises = {
            'Gaussian Noise': gaussian_noise,
            'Heavy-Tail Noise (t-dist)': heavy_tail_noise,
            'Skewed Noise': skewed_noise,
            'Bimodal Noise': bimodal_noise
        }

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.flatten()

        for i, (name, noise) in enumerate(noises.items()):
            axes[i].hist(noise, bins=50, density=True, alpha=0.7,
                        color=['blue', 'red', 'green', 'orange'][i])
            axes[i].set_title(f'{name}')
            axes[i].set_xlabel('Noise Value')
            axes[i].set_ylabel('Density')
            axes[i].grid(True, alpha=0.3)

            # Add statistical information
            mean_val = np.mean(noise)
            std_val = np.std(noise)
            skew_val = stats.skew(noise)
            kurt_val = stats.kurtosis(noise)

            stats_text = f'Mean: {mean_val:.2f}\nStd: {std_val:.2f}\nSkew: {skew_val:.2f}\nKurt: {kurt_val:.2f}'
            axes[i].text(0.05, 0.95, stats_text, transform=axes[i].transAxes,
                        verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

        plt.tight_layout()
        plt.show()

    def demonstrate_model_mismatch(self):
        """
        Demonstrate impact of model mismatch
        """
        # True system: nonlinear growth
        def true_system(x, t):
            return x * (1 + 0.01 * np.sin(0.1 * t))

        # Assumed linear model
        def assumed_model(x):
            return 1.01 * x  # Fixed growth rate

        # Simulate data
        T = 100
        true_states = [100]  # Initial value
        observations = []

        for t in range(1, T):
            # True state evolution
            new_state = true_system(true_states[-1], t) + np.random.normal(0, 1)
            true_states.append(new_state)

            # Observation
            obs = new_state + np.random.normal(0, 2)
            observations.append(obs)

        # Kalman filter with incorrect model
        class MismatchedKF:
            def __init__(self):
                self.x = np.array([[100.0]])
                self.P = np.array([[10.0]])
                self.F = np.array([[1.01]])  # Incorrect model
                self.H = np.array([[1.0]])
                self.Q = np.array([[1.0]])
                self.R = np.array([[4.0]])

            def predict(self):
                self.x = self.F @ self.x
                self.P = self.F @ self.P @ self.F.T + self.Q

            def update(self, z):
                z = np.array([[z]])
                y = z - self.H @ self.x
                S = self.H @ self.P @ self.H.T + self.R
                K = self.P @ self.H.T / S

                self.x = self.x + K * y
                self.P = (np.eye(1) - K @ self.H) @ self.P

        # Run filter
        kf = MismatchedKF()
        estimates = []

        for obs in observations:
            kf.predict()
            kf.update(obs)
            estimates.append(kf.x[0, 0])

        # Plot results
        plt.figure(figsize=(12, 8))
        plt.plot(true_states[1:], 'g-', linewidth=2, label='True State')
        plt.plot(observations, 'r.', alpha=0.5, label='Observations')
        plt.plot(estimates, 'b--', linewidth=2, label='KF Estimate (Wrong Model)')

        # Calculate error
        errors = np.array(true_states[1:]) - np.array(estimates)
        plt.fill_between(range(len(estimates)),
                        np.array(estimates) - 2*np.abs(errors),
                        np.array(estimates) + 2*np.abs(errors),
                        alpha=0.3, color='blue', label='Error Band')

        plt.xlabel('Time Step')
        plt.ylabel('State Value')
        plt.title('Impact of Model Mismatch')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

        print(f"Mean Absolute Error: {np.mean(np.abs(errors)):.2f}")
        print(f"RMSE: {np.sqrt(np.mean(errors**2)):.2f}")

# Run demonstrations
analyzer = LimitationAnalyzer()
# analyzer.demonstrate_nonlinearity_issue()
# analyzer.demonstrate_non_gaussian_noise()
# analyzer.demonstrate_model_mismatch()

Filter Divergence Problem

Causes and Detection of Divergence

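Divergence typically stems from model mismatch, an overly optimistic (too small) process noise Q, poor initialization, or numerical round-off that destroys the symmetry or positive definiteness of P. A standard statistical consistency check, complementary to the heuristics in the detector class below, is the normalized innovation squared (NIS) test: for a well-tuned filter, y^T S^{-1} y follows a chi-square distribution with dim(z) degrees of freedom. A minimal sketch (the function name nis_consistent is illustrative):

import numpy as np
from scipy.stats import chi2

def nis_consistent(y, S, alpha=0.05):
    """Return True if the innovation y is consistent with its covariance S.

    Computes the normalized innovation squared and compares it against the
    (1 - alpha) quantile of the chi-square distribution with dim(z) dof.
    """
    nis = (y.T @ np.linalg.inv(S) @ y).item()
    return nis <= chi2.ppf(1 - alpha, df=S.shape[0])

Repeated NIS failures over a window are a strong sign that the filter is becoming inconsistent, well before the covariance visibly explodes.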
class DivergenceDetector:
    """
    Filter divergence detection and handling
    """

    def __init__(self, divergence_threshold=1e6):
        self.divergence_threshold = divergence_threshold
        self.innovation_history = []
        self.covariance_trace_history = []

    def detect_divergence(self, kf):
        """
        Detect filter divergence
        """
        # Detection indicators
        checks = {
            'covariance_explosion': False,
            'innovation_explosion': False,
            'numerical_instability': False,
            'negative_eigenvalues': False
        }

        # 1. Covariance matrix trace too large
        P_trace = np.trace(kf.P)
        self.covariance_trace_history.append(P_trace)
        if P_trace > self.divergence_threshold:
            checks['covariance_explosion'] = True

        # 2. Innovation too large
        if hasattr(kf, 'y') and kf.y is not None:
            innovation_norm = np.linalg.norm(kf.y)
            self.innovation_history.append(innovation_norm)

            if len(self.innovation_history) > 20:
                baseline = self.innovation_history[:-10]
                recent_avg = np.mean(self.innovation_history[-10:])
                # Flag when recent innovations sit far above the earlier baseline
                if recent_avg > np.mean(baseline) + 10 * np.std(baseline):
                    checks['innovation_explosion'] = True

        # 3. Numerical instability
        if np.any(np.isnan(kf.P)) or np.any(np.isinf(kf.P)):
            checks['numerical_instability'] = True

        # 4. Negative eigenvalues in covariance matrix
        eigenvals = np.linalg.eigvals(kf.P)
        if np.any(eigenvals < 0):
            checks['negative_eigenvalues'] = True

        return checks

    def apply_stabilization(self, kf, method='joseph'):
        """
        Apply stabilization techniques
        """
        if method == 'joseph':
            # Joseph form covariance update
            if hasattr(kf, 'K') and hasattr(kf, 'H') and hasattr(kf, 'R'):
                I_KH = np.eye(kf.P.shape[0]) - kf.K @ kf.H
                kf.P = I_KH @ kf.P @ I_KH.T + kf.K @ kf.R @ kf.K.T

        elif method == 'regularization':
            # Covariance regularization
            kf.P += 1e-8 * np.eye(kf.P.shape[0])

        elif method == 'eigenvalue_clipping':
            # Eigenvalue clipping
            eigenvals, eigenvecs = np.linalg.eigh(kf.P)
            eigenvals = np.maximum(eigenvals, 1e-12)
            kf.P = eigenvecs @ np.diag(eigenvals) @ eigenvecs.T

    def reset_filter(self, kf, reset_method='partial'):
        """
        Reset filter
        """
        if reset_method == 'partial':
            # Partial reset: only reset covariance
            kf.P = np.eye(kf.P.shape[0]) * np.trace(kf.P) / kf.P.shape[0]

        elif reset_method == 'full':
            # Full reset
            kf.P = np.eye(kf.P.shape[0])
            # Keep current state estimate

def divergence_prevention_example():
    """
    Filter divergence prevention example
    """
    # Design an easily divergent system
    class ProblematicKF:
        def __init__(self):
            self.x = np.array([[0.0], [1.0]])
            self.P = np.array([[1.0, 0], [0, 1.0]])
            self.F = np.array([[1.1, 1], [0, 1.1]])  # Unstable matrix
            self.H = np.array([[1, 0]])
            self.Q = np.array([[0.01, 0], [0, 0.01]])  # Q too small
            self.R = np.array([[10.0]])  # R too large
            self.K = None
            self.y = None

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            self.y = z - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R
            self.K = self.P @ self.H.T @ np.linalg.inv(S)

            self.x = self.x + self.K @ self.y
            self.P = (np.eye(2) - self.K @ self.H) @ self.P

    # Run two versions: protected and unprotected
    np.random.seed(42)
    T = 50
    observations = np.cumsum(np.random.normal(0, 1, T))

    # Unprotected version
    kf_unprotected = ProblematicKF()
    estimates_unprotected = []
    P_trace_unprotected = []

    # Protected version
    kf_protected = ProblematicKF()
    detector = DivergenceDetector()
    estimates_protected = []
    P_trace_protected = []

    for obs in observations:
        # Unprotected filter
        try:
            kf_unprotected.predict()
            kf_unprotected.update(obs)
            estimates_unprotected.append(kf_unprotected.x[0, 0])
            P_trace_unprotected.append(np.trace(kf_unprotected.P))
        except Exception:  # divergence can produce numerical errors here
            estimates_unprotected.append(np.nan)
            P_trace_unprotected.append(np.nan)

        # Protected filter
        kf_protected.predict()
        kf_protected.update(obs)

        # Detect divergence
        divergence_checks = detector.detect_divergence(kf_protected)

        if any(divergence_checks.values()):
            print(f"Divergence detected at time step {len(estimates_protected)}: {divergence_checks}")

            # Apply stabilization
            detector.apply_stabilization(kf_protected, method='joseph')

            # If still problematic, reset
            if np.trace(kf_protected.P) > 1e6:
                detector.reset_filter(kf_protected, reset_method='partial')

        estimates_protected.append(kf_protected.x[0, 0])
        P_trace_protected.append(np.trace(kf_protected.P))

    # Plot results
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    # State estimate
    ax1.plot(observations, 'g-', label='Observations', linewidth=2)
    ax1.plot(estimates_unprotected, 'r--', label='Unprotected KF', linewidth=2)
    ax1.plot(estimates_protected, 'b-', label='Protected KF', linewidth=2)
    ax1.set_ylabel('State Estimate')
    ax1.set_title('Filter Divergence Comparison')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Covariance trace
    ax2.semilogy(P_trace_unprotected, 'r--', label='Unprotected P Trace', linewidth=2)
    ax2.semilogy(P_trace_protected, 'b-', label='Protected P Trace', linewidth=2)
    ax2.set_xlabel('Time Step')
    ax2.set_ylabel('Covariance Matrix Trace (log scale)')
    ax2.set_title('Covariance Matrix Growth')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

# Run example
# divergence_prevention_example()
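
A note on the 'joseph' stabilization used above: the Joseph-form covariance update

    P = (I - K H) P (I - K H)^T + K R K^T

is algebraically equivalent to the shorter update P = (I - K H) P when K is the optimal gain, but it preserves symmetry and positive semi-definiteness of P even when K is slightly suboptimal or affected by round-off, which is why it is a common numerical safeguard against divergence.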

Robust Filtering Techniques

H-Infinity Filtering

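Where the Kalman filter minimizes the mean-squared estimation error under Gaussian assumptions, the H-infinity (minimax) filter instead bounds the worst-case energy gain from disturbances to estimation error, so it makes no distributional assumptions about the noise. In one common discrete-time formulation (with identity weighting on the estimation error, which is what the class below assumes), the gain is

    K = P (I - (1/gamma^2) P + H^T R^{-1} H P)^{-1} H^T R^{-1}

where gamma > 0 sets the robustness level: smaller gamma gives a more conservative filter, and as gamma -> infinity the gain reduces to the standard Kalman gain.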
class HInfinityFilter:
    """
    H-infinity robust filter
    """

    def __init__(self, dim_x, dim_z, gamma=1.0):
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.gamma = gamma  # Robustness parameter

        # State and covariance
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)

        # System matrices
        self.F = np.eye(dim_x)
        self.H = np.eye(dim_z, dim_x)
        self.Q = np.eye(dim_x)
        self.R = np.eye(dim_z)

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, z):
        """H-infinity update step"""
        z = z.reshape(-1, 1)

        # Identity matrix and standard innovation covariance
        I = np.eye(self.dim_x)
        S = self.H @ self.P @ self.H.T + self.R

        # H-infinity gain:
        # K = P (I - (1/gamma^2) P + H^T R^{-1} H P)^{-1} H^T R^{-1}
        try:
            Theta = I - (1 / self.gamma**2) * self.P + \
                    self.H.T @ np.linalg.inv(self.R) @ self.H @ self.P

            if np.linalg.cond(Theta) < 1e12:
                K_hinf = self.P @ np.linalg.inv(Theta) @ self.H.T @ np.linalg.inv(self.R)
            else:
                # Fall back to the standard Kalman gain if Theta is ill-conditioned
                K_hinf = self.P @ self.H.T @ np.linalg.inv(S)

        except np.linalg.LinAlgError:
            # Use the standard gain when numerical issues occur
            K_hinf = self.P @ self.H.T @ np.linalg.inv(S)

        # State update
        y = z - self.H @ self.x
        self.x = self.x + K_hinf @ y

        # Covariance update (simplified, KF-style step; the full H-infinity
        # recursion would propagate P <- P @ inv(Theta) instead)
        self.P = self.P - K_hinf @ self.H @ self.P

def compare_robust_filters():
    """
    Compare different robust filtering methods
    """
    # Generate data with outliers
    np.random.seed(42)
    T = 100

    # True state (random walk)
    true_states = [0]
    for t in range(1, T):
        true_states.append(true_states[-1] + np.random.normal(0, 0.1))

    # Observations (with outliers)
    observations = []
    for i, state in enumerate(true_states):
        if i in [20, 40, 60, 80]:  # Add outliers
            obs = state + np.random.normal(0, 5)  # Large noise
        else:
            obs = state + np.random.normal(0, 0.1)  # Normal noise
        observations.append(obs)

    # Standard Kalman filter
    class StandardKF:
        def __init__(self):
            self.x = np.array([[0.0]])
            self.P = np.array([[1.0]])
            self.F = np.array([[1.0]])
            self.H = np.array([[1.0]])
            self.Q = np.array([[0.01]])
            self.R = np.array([[0.01]])

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            y = z - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T / S

            self.x = self.x + K * y
            self.P = (np.eye(1) - K @ self.H) @ self.P

    # Robust MM estimator Kalman filter
    class RobustKF:
        def __init__(self):
            self.x = np.array([[0.0]])
            self.P = np.array([[1.0]])
            self.F = np.array([[1.0]])
            self.H = np.array([[1.0]])
            self.Q = np.array([[0.01]])
            self.R = np.array([[0.01]])

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def huber_weight(self, residual, threshold=2.0):
            """Huber weight function"""
            abs_residual = np.abs(residual)
            if abs_residual <= threshold:
                return 1.0
            else:
                return threshold / abs_residual

        def update(self, z):
            z = np.array([[z]])
            y = z - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R

            # Calculate robust weight
            weight = self.huber_weight(y[0, 0] / np.sqrt(S[0, 0]))

            # Adjust observation noise
            R_robust = self.R / weight
            S_robust = self.H @ self.P @ self.H.T + R_robust
            K = self.P @ self.H.T / S_robust

            self.x = self.x + K * y
            self.P = (np.eye(1) - K @ self.H) @ self.P

    # Run filters
    standard_kf = StandardKF()
    robust_kf = RobustKF()
    hinf_kf = HInfinityFilter(1, 1, gamma=2.0)

    standard_estimates = []
    robust_estimates = []
    hinf_estimates = []

    for obs in observations:
        # Standard KF
        standard_kf.predict()
        standard_kf.update(obs)
        standard_estimates.append(standard_kf.x[0, 0])

        # Robust KF
        robust_kf.predict()
        robust_kf.update(obs)
        robust_estimates.append(robust_kf.x[0, 0])

        # H-infinity filter
        hinf_kf.predict()
        hinf_kf.update(np.array([obs]))
        hinf_estimates.append(hinf_kf.x[0, 0])

    # Plot results
    plt.figure(figsize=(15, 10))

    plt.subplot(2, 1, 1)
    plt.plot(true_states, 'g-', linewidth=3, label='True State')
    plt.plot(observations, 'k.', alpha=0.6, label='Observations (with outliers)')
    plt.plot(standard_estimates, 'r--', linewidth=2, label='Standard KF')
    plt.plot(robust_estimates, 'b-', linewidth=2, label='Robust KF')
    plt.plot(hinf_estimates, 'm:', linewidth=2, label='H-infinity Filter')

    # Mark outliers
    outlier_indices = [20, 40, 60, 80]
    for idx in outlier_indices:
        plt.axvline(x=idx, color='red', linestyle=':', alpha=0.5)

    plt.ylabel('State Value')
    plt.title('Comparison of Robust Filtering Methods')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(2, 1, 2)
    # Calculate errors
    standard_errors = np.abs(np.array(true_states) - np.array(standard_estimates))
    robust_errors = np.abs(np.array(true_states) - np.array(robust_estimates))
    hinf_errors = np.abs(np.array(true_states) - np.array(hinf_estimates))

    plt.plot(standard_errors, 'r--', linewidth=2, label='Standard KF Error')
    plt.plot(robust_errors, 'b-', linewidth=2, label='Robust KF Error')
    plt.plot(hinf_errors, 'm:', linewidth=2, label='H-infinity Filter Error')

    for idx in outlier_indices:
        plt.axvline(x=idx, color='red', linestyle=':', alpha=0.5)

    plt.xlabel('Time Step')
    plt.ylabel('Absolute Error')
    plt.title('Estimation Error Comparison')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Calculate performance metrics
    print("Performance Comparison:")
    print(f"Standard KF RMSE: {np.sqrt(np.mean(standard_errors**2)):.4f}")
    print(f"Robust KF RMSE: {np.sqrt(np.mean(robust_errors**2)):.4f}")
    print(f"H-infinity Filter RMSE: {np.sqrt(np.mean(hinf_errors**2)):.4f}")

# Run comparison
# compare_robust_filters()
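
A note on the Huber reweighting used in RobustKF above: the innovation is first normalized by its predicted standard deviation, r = y / sqrt(S), and assigned the weight

    w(r) = 1           if |r| <= c
    w(r) = c / |r|     if |r| >  c

with threshold c = 2.0 here. The measurement noise is then inflated to R / w(r), so observations whose innovations exceed the threshold are progressively downweighted rather than trusted at face value, which is what limits the influence of the injected outliers.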

Adaptive Filtering Methods

Multiple Model Adaptive Estimation (MMAE)

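MMAE runs a bank of Kalman filters in parallel, one per candidate model, and blends their outputs according to posterior model probabilities. After each measurement, the probability of model i is updated by Bayes' rule using the Gaussian likelihood of that model's innovation, and the combined estimate is the probability-weighted mixture:

    mu_i ∝ mu_i_prior * N(y_i; 0, S_i)
    x_combined = sum_i mu_i * x_i

where y_i and S_i are the innovation and innovation covariance of filter i. The implementation below also applies a Markov transition prior between models before each update, which keeps model probabilities from collapsing to zero and lets the estimator re-adapt when the true regime switches.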
class MultipleModelAdaptiveEstimator:
    """
    Multiple model adaptive estimator
    """

    def __init__(self, models, initial_probabilities=None):
        self.models = models
        self.n_models = len(models)

        if initial_probabilities is None:
            self.model_probabilities = np.ones(self.n_models) / self.n_models
        else:
            self.model_probabilities = np.array(initial_probabilities)

        # Model transition probability matrix (Markov chain)
        self.transition_matrix = 0.95 * np.eye(self.n_models) + \
                               0.05 * np.ones((self.n_models, self.n_models)) / self.n_models

        self.likelihood_history = []

    def predict(self):
        """Prediction step"""
        # Model probability prediction
        self.model_probabilities = self.transition_matrix.T @ self.model_probabilities

        # Predict each model
        for model in self.models:
            model.predict()

    def update(self, z):
        """Update step"""
        likelihoods = np.zeros(self.n_models)

        # Calculate likelihood for each model
        for i, model in enumerate(self.models):
            model.update(z)

            # Gaussian likelihood of the innovation under this model; the
            # normalization by sqrt(det(2*pi*S)) matters when models differ in S
            if hasattr(model, 'y') and hasattr(model, 'S'):
                residual = model.y
                S = model.S
                norm_const = np.sqrt(np.linalg.det(2 * np.pi * S))
                exponent = -0.5 * residual.T @ np.linalg.inv(S) @ residual
                likelihood = np.exp(exponent) / norm_const
                likelihoods[i] = likelihood[0, 0] if likelihood.ndim > 0 else likelihood
            else:
                likelihoods[i] = 1.0

        # Prevent numerical issues
        likelihoods = np.maximum(likelihoods, 1e-10)

        # Update model probabilities
        self.model_probabilities = self.model_probabilities * likelihoods
        self.model_probabilities /= np.sum(self.model_probabilities)

        self.likelihood_history.append(likelihoods.copy())

    def get_combined_estimate(self):
        """Get combined estimate"""
        combined_state = None
        combined_covariance = None

        for i, model in enumerate(self.models):
            weight = self.model_probabilities[i]

            if combined_state is None:
                combined_state = weight * model.x
                combined_covariance = weight * (model.P + model.x @ model.x.T)
            else:
                combined_state += weight * model.x
                combined_covariance += weight * (model.P + model.x @ model.x.T)

        # Adjust covariance
        combined_covariance -= combined_state @ combined_state.T

        return combined_state, combined_covariance

def mmae_example():
    """
    Multiple model adaptive estimation example
    """
    # Define different models
    class ModelA:  # Low noise model
        def __init__(self):
            self.x = np.array([[0.0]])
            self.P = np.array([[1.0]])
            self.F = np.array([[1.0]])
            self.H = np.array([[1.0]])
            self.Q = np.array([[0.001]])  # Low process noise
            self.R = np.array([[0.01]])
            self.y = None
            self.S = None

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            self.y = z - self.H @ self.x
            self.S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T @ np.linalg.inv(self.S)

            self.x = self.x + K @ self.y
            self.P = (np.eye(1) - K @ self.H) @ self.P

    class ModelB:  # Medium noise model
        def __init__(self):
            self.x = np.array([[0.0]])
            self.P = np.array([[1.0]])
            self.F = np.array([[1.0]])
            self.H = np.array([[1.0]])
            self.Q = np.array([[0.01]])  # Medium process noise
            self.R = np.array([[0.01]])
            self.y = None
            self.S = None

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            self.y = z - self.H @ self.x
            self.S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T @ np.linalg.inv(self.S)

            self.x = self.x + K @ self.y
            self.P = (np.eye(1) - K @ self.H) @ self.P

    class ModelC:  # High noise model
        def __init__(self):
            self.x = np.array([[0.0]])
            self.P = np.array([[1.0]])
            self.F = np.array([[1.0]])
            self.H = np.array([[1.0]])
            self.Q = np.array([[0.1]])  # High process noise
            self.R = np.array([[0.01]])
            self.y = None
            self.S = None

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            self.y = z - self.H @ self.x
            self.S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T @ np.linalg.inv(self.S)

            self.x = self.x + K @ self.y
            self.P = (np.eye(1) - K @ self.H) @ self.P

    # Generate data for switching system
    np.random.seed(42)
    T = 200

    true_states = [0]
    true_model_sequence = []
    observations = []

    for t in range(1, T):
        # Model switching logic
        if t < 50:
            current_model = 0  # Low noise
            q_true = 0.001
        elif t < 100:
            current_model = 2  # High noise
            q_true = 0.1
        elif t < 150:
            current_model = 1  # Medium noise
            q_true = 0.01
        else:
            current_model = 0  # Back to low noise
            q_true = 0.001

        true_model_sequence.append(current_model)

        # State evolution
        new_state = true_states[-1] + np.random.normal(0, np.sqrt(q_true))
        true_states.append(new_state)

        # Observation
        obs = new_state + np.random.normal(0, 0.1)
        observations.append(obs)

    # Run MMAE
    models = [ModelA(), ModelB(), ModelC()]
    mmae = MultipleModelAdaptiveEstimator(models)

    combined_estimates = []
    model_probabilities_history = []

    for obs in observations:
        mmae.predict()
        mmae.update(obs)

        combined_state, _ = mmae.get_combined_estimate()
        combined_estimates.append(combined_state[0, 0])
        model_probabilities_history.append(mmae.model_probabilities.copy())

    # Plot results
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))

    # State estimate
    ax1.plot(true_states[1:], 'g-', linewidth=3, label='True State')
    ax1.plot(observations, 'k.', alpha=0.3, label='Observations')
    ax1.plot(combined_estimates, 'b-', linewidth=2, label='MMAE Estimate')

    # Mark model switch points
    switch_points = [50, 100, 150]
    for sp in switch_points:
        ax1.axvline(x=sp, color='red', linestyle='--', alpha=0.7)

    ax1.set_ylabel('State Value')
    ax1.set_title('Multiple Model Adaptive Estimation')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Model probabilities
    model_probs = np.array(model_probabilities_history)
    ax2.plot(model_probs[:, 0], 'r-', linewidth=2, label='Model A Prob (Low Noise)')
    ax2.plot(model_probs[:, 1], 'g-', linewidth=2, label='Model B Prob (Med Noise)')
    ax2.plot(model_probs[:, 2], 'b-', linewidth=2, label='Model C Prob (High Noise)')

    # True model sequence
    for i, model_idx in enumerate(true_model_sequence):
        color = ['red', 'green', 'blue'][model_idx]
        ax2.scatter(i, 1.1, c=color, s=20, alpha=0.7)

    for sp in switch_points:
        ax2.axvline(x=sp, color='red', linestyle='--', alpha=0.7)

    ax2.set_xlabel('Time Step')
    ax2.set_ylabel('Model Probability')
    ax2.set_title('Model Probability Evolution')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    ax2.set_ylim(-0.1, 1.2)

    plt.tight_layout()
    plt.show()

    # Performance evaluation
    errors = np.abs(np.array(true_states[1:]) - np.array(combined_estimates))
    print(f"MMAE RMSE: {np.sqrt(np.mean(errors**2)):.4f}")
    print(f"Mean Absolute Error: {np.mean(errors):.4f}")

# Run example
# mmae_example()

Modern Improvement Methods

Constrained Kalman Filter

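Physical knowledge such as non-negativity, conservation laws, or kinematic relationships can be encoded as linear constraints on the state. A standard way to enforce them is estimate projection: run the ordinary Kalman update, then project the unconstrained estimate onto the constraint set using the covariance-weighted projection

    x_c = x_hat - P D^T (D P D^T)^{-1} (D x_hat - d)

which minimizes (x - x_hat)^T P^{-1} (x - x_hat) subject to D x = d. The class below is a sketch of this approach; in the example that follows, the constraint is the inequality position >= 0, so the projection is applied only when the unconstrained estimate actually violates it.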
class ConstrainedKalmanFilter:
    """
    Constrained Kalman filter
    """

    def __init__(self, dim_x, dim_z, dim_c=0):
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.dim_c = dim_c  # Constraint dimension

        # State and covariance
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)

        # System matrices
        self.F = np.eye(dim_x)
        self.H = np.eye(dim_z, dim_x)
        self.Q = np.eye(dim_x)
        self.R = np.eye(dim_z)

        # Constraint matrices
        self.D = np.zeros((dim_c, dim_x))  # Constraint matrix
        self.d = np.zeros((dim_c, 1))      # Constraint vector

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update_with_constraints(self, z):
        """Update with constraints"""
        z = z.reshape(-1, 1)

        # Standard Kalman update
        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)

        x_unconstrained = self.x + K @ y
        P_unconstrained = (np.eye(self.dim_x) - K @ self.H) @ self.P

        # Apply constraints (here interpreted as inequalities D x >= d,
        # enforced in active-set fashion: project only when actually violated)
        if self.dim_c > 0:
            # Negative entries of (D x - d) indicate violated constraints
            constraint_violation = self.D @ x_unconstrained - self.d

            # If constraints are violated, project onto the constraint boundary
            if np.any(constraint_violation < -1e-10):
                # Use Lagrange multiplier method to project
                DPD = self.D @ P_unconstrained @ self.D.T

                if np.linalg.det(DPD) > 1e-12:
                    lambda_mult = np.linalg.inv(DPD) @ constraint_violation

                    # Constrained state
                    self.x = x_unconstrained - P_unconstrained @ self.D.T @ lambda_mult

                    # Constrained covariance
                    self.P = P_unconstrained - P_unconstrained @ self.D.T @ \
                           np.linalg.inv(DPD) @ self.D @ P_unconstrained
                else:
                    self.x = x_unconstrained
                    self.P = P_unconstrained
            else:
                self.x = x_unconstrained
                self.P = P_unconstrained
        else:
            self.x = x_unconstrained
            self.P = P_unconstrained

def constrained_filter_example():
    """
    Constrained filter example: position must be non-negative
    """
    # Create constrained filter
    ckf = ConstrainedKalmanFilter(dim_x=2, dim_z=1, dim_c=1)

    # System setup: [position, velocity]
    ckf.F = np.array([[1, 1], [0, 1]])  # Constant velocity motion
    ckf.H = np.array([[1, 0]])          # Observe position
    ckf.Q = np.array([[0.01, 0], [0, 0.01]])
    ckf.R = np.array([[0.1]])

    # Constraint: position >= 0
    ckf.D = np.array([[1, 0]])  # Only constrain position
    ckf.d = np.array([[0]])     # Position >= 0

    # Initialize (with constraint-violating initial state)
    ckf.x = np.array([[-1], [0.5]])  # Negative position, positive velocity
    ckf.P = np.array([[1, 0], [0, 1]])

    # Generate data (true trajectory starts from negative position)
    np.random.seed(42)
    T = 50

    true_positions = [-1]
    true_velocities = [0.5]
    observations = []

    for t in range(1, T):
        # True dynamics (may violate constraints)
        new_vel = true_velocities[-1] + np.random.normal(0, 0.1)
        new_pos = true_positions[-1] + new_vel + np.random.normal(0, 0.1)

        true_positions.append(new_pos)
        true_velocities.append(new_vel)

        # Observation
        obs = new_pos + np.random.normal(0, 0.3)
        observations.append(obs)

    # Run constrained filter and standard filter
    # Standard KF (no constraints)
    class StandardKF:
        def __init__(self):
            self.x = np.array([[-1], [0.5]])
            self.P = np.array([[1, 0], [0, 1]])
            self.F = np.array([[1, 1], [0, 1]])
            self.H = np.array([[1, 0]])
            self.Q = np.array([[0.01, 0], [0, 0.01]])
            self.R = np.array([[0.1]])

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            y = z - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T @ np.linalg.inv(S)

            self.x = self.x + K @ y
            self.P = (np.eye(2) - K @ self.H) @ self.P

    skf = StandardKF()

    constrained_estimates = []
    standard_estimates = []
    constraint_violations = []

    for obs in observations:
        # Constrained filter
        ckf.predict()
        ckf.update_with_constraints(obs)
        constrained_estimates.append([ckf.x[0, 0], ckf.x[1, 0]])

        # Standard filter
        skf.predict()
        skf.update(obs)
        standard_estimates.append([skf.x[0, 0], skf.x[1, 0]])

        # Record constraint violations
        violation = min(0, skf.x[0, 0])  # Negative value indicates violation
        constraint_violations.append(violation)

    # Plot results
    constrained_estimates = np.array(constrained_estimates)
    standard_estimates = np.array(standard_estimates)

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))

    # Position comparison
    ax1.plot(true_positions[1:], 'g-', linewidth=3, label='True Position')
    ax1.plot(observations, 'k.', alpha=0.5, label='Observations')
    ax1.plot(standard_estimates[:, 0], 'r--', linewidth=2, label='Standard KF')
    ax1.plot(constrained_estimates[:, 0], 'b-', linewidth=2, label='Constrained KF')
    ax1.axhline(y=0, color='red', linestyle=':', label='Constraint Boundary')
    ax1.set_ylabel('Position')
    ax1.set_title('Constrained Kalman Filter: Position Estimation')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Velocity comparison
    ax2.plot(true_velocities[1:], 'g-', linewidth=3, label='True Velocity')
    ax2.plot(standard_estimates[:, 1], 'r--', linewidth=2, label='Standard KF')
    ax2.plot(constrained_estimates[:, 1], 'b-', linewidth=2, label='Constrained KF')
    ax2.set_ylabel('Velocity')
    ax2.set_title('Velocity Estimation Comparison')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Constraint violations
    ax3.plot(constraint_violations, 'r-', linewidth=2, label='Standard KF Constraint Violation')
    ax3.axhline(y=0, color='black', linestyle='-', alpha=0.5)
    ax3.fill_between(range(len(constraint_violations)), constraint_violations, 0,
                    where=np.array(constraint_violations) < 0, alpha=0.3, color='red')
    ax3.set_xlabel('Time Step')
    ax3.set_ylabel('Constraint Violation (negative)')
    ax3.set_title('Constraint Violation Level')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Statistical results
    violations_count = np.sum(np.array(constraint_violations) < -1e-6)
    print(f"Standard KF constraint violations: {violations_count}/{len(observations)} time steps")
    print(f"Constrained KF guarantees position >= 0")

# Run example
# constrained_filter_example()

Next Chapter Preview

In the next chapter, we will learn about multi-sensor fusion and distributed filtering, including centralized and distributed fusion architectures, information filtering, and covariance intersection methods.