第12章:卡尔曼滤波在投资组合管理中的应用
10/2/25About 14 min
第12章:卡尔曼滤波在投资组合管理中的应用
学习目标
- 掌握动态投资组合优化的状态空间建模
- 学会风险因子的动态估计和跟踪
- 理解资产配置中的动态调整策略
动态投资组合优化框架
状态空间投资组合模型
传统的投资组合理论假设收益率和风险参数是恒定的,但现实中这些参数是时变的。卡尔曼滤波为动态投资组合管理提供了强大的工具。
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy import linalg
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
class DynamicPortfolioKF:
"""
动态投资组合卡尔曼滤波器
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# 状态:[预期收益率, 风险因子载荷, 特异性风险]
# 简化模型:每个资产的预期收益率
self.dim_x = n_assets
# 状态向量:预期收益率
self.x = np.zeros((self.dim_x, 1))
self.P = np.eye(self.dim_x)
# 系统矩阵
self.F = 0.99 * np.eye(self.dim_x) # 收益率均值回归
self.H = np.eye(self.n_assets) # 直接观测收益率
# 噪声协方差
self.Q = 0.001 * np.eye(self.dim_x) # 预期收益率变化
self.R = 0.01 * np.eye(self.n_assets) # 观测噪声
# 投资组合参数
self.risk_aversion = 5.0
self.transaction_cost = 0.001
# 历史数据
self.return_history = []
self.weight_history = []
self.portfolio_return_history = []
def predict(self):
"""预测步骤"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, returns):
"""更新步骤"""
returns = np.array(returns).reshape(-1, 1)
# 标准卡尔曼更新
y = returns - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P
# 保存历史
self.return_history.append(returns.flatten())
def estimate_covariance(self, window=50):
"""估计协方差矩阵"""
if len(self.return_history) < window:
return self.R
recent_returns = np.array(self.return_history[-window:])
return np.cov(recent_returns.T)
def optimize_portfolio(self, prev_weights=None):
"""
优化投资组合权重
参数:
prev_weights: 前期权重,用于计算交易成本
"""
mu = self.x.flatten() # 预期收益率
Sigma = self.estimate_covariance() # 协方差矩阵
if prev_weights is None:
prev_weights = np.ones(self.n_assets) / self.n_assets
# 目标函数:最大化效用 = 预期收益 - 风险惩罚 - 交易成本
def objective(w):
portfolio_return = w @ mu
portfolio_variance = w @ Sigma @ w
transaction_costs = self.transaction_cost * np.sum(np.abs(w - prev_weights))
utility = portfolio_return - 0.5 * self.risk_aversion * portfolio_variance - transaction_costs
return -utility # 最小化负效用
# 约束:权重和为1
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]
# 边界:无卖空约束
bounds = [(0, 1) for _ in range(self.n_assets)]
# 初始猜测
x0 = prev_weights
# 优化
result = minimize(objective, x0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
return result.x
else:
return prev_weights # 如果优化失败,保持前期权重
def calculate_portfolio_metrics(self, weights, returns):
"""计算投资组合指标"""
portfolio_return = weights @ returns
if len(self.return_history) > 1:
recent_returns = np.array(self.return_history[-252:]) # 最近一年
portfolio_returns = recent_returns @ weights
metrics = {
'return': portfolio_return,
'volatility': np.std(portfolio_returns) * np.sqrt(252),
'sharpe_ratio': np.mean(portfolio_returns) / np.std(portfolio_returns) * np.sqrt(252) if np.std(portfolio_returns) > 0 else 0,
'max_drawdown': self._calculate_max_drawdown(portfolio_returns)
}
else:
metrics = {
'return': portfolio_return,
'volatility': 0,
'sharpe_ratio': 0,
'max_drawdown': 0
}
return metrics
def _calculate_max_drawdown(self, returns):
"""计算最大回撤"""
cumulative = np.cumprod(1 + returns)
running_max = np.maximum.accumulate(cumulative)
drawdown = (cumulative - running_max) / running_max
return np.min(drawdown)
def portfolio_management_example():
"""
动态投资组合管理示例
"""
# 设置
n_assets = 4
asset_names = ['股票A', '股票B', '债券A', '商品A']
# 创建动态投资组合管理器
portfolio = DynamicPortfolioKF(n_assets)
# 模拟市场数据
np.random.seed(42)
T = 252 * 2 # 2年数据
# 真实的预期收益率(时变)
true_mu = np.array([0.08, 0.10, 0.04, 0.06]) / 252 # 日化收益率
# 模拟收益率数据
correlation_matrix = np.array([
[1.0, 0.6, -0.2, 0.3],
[0.6, 1.0, -0.1, 0.4],
[-0.2, -0.1, 1.0, -0.1],
[0.3, 0.4, -0.1, 1.0]
])
volatilities = np.array([0.20, 0.25, 0.08, 0.30]) / np.sqrt(252) # 日化波动率
covariance_matrix = np.outer(volatilities, volatilities) * correlation_matrix
# 生成收益率序列
all_returns = []
weights_history = []
portfolio_values = []
portfolio_value = 100000 # 初始投资组合价值
current_weights = np.ones(n_assets) / n_assets # 等权重开始
for t in range(T):
# 生成当日收益率
if t < T // 3:
mu_t = true_mu
elif t < 2 * T // 3:
mu_t = true_mu * 1.5 # 牛市
else:
mu_t = true_mu * 0.5 # 熊市
daily_returns = np.random.multivariate_normal(mu_t, covariance_matrix)
all_returns.append(daily_returns)
# 更新卡尔曼滤波器
portfolio.predict()
portfolio.update(daily_returns)
# 每10天重新平衡
if t % 10 == 0 and t > 20: # 有足够历史数据后开始
new_weights = portfolio.optimize_portfolio(current_weights)
current_weights = new_weights
weights_history.append(current_weights.copy())
# 计算投资组合收益
portfolio_return = current_weights @ daily_returns
portfolio_value *= (1 + portfolio_return)
portfolio_values.append(portfolio_value)
# 保存投资组合指标
portfolio.weight_history.append(current_weights.copy())
portfolio.portfolio_return_history.append(portfolio_return)
# 基准:等权重投资组合
benchmark_values = []
benchmark_value = 100000
benchmark_weights = np.ones(n_assets) / n_assets
for daily_returns in all_returns:
benchmark_return = benchmark_weights @ daily_returns
benchmark_value *= (1 + benchmark_return)
benchmark_values.append(benchmark_value)
# 转换为数组
all_returns = np.array(all_returns)
weights_history = np.array(weights_history)
# 绘制结果
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# 投资组合价值
ax1.plot(portfolio_values, 'b-', linewidth=2, label='动态投资组合')
ax1.plot(benchmark_values, 'r--', linewidth=2, label='等权重基准')
ax1.set_ylabel('投资组合价值')
ax1.set_title('投资组合价值对比')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 权重变化
colors = ['blue', 'red', 'green', 'orange']
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax2.plot(weights_history[:, i], color=color, linewidth=2, label=name)
ax2.set_ylabel('权重')
ax2.set_title('动态权重配置')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 预期收益率估计
mu_estimates = np.array([est.flatten() for est in portfolio.return_history[1:]])
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax3.plot(mu_estimates[:, i] * 252, color=color, linewidth=1, alpha=0.7, label=f'{name} 估计')
ax3.axhline(y=true_mu[i] * 252, color=color, linestyle='--', alpha=0.5)
ax3.set_ylabel('年化预期收益率')
ax3.set_title('预期收益率估计')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 滚动夏普比率
window = 60
rolling_sharpe = []
benchmark_sharpe = []
for i in range(window, len(portfolio.portfolio_return_history)):
# 动态投资组合
port_returns = portfolio.portfolio_return_history[i-window:i]
if np.std(port_returns) > 0:
sharpe = np.mean(port_returns) / np.std(port_returns) * np.sqrt(252)
else:
sharpe = 0
rolling_sharpe.append(sharpe)
# 基准
bench_returns = [benchmark_weights @ all_returns[j] for j in range(i-window, i)]
if np.std(bench_returns) > 0:
bench_sharpe = np.mean(bench_returns) / np.std(bench_returns) * np.sqrt(252)
else:
bench_sharpe = 0
benchmark_sharpe.append(bench_sharpe)
ax4.plot(rolling_sharpe, 'b-', linewidth=2, label='动态投资组合')
ax4.plot(benchmark_sharpe, 'r--', linewidth=2, label='等权重基准')
ax4.set_xlabel('时间')
ax4.set_ylabel('滚动夏普比率')
ax4.set_title(f'{window}天滚动夏普比率')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 性能总结
final_dynamic_value = portfolio_values[-1]
final_benchmark_value = benchmark_values[-1]
dynamic_returns = np.array(portfolio.portfolio_return_history)
benchmark_returns = [benchmark_weights @ ret for ret in all_returns]
print("投资组合性能比较:")
print("-" * 50)
print(f"动态投资组合:")
print(f" 最终价值: ${final_dynamic_value:,.0f}")
print(f" 总收益率: {(final_dynamic_value / 100000 - 1) * 100:.2f}%")
print(f" 年化收益率: {(final_dynamic_value / 100000) ** (252 / T) - 1:.3f}")
print(f" 年化波动率: {np.std(dynamic_returns) * np.sqrt(252):.3f}")
print(f" 夏普比率: {np.mean(dynamic_returns) / np.std(dynamic_returns) * np.sqrt(252):.3f}")
print(f"\n等权重基准:")
print(f" 最终价值: ${final_benchmark_value:,.0f}")
print(f" 总收益率: {(final_benchmark_value / 100000 - 1) * 100:.2f}%")
print(f" 年化收益率: {(final_benchmark_value / 100000) ** (252 / T) - 1:.3f}")
print(f" 年化波动率: {np.std(benchmark_returns) * np.sqrt(252):.3f}")
print(f" 夏普比率: {np.mean(benchmark_returns) / np.std(benchmark_returns) * np.sqrt(252):.3f}")
print(f"\n超额收益: ${final_dynamic_value - final_benchmark_value:,.0f}")
# 运行示例
# portfolio_management_example()风险因子建模
多因子模型的动态估计
class DynamicFactorModel:
"""
动态因子模型:Fama-French + 动态调整
"""
def __init__(self, n_assets, n_factors=3):
self.n_assets = n_assets
self.n_factors = n_factors
# 状态:每个资产的因子载荷 [alpha, beta_market, beta_size, beta_value, ...]
self.dim_x = n_assets * (n_factors + 1) # +1 for alpha
# 状态向量组织:[asset1_params, asset2_params, ...]
# asset_params = [alpha, beta_market, beta_size, beta_value]
self.x = np.random.normal(0, 0.1, (self.dim_x, 1))
self.P = 0.1 * np.eye(self.dim_x)
# 状态转移(随机游走 + 小幅均值回归)
self.F = 0.99 * np.eye(self.dim_x)
self.Q = 1e-5 * np.eye(self.dim_x)
# 观测噪声(特异性风险)
self.R = 0.01 * np.eye(n_assets)
# 因子历史
self.factor_history = []
self.asset_returns_history = []
def update_factors(self, asset_returns, factor_returns):
"""
更新因子模型
参数:
asset_returns: 资产收益率向量 (n_assets,)
factor_returns: 因子收益率向量 (n_factors,)
"""
asset_returns = np.array(asset_returns).reshape(-1, 1)
factor_returns = np.array(factor_returns)
# 构建观测矩阵
H = np.zeros((self.n_assets, self.dim_x))
for i in range(self.n_assets):
start_idx = i * (self.n_factors + 1)
end_idx = (i + 1) * (self.n_factors + 1)
# [1, factor1, factor2, factor3] for asset i
H[i, start_idx:end_idx] = np.concatenate([[1], factor_returns])
# 预测
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
# 更新
y = asset_returns - H @ self.x
S = H @ self.P @ H.T + self.R
K = self.P @ H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
# 保存历史
self.factor_history.append(factor_returns.copy())
self.asset_returns_history.append(asset_returns.flatten())
def get_factor_loadings(self, asset_idx):
"""获取资产的因子载荷"""
start_idx = asset_idx * (self.n_factors + 1)
end_idx = (asset_idx + 1) * (self.n_factors + 1)
return self.x[start_idx:end_idx].flatten()
def predict_returns(self, factor_forecasts):
"""
基于因子预测预测资产收益率
参数:
factor_forecasts: 因子收益率预测
"""
predictions = []
for i in range(self.n_assets):
loadings = self.get_factor_loadings(i)
alpha = loadings[0]
betas = loadings[1:]
predicted_return = alpha + betas @ factor_forecasts
predictions.append(predicted_return)
return np.array(predictions)
def calculate_factor_contributions(self, asset_idx, factor_returns):
"""计算因子对收益的贡献"""
loadings = self.get_factor_loadings(asset_idx)
alpha = loadings[0]
betas = loadings[1:]
contributions = {
'alpha': alpha,
'market': betas[0] * factor_returns[0] if len(factor_returns) > 0 else 0,
'size': betas[1] * factor_returns[1] if len(factor_returns) > 1 else 0,
'value': betas[2] * factor_returns[2] if len(factor_returns) > 2 else 0
}
return contributions
def dynamic_factor_model_example():
"""
动态因子模型示例
"""
# 设置
n_assets = 5
n_factors = 3
asset_names = ['大盘股A', '小盘股B', '价值股C', '成长股D', '中性股E']
factor_names = ['市场因子', '规模因子', '价值因子']
# 创建动态因子模型
dfm = DynamicFactorModel(n_assets, n_factors)
# 模拟因子收益率和资产收益率
np.random.seed(42)
T = 252
# 真实因子载荷(用于生成数据)
true_loadings = {
0: [0.001, 1.2, -0.3, 0.1], # 大盘股:高市场beta,负规模因子
1: [0.002, 0.8, 0.5, -0.2], # 小盘股:正规模因子
2: [0.0005, 1.0, 0.1, 0.6], # 价值股:高价值因子
3: [0.003, 1.1, 0.2, -0.4], # 成长股:负价值因子
4: [0.001, 1.0, 0.0, 0.0] # 中性股:接近市场
}
# 生成数据
factor_returns_history = []
asset_returns_history = []
factor_loadings_history = {i: [] for i in range(n_assets)}
for t in range(T):
# 生成因子收益率
if t < T // 2:
# 前半期:正常市场
factor_returns = np.random.multivariate_normal(
[0.0004, 0.0, 0.0], # 市场、规模、价值因子
[[0.0001, 0, 0], [0, 0.00005, 0], [0, 0, 0.00003]]
)
else:
# 后半期:市场压力
factor_returns = np.random.multivariate_normal(
[-0.001, 0.001, -0.0005], # 市场下跌,小盘股抗跌,价值股受挫
[[0.0004, 0, 0], [0, 0.0001, 0], [0, 0, 0.00008]]
)
factor_returns_history.append(factor_returns)
# 生成资产收益率
asset_returns = []
for i in range(n_assets):
loadings = true_loadings[i]
asset_return = (loadings[0] +
loadings[1] * factor_returns[0] +
loadings[2] * factor_returns[1] +
loadings[3] * factor_returns[2] +
np.random.normal(0, 0.01)) # 特异性风险
asset_returns.append(asset_return)
asset_returns_history.append(asset_returns)
# 更新动态因子模型
dfm.update_factors(asset_returns, factor_returns)
# 记录估计的因子载荷
for i in range(n_assets):
estimated_loadings = dfm.get_factor_loadings(i)
factor_loadings_history[i].append(estimated_loadings.copy())
# 转换为数组
factor_returns_history = np.array(factor_returns_history)
asset_returns_history = np.array(asset_returns_history)
# 绘制结果
fig, axes = plt.subplots(3, 2, figsize=(16, 18))
# 因子收益率时间序列
for i, factor_name in enumerate(factor_names):
axes[0, 0].plot(factor_returns_history[:, i], label=factor_name)
axes[0, 0].set_title('因子收益率时间序列')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 资产收益率时间序列
for i, asset_name in enumerate(asset_names):
axes[0, 1].plot(asset_returns_history[:, i], label=asset_name, alpha=0.7)
axes[0, 1].set_title('资产收益率时间序列')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 市场beta估计
for i, asset_name in enumerate(asset_names):
market_betas = [loadings[1] for loadings in factor_loadings_history[i]]
axes[1, 0].plot(market_betas, label=f'{asset_name}', linewidth=2)
axes[1, 0].axhline(y=true_loadings[i][1], color=f'C{i}', linestyle='--', alpha=0.5)
axes[1, 0].set_title('市场Beta估计(虚线为真实值)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 规模因子载荷估计
for i, asset_name in enumerate(asset_names):
size_loadings = [loadings[2] for loadings in factor_loadings_history[i]]
axes[1, 1].plot(size_loadings, label=f'{asset_name}', linewidth=2)
axes[1, 1].axhline(y=true_loadings[i][2], color=f'C{i}', linestyle='--', alpha=0.5)
axes[1, 1].set_title('规模因子载荷估计')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# 价值因子载荷估计
for i, asset_name in enumerate(asset_names):
value_loadings = [loadings[3] for loadings in factor_loadings_history[i]]
axes[2, 0].plot(value_loadings, label=f'{asset_name}', linewidth=2)
axes[2, 0].axhline(y=true_loadings[i][3], color=f'C{i}', linestyle='--', alpha=0.5)
axes[2, 0].set_title('价值因子载荷估计')
axes[2, 0].legend()
axes[2, 0].grid(True, alpha=0.3)
# 因子贡献分解(最后一期)
last_factor_returns = factor_returns_history[-1]
contributions_data = []
for i, asset_name in enumerate(asset_names):
contributions = dfm.calculate_factor_contributions(i, last_factor_returns)
contributions_data.append([
contributions['alpha'],
contributions['market'],
contributions['size'],
contributions['value']
])
contributions_data = np.array(contributions_data)
# 堆叠柱状图
bottom = np.zeros(n_assets)
colors = ['gray', 'blue', 'green', 'red']
labels = ['Alpha', '市场', '规模', '价值']
for j, (color, label) in enumerate(zip(colors, labels)):
axes[2, 1].bar(asset_names, contributions_data[:, j], bottom=bottom,
color=color, alpha=0.7, label=label)
bottom += contributions_data[:, j]
axes[2, 1].set_title('最后一期因子收益贡献分解')
axes[2, 1].legend()
axes[2, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 模型拟合度评估
print("动态因子模型拟合度评估:")
print("-" * 50)
for i, asset_name in enumerate(asset_names):
# 计算R²
actual_returns = asset_returns_history[:, i]
# 用模型预测收益率
predicted_returns = []
for t in range(len(factor_returns_history)):
if t < len(factor_loadings_history[i]):
loadings = factor_loadings_history[i][t]
pred = (loadings[0] +
loadings[1] * factor_returns_history[t, 0] +
loadings[2] * factor_returns_history[t, 1] +
loadings[3] * factor_returns_history[t, 2])
predicted_returns.append(pred)
if len(predicted_returns) == len(actual_returns):
r_squared = 1 - np.var(actual_returns - predicted_returns) / np.var(actual_returns)
rmse = np.sqrt(np.mean((actual_returns - predicted_returns)**2))
print(f"{asset_name}:")
print(f" R²: {r_squared:.3f}")
print(f" RMSE: {rmse:.5f}")
# 最终估计的载荷 vs 真实载荷
final_loadings = factor_loadings_history[i][-1]
true_vals = true_loadings[i]
print(f" 载荷估计 vs 真实:")
print(f" Alpha: {final_loadings[0]:.4f} vs {true_vals[0]:.4f}")
print(f" Market: {final_loadings[1]:.3f} vs {true_vals[1]:.3f}")
print(f" Size: {final_loadings[2]:.3f} vs {true_vals[2]:.3f}")
print(f" Value: {final_loadings[3]:.3f} vs {true_vals[3]:.3f}")
print()
# 运行示例
# dynamic_factor_model_example()风险预算模型
基于卡尔曼滤波的风险预算
class RiskBudgetingKF:
"""
基于卡尔曼滤波的风险预算模型
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# 状态:资产的风险贡献率
self.dim_x = n_assets
self.x = np.ones((self.dim_x, 1)) / self.n_assets # 初始等风险贡献
self.P = 0.01 * np.eye(self.dim_x)
# 系统模型
self.F = np.eye(self.dim_x) # 风险贡献率的随机游走
self.Q = 0.0001 * np.eye(self.dim_x) # 小幅变动
# 目标风险预算
self.target_risk_budget = np.ones(n_assets) / n_assets # 等风险预算
# 历史数据
self.returns_history = []
self.weights_history = []
self.risk_contributions_history = []
def update_risk_model(self, returns, weights):
"""
更新风险模型
参数:
returns: 资产收益率
weights: 当前投资组合权重
"""
returns = np.array(returns)
weights = np.array(weights)
# 计算实际风险贡献
if len(self.returns_history) >= 20: # 需要足够的历史数据
recent_returns = np.array(self.returns_history[-20:])
cov_matrix = np.cov(recent_returns.T)
# 计算边际风险贡献
portfolio_var = weights @ cov_matrix @ weights
marginal_contributions = cov_matrix @ weights
# 风险贡献 = 权重 × 边际贡献 / 组合方差
if portfolio_var > 0:
risk_contributions = weights * marginal_contributions / portfolio_var
risk_contributions = np.abs(risk_contributions) # 确保非负
risk_contributions /= np.sum(risk_contributions) # 归一化
else:
risk_contributions = weights / np.sum(weights)
else:
risk_contributions = weights / np.sum(weights)
# 观测方程:观测实际风险贡献
H = np.eye(self.n_assets)
R = 0.01 * np.eye(self.n_assets)
# 卡尔曼滤波更新
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
z = risk_contributions.reshape(-1, 1)
y = z - H @ self.x
S = H @ self.P @ H.T + R
K = self.P @ H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
# 保存历史
self.returns_history.append(returns)
self.weights_history.append(weights)
self.risk_contributions_history.append(risk_contributions)
def optimize_risk_budget_weights(self, expected_returns, cov_matrix):
"""
基于风险预算优化权重
参数:
expected_returns: 预期收益率
cov_matrix: 协方差矩阵
"""
# 目标:找到权重使得风险贡献接近目标预算
def risk_budget_objective(weights):
weights = np.abs(weights)
weights /= np.sum(weights) # 归一化
# 计算风险贡献
portfolio_var = weights @ cov_matrix @ weights
if portfolio_var <= 0:
return 1e10
marginal_contributions = cov_matrix @ weights
risk_contributions = weights * marginal_contributions / portfolio_var
risk_contributions = np.abs(risk_contributions)
risk_contributions /= np.sum(risk_contributions)
# 目标:最小化与目标风险预算的差异
budget_error = np.sum((risk_contributions - self.target_risk_budget)**2)
# 加入收益约束(可选)
expected_portfolio_return = weights @ expected_returns
return_penalty = max(0, 0.08/252 - expected_portfolio_return) * 1000
return budget_error + return_penalty
# 约束和边界
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(np.abs(w)) - 1}]
bounds = [(0, 1) for _ in range(self.n_assets)]
# 初始猜测
x0 = np.ones(self.n_assets) / self.n_assets
# 优化
from scipy.optimize import minimize
result = minimize(risk_budget_objective, x0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
optimal_weights = np.abs(result.x)
optimal_weights /= np.sum(optimal_weights)
return optimal_weights
else:
return x0
def get_current_risk_budget(self):
"""获取当前估计的风险预算"""
return self.x.flatten()
def risk_budgeting_example():
"""
风险预算示例
"""
# 设置
n_assets = 4
asset_names = ['股票', '债券', '房地产', '商品']
# 创建风险预算模型
rb = RiskBudgetingKF(n_assets)
# 设置目标风险预算
rb.target_risk_budget = np.array([0.4, 0.3, 0.2, 0.1]) # 股票承担40%风险
# 模拟数据
np.random.seed(42)
T = 252
# 资产特征
expected_returns = np.array([0.08, 0.04, 0.06, 0.05]) / 252 # 日化
volatilities = np.array([0.20, 0.05, 0.15, 0.25]) / np.sqrt(252)
# 相关性矩阵
correlation = np.array([
[1.0, -0.2, 0.6, 0.3],
[-0.2, 1.0, 0.1, -0.1],
[0.6, 0.1, 1.0, 0.2],
[0.3, -0.1, 0.2, 1.0]
])
# 协方差矩阵
cov_matrix = np.outer(volatilities, volatilities) * correlation
# 初始权重(等权重)
current_weights = np.ones(n_assets) / n_assets
# 存储结果
all_returns = []
all_weights = []
portfolio_values = []
risk_budget_evolution = []
actual_risk_contributions = []
portfolio_value = 100000
for t in range(T):
# 生成收益率
daily_returns = np.random.multivariate_normal(expected_returns, cov_matrix)
all_returns.append(daily_returns)
# 更新风险模型
rb.update_risk_model(daily_returns, current_weights)
# 每20天重新平衡
if t % 20 == 0 and t > 40:
# 更新协方差矩阵估计
recent_returns = np.array(all_returns[-40:])
updated_cov = np.cov(recent_returns.T)
# 优化权重
new_weights = rb.optimize_risk_budget_weights(expected_returns, updated_cov)
current_weights = new_weights
all_weights.append(current_weights.copy())
# 计算投资组合收益
portfolio_return = current_weights @ daily_returns
portfolio_value *= (1 + portfolio_return)
portfolio_values.append(portfolio_value)
# 记录风险预算演化
risk_budget_evolution.append(rb.get_current_risk_budget())
# 计算实际风险贡献(如果有足够数据)
if len(all_returns) >= 20:
recent_returns = np.array(all_returns[-20:])
recent_cov = np.cov(recent_returns.T)
portfolio_var = current_weights @ recent_cov @ current_weights
if portfolio_var > 0:
marginal_contrib = recent_cov @ current_weights
risk_contrib = current_weights * marginal_contrib / portfolio_var
risk_contrib = np.abs(risk_contrib) / np.sum(np.abs(risk_contrib))
else:
risk_contrib = current_weights / np.sum(current_weights)
else:
risk_contrib = current_weights / np.sum(current_weights)
actual_risk_contributions.append(risk_contrib)
# 转换为数组
all_weights = np.array(all_weights)
risk_budget_evolution = np.array(risk_budget_evolution)
actual_risk_contributions = np.array(actual_risk_contributions)
# 绘制结果
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# 投资组合价值
ax1.plot(portfolio_values, 'b-', linewidth=2)
ax1.set_ylabel('投资组合价值')
ax1.set_title('风险预算投资组合价值')
ax1.grid(True, alpha=0.3)
# 权重演化
colors = ['blue', 'red', 'green', 'orange']
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax2.plot(all_weights[:, i], color=color, linewidth=2, label=name)
ax2.set_ylabel('权重')
ax2.set_title('投资组合权重演化')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 目标 vs 实际风险贡献
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax3.plot(actual_risk_contributions[:, i], color=color,
linewidth=2, label=f'{name} 实际')
ax3.axhline(y=rb.target_risk_budget[i], color=color,
linestyle='--', alpha=0.7, label=f'{name} 目标')
ax3.set_ylabel('风险贡献')
ax3.set_title('风险贡献:目标 vs 实际')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 风险预算跟踪误差
tracking_errors = []
for i in range(len(actual_risk_contributions)):
error = np.sqrt(np.mean((actual_risk_contributions[i] - rb.target_risk_budget)**2))
tracking_errors.append(error)
ax4.plot(tracking_errors, 'purple', linewidth=2)
ax4.set_xlabel('时间')
ax4.set_ylabel('风险预算跟踪误差 (RMSE)')
ax4.set_title('风险预算跟踪质量')
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 性能总结
final_value = portfolio_values[-1]
total_return = (final_value / 100000 - 1) * 100
portfolio_returns = []
for i in range(len(all_returns)):
port_ret = all_weights[i] @ all_returns[i]
portfolio_returns.append(port_ret)
annual_return = np.mean(portfolio_returns) * 252
annual_vol = np.std(portfolio_returns) * np.sqrt(252)
sharpe_ratio = annual_return / annual_vol if annual_vol > 0 else 0
print("风险预算投资组合性能:")
print("-" * 40)
print(f"最终价值: ${final_value:,.0f}")
print(f"总收益率: {total_return:.2f}%")
print(f"年化收益率: {annual_return:.3f}")
print(f"年化波动率: {annual_vol:.3f}")
print(f"夏普比率: {sharpe_ratio:.3f}")
print(f"\n风险预算跟踪质量:")
print(f"平均跟踪误差: {np.mean(tracking_errors):.4f}")
print(f"最大跟踪误差: {np.max(tracking_errors):.4f}")
print(f"\n最终风险贡献 vs 目标:")
final_contributions = actual_risk_contributions[-1]
for i, name in enumerate(asset_names):
print(f"{name}: {final_contributions[i]:.3f} vs {rb.target_risk_budget[i]:.3f}")
# 运行示例
# risk_budgeting_example()下一章预告
下一章我们将学习金融衍生品定价与风险管理,包括期权定价模型的状态空间表示、利率衍生品的动态对冲策略和信用风险的动态建模方法。
