Chapter 12: Applications of Kalman Filtering in Portfolio Management
Haiyue
34min
Chapter 12: Applications of Kalman Filtering in Portfolio Management
Learning Objectives
- Master state-space modeling for dynamic portfolio optimization
- Learn dynamic estimation and tracking of risk factors
- Understand dynamic adjustment strategies in asset allocation
Dynamic Portfolio Optimization Framework
State-Space Portfolio Model
Traditional portfolio theory assumes that returns and risk parameters are constant, but in reality, these parameters are time-varying. Kalman filtering provides a powerful tool for dynamic portfolio management.
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy import linalg
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
class DynamicPortfolioKF:
"""
Dynamic portfolio Kalman filter
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# State: [expected returns, risk factor loadings, idiosyncratic risk]
# Simplified model: expected return for each asset
self.dim_x = n_assets
# State vector: expected returns
self.x = np.zeros((self.dim_x, 1))
self.P = np.eye(self.dim_x)
# System matrices
self.F = 0.99 * np.eye(self.dim_x) # Mean reversion of returns
self.H = np.eye(self.n_assets) # Directly observe returns
# Noise covariance
self.Q = 0.001 * np.eye(self.dim_x) # Expected return changes
self.R = 0.01 * np.eye(self.n_assets) # Observation noise
# Portfolio parameters
self.risk_aversion = 5.0
self.transaction_cost = 0.001
# Historical data
self.return_history = []
self.weight_history = []
self.portfolio_return_history = []
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, returns):
"""Update step"""
returns = np.array(returns).reshape(-1, 1)
# Standard Kalman update
y = returns - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P
# Save history
self.return_history.append(returns.flatten())
def estimate_covariance(self, window=50):
"""Estimate covariance matrix"""
if len(self.return_history) < window:
return self.R
recent_returns = np.array(self.return_history[-window:])
return np.cov(recent_returns.T)
def optimize_portfolio(self, prev_weights=None):
"""
Optimize portfolio weights
Parameters:
prev_weights: Previous period weights, used for transaction cost calculation
"""
mu = self.x.flatten() # Expected returns
Sigma = self.estimate_covariance() # Covariance matrix
if prev_weights is None:
prev_weights = np.ones(self.n_assets) / self.n_assets
# Objective function: maximize utility = expected return - risk penalty - transaction costs
def objective(w):
portfolio_return = w @ mu
portfolio_variance = w @ Sigma @ w
transaction_costs = self.transaction_cost * np.sum(np.abs(w - prev_weights))
utility = portfolio_return - 0.5 * self.risk_aversion * portfolio_variance - transaction_costs
return -utility # Minimize negative utility
# Constraint: weights sum to 1
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]
# Bounds: no short selling constraint
bounds = [(0, 1) for _ in range(self.n_assets)]
# Initial guess
x0 = prev_weights
# Optimize
result = minimize(objective, x0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
return result.x
else:
return prev_weights # If optimization fails, keep previous weights
def calculate_portfolio_metrics(self, weights, returns):
"""Calculate portfolio metrics"""
portfolio_return = weights @ returns
if len(self.return_history) > 1:
recent_returns = np.array(self.return_history[-252:]) # Last year
portfolio_returns = recent_returns @ weights
metrics = {
'return': portfolio_return,
'volatility': np.std(portfolio_returns) * np.sqrt(252),
'sharpe_ratio': np.mean(portfolio_returns) / np.std(portfolio_returns) * np.sqrt(252) if np.std(portfolio_returns) > 0 else 0,
'max_drawdown': self._calculate_max_drawdown(portfolio_returns)
}
else:
metrics = {
'return': portfolio_return,
'volatility': 0,
'sharpe_ratio': 0,
'max_drawdown': 0
}
return metrics
def _calculate_max_drawdown(self, returns):
"""Calculate maximum drawdown"""
cumulative = np.cumprod(1 + returns)
running_max = np.maximum.accumulate(cumulative)
drawdown = (cumulative - running_max) / running_max
return np.min(drawdown)
def portfolio_management_example():
"""
Dynamic portfolio management example
"""
# Setup
n_assets = 4
asset_names = ['Stock A', 'Stock B', 'Bond A', 'Commodity A']
# Create dynamic portfolio manager
portfolio = DynamicPortfolioKF(n_assets)
# Simulate market data
np.random.seed(42)
T = 252 * 2 # 2 years of data
# True expected returns (time-varying)
true_mu = np.array([0.08, 0.10, 0.04, 0.06]) / 252 # Daily returns
# Simulate return data
correlation_matrix = np.array([
[1.0, 0.6, -0.2, 0.3],
[0.6, 1.0, -0.1, 0.4],
[-0.2, -0.1, 1.0, -0.1],
[0.3, 0.4, -0.1, 1.0]
])
volatilities = np.array([0.20, 0.25, 0.08, 0.30]) / np.sqrt(252) # Daily volatility
covariance_matrix = np.outer(volatilities, volatilities) * correlation_matrix
# Generate return series
all_returns = []
weights_history = []
portfolio_values = []
portfolio_value = 100000 # Initial portfolio value
current_weights = np.ones(n_assets) / n_assets # Equal weight start
for t in range(T):
# Generate daily returns
if t < T // 3:
mu_t = true_mu
elif t < 2 * T // 3:
mu_t = true_mu * 1.5 # Bull market
else:
mu_t = true_mu * 0.5 # Bear market
daily_returns = np.random.multivariate_normal(mu_t, covariance_matrix)
all_returns.append(daily_returns)
# Update Kalman filter
portfolio.predict()
portfolio.update(daily_returns)
# Rebalance every 10 days
if t % 10 == 0 and t > 20: # Start after sufficient history
new_weights = portfolio.optimize_portfolio(current_weights)
current_weights = new_weights
weights_history.append(current_weights.copy())
# Calculate portfolio return
portfolio_return = current_weights @ daily_returns
portfolio_value *= (1 + portfolio_return)
portfolio_values.append(portfolio_value)
# Save portfolio metrics
portfolio.weight_history.append(current_weights.copy())
portfolio.portfolio_return_history.append(portfolio_return)
# Benchmark: equal weight portfolio
benchmark_values = []
benchmark_value = 100000
benchmark_weights = np.ones(n_assets) / n_assets
for daily_returns in all_returns:
benchmark_return = benchmark_weights @ daily_returns
benchmark_value *= (1 + benchmark_return)
benchmark_values.append(benchmark_value)
# Convert to arrays
all_returns = np.array(all_returns)
weights_history = np.array(weights_history)
# Plot results
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# Portfolio value
ax1.plot(portfolio_values, 'b-', linewidth=2, label='Dynamic Portfolio')
ax1.plot(benchmark_values, 'r--', linewidth=2, label='Equal Weight Benchmark')
ax1.set_ylabel('Portfolio Value')
ax1.set_title('Portfolio Value Comparison')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Weight changes
colors = ['blue', 'red', 'green', 'orange']
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax2.plot(weights_history[:, i], color=color, linewidth=2, label=name)
ax2.set_ylabel('Weight')
ax2.set_title('Dynamic Weight Allocation')
ax2.legend()
ax2.grid(True, alpha=0.3)
# Expected return estimates
mu_estimates = np.array([est.flatten() for est in portfolio.return_history[1:]])
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax3.plot(mu_estimates[:, i] * 252, color=color, linewidth=1, alpha=0.7, label=f'{name} Estimate')
ax3.axhline(y=true_mu[i] * 252, color=color, linestyle='--', alpha=0.5)
ax3.set_ylabel('Annualized Expected Return')
ax3.set_title('Expected Return Estimation')
ax3.legend()
ax3.grid(True, alpha=0.3)
# Rolling Sharpe ratio
window = 60
rolling_sharpe = []
benchmark_sharpe = []
for i in range(window, len(portfolio.portfolio_return_history)):
# Dynamic portfolio
port_returns = portfolio.portfolio_return_history[i-window:i]
if np.std(port_returns) > 0:
sharpe = np.mean(port_returns) / np.std(port_returns) * np.sqrt(252)
else:
sharpe = 0
rolling_sharpe.append(sharpe)
# Benchmark
bench_returns = [benchmark_weights @ all_returns[j] for j in range(i-window, i)]
if np.std(bench_returns) > 0:
bench_sharpe = np.mean(bench_returns) / np.std(bench_returns) * np.sqrt(252)
else:
bench_sharpe = 0
benchmark_sharpe.append(bench_sharpe)
ax4.plot(rolling_sharpe, 'b-', linewidth=2, label='Dynamic Portfolio')
ax4.plot(benchmark_sharpe, 'r--', linewidth=2, label='Equal Weight Benchmark')
ax4.set_xlabel('Time')
ax4.set_ylabel('Rolling Sharpe Ratio')
ax4.set_title(f'{window}-Day Rolling Sharpe Ratio')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Performance summary
final_dynamic_value = portfolio_values[-1]
final_benchmark_value = benchmark_values[-1]
dynamic_returns = np.array(portfolio.portfolio_return_history)
benchmark_returns = [benchmark_weights @ ret for ret in all_returns]
print("Portfolio Performance Comparison:")
print("-" * 50)
print(f"Dynamic Portfolio:")
print(f" Final Value: ${final_dynamic_value:,.0f}")
print(f" Total Return: {(final_dynamic_value / 100000 - 1) * 100:.2f}%")
print(f" Annualized Return: {(final_dynamic_value / 100000) ** (252 / T) - 1:.3f}")
print(f" Annualized Volatility: {np.std(dynamic_returns) * np.sqrt(252):.3f}")
print(f" Sharpe Ratio: {np.mean(dynamic_returns) / np.std(dynamic_returns) * np.sqrt(252):.3f}")
print(f"\nEqual Weight Benchmark:")
print(f" Final Value: ${final_benchmark_value:,.0f}")
print(f" Total Return: {(final_benchmark_value / 100000 - 1) * 100:.2f}%")
print(f" Annualized Return: {(final_benchmark_value / 100000) ** (252 / T) - 1:.3f}")
print(f" Annualized Volatility: {np.std(benchmark_returns) * np.sqrt(252):.3f}")
print(f" Sharpe Ratio: {np.mean(benchmark_returns) / np.std(benchmark_returns) * np.sqrt(252):.3f}")
print(f"\nExcess Return: ${final_dynamic_value - final_benchmark_value:,.0f}")
# Run example
# portfolio_management_example()
Risk Factor Modeling
Dynamic Estimation of Multi-Factor Models
class DynamicFactorModel:
"""
Dynamic factor model: Fama-French + dynamic adjustment
"""
def __init__(self, n_assets, n_factors=3):
self.n_assets = n_assets
self.n_factors = n_factors
# State: factor loadings for each asset [alpha, beta_market, beta_size, beta_value, ...]
self.dim_x = n_assets * (n_factors + 1) # +1 for alpha
# State vector organization: [asset1_params, asset2_params, ...]
# asset_params = [alpha, beta_market, beta_size, beta_value]
self.x = np.random.normal(0, 0.1, (self.dim_x, 1))
self.P = 0.1 * np.eye(self.dim_x)
# State transition (random walk + small mean reversion)
self.F = 0.99 * np.eye(self.dim_x)
self.Q = 1e-5 * np.eye(self.dim_x)
# Observation noise (idiosyncratic risk)
self.R = 0.01 * np.eye(n_assets)
# Factor history
self.factor_history = []
self.asset_returns_history = []
def update_factors(self, asset_returns, factor_returns):
"""
Update factor model
Parameters:
asset_returns: Asset return vector (n_assets,)
factor_returns: Factor return vector (n_factors,)
"""
asset_returns = np.array(asset_returns).reshape(-1, 1)
factor_returns = np.array(factor_returns)
# Build observation matrix
H = np.zeros((self.n_assets, self.dim_x))
for i in range(self.n_assets):
start_idx = i * (self.n_factors + 1)
end_idx = (i + 1) * (self.n_factors + 1)
# [1, factor1, factor2, factor3] for asset i
H[i, start_idx:end_idx] = np.concatenate([[1], factor_returns])
# Predict
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
# Update
y = asset_returns - H @ self.x
S = H @ self.P @ H.T + self.R
K = self.P @ H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
# Save history
self.factor_history.append(factor_returns.copy())
self.asset_returns_history.append(asset_returns.flatten())
def get_factor_loadings(self, asset_idx):
"""Get factor loadings for an asset"""
start_idx = asset_idx * (self.n_factors + 1)
end_idx = (asset_idx + 1) * (self.n_factors + 1)
return self.x[start_idx:end_idx].flatten()
def predict_returns(self, factor_forecasts):
"""
Predict asset returns based on factor forecasts
Parameters:
factor_forecasts: Factor return forecasts
"""
predictions = []
for i in range(self.n_assets):
loadings = self.get_factor_loadings(i)
alpha = loadings[0]
betas = loadings[1:]
predicted_return = alpha + betas @ factor_forecasts
predictions.append(predicted_return)
return np.array(predictions)
def calculate_factor_contributions(self, asset_idx, factor_returns):
"""Calculate factor contributions to returns"""
loadings = self.get_factor_loadings(asset_idx)
alpha = loadings[0]
betas = loadings[1:]
contributions = {
'alpha': alpha,
'market': betas[0] * factor_returns[0] if len(factor_returns) > 0 else 0,
'size': betas[1] * factor_returns[1] if len(factor_returns) > 1 else 0,
'value': betas[2] * factor_returns[2] if len(factor_returns) > 2 else 0
}
return contributions
def dynamic_factor_model_example():
"""
Dynamic factor model example
"""
# Setup
n_assets = 5
n_factors = 3
asset_names = ['Large Cap A', 'Small Cap B', 'Value C', 'Growth D', 'Neutral E']
factor_names = ['Market Factor', 'Size Factor', 'Value Factor']
# Create dynamic factor model
dfm = DynamicFactorModel(n_assets, n_factors)
# Simulate factor returns and asset returns
np.random.seed(42)
T = 252
# True factor loadings (for data generation)
true_loadings = {
0: [0.001, 1.2, -0.3, 0.1], # Large cap: high market beta, negative size factor
1: [0.002, 0.8, 0.5, -0.2], # Small cap: positive size factor
2: [0.0005, 1.0, 0.1, 0.6], # Value: high value factor
3: [0.003, 1.1, 0.2, -0.4], # Growth: negative value factor
4: [0.001, 1.0, 0.0, 0.0] # Neutral: close to market
}
# Generate data
factor_returns_history = []
asset_returns_history = []
factor_loadings_history = {i: [] for i in range(n_assets)}
for t in range(T):
# Generate factor returns
if t < T // 2:
# First half: normal market
factor_returns = np.random.multivariate_normal(
[0.0004, 0.0, 0.0], # Market, size, value factors
[[0.0001, 0, 0], [0, 0.00005, 0], [0, 0, 0.00003]]
)
else:
# Second half: market stress
factor_returns = np.random.multivariate_normal(
[-0.001, 0.001, -0.0005], # Market down, small caps resilient, value stressed
[[0.0004, 0, 0], [0, 0.0001, 0], [0, 0, 0.00008]]
)
factor_returns_history.append(factor_returns)
# Generate asset returns
asset_returns = []
for i in range(n_assets):
loadings = true_loadings[i]
asset_return = (loadings[0] +
loadings[1] * factor_returns[0] +
loadings[2] * factor_returns[1] +
loadings[3] * factor_returns[2] +
np.random.normal(0, 0.01)) # Idiosyncratic risk
asset_returns.append(asset_return)
asset_returns_history.append(asset_returns)
# Update dynamic factor model
dfm.update_factors(asset_returns, factor_returns)
# Record estimated factor loadings
for i in range(n_assets):
estimated_loadings = dfm.get_factor_loadings(i)
factor_loadings_history[i].append(estimated_loadings.copy())
# Convert to arrays
factor_returns_history = np.array(factor_returns_history)
asset_returns_history = np.array(asset_returns_history)
# Plot results
fig, axes = plt.subplots(3, 2, figsize=(16, 18))
# Factor return time series
for i, factor_name in enumerate(factor_names):
axes[0, 0].plot(factor_returns_history[:, i], label=factor_name)
axes[0, 0].set_title('Factor Return Time Series')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Asset return time series
for i, asset_name in enumerate(asset_names):
axes[0, 1].plot(asset_returns_history[:, i], label=asset_name, alpha=0.7)
axes[0, 1].set_title('Asset Return Time Series')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Market beta estimates
for i, asset_name in enumerate(asset_names):
market_betas = [loadings[1] for loadings in factor_loadings_history[i]]
axes[1, 0].plot(market_betas, label=f'{asset_name}', linewidth=2)
axes[1, 0].axhline(y=true_loadings[i][1], color=f'C{i}', linestyle='--', alpha=0.5)
axes[1, 0].set_title('Market Beta Estimates (dashed lines are true values)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# Size factor loading estimates
for i, asset_name in enumerate(asset_names):
size_loadings = [loadings[2] for loadings in factor_loadings_history[i]]
axes[1, 1].plot(size_loadings, label=f'{asset_name}', linewidth=2)
axes[1, 1].axhline(y=true_loadings[i][2], color=f'C{i}', linestyle='--', alpha=0.5)
axes[1, 1].set_title('Size Factor Loading Estimates')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# Value factor loading estimates
for i, asset_name in enumerate(asset_names):
value_loadings = [loadings[3] for loadings in factor_loadings_history[i]]
axes[2, 0].plot(value_loadings, label=f'{asset_name}', linewidth=2)
axes[2, 0].axhline(y=true_loadings[i][3], color=f'C{i}', linestyle='--', alpha=0.5)
axes[2, 0].set_title('Value Factor Loading Estimates')
axes[2, 0].legend()
axes[2, 0].grid(True, alpha=0.3)
# Factor contribution decomposition (last period)
last_factor_returns = factor_returns_history[-1]
contributions_data = []
for i, asset_name in enumerate(asset_names):
contributions = dfm.calculate_factor_contributions(i, last_factor_returns)
contributions_data.append([
contributions['alpha'],
contributions['market'],
contributions['size'],
contributions['value']
])
contributions_data = np.array(contributions_data)
# Stacked bar chart
bottom = np.zeros(n_assets)
colors = ['gray', 'blue', 'green', 'red']
labels = ['Alpha', 'Market', 'Size', 'Value']
for j, (color, label) in enumerate(zip(colors, labels)):
axes[2, 1].bar(asset_names, contributions_data[:, j], bottom=bottom,
color=color, alpha=0.7, label=label)
bottom += contributions_data[:, j]
axes[2, 1].set_title('Last Period Factor Return Contribution Decomposition')
axes[2, 1].legend()
axes[2, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# Model fit evaluation
print("Dynamic Factor Model Fit Evaluation:")
print("-" * 50)
for i, asset_name in enumerate(asset_names):
# Calculate R²
actual_returns = asset_returns_history[:, i]
# Predict returns using model
predicted_returns = []
for t in range(len(factor_returns_history)):
if t < len(factor_loadings_history[i]):
loadings = factor_loadings_history[i][t]
pred = (loadings[0] +
loadings[1] * factor_returns_history[t, 0] +
loadings[2] * factor_returns_history[t, 1] +
loadings[3] * factor_returns_history[t, 2])
predicted_returns.append(pred)
if len(predicted_returns) == len(actual_returns):
r_squared = 1 - np.var(actual_returns - predicted_returns) / np.var(actual_returns)
rmse = np.sqrt(np.mean((actual_returns - predicted_returns)**2))
print(f"{asset_name}:")
print(f" R²: {r_squared:.3f}")
print(f" RMSE: {rmse:.5f}")
# Final estimated loadings vs true loadings
final_loadings = factor_loadings_history[i][-1]
true_vals = true_loadings[i]
print(f" Loading estimates vs true:")
print(f" Alpha: {final_loadings[0]:.4f} vs {true_vals[0]:.4f}")
print(f" Market: {final_loadings[1]:.3f} vs {true_vals[1]:.3f}")
print(f" Size: {final_loadings[2]:.3f} vs {true_vals[2]:.3f}")
print(f" Value: {final_loadings[3]:.3f} vs {true_vals[3]:.3f}")
print()
# Run example
# dynamic_factor_model_example()
Risk Budgeting Model
Kalman Filtering-Based Risk Budgeting
class RiskBudgetingKF:
"""
Kalman filtering-based risk budgeting model
"""
def __init__(self, n_assets):
self.n_assets = n_assets
# State: asset risk contribution rates
self.dim_x = n_assets
self.x = np.ones((self.dim_x, 1)) / self.n_assets # Initial equal risk contribution
self.P = 0.01 * np.eye(self.dim_x)
# System model
self.F = np.eye(self.dim_x) # Random walk of risk contributions
self.Q = 0.0001 * np.eye(self.dim_x) # Small variations
# Target risk budget
self.target_risk_budget = np.ones(n_assets) / n_assets # Equal risk budget
# Historical data
self.returns_history = []
self.weights_history = []
self.risk_contributions_history = []
def update_risk_model(self, returns, weights):
"""
Update risk model
Parameters:
returns: Asset returns
weights: Current portfolio weights
"""
returns = np.array(returns)
weights = np.array(weights)
# Calculate actual risk contributions
if len(self.returns_history) >= 20: # Need sufficient historical data
recent_returns = np.array(self.returns_history[-20:])
cov_matrix = np.cov(recent_returns.T)
# Calculate marginal risk contributions
portfolio_var = weights @ cov_matrix @ weights
marginal_contributions = cov_matrix @ weights
# Risk contribution = weight × marginal contribution / portfolio variance
if portfolio_var > 0:
risk_contributions = weights * marginal_contributions / portfolio_var
risk_contributions = np.abs(risk_contributions) # Ensure non-negative
risk_contributions /= np.sum(risk_contributions) # Normalize
else:
risk_contributions = weights / np.sum(weights)
else:
risk_contributions = weights / np.sum(weights)
# Observation equation: observe actual risk contributions
H = np.eye(self.n_assets)
R = 0.01 * np.eye(self.n_assets)
# Kalman filter update
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
z = risk_contributions.reshape(-1, 1)
y = z - H @ self.x
S = H @ self.P @ H.T + R
K = self.P @ H.T @ linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
# Save history
self.returns_history.append(returns)
self.weights_history.append(weights)
self.risk_contributions_history.append(risk_contributions)
def optimize_risk_budget_weights(self, expected_returns, cov_matrix):
"""
Optimize weights based on risk budget
Parameters:
expected_returns: Expected returns
cov_matrix: Covariance matrix
"""
# Objective: find weights such that risk contributions match target budget
def risk_budget_objective(weights):
weights = np.abs(weights)
weights /= np.sum(weights) # Normalize
# Calculate risk contributions
portfolio_var = weights @ cov_matrix @ weights
if portfolio_var <= 0:
return 1e10
marginal_contributions = cov_matrix @ weights
risk_contributions = weights * marginal_contributions / portfolio_var
risk_contributions = np.abs(risk_contributions)
risk_contributions /= np.sum(risk_contributions)
# Objective: minimize difference from target risk budget
budget_error = np.sum((risk_contributions - self.target_risk_budget)**2)
# Add return constraint (optional)
expected_portfolio_return = weights @ expected_returns
return_penalty = max(0, 0.08/252 - expected_portfolio_return) * 1000
return budget_error + return_penalty
# Constraints and bounds
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(np.abs(w)) - 1}]
bounds = [(0, 1) for _ in range(self.n_assets)]
# Initial guess
x0 = np.ones(self.n_assets) / self.n_assets
# Optimize
from scipy.optimize import minimize
result = minimize(risk_budget_objective, x0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
optimal_weights = np.abs(result.x)
optimal_weights /= np.sum(optimal_weights)
return optimal_weights
else:
return x0
def get_current_risk_budget(self):
"""Get current estimated risk budget"""
return self.x.flatten()
def risk_budgeting_example():
"""
Risk budgeting example
"""
# Setup
n_assets = 4
asset_names = ['Stocks', 'Bonds', 'Real Estate', 'Commodities']
# Create risk budgeting model
rb = RiskBudgetingKF(n_assets)
# Set target risk budget
rb.target_risk_budget = np.array([0.4, 0.3, 0.2, 0.1]) # Stocks bear 40% risk
# Simulate data
np.random.seed(42)
T = 252
# Asset characteristics
expected_returns = np.array([0.08, 0.04, 0.06, 0.05]) / 252 # Daily
volatilities = np.array([0.20, 0.05, 0.15, 0.25]) / np.sqrt(252)
# Correlation matrix
correlation = np.array([
[1.0, -0.2, 0.6, 0.3],
[-0.2, 1.0, 0.1, -0.1],
[0.6, 0.1, 1.0, 0.2],
[0.3, -0.1, 0.2, 1.0]
])
# Covariance matrix
cov_matrix = np.outer(volatilities, volatilities) * correlation
# Initial weights (equal weight)
current_weights = np.ones(n_assets) / n_assets
# Store results
all_returns = []
all_weights = []
portfolio_values = []
risk_budget_evolution = []
actual_risk_contributions = []
portfolio_value = 100000
for t in range(T):
# Generate returns
daily_returns = np.random.multivariate_normal(expected_returns, cov_matrix)
all_returns.append(daily_returns)
# Update risk model
rb.update_risk_model(daily_returns, current_weights)
# Rebalance every 20 days
if t % 20 == 0 and t > 40:
# Update covariance matrix estimate
recent_returns = np.array(all_returns[-40:])
updated_cov = np.cov(recent_returns.T)
# Optimize weights
new_weights = rb.optimize_risk_budget_weights(expected_returns, updated_cov)
current_weights = new_weights
all_weights.append(current_weights.copy())
# Calculate portfolio return
portfolio_return = current_weights @ daily_returns
portfolio_value *= (1 + portfolio_return)
portfolio_values.append(portfolio_value)
# Record risk budget evolution
risk_budget_evolution.append(rb.get_current_risk_budget())
# Calculate actual risk contributions (if sufficient data)
if len(all_returns) >= 20:
recent_returns = np.array(all_returns[-20:])
recent_cov = np.cov(recent_returns.T)
portfolio_var = current_weights @ recent_cov @ current_weights
if portfolio_var > 0:
marginal_contrib = recent_cov @ current_weights
risk_contrib = current_weights * marginal_contrib / portfolio_var
risk_contrib = np.abs(risk_contrib) / np.sum(np.abs(risk_contrib))
else:
risk_contrib = current_weights / np.sum(current_weights)
else:
risk_contrib = current_weights / np.sum(current_weights)
actual_risk_contributions.append(risk_contrib)
# Convert to arrays
all_weights = np.array(all_weights)
risk_budget_evolution = np.array(risk_budget_evolution)
actual_risk_contributions = np.array(actual_risk_contributions)
# Plot results
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# Portfolio value
ax1.plot(portfolio_values, 'b-', linewidth=2)
ax1.set_ylabel('Portfolio Value')
ax1.set_title('Risk Budget Portfolio Value')
ax1.grid(True, alpha=0.3)
# Weight evolution
colors = ['blue', 'red', 'green', 'orange']
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax2.plot(all_weights[:, i], color=color, linewidth=2, label=name)
ax2.set_ylabel('Weight')
ax2.set_title('Portfolio Weight Evolution')
ax2.legend()
ax2.grid(True, alpha=0.3)
# Target vs actual risk contributions
for i, (name, color) in enumerate(zip(asset_names, colors)):
ax3.plot(actual_risk_contributions[:, i], color=color,
linewidth=2, label=f'{name} Actual')
ax3.axhline(y=rb.target_risk_budget[i], color=color,
linestyle='--', alpha=0.7, label=f'{name} Target')
ax3.set_ylabel('Risk Contribution')
ax3.set_title('Risk Contribution: Target vs Actual')
ax3.legend()
ax3.grid(True, alpha=0.3)
# Risk budget tracking error
tracking_errors = []
for i in range(len(actual_risk_contributions)):
error = np.sqrt(np.mean((actual_risk_contributions[i] - rb.target_risk_budget)**2))
tracking_errors.append(error)
ax4.plot(tracking_errors, 'purple', linewidth=2)
ax4.set_xlabel('Time')
ax4.set_ylabel('Risk Budget Tracking Error (RMSE)')
ax4.set_title('Risk Budget Tracking Quality')
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Performance summary
final_value = portfolio_values[-1]
total_return = (final_value / 100000 - 1) * 100
portfolio_returns = []
for i in range(len(all_returns)):
port_ret = all_weights[i] @ all_returns[i]
portfolio_returns.append(port_ret)
annual_return = np.mean(portfolio_returns) * 252
annual_vol = np.std(portfolio_returns) * np.sqrt(252)
sharpe_ratio = annual_return / annual_vol if annual_vol > 0 else 0
print("Risk Budget Portfolio Performance:")
print("-" * 40)
print(f"Final Value: ${final_value:,.0f}")
print(f"Total Return: {total_return:.2f}%")
print(f"Annualized Return: {annual_return:.3f}")
print(f"Annualized Volatility: {annual_vol:.3f}")
print(f"Sharpe Ratio: {sharpe_ratio:.3f}")
print(f"\nRisk Budget Tracking Quality:")
print(f"Average Tracking Error: {np.mean(tracking_errors):.4f}")
print(f"Maximum Tracking Error: {np.max(tracking_errors):.4f}")
print(f"\nFinal Risk Contribution vs Target:")
final_contributions = actual_risk_contributions[-1]
for i, name in enumerate(asset_names):
print(f"{name}: {final_contributions[i]:.3f} vs {rb.target_risk_budget[i]:.3f}")
# Run example
# risk_budgeting_example()
Next Chapter Preview
In the next chapter, we will learn about financial derivatives pricing and risk management, including state-space representation of option pricing models, dynamic hedging strategies for interest rate derivatives, and dynamic modeling methods for credit risk.