第10章:卡尔曼滤波的局限性与改进
学习目标
- 理解卡尔曼滤波的假设条件和局限性
- 掌握鲁棒卡尔曼滤波和自适应滤波方法
- 了解滤波发散问题的检测和处理
卡尔曼滤波的基本假设与局限性
核心假设
标准卡尔曼滤波基于以下关键假设:
卡尔曼滤波的假设条件
- 线性性:系统动态和观测都是线性的
- 高斯性:所有噪声都服从高斯分布
- 白噪声:噪声是不相关的白噪声过程
- 已知模型:系统矩阵F、H完全已知
- 已知协方差:噪声协方差Q、R已知且恒定
- 完美初始化:初始状态估计和协方差正确
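上述假设可以用标准线性高斯状态空间模型统一表示(记号与后文代码中的 F、H、Q、R 一致):

$$
x_k = F x_{k-1} + w_{k-1}, \quad w_{k-1} \sim \mathcal{N}(0, Q), \qquad
z_k = H x_k + v_k, \quad v_k \sim \mathcal{N}(0, R)
$$

其中过程噪声 $w_k$ 与观测噪声 $v_k$ 互不相关、均为白噪声,初始状态 $x_0 \sim \mathcal{N}(\hat{x}_0, P_0)$。只要其中任何一条假设不成立,卡尔曼滤波的最优性就不再有保证,下面通过代码逐一演示这些情形。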
现实中的挑战
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class LimitationAnalyzer:
"""
卡尔曼滤波局限性分析工具
"""
def __init__(self):
self.scenarios = {}
def demonstrate_nonlinearity_issue(self):
"""
演示非线性问题的影响
"""
# 高度非线性系统
def true_nonlinear_function(x):
return x + 0.5 * np.sin(5 * x)
def linear_approximation(x, x0):
            # 在x0点的一阶泰勒展开:f(x0) + f'(x0) * (x - x0)
            return true_nonlinear_function(x0) + (x - x0) * (1 + 2.5 * np.cos(5 * x0))
# 生成数据
x_true = np.linspace(-2, 2, 100)
y_true = [true_nonlinear_function(x) for x in x_true]
# 在不同点的线性化
linearization_points = [-1, 0, 1]
plt.figure(figsize=(12, 8))
plt.plot(x_true, y_true, 'b-', linewidth=3, label='真实非线性函数')
for i, x0 in enumerate(linearization_points):
y_linear = [linear_approximation(x, x0) for x in x_true]
plt.plot(x_true, y_linear, '--', linewidth=2,
label=f'在x={x0}处线性化')
plt.plot(x0, true_nonlinear_function(x0), 'ro', markersize=8)
plt.xlabel('状态 x')
plt.ylabel('输出 y')
plt.title('线性化误差演示')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def demonstrate_non_gaussian_noise(self):
"""
演示非高斯噪声的影响
"""
np.random.seed(42)
n_samples = 1000
# 不同类型的噪声
gaussian_noise = np.random.normal(0, 1, n_samples)
heavy_tail_noise = stats.t.rvs(df=3, size=n_samples) # t分布(重尾)
skewed_noise = stats.skewnorm.rvs(a=5, size=n_samples) # 偏斜分布
bimodal_noise = np.concatenate([
np.random.normal(-2, 0.5, n_samples//2),
np.random.normal(2, 0.5, n_samples//2)
])
noises = {
'高斯噪声': gaussian_noise,
'重尾噪声 (t分布)': heavy_tail_noise,
'偏斜噪声': skewed_noise,
'双峰噪声': bimodal_noise
}
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()
for i, (name, noise) in enumerate(noises.items()):
axes[i].hist(noise, bins=50, density=True, alpha=0.7,
color=['blue', 'red', 'green', 'orange'][i])
axes[i].set_title(f'{name}')
axes[i].set_xlabel('噪声值')
axes[i].set_ylabel('密度')
axes[i].grid(True, alpha=0.3)
# 添加统计信息
mean_val = np.mean(noise)
std_val = np.std(noise)
skew_val = stats.skew(noise)
kurt_val = stats.kurtosis(noise)
stats_text = f'均值: {mean_val:.2f}\n标准差: {std_val:.2f}\n偏度: {skew_val:.2f}\n峰度: {kurt_val:.2f}'
axes[i].text(0.05, 0.95, stats_text, transform=axes[i].transAxes,
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
def demonstrate_model_mismatch(self):
"""
演示模型不匹配的影响
"""
# 真实系统:非线性增长
def true_system(x, t):
return x * (1 + 0.01 * np.sin(0.1 * t))
# 假设的线性模型
def assumed_model(x):
return 1.01 * x # 固定增长率
# 模拟数据
T = 100
true_states = [100] # 初始值
observations = []
for t in range(1, T):
# 真实状态演化
new_state = true_system(true_states[-1], t) + np.random.normal(0, 1)
true_states.append(new_state)
# 观测
obs = new_state + np.random.normal(0, 2)
observations.append(obs)
# 使用错误模型的卡尔曼滤波
class MismatchedKF:
def __init__(self):
self.x = np.array([[100.0]])
self.P = np.array([[10.0]])
self.F = np.array([[1.01]]) # 错误的模型
self.H = np.array([[1.0]])
self.Q = np.array([[1.0]])
self.R = np.array([[4.0]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / S
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
# 运行滤波
kf = MismatchedKF()
estimates = []
for obs in observations:
kf.predict()
kf.update(obs)
estimates.append(kf.x[0, 0])
# 绘制结果
plt.figure(figsize=(12, 8))
plt.plot(true_states[1:], 'g-', linewidth=2, label='真实状态')
plt.plot(observations, 'r.', alpha=0.5, label='观测')
plt.plot(estimates, 'b--', linewidth=2, label='KF估计(错误模型)')
# 计算误差
errors = np.array(true_states[1:]) - np.array(estimates)
plt.fill_between(range(len(estimates)),
np.array(estimates) - 2*np.abs(errors),
np.array(estimates) + 2*np.abs(errors),
alpha=0.3, color='blue', label='误差带')
plt.xlabel('时间步')
plt.ylabel('状态值')
plt.title('模型不匹配的影响')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
print(f"平均绝对误差: {np.mean(np.abs(errors)):.2f}")
print(f"RMSE: {np.sqrt(np.mean(errors**2)):.2f}")
# 运行演示
analyzer = LimitationAnalyzer()
# analyzer.demonstrate_nonlinearity_issue()
# analyzer.demonstrate_non_gaussian_noise()
# analyzer.demonstrate_model_mismatch()
滤波发散问题
发散的原因和检测
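滤波发散通常由模型失配、Q/R 设置不当或数值误差引起,外在表现是新息持续偏大、协方差矩阵迹失控增长或出现 NaN / 负特征值。衡量新息是否“过大”的一个常用统计量是归一化新息平方(NIS):滤波一致时它近似服从自由度等于观测维数的卡方分布。下面是基于这一思路的一个最小检查函数草图(0.99 分位阈值为示意性假设),可作为后面 DivergenceDetector 中创新检查的补充:

import numpy as np
from scipy import stats

def nis_check(y, S, alpha=0.99):
    """归一化新息平方(NIS)一致性检查。

    y: 新息向量 (m, 1);S: 新息协方差 (m, m)。
    返回 (nis, 是否超出卡方分布的 alpha 分位数)。
    """
    y = np.atleast_2d(y).reshape(-1, 1)
    nis = (y.T @ np.linalg.solve(S, y)).item()        # y^T S^{-1} y
    threshold = stats.chi2.ppf(alpha, df=y.shape[0])  # 自由度 = 观测维数
    return nis, nis > threshold

# 示意用法:一维观测,新息 3.0、新息协方差 1.0 时 NIS = 9,超过阈值
print(nis_check(np.array([[3.0]]), np.array([[1.0]])))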
class DivergenceDetector:
"""
滤波发散检测和处理
"""
def __init__(self, divergence_threshold=1e6):
self.divergence_threshold = divergence_threshold
self.innovation_history = []
self.covariance_trace_history = []
def detect_divergence(self, kf):
"""
检测滤波发散
"""
# 检测指标
checks = {
'covariance_explosion': False,
'innovation_explosion': False,
'numerical_instability': False,
'negative_eigenvalues': False
}
# 1. 协方差矩阵迹过大
P_trace = np.trace(kf.P)
self.covariance_trace_history.append(P_trace)
if P_trace > self.divergence_threshold:
checks['covariance_explosion'] = True
# 2. 创新过大
if hasattr(kf, 'y') and kf.y is not None:
innovation_norm = np.linalg.norm(kf.y)
self.innovation_history.append(innovation_norm)
if len(self.innovation_history) > 10:
recent_avg = np.mean(self.innovation_history[-10:])
                if recent_avg > np.mean(self.innovation_history[:-10]) + 10 * np.std(self.innovation_history[:-10]):
checks['innovation_explosion'] = True
# 3. 数值不稳定性
if np.any(np.isnan(kf.P)) or np.any(np.isinf(kf.P)):
checks['numerical_instability'] = True
# 4. 协方差矩阵负特征值
        eigenvals = np.linalg.eigvalsh(kf.P)  # P应为对称阵,用eigvalsh避免出现复数特征值
if np.any(eigenvals < 0):
checks['negative_eigenvalues'] = True
return checks
def apply_stabilization(self, kf, method='joseph'):
"""
应用稳定化技术
"""
if method == 'joseph':
# Joseph形式协方差更新
if hasattr(kf, 'K') and hasattr(kf, 'H') and hasattr(kf, 'R'):
I_KH = np.eye(kf.P.shape[0]) - kf.K @ kf.H
kf.P = I_KH @ kf.P @ I_KH.T + kf.K @ kf.R @ kf.K.T
elif method == 'regularization':
# 协方差正则化
kf.P += 1e-8 * np.eye(kf.P.shape[0])
elif method == 'eigenvalue_clipping':
# 特征值修剪
eigenvals, eigenvecs = np.linalg.eigh(kf.P)
eigenvals = np.maximum(eigenvals, 1e-12)
kf.P = eigenvecs @ np.diag(eigenvals) @ eigenvecs.T
def reset_filter(self, kf, reset_method='partial'):
"""
重置滤波器
"""
if reset_method == 'partial':
            # 部分重置:保留状态估计,把协方差重置为有界的对角阵(上限1e3为示意值)
            avg_var = min(np.trace(kf.P) / kf.P.shape[0], 1e3)
            kf.P = np.eye(kf.P.shape[0]) * avg_var
elif reset_method == 'full':
# 完全重置
kf.P = np.eye(kf.P.shape[0])
# 保持当前状态估计
def divergence_prevention_example():
"""
滤波发散预防示例
"""
# 设计一个容易发散的系统
class ProblematicKF:
def __init__(self):
self.x = np.array([[0.0], [1.0]])
self.P = np.array([[1.0, 0], [0, 1.0]])
self.F = np.array([[1.1, 1], [0, 1.1]]) # 不稳定矩阵
self.H = np.array([[1, 0]])
self.Q = np.array([[0.01, 0], [0, 0.01]]) # Q过小
self.R = np.array([[10.0]]) # R过大
self.K = None
self.y = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
self.K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + self.K @ self.y
self.P = (np.eye(2) - self.K @ self.H) @ self.P
# 运行两个版本:有保护和无保护
np.random.seed(42)
T = 50
observations = np.cumsum(np.random.normal(0, 1, T))
# 无保护版本
kf_unprotected = ProblematicKF()
estimates_unprotected = []
P_trace_unprotected = []
# 有保护版本
kf_protected = ProblematicKF()
detector = DivergenceDetector()
estimates_protected = []
P_trace_protected = []
for obs in observations:
# 无保护滤波
try:
kf_unprotected.predict()
kf_unprotected.update(obs)
estimates_unprotected.append(kf_unprotected.x[0, 0])
P_trace_unprotected.append(np.trace(kf_unprotected.P))
except:
estimates_unprotected.append(np.nan)
P_trace_unprotected.append(np.nan)
# 有保护滤波
kf_protected.predict()
kf_protected.update(obs)
# 检测发散
divergence_checks = detector.detect_divergence(kf_protected)
if any(divergence_checks.values()):
print(f"在时间步 {len(estimates_protected)} 检测到发散: {divergence_checks}")
# 应用稳定化
detector.apply_stabilization(kf_protected, method='joseph')
# 如果仍然有问题,重置
if np.trace(kf_protected.P) > 1e6:
detector.reset_filter(kf_protected, reset_method='partial')
estimates_protected.append(kf_protected.x[0, 0])
P_trace_protected.append(np.trace(kf_protected.P))
# 绘制结果
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# 状态估计
ax1.plot(observations, 'g-', label='观测', linewidth=2)
ax1.plot(estimates_unprotected, 'r--', label='无保护KF', linewidth=2)
ax1.plot(estimates_protected, 'b-', label='有保护KF', linewidth=2)
ax1.set_ylabel('状态估计')
ax1.set_title('滤波发散比较')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 协方差迹
ax2.semilogy(P_trace_unprotected, 'r--', label='无保护P迹', linewidth=2)
ax2.semilogy(P_trace_protected, 'b-', label='有保护P迹', linewidth=2)
ax2.set_xlabel('时间步')
ax2.set_ylabel('协方差矩阵迹 (对数尺度)')
ax2.set_title('协方差矩阵增长')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 运行示例
# divergence_prevention_example()
鲁棒滤波技术
H∞滤波
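与最小化均方误差的卡尔曼滤波不同,H∞滤波针对最坏情况设计:选择增益使估计误差能量对扰动能量的最坏情况增益被参数 γ 压制住。一种常见的离散时间性能指标写作

$$
\sup_{w,\,v,\,x_0}
\frac{\sum_k \lVert x_k - \hat{x}_k \rVert^2}
{\lVert x_0 - \hat{x}_0 \rVert_{P_0^{-1}}^2 + \sum_k \lVert w_k \rVert_{Q^{-1}}^2 + \sum_k \lVert v_k \rVert_{R^{-1}}^2}
< \gamma^2 .
$$

γ 越小滤波器越保守、鲁棒性越强;γ → ∞ 时退化为标准卡尔曼滤波。下面的实现是这一思想的简化示例。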
class HInfinityFilter:
"""
H∞鲁棒滤波器
"""
def __init__(self, dim_x, dim_z, gamma=1.0):
self.dim_x = dim_x
self.dim_z = dim_z
self.gamma = gamma # 鲁棒性参数
# 状态和协方差
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# 系统矩阵
self.F = np.eye(dim_x)
self.H = np.eye(dim_z, dim_x)
self.Q = np.eye(dim_x)
self.R = np.eye(dim_z)
def predict(self):
"""预测步骤"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
"""H∞更新步骤"""
z = z.reshape(-1, 1)
# 构建增广矩阵
I = np.eye(self.dim_x)
# H∞滤波的Riccati方程
try:
# 计算H∞增益
S = self.H @ self.P @ self.H.T + self.R
            # H∞的修正项(离散H∞滤波的常见增益形式,加权阵取单位阵)
            Theta = I - (1/self.gamma**2) * self.P + self.H.T @ np.linalg.inv(self.R) @ self.H @ self.P
            if np.linalg.det(Theta) > 1e-12:
                K_hinf = self.P @ np.linalg.inv(Theta) @ self.H.T @ np.linalg.inv(self.R)
            else:
                # Theta接近奇异时退化为标准卡尔曼增益
                K_hinf = self.P @ self.H.T @ np.linalg.inv(S)
except np.linalg.LinAlgError:
# 数值问题时使用标准增益
K_hinf = self.P @ self.H.T @ np.linalg.inv(S)
# 状态更新
y = z - self.H @ self.x
self.x = self.x + K_hinf @ y
# 协方差更新
self.P = self.P - K_hinf @ self.H @ self.P
def compare_robust_filters():
"""
比较不同鲁棒滤波方法
"""
# 生成带异常值的数据
np.random.seed(42)
T = 100
# 真实状态(随机游走)
true_states = [0]
for t in range(1, T):
true_states.append(true_states[-1] + np.random.normal(0, 0.1))
# 观测(包含异常值)
observations = []
for i, state in enumerate(true_states):
if i in [20, 40, 60, 80]: # 添加异常值
obs = state + np.random.normal(0, 5) # 大噪声
else:
obs = state + np.random.normal(0, 0.1) # 正常噪声
observations.append(obs)
# 标准卡尔曼滤波
class StandardKF:
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.01]])
self.R = np.array([[0.01]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / S
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
# 鲁棒MM估计器卡尔曼滤波
class RobustKF:
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.01]])
self.R = np.array([[0.01]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def huber_weight(self, residual, threshold=2.0):
"""Huber权重函数"""
abs_residual = np.abs(residual)
if abs_residual <= threshold:
return 1.0
else:
return threshold / abs_residual
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
# 计算鲁棒权重
weight = self.huber_weight(y[0, 0] / np.sqrt(S[0, 0]))
# 调整观测噪声
R_robust = self.R / weight
S_robust = self.H @ self.P @ self.H.T + R_robust
K = self.P @ self.H.T / S_robust
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
# 运行滤波器
standard_kf = StandardKF()
robust_kf = RobustKF()
hinf_kf = HInfinityFilter(1, 1, gamma=2.0)
standard_estimates = []
robust_estimates = []
hinf_estimates = []
for obs in observations:
# 标准KF
standard_kf.predict()
standard_kf.update(obs)
standard_estimates.append(standard_kf.x[0, 0])
# 鲁棒KF
robust_kf.predict()
robust_kf.update(obs)
robust_estimates.append(robust_kf.x[0, 0])
# H∞滤波
hinf_kf.predict()
hinf_kf.update(np.array([obs]))
hinf_estimates.append(hinf_kf.x[0, 0])
# 绘制结果
plt.figure(figsize=(15, 10))
plt.subplot(2, 1, 1)
plt.plot(true_states, 'g-', linewidth=3, label='真实状态')
plt.plot(observations, 'k.', alpha=0.6, label='观测(含异常值)')
plt.plot(standard_estimates, 'r--', linewidth=2, label='标准KF')
plt.plot(robust_estimates, 'b-', linewidth=2, label='鲁棒KF')
plt.plot(hinf_estimates, 'm:', linewidth=2, label='H∞滤波')
# 标记异常值
outlier_indices = [20, 40, 60, 80]
for idx in outlier_indices:
plt.axvline(x=idx, color='red', linestyle=':', alpha=0.5)
plt.ylabel('状态值')
plt.title('鲁棒滤波方法比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(2, 1, 2)
# 计算误差
standard_errors = np.abs(np.array(true_states) - np.array(standard_estimates))
robust_errors = np.abs(np.array(true_states) - np.array(robust_estimates))
hinf_errors = np.abs(np.array(true_states) - np.array(hinf_estimates))
plt.plot(standard_errors, 'r--', linewidth=2, label='标准KF误差')
plt.plot(robust_errors, 'b-', linewidth=2, label='鲁棒KF误差')
plt.plot(hinf_errors, 'm:', linewidth=2, label='H∞滤波误差')
for idx in outlier_indices:
plt.axvline(x=idx, color='red', linestyle=':', alpha=0.5)
plt.xlabel('时间步')
plt.ylabel('绝对误差')
plt.title('估计误差比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 计算性能指标
print("性能比较:")
print(f"标准KF RMSE: {np.sqrt(np.mean(standard_errors**2)):.4f}")
print(f"鲁棒KF RMSE: {np.sqrt(np.mean(robust_errors**2)):.4f}")
print(f"H∞滤波 RMSE: {np.sqrt(np.mean(hinf_errors**2)):.4f}")
# 运行比较
# compare_robust_filters()
自适应滤波方法
多模型自适应估计(MMAE)
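MMAE 并行运行一组假设不同参数(例如不同的 Q、R)的卡尔曼滤波器,用每个滤波器的新息似然对模型概率做贝叶斯更新:

$$
\mu_i(k) = \frac{\Lambda_i(k)\,\mu_i(k\mid k-1)}{\sum_j \Lambda_j(k)\,\mu_j(k\mid k-1)},
\qquad
\Lambda_i(k) = \mathcal{N}\big(y_i(k);\,0,\,S_i(k)\big),
$$

其中 $y_i$、$S_i$ 分别是第 $i$ 个模型的新息及其协方差;最终输出取各模型估计按 $\mu_i$ 加权的组合。下面的实现遵循这一流程。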
class MultipleModelAdaptiveEstimator:
"""
多模型自适应估计器
"""
def __init__(self, models, initial_probabilities=None):
self.models = models
self.n_models = len(models)
if initial_probabilities is None:
self.model_probabilities = np.ones(self.n_models) / self.n_models
else:
self.model_probabilities = np.array(initial_probabilities)
# 模型转移概率矩阵(马尔科夫链)
self.transition_matrix = 0.95 * np.eye(self.n_models) + \
0.05 * np.ones((self.n_models, self.n_models)) / self.n_models
self.likelihood_history = []
def predict(self):
"""预测步骤"""
# 模型概率预测
self.model_probabilities = self.transition_matrix.T @ self.model_probabilities
# 各模型预测
for model in self.models:
model.predict()
def update(self, z):
"""更新步骤"""
likelihoods = np.zeros(self.n_models)
# 计算各模型的似然
for i, model in enumerate(self.models):
model.update(z)
            # 计算高斯新息似然
            if hasattr(model, 'y') and hasattr(model, 'S'):
                residual = model.y
                S = model.S
                # 包含归一化常数,否则概率更新会系统性偏向新息协方差更大的模型
                likelihood = np.exp(-0.5 * (residual.T @ np.linalg.inv(S) @ residual).item()) \
                             / np.sqrt(np.linalg.det(2 * np.pi * S))
                likelihoods[i] = likelihood
else:
likelihoods[i] = 1.0
# 防止数值问题
likelihoods = np.maximum(likelihoods, 1e-10)
# 更新模型概率
self.model_probabilities = self.model_probabilities * likelihoods
self.model_probabilities /= np.sum(self.model_probabilities)
self.likelihood_history.append(likelihoods.copy())
def get_combined_estimate(self):
"""获取组合估计"""
combined_state = None
combined_covariance = None
for i, model in enumerate(self.models):
weight = self.model_probabilities[i]
if combined_state is None:
combined_state = weight * model.x
combined_covariance = weight * (model.P + model.x @ model.x.T)
else:
combined_state += weight * model.x
combined_covariance += weight * (model.P + model.x @ model.x.T)
# 调整协方差
combined_covariance -= combined_state @ combined_state.T
return combined_state, combined_covariance
def mmae_example():
"""
多模型自适应估计示例
"""
# 定义不同的模型
class ModelA: # 低噪声模型
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.001]]) # 低过程噪声
self.R = np.array([[0.01]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(self.S)
self.x = self.x + K @ self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
class ModelB: # 中等噪声模型
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.01]]) # 中等过程噪声
self.R = np.array([[0.01]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(self.S)
self.x = self.x + K @ self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
class ModelC: # 高噪声模型
def __init__(self):
self.x = np.array([[0.0]])
self.P = np.array([[1.0]])
self.F = np.array([[1.0]])
self.H = np.array([[1.0]])
self.Q = np.array([[0.1]]) # 高过程噪声
self.R = np.array([[0.01]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(self.S)
self.x = self.x + K @ self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
# 生成切换系统的数据
np.random.seed(42)
T = 200
true_states = [0]
true_model_sequence = []
observations = []
for t in range(1, T):
# 模型切换逻辑
if t < 50:
current_model = 0 # 低噪声
q_true = 0.001
elif t < 100:
current_model = 2 # 高噪声
q_true = 0.1
elif t < 150:
current_model = 1 # 中等噪声
q_true = 0.01
else:
current_model = 0 # 回到低噪声
q_true = 0.001
true_model_sequence.append(current_model)
# 状态演化
new_state = true_states[-1] + np.random.normal(0, np.sqrt(q_true))
true_states.append(new_state)
# 观测
obs = new_state + np.random.normal(0, 0.1)
observations.append(obs)
# 运行MMAE
models = [ModelA(), ModelB(), ModelC()]
mmae = MultipleModelAdaptiveEstimator(models)
combined_estimates = []
model_probabilities_history = []
for obs in observations:
mmae.predict()
mmae.update(obs)
combined_state, _ = mmae.get_combined_estimate()
combined_estimates.append(combined_state[0, 0])
model_probabilities_history.append(mmae.model_probabilities.copy())
# 绘制结果
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))
# 状态估计
ax1.plot(true_states[1:], 'g-', linewidth=3, label='真实状态')
ax1.plot(observations, 'k.', alpha=0.3, label='观测')
ax1.plot(combined_estimates, 'b-', linewidth=2, label='MMAE估计')
# 标记模型切换点
switch_points = [50, 100, 150]
for sp in switch_points:
ax1.axvline(x=sp, color='red', linestyle='--', alpha=0.7)
ax1.set_ylabel('状态值')
ax1.set_title('多模型自适应估计')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 模型概率
model_probs = np.array(model_probabilities_history)
ax2.plot(model_probs[:, 0], 'r-', linewidth=2, label='模型A概率(低噪声)')
ax2.plot(model_probs[:, 1], 'g-', linewidth=2, label='模型B概率(中噪声)')
ax2.plot(model_probs[:, 2], 'b-', linewidth=2, label='模型C概率(高噪声)')
# 真实模型序列
for i, model_idx in enumerate(true_model_sequence):
color = ['red', 'green', 'blue'][model_idx]
ax2.scatter(i, 1.1, c=color, s=20, alpha=0.7)
for sp in switch_points:
ax2.axvline(x=sp, color='red', linestyle='--', alpha=0.7)
ax2.set_xlabel('时间步')
ax2.set_ylabel('模型概率')
ax2.set_title('模型概率演化')
ax2.legend()
ax2.grid(True, alpha=0.3)
ax2.set_ylim(-0.1, 1.2)
plt.tight_layout()
plt.show()
# 性能评估
errors = np.abs(np.array(true_states[1:]) - np.array(combined_estimates))
print(f"MMAE RMSE: {np.sqrt(np.mean(errors**2)):.4f}")
print(f"平均绝对误差: {np.mean(errors):.4f}")
# 运行示例
# mmae_example()
现代改进方法
约束卡尔曼滤波
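处理等式约束 $Dx = d$ 的一种常用做法是:先做标准卡尔曼更新得到无约束估计 $\hat{x}$,再把它投影回约束面上。用 $W = P^{-1}$ 加权时,投影结果为

$$
\tilde{x} = \hat{x} - P D^{\mathsf T}\big(D P D^{\mathsf T}\big)^{-1}\big(D\hat{x} - d\big).
$$

对于“位置 ≥ 0”这类不等式约束,只需在约束被违反时把它当作在边界上起作用的等式约束执行同样的投影,下面 update_with_constraints 正是这样处理的。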
class ConstrainedKalmanFilter:
"""
约束卡尔曼滤波器
"""
def __init__(self, dim_x, dim_z, dim_c=0):
self.dim_x = dim_x
self.dim_z = dim_z
self.dim_c = dim_c # 约束维度
# 状态和协方差
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# 系统矩阵
self.F = np.eye(dim_x)
self.H = np.eye(dim_z, dim_x)
self.Q = np.eye(dim_x)
self.R = np.eye(dim_z)
# 约束矩阵
self.D = np.zeros((dim_c, dim_x)) # 约束矩阵
self.d = np.zeros((dim_c, 1)) # 约束向量
def predict(self):
"""预测步骤"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update_with_constraints(self, z):
"""带约束的更新"""
z = z.reshape(-1, 1)
# 标准卡尔曼更新
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
x_unconstrained = self.x + K @ y
P_unconstrained = (np.eye(self.dim_x) - K @ self.H) @ self.P
# 应用约束
if self.dim_c > 0:
            # 计算不等式约束 D x >= d 的违反量(负值表示违反)
            constraint_violation = self.D @ x_unconstrained - self.d
            # 仅当约束被违反时,把起作用的约束当作等式约束做投影
            if np.any(constraint_violation < -1e-10):
# 使用拉格朗日乘数法投影到约束子空间
DPD = self.D @ P_unconstrained @ self.D.T
if np.linalg.det(DPD) > 1e-12:
lambda_mult = np.linalg.inv(DPD) @ constraint_violation
# 约束状态
self.x = x_unconstrained - P_unconstrained @ self.D.T @ lambda_mult
# 约束协方差
self.P = P_unconstrained - P_unconstrained @ self.D.T @ \
np.linalg.inv(DPD) @ self.D @ P_unconstrained
else:
self.x = x_unconstrained
self.P = P_unconstrained
else:
self.x = x_unconstrained
self.P = P_unconstrained
else:
self.x = x_unconstrained
self.P = P_unconstrained
def constrained_filter_example():
"""
约束滤波示例:位置必须非负
"""
# 创建约束滤波器
ckf = ConstrainedKalmanFilter(dim_x=2, dim_z=1, dim_c=1)
# 系统设置:[位置, 速度]
ckf.F = np.array([[1, 1], [0, 1]]) # 匀速运动
ckf.H = np.array([[1, 0]]) # 观测位置
ckf.Q = np.array([[0.01, 0], [0, 0.01]])
ckf.R = np.array([[0.1]])
# 约束:位置 >= 0
ckf.D = np.array([[1, 0]]) # 只约束位置
ckf.d = np.array([[0]]) # 位置 >= 0
# 初始化(违反约束的初始状态)
ckf.x = np.array([[-1], [0.5]]) # 负位置,正速度
ckf.P = np.array([[1, 0], [0, 1]])
# 生成数据(真实轨迹从负位置开始)
np.random.seed(42)
T = 50
true_positions = [-1]
true_velocities = [0.5]
observations = []
for t in range(1, T):
# 真实动态(可能违反约束)
new_vel = true_velocities[-1] + np.random.normal(0, 0.1)
new_pos = true_positions[-1] + new_vel + np.random.normal(0, 0.1)
true_positions.append(new_pos)
true_velocities.append(new_vel)
# 观测
obs = new_pos + np.random.normal(0, 0.3)
observations.append(obs)
# 运行约束滤波和标准滤波
# 标准KF(无约束)
class StandardKF:
def __init__(self):
self.x = np.array([[-1], [0.5]])
self.P = np.array([[1, 0], [0, 1]])
self.F = np.array([[1, 1], [0, 1]])
self.H = np.array([[1, 0]])
self.Q = np.array([[0.01, 0], [0, 0.01]])
self.R = np.array([[0.1]])
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
skf = StandardKF()
constrained_estimates = []
standard_estimates = []
constraint_violations = []
for obs in observations:
# 约束滤波
ckf.predict()
ckf.update_with_constraints(obs)
constrained_estimates.append([ckf.x[0, 0], ckf.x[1, 0]])
# 标准滤波
skf.predict()
skf.update(obs)
standard_estimates.append([skf.x[0, 0], skf.x[1, 0]])
# 记录约束违反
violation = min(0, skf.x[0, 0]) # 负值表示违反约束
constraint_violations.append(violation)
# 绘制结果
constrained_estimates = np.array(constrained_estimates)
standard_estimates = np.array(standard_estimates)
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))
# 位置比较
ax1.plot(true_positions[1:], 'g-', linewidth=3, label='真实位置')
ax1.plot(observations, 'k.', alpha=0.5, label='观测')
ax1.plot(standard_estimates[:, 0], 'r--', linewidth=2, label='标准KF')
ax1.plot(constrained_estimates[:, 0], 'b-', linewidth=2, label='约束KF')
ax1.axhline(y=0, color='red', linestyle=':', label='约束边界')
ax1.set_ylabel('位置')
ax1.set_title('约束卡尔曼滤波:位置估计')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 速度比较
ax2.plot(true_velocities[1:], 'g-', linewidth=3, label='真实速度')
ax2.plot(standard_estimates[:, 1], 'r--', linewidth=2, label='标准KF')
ax2.plot(constrained_estimates[:, 1], 'b-', linewidth=2, label='约束KF')
ax2.set_ylabel('速度')
ax2.set_title('速度估计比较')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 约束违反
ax3.plot(constraint_violations, 'r-', linewidth=2, label='标准KF约束违反')
ax3.axhline(y=0, color='black', linestyle='-', alpha=0.5)
ax3.fill_between(range(len(constraint_violations)), constraint_violations, 0,
where=np.array(constraint_violations) < 0, alpha=0.3, color='red')
ax3.set_xlabel('时间步')
ax3.set_ylabel('约束违反 (负值)')
ax3.set_title('约束违反程度')
ax3.legend()
ax3.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 统计结果
violations_count = np.sum(np.array(constraint_violations) < -1e-6)
print(f"标准KF违反约束的时间步数: {violations_count}/{len(observations)}")
print(f"约束KF保证位置 >= 0")
# 运行示例
# constrained_filter_example()
下一章预告
下一章我们将学习多传感器融合与分布式滤波,包括集中式和分布式融合架构、信息滤波和协方差交集方法。
