Chapter 12: Applications of Kalman Filtering in Portfolio Management

Haiyue
34min

Chapter 12: Applications of Kalman Filtering in Portfolio Management

Learning Objectives
  • Master state-space modeling for dynamic portfolio optimization
  • Learn dynamic estimation and tracking of risk factors
  • Understand dynamic adjustment strategies in asset allocation

Dynamic Portfolio Optimization Framework

State-Space Portfolio Model

Traditional portfolio theory assumes that returns and risk parameters are constant, but in reality, these parameters are time-varying. Kalman filtering provides a powerful tool for dynamic portfolio management.

import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy import linalg
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

class DynamicPortfolioKF:
    """
    Dynamic portfolio Kalman filter
    """

    def __init__(self, n_assets):
        self.n_assets = n_assets

        # State: [expected returns, risk factor loadings, idiosyncratic risk]
        # Simplified model: expected return for each asset
        self.dim_x = n_assets

        # State vector: expected returns
        self.x = np.zeros((self.dim_x, 1))
        self.P = np.eye(self.dim_x)

        # System matrices
        self.F = 0.99 * np.eye(self.dim_x)  # Mean reversion of returns
        self.H = np.eye(self.n_assets)     # Directly observe returns

        # Noise covariance
        self.Q = 0.001 * np.eye(self.dim_x)  # Expected return changes
        self.R = 0.01 * np.eye(self.n_assets)   # Observation noise

        # Portfolio parameters
        self.risk_aversion = 5.0
        self.transaction_cost = 0.001

        # Historical data
        self.return_history = []
        self.weight_history = []
        self.portfolio_return_history = []

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, returns):
        """Update step"""
        returns = np.array(returns).reshape(-1, 1)

        # Standard Kalman update
        y = returns - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P

        # Save history
        self.return_history.append(returns.flatten())

    def estimate_covariance(self, window=50):
        """Estimate covariance matrix"""
        if len(self.return_history) < window:
            return self.R

        recent_returns = np.array(self.return_history[-window:])
        return np.cov(recent_returns.T)

    def optimize_portfolio(self, prev_weights=None):
        """
        Optimize portfolio weights

        Parameters:
        prev_weights: Previous period weights, used for transaction cost calculation
        """
        mu = self.x.flatten()  # Expected returns
        Sigma = self.estimate_covariance()  # Covariance matrix

        if prev_weights is None:
            prev_weights = np.ones(self.n_assets) / self.n_assets

        # Objective function: maximize utility = expected return - risk penalty - transaction costs
        def objective(w):
            portfolio_return = w @ mu
            portfolio_variance = w @ Sigma @ w
            transaction_costs = self.transaction_cost * np.sum(np.abs(w - prev_weights))

            utility = portfolio_return - 0.5 * self.risk_aversion * portfolio_variance - transaction_costs
            return -utility  # Minimize negative utility

        # Constraint: weights sum to 1
        constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]

        # Bounds: no short selling constraint
        bounds = [(0, 1) for _ in range(self.n_assets)]

        # Initial guess
        x0 = prev_weights

        # Optimize
        result = minimize(objective, x0, method='SLSQP',
                         bounds=bounds, constraints=constraints)

        if result.success:
            return result.x
        else:
            return prev_weights  # If optimization fails, keep previous weights

    def calculate_portfolio_metrics(self, weights, returns):
        """Calculate portfolio metrics"""
        portfolio_return = weights @ returns

        if len(self.return_history) > 1:
            recent_returns = np.array(self.return_history[-252:])  # Last year
            portfolio_returns = recent_returns @ weights

            metrics = {
                'return': portfolio_return,
                'volatility': np.std(portfolio_returns) * np.sqrt(252),
                'sharpe_ratio': np.mean(portfolio_returns) / np.std(portfolio_returns) * np.sqrt(252) if np.std(portfolio_returns) > 0 else 0,
                'max_drawdown': self._calculate_max_drawdown(portfolio_returns)
            }
        else:
            metrics = {
                'return': portfolio_return,
                'volatility': 0,
                'sharpe_ratio': 0,
                'max_drawdown': 0
            }

        return metrics

    def _calculate_max_drawdown(self, returns):
        """Calculate maximum drawdown"""
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        return np.min(drawdown)

