第8章:卡尔曼滤波在金融中的基础应用
10/2/25About 10 min
第8章:卡尔曼滤波在金融中的基础应用
学习目标
- 掌握股价动态建模的状态空间方法
- 学会波动率估计和预测的卡尔曼滤波实现
- 理解利率期限结构的动态建模
金融建模的状态空间方法
在金融领域,许多重要的变量都是不可直接观测的隐含状态,需要通过市场观测数据来推断:
- 隐含状态:波动率、风险溢价、流动性、市场情绪
- 观测数据:股票价格、债券价格、期权价格、交易量
状态空间表示的优势
为什么使用状态空间模型
- 隐含因子建模:能够估计不可观测的金融变量
- 动态关系:捕捉金融变量的时变特性
- 多元建模:处理多个相关资产和因子
- 预测能力:提供未来状态的概率分布
- 实时更新:新信息到达时可即时更新估计
股价动态建模
随机游走模型
最简单的股价模型是随机游走:
状态方程:
观测方程:
import numpy as np
import matplotlib.pyplot as plt
from scipy import linalg
import pandas as pd
class StockPriceKF:
"""
股价卡尔曼滤波模型
"""
def __init__(self, initial_price=100.0):
# 状态:[log_price, drift]
self.x = np.array([[np.log(initial_price)], [0.0]])
self.P = np.array([[0.01, 0], [0, 0.001]])
# 状态转移矩阵(随机游走 + 漂移)
self.F = np.array([[1, 1], [0, 1]])
# 观测矩阵(观测指数变换后的价格)
self.H = np.array([[1, 0]])
# 过程噪声协方差
self.Q = np.array([[0.01, 0], [0, 0.0001]])
# 观测噪声协方差
self.R = np.array([[0.001]])
def predict(self):
"""预测步骤"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, price):
"""更新步骤"""
z = np.array([[np.log(price)]])
# 创新
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
# 卡尔曼增益
K = self.P @ self.H.T @ linalg.inv(S)
# 更新
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
def get_price_estimate(self):
"""获取价格估计"""
return np.exp(self.x[0, 0])
def get_drift_estimate(self):
"""获取漂移估计"""
return self.x[1, 0]
def stock_price_filtering_example():
"""
股价滤波示例
"""
# 模拟真实股价数据
np.random.seed(42)
T = 252 # 一年交易日
true_drift = 0.0002 # 日漂移率
true_vol = 0.02 # 日波动率
prices = [100.0]
for t in range(1, T):
log_return = true_drift + true_vol * np.random.normal()
prices.append(prices[-1] * np.exp(log_return))
# 添加观测噪声
observed_prices = [p + np.random.normal(0, 0.1) for p in prices]
# 卡尔曼滤波
kf = StockPriceKF(initial_price=observed_prices[0])
estimated_prices = []
estimated_drifts = []
for price in observed_prices:
kf.predict()
kf.update(price)
estimated_prices.append(kf.get_price_estimate())
estimated_drifts.append(kf.get_drift_estimate())
# 绘制结果
plt.figure(figsize=(15, 10))
plt.subplot(3, 1, 1)
plt.plot(prices, 'g-', label='真实价格', linewidth=2)
plt.plot(observed_prices, 'r.', label='观测价格', alpha=0.7, markersize=3)
plt.plot(estimated_prices, 'b-', label='卡尔曼估计', linewidth=2)
plt.legend()
plt.title('股价卡尔曼滤波')
plt.ylabel('价格')
plt.subplot(3, 1, 2)
plt.plot([true_drift * 252] * T, 'g-', label='真实年化漂移', linewidth=2)
plt.plot(np.array(estimated_drifts) * 252, 'b-', label='估计年化漂移', linewidth=2)
plt.legend()
plt.ylabel('年化漂移率')
plt.subplot(3, 1, 3)
errors = np.array(prices) - np.array(estimated_prices)
plt.plot(errors, 'r-', label='估计误差')
plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
plt.legend()
plt.xlabel('交易日')
plt.ylabel('价格误差')
plt.tight_layout()
plt.show()
return prices, observed_prices, estimated_prices, estimated_drifts
# 运行示例
# stock_price_filtering_example()时变波动率模型
股票波动率通常是时变的,可以用状态空间模型建模:
class StochasticVolatilityKF:
"""
随机波动率卡尔曼滤波模型
"""
def __init__(self, initial_price=100.0, initial_vol=0.2):
# 状态:[log_price, log_variance]
self.x = np.array([[np.log(initial_price)], [np.log(initial_vol**2)]])
self.P = np.array([[0.01, 0], [0, 0.1]])
# 模型参数
self.mu = 0.0001 # 期望收益率
self.kappa = 0.1 # 方差回归速度
self.theta = np.log(0.04) # 长期方差(对数)
self.sigma_v = 0.1 # 波动率的波动率
# 状态转移矩阵(线性化)
self.F = np.array([[1, 0], [0, 1 - self.kappa]])
# 观测矩阵
self.H = np.array([[1, 0]])
# 噪声协方差
self.Q = np.array([[0, 0], [0, self.sigma_v**2]])
self.R = np.array([[0.0001]])
def predict(self, dt=1.0):
"""预测步骤(非线性)"""
log_S = self.x[0, 0]
log_var = self.x[1, 0]
# 非线性状态转移
new_log_S = log_S + (self.mu - 0.5 * np.exp(log_var)) * dt
new_log_var = log_var + self.kappa * (self.theta - log_var) * dt
self.x = np.array([[new_log_S], [new_log_var]])
# 线性化雅可比矩阵
F = np.array([[1, -0.5 * np.exp(log_var) * dt],
[0, 1 - self.kappa * dt]])
# 协方差预测
Q_scaled = self.Q * dt
Q_scaled[0, 0] = np.exp(log_var) * dt # 价格方程的噪声
self.P = F @ self.P @ F.T + Q_scaled
def update(self, price):
"""更新步骤"""
z = np.array([[np.log(price)]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
def get_price_estimate(self):
return np.exp(self.x[0, 0])
def get_volatility_estimate(self):
return np.sqrt(np.exp(self.x[1, 0]))
def stochastic_volatility_example():
"""
随机波动率估计示例
"""
# 模拟Heston过程
np.random.seed(42)
T = 252
dt = 1.0
# Heston参数
S0, V0 = 100.0, 0.04
mu, kappa, theta, sigma_v = 0.05/252, 2.0, 0.04, 0.3
rho = -0.5
# 模拟路径
prices = [S0]
volatilities = [np.sqrt(V0)]
S, V = S0, V0
for t in range(1, T):
dW1 = np.random.normal() * np.sqrt(dt)
dW2 = rho * dW1 + np.sqrt(1 - rho**2) * np.random.normal() * np.sqrt(dt)
# 更新方差
V = max(V + kappa * (theta - V) * dt + sigma_v * np.sqrt(max(V, 0)) * dW2, 1e-8)
# 更新价格
S = S * np.exp((mu - 0.5 * V) * dt + np.sqrt(V) * dW1)
prices.append(S)
volatilities.append(np.sqrt(V))
# 卡尔曼滤波估计
kf = StochasticVolatilityKF(prices[0])
estimated_prices = []
estimated_vols = []
for price in prices:
kf.predict()
kf.update(price)
estimated_prices.append(kf.get_price_estimate())
estimated_vols.append(kf.get_volatility_estimate())
# 绘制结果
plt.figure(figsize=(15, 8))
plt.subplot(2, 1, 1)
plt.plot(prices, 'g-', label='真实价格', linewidth=2)
plt.plot(estimated_prices, 'b--', label='估计价格', linewidth=2)
plt.legend()
plt.title('随机波动率模型:价格和波动率估计')
plt.ylabel('价格')
plt.subplot(2, 1, 2)
plt.plot(volatilities, 'g-', label='真实波动率', linewidth=2)
plt.plot(estimated_vols, 'r--', label='估计波动率', linewidth=2)
plt.legend()
plt.xlabel('交易日')
plt.ylabel('波动率')
plt.tight_layout()
plt.show()
return prices, volatilities, estimated_prices, estimated_vols
# 运行示例
# stochastic_volatility_example()利率期限结构建模
Vasicek模型的状态空间表示
Vasicek利率模型可以表示为状态空间形式:
状态方程:
观测方程:
class VasicekTermStructureKF:
"""
Vasicek期限结构卡尔曼滤波模型
"""
def __init__(self, maturities, initial_rate=0.03):
self.maturities = np.array(maturities)
self.n_bonds = len(maturities)
# 状态:[短期利率, 长期均值, 回归速度]
self.x = np.array([[initial_rate], [initial_rate], [0.1]])
self.P = np.array([[0.0001, 0, 0],
[0, 0.0001, 0],
[0, 0, 0.01]])
# Vasicek参数
self.kappa = 0.1
self.sigma = 0.01
# 观测矩阵(零息债券价格)
self.H = self._compute_observation_matrix()
# 过程噪声
self.Q = np.array([[self.sigma**2, 0, 0],
[0, 1e-6, 0],
[0, 0, 1e-6]])
# 观测噪声
self.R = 1e-4 * np.eye(self.n_bonds)
def _compute_observation_matrix(self):
"""计算观测矩阵"""
H = np.zeros((self.n_bonds, 3))
for i, T in enumerate(self.maturities):
if T > 0:
# Vasicek债券价格公式的线性化
B_T = (1 - np.exp(-self.kappa * T)) / self.kappa
A_T = np.exp((B_T - T) * (self.kappa**2 * 0.04 - 0.5 * self.sigma**2) / self.kappa**2 -
0.25 * self.sigma**2 * B_T**2 / self.kappa)
H[i, 0] = -B_T # 对短期利率的敏感性
return H
def predict(self, dt=1.0):
"""预测步骤"""
r_t = self.x[0, 0]
theta = self.x[1, 0]
kappa = self.x[2, 0]
# 状态转移(欧拉离散化)
new_r = r_t + kappa * (theta - r_t) * dt
new_theta = theta # 假设长期均值不变
new_kappa = kappa # 假设回归速度不变
self.x = np.array([[new_r], [new_theta], [new_kappa]])
# 雅可比矩阵
F = np.array([[1 - kappa * dt, kappa * dt, (theta - r_t) * dt],
[0, 1, 0],
[0, 0, 1]])
self.P = F @ self.P @ F.T + self.Q * dt
def update(self, bond_prices):
"""更新步骤"""
z = np.log(bond_prices).reshape(-1, 1)
# 更新观测矩阵
self.H = self._compute_observation_matrix()
# 预测观测
z_pred = self.H @ self.x
y = z - z_pred
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(3) - K @ self.H) @ self.P
def get_short_rate(self):
return self.x[0, 0]
def get_long_term_mean(self):
return self.x[1, 0]
def get_mean_reversion_speed(self):
return self.x[2, 0]
def term_structure_example():
"""
期限结构建模示例
"""
# 设置到期时间
maturities = [0.25, 0.5, 1, 2, 5, 10] # 年
T = 100 # 观测天数
# 模拟真实Vasicek过程
np.random.seed(42)
true_kappa, true_theta, true_sigma = 0.1, 0.04, 0.01
true_rates = [0.03]
for t in range(1, T):
dr = true_kappa * (true_theta - true_rates[-1]) * (1/252) + \
true_sigma * np.sqrt(1/252) * np.random.normal()
true_rates.append(max(true_rates[-1] + dr, 0.001))
# 生成债券价格观测
def vasicek_bond_price(r, T, kappa, theta, sigma):
B = (1 - np.exp(-kappa * T)) / kappa if kappa != 0 else T
A = np.exp((B - T) * (kappa * theta - 0.5 * sigma**2) / kappa**2 -
0.25 * sigma**2 * B**2 / kappa) if kappa != 0 else np.exp(-0.5 * sigma**2 * T**3 / 3)
return A * np.exp(-B * r)
bond_prices_series = []
for r in true_rates:
prices = []
for mat in maturities:
true_price = vasicek_bond_price(r, mat, true_kappa, true_theta, true_sigma)
noisy_price = true_price * (1 + np.random.normal(0, 0.001))
prices.append(noisy_price)
bond_prices_series.append(prices)
# 卡尔曼滤波估计
kf = VasicekTermStructureKF(maturities)
estimated_rates = []
estimated_theta = []
estimated_kappa = []
for prices in bond_prices_series:
kf.predict()
kf.update(np.array(prices))
estimated_rates.append(kf.get_short_rate())
estimated_theta.append(kf.get_long_term_mean())
estimated_kappa.append(kf.get_mean_reversion_speed())
# 绘制结果
plt.figure(figsize=(15, 12))
plt.subplot(3, 1, 1)
plt.plot(true_rates, 'g-', label='真实短期利率', linewidth=2)
plt.plot(estimated_rates, 'b--', label='估计短期利率', linewidth=2)
plt.legend()
plt.title('Vasicek期限结构模型')
plt.ylabel('利率')
plt.subplot(3, 1, 2)
plt.plot([true_theta] * T, 'g-', label='真实长期均值', linewidth=2)
plt.plot(estimated_theta, 'r--', label='估计长期均值', linewidth=2)
plt.legend()
plt.ylabel('长期均值')
plt.subplot(3, 1, 3)
plt.plot([true_kappa] * T, 'g-', label='真实回归速度', linewidth=2)
plt.plot(estimated_kappa, 'orange', label='估计回归速度', linewidth=2, linestyle='--')
plt.legend()
plt.xlabel('时间')
plt.ylabel('回归速度')
plt.tight_layout()
plt.show()
return true_rates, estimated_rates, estimated_theta, estimated_kappa
# 运行示例
# term_structure_example()风险因子建模
Fama-French三因子模型
使用卡尔曼滤波估计时变的风险因子载荷:
class FamaFrenchKF:
"""
Fama-French三因子模型的卡尔曼滤波实现
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# 状态:每个资产的[alpha, beta_market, beta_size, beta_value]
self.n_factors = 4
self.dim_x = n_assets * self.n_factors
# 初始化状态(随机初始值)
self.x = np.random.normal(0, 0.1, (self.dim_x, 1))
# 初始协方差
self.P = 0.01 * np.eye(self.dim_x)
# 状态转移(随机游走 + 小幅回归)
self.F = 0.99 * np.eye(self.dim_x)
# 过程噪声
self.Q = 1e-5 * np.eye(self.dim_x)
# 观测噪声(特异性风险)
self.R = 0.01 * np.eye(n_assets)
def predict(self):
"""预测步骤"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, returns, factor_returns):
"""
更新步骤
参数:
returns: 资产收益率向量 (n_assets,)
factor_returns: 因子收益率 [market, size, value] (3,)
"""
# 构建观测矩阵
factors = np.concatenate([[1.0], factor_returns]) # [1, MKT, SMB, HML]
H = np.zeros((self.n_assets, self.dim_x))
for i in range(self.n_assets):
start_idx = i * self.n_factors
end_idx = (i + 1) * self.n_factors
H[i, start_idx:end_idx] = factors
# 预测收益
y_pred = H @ self.x
# 创新
y = returns.reshape(-1, 1) - y_pred
S = H @ self.P @ H.T + self.R
# 卡尔曼增益
K = self.P @ H.T @ linalg.inv(S)
# 更新
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
def get_factor_loadings(self, asset_idx):
"""获取资产的因子载荷"""
start_idx = asset_idx * self.n_factors
end_idx = (asset_idx + 1) * self.n_factors
return self.x[start_idx:end_idx].flatten()
def fama_french_example():
"""
Fama-French模型示例
"""
# 模拟数据
np.random.seed(42)
T = 252 # 一年数据
n_assets = 5
# 真实因子载荷(时不变,用于验证)
true_loadings = {
'alpha': np.random.normal(0, 0.001, n_assets),
'beta_market': np.random.uniform(0.5, 1.5, n_assets),
'beta_size': np.random.uniform(-0.5, 0.5, n_assets),
'beta_value': np.random.uniform(-0.3, 0.3, n_assets)
}
# 模拟因子收益
factor_returns = []
asset_returns = []
for t in range(T):
# 因子收益
mkt = np.random.normal(0.0004, 0.01) # 市场因子
smb = np.random.normal(0, 0.005) # 规模因子
hml = np.random.normal(0, 0.005) # 价值因子
factors = np.array([mkt, smb, hml])
factor_returns.append(factors)
# 资产收益
returns = []
for i in range(n_assets):
ret = (true_loadings['alpha'][i] +
true_loadings['beta_market'][i] * mkt +
true_loadings['beta_size'][i] * smb +
true_loadings['beta_value'][i] * hml +
np.random.normal(0, 0.01)) # 特异性风险
returns.append(ret)
asset_returns.append(returns)
# 卡尔曼滤波估计
kf = FamaFrenchKF(n_assets)
estimated_loadings = {
'alpha': [[] for _ in range(n_assets)],
'beta_market': [[] for _ in range(n_assets)],
'beta_size': [[] for _ in range(n_assets)],
'beta_value': [[] for _ in range(n_assets)]
}
for t in range(T):
kf.predict()
kf.update(np.array(asset_returns[t]), factor_returns[t])
# 记录估计的因子载荷
for i in range(n_assets):
loadings = kf.get_factor_loadings(i)
estimated_loadings['alpha'][i].append(loadings[0])
estimated_loadings['beta_market'][i].append(loadings[1])
estimated_loadings['beta_size'][i].append(loadings[2])
estimated_loadings['beta_value'][i].append(loadings[3])
# 绘制第一个资产的结果
plt.figure(figsize=(15, 12))
factors = ['alpha', 'beta_market', 'beta_size', 'beta_value']
for i, factor in enumerate(factors):
plt.subplot(2, 2, i+1)
plt.plot([true_loadings[factor][0]] * T, 'g-',
label=f'真实{factor}', linewidth=2)
plt.plot(estimated_loadings[factor][0], 'b--',
label=f'估计{factor}', linewidth=2)
plt.legend()
plt.title(f'资产1的{factor}载荷')
plt.xlabel('时间')
plt.ylabel('载荷值')
plt.tight_layout()
plt.show()
return estimated_loadings, true_loadings
# 运行示例
# fama_french_example()实际应用考虑
1. 数据预处理
金融数据的特殊性
- 缺失数据:节假日、交易暂停
- 异常值:价格跳跃、错误报价
- 频率转换:高频到低频的聚合
- 时区问题:跨市场数据同步
2. 模型验证
def model_validation_metrics(true_values, estimates, predictions=None):
"""
模型验证指标计算
"""
errors = np.array(true_values) - np.array(estimates)
metrics = {
'RMSE': np.sqrt(np.mean(errors**2)),
'MAE': np.mean(np.abs(errors)),
'MAPE': np.mean(np.abs(errors / np.array(true_values))) * 100,
'Max_Error': np.max(np.abs(errors)),
'R_squared': 1 - np.var(errors) / np.var(true_values)
}
if predictions is not None:
pred_errors = np.array(true_values[1:]) - np.array(predictions[:-1])
metrics['Pred_RMSE'] = np.sqrt(np.mean(pred_errors**2))
return metrics
# 示例使用
def validate_stock_model():
"""验证股价模型"""
prices, observed, estimated, _ = stock_price_filtering_example()
metrics = model_validation_metrics(prices, estimated)
print("股价模型验证指标:")
for metric, value in metrics.items():
print(f"{metric}: {value:.6f}")
# validate_stock_model()3. 实时实现考虑
class RealTimeKalmanFilter:
"""
实时卡尔曼滤波实现
"""
def __init__(self, kf_class, **kwargs):
self.kf = kf_class(**kwargs)
self.last_update_time = None
self.buffer = []
def add_observation(self, timestamp, observation):
"""添加新观测"""
if self.last_update_time is not None:
dt = (timestamp - self.last_update_time).total_seconds() / 86400 # 转换为天
self.kf.predict_with_dt(dt)
self.kf.update(observation)
self.last_update_time = timestamp
# 保存历史
self.buffer.append({
'timestamp': timestamp,
'observation': observation,
'estimate': self.kf.get_estimate(),
'covariance': self.kf.P.copy()
})
# 限制缓冲区大小
if len(self.buffer) > 1000:
self.buffer.pop(0)
def get_current_estimate(self):
"""获取当前估计"""
return self.kf.get_estimate()
def get_prediction(self, horizon_days):
"""获取未来预测"""
kf_copy = self.kf.copy()
kf_copy.predict_with_dt(horizon_days)
return kf_copy.get_estimate()下一章预告
下一章我们将学习系统建模与参数调优,包括如何选择合适的状态空间模型、噪声协方差调优和模型验证技术。
