Chapter 8: Fundamental Applications of Kalman Filtering in Finance

Haiyue
23min

Chapter 8: Fundamental Applications of Kalman Filtering in Finance

Learning Objectives
  • Master state-space methods for stock price dynamics modeling
  • Learn Kalman filter implementation for volatility estimation and prediction
  • Understand dynamic modeling of interest rate term structure

State-Space Methods for Financial Modeling

In finance, many important variables are unobservable latent states that need to be inferred from market observation data:

  • Latent States: Volatility, risk premium, liquidity, market sentiment
  • Observed Data: Stock prices, bond prices, option prices, trading volume

Advantages of State-Space Representation

Why Use State-Space Models
  1. Latent Factor Modeling: Ability to estimate unobservable financial variables
  2. Dynamic Relationships: Capture time-varying characteristics of financial variables
  3. Multivariate Modeling: Handle multiple correlated assets and factors
  4. Predictive Capability: Provide probability distributions for future states
  5. Real-Time Updates: Instantly update estimates when new information arrives

Stock Price Dynamics Modeling

Random Walk Model

The simplest stock price model is a random walk:

State Equation: $\log S_t = \log S_{t-1} + \mu + \sigma \epsilon_t$

Observation Equation: $P_t = S_t$ (the price is observed with noise)

import numpy as np
import matplotlib.pyplot as plt
from scipy import linalg
import pandas as pd

class StockPriceKF:
    """
    Kalman filter for a log-price random-walk-with-drift model.

    State vector: [log_price, drift]. Observed prices are compared
    against the state in log space.
    """

    def __init__(self, initial_price=100.0):
        # State estimate: current log price and per-step drift.
        self.x = np.array([[np.log(initial_price)], [0.0]])
        # Initial state uncertainty.
        self.P = np.array([[0.01, 0], [0, 0.001]])

        # Transition: log_price_{t+1} = log_price_t + drift_t; drift is a random walk.
        self.F = np.array([[1, 1], [0, 1]])

        # Only the log price (first state component) is observed.
        self.H = np.array([[1, 0]])

        # Process noise: log-price innovation variance and slow drift evolution.
        self.Q = np.array([[0.01, 0], [0, 0.0001]])

        # Variance of the log-price measurement noise.
        self.R = np.array([[0.001]])

    def predict(self):
        """Propagate state and covariance one step ahead."""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, price):
        """Fold one observed price into the state estimate."""
        measurement = np.array([[np.log(price)]])

        # Innovation and its covariance.
        innovation = measurement - self.H @ self.x
        innovation_cov = self.H @ self.P @ self.H.T + self.R

        # Kalman gain.
        gain = self.P @ self.H.T @ linalg.inv(innovation_cov)

        # Correct the state and shrink the covariance.
        self.x = self.x + gain @ innovation
        self.P = (np.eye(2) - gain @ self.H) @ self.P

    def get_price_estimate(self):
        """Current price estimate (exp of the filtered log price)."""
        return np.exp(self.x[0, 0])

    def get_drift_estimate(self):
        """Current per-step drift estimate."""
        return self.x[1, 0]

