学习目标
- 理解卡尔曼滤波的核心思想和工作原理
- 掌握预测步骤和更新步骤的数学推导
- 理解最优性证明和贝叶斯框架
核心思想
卡尔曼滤波是一种递归的状态估计算法,其核心思想是通过预测-更新循环,利用系统的动态模型和观测数据来获得系统状态的最优估计。
基本哲学
卡尔曼滤波基于以下几个核心概念:
10/2/25About 6 min
学习目标
卡尔曼滤波是一种递归的状态估计算法,其核心思想是通过预测-更新循环,利用系统的动态模型和观测数据来获得系统状态的最优估计。
卡尔曼滤波基于以下几个核心概念:
学习目标
卡尔曼滤波算法包含两个主要阶段,共五个核心方程:
方程1 - 状态预测:
x^k∣k−1=Fkx^k−1∣k−1+Bkuk
学习目标
标准卡尔曼滤波适用于线性高斯系统,但现实中许多系统都是非线性的。扩展卡尔曼滤波(EKF)通过局部线性化来处理非线性系统。
非线性状态转移:
学习目标
无迹卡尔曼滤波(UKF)通过无迹变换(Unscented Transform)来处理非线性系统,避免了EKF中复杂的雅可比计算。
无迹变换的哲学
"近似概率分布比近似非线性函数更容易"
学习目标
粒子滤波(Particle Filter,PF)是一种基于蒙特卡洛方法的非参数贝叶斯滤波技术,适用于高度非线性和非高斯系统。
粒子滤波哲学
用大量随机样本(粒子)来近似后验概率分布,每个粒子代表状态空间中的一个可能状态。
学习目标
在金融领域,许多重要的变量都是不可直接观测的隐含状态,需要通过市场观测数据来推断:
学习目标
选择合适的状态空间模型是卡尔曼滤波成功的关键:
模型设计考虑因素
学习目标
标准卡尔曼滤波基于以下关键假设:
卡尔曼滤波的假设条件
学习目标
在现代金融系统中,信息来源多样化:
金融多数据源
学习目标
传统的投资组合理论假设收益率和风险参数是恒定的,但现实中这些参数是时变的。卡尔曼滤波为动态投资组合管理提供了强大的工具。
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy import linalg
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
class DynamicPortfolioKF:
"""
动态投资组合卡尔曼滤波器
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# 状态:[预期收益率, 风险因子载荷, 特异性风险]
# 简化模型:每个资产的预期收益率
self.dim_x = n_assets
# 状态向量:预期收益率
self.x = np.zeros((self.dim_x, 1))
self.P = np.eye(self.dim_x)
# 系统矩阵
self.F = 0.99 * np.eye(self.dim_x) # 收益率均值回归
self.H = np.eye(self.n_assets) # 直接观测收益率
# 噪声协方差
self.Q = 0.001 * np.eye(self.dim_x) # 预期收益率变化
self.R = 0.01 * np.eye(self.n_assets) # 观测噪声
# 投资组合参数
self.risk_aversion = 5.0
self.transaction_cost = 0.001
# 历史数据
self.return_history = []
self.weight_history = []
self.portfolio_return_history = []
def predict(self):
"""预测步骤"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, returns):
"""更新步骤"""
returns = np.array(returns).reshape(-1, 1)
# 标准卡尔曼更新
y = returns - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P
# 保存历史
self.return_history.append(returns.flatten())
def estimate_covariance(self, window=50):
"""估计协方差矩阵"""
if len(self.return_history) < window:
return self.R
recent_returns = np.array(self.return_history[-window:])
return np.cov(recent_returns.T)
def optimize_portfolio(self, prev_weights=None):
"""
优化投资组合权重
参数:
prev_weights: 前期权重,用于计算交易成本
"""
mu = self.x.flatten() # 预期收益率
Sigma = self.estimate_covariance() # 协方差矩阵
if prev_weights is None:
prev_weights = np.ones(self.n_assets) / self.n_assets
# 目标函数:最大化效用 = 预期收益 - 风险惩罚 - 交易成本
def objective(w):
portfolio_return = w @ mu
portfolio_variance = w @ Sigma @ w
transaction_costs = self.transaction_cost * np.sum(np.abs(w - prev_weights))
utility = portfolio_return - 0.5 * self.risk_aversion * portfolio_variance - transaction_costs
return -utility # 最小化负效用
# 约束:权重和为1
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]
# 边界:无卖空约束
bounds = [(0, 1) for _ in range(self.n_assets)]
# 初始猜测
x0 = prev_weights
# 优化
result = minimize(objective, x0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
return result.x
else:
return prev_weights # 如果优化失败,保持前期权重
def calculate_portfolio_metrics(self, weights, returns):
"""计算投资组合指标"""
portfolio_return = weights @ returns
if len(self.return_history) > 1:
recent_returns = np.array(self.return_history[-252:]) # 最近一年
portfolio_returns = recent_returns @ weights
metrics = {
'return': portfolio_return,
'volatility': np.std(portfolio_returns) * np.sqrt(252),
'sharpe_ratio': np.mean(portfolio_returns) / np.std(portfolio_returns) * np.sqrt(252) if np.std(portfolio_returns) > 0 else 0,
'max_drawdown': self._calculate_max_drawdown(portfolio_returns)
}
else:
metrics = {
'return': portfolio_return,
'volatility': 0,
'sharpe_ratio': 0,
'max_drawdown': 0
}
return metrics
def _calculate_max_drawdown(self, returns):
"""计算最大回撤"""
cumulative = np.cumprod(1 + returns)
running_max = np.maximum.accumulate(cumulative)
drawdown = (cumulative - running_max) / running_max
return np.min(drawdown)
def portfolio_management_example():
"""
动态投资组合管理示例
"""
# 设置
n_assets = 4
asset_names = ['股票A', '股票B', '债券A', '商品A']
# 创建动态投资组合管理器
portfolio = DynamicPortfolioKF(n_assets)
# 模拟市场数据
np.random.seed(42)
T = 252 * 2 # 2年数据
# 真实的预期收益率(时变)
true_mu = np.array([0.08, 0.10, 0.04, 0.06]) / 252 # 日化收益率
# 模拟收益率数据
correlation_matrix = np.array([
[1.0, 0.6, -0.2, 0.3],
[0.6, 1.0, -0.1, 0.4],
[-0.2, -0.1, 1.0, -0.1],
[0.3, 0.4, -0.1, 1.0]
])
volatilities = np.array([0.20, 0.25, 0.08, 0.30]) / np.sqrt(252) # 日化波动率
covariance_matrix = np.outer(volatilities, volatilities) * correlation_matrix
# 生成收益率序列
all_returns = []
weights_history = []
portfolio_values = []
portfolio_value = 100000 # 初始投资组合价值
current_weights = np.ones(n_assets) / n_assets # 等权重开始
for t in range(T):
# 生成当日收益率
if t < T // 3:
mu_t = true_mu
elif t < 2 * T // 3:
mu_t = true_mu * 1.5 # 牛市
else:
mu_t = true_mu * 0.5 # 熊市
daily_returns = np.random.multivariate_normal(mu_t, covariance_matrix)
all_returns.append(daily_returns)
# 更新卡尔曼滤波器
portfolio.predict()
portfolio.update(daily_returns)
# 每10天重新平衡
if t % 10 == 0 and t > 20: # 有足够历史数据后开始
new_weights = portfolio.optimize_portfolio(current_weights)
current_weights = new_weights
weights_history.append(current_weights.copy())
# 计算投资组合收益
portfolio_return = current_weights @ daily_returns
portfolio_value *= (1 + portfolio_return)
portfolio_values.append(portfolio_value)
# 保存投资组合指标
portfolio.weight_history.append(current_weights.copy())
portfolio.portfolio_return_history.append(portfolio_return)
# 基准:等权重投资组合
benchmark_values = []
benchmark_value = 100000
benchmark_weights = np.ones(n_assets) / n_assets
for daily_returns in all_returns:
benchmark_return = benchmark_weights @ daily_returns
benchmark_value *= (1 + benchmark_return)
benchmark_values.append(benchmark_value)
# 转换为数组
all_returns = np.array(all_returns)
weights_history = np.array(weights_history)
# 绘制结果
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# 投资组合价值
ax1.plot(portfolio_values, 'b-', linewidth=2, label='动态投资组合')
ax1.plot(benchmark_values, 'r--', linewidth=2, label='等权重基准')
ax1.set_ylabel('投资组合价值')
ax1.set_title('投资组合价值对比')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 权重变化
colors = ['blue', 'red', 'green', 'orange']
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax2.plot(weights_history[:, i], color=color, linewidth=2, label=name)
ax2.set_ylabel('权重')
ax2.set_title('动态权重配置')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 预期收益率估计
mu_estimates = np.array([est.flatten() for est in portfolio.return_history[1:]])
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax3.plot(mu_estimates[:, i] * 252, color=color, linewidth=1, alpha=0.7, label=f'{name} 估计')
ax3.axhline(y=true_mu[i] * 252, color=color, linestyle='--', alpha=0.5)
ax3.set_ylabel('年化预期收益率')
ax3.set_title('预期收益率估计')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 滚动夏普比率
window = 60
rolling_sharpe = []
benchmark_sharpe = []
for i in range(window, len(portfolio.portfolio_return_history)):
# 动态投资组合
port_returns = portfolio.portfolio_return_history[i-window:i]
if np.std(port_returns) > 0:
sharpe = np.mean(port_returns) / np.std(port_returns) * np.sqrt(252)
else:
sharpe = 0
rolling_sharpe.append(sharpe)
# 基准
bench_returns = [benchmark_weights @ all_returns[j] for j in range(i-window, i)]
if np.std(bench_returns) > 0:
bench_sharpe = np.mean(bench_returns) / np.std(bench_returns) * np.sqrt(252)
else:
bench_sharpe = 0
benchmark_sharpe.append(bench_sharpe)
ax4.plot(rolling_sharpe, 'b-', linewidth=2, label='动态投资组合')
ax4.plot(benchmark_sharpe, 'r--', linewidth=2, label='等权重基准')
ax4.set_xlabel('时间')
ax4.set_ylabel('滚动夏普比率')
ax4.set_title(f'{window}天滚动夏普比率')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 性能总结
final_dynamic_value = portfolio_values[-1]
final_benchmark_value = benchmark_values[-1]
dynamic_returns = np.array(portfolio.portfolio_return_history)
benchmark_returns = [benchmark_weights @ ret for ret in all_returns]
print("投资组合性能比较:")
print("-" * 50)
print(f"动态投资组合:")
print(f" 最终价值: ${final_dynamic_value:,.0f}")
print(f" 总收益率: {(final_dynamic_value / 100000 - 1) * 100:.2f}%")
print(f" 年化收益率: {(final_dynamic_value / 100000) ** (252 / T) - 1:.3f}")
print(f" 年化波动率: {np.std(dynamic_returns) * np.sqrt(252):.3f}")
print(f" 夏普比率: {np.mean(dynamic_returns) / np.std(dynamic_returns) * np.sqrt(252):.3f}")
print(f"\n等权重基准:")
print(f" 最终价值: ${final_benchmark_value:,.0f}")
print(f" 总收益率: {(final_benchmark_value / 100000 - 1) * 100:.2f}%")
print(f" 年化收益率: {(final_benchmark_value / 100000) ** (252 / T) - 1:.3f}")
print(f" 年化波动率: {np.std(benchmark_returns) * np.sqrt(252):.3f}")
print(f" 夏普比率: {np.mean(benchmark_returns) / np.std(benchmark_returns) * np.sqrt(252):.3f}")
print(f"\n超额收益: ${final_dynamic_value - final_benchmark_value:,.0f}")
# 运行示例
# portfolio_management_example()