第9章:系统建模与参数调优
10/2/25 · About 10 min
第9章:系统建模与参数调优
学习目标
- 学会建立适当的状态空间模型
- 掌握噪声协方差矩阵的选择和调优方法
- 理解模型验证和性能评估指标
状态空间模型设计
模型选择原则
选择合适的状态空间模型是卡尔曼滤波成功的关键:
模型设计考虑因素
- 物理意义:状态变量应有明确的经济或金融解释
- 可观测性:系统必须是可观测的
- 可控性:系统应该是可控的(如果有控制输入)
- 简洁性:避免过度复杂化,遵循奥卡姆剃刀原理
- 计算效率:考虑实时应用的计算约束
可观测性分析
import numpy as np
# NOTE: matrix_rank is provided by numpy.linalg, not scipy.linalg — the
# original `from scipy.linalg import matrix_rank` raises ImportError.
from numpy.linalg import matrix_rank
import matplotlib.pyplot as plt
def check_observability(F, H):
    """
    Check observability of a linear system via the rank of the
    observability matrix O = [H; HF; HF^2; ...; HF^(n-1)].

    Parameters:
        F: state-transition matrix, shape (n, n)
        H: observation matrix, shape (m, n)

    Returns:
        is_observable: True iff rank(O) == n (full column rank)
        obs_matrix: the stacked observability matrix
    """
    F = np.asarray(F)
    H = np.asarray(H)
    n = F.shape[0]  # state dimension
    # Build the observability matrix by stacking H, HF, HF^2, ...
    blocks = [H]
    H_F_power = H
    for _ in range(1, n):
        H_F_power = H_F_power @ F
        blocks.append(H_F_power)
    obs_matrix = np.vstack(blocks)
    # Use numpy.linalg.matrix_rank directly: the module-level
    # `from scipy.linalg import matrix_rank` is not a valid scipy symbol.
    rank = np.linalg.matrix_rank(obs_matrix)
    is_observable = bool(rank == n)
    return is_observable, obs_matrix
def observability_example():
    """Demonstrate observability analysis on two small systems."""
    # Case 1: constant-velocity model with the position observed.
    F_cv = np.array([[1, 1], [0, 1]])
    H_pos = np.array([[1, 0]])
    observable_1, O1 = check_observability(F_cv, H_pos)
    print(f"系统1可观测性: {observable_1}")
    print(f"可观测性矩阵秩: {matrix_rank(O1)}")
    # Case 2: same dynamics, but only the velocity is observed, so the
    # position component cannot be reconstructed.
    F_cv2 = np.array([[1, 1], [0, 1]])
    H_vel = np.array([[0, 1]])
    observable_2, O2 = check_observability(F_cv2, H_vel)
    print(f"系统2可观测性: {observable_2}")
    print(f"可观测性矩阵秩: {matrix_rank(O2)}")
# observability_example()

状态变量选择
在金融建模中,状态变量的选择至关重要:
class ModelDesignFramework:
    """Framework for registering and comparing candidate state-space models."""

    def __init__(self):
        self.model_candidates = {}
        self.evaluation_metrics = {}

    def add_model_candidate(self, name, state_vars, obs_vars, F, H, Q, R):
        """
        Register a candidate model.

        Parameters:
            name: model identifier
            state_vars: descriptions of the state variables
            obs_vars: descriptions of the observed variables
            F, H, Q, R: system matrices
        """
        candidate = {
            'state_vars': state_vars,
            'obs_vars': obs_vars,
            'F': F,
            'H': H,
            'Q': Q,
            'R': R,
            'dim_x': F.shape[0],
            'dim_z': H.shape[0],
        }
        self.model_candidates[name] = candidate

    def evaluate_model_complexity(self, name):
        """Return parameter-count and per-step cost metrics for model `name`."""
        model = self.model_candidates[name]
        dim_x, dim_z = model['dim_x'], model['dim_z']
        # Free parameters: full F, full H, symmetric Q, symmetric R.
        n_params = (
            dim_x * dim_x
            + dim_x * dim_z
            + dim_x * (dim_x + 1) // 2
            + dim_z * (dim_z + 1) // 2
        )
        # Rough per-step operation counts.
        comp_predict = dim_x ** 3                       # covariance prediction
        comp_update = dim_z ** 3 + dim_x * dim_z ** 2   # Kalman gain + update
        return {
            'n_parameters': n_params,
            'comp_predict': comp_predict,
            'comp_update': comp_update,
            'total_per_step': comp_predict + comp_update,
        }