def stock_price_filtering_example():
    """
    Demonstrate Kalman filtering of a simulated stock price path.

    Simulates one year of daily prices from a geometric random walk,
    adds additive observation noise, runs StockPriceKF over the noisy
    observations, and plots true/observed/filtered prices, the drift
    estimate, and the estimation error.

    Returns:
        (prices, observed_prices, estimated_prices, estimated_drifts)
    """
    np.random.seed(42)
    n_days = 252          # one trading year
    drift_daily = 0.0002  # true daily drift
    vol_daily = 0.02      # true daily volatility

    # Simulate the true price path (one gaussian draw per step).
    prices = [100.0]
    for _ in range(1, n_days):
        shock = np.random.normal()
        prices.append(prices[-1] * np.exp(drift_daily + vol_daily * shock))

    # Corrupt the path with additive measurement noise.
    observed_prices = [p + np.random.normal(0, 0.1) for p in prices]

    # Run the filter over the noisy observations.
    kf = StockPriceKF(initial_price=observed_prices[0])
    estimated_prices, estimated_drifts = [], []
    for obs in observed_prices:
        kf.predict()
        kf.update(obs)
        estimated_prices.append(kf.get_price_estimate())
        estimated_drifts.append(kf.get_drift_estimate())

    # --- Plotting ---
    plt.figure(figsize=(15, 10))

    plt.subplot(3, 1, 1)
    plt.plot(prices, 'g-', label='True Price', linewidth=2)
    plt.plot(observed_prices, 'r.', label='Observed Price', alpha=0.7, markersize=3)
    plt.plot(estimated_prices, 'b-', label='Kalman Estimate', linewidth=2)
    plt.legend()
    plt.title('Stock Price Kalman Filtering')
    plt.ylabel('Price')

    plt.subplot(3, 1, 2)
    plt.plot([drift_daily * 252] * n_days, 'g-', label='True Annualized Drift', linewidth=2)
    plt.plot(np.array(estimated_drifts) * 252, 'b-', label='Estimated Annualized Drift', linewidth=2)
    plt.legend()
    plt.ylabel('Annualized Drift Rate')

    plt.subplot(3, 1, 3)
    residuals = np.array(prices) - np.array(estimated_prices)
    plt.plot(residuals, 'r-', label='Estimation Error')
    plt.axhline(y=0, color='k', linestyle='--', alpha=0.5)
    plt.legend()
    plt.xlabel('Trading Day')
    plt.ylabel('Price Error')

    plt.tight_layout()
    plt.show()

    return prices, observed_prices, estimated_prices, estimated_drifts

# Run example
# stock_price_filtering_example()

Time-Varying Volatility Model

Stock volatility is typically time-varying and can be modeled using state-space models:

class StochasticVolatilityKF:
    """
    Stochastic Volatility Kalman Filter Model
    """

    def __init__(self, initial_price=100.0, initial_vol=0.2):
        # State: [log_price, log_variance]
        self.x = np.array([[np.log(initial_price)], [np.log(initial_vol**2)]])
        self.P = np.array([[0.01, 0], [0, 0.1]])

        # Model parameters
        self.mu = 0.0001      # Expected return
        self.kappa = 0.1      # Variance mean reversion speed
        self.theta = np.log(0.04)  # Long-term variance (log)
        self.sigma_v = 0.1    # Volatility of volatility

        # State transition matrix (linearized)
        self.F = np.array([[1, 0], [0, 1 - self.kappa]])

        # Observation matrix
        self.H = np.array([[1, 0]])

        # Noise covariances
        self.Q = np.array([[0, 0], [0, self.sigma_v**2]])
        self.R = np.array([[0.0001]])

    def predict(self, dt=1.0):
        """Prediction step (nonlinear)"""
        log_S = self.x[0, 0]
        log_var = self.x[1, 0]

        # Nonlinear state transition
        new_log_S = log_S + (self.mu - 0.5 * np.exp(log_var)) * dt
        new_log_var = log_var + self.kappa * (self.theta - log_var) * dt

        self.x = np.array([[new_log_S], [new_log_var]])

        # Linearized Jacobian matrix
        F = np.array([[1, -0.5 * np.exp(log_var) * dt],
                     [0, 1 - self.kappa * dt]])

        # Covariance prediction
        Q_scaled = self.Q * dt
        Q_scaled[0, 0] = np.exp(log_var) * dt  # Noise in price equation

        self.P = F @ self.P @ F.T + Q_scaled

    def update(self, price):
        """Update step"""
        z = np.array([[np.log(price)]])

        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(2) - K @ self.H) @ self.P

    def get_price_estimate(self):
        return np.exp(self.x[0, 0])

    def get_volatility_estimate(self):
        return np.sqrt(np.exp(self.x[1, 0]))

