Chapter 8: Fundamental Applications of Kalman Filtering in Finance
Learning Objectives
- Master state-space methods for stock price dynamics modeling
- Learn Kalman filter implementation for volatility estimation and prediction
- Understand dynamic modeling of interest rate term structure
State-Space Methods for Financial Modeling
In finance, many important variables are unobservable latent states that must be inferred from observed market data:
- Latent States: Volatility, risk premium, liquidity, market sentiment
- Observed Data: Stock prices, bond prices, option prices, trading volume
Advantages of State-Space Representation
Why Use State-Space Models
- Latent Factor Modeling: Ability to estimate unobservable financial variables
- Dynamic Relationships: Capture time-varying characteristics of financial variables
- Multivariate Modeling: Handle multiple correlated assets and factors
- Predictive Capability: Provide probability distributions for future states
- Real-Time Updates: Instantly update estimates when new information arrives
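All of the models in this chapter are instances of the linear-Gaussian state-space template below (nonlinear cases are linearized around the current estimate); the symbols correspond to the F, H, Q, and R attributes used in the code:
$$x_t = F x_{t-1} + w_t, \qquad w_t \sim \mathcal{N}(0, Q)$$
$$z_t = H x_t + v_t, \qquad v_t \sim \mathcal{N}(0, R)$$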
Stock Price Dynamics Modeling
Random Walk with Drift Model
The simplest stock price model treats the log price as a random walk whose drift is itself a slowly varying latent state:
State Equation:
$$\begin{bmatrix} \log S_t \\ \mu_t \end{bmatrix} = \begin{bmatrix} 1 & 1 \\ 0 & 1 \end{bmatrix} \begin{bmatrix} \log S_{t-1} \\ \mu_{t-1} \end{bmatrix} + w_t, \qquad w_t \sim \mathcal{N}(0, Q)$$
Observation Equation:
$$z_t = \begin{bmatrix} 1 & 0 \end{bmatrix} \begin{bmatrix} \log S_t \\ \mu_t \end{bmatrix} + v_t, \qquad v_t \sim \mathcal{N}(0, R)$$
where $S_t$ is the price, $\mu_t$ the latent drift, and $z_t$ the observed log price.
import numpy as np
import matplotlib.pyplot as plt
from scipy import linalg
import pandas as pd
class StockPriceKF:
"""
Stock Price Kalman Filter Model
"""
def __init__(self, initial_price=100.0):
# State: [log_price, drift]
self.x = np.array([[np.log(initial_price)], [0.0]])
self.P = np.array([[0.01, 0], [0, 0.001]])
# State transition matrix (random walk + drift)
self.F = np.array([[1, 1], [0, 1]])
        # Observation matrix (we observe the log price)
self.H = np.array([[1, 0]])
# Process noise covariance
self.Q = np.array([[0.01, 0], [0, 0.0001]])
# Observation noise covariance
self.R = np.array([[0.001]])
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, price):
"""Update step"""
z = np.array([[np.log(price)]])
# Innovation
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
# Kalman gain
K = self.P @ self.H.T @ linalg.inv(S)
# Update
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
def get_price_estimate(self):
"""Get price estimate"""
return np.exp(self.x[0, 0])
def get_drift_estimate(self):
"""Get drift estimate"""
return self.x[1, 0]
def stock_price_filtering_example():
"""
Stock price filtering example
"""
# Simulate true stock price data
np.random.seed(42)
T = 252 # One year of trading days
true_drift = 0.0002 # Daily drift rate
true_vol = 0.02 # Daily volatility
prices = [100.0]
for t in range(1, T):
log_return = true_drift + true_vol * np.random.normal()
prices.append(prices[-1] * np.exp(log_return))
# Add observation noise
observed_prices = [p + np.random.normal(0, 0.1) for p in prices]
# Kalman filtering
kf = StockPriceKF(initial_price=observed_prices[0])
estimated_prices = []
estimated_drifts = []
for price in observed_prices:
kf.predict()
kf.update(price)
estimated_prices.append(kf.get_price_estimate())
estimated_drifts.append(kf.get_drift_estimate())
# Plot results
plt.figure(figsize=(15, 10))
plt.subplot(3, 1, 1)
plt.plot(prices, 'g-', label='True Price', linewidth=2)
plt.plot(observed_prices, 'r.', label='Observed Price', alpha=0.7, markersize=3)
plt.plot(estimated_prices, 'b-', label='Kalman Estimate', linewidth=2)
plt.legend()
plt.title('Stock Price Kalman Filtering')
plt.ylabel('Price')
plt.subplot(3, 1, 2)
plt.plot([true_drift * 252] * T, 'g-', label='True Annualized Drift', linewidth=2)
plt.plot(np.array(estimated_drifts) * 252, 'b-', label='Estimated Annualized Drift', linewidth=2)
plt.legend()
plt.ylabel('Annualized Drift Rate')
plt.subplot(3, 1, 3)
errors = np.array(prices) - np.array(estimated_prices)
plt.plot(errors, 'r-', label='Estimation Error')
plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
plt.legend()
plt.xlabel('Trading Day')
plt.ylabel('Price Error')
plt.tight_layout()
plt.show()
return prices, observed_prices, estimated_prices, estimated_drifts
# Run example
# stock_price_filtering_example()
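Beyond the point estimate, the filter carries an uncertainty estimate in P. A minimal sketch of turning the log-price variance into an approximate 95% confidence band for the price (the helper name and the Gaussian 1.96 multiplier are illustrative assumptions, not part of the model above):
def price_confidence_band(kf, z=1.96):
    """Approximate confidence band for the filtered price from the log-price variance."""
    log_price = kf.x[0, 0]
    log_std = np.sqrt(kf.P[0, 0])
    return np.exp(log_price - z * log_std), np.exp(log_price + z * log_std)
Calling this after each update inside stock_price_filtering_example would produce a band around the Kalman estimate in the first subplot.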
Time-Varying Volatility Model
Stock volatility is typically time-varying; it can be treated as a latent state and estimated with an extended (linearized) Kalman filter:
class StochasticVolatilityKF:
"""
Stochastic Volatility Kalman Filter Model
"""
def __init__(self, initial_price=100.0, initial_vol=0.2):
# State: [log_price, log_variance]
self.x = np.array([[np.log(initial_price)], [np.log(initial_vol**2)]])
self.P = np.array([[0.01, 0], [0, 0.1]])
# Model parameters
self.mu = 0.0001 # Expected return
self.kappa = 0.1 # Variance mean reversion speed
self.theta = np.log(0.04) # Long-term variance (log)
self.sigma_v = 0.1 # Volatility of volatility
        # Transition matrix placeholder (the Jacobian is recomputed in predict())
self.F = np.array([[1, 0], [0, 1 - self.kappa]])
# Observation matrix
self.H = np.array([[1, 0]])
# Noise covariances
self.Q = np.array([[0, 0], [0, self.sigma_v**2]])
self.R = np.array([[0.0001]])
def predict(self, dt=1.0):
"""Prediction step (nonlinear)"""
log_S = self.x[0, 0]
log_var = self.x[1, 0]
# Nonlinear state transition
new_log_S = log_S + (self.mu - 0.5 * np.exp(log_var)) * dt
new_log_var = log_var + self.kappa * (self.theta - log_var) * dt
self.x = np.array([[new_log_S], [new_log_var]])
# Linearized Jacobian matrix
F = np.array([[1, -0.5 * np.exp(log_var) * dt],
[0, 1 - self.kappa * dt]])
# Covariance prediction
Q_scaled = self.Q * dt
Q_scaled[0, 0] = np.exp(log_var) * dt # Noise in price equation
self.P = F @ self.P @ F.T + Q_scaled
def update(self, price):
"""Update step"""
z = np.array([[np.log(price)]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(2) - K @ self.H) @ self.P
def get_price_estimate(self):
return np.exp(self.x[0, 0])
def get_volatility_estimate(self):
return np.sqrt(np.exp(self.x[1, 0]))
def stochastic_volatility_example():
"""
Stochastic volatility estimation example
"""
# Simulate Heston process
np.random.seed(42)
T = 252
dt = 1.0
# Heston parameters
S0, V0 = 100.0, 0.04
mu, kappa, theta, sigma_v = 0.05/252, 2.0, 0.04, 0.3
rho = -0.5
# Simulate path
prices = [S0]
volatilities = [np.sqrt(V0)]
S, V = S0, V0
for t in range(1, T):
dW1 = np.random.normal() * np.sqrt(dt)
dW2 = rho * dW1 + np.sqrt(1 - rho**2) * np.random.normal() * np.sqrt(dt)
# Update variance
V = max(V + kappa * (theta - V) * dt + sigma_v * np.sqrt(max(V, 0)) * dW2, 1e-8)
# Update price
S = S * np.exp((mu - 0.5 * V) * dt + np.sqrt(V) * dW1)
prices.append(S)
volatilities.append(np.sqrt(V))
# Kalman filter estimation
kf = StochasticVolatilityKF(prices[0])
estimated_prices = []
estimated_vols = []
for price in prices:
kf.predict()
kf.update(price)
estimated_prices.append(kf.get_price_estimate())
estimated_vols.append(kf.get_volatility_estimate())
# Plot results
plt.figure(figsize=(15, 8))
plt.subplot(2, 1, 1)
plt.plot(prices, 'g-', label='True Price', linewidth=2)
plt.plot(estimated_prices, 'b--', label='Estimated Price', linewidth=2)
plt.legend()
plt.title('Stochastic Volatility Model: Price and Volatility Estimation')
plt.ylabel('Price')
plt.subplot(2, 1, 2)
plt.plot(volatilities, 'g-', label='True Volatility', linewidth=2)
plt.plot(estimated_vols, 'r--', label='Estimated Volatility', linewidth=2)
plt.legend()
plt.xlabel('Trading Day')
plt.ylabel('Volatility')
plt.tight_layout()
plt.show()
return prices, volatilities, estimated_prices, estimated_vols
# Run example
# stochastic_volatility_example()
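A simple benchmark for the filtered volatility is a rolling realized-volatility estimate computed directly from log returns; a minimal sketch (the 21-day window and the function name are illustrative choices):
def rolling_realized_vol(prices, window=21):
    """Rolling standard deviation of daily log returns as a volatility benchmark."""
    log_returns = np.diff(np.log(prices))
    vols = np.full(len(log_returns), np.nan)
    for t in range(window, len(log_returns) + 1):
        vols[t - 1] = np.std(log_returns[t - window:t], ddof=1)
    return vols
Plotting this series next to estimated_vols is a quick sanity check: the two should track similar levels, and a large persistent divergence usually points to a mis-specified Q or R.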
Interest Rate Term Structure Modeling
State-Space Representation of the Vasicek Model
The Vasicek interest rate model can be expressed in state-space form:
State Equation (discretized Vasicek short-rate dynamics):
$$r_t = r_{t-1} + \kappa(\theta - r_{t-1})\,\Delta t + \sigma\sqrt{\Delta t}\,\varepsilon_t, \qquad \varepsilon_t \sim \mathcal{N}(0, 1)$$
Observation Equation (log zero-coupon bond prices for maturities $T_i$):
$$\log P_t(T_i) = \log A(T_i) - B(T_i)\, r_t + v_{i,t}, \qquad B(T) = \frac{1 - e^{-\kappa T}}{\kappa}$$
class VasicekTermStructureKF:
"""
Vasicek Term Structure Kalman Filter Model
"""
def __init__(self, maturities, initial_rate=0.03):
self.maturities = np.array(maturities)
self.n_bonds = len(maturities)
# State: [short_rate, long_term_mean, mean_reversion_speed]
self.x = np.array([[initial_rate], [initial_rate], [0.1]])
self.P = np.array([[0.0001, 0, 0],
[0, 0.0001, 0],
[0, 0, 0.01]])
# Vasicek parameters
self.kappa = 0.1
self.sigma = 0.01
        # Linearized observation model for log zero-coupon prices: log P = log A(T) - B(T) * r
        self.H, self.d = self._compute_observation_matrix()
# Process noise
self.Q = np.array([[self.sigma**2, 0, 0],
[0, 1e-6, 0],
[0, 0, 1e-6]])
# Observation noise
self.R = 1e-4 * np.eye(self.n_bonds)
    def _compute_observation_matrix(self):
        """Compute the linearized observation matrix and intercept for log bond prices"""
        H = np.zeros((self.n_bonds, 3))
        d = np.zeros((self.n_bonds, 1))
        theta = self.x[1, 0]
        for i, T in enumerate(self.maturities):
            if T > 0:
                # Vasicek zero-coupon bond: P(T) = A(T) * exp(-B(T) * r)
                B_T = (1 - np.exp(-self.kappa * T)) / self.kappa
                log_A_T = ((B_T - T) * (self.kappa**2 * theta - 0.5 * self.sigma**2) / self.kappa**2
                           - 0.25 * self.sigma**2 * B_T**2 / self.kappa)
                H[i, 0] = -B_T      # Sensitivity of the log price to the short rate
                d[i, 0] = log_A_T   # Intercept term log A(T)
        return H, d
def predict(self, dt=1.0):
"""Prediction step"""
r_t = self.x[0, 0]
theta = self.x[1, 0]
kappa = self.x[2, 0]
# State transition (Euler discretization)
new_r = r_t + kappa * (theta - r_t) * dt
new_theta = theta # Assume long-term mean is constant
new_kappa = kappa # Assume mean reversion speed is constant
self.x = np.array([[new_r], [new_theta], [new_kappa]])
# Jacobian matrix
F = np.array([[1 - kappa * dt, kappa * dt, (theta - r_t) * dt],
[0, 1, 0],
[0, 0, 1]])
self.P = F @ self.P @ F.T + self.Q * dt
def update(self, bond_prices):
"""Update step"""
z = np.log(bond_prices).reshape(-1, 1)
        # Refresh the linearized observation model (matrix H and intercept d)
        self.H, self.d = self._compute_observation_matrix()
        # Predicted log bond prices: -B(T) * r + log A(T)
        z_pred = self.H @ self.x + self.d
y = z - z_pred
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(3) - K @ self.H) @ self.P
def get_short_rate(self):
return self.x[0, 0]
def get_long_term_mean(self):
return self.x[1, 0]
def get_mean_reversion_speed(self):
return self.x[2, 0]
def term_structure_example():
"""
Term structure modeling example
"""
# Set maturities
maturities = [0.25, 0.5, 1, 2, 5, 10] # years
T = 100 # Observation days
# Simulate true Vasicek process
np.random.seed(42)
true_kappa, true_theta, true_sigma = 0.1, 0.04, 0.01
true_rates = [0.03]
for t in range(1, T):
dr = true_kappa * (true_theta - true_rates[-1]) * (1/252) + \
true_sigma * np.sqrt(1/252) * np.random.normal()
true_rates.append(max(true_rates[-1] + dr, 0.001))
# Generate bond price observations
    def vasicek_bond_price(r, T, kappa, theta, sigma):
        """Closed-form Vasicek zero-coupon bond price P(T) = A(T) * exp(-B(T) * r)."""
        if kappa == 0:
            # Limiting case: driftless Gaussian short rate
            return np.exp(-r * T + sigma**2 * T**3 / 6)
        B = (1 - np.exp(-kappa * T)) / kappa
        A = np.exp((B - T) * (kappa**2 * theta - 0.5 * sigma**2) / kappa**2
                   - 0.25 * sigma**2 * B**2 / kappa)
        return A * np.exp(-B * r)
bond_prices_series = []
for r in true_rates:
prices = []
for mat in maturities:
true_price = vasicek_bond_price(r, mat, true_kappa, true_theta, true_sigma)
noisy_price = true_price * (1 + np.random.normal(0, 0.001))
prices.append(noisy_price)
bond_prices_series.append(prices)
# Kalman filter estimation
kf = VasicekTermStructureKF(maturities)
estimated_rates = []
estimated_theta = []
estimated_kappa = []
for prices in bond_prices_series:
        # The simulated parameters are annualized and the steps are daily, so dt = 1/252
        kf.predict(dt=1/252)
kf.update(np.array(prices))
estimated_rates.append(kf.get_short_rate())
estimated_theta.append(kf.get_long_term_mean())
estimated_kappa.append(kf.get_mean_reversion_speed())
# Plot results
plt.figure(figsize=(15, 12))
plt.subplot(3, 1, 1)
plt.plot(true_rates, 'g-', label='True Short Rate', linewidth=2)
plt.plot(estimated_rates, 'b--', label='Estimated Short Rate', linewidth=2)
plt.legend()
plt.title('Vasicek Term Structure Model')
plt.ylabel('Interest Rate')
plt.subplot(3, 1, 2)
plt.plot([true_theta] * T, 'g-', label='True Long-Term Mean', linewidth=2)
plt.plot(estimated_theta, 'r--', label='Estimated Long-Term Mean', linewidth=2)
plt.legend()
plt.ylabel('Long-Term Mean')
plt.subplot(3, 1, 3)
plt.plot([true_kappa] * T, 'g-', label='True Mean Reversion Speed', linewidth=2)
plt.plot(estimated_kappa, 'orange', label='Estimated Mean Reversion Speed', linewidth=2, linestyle='--')
plt.legend()
plt.xlabel('Time')
plt.ylabel('Mean Reversion Speed')
plt.tight_layout()
plt.show()
return true_rates, estimated_rates, estimated_theta, estimated_kappa
# Run example
# term_structure_example()
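Once the short rate has been filtered, the model-implied zero-coupon yield curve on any date follows from the same closed form via the continuously compounded yield y(T) = -log P(T) / T. A minimal sketch reusing the vasicek_bond_price helper above (assumed to be lifted to module scope; the default parameters are the simulation's true values and are only illustrative):
def implied_yield_curve(r, maturities, kappa=0.1, theta=0.04, sigma=0.01):
    """Model-implied zero-coupon yields for a filtered short rate r."""
    maturities = np.asarray(maturities, dtype=float)
    prices = np.array([vasicek_bond_price(r, T, kappa, theta, sigma) for T in maturities])
    return -np.log(prices) / maturities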
Risk Factor Modeling
Fama-French Three-Factor Model
Kalman filtering can be used to estimate time-varying risk factor loadings:
class FamaFrenchKF:
"""
Fama-French Three-Factor Model Kalman Filter Implementation
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# State: [alpha, beta_market, beta_size, beta_value] for each asset
self.n_factors = 4
self.dim_x = n_assets * self.n_factors
# Initialize state (random initial values)
self.x = np.random.normal(0, 0.1, (self.dim_x, 1))
# Initial covariance
self.P = 0.01 * np.eye(self.dim_x)
# State transition (random walk + slight mean reversion)
self.F = 0.99 * np.eye(self.dim_x)
# Process noise
self.Q = 1e-5 * np.eye(self.dim_x)
# Observation noise (idiosyncratic risk)
self.R = 0.01 * np.eye(n_assets)
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, returns, factor_returns):
"""
Update step
Parameters:
returns: Asset return vector (n_assets,)
factor_returns: Factor returns [market, size, value] (3,)
"""
# Construct observation matrix
factors = np.concatenate([[1.0], factor_returns]) # [1, MKT, SMB, HML]
H = np.zeros((self.n_assets, self.dim_x))
for i in range(self.n_assets):
start_idx = i * self.n_factors
end_idx = (i + 1) * self.n_factors
H[i, start_idx:end_idx] = factors
# Predict returns
y_pred = H @ self.x
# Innovation
y = returns.reshape(-1, 1) - y_pred
S = H @ self.P @ H.T + self.R
# Kalman gain
K = self.P @ H.T @ linalg.inv(S)
# Update
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
def get_factor_loadings(self, asset_idx):
"""Get factor loadings for an asset"""
start_idx = asset_idx * self.n_factors
end_idx = (asset_idx + 1) * self.n_factors
return self.x[start_idx:end_idx].flatten()
def fama_french_example():
"""
Fama-French model example
"""
# Simulate data
np.random.seed(42)
T = 252 # One year of data
n_assets = 5
# True factor loadings (time-invariant for validation)
true_loadings = {
'alpha': np.random.normal(0, 0.001, n_assets),
'beta_market': np.random.uniform(0.5, 1.5, n_assets),
'beta_size': np.random.uniform(-0.5, 0.5, n_assets),
'beta_value': np.random.uniform(-0.3, 0.3, n_assets)
}
# Simulate factor returns
factor_returns = []
asset_returns = []
for t in range(T):
# Factor returns
mkt = np.random.normal(0.0004, 0.01) # Market factor
smb = np.random.normal(0, 0.005) # Size factor
hml = np.random.normal(0, 0.005) # Value factor
factors = np.array([mkt, smb, hml])
factor_returns.append(factors)
# Asset returns
returns = []
for i in range(n_assets):
ret = (true_loadings['alpha'][i] +
true_loadings['beta_market'][i] * mkt +
true_loadings['beta_size'][i] * smb +
true_loadings['beta_value'][i] * hml +
np.random.normal(0, 0.01)) # Idiosyncratic risk
returns.append(ret)
asset_returns.append(returns)
# Kalman filter estimation
kf = FamaFrenchKF(n_assets)
estimated_loadings = {
'alpha': [[] for _ in range(n_assets)],
'beta_market': [[] for _ in range(n_assets)],
'beta_size': [[] for _ in range(n_assets)],
'beta_value': [[] for _ in range(n_assets)]
}
for t in range(T):
kf.predict()
kf.update(np.array(asset_returns[t]), factor_returns[t])
# Record estimated factor loadings
for i in range(n_assets):
loadings = kf.get_factor_loadings(i)
estimated_loadings['alpha'][i].append(loadings[0])
estimated_loadings['beta_market'][i].append(loadings[1])
estimated_loadings['beta_size'][i].append(loadings[2])
estimated_loadings['beta_value'][i].append(loadings[3])
# Plot results for first asset
plt.figure(figsize=(15, 12))
factors = ['alpha', 'beta_market', 'beta_size', 'beta_value']
for i, factor in enumerate(factors):
plt.subplot(2, 2, i+1)
plt.plot([true_loadings[factor][0]] * T, 'g-',
label=f'True {factor}', linewidth=2)
plt.plot(estimated_loadings[factor][0], 'b--',
label=f'Estimated {factor}', linewidth=2)
plt.legend()
plt.title(f'{factor} Loading for Asset 1')
plt.xlabel('Time')
plt.ylabel('Loading Value')
plt.tight_layout()
plt.show()
return estimated_loadings, true_loadings
# Run example
# fama_french_example()
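A common sanity check for the filtered loadings is a rolling OLS regression of each asset's returns on [1, MKT, SMB, HML]; a minimal sketch using numpy least squares (the 60-day window and the function name are illustrative):
def rolling_ols_loadings(asset_returns, factor_returns, asset_idx=0, window=60):
    """Rolling OLS factor loadings for one asset, as a baseline for the Kalman estimates."""
    R = np.array(asset_returns)[:, asset_idx]
    X = np.column_stack([np.ones(len(factor_returns)), np.array(factor_returns)])
    loadings = []
    for t in range(window, len(R) + 1):
        beta, *_ = np.linalg.lstsq(X[t - window:t], R[t - window:t], rcond=None)
        loadings.append(beta)  # [alpha, beta_market, beta_size, beta_value]
    return np.array(loadings)
Unlike the rolling regression, the Kalman filter updates the loadings after every observation and needs no window length, which is the main practical argument for the state-space approach here.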
Practical Implementation Considerations
1. Data Preprocessing
Characteristics of Financial Data
- Missing Data: Holidays, trading halts
- Outliers: Price jumps, erroneous quotes
- Frequency Conversion: Aggregating high-frequency to low-frequency
- Time Zone Issues: Synchronizing cross-market data
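A minimal preprocessing sketch for the issues above (the thresholds and function name are illustrative assumptions): gross outliers are flagged with a rolling median absolute deviation, and missing observations are left as NaN so the filtering loop can run a predict step without an update on those days. It relies on the pandas import at the top of the chapter.
def clean_price_series(prices: pd.Series, mad_window=21, mad_threshold=5.0):
    """Flag gross outliers and keep gaps as NaN so the filter can skip the update step."""
    prices = prices.sort_index()
    median = prices.rolling(mad_window, min_periods=5).median()
    mad = (prices - median).abs().rolling(mad_window, min_periods=5).median()
    outliers = (prices - median).abs() > mad_threshold * 1.4826 * mad
    return prices.mask(outliers)  # outliers become NaN, just like missing days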
2. Model Validation
def model_validation_metrics(true_values, estimates, predictions=None):
"""
Compute model validation metrics
"""
errors = np.array(true_values) - np.array(estimates)
metrics = {
'RMSE': np.sqrt(np.mean(errors**2)),
'MAE': np.mean(np.abs(errors)),
'MAPE': np.mean(np.abs(errors / np.array(true_values))) * 100,
'Max_Error': np.max(np.abs(errors)),
'R_squared': 1 - np.var(errors) / np.var(true_values)
}
if predictions is not None:
pred_errors = np.array(true_values[1:]) - np.array(predictions[:-1])
metrics['Pred_RMSE'] = np.sqrt(np.mean(pred_errors**2))
return metrics
# Example usage
def validate_stock_model():
"""Validate stock price model"""
prices, observed, estimated, _ = stock_price_filtering_example()
metrics = model_validation_metrics(prices, estimated)
print("Stock Price Model Validation Metrics:")
for metric, value in metrics.items():
print(f"{metric}: {value:.6f}")
# validate_stock_model()
3. Real-Time Implementation Considerations
class RealTimeKalmanFilter:
"""
Real-Time Kalman Filter Implementation
"""
def __init__(self, kf_class, **kwargs):
self.kf = kf_class(**kwargs)
self.last_update_time = None
self.buffer = []
def add_observation(self, timestamp, observation):
"""Add new observation"""
if self.last_update_time is not None:
dt = (timestamp - self.last_update_time).total_seconds() / 86400 # Convert to days
self.kf.predict_with_dt(dt)
self.kf.update(observation)
self.last_update_time = timestamp
# Save history
self.buffer.append({
'timestamp': timestamp,
'observation': observation,
'estimate': self.kf.get_estimate(),
'covariance': self.kf.P.copy()
})
# Limit buffer size
if len(self.buffer) > 1000:
self.buffer.pop(0)
def get_current_estimate(self):
"""Get current estimate"""
return self.kf.get_estimate()
def get_prediction(self, horizon_days):
"""Get future prediction"""
kf_copy = self.kf.copy()
kf_copy.predict_with_dt(horizon_days)
return kf_copy.get_estimate()
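None of the filter classes in this chapter define predict_with_dt, get_estimate, or copy, so a thin adapter is needed before they can be wrapped; a minimal sketch for StockPriceKF (the adapter name and the whole-day rounding are illustrative assumptions):
import copy
class StockPriceKFAdapter(StockPriceKF):
    """Adds the interface RealTimeKalmanFilter expects on top of StockPriceKF."""
    def predict_with_dt(self, dt):
        # StockPriceKF is parameterized per trading day: repeat the one-day
        # prediction for whole days and ignore intraday fractions here.
        for _ in range(max(int(round(dt)), 1)):
            self.predict()
    def get_estimate(self):
        return self.get_price_estimate()
    def copy(self):
        return copy.deepcopy(self)
Usage would then look like rt_filter = RealTimeKalmanFilter(StockPriceKFAdapter, initial_price=100.0).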
Next Chapter Preview
In the next chapter, we will learn about system modeling and parameter tuning, including how to select appropriate state-space models, noise covariance tuning, and model validation techniques.