# Example: stock-price models of increasing complexity
def stock_price_model_comparison():
    """Register three stock-price models and print their complexity."""
    framework = ModelDesignFramework()

    # Model 1: plain random walk on the log price.
    framework.add_model_candidate(
        'Simple_RW',
        ['log_price'],
        ['log_price_obs'],
        np.array([[1]]),
        np.array([[1]]),
        np.array([[0.01]]),
        np.array([[0.001]]),
    )

    # Model 2: random walk with an additional latent drift state.
    framework.add_model_candidate(
        'RW_with_Drift',
        ['log_price', 'drift'],
        ['log_price_obs'],
        np.array([[1, 1], [0, 1]]),
        np.array([[1, 0]]),
        np.array([[0.01, 0], [0, 0.0001]]),
        np.array([[0.001]]),
    )

    # Model 3: log price plus a mean-reverting log-variance state.
    framework.add_model_candidate(
        'Stochastic_Vol',
        ['log_price', 'log_variance'],
        ['log_price_obs'],
        np.array([[1, 0], [0, 0.99]]),
        np.array([[1, 0]]),
        np.array([[0, 0], [0, 0.01]]),
        np.array([[0.001]]),
    )

    # Report the complexity of every registered candidate.
    for name in framework.model_candidates:
        complexity = framework.evaluate_model_complexity(name)
        print(f"\n模型 {name}:")
        print(f" 参数数量: {complexity['n_parameters']}")
        print(f" 每步计算复杂度: {complexity['total_per_step']}")
# stock_price_model_comparison()

噪声协方差调优
协方差矩阵的选择原则
噪声协方差矩阵的设置直接影响滤波性能:
协方差设置准则
- 过程噪声Q:反映模型的不确定性
- Q过小:滤波器过度自信,适应性差
- Q过大:滤波器不稳定,估计噪声大
- 观测噪声R:反映测量的不确定性
- R过小:过度相信观测,可能被噪声误导
- R过大:忽略观测信息,滤波效果差
参数调优方法
class ParameterOptimizer:
    """
    Kalman-filter noise-parameter optimizer.

    Tunes the diagonal entries of Q and R by minimizing either the
    negative log-likelihood or the one-step prediction error over `data`.
    """

    def __init__(self, kf_class, data):
        # kf_class: zero-argument constructor returning a filter that
        #   exposes dim_x, dim_z, x, P, F, H, Q, R, predict(), update(z).
        # data: iterable of observations.
        self.kf_class = kf_class
        self.data = data
        self.optimization_history = []

    def objective_function(self, params, method='likelihood'):
        """
        Objective value for a parameter vector.

        Parameters:
            params: concatenated diagonals [Q_diag..., R_diag...]
            method: 'likelihood' (negative log-likelihood) or
                'prediction_error' (mean one-step prediction error)
        """
        try:
            # Instantiate first: dim_x/dim_z are typically set in __init__,
            # so reading them off the class object would raise AttributeError.
            kf = self.kf_class()
            n_q = kf.dim_x
            n_r = kf.dim_z
            Q_diag = params[:n_q]
            R_diag = params[n_q:n_q + n_r]
            # Floor the diagonals to keep Q and R positive definite.
            kf.Q = np.diag(np.maximum(Q_diag, 1e-8))
            kf.R = np.diag(np.maximum(R_diag, 1e-8))
            if method == 'likelihood':
                return self._negative_log_likelihood(kf)
            return self._prediction_error(kf)
        except Exception:
            return 1e10  # large penalty marks an invalid parameter set

    def _negative_log_likelihood(self, kf):
        """Run the filter over the data; return -sum of per-step log-likelihoods."""
        log_likelihood = 0.0
        for obs in self.data:
            kf.predict()
            kf.update(obs)
            if hasattr(kf, 'log_likelihood'):
                # .item() tolerates filters that store a (1, 1) array.
                log_likelihood += np.asarray(kf.log_likelihood).item()
        return float(-log_likelihood)

    def _prediction_error(self, kf):
        """Mean one-step-ahead prediction error over the data.

        Exactly one predict() per observation — the original version
        predicted twice for every step after the first, biasing the score.
        """
        prediction_errors = []
        for i, obs in enumerate(self.data):
            kf.predict()
            if i > 0:
                predicted_obs = kf.H @ kf.x
                error = np.linalg.norm(np.asarray(obs) - predicted_obs.flatten())
                prediction_errors.append(error)
            kf.update(obs)
        return float(np.mean(prediction_errors))

    def optimize(self, initial_params=None, method='likelihood'):
        """Minimize the chosen objective over the Q/R diagonals (L-BFGS-B)."""
        from scipy.optimize import minimize
        if initial_params is None:
            # Probe an instance for the dimensions (see objective_function).
            probe = self.kf_class()
            initial_params = np.concatenate([
                0.01 * np.ones(probe.dim_x),  # Q diagonal
                0.01 * np.ones(probe.dim_z),  # R diagonal
            ])
        # Keep every variance strictly positive and bounded.
        bounds = [(1e-8, 1.0) for _ in initial_params]
        result = minimize(
            self.objective_function,
            initial_params,
            args=(method,),
            bounds=bounds,
            method='L-BFGS-B',
        )
        return result