def stochastic_volatility_example():
    """
    Stochastic volatility estimation example.

    Simulates a daily Heston path (correlated price and variance shocks),
    runs StochasticVolatilityKF over the prices, and plots true vs.
    estimated prices and volatilities.

    Returns:
        (prices, volatilities, estimated_prices, estimated_vols)
    """
    np.random.seed(42)
    n_steps, dt = 252, 1.0

    # Heston parameters (daily drift; rho couples the two shocks).
    S0, V0 = 100.0, 0.04
    mu, kappa, theta, sigma_v = 0.05/252, 2.0, 0.04, 0.3
    rho = -0.5

    prices, volatilities = [S0], [np.sqrt(V0)]
    S, V = S0, V0

    for _ in range(1, n_steps):
        # Correlated Brownian increments (two draws per step, in order).
        dW1 = np.random.normal() * np.sqrt(dt)
        dW2 = rho * dW1 + np.sqrt(1 - rho**2) * np.random.normal() * np.sqrt(dt)

        # Variance step, floored to stay strictly positive.
        V = max(V + kappa * (theta - V) * dt + sigma_v * np.sqrt(max(V, 0)) * dW2, 1e-8)
        # Log-Euler price step using the updated variance.
        S = S * np.exp((mu - 0.5 * V) * dt + np.sqrt(V) * dW1)

        prices.append(S)
        volatilities.append(np.sqrt(V))

    # Filter the simulated prices.
    kf = StochasticVolatilityKF(prices[0])
    estimated_prices, estimated_vols = [], []
    for px in prices:
        kf.predict()
        kf.update(px)
        estimated_prices.append(kf.get_price_estimate())
        estimated_vols.append(kf.get_volatility_estimate())

    # --- Plotting ---
    plt.figure(figsize=(15, 8))

    plt.subplot(2, 1, 1)
    plt.plot(prices, 'g-', label='True Price', linewidth=2)
    plt.plot(estimated_prices, 'b--', label='Estimated Price', linewidth=2)
    plt.legend()
    plt.title('Stochastic Volatility Model: Price and Volatility Estimation')
    plt.ylabel('Price')

    plt.subplot(2, 1, 2)
    plt.plot(volatilities, 'g-', label='True Volatility', linewidth=2)
    plt.plot(estimated_vols, 'r--', label='Estimated Volatility', linewidth=2)
    plt.legend()
    plt.xlabel('Trading Day')
    plt.ylabel('Volatility')

    plt.tight_layout()
    plt.show()

    return prices, volatilities, estimated_prices, estimated_vols

# Run example
# stochastic_volatility_example()

Interest Rate Term Structure Modeling

State-Space Representation of the Vasicek Model

The Vasicek interest rate model can be expressed in state-space form:

State Equation: $dr_t = \kappa(\theta - r_t)\,dt + \sigma\, dW_t$

Observation Equation: $\log P(t,T) = \log A(t,T) - B(t,T)\, r_t$ (log zero-coupon bond price, affine in the short rate)