def portfolio_management_example():
    """
    Dynamic portfolio management example
    """
    # Setup
    n_assets = 4
    asset_names = ['Stock A', 'Stock B', 'Bond A', 'Commodity A']

    # Create dynamic portfolio manager
    portfolio = DynamicPortfolioKF(n_assets)

    # Simulate market data
    np.random.seed(42)
    T = 252 * 2  # 2 years of data

    # True expected returns (time-varying)
    true_mu = np.array([0.08, 0.10, 0.04, 0.06]) / 252  # Daily returns

    # Simulate return data
    correlation_matrix = np.array([
        [1.0, 0.6, -0.2, 0.3],
        [0.6, 1.0, -0.1, 0.4],
        [-0.2, -0.1, 1.0, -0.1],
        [0.3, 0.4, -0.1, 1.0]
    ])

    volatilities = np.array([0.20, 0.25, 0.08, 0.30]) / np.sqrt(252)  # Daily volatility
    covariance_matrix = np.outer(volatilities, volatilities) * correlation_matrix

    # Generate return series
    all_returns = []
    weights_history = []
    portfolio_values = []
    portfolio_value = 100000  # Initial portfolio value

    current_weights = np.ones(n_assets) / n_assets  # Equal weight start

    for t in range(T):
        # Generate daily returns
        if t < T // 3:
            mu_t = true_mu
        elif t < 2 * T // 3:
            mu_t = true_mu * 1.5  # Bull market
        else:
            mu_t = true_mu * 0.5  # Bear market

        daily_returns = np.random.multivariate_normal(mu_t, covariance_matrix)
        all_returns.append(daily_returns)

        # Update Kalman filter
        portfolio.predict()
        portfolio.update(daily_returns)

        # Rebalance every 10 days
        if t % 10 == 0 and t > 20:  # Start after sufficient history
            new_weights = portfolio.optimize_portfolio(current_weights)
            current_weights = new_weights

        weights_history.append(current_weights.copy())

        # Calculate portfolio return
        portfolio_return = current_weights @ daily_returns
        portfolio_value *= (1 + portfolio_return)
        portfolio_values.append(portfolio_value)

        # Save portfolio metrics
        portfolio.weight_history.append(current_weights.copy())
        portfolio.portfolio_return_history.append(portfolio_return)

    # Benchmark: equal weight portfolio
    benchmark_values = []
    benchmark_value = 100000
    benchmark_weights = np.ones(n_assets) / n_assets

    for daily_returns in all_returns:
        benchmark_return = benchmark_weights @ daily_returns
        benchmark_value *= (1 + benchmark_return)
        benchmark_values.append(benchmark_value)

    # Convert to arrays
    all_returns = np.array(all_returns)
    weights_history = np.array(weights_history)

    # Plot results
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))

    # Portfolio value
    ax1.plot(portfolio_values, 'b-', linewidth=2, label='Dynamic Portfolio')
    ax1.plot(benchmark_values, 'r--', linewidth=2, label='Equal Weight Benchmark')
    ax1.set_ylabel('Portfolio Value')
    ax1.set_title('Portfolio Value Comparison')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Weight changes
    colors = ['blue', 'red', 'green', 'orange']
    for i, (name, color) in enumerate(zip(asset_names, colors)):
        ax2.plot(weights_history[:, i], color=color, linewidth=2, label=name)
    ax2.set_ylabel('Weight')
    ax2.set_title('Dynamic Weight Allocation')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Expected return estimates
    mu_estimates = np.array([est.flatten() for est in portfolio.return_history[1:]])
    for i, (name, color) in enumerate(zip(asset_names, colors)):
        ax3.plot(mu_estimates[:, i] * 252, color=color, linewidth=1, alpha=0.7, label=f'{name} Estimate')
        ax3.axhline(y=true_mu[i] * 252, color=color, linestyle='--', alpha=0.5)
    ax3.set_ylabel('Annualized Expected Return')
    ax3.set_title('Expected Return Estimation')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Rolling Sharpe ratio
    window = 60
    rolling_sharpe = []
    benchmark_sharpe = []

    for i in range(window, len(portfolio.portfolio_return_history)):
        # Dynamic portfolio
        port_returns = portfolio.portfolio_return_history[i-window:i]
        if np.std(port_returns) > 0:
            sharpe = np.mean(port_returns) / np.std(port_returns) * np.sqrt(252)
        else:
            sharpe = 0
        rolling_sharpe.append(sharpe)

        # Benchmark
        bench_returns = [benchmark_weights @ all_returns[j] for j in range(i-window, i)]
        if np.std(bench_returns) > 0:
            bench_sharpe = np.mean(bench_returns) / np.std(bench_returns) * np.sqrt(252)
        else:
            bench_sharpe = 0
        benchmark_sharpe.append(bench_sharpe)

    ax4.plot(rolling_sharpe, 'b-', linewidth=2, label='Dynamic Portfolio')
    ax4.plot(benchmark_sharpe, 'r--', linewidth=2, label='Equal Weight Benchmark')
    ax4.set_xlabel('Time')
    ax4.set_ylabel('Rolling Sharpe Ratio')
    ax4.set_title(f'{window}-Day Rolling Sharpe Ratio')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Performance summary
    final_dynamic_value = portfolio_values[-1]
    final_benchmark_value = benchmark_values[-1]

    dynamic_returns = np.array(portfolio.portfolio_return_history)
    benchmark_returns = [benchmark_weights @ ret for ret in all_returns]

    print("Portfolio Performance Comparison:")
    print("-" * 50)
    print(f"Dynamic Portfolio:")
    print(f"  Final Value: ${final_dynamic_value:,.0f}")
    print(f"  Total Return: {(final_dynamic_value / 100000 - 1) * 100:.2f}%")
    print(f"  Annualized Return: {(final_dynamic_value / 100000) ** (252 / T) - 1:.3f}")
    print(f"  Annualized Volatility: {np.std(dynamic_returns) * np.sqrt(252):.3f}")
    print(f"  Sharpe Ratio: {np.mean(dynamic_returns) / np.std(dynamic_returns) * np.sqrt(252):.3f}")

    print(f"\nEqual Weight Benchmark:")
    print(f"  Final Value: ${final_benchmark_value:,.0f}")
    print(f"  Total Return: {(final_benchmark_value / 100000 - 1) * 100:.2f}%")
    print(f"  Annualized Return: {(final_benchmark_value / 100000) ** (252 / T) - 1:.3f}")
    print(f"  Annualized Volatility: {np.std(benchmark_returns) * np.sqrt(252):.3f}")
    print(f"  Sharpe Ratio: {np.mean(benchmark_returns) / np.std(benchmark_returns) * np.sqrt(252):.3f}")

    print(f"\nExcess Return: ${final_dynamic_value - final_benchmark_value:,.0f}")