def parameter_optimization_example():
    """
    End-to-end demo: simulate a random walk with known noise levels, then
    recover the Q/R parameters by maximum likelihood with ParameterOptimizer.
    """
    # Simulated data with known ground-truth noise variances.
    np.random.seed(42)
    T = 100
    true_Q = 0.01
    true_R = 0.005
    # Simulate a random-walk state observed with noise.
    true_states = [0]
    observations = []
    for t in range(1, T):
        # True state evolution.
        true_states.append(true_states[-1] + np.random.normal(0, np.sqrt(true_Q)))
        # Noisy observation of the current state.
        obs = true_states[-1] + np.random.normal(0, np.sqrt(true_R))
        observations.append(obs)

    # Minimal 1-D Kalman filter. dim_x/dim_z are CLASS attributes so code
    # that reads them off the class object (e.g. `kf_class.dim_x`, as the
    # optimizer originally did) works as well as instance access.
    class SimpleKF:
        dim_x = 1  # state dimension
        dim_z = 1  # observation dimension

        def __init__(self):
            self.x = np.array([[0]])
            self.P = np.array([[1]])
            self.F = np.array([[1]])
            self.H = np.array([[1]])
            self.Q = np.array([[0.01]])
            self.R = np.array([[0.01]])
            self.log_likelihood = 0.0

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            y = z - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R
            # Scalar case: division by S replaces multiplication by S^-1.
            K = self.P @ self.H.T / S
            self.x = self.x + K * y
            self.P = (np.eye(1) - K @ self.H) @ self.P
            # Per-step Gaussian log-likelihood, stored as a plain float so
            # the summed objective handed to scipy.optimize is a scalar
            # (a (1, 1) array breaks minimize on recent NumPy versions).
            self.log_likelihood = (-0.5 * (np.log(2 * np.pi * S) + y**2 / S)).item()

    # Optimize the noise parameters by maximum likelihood.
    optimizer = ParameterOptimizer(SimpleKF, observations)
    result = optimizer.optimize(method='likelihood')
    optimal_Q = result.x[0]
    optimal_R = result.x[1]
    print(f"真实参数: Q={true_Q}, R={true_R}")
    print(f"优化参数: Q={optimal_Q:.6f}, R={optimal_R:.6f}")
    print(f"优化是否成功: {result.success}")
# parameter_optimization_example()