class VasicekTermStructureKF:
    """
    Extended Kalman filter for a Vasicek term-structure model.

    State vector: [short_rate, long_term_mean, mean_reversion_speed].
    Observations are zero-coupon bond prices at the given maturities,
    filtered in log space via the affine form
    log P(t,T) = log A(T) - B(T) * r_t.
    """

    def __init__(self, maturities, initial_rate=0.03):
        self.maturities = np.array(maturities)
        self.n_bonds = len(maturities)

        # State: [short_rate, long_term_mean, mean_reversion_speed].
        self.x = np.array([[initial_rate], [initial_rate], [0.1]])
        self.P = np.array([[0.0001, 0, 0],
                          [0, 0.0001, 0],
                          [0, 0, 0.01]])

        # Fixed Vasicek parameters used by the bond-pricing formula.
        # NOTE(review): the long-run mean inside the A(T) term below is
        # hard-coded to 0.04 (as in the original) and is not tied to the
        # state estimate — confirm whether it should track x[1].
        self.kappa = 0.1
        self.sigma = 0.01

        # Observation matrix and log-price intercepts log A(T)
        # (self.log_A is filled by _compute_observation_matrix).
        self.log_A = np.zeros((self.n_bonds, 1))
        self.H = self._compute_observation_matrix()

        # Process noise.
        self.Q = np.array([[self.sigma**2, 0, 0],
                          [0, 1e-6, 0],
                          [0, 0, 1e-6]])

        # Observation noise on log bond prices.
        self.R = 1e-4 * np.eye(self.n_bonds)

    def _compute_observation_matrix(self):
        """
        Build the linear observation model for log bond prices.

        For Vasicek, log P(t,T) = log A(T) - B(T) * r_t, so each row of H
        is [-B(T), 0, 0], and the intercepts log A(T) are stored in
        self.log_A for use by update(). (The previous implementation
        computed A(T) but discarded it, so the predicted observation
        omitted the intercept and biased the short-rate innovation.)
        """
        H = np.zeros((self.n_bonds, 3))
        log_A = np.zeros((self.n_bonds, 1))

        for i, T in enumerate(self.maturities):
            if T > 0:
                B_T = (1 - np.exp(-self.kappa * T)) / self.kappa
                # log A(T); long-run mean hard-coded at 0.04 (see __init__).
                log_A[i, 0] = ((B_T - T) * (self.kappa**2 * 0.04 - 0.5 * self.sigma**2) / self.kappa**2 -
                               0.25 * self.sigma**2 * B_T**2 / self.kappa)
                H[i, 0] = -B_T  # sensitivity of the log price to the short rate

        self.log_A = log_A
        return H

    def predict(self, dt=1.0):
        """EKF prediction: Euler step of the short rate; theta/kappa persist."""
        r_t = self.x[0, 0]
        theta = self.x[1, 0]
        kappa = self.x[2, 0]

        # Euler discretization of dr = kappa * (theta - r) dt.
        new_r = r_t + kappa * (theta - r_t) * dt
        new_theta = theta  # long-term mean modeled as a constant state
        new_kappa = kappa  # mean-reversion speed modeled as a constant state

        self.x = np.array([[new_r], [new_theta], [new_kappa]])

        # Jacobian of the transition with respect to [r, theta, kappa].
        F = np.array([[1 - kappa * dt, kappa * dt, (theta - r_t) * dt],
                     [0, 1, 0],
                     [0, 0, 1]])

        self.P = F @ self.P @ F.T + self.Q * dt

    def update(self, bond_prices):
        """Correct the state with observed bond prices (compared in logs)."""
        z = np.log(bond_prices).reshape(-1, 1)

        # Refresh the linearized observation model (and log A intercepts).
        self.H = self._compute_observation_matrix()

        # Predicted log prices include the affine intercept log A(T);
        # omitting it (as before) leaves log A(T) inside the innovation.
        z_pred = self.log_A + self.H @ self.x

        y = z - z_pred
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(3) - K @ self.H) @ self.P

    def get_short_rate(self):
        """Filtered instantaneous short rate."""
        return self.x[0, 0]

    def get_long_term_mean(self):
        """Filtered long-term mean level."""
        return self.x[1, 0]

    def get_mean_reversion_speed(self):
        """Filtered mean-reversion speed."""
        return self.x[2, 0]