# Run example
# portfolio_management_example()

Risk Factor Modeling

Dynamic Estimation of Multi-Factor Models

class DynamicFactorModel:
    """
    Dynamic factor model: Fama-French + dynamic adjustment
    """

    def __init__(self, n_assets, n_factors=3):
        self.n_assets = n_assets
        self.n_factors = n_factors

        # State: factor loadings for each asset [alpha, beta_market, beta_size, beta_value, ...]
        self.dim_x = n_assets * (n_factors + 1)  # +1 for alpha

        # State vector organization: [asset1_params, asset2_params, ...]
        # asset_params = [alpha, beta_market, beta_size, beta_value]
        self.x = np.random.normal(0, 0.1, (self.dim_x, 1))
        self.P = 0.1 * np.eye(self.dim_x)

        # State transition (random walk + small mean reversion)
        self.F = 0.99 * np.eye(self.dim_x)
        self.Q = 1e-5 * np.eye(self.dim_x)

        # Observation noise (idiosyncratic risk)
        self.R = 0.01 * np.eye(n_assets)

        # Factor history
        self.factor_history = []
        self.asset_returns_history = []

    def update_factors(self, asset_returns, factor_returns):
        """
        Update factor model

        Parameters:
        asset_returns: Asset return vector (n_assets,)
        factor_returns: Factor return vector (n_factors,)
        """
        asset_returns = np.array(asset_returns).reshape(-1, 1)
        factor_returns = np.array(factor_returns)

        # Build observation matrix
        H = np.zeros((self.n_assets, self.dim_x))

        for i in range(self.n_assets):
            start_idx = i * (self.n_factors + 1)
            end_idx = (i + 1) * (self.n_factors + 1)

            # [1, factor1, factor2, factor3] for asset i
            H[i, start_idx:end_idx] = np.concatenate([[1], factor_returns])

        # Predict
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

        # Update
        y = asset_returns - H @ self.x
        S = H @ self.P @ H.T + self.R
        K = self.P @ H.T @ linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ H) @ self.P

        # Save history
        self.factor_history.append(factor_returns.copy())
        self.asset_returns_history.append(asset_returns.flatten())

    def get_factor_loadings(self, asset_idx):
        """Get factor loadings for an asset"""
        start_idx = asset_idx * (self.n_factors + 1)
        end_idx = (asset_idx + 1) * (self.n_factors + 1)
        return self.x[start_idx:end_idx].flatten()

    def predict_returns(self, factor_forecasts):
        """
        Predict asset returns based on factor forecasts

        Parameters:
        factor_forecasts: Factor return forecasts
        """
        predictions = []

        for i in range(self.n_assets):
            loadings = self.get_factor_loadings(i)
            alpha = loadings[0]
            betas = loadings[1:]

            predicted_return = alpha + betas @ factor_forecasts
            predictions.append(predicted_return)

        return np.array(predictions)

    def calculate_factor_contributions(self, asset_idx, factor_returns):
        """Calculate factor contributions to returns"""
        loadings = self.get_factor_loadings(asset_idx)
        alpha = loadings[0]
        betas = loadings[1:]

        contributions = {
            'alpha': alpha,
            'market': betas[0] * factor_returns[0] if len(factor_returns) > 0 else 0,
            'size': betas[1] * factor_returns[1] if len(factor_returns) > 1 else 0,
            'value': betas[2] * factor_returns[2] if len(factor_returns) > 2 else 0
        }

        return contributions