自适应噪声估计
class AdaptiveNoiseEstimator:
    """
    Adaptive estimation of the noise covariances.

    R is re-estimated from the empirical innovation covariance over a
    sliding window; Q is scaled up or down heuristically based on recent
    prediction errors.
    """

    def __init__(self, window_size=50):
        self.window_size = window_size   # sliding-window length
        self.innovation_history = []     # recent innovations (flattened)
        self.residual_history = []       # reserved, unused here

    def update_noise_estimates(self, kf):
        """Update kf.R from the innovation sequence.

        Uses E[y y'] = H P H' + R: the empirical innovation covariance
        minus the theoretical H P H' estimates R.
        """
        if hasattr(kf, 'y') and kf.y is not None:
            # Store a flat copy so the stacked history is 2-D: stacking
            # (dim_z, 1) column vectors yields a 3-D array that np.cov
            # rejects ("m has more than 2 dimensions").
            self.innovation_history.append(np.asarray(kf.y).flatten())
            # Keep only the most recent window_size innovations.
            if len(self.innovation_history) > self.window_size:
                self.innovation_history.pop(0)
        # With a full window, re-estimate the observation noise.
        if len(self.innovation_history) >= self.window_size:
            innovations = np.array(self.innovation_history)  # (N, dim_z)
            # atleast_2d: np.cov collapses the single-variable case to 0-D.
            empirical_cov = np.atleast_2d(np.cov(innovations, rowvar=False))
            theoretical_cov = kf.H @ kf.P @ kf.H.T
            R_new = empirical_cov - theoretical_cov
            # Clamp elementwise to keep R safely away from zero/negative.
            R_new = np.maximum(R_new, 0.001 * np.eye(kf.dim_z))
            # Exponential smoothing of the R update.
            alpha = 0.1
            kf.R = (1 - alpha) * kf.R + alpha * R_new

    def adaptive_Q_estimation(self, kf, prediction_errors):
        """Heuristically rescale kf.Q based on recent prediction errors."""
        if len(prediction_errors) >= self.window_size:
            recent_errors = prediction_errors[-self.window_size:]
            # Inflate Q when errors look systematically large; otherwise decay.
            if np.mean(recent_errors) > np.std(recent_errors):
                kf.Q *= 1.1
            else:
                kf.Q *= 0.99
            # Keep Q bounded away from zero.
            kf.Q = np.maximum(kf.Q, 1e-6 * np.eye(kf.dim_x))
class AdaptiveKalmanFilter:
    """Kalman filter whose Q and R are tuned online by AdaptiveNoiseEstimator."""

    def __init__(self, dim_x, dim_z):
        self.dim_x = dim_x
        self.dim_z = dim_z
        # Standard filter state and system matrices (identity defaults).
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)
        self.F = np.eye(dim_x)
        self.H = np.eye(dim_z, dim_x)
        self.Q = 0.01 * np.eye(dim_x)
        self.R = 0.01 * np.eye(dim_z)
        # Online noise-estimation bookkeeping.
        self.noise_estimator = AdaptiveNoiseEstimator()
        self.prediction_errors = []

    def predict(self):
        """Time update: propagate the state mean and covariance."""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, z):
        """Measurement update followed by adaptive noise re-estimation."""
        z = z.reshape(-1, 1)
        # Standard Kalman correction.
        self.y = z - self.H @ self.x
        innovation_cov = self.H @ self.P @ self.H.T + self.R
        gain = self.P @ self.H.T @ np.linalg.inv(innovation_cov)
        self.x = self.x + gain @ self.y
        self.P = (np.eye(self.dim_x) - gain @ self.H) @ self.P
        # Track the innovation magnitude as a prediction-error proxy.
        self.prediction_errors.append(np.linalg.norm(self.y))
        # Let the estimator adjust R (from innovations) and Q (from errors).
        self.noise_estimator.update_noise_estimates(self)
        self.noise_estimator.adaptive_Q_estimation(self, self.prediction_errors)
def adaptive_filter_example():
    """
    Compare the adaptive Kalman filter against a fixed-Q baseline on
    simulated data whose process noise changes over time; plot the
    estimates, absolute errors and the adaptive Q trajectory, and
    print RMSE figures.
    """
    # Generate data with time-varying process noise.
    np.random.seed(42)
    T = 200
    true_states = [0]
    observations = []
    for t in range(1, T):
        # Piecewise-constant process-noise variance.
        if t < 50:
            Q_t = 0.001
        elif t < 100:
            Q_t = 0.01  # noise suddenly increases
        elif t < 150:
            Q_t = 0.1  # noise increases further
        else:
            Q_t = 0.001  # noise returns to the small value
        # State evolution.
        true_states.append(true_states[-1] + np.random.normal(0, np.sqrt(Q_t)))
        # Observation (fixed measurement noise).
        obs = true_states[-1] + np.random.normal(0, 0.1)
        observations.append(obs)
    # "Standard" Kalman filter with Q set to a fixed initial value.
    # NOTE(review): this baseline is also an AdaptiveKalmanFilter, so its
    # Q continues to adapt inside update() after this assignment — confirm
    # whether a truly fixed-Q baseline was intended.
    standard_kf = AdaptiveKalmanFilter(1, 1)
    standard_kf.Q = np.array([[0.01]])  # fixed (initial) Q
    standard_estimates = []
    # Adaptive Kalman filter: Q and R are re-estimated online.
    adaptive_kf = AdaptiveKalmanFilter(1, 1)
    adaptive_estimates = []
    Q_history = []
    for obs in observations:
        # Baseline filtering.
        standard_kf.predict()
        standard_kf.update(np.array([obs]))
        standard_estimates.append(standard_kf.x[0, 0])
        # Adaptive filtering; also record the evolving Q estimate.
        adaptive_kf.predict()
        adaptive_kf.update(np.array([obs]))
        adaptive_estimates.append(adaptive_kf.x[0, 0])
        Q_history.append(adaptive_kf.Q[0, 0])
    # Plot: states, absolute errors, and the Q trajectory.
    plt.figure(figsize=(15, 12))
    plt.subplot(3, 1, 1)
    plt.plot(true_states[1:], 'g-', label='真实状态', linewidth=2)
    plt.plot(observations, 'r.', label='观测', alpha=0.5)
    plt.plot(standard_estimates, 'b--', label='标准KF', linewidth=2)
    plt.plot(adaptive_estimates, 'orange', label='自适应KF', linewidth=2)
    plt.legend()
    plt.title('自适应vs标准卡尔曼滤波')
    plt.ylabel('状态值')
    plt.subplot(3, 1, 2)
    standard_errors = np.abs(np.array(true_states[1:]) - np.array(standard_estimates))
    adaptive_errors = np.abs(np.array(true_states[1:]) - np.array(adaptive_estimates))
    plt.plot(standard_errors, 'b-', label='标准KF误差')
    plt.plot(adaptive_errors, 'orange', label='自适应KF误差')
    plt.legend()
    plt.ylabel('绝对误差')
    plt.subplot(3, 1, 3)
    plt.plot(Q_history, 'purple', label='自适应Q估计')
    plt.axhline(y=0.01, color='b', linestyle='--', label='标准Q值')
    plt.legend()
    plt.xlabel('时间步')
    plt.ylabel('Q值')
    plt.tight_layout()
    plt.show()
    # Performance metrics: RMSE of each filter and the relative improvement.
    standard_rmse = np.sqrt(np.mean(standard_errors**2))
    adaptive_rmse = np.sqrt(np.mean(adaptive_errors**2))
    print(f"标准KF RMSE: {standard_rmse:.6f}")
    print(f"自适应KF RMSE: {adaptive_rmse:.6f}")
    print(f"改进幅度: {(standard_rmse - adaptive_rmse) / standard_rmse * 100:.2f}%")
# adaptive_filter_example()