def term_structure_example():
    """
    Term structure modeling example.

    Simulates a Vasicek short-rate path, prices zero-coupon bonds at
    several maturities with small multiplicative observation noise,
    runs VasicekTermStructureKF over the bond prices, and plots the
    filtered short rate, long-term mean, and mean-reversion speed.

    Returns:
        (true_rates, estimated_rates, estimated_theta, estimated_kappa)
    """
    # Set maturities
    maturities = [0.25, 0.5, 1, 2, 5, 10]  # years
    T = 100  # Observation days

    # Simulate true Vasicek process (Euler scheme with 1/252-year steps)
    np.random.seed(42)
    true_kappa, true_theta, true_sigma = 0.1, 0.04, 0.01
    true_rates = [0.03]

    for t in range(1, T):
        dr = true_kappa * (true_theta - true_rates[-1]) * (1/252) + \
             true_sigma * np.sqrt(1/252) * np.random.normal()
        # Floor at 0.1% so the simulated rate stays positive
        true_rates.append(max(true_rates[-1] + dr, 0.001))

    # Generate bond price observations
    def vasicek_bond_price(r, T, kappa, theta, sigma):
        """Closed-form Vasicek zero-coupon price P = A * exp(-B * r).

        NOTE(review): the drift term here uses (kappa * theta - 0.5 * sigma**2)
        while VasicekTermStructureKF's A-term uses kappa**2 * 0.04 — the two
        formulas disagree; confirm which is intended.
        """
        B = (1 - np.exp(-kappa * T)) / kappa if kappa != 0 else T
        A = np.exp((B - T) * (kappa * theta - 0.5 * sigma**2) / kappa**2 -
                  0.25 * sigma**2 * B**2 / kappa) if kappa != 0 else np.exp(-0.5 * sigma**2 * T**3 / 3)
        return A * np.exp(-B * r)

    bond_prices_series = []
    for r in true_rates:
        prices = []
        for mat in maturities:
            true_price = vasicek_bond_price(r, mat, true_kappa, true_theta, true_sigma)
            # 0.1% multiplicative pricing noise
            noisy_price = true_price * (1 + np.random.normal(0, 0.001))
            prices.append(noisy_price)
        bond_prices_series.append(prices)

    # Kalman filter estimation
    # NOTE(review): kf.predict() uses its default dt=1.0 although the
    # simulation above steps by 1/252 years — confirm the intended units.
    kf = VasicekTermStructureKF(maturities)
    estimated_rates = []
    estimated_theta = []
    estimated_kappa = []

    for prices in bond_prices_series:
        kf.predict()
        kf.update(np.array(prices))
        estimated_rates.append(kf.get_short_rate())
        estimated_theta.append(kf.get_long_term_mean())
        estimated_kappa.append(kf.get_mean_reversion_speed())

    # Plot results
    plt.figure(figsize=(15, 12))

    plt.subplot(3, 1, 1)
    plt.plot(true_rates, 'g-', label='True Short Rate', linewidth=2)
    plt.plot(estimated_rates, 'b--', label='Estimated Short Rate', linewidth=2)
    plt.legend()
    plt.title('Vasicek Term Structure Model')
    plt.ylabel('Interest Rate')

    plt.subplot(3, 1, 2)
    plt.plot([true_theta] * T, 'g-', label='True Long-Term Mean', linewidth=2)
    plt.plot(estimated_theta, 'r--', label='Estimated Long-Term Mean', linewidth=2)
    plt.legend()
    plt.ylabel('Long-Term Mean')

    plt.subplot(3, 1, 3)
    plt.plot([true_kappa] * T, 'g-', label='True Mean Reversion Speed', linewidth=2)
    plt.plot(estimated_kappa, 'orange', label='Estimated Mean Reversion Speed', linewidth=2, linestyle='--')
    plt.legend()
    plt.xlabel('Time')
    plt.ylabel('Mean Reversion Speed')

    plt.tight_layout()
    plt.show()

    return true_rates, estimated_rates, estimated_theta, estimated_kappa

# Run example
# term_structure_example()

Risk Factor Modeling

Fama-French Three-Factor Model

Using Kalman filtering to estimate time-varying risk factor loadings:

class FamaFrenchKF:
    """
    Fama-French Three-Factor Model Kalman Filter Implementation
    """

    def __init__(self, n_assets):
        self.n_assets = n_assets

        # State: [alpha, beta_market, beta_size, beta_value] for each asset
        self.n_factors = 4
        self.dim_x = n_assets * self.n_factors

        # Initialize state (random initial values)
        self.x = np.random.normal(0, 0.1, (self.dim_x, 1))

        # Initial covariance
        self.P = 0.01 * np.eye(self.dim_x)

        # State transition (random walk + slight mean reversion)
        self.F = 0.99 * np.eye(self.dim_x)

        # Process noise
        self.Q = 1e-5 * np.eye(self.dim_x)

        # Observation noise (idiosyncratic risk)
        self.R = 0.01 * np.eye(n_assets)

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, returns, factor_returns):
        """
        Update step

        Parameters:
        returns: Asset return vector (n_assets,)
        factor_returns: Factor returns [market, size, value] (3,)
        """
        # Construct observation matrix
        factors = np.concatenate([[1.0], factor_returns])  # [1, MKT, SMB, HML]
        H = np.zeros((self.n_assets, self.dim_x))

        for i in range(self.n_assets):
            start_idx = i * self.n_factors
            end_idx = (i + 1) * self.n_factors
            H[i, start_idx:end_idx] = factors

        # Predict returns
        y_pred = H @ self.x

        # Innovation
        y = returns.reshape(-1, 1) - y_pred
        S = H @ self.P @ H.T + self.R

        # Kalman gain
        K = self.P @ H.T @ linalg.inv(S)

        # Update
        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ H) @ self.P

    def get_factor_loadings(self, asset_idx):
        """Get factor loadings for an asset"""
        start_idx = asset_idx * self.n_factors
        end_idx = (asset_idx + 1) * self.n_factors
        return self.x[start_idx:end_idx].flatten()

def fama_french_example():
    """
    Fama-French model example.

    Simulates one year of daily factor and asset returns with fixed
    (time-invariant) loadings, runs FamaFrenchKF to estimate the
    loadings recursively, and plots the estimates for the first asset
    against the true constants.

    Returns:
        (estimated_loadings, true_loadings)
    """
    # Simulate data
    np.random.seed(42)
    T = 252  # One year of data
    n_assets = 5

    # True factor loadings (time-invariant for validation)
    true_loadings = {
        'alpha': np.random.normal(0, 0.001, n_assets),
        'beta_market': np.random.uniform(0.5, 1.5, n_assets),
        'beta_size': np.random.uniform(-0.5, 0.5, n_assets),
        'beta_value': np.random.uniform(-0.3, 0.3, n_assets)
    }

    # Simulate factor returns
    factor_returns = []
    asset_returns = []

    for t in range(T):
        # Factor returns
        mkt = np.random.normal(0.0004, 0.01)  # Market factor
        smb = np.random.normal(0, 0.005)      # Size factor
        hml = np.random.normal(0, 0.005)      # Value factor
        factors = np.array([mkt, smb, hml])
        factor_returns.append(factors)

        # Asset returns generated by the three-factor model plus noise
        returns = []
        for i in range(n_assets):
            ret = (true_loadings['alpha'][i] +
                  true_loadings['beta_market'][i] * mkt +
                  true_loadings['beta_size'][i] * smb +
                  true_loadings['beta_value'][i] * hml +
                  np.random.normal(0, 0.01))  # Idiosyncratic risk
            returns.append(ret)
        asset_returns.append(returns)

    # Kalman filter estimation
    kf = FamaFrenchKF(n_assets)

    # One time series of estimates per asset and per loading
    estimated_loadings = {
        'alpha': [[] for _ in range(n_assets)],
        'beta_market': [[] for _ in range(n_assets)],
        'beta_size': [[] for _ in range(n_assets)],
        'beta_value': [[] for _ in range(n_assets)]
    }

    for t in range(T):
        kf.predict()
        kf.update(np.array(asset_returns[t]), factor_returns[t])

        # Record estimated factor loadings
        for i in range(n_assets):
            loadings = kf.get_factor_loadings(i)
            estimated_loadings['alpha'][i].append(loadings[0])
            estimated_loadings['beta_market'][i].append(loadings[1])
            estimated_loadings['beta_size'][i].append(loadings[2])
            estimated_loadings['beta_value'][i].append(loadings[3])

    # Plot results for first asset
    plt.figure(figsize=(15, 12))

    factors = ['alpha', 'beta_market', 'beta_size', 'beta_value']
    for i, factor in enumerate(factors):
        plt.subplot(2, 2, i+1)
        plt.plot([true_loadings[factor][0]] * T, 'g-',
                label=f'True {factor}', linewidth=2)
        plt.plot(estimated_loadings[factor][0], 'b--',
                label=f'Estimated {factor}', linewidth=2)
        plt.legend()
        plt.title(f'{factor} Loading for Asset 1')
        plt.xlabel('Time')
        plt.ylabel('Loading Value')

    plt.tight_layout()
    plt.show()

    return estimated_loadings, true_loadings