def dynamic_factor_model_example():
    """
    Dynamic factor model example
    """
    # Setup
    n_assets = 5
    n_factors = 3
    asset_names = ['Large Cap A', 'Small Cap B', 'Value C', 'Growth D', 'Neutral E']
    factor_names = ['Market Factor', 'Size Factor', 'Value Factor']

    # Create dynamic factor model
    dfm = DynamicFactorModel(n_assets, n_factors)

    # Simulate factor returns and asset returns
    np.random.seed(42)
    T = 252

    # True factor loadings (for data generation)
    true_loadings = {
        0: [0.001, 1.2, -0.3, 0.1],   # Large cap: high market beta, negative size factor
        1: [0.002, 0.8, 0.5, -0.2],   # Small cap: positive size factor
        2: [0.0005, 1.0, 0.1, 0.6],   # Value: high value factor
        3: [0.003, 1.1, 0.2, -0.4],   # Growth: negative value factor
        4: [0.001, 1.0, 0.0, 0.0]     # Neutral: close to market
    }

    # Generate data
    factor_returns_history = []
    asset_returns_history = []
    factor_loadings_history = {i: [] for i in range(n_assets)}

    for t in range(T):
        # Generate factor returns
        if t < T // 2:
            # First half: normal market
            factor_returns = np.random.multivariate_normal(
                [0.0004, 0.0, 0.0],  # Market, size, value factors
                [[0.0001, 0, 0], [0, 0.00005, 0], [0, 0, 0.00003]]
            )
        else:
            # Second half: market stress
            factor_returns = np.random.multivariate_normal(
                [-0.001, 0.001, -0.0005],  # Market down, small caps resilient, value stressed
                [[0.0004, 0, 0], [0, 0.0001, 0], [0, 0, 0.00008]]
            )

        factor_returns_history.append(factor_returns)

        # Generate asset returns
        asset_returns = []
        for i in range(n_assets):
            loadings = true_loadings[i]
            asset_return = (loadings[0] +
                          loadings[1] * factor_returns[0] +
                          loadings[2] * factor_returns[1] +
                          loadings[3] * factor_returns[2] +
                          np.random.normal(0, 0.01))  # Idiosyncratic risk
            asset_returns.append(asset_return)

        asset_returns_history.append(asset_returns)

        # Update dynamic factor model
        dfm.update_factors(asset_returns, factor_returns)

        # Record estimated factor loadings
        for i in range(n_assets):
            estimated_loadings = dfm.get_factor_loadings(i)
            factor_loadings_history[i].append(estimated_loadings.copy())

    # Convert to arrays
    factor_returns_history = np.array(factor_returns_history)
    asset_returns_history = np.array(asset_returns_history)

    # Plot results
    fig, axes = plt.subplots(3, 2, figsize=(16, 18))

    # Factor return time series
    for i, factor_name in enumerate(factor_names):
        axes[0, 0].plot(factor_returns_history[:, i], label=factor_name)
    axes[0, 0].set_title('Factor Return Time Series')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)

    # Asset return time series
    for i, asset_name in enumerate(asset_names):
        axes[0, 1].plot(asset_returns_history[:, i], label=asset_name, alpha=0.7)
    axes[0, 1].set_title('Asset Return Time Series')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)

    # Market beta estimates
    for i, asset_name in enumerate(asset_names):
        market_betas = [loadings[1] for loadings in factor_loadings_history[i]]
        axes[1, 0].plot(market_betas, label=f'{asset_name}', linewidth=2)
        axes[1, 0].axhline(y=true_loadings[i][1], color=f'C{i}', linestyle='--', alpha=0.5)
    axes[1, 0].set_title('Market Beta Estimates (dashed lines are true values)')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)

    # Size factor loading estimates
    for i, asset_name in enumerate(asset_names):
        size_loadings = [loadings[2] for loadings in factor_loadings_history[i]]
        axes[1, 1].plot(size_loadings, label=f'{asset_name}', linewidth=2)
        axes[1, 1].axhline(y=true_loadings[i][2], color=f'C{i}', linestyle='--', alpha=0.5)
    axes[1, 1].set_title('Size Factor Loading Estimates')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)

    # Value factor loading estimates
    for i, asset_name in enumerate(asset_names):
        value_loadings = [loadings[3] for loadings in factor_loadings_history[i]]
        axes[2, 0].plot(value_loadings, label=f'{asset_name}', linewidth=2)
        axes[2, 0].axhline(y=true_loadings[i][3], color=f'C{i}', linestyle='--', alpha=0.5)
    axes[2, 0].set_title('Value Factor Loading Estimates')
    axes[2, 0].legend()
    axes[2, 0].grid(True, alpha=0.3)

    # Factor contribution decomposition (last period)
    last_factor_returns = factor_returns_history[-1]
    contributions_data = []

    for i, asset_name in enumerate(asset_names):
        contributions = dfm.calculate_factor_contributions(i, last_factor_returns)
        contributions_data.append([
            contributions['alpha'],
            contributions['market'],
            contributions['size'],
            contributions['value']
        ])

    contributions_data = np.array(contributions_data)

    # Stacked bar chart
    bottom = np.zeros(n_assets)
    colors = ['gray', 'blue', 'green', 'red']
    labels = ['Alpha', 'Market', 'Size', 'Value']

    for j, (color, label) in enumerate(zip(colors, labels)):
        axes[2, 1].bar(asset_names, contributions_data[:, j], bottom=bottom,
                      color=color, alpha=0.7, label=label)
        bottom += contributions_data[:, j]

    axes[2, 1].set_title('Last Period Factor Return Contribution Decomposition')
    axes[2, 1].legend()
    axes[2, 1].tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()

    # Model fit evaluation
    print("Dynamic Factor Model Fit Evaluation:")
    print("-" * 50)

    for i, asset_name in enumerate(asset_names):
        # Calculate R²
        actual_returns = asset_returns_history[:, i]

        # Predict returns using model
        predicted_returns = []
        for t in range(len(factor_returns_history)):
            if t < len(factor_loadings_history[i]):
                loadings = factor_loadings_history[i][t]
                pred = (loadings[0] +
                       loadings[1] * factor_returns_history[t, 0] +
                       loadings[2] * factor_returns_history[t, 1] +
                       loadings[3] * factor_returns_history[t, 2])
                predicted_returns.append(pred)

        if len(predicted_returns) == len(actual_returns):
            r_squared = 1 - np.var(actual_returns - predicted_returns) / np.var(actual_returns)
            rmse = np.sqrt(np.mean((actual_returns - predicted_returns)**2))

            print(f"{asset_name}:")
            print(f"  R²: {r_squared:.3f}")
            print(f"  RMSE: {rmse:.5f}")

            # Final estimated loadings vs true loadings
            final_loadings = factor_loadings_history[i][-1]
            true_vals = true_loadings[i]
            print(f"  Loading estimates vs true:")
            print(f"    Alpha: {final_loadings[0]:.4f} vs {true_vals[0]:.4f}")
            print(f"    Market: {final_loadings[1]:.3f} vs {true_vals[1]:.3f}")
            print(f"    Size: {final_loadings[2]:.3f} vs {true_vals[2]:.3f}")
            print(f"    Value: {final_loadings[3]:.3f} vs {true_vals[3]:.3f}")
            print()

