Learning Objectives
- Master techniques for combining Kalman filtering with machine learning
- Learn how to implement deep Kalman filter networks
- Understand the latest applications in quantitative investing
- Master the construction of real-time financial systems
- Complete a comprehensive hands-on project
1. Combining Kalman Filtering with Machine Learning
1.1 Machine-Learning-Enhanced Kalman Filtering
Combining machine learning with a Kalman filter can significantly improve its performance: instead of keeping the noise covariances fixed, they are re-estimated at every step from features of the recent market data.
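Conceptually, the enhancement replaces the constant covariances with functions of a feature vector extracted from the recent price window (the symbols below are notation introduced here for exposition, not from the original lesson):

$$R_t = g_\phi(\mathbf{f}_t), \qquad Q_t = f_\theta(\mathbf{f}_t)$$

In the implementation that follows, $g_\phi$ is realized by a random forest regressor and $f_\theta$ by a small MLP, both retrained periodically on the filter's own innovation history.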
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
from filterpy.kalman import KalmanFilter
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import warnings
warnings.filterwarnings('ignore')


class MLEnhancedKalmanFilter:
    """Machine-learning-enhanced Kalman filter."""

    def __init__(self, state_dim=2, obs_dim=1, use_ml=True):
        self.state_dim = state_dim
        self.obs_dim = obs_dim
        self.use_ml = use_ml

        # Conventional Kalman filter (constant-velocity model)
        self.kf = KalmanFilter(dim_x=state_dim, dim_z=obs_dim)
        self.kf.F = np.array([[1., 1.], [0., 1.]])    # state transition matrix
        self.kf.H = np.array([[1., 0.]])              # observation matrix
        self.kf.Q = np.eye(state_dim) * 0.01          # process noise covariance
        self.kf.R = np.array([[0.1]])                 # observation noise covariance
        self.kf.x = np.array([[0.], [0.]])            # initial state
        self.kf.P = np.eye(state_dim)                 # initial state covariance

        # Machine learning components
        if use_ml:
            self.noise_predictor = RandomForestRegressor(n_estimators=50, random_state=42)
            self.parameter_adapter = MLPRegressor(hidden_layer_sizes=(20, 10),
                                                  max_iter=1000, random_state=42)
            self.scaler = StandardScaler()

        # Rolling data storage
        self.history = deque(maxlen=100)
        self.features_history = deque(maxlen=100)
        self.is_trained = False

    def extract_features(self, price_series, volume_series=None):
        """Extract features for the machine learning models."""
        if len(price_series) < 10:
            return np.zeros(8)

        features = []

        # Price features
        returns = np.diff(price_series) / price_series[:-1]
        features.extend([
            np.mean(returns[-5:]),                          # short-term mean return
            np.std(returns[-5:]),                           # short-term volatility
            np.mean(returns[-20:]),                         # long-term mean return
            np.std(returns[-20:]),                          # long-term volatility
            price_series[-1] / np.mean(price_series[-20:])  # price relative to its recent mean
        ])

        # Technical indicator: deviation from the 20-period SMA
        if len(price_series) >= 20:
            sma_20 = np.mean(price_series[-20:])
            features.append((price_series[-1] - sma_20) / sma_20)
        else:
            features.append(0)

        # Momentum over the last 5 observations
        if len(price_series) >= 5:
            momentum = (price_series[-1] - price_series[-5]) / price_series[-5]
            features.append(momentum)
        else:
            features.append(0)

        # Trend strength: correlation between the time index and recent returns
        if len(returns) >= 10:
            trend_strength = np.corrcoef(np.arange(len(returns[-10:])), returns[-10:])[0, 1]
            features.append(trend_strength if not np.isnan(trend_strength) else 0)
        else:
            features.append(0)

        return np.array(features)

    def update_ml_models(self):
        """Retrain the machine learning models on the accumulated history."""
        if len(self.history) < 30 or not self.use_ml:
            return

        # Build the training set from the rolling history
        X = np.array(list(self.features_history))
        y_noise = []
        y_params = []
        for i, record in enumerate(self.history):
            if i > 0:
                # Noise target: magnitude of the innovation (observation minus prediction)
                y_noise.append(abs(record['innovation']))
                # Parameter target: state uncertainty and prediction error at that step
                y_params.append([record['volatility'], record['prediction_error']])

        if len(y_noise) >= 20:
            X_scaled = self.scaler.fit_transform(X[1:len(y_noise) + 1])
            # Train the noise predictor
            self.noise_predictor.fit(X_scaled, y_noise)
            # Train the parameter adapter (multi-output regression)
            self.parameter_adapter.fit(X_scaled, y_params)
            self.is_trained = True

    def adaptive_update(self, observation, price_series, volume_series=None):
        """Run one predict/update cycle with ML-based noise adaptation."""
        # Extract features from the current price window
        features = self.extract_features(price_series, volume_series)
        self.features_history.append(features)

        # Prediction step
        self.kf.predict()

        # Machine learning enhancement: adapt R and Q to current conditions
        if self.is_trained and len(self.features_history) > 0:
            current_features = self.scaler.transform([features])
            # Predicted observation-noise level
            predicted_noise = self.noise_predictor.predict(current_features)[0]
            # Predicted parameters (the first output scales the process noise)
            predicted_params = self.parameter_adapter.predict(current_features)[0]
            # Dynamically adjust the noise covariances
            self.kf.R[0, 0] = max(0.01, predicted_noise)
            self.kf.Q = np.eye(self.state_dim) * max(0.001, predicted_params[0])

        # Update step
        self.kf.update([observation])

        # Record the step for later retraining
        innovation = observation - (self.kf.H @ self.kf.x)[0, 0]
        record = {
            'state': self.kf.x.copy(),
            'observation': observation,
            'innovation': innovation,
            'volatility': np.sqrt(self.kf.P[0, 0]),
            'prediction_error': abs(innovation)
        }
        self.history.append(record)

        # Periodically retrain the ML models
        if len(self.history) % 20 == 0:
            self.update_ml_models()

        return {
            'filtered_value': self.kf.x[0, 0],
            'velocity': self.kf.x[1, 0],
            'uncertainty': np.sqrt(self.kf.P[0, 0]),
            'ml_enhanced': self.is_trained
        }
```
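For reference, the `predict()` and `update()` calls above implement the standard Kalman recursions; the learned components only rescale $Q$ and $R$. With the matrices $F$ and $H$ defined in `__init__`, one step is:

$$
\begin{aligned}
\text{Predict:}\quad & \hat{x}_{t|t-1} = F\,\hat{x}_{t-1|t-1}, \qquad P_{t|t-1} = F P_{t-1|t-1} F^\top + Q_t \\
\text{Update:}\quad  & y_t = z_t - H\,\hat{x}_{t|t-1}, \qquad S_t = H P_{t|t-1} H^\top + R_t, \qquad K_t = P_{t|t-1} H^\top S_t^{-1} \\
                     & \hat{x}_{t|t} = \hat{x}_{t|t-1} + K_t y_t, \qquad P_{t|t} = (I - K_t H)\, P_{t|t-1}
\end{aligned}
$$

The innovation $y_t$ is exactly the quantity stored in `record['innovation']` and used as the training target for the noise predictor.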
```python
# Deep-learning Kalman filter network
class DeepKalmanFilter(nn.Module):
    """Deep Kalman filter network."""

    def __init__(self, state_dim=2, obs_dim=1, hidden_dim=64):
        super(DeepKalmanFilter, self).__init__()
        self.state_dim = state_dim
        self.obs_dim = obs_dim
        self.hidden_dim = hidden_dim

        # Encoder network: maps (observation, predicted state) to posterior parameters
        self.encoder = nn.Sequential(
            nn.Linear(obs_dim + state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, state_dim * 2)  # mean and variance
        )

        # State transition network
        self.transition = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, state_dim)
        )

        # Observation (emission) network
        self.observation = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, obs_dim)
        )

        # Noise prediction network
        self.noise_predictor = nn.Sequential(
            nn.Linear(state_dim + obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 2)  # process noise and observation noise
        )

    def forward(self, observations, sequence_length=50):
        """Forward pass over observation sequences of shape (batch, time)."""
        batch_size = observations.size(0)
        device = observations.device

        # Initialize the latent state
        state = torch.zeros(batch_size, self.state_dim, device=device)
        states = []
        predicted_obs = []

        for t in range(sequence_length):
            if t < observations.size(1):
                obs = observations[:, t:t + 1]

                # Predict noise levels (not used further here; kept as an extension point)
                noise_input = torch.cat([state, obs], dim=1)
                noise_params = torch.nn.functional.softplus(self.noise_predictor(noise_input))

                # State prediction
                predicted_state = self.transition(state)

                # Encode the current observation together with the predicted state
                encoder_input = torch.cat([obs, predicted_state], dim=1)
                encoded = self.encoder(encoder_input)

                # Split into mean and variance
                state_mean = encoded[:, :self.state_dim]
                state_var = torch.nn.functional.softplus(encoded[:, self.state_dim:])

                # Reparameterized state update
                state = state_mean + torch.sqrt(state_var) * torch.randn_like(state_var)

                # Predict the observation from the updated state
                pred_obs = self.observation(state)

                states.append(state)
                predicted_obs.append(pred_obs)

        return torch.stack(states, dim=1), torch.stack(predicted_obs, dim=1)

    def loss_function(self, true_observations, predicted_observations, states):
        """Reconstruction loss plus a light L2 regularizer on the latent states."""
        reconstruction_loss = nn.MSELoss()(predicted_observations, true_observations)
        state_reg = torch.mean(torch.sum(states ** 2, dim=-1))
        return reconstruction_loss + 0.01 * state_reg
```
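The class above defines a loss but no training procedure. Below is a minimal training sketch, assuming observation sequences arranged as a `(batch, time)` tensor; the synthetic random-walk data, batch size, learning rate, and epoch count are illustrative assumptions rather than part of the original lesson.

```python
# Minimal training sketch for DeepKalmanFilter (synthetic data and
# hyperparameters are illustrative assumptions).
def train_deep_kalman_filter(num_epochs=200, batch_size=16, seq_len=50):
    torch.manual_seed(0)

    # Synthetic observations: a batch of Gaussian random walks, shape (batch, time)
    observations = torch.cumsum(0.1 * torch.randn(batch_size, seq_len), dim=1)

    model = DeepKalmanFilter(state_dim=2, obs_dim=1, hidden_dim=64)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(num_epochs):
        optimizer.zero_grad()

        # states: (batch, time, state_dim), pred_obs: (batch, time, obs_dim)
        states, pred_obs = model(observations, sequence_length=seq_len)

        # Align shapes: true observations become (batch, time, 1)
        loss = model.loss_function(observations.unsqueeze(-1), pred_obs, states)

        loss.backward()
        optimizer.step()

        if (epoch + 1) % 50 == 0:
            print(f"epoch {epoch + 1}: loss = {loss.item():.4f}")

    return model

# trained_dkf = train_deep_kalman_filter()
```

Because the latent state is sampled with the reparameterization trick, gradients flow through the state mean and variance, so the encoder, transition, and observation networks are trained end to end.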
```python
# Example: ML-enhanced Kalman filtering
def demonstrate_ml_enhanced_filtering():
    print("Starting the ML-enhanced Kalman filtering demo...")

    # Generate a complex simulated price series
    np.random.seed(42)
    n_points = 500

    # Nonlinear, time-varying price path
    true_prices = np.zeros(n_points)
    true_volatility = np.zeros(n_points)
    true_prices[0] = 100
    true_volatility[0] = 0.02

    for t in range(1, n_points):
        # Time-varying volatility
        vol_change = 0.001 * np.sin(2 * np.pi * t / 50) + 0.0005 * np.random.normal()
        true_volatility[t] = max(0.005, true_volatility[t - 1] + vol_change)

        # Price shocks and regime changes
        if t > 100 and t < 120:      # market shock
            shock = -0.1 * np.exp(-(t - 110) ** 2 / 20)
        elif t > 300 and t < 350:    # trend change
            shock = 0.001 * (t - 300)
        else:
            shock = 0

        return_t = shock + true_volatility[t] * np.random.normal()
        true_prices[t] = true_prices[t - 1] * (1 + return_t)

    # Add observation noise
    observed_prices = true_prices + 0.5 * np.random.normal(size=n_points)

    # Create a conventional and an ML-enhanced filter
    traditional_kf = MLEnhancedKalmanFilter(use_ml=False)
    ml_enhanced_kf = MLEnhancedKalmanFilter(use_ml=True)

    # Run both filters over the observations
    traditional_results = []
    ml_results = []

    for t, obs_price in enumerate(observed_prices):
        # Rolling price window
        price_window = observed_prices[max(0, t - 50):t + 1]

        # Conventional filtering
        trad_result = traditional_kf.adaptive_update(obs_price, price_window)
        traditional_results.append(trad_result)

        # ML-enhanced filtering
        ml_result = ml_enhanced_kf.adaptive_update(obs_price, price_window)
        ml_results.append(ml_result)

    # Convert to DataFrames
    trad_df = pd.DataFrame(traditional_results)
    ml_df = pd.DataFrame(ml_results)

    # Plot the comparison
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Price tracking
    ax1.plot(true_prices, label='True price', alpha=0.7)
    ax1.plot(observed_prices, label='Observed price', alpha=0.5)
    ax1.plot(trad_df['filtered_value'], label='Conventional KF', alpha=0.8)
    ax1.plot(ml_df['filtered_value'], label='ML-enhanced KF', alpha=0.8)
    ax1.set_title('Price tracking')
    ax1.set_ylabel('Price')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Tracking error
    trad_errors = np.abs(trad_df['filtered_value'] - true_prices)
    ml_errors = np.abs(ml_df['filtered_value'] - true_prices)
    ax2.plot(trad_errors, label='Conventional KF error', alpha=0.7)
    ax2.plot(ml_errors, label='ML-enhanced KF error', alpha=0.7)
    ax2.set_title('Tracking error')
    ax2.set_ylabel('Absolute error')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Uncertainty estimates
    ax3.plot(trad_df['uncertainty'], label='Conventional KF uncertainty', alpha=0.7)
    ax3.plot(ml_df['uncertainty'], label='ML-enhanced KF uncertainty', alpha=0.7)
    ax3.plot(true_volatility * 10, label='True volatility x10', alpha=0.7)
    ax3.set_title('Uncertainty estimates')
    ax3.set_ylabel('Uncertainty')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Performance statistics
    trad_mse = np.mean(trad_errors ** 2)
    ml_mse = np.mean(ml_errors ** 2)
    improvement = (trad_mse - ml_mse) / trad_mse * 100

    ax4.bar(['Conventional KF', 'ML-enhanced KF'], [trad_mse, ml_mse], alpha=0.7)
    ax4.set_title(f'MSE comparison (improvement: {improvement:.1f}%)')
    ax4.set_ylabel('Mean squared error')

    # Annotate the bar chart with the numbers
    ax4.text(0.5, max(trad_mse, ml_mse) * 0.8,
             f'Conventional KF MSE: {trad_mse:.4f}\nML-enhanced KF MSE: {ml_mse:.4f}\nImprovement: {improvement:.1f}%',
             ha='center', fontsize=10, bbox=dict(boxstyle="round,pad=0.3", facecolor="lightgray"))

    plt.tight_layout()
    plt.show()

    # Print the performance summary
    print("\nPerformance comparison results:")
    print(f"Conventional Kalman filter MSE: {trad_mse:.6f}")
    print(f"ML-enhanced Kalman filter MSE: {ml_mse:.6f}")
    print(f"Improvement: {improvement:.2f}%")

    # Count the steps that ran with trained ML models
    ml_trained_points = sum(1 for r in ml_results if r['ml_enhanced'])
    print(f"Steps with trained ML models: {ml_trained_points}/{len(ml_results)}")

    return trad_df, ml_df, {'traditional_mse': trad_mse, 'ml_mse': ml_mse, 'improvement': improvement}


trad_results, ml_results, performance_stats = demonstrate_ml_enhanced_filtering()
```