# Run example
# fama_french_example()

Practical Implementation Considerations

1. Data Preprocessing

Specifics of Financial Data
  • Missing Data: Holidays, trading halts
  • Outliers: Price jumps, erroneous quotes
  • Frequency Conversion: Aggregating high-frequency to low-frequency
  • Time Zone Issues: Synchronizing cross-market data

2. Model Validation

def model_validation_metrics(true_values, estimates, predictions=None):
    """
    Compute error metrics comparing filtered estimates to true values.

    Parameters:
    true_values: sequence of ground-truth values
    estimates: sequence of filtered estimates, same length
    predictions: optional one-step-ahead predictions; when given,
        predictions[t] is scored against true_values[t+1]

    Returns:
    dict with RMSE, MAE, MAPE (percent, averaged over nonzero true
    values to avoid division by zero), Max_Error, R_squared, and
    Pred_RMSE when predictions are supplied.
    """
    truth = np.asarray(true_values, dtype=float)
    est = np.asarray(estimates, dtype=float)
    errors = truth - est

    # Guard MAPE against zero true values (original divided unconditionally).
    nonzero = truth != 0
    mape = (np.mean(np.abs(errors[nonzero] / truth[nonzero])) * 100
            if np.any(nonzero) else np.nan)

    metrics = {
        'RMSE': np.sqrt(np.mean(errors**2)),
        'MAE': np.mean(np.abs(errors)),
        'MAPE': mape,
        'Max_Error': np.max(np.abs(errors)),
        'R_squared': 1 - np.var(errors) / np.var(truth)
    }

    if predictions is not None:
        preds = np.asarray(predictions, dtype=float)
        # One-step-ahead: prediction made at t is compared to truth at t+1.
        pred_errors = truth[1:] - preds[:-1]
        metrics['Pred_RMSE'] = np.sqrt(np.mean(pred_errors**2))

    return metrics

# Example usage
def validate_stock_model():
    """Run the stock-price filtering example and print its validation metrics."""
    true_prices, _observed, filtered, _drifts = stock_price_filtering_example()
    report = model_validation_metrics(true_prices, filtered)

    print("Stock Price Model Validation Metrics:")
    for metric, value in report.items():
        print(f"{metric}: {value:.6f}")

# validate_stock_model()

3. Real-Time Implementation Considerations

class RealTimeKalmanFilter:
    """
    Wrapper that drives a Kalman filter from timestamped observations.

    The wrapped filter must expose predict_with_dt(dt), update(obs),
    get_estimate(), copy(), and a covariance attribute P.
    """

    def __init__(self, kf_class, **kwargs):
        self.kf = kf_class(**kwargs)
        self.last_update_time = None
        self.buffer = []  # rolling history of observations and estimates

    def add_observation(self, timestamp, observation):
        """Advance the filter to `timestamp` and fold in one observation."""
        if self.last_update_time is not None:
            # Elapsed time since the previous observation, in days.
            elapsed_days = (timestamp - self.last_update_time).total_seconds() / 86400
            self.kf.predict_with_dt(elapsed_days)

        self.kf.update(observation)
        self.last_update_time = timestamp

        # Record this step for later inspection.
        record = {
            'timestamp': timestamp,
            'observation': observation,
            'estimate': self.kf.get_estimate(),
            'covariance': self.kf.P.copy()
        }
        self.buffer.append(record)

        # Keep at most 1000 entries.
        while len(self.buffer) > 1000:
            del self.buffer[0]

    def get_current_estimate(self):
        """Latest filtered estimate."""
        return self.kf.get_estimate()

    def get_prediction(self, horizon_days):
        """Estimate obtained by propagating a copy forward by `horizon_days`."""
        clone = self.kf.copy()
        clone.predict_with_dt(horizon_days)
        return clone.get_estimate()
Next Chapter Preview

In the next chapter, we will learn about system modeling and parameter tuning, including how to select appropriate state-space models, noise covariance tuning, and model validation techniques.