# Run example
# dynamic_factor_model_example()

Risk Budgeting Model

Kalman Filtering-Based Risk Budgeting

class RiskBudgetingKF:
    """
    Kalman filtering-based risk budgeting model
    """

    def __init__(self, n_assets):
        self.n_assets = n_assets

        # State: asset risk contribution rates
        self.dim_x = n_assets
        self.x = np.ones((self.dim_x, 1)) / self.n_assets  # Initial equal risk contribution
        self.P = 0.01 * np.eye(self.dim_x)

        # System model
        self.F = np.eye(self.dim_x)  # Random walk of risk contributions
        self.Q = 0.0001 * np.eye(self.dim_x)  # Small variations

        # Target risk budget
        self.target_risk_budget = np.ones(n_assets) / n_assets  # Equal risk budget

        # Historical data
        self.returns_history = []
        self.weights_history = []
        self.risk_contributions_history = []

    def update_risk_model(self, returns, weights):
        """
        Update risk model

        Parameters:
        returns: Asset returns
        weights: Current portfolio weights
        """
        returns = np.array(returns)
        weights = np.array(weights)

        # Calculate actual risk contributions
        if len(self.returns_history) >= 20:  # Need sufficient historical data
            recent_returns = np.array(self.returns_history[-20:])
            cov_matrix = np.cov(recent_returns.T)

            # Calculate marginal risk contributions
            portfolio_var = weights @ cov_matrix @ weights
            marginal_contributions = cov_matrix @ weights

            # Risk contribution = weight × marginal contribution / portfolio variance
            if portfolio_var > 0:
                risk_contributions = weights * marginal_contributions / portfolio_var
                risk_contributions = np.abs(risk_contributions)  # Ensure non-negative
                risk_contributions /= np.sum(risk_contributions)  # Normalize
            else:
                risk_contributions = weights / np.sum(weights)
        else:
            risk_contributions = weights / np.sum(weights)

        # Observation equation: observe actual risk contributions
        H = np.eye(self.n_assets)
        R = 0.01 * np.eye(self.n_assets)

        # Kalman filter update
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

        z = risk_contributions.reshape(-1, 1)
        y = z - H @ self.x
        S = H @ self.P @ H.T + R
        K = self.P @ H.T @ linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ H) @ self.P

        # Save history
        self.returns_history.append(returns)
        self.weights_history.append(weights)
        self.risk_contributions_history.append(risk_contributions)

    def optimize_risk_budget_weights(self, expected_returns, cov_matrix):
        """
        Optimize weights based on risk budget

        Parameters:
        expected_returns: Expected returns
        cov_matrix: Covariance matrix
        """
        # Objective: find weights such that risk contributions match target budget
        def risk_budget_objective(weights):
            weights = np.abs(weights)
            weights /= np.sum(weights)  # Normalize

            # Calculate risk contributions
            portfolio_var = weights @ cov_matrix @ weights
            if portfolio_var <= 0:
                return 1e10

            marginal_contributions = cov_matrix @ weights
            risk_contributions = weights * marginal_contributions / portfolio_var
            risk_contributions = np.abs(risk_contributions)
            risk_contributions /= np.sum(risk_contributions)

            # Objective: minimize difference from target risk budget
            budget_error = np.sum((risk_contributions - self.target_risk_budget)**2)

            # Add return constraint (optional)
            expected_portfolio_return = weights @ expected_returns
            return_penalty = max(0, 0.08/252 - expected_portfolio_return) * 1000

            return budget_error + return_penalty

        # Constraints and bounds
        constraints = [{'type': 'eq', 'fun': lambda w: np.sum(np.abs(w)) - 1}]
        bounds = [(0, 1) for _ in range(self.n_assets)]

        # Initial guess
        x0 = np.ones(self.n_assets) / self.n_assets

        # Optimize
        from scipy.optimize import minimize
        result = minimize(risk_budget_objective, x0, method='SLSQP',
                         bounds=bounds, constraints=constraints)

        if result.success:
            optimal_weights = np.abs(result.x)
            optimal_weights /= np.sum(optimal_weights)
            return optimal_weights
        else:
            return x0

    def get_current_risk_budget(self):
        """Get current estimated risk budget"""
        return self.x.flatten()

