Chapter 10: Limitations and Improvements of Kalman Filtering
Learning Objectives
- Understand the assumptions and limitations of Kalman filtering
- Master robust Kalman filtering and adaptive filtering methods
- Learn about detection and handling of filter divergence
Basic Assumptions and Limitations of Kalman Filtering
Core Assumptions
Standard Kalman filtering is based on the following key assumptions (see the model sketch after the list):
Kalman Filter Assumptions
- Linearity: The system dynamics and the measurement model are linear
- Gaussianity: Process and measurement noise follow Gaussian distributions
- White Noise: The noise sequences are zero-mean, mutually uncorrelated white noise
- Known Model: The system matrices F and H are known exactly
- Known Covariance: The noise covariances Q and R are known and constant
- Perfect Initialization: The initial state estimate and its covariance are consistent with the actual initial uncertainty
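Together, these assumptions define the standard linear-Gaussian state-space model:

x_k = F x_{k-1} + w_k,   w_k ~ N(0, Q)
z_k = H x_k + v_k,       v_k ~ N(0, R)

with w_k, v_k, and the initial state x_0 ~ N(x0_hat, P_0) mutually independent. Under exactly these conditions the Kalman filter is the minimum-variance state estimator; when any condition fails, optimality (and sometimes even stability) is lost, as the demonstrations below illustrate.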
Real-World Challenges
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class LimitationAnalyzer:
"""
Kalman filter limitation analysis tool
"""
def __init__(self):
self.scenarios = {}
def demonstrate_nonlinearity_issue(self):
"""
Demonstrate impact of nonlinearity
"""
# Highly nonlinear system
def true_nonlinear_function(x):
return x + 0.5 * np.sin(5 * x)
def linear_approximation(x, x0):
# First-order Taylor expansion around x0: f(x0) + f'(x0) * (x - x0)
return true_nonlinear_function(x0) + (x - x0) * (1 + 2.5 * np.cos(5 * x0))
# Generate data
x_true = np.linspace(-2, 2, 100)
y_true = [true_nonlinear_function(x) for x in x_true]
# Linearization at different points
linearization_points = [-1, 0, 1]
plt.figure(figsize=(12, 8))
plt.plot(x_true, y_true, 'b-', linewidth=3, label='True Nonlinear Function')
for i, x0 in enumerate(linearization_points):
y_linear = [linear_approximation(x, x0) for x in x_true]
plt.plot(x_true, y_linear, '--', linewidth=2,
label=f'Linearized at x={x0}')
plt.plot(x0, true_nonlinear_function(x0), 'ro', markersize=8)
plt.xlabel('State x')
plt.ylabel('Output y')
plt.title('Linearization Error Demonstration')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def demonstrate_non_gaussian_noise(self):
"""
Demonstrate impact of non-Gaussian noise
"""
np.random.seed(42)
n_samples = 1000
# Different types of noise
gaussian_noise = np.random.normal(0, 1, n_samples)
heavy_tail_noise = stats.t.rvs(df=3, size=n_samples) # t-distribution (heavy tail)
skewed_noise = stats.skewnorm.rvs(a=5, size=n_samples) # Skewed distribution
bimodal_noise = np.concatenate([
np.random.normal(-2, 0.5, n_samples//2),
np.random.normal(2, 0.5, n_samples//2)
])
noises = {
'Gaussian Noise': gaussian_noise,
'Heavy-Tail Noise (t-dist)': heavy_tail_noise,
'Skewed Noise': skewed_noise,
'Bimodal Noise': bimodal_noise
}
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()
for i, (name, noise) in enumerate(noises.items()):
axes[i].hist(noise, bins=50, density=True, alpha=0.7,
color=['blue', 'red', 'green', 'orange'][i])
axes[i].set_title(f'{name}')
axes[i].set_xlabel('Noise Value')
axes[i].set_ylabel('Density')
axes[i].grid(True, alpha=0.3)
# Add statistical information
mean_val = np.mean(noise)
std_val = np.std(noise)
skew_val = stats.skew(noise)
kurt_val = stats.kurtosis(noise)
stats_text = f'Mean: {mean_val:.2f}\nStd: {std_val:.2f}\nSkew: {skew_val:.2f}\nKurt: {kurt_val:.2f}'
axes[i].text(0.05, 0.95, stats_text, transform=axes[i].transAxes,
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
def demonstrate_model_mismatch(self):
"""
Demonstrate impact of model mismatch
"""
# True system: nonlinear growth
def true_system(x, t):
return x * (1 + 0.01 * np.sin(0.1 * t))
# Assumed linear model
def assumed_model(x):
return 1.01 * x # Fixed growth rate
# Simulate data
T = 100
true_states = [100] # Initial value
observations = []
for t in range(1, T):
# True state evolution
new_state = true_system(true_states[-1], t) + np.random.normal(0, 1)
true_states.append(new_state)
# Observation
obs = new_state + np.random.normal(0, 2)
observations.append(obs)
# Kalman filter with incorrect model
class MismatchedKF:
def __init__(self):
self.x = np.array([[100.0]])
self.P = np.array([[10.0]])
self.F = np.array([[1.01]]) # Incorrect model
self.H = np.array([[1.0]])
self.Q = np.array([[1.0]])
self.R = np.array([[4.0]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / S
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
# Run filter
kf = MismatchedKF()
estimates = []
for obs in observations:
kf.predict()
kf.update(obs)
estimates.append(kf.x[0, 0])
# Plot results
plt.figure(figsize=(12, 8))
plt.plot(true_states[1:], 'g-', linewidth=2, label='True State')
plt.plot(observations, 'r.', alpha=0.5, label='Observations')
plt.plot(estimates, 'b--', linewidth=2, label='KF Estimate (Wrong Model)')
# Calculate error
errors = np.array(true_states[1:]) - np.array(estimates)
plt.fill_between(range(len(estimates)),
np.array(estimates) - 2*np.abs(errors),
np.array(estimates) + 2*np.abs(errors),
alpha=0.3, color='blue', label='Error Band')
plt.xlabel('Time Step')
plt.ylabel('State Value')
plt.title('Impact of Model Mismatch')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
print(f"Mean Absolute Error: {np.mean(np.abs(errors)):.2f}")
print(f"RMSE: {np.sqrt(np.mean(errors**2)):.2f}")
# Run demonstrations
analyzer = LimitationAnalyzer()
# analyzer.demonstrate_nonlinearity_issue()
# analyzer.demonstrate_non_gaussian_noise()
# analyzer.demonstrate_model_mismatch()
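A practical first check of the Gaussianity assumption is to test a filter's innovation sequence directly. Below is a minimal sketch, assuming the innovations argument is a 1-D array collected from a running filter (it is not computed by the code above), using the D'Agostino-Pearson test from scipy.stats:

import numpy as np
from scipy import stats

def innovations_look_gaussian(innovations, alpha=0.05):
    """Rough check: does the innovation sequence pass a normality test?"""
    innovations = np.asarray(innovations).ravel()
    _, p_value = stats.normaltest(innovations)  # D'Agostino-Pearson K^2 test
    return p_value > alpha, p_value

# Synthetic stand-ins for real innovation sequences
rng = np.random.default_rng(0)
print(innovations_look_gaussian(rng.normal(0, 1, 500)))                        # Gaussian: should pass
print(innovations_look_gaussian(stats.t.rvs(df=3, size=500, random_state=0)))  # heavy-tailed: likely fails

A failed test does not mean the filter is unusable, but it is a cheap warning that a robust or adaptive variant (covered later in this chapter) may be worth trying.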
Filter Divergence Problem
Causes and Detection of Divergence
class DivergenceDetector:
"""
Filter divergence detection and handling
"""
def __init__(self, divergence_threshold=1e6):
self.divergence_threshold = divergence_threshold
self.innovation_history = []
self.covariance_trace_history = []
def detect_divergence(self, kf):
"""
Detect filter divergence
"""
# Detection indicators
checks = {
'covariance_explosion': False,
'innovation_explosion': False,
'numerical_instability': False,
'negative_eigenvalues': False
}
# 1. Covariance matrix trace too large
P_trace = np.trace(kf.P)
self.covariance_trace_history.append(P_trace)
if P_trace > self.divergence_threshold:
checks['covariance_explosion'] = True
# 2. Innovation too large
if hasattr(kf, 'y') and kf.y is not None:
innovation_norm = np.linalg.norm(kf.y)
self.innovation_history.append(innovation_norm)
if len(self.innovation_history) > 20:
recent_avg = np.mean(self.innovation_history[-10:])
baseline = np.std(self.innovation_history[:-10])
if baseline > 0 and recent_avg > 10 * baseline:
checks['innovation_explosion'] = True
# 3. Numerical instability
if np.any(np.isnan(kf.P)) or np.any(np.isinf(kf.P)):
checks['numerical_instability'] = True
# 4. Negative eigenvalues in covariance matrix
eigenvals = np.linalg.eigvals(kf.P)
if np.any(eigenvals < 0):
checks['negative_eigenvalues'] = True
return checks
def apply_stabilization(self, kf, method='joseph'):
"""
Apply stabilization techniques
"""
if method == 'joseph':
# Joseph form covariance update
if hasattr(kf, 'K') and hasattr(kf, 'H') and hasattr(kf, 'R'):
I_KH = np.eye(kf.P.shape[0]) - kf.K @ kf.H
kf.P = I_KH @ kf.P @ I_KH.T + kf.K @ kf.R @ kf.K.T
elif method == 'regularization':
# Covariance regularization
kf.P += 1e-8 * np.eye(kf.P.shape[0])
elif method == 'eigenvalue_clipping':
# Eigenvalue clipping
eigenvals, eigenvecs = np.linalg.eigh(kf.P)
eigenvals = np.maximum(eigenvals, 1e-12)
kf.P = eigenvecs @ np.diag(eigenvals) @ eigenvecs.T
def reset_filter(self, kf, reset_method='partial'):
"""
Reset filter
"""
if reset_method == 'partial':
# Partial reset: only reset covariance
kf.P = np.eye(kf.P.shape[0]) * np.trace(kf.P) / kf.P.shape[0]
elif reset_method == 'full':
# Full reset
kf.P = np.eye(kf.P.shape[0])
# Keep current state estimate
def divergence_prevention_example():
"""
Filter divergence prevention example
"""
# Design an easily divergent system
class ProblematicKF:
def __init__(self):
self.x = np.array([[0.0], [1.0]])
self.P = np.array([[1.0, 0], [0, 1.0]])
self.F = np.array([[1.1, 1], [0, 1.1]]) # Unstable matrix
self.H = np.array([[1, 0]])
self.Q = np.array([[0.01, 0], [0, 0.01]]) # Q too small
self.R = np.array([[10.0]]) # R too large
self.K = None
self.y = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
self.K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + self.K @ self.y
self.P = (np.eye(2) - self.K @ self.H) @ self.P
# Run two versions: protected and unprotected
np.random.seed(42)
T = 50
observations = np.cumsum(np.random.normal(0, 1, T))
# Unprotected version
kf_unprotected = ProblematicKF()
estimates_unprotected = []
P_trace_unprotected = []
# Protected version
kf_protected = ProblematicKF()
detector = DivergenceDetector()
estimates_protected = []
P_trace_protected = []
for obs in observations:
# Unprotected filter
try:
kf_unprotected.predict()
kf_unprotected.update(obs)
estimates_unprotected.append(kf_unprotected.x[0, 0])
P_trace_unprotected.append(np.trace(kf_unprotected.P))
except Exception:
estimates_unprotected.append(np.nan)
P_trace_unprotected.append(np.nan)
# Protected filter
kf_protected.predict()
kf_protected.update(obs)
# Detect divergence
divergence_checks = detector.detect_divergence(kf_protected)
if any(divergence_checks.values()):
print(f"Divergence detected at time step {len(estimates_protected)}: {divergence_checks}")
# Apply stabilization
detector.apply_stabilization(kf_protected, method='joseph')
# If still problematic, reset
if np.trace(kf_protected.P) > 1e6:
detector.reset_filter(kf_protected, reset_method='partial')
estimates_protected.append(kf_protected.x[0, 0])
P_trace_protected.append(np.trace(kf_protected.P))
# Plot results
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# State estimate
ax1.plot(observations, 'g-', label='Observations', linewidth=2)
ax1.plot(estimates_unprotected, 'r--', label='Unprotected KF', linewidth=2)
ax1.plot(estimates_protected, 'b-', label='Protected KF', linewidth=2)
ax1.set_ylabel('State Estimate')
ax1.set_title('Filter Divergence Comparison')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Covariance trace
ax2.semilogy(P_trace_unprotected, 'r--', label='Unprotected P Trace', linewidth=2)
ax2.semilogy(P_trace_protected, 'b-', label='Protected P Trace', linewidth=2)
ax2.set_xlabel('Time Step')
ax2.set_ylabel('Covariance Matrix Trace (log scale)')
ax2.set_title('Covariance Matrix Growth')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Run example
# divergence_prevention_example()
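A note on the Joseph form used by apply_stabilization above: the covariance update

P = (I - K H) P (I - K H)^T + K R K^T

is algebraically equivalent to the short form P = (I - K H) P when K is the optimal gain, but it remains symmetric and positive semidefinite even when K is rounded or slightly suboptimal. That is precisely the regime in which the short form can produce indefinite covariances and trigger the divergence seen in the unprotected filter.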
Robust Filtering Techniques
H-Infinity Filtering
class HInfinityFilter:
"""
H-infinity robust filter
"""
def __init__(self, dim_x, dim_z, gamma=1.0):
self.dim_x = dim_x
self.dim_z = dim_z
self.gamma = gamma # Robustness parameter
# State and covariance
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# System matrices
self.F = np.eye(dim_x)
self.H = np.eye(dim_z, dim_x)
self.Q = np.eye(dim_x)
self.R = np.eye(dim_z)
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
"""H-infinity update step"""
z = z.reshape(-1, 1)
# Build augmented matrix
I = np.eye(self.dim_x)
# Riccati equation for H-infinity filter
try:
# Calculate H-infinity gain
S = self.H @ self.P @ self.H.T + self.R
# H-infinity correction term: Theta = I - gamma^-2 * P + H^T R^-1 H P
Theta = I - (1 / self.gamma**2) * self.P + self.H.T @ np.linalg.inv(self.R) @ self.H @ self.P
if np.linalg.det(Theta) > 0:
K_hinf = self.P @ np.linalg.inv(Theta) @ self.H.T @ np.linalg.inv(self.R)
else:
# Fallback to standard Kalman gain
K_hinf = self.P @ self.H.T @ np.linalg.inv(S)
except np.linalg.LinAlgError:
# Use standard gain when numerical issues occur
K_hinf = self.P @ self.H.T @ np.linalg.inv(S)
# State update
y = z - self.H @ self.x
self.x = self.x + K_hinf @ y
# Covariance update
self.P = self.P - K_hinf @ self.H @ self.P
def compare_robust_filters():
"""
Compare different robust filtering methods
"""
# Generate data with outliers
np.random.seed(42)
T = 100
# True state (random walk)
true_states = [0]
for t in range(1, T):
true_states.append(true_states[-1] + np.random.normal(0, 0.1))
# Observations (with outliers)
observations = []
for i, state in enumerate(true_states):
if i in [20, 40, 60, 80]: # Add outliers
obs = state + np.random.normal(0, 5) # Large noise
else:
obs = state + np.random.normal(0, 0.1) # Normal noise
observations.append(obs)
# Standard Kalman filter
class StandardKF:
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.01]])
self.R = np.array([[0.01]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / S
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
# Robust MM estimator Kalman filter
class RobustKF:
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.01]])
self.R = np.array([[0.01]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def huber_weight(self, residual, threshold=2.0):
"""Huber weight function"""
abs_residual = np.abs(residual)
if abs_residual <= threshold:
return 1.0
else:
return threshold / abs_residual
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
# Calculate robust weight
weight = self.huber_weight(y[0, 0] / np.sqrt(S[0, 0]))
# Adjust observation noise
R_robust = self.R / weight
S_robust = self.H @ self.P @ self.H.T + R_robust
K = self.P @ self.H.T / S_robust
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
# Run filters
standard_kf = StandardKF()
robust_kf = RobustKF()
hinf_kf = HInfinityFilter(1, 1, gamma=2.0)
standard_estimates = []
robust_estimates = []
hinf_estimates = []
for obs in observations:
# Standard KF
standard_kf.predict()
standard_kf.update(obs)
standard_estimates.append(standard_kf.x[0, 0])
# Robust KF
robust_kf.predict()
robust_kf.update(obs)
robust_estimates.append(robust_kf.x[0, 0])
# H-infinity filter
hinf_kf.predict()
hinf_kf.update(np.array([obs]))
hinf_estimates.append(hinf_kf.x[0, 0])
# Plot results
plt.figure(figsize=(15, 10))
plt.subplot(2, 1, 1)
plt.plot(true_states, 'g-', linewidth=3, label='True State')
plt.plot(observations, 'k.', alpha=0.6, label='Observations (with outliers)')
plt.plot(standard_estimates, 'r--', linewidth=2, label='Standard KF')
plt.plot(robust_estimates, 'b-', linewidth=2, label='Robust KF')
plt.plot(hinf_estimates, 'm:', linewidth=2, label='H-infinity Filter')
# Mark outliers
outlier_indices = [20, 40, 60, 80]
for idx in outlier_indices:
plt.axvline(x=idx, color='red', linestyle=':', alpha=0.5)
plt.ylabel('State Value')
plt.title('Comparison of Robust Filtering Methods')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(2, 1, 2)
# Calculate errors
standard_errors = np.abs(np.array(true_states) - np.array(standard_estimates))
robust_errors = np.abs(np.array(true_states) - np.array(robust_estimates))
hinf_errors = np.abs(np.array(true_states) - np.array(hinf_estimates))
plt.plot(standard_errors, 'r--', linewidth=2, label='Standard KF Error')
plt.plot(robust_errors, 'b-', linewidth=2, label='Robust KF Error')
plt.plot(hinf_errors, 'm:', linewidth=2, label='H-infinity Filter Error')
for idx in outlier_indices:
plt.axvline(x=idx, color='red', linestyle=':', alpha=0.5)
plt.xlabel('Time Step')
plt.ylabel('Absolute Error')
plt.title('Estimation Error Comparison')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Calculate performance metrics
print("Performance Comparison:")
print(f"Standard KF RMSE: {np.sqrt(np.mean(standard_errors**2)):.4f}")
print(f"Robust KF RMSE: {np.sqrt(np.mean(robust_errors**2)):.4f}")
print(f"H-infinity Filter RMSE: {np.sqrt(np.mean(hinf_errors**2)):.4f}")
# Run comparison
# compare_robust_filters()
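The mechanism behind RobustKF above is worth making explicit. With normalized innovation r = y / sqrt(S) and Huber threshold c (2.0 in the code), the weight is

w(r) = 1 for |r| <= c, and w(r) = c / |r| for |r| > c

and the update uses the inflated measurement covariance R / w(r). A large outlier therefore yields a small weight, a large effective R, and a small Kalman gain, so a single bad measurement barely moves the estimate; for ordinary innovations w(r) = 1 and the filter reduces exactly to the standard one.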
Adaptive Filtering Methods
Multiple Model Adaptive Estimation (MMAE)
class MultipleModelAdaptiveEstimator:
"""
Multiple model adaptive estimator
"""
def __init__(self, models, initial_probabilities=None):
self.models = models
self.n_models = len(models)
if initial_probabilities is None:
self.model_probabilities = np.ones(self.n_models) / self.n_models
else:
self.model_probabilities = np.array(initial_probabilities)
# Model transition probability matrix (Markov chain)
self.transition_matrix = 0.95 * np.eye(self.n_models) + \
0.05 * np.ones((self.n_models, self.n_models)) / self.n_models
self.likelihood_history = []
def predict(self):
"""Prediction step"""
# Model probability prediction
self.model_probabilities = self.transition_matrix.T @ self.model_probabilities
# Predict each model
for model in self.models:
model.predict()
def update(self, z):
"""Update step"""
likelihoods = np.zeros(self.n_models)
# Calculate likelihood for each model
for i, model in enumerate(self.models):
model.update(z)
# Gaussian likelihood of the innovation: N(y; 0, S)
if hasattr(model, 'y') and hasattr(model, 'S'):
residual = model.y
S = model.S
norm_const = np.sqrt((2 * np.pi) ** residual.shape[0] * np.linalg.det(S))
likelihood = np.exp(-0.5 * residual.T @ np.linalg.inv(S) @ residual) / norm_const
likelihoods[i] = likelihood[0, 0]
else:
likelihoods[i] = 1.0
# Prevent numerical issues
likelihoods = np.maximum(likelihoods, 1e-10)
# Update model probabilities
self.model_probabilities = self.model_probabilities * likelihoods
self.model_probabilities /= np.sum(self.model_probabilities)
self.likelihood_history.append(likelihoods.copy())
def get_combined_estimate(self):
"""Get combined estimate"""
combined_state = None
combined_covariance = None
for i, model in enumerate(self.models):
weight = self.model_probabilities[i]
if combined_state is None:
combined_state = weight * model.x
combined_covariance = weight * (model.P + model.x @ model.x.T)
else:
combined_state += weight * model.x
combined_covariance += weight * (model.P + model.x @ model.x.T)
# Adjust covariance
combined_covariance -= combined_state @ combined_state.T
return combined_state, combined_covariance
def mmae_example():
"""
Multiple model adaptive estimation example
"""
# Define different models
class ModelA: # Low noise model
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.001]]) # Low process noise
self.R = np.array([[0.01]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(self.S)
self.x = self.x + K @ self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
class ModelB: # Medium noise model
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.01]]) # Medium process noise
self.R = np.array([[0.01]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(self.S)
self.x = self.x + K @ self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
class ModelC: # High noise model
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.1]]) # High process noise
self.R = np.array([[0.01]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(self.S)
self.x = self.x + K @ self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
# Generate data for switching system
np.random.seed(42)
T = 200
true_states = [0]
true_model_sequence = []
observations = []
for t in range(1, T):
# Model switching logic
if t < 50:
current_model = 0 # Low noise
q_true = 0.001
elif t < 100:
current_model = 2 # High noise
q_true = 0.1
elif t < 150:
current_model = 1 # Medium noise
q_true = 0.01
else:
current_model = 0 # Back to low noise
q_true = 0.001
true_model_sequence.append(current_model)
# State evolution
new_state = true_states[-1] + np.random.normal(0, np.sqrt(q_true))
true_states.append(new_state)
# Observation
obs = new_state + np.random.normal(0, 0.1)
observations.append(obs)
# Run MMAE
models = [ModelA(), ModelB(), ModelC()]
mmae = MultipleModelAdaptiveEstimator(models)
combined_estimates = []
model_probabilities_history = []
for obs in observations:
mmae.predict()
mmae.update(obs)
combined_state, _ = mmae.get_combined_estimate()
combined_estimates.append(combined_state[0, 0])
model_probabilities_history.append(mmae.model_probabilities.copy())
# Plot results
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))
# State estimate
ax1.plot(true_states[1:], 'g-', linewidth=3, label='True State')
ax1.plot(observations, 'k.', alpha=0.3, label='Observations')
ax1.plot(combined_estimates, 'b-', linewidth=2, label='MMAE Estimate')
# Mark model switch points
switch_points = [50, 100, 150]
for sp in switch_points:
ax1.axvline(x=sp, color='red', linestyle='--', alpha=0.7)
ax1.set_ylabel('State Value')
ax1.set_title('Multiple Model Adaptive Estimation')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Model probabilities
model_probs = np.array(model_probabilities_history)
ax2.plot(model_probs[:, 0], 'r-', linewidth=2, label='Model A Prob (Low Noise)')
ax2.plot(model_probs[:, 1], 'g-', linewidth=2, label='Model B Prob (Med Noise)')
ax2.plot(model_probs[:, 2], 'b-', linewidth=2, label='Model C Prob (High Noise)')
# True model sequence
for i, model_idx in enumerate(true_model_sequence):
color = ['red', 'green', 'blue'][model_idx]
ax2.scatter(i, 1.1, c=color, s=20, alpha=0.7)
for sp in switch_points:
ax2.axvline(x=sp, color='red', linestyle='--', alpha=0.7)
ax2.set_xlabel('Time Step')
ax2.set_ylabel('Model Probability')
ax2.set_title('Model Probability Evolution')
ax2.legend()
ax2.grid(True, alpha=0.3)
ax2.set_ylim(-0.1, 1.2)
plt.tight_layout()
plt.show()
# Performance evaluation
errors = np.abs(np.array(true_states[1:]) - np.array(combined_estimates))
print(f"MMAE RMSE: {np.sqrt(np.mean(errors**2)):.4f}")
print(f"Mean Absolute Error: {np.mean(errors):.4f}")
# Run example
# mmae_example()
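The core of MMAE is the model-probability recursion in MultipleModelAdaptiveEstimator.update. Writing mu_i for the probability of model i, p_ji for the Markov transition probabilities, and nu_i, S_i for model i's innovation and innovation covariance, one cycle is

mu_i_prior = sum_j p_ji * mu_j   (prediction through the transition matrix)
mu_i proportional to N(nu_i; 0, S_i) * mu_i_prior, then normalized to sum to one

The combined output is the probability-weighted mixture of the per-model estimates; get_combined_estimate also adds the spread-of-means term so the mixture covariance accounts for disagreement between the models.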
Modern Improvement Methods
Constrained Kalman Filter
class ConstrainedKalmanFilter:
"""
Constrained Kalman filter
"""
def __init__(self, dim_x, dim_z, dim_c=0):
self.dim_x = dim_x
self.dim_z = dim_z
self.dim_c = dim_c # Constraint dimension
# State and covariance
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# System matrices
self.F = np.eye(dim_x)
self.H = np.eye(dim_z, dim_x)
self.Q = np.eye(dim_x)
self.R = np.eye(dim_z)
# Constraint matrices
self.D = np.zeros((dim_c, dim_x)) # Constraint matrix
self.d = np.zeros((dim_c, 1)) # Constraint vector
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update_with_constraints(self, z):
"""Update with constraints"""
z = z.reshape(-1, 1)
# Standard Kalman update
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
x_unconstrained = self.x + K @ y
P_unconstrained = (np.eye(self.dim_x) - K @ self.H) @ self.P
# Apply constraints
if self.dim_c > 0:
# Calculate constraint violation
constraint_violation = self.D @ x_unconstrained - self.d
# Project onto the constraint boundary only if the inequality constraint D x >= d is violated
if np.any(constraint_violation < -1e-10):
# Use Lagrange multiplier method to project
DPD = self.D @ P_unconstrained @ self.D.T
if np.linalg.det(DPD) > 1e-12:
lambda_mult = np.linalg.inv(DPD) @ constraint_violation
# Constrained state
self.x = x_unconstrained - P_unconstrained @ self.D.T @ lambda_mult
# Constrained covariance
self.P = P_unconstrained - P_unconstrained @ self.D.T @ \
np.linalg.inv(DPD) @ self.D @ P_unconstrained
else:
self.x = x_unconstrained
self.P = P_unconstrained
else:
self.x = x_unconstrained
self.P = P_unconstrained
else:
self.x = x_unconstrained
self.P = P_unconstrained
def constrained_filter_example():
"""
Constrained filter example: position must be non-negative
"""
# Create constrained filter
ckf = ConstrainedKalmanFilter(dim_x=2, dim_z=1, dim_c=1)
# System setup: [position, velocity]
ckf.F = np.array([[1, 1], [0, 1]]) # Constant velocity motion
ckf.H = np.array([[1, 0]]) # Observe position
ckf.Q = np.array([[0.01, 0], [0, 0.01]])
ckf.R = np.array([[0.1]])
# Constraint: position >= 0
ckf.D = np.array([[1, 0]]) # Only constrain position
ckf.d = np.array([[0]]) # Position >= 0
# Initialize (with constraint-violating initial state)
ckf.x = np.array([[-1], [0.5]]) # Negative position, positive velocity
ckf.P = np.array([[1, 0], [0, 1]])
# Generate data (true trajectory starts from negative position)
np.random.seed(42)
T = 50
true_positions = [-1]
true_velocities = [0.5]
observations = []
for t in range(1, T):
# True dynamics (may violate constraints)
new_vel = true_velocities[-1] + np.random.normal(0, 0.1)
new_pos = true_positions[-1] + new_vel + np.random.normal(0, 0.1)
true_positions.append(new_pos)
true_velocities.append(new_vel)
# Observation
obs = new_pos + np.random.normal(0, 0.3)
observations.append(obs)
# Run constrained filter and standard filter
# Standard KF (no constraints)
class StandardKF:
def __init__(self):
self.x = np.array([[-1], [0.5]])
self.P = np.array([[1, 0], [0, 1]])
self.F = np.array([[1, 1], [0, 1]])
self.H = np.array([[1, 0]])
self.Q = np.array([[0.01, 0], [0, 0.01]])
self.R = np.array([[0.1]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
skf = StandardKF()
constrained_estimates = []
standard_estimates = []
constraint_violations = []
for obs in observations:
# Constrained filter
ckf.predict()
ckf.update_with_constraints(obs)
constrained_estimates.append([ckf.x[0, 0], ckf.x[1, 0]])
# Standard filter
skf.predict()
skf.update(obs)
standard_estimates.append([skf.x[0, 0], skf.x[1, 0]])
# Record constraint violations
violation = min(0, skf.x[0, 0]) # Negative value indicates violation
constraint_violations.append(violation)
# Plot results
constrained_estimates = np.array(constrained_estimates)
standard_estimates = np.array(standard_estimates)
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))
# Position comparison
ax1.plot(true_positions[1:], 'g-', linewidth=3, label='True Position')
ax1.plot(observations, 'k.', alpha=0.5, label='Observations')
ax1.plot(standard_estimates[:, 0], 'r--', linewidth=2, label='Standard KF')
ax1.plot(constrained_estimates[:, 0], 'b-', linewidth=2, label='Constrained KF')
ax1.axhline(y=0, color='red', linestyle=':', label='Constraint Boundary')
ax1.set_ylabel('Position')
ax1.set_title('Constrained Kalman Filter: Position Estimation')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Velocity comparison
ax2.plot(true_velocities[1:], 'g-', linewidth=3, label='True Velocity')
ax2.plot(standard_estimates[:, 1], 'r--', linewidth=2, label='Standard KF')
ax2.plot(constrained_estimates[:, 1], 'b-', linewidth=2, label='Constrained KF')
ax2.set_ylabel('Velocity')
ax2.set_title('Velocity Estimation Comparison')
ax2.legend()
ax2.grid(True, alpha=0.3)
# Constraint violations
ax3.plot(constraint_violations, 'r-', linewidth=2, label='Standard KF Constraint Violation')
ax3.axhline(y=0, color='black', linestyle='-', alpha=0.5)
ax3.fill_between(range(len(constraint_violations)), constraint_violations, 0,
where=np.array(constraint_violations) < 0, alpha=0.3, color='red')
ax3.set_xlabel('Time Step')
ax3.set_ylabel('Constraint Violation (negative)')
ax3.set_title('Constraint Violation Level')
ax3.legend()
ax3.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Statistical results
violations_count = np.sum(np.array(constraint_violations) < -1e-6)
print(f"Standard KF constraint violations: {violations_count}/{len(observations)} time steps")
print(f"Constrained KF guarantees position >= 0")
# Run example
# constrained_filter_example()
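The constraint handling above is the standard estimate-projection approach: after the unconstrained update produces the estimate x and covariance P, the estimate is projected onto the constraint surface D x = d via

x_c = x - P D^T (D P D^T)^{-1} (D x - d)

which minimizes (x_c - x)^T P^{-1} (x_c - x) subject to the constraint. For the inequality constraint position >= 0, the projection is applied only when the unconstrained estimate actually crosses the boundary, so away from the boundary the constrained and standard filters coincide.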
Next Chapter Preview
In the next chapter, we will learn about multi-sensor fusion and distributed filtering, including centralized and distributed fusion architectures, information filtering, and covariance intersection methods.