模型验证技术
残差分析
class ModelValidation:
    """
    Model validation tools: innovation-sequence diagnostics and
    cross-validation of one-step prediction error.
    """

    def __init__(self):
        self.validation_results = {}

    def innovation_analysis(self, innovations, S_matrices):
        """
        Analyze the innovation (one-step residual) sequence.

        Parameters:
            innovations: sequence of innovation vectors; (dim_z, 1) column
                vectors are accepted and flattened internally
            S_matrices: matching innovation covariances S = H P H' + R

        Returns:
            dict of moments and test p-values of the standardized
            innovations, which should be iid N(0, 1) under a correct model.
        """
        # Standardize each innovation by its theoretical standard deviation.
        # Flatten first: stacking (dim_z, 1) columns would produce a 3-D
        # array that breaks the column-wise statistics below.
        standardized_innovations = []
        for innov, S in zip(innovations, S_matrices):
            innov = np.asarray(innov).flatten()
            std_innov = innov / np.sqrt(np.diag(S))
            standardized_innovations.append(std_innov)
        standardized_innovations = np.array(standardized_innovations)  # (N, dim_z)
        # Column-wise summary statistics and tests.
        results = {
            'mean': np.mean(standardized_innovations, axis=0),
            'std': np.std(standardized_innovations, axis=0),
            'skewness': self._compute_skewness(standardized_innovations),
            'kurtosis': self._compute_kurtosis(standardized_innovations),
            'ljung_box_p': self._ljung_box_test(standardized_innovations),
            'normality_p': self._normality_test(standardized_innovations)
        }
        return results

    def _compute_skewness(self, data):
        """Column-wise skewness."""
        from scipy.stats import skew
        return skew(data, axis=0)

    def _compute_kurtosis(self, data):
        """Column-wise excess kurtosis."""
        from scipy.stats import kurtosis
        return kurtosis(data, axis=0)

    def _ljung_box_test(self, data):
        """Serial-correlation check (simplified placeholder).

        NOTE: despite the name, this applies Jarque-Bera per column; a real
        Ljung-Box test (e.g. statsmodels acorr_ljungbox) should replace it.
        """
        from scipy.stats import jarque_bera
        p_values = []
        for col in range(data.shape[1]):
            _, p_val = jarque_bera(data[:, col])
            p_values.append(p_val)
        return p_values

    def _normality_test(self, data):
        """Column-wise Jarque-Bera normality-test p-values."""
        from scipy.stats import jarque_bera
        p_values = []
        for col in range(data.shape[1]):
            _, p_val = jarque_bera(data[:, col])
            p_values.append(p_val)
        return p_values

    def cross_validation(self, kf_class, data, n_folds=5):
        """
        K-fold cross-validation of one-step prediction MSE.

        NOTE(review): a filter is re-created per fold, but no parameter
        estimation on train_data is performed here (train_data is unused).
        """
        n_samples = len(data)
        fold_size = n_samples // n_folds
        cv_scores = []
        for fold in range(n_folds):
            # Split the data; the last fold absorbs the remainder.
            start_idx = fold * fold_size
            end_idx = (fold + 1) * fold_size if fold < n_folds - 1 else n_samples
            test_data = data[start_idx:end_idx]
            train_data = np.concatenate([data[:start_idx], data[end_idx:]])
            kf = kf_class()
            # Parameter estimation on train_data would go here.
            predictions = []
            actuals = []
            for obs in test_data:
                kf.predict()
                pred = kf.H @ kf.x  # one-step-ahead predicted observation
                predictions.append(pred.flatten())
                kf.update(obs)
                actuals.append(obs)
            # Align shapes before differencing: scalar actuals give an (N,)
            # array while predictions are (N, 1); the raw subtraction would
            # broadcast to (N, N) and corrupt the MSE.
            preds = np.array(predictions)
            acts = np.array(actuals).reshape(preds.shape)
            mse = np.mean((acts - preds) ** 2)
            cv_scores.append(mse)
        return {
            'cv_scores': cv_scores,
            'mean_cv_score': np.mean(cv_scores),
            'std_cv_score': np.std(cv_scores)
        }