def risk_budgeting_example():
    """
    Risk budgeting example
    """
    # Setup
    n_assets = 4
    asset_names = ['Stocks', 'Bonds', 'Real Estate', 'Commodities']

    # Create risk budgeting model
    rb = RiskBudgetingKF(n_assets)

    # Set target risk budget
    rb.target_risk_budget = np.array([0.4, 0.3, 0.2, 0.1])  # Stocks bear 40% risk

    # Simulate data
    np.random.seed(42)
    T = 252

    # Asset characteristics
    expected_returns = np.array([0.08, 0.04, 0.06, 0.05]) / 252  # Daily
    volatilities = np.array([0.20, 0.05, 0.15, 0.25]) / np.sqrt(252)

    # Correlation matrix
    correlation = np.array([
        [1.0, -0.2, 0.6, 0.3],
        [-0.2, 1.0, 0.1, -0.1],
        [0.6, 0.1, 1.0, 0.2],
        [0.3, -0.1, 0.2, 1.0]
    ])

    # Covariance matrix
    cov_matrix = np.outer(volatilities, volatilities) * correlation

    # Initial weights (equal weight)
    current_weights = np.ones(n_assets) / n_assets

    # Store results
    all_returns = []
    all_weights = []
    portfolio_values = []
    risk_budget_evolution = []
    actual_risk_contributions = []

    portfolio_value = 100000

    for t in range(T):
        # Generate returns
        daily_returns = np.random.multivariate_normal(expected_returns, cov_matrix)
        all_returns.append(daily_returns)

        # Update risk model
        rb.update_risk_model(daily_returns, current_weights)

        # Rebalance every 20 days
        if t % 20 == 0 and t > 40:
            # Update covariance matrix estimate
            recent_returns = np.array(all_returns[-40:])
            updated_cov = np.cov(recent_returns.T)

            # Optimize weights
            new_weights = rb.optimize_risk_budget_weights(expected_returns, updated_cov)
            current_weights = new_weights

        all_weights.append(current_weights.copy())

        # Calculate portfolio return
        portfolio_return = current_weights @ daily_returns
        portfolio_value *= (1 + portfolio_return)
        portfolio_values.append(portfolio_value)

        # Record risk budget evolution
        risk_budget_evolution.append(rb.get_current_risk_budget())

        # Calculate actual risk contributions (if sufficient data)
        if len(all_returns) >= 20:
            recent_returns = np.array(all_returns[-20:])
            recent_cov = np.cov(recent_returns.T)

            portfolio_var = current_weights @ recent_cov @ current_weights
            if portfolio_var > 0:
                marginal_contrib = recent_cov @ current_weights
                risk_contrib = current_weights * marginal_contrib / portfolio_var
                risk_contrib = np.abs(risk_contrib) / np.sum(np.abs(risk_contrib))
            else:
                risk_contrib = current_weights / np.sum(current_weights)
        else:
            risk_contrib = current_weights / np.sum(current_weights)

        actual_risk_contributions.append(risk_contrib)

    # Convert to arrays
    all_weights = np.array(all_weights)
    risk_budget_evolution = np.array(risk_budget_evolution)
    actual_risk_contributions = np.array(actual_risk_contributions)

    # Plot results
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))

    # Portfolio value
    ax1.plot(portfolio_values, 'b-', linewidth=2)
    ax1.set_ylabel('Portfolio Value')
    ax1.set_title('Risk Budget Portfolio Value')
    ax1.grid(True, alpha=0.3)

    # Weight evolution
    colors = ['blue', 'red', 'green', 'orange']
    for i, (name, color) in enumerate(zip(asset_names, colors)):
        ax2.plot(all_weights[:, i], color=color, linewidth=2, label=name)
    ax2.set_ylabel('Weight')
    ax2.set_title('Portfolio Weight Evolution')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Target vs actual risk contributions
    for i, (name, color) in enumerate(zip(asset_names, colors)):
        ax3.plot(actual_risk_contributions[:, i], color=color,
                linewidth=2, label=f'{name} Actual')
        ax3.axhline(y=rb.target_risk_budget[i], color=color,
                   linestyle='--', alpha=0.7, label=f'{name} Target')
    ax3.set_ylabel('Risk Contribution')
    ax3.set_title('Risk Contribution: Target vs Actual')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Risk budget tracking error
    tracking_errors = []
    for i in range(len(actual_risk_contributions)):
        error = np.sqrt(np.mean((actual_risk_contributions[i] - rb.target_risk_budget)**2))
        tracking_errors.append(error)

    ax4.plot(tracking_errors, 'purple', linewidth=2)
    ax4.set_xlabel('Time')
    ax4.set_ylabel('Risk Budget Tracking Error (RMSE)')
    ax4.set_title('Risk Budget Tracking Quality')
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Performance summary
    final_value = portfolio_values[-1]
    total_return = (final_value / 100000 - 1) * 100

    portfolio_returns = []
    for i in range(len(all_returns)):
        port_ret = all_weights[i] @ all_returns[i]
        portfolio_returns.append(port_ret)

    annual_return = np.mean(portfolio_returns) * 252
    annual_vol = np.std(portfolio_returns) * np.sqrt(252)
    sharpe_ratio = annual_return / annual_vol if annual_vol > 0 else 0

    print("Risk Budget Portfolio Performance:")
    print("-" * 40)
    print(f"Final Value: ${final_value:,.0f}")
    print(f"Total Return: {total_return:.2f}%")
    print(f"Annualized Return: {annual_return:.3f}")
    print(f"Annualized Volatility: {annual_vol:.3f}")
    print(f"Sharpe Ratio: {sharpe_ratio:.3f}")

    print(f"\nRisk Budget Tracking Quality:")
    print(f"Average Tracking Error: {np.mean(tracking_errors):.4f}")
    print(f"Maximum Tracking Error: {np.max(tracking_errors):.4f}")

    print(f"\nFinal Risk Contribution vs Target:")
    final_contributions = actual_risk_contributions[-1]
    for i, name in enumerate(asset_names):
        print(f"{name}: {final_contributions[i]:.3f} vs {rb.target_risk_budget[i]:.3f}")

# Run example
# risk_budgeting_example()
Next Chapter Preview

In the next chapter, we will learn about financial derivatives pricing and risk management, including state-space representation of option pricing models, dynamic hedging strategies for interest rate derivatives, and dynamic modeling methods for credit risk.