def model_validation_example():
    """
    Run a simple scalar filter on simulated random-walk data, analyze the
    innovation sequence, print the diagnostics, and plot the innovation
    trace, histogram, Q-Q plot and a simplified ACF.
    """
    # Generate test data.
    np.random.seed(42)
    T = 200
    # Simple random walk observed with noise.
    true_states = [0]
    observations = []
    for t in range(1, T):
        true_states.append(true_states[-1] + np.random.normal(0, 0.1))
        obs = true_states[-1] + np.random.normal(0, 0.05)
        observations.append(obs)
    # Minimal scalar Kalman filter that records the innovation y and its
    # covariance S on every update, for later diagnostics.
    class TestKF:
        def __init__(self):
            self.x = np.array([[0]])
            self.P = np.array([[1]])
            self.F = np.array([[1]])
            self.H = np.array([[1]])
            self.Q = np.array([[0.01]])
            self.R = np.array([[0.0025]])
            self.y = None  # last innovation
            self.S = None  # last innovation covariance
        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q
        def update(self, z):
            z = np.array([[z]])
            self.y = z - self.H @ self.x
            self.S = self.H @ self.P @ self.H.T + self.R
            # Scalar case: division by S replaces multiplication by S^-1.
            K = self.P @ self.H.T / self.S
            self.x = self.x + K * self.y
            self.P = (np.eye(1) - K @ self.H) @ self.P
    # Run the filter and collect innovations with their covariances.
    kf = TestKF()
    innovations = []
    S_matrices = []
    for obs in observations:
        kf.predict()
        kf.update(obs)
        if kf.y is not None:
            innovations.append(kf.y.copy())
            S_matrices.append(kf.S.copy())
    # Diagnostic analysis of the innovation sequence.
    validator = ModelValidation()
    innovation_results = validator.innovation_analysis(innovations, S_matrices)
    print("创新序列分析结果:")
    print(f"均值: {innovation_results['mean']}")
    print(f"标准差: {innovation_results['std']}")
    print(f"偏度: {innovation_results['skewness']}")
    print(f"峰度: {innovation_results['kurtosis']}")
    print(f"正态性检验p值: {innovation_results['normality_p']}")
    # Plot the innovation diagnostics.
    innovations_flat = [inn[0, 0] for inn in innovations]
    plt.figure(figsize=(12, 8))
    plt.subplot(2, 2, 1)
    plt.plot(innovations_flat)
    plt.title('创新序列')
    plt.xlabel('时间')
    plt.ylabel('创新值')
    plt.subplot(2, 2, 2)
    plt.hist(innovations_flat, bins=30, density=True, alpha=0.7)
    plt.title('创新分布')
    plt.xlabel('创新值')
    plt.ylabel('密度')
    plt.subplot(2, 2, 3)
    from scipy.stats import probplot
    probplot(innovations_flat, dist="norm", plot=plt)
    plt.title('Q-Q图')
    plt.subplot(2, 2, 4)
    # Simplified ACF: lag-k autocorrelation via np.corrcoef.
    lags = range(1, 21)
    acf_values = []
    for lag in lags:
        if len(innovations_flat) > lag:
            corr = np.corrcoef(innovations_flat[:-lag], innovations_flat[lag:])[0, 1]
            acf_values.append(corr)
        else:
            acf_values.append(0)
    plt.plot(lags, acf_values, 'bo-')
    plt.axhline(y=0, color='k', linestyle='-')
    # NOTE(review): the ±0.05 reference lines are ad hoc, not the usual
    # ±1.96/sqrt(N) confidence bounds — confirm the intended threshold.
    plt.axhline(y=0.05, color='r', linestyle='--', alpha=0.5)
    plt.axhline(y=-0.05, color='r', linestyle='--', alpha=0.5)
    plt.title('自相关函数')
    plt.xlabel('滞后期')
    plt.ylabel('自相关系数')
    plt.tight_layout()
    plt.show()
# model_validation_example()

下一章预告
下一章我们将学习卡尔曼滤波的局限性与改进方法,包括鲁棒滤波、自适应算法和滤波发散问题的解决方案。
