Financial Derivatives Pricing and Risk Management

Student
34min

Chapter 13: Financial Derivatives Pricing and Risk Management

Learning Objectives
  • Master the application of Kalman filtering in option pricing
  • Learn dynamic modeling methods for interest rate derivatives
  • Understand Kalman filter methods for credit risk modeling
  • Master volatility modeling and risk measurement techniques
  • Implement a complete framework for derivatives pricing

1. Kalman Filter in Option Pricing

1.1 Dynamic Estimation of Implied Volatility

In option pricing, volatility is the most critical parameter. We can use Kalman filtering to dynamically estimate implied volatility:

import numpy as np
import pandas as pd
from scipy.stats import norm
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter
import warnings
warnings.filterwarnings('ignore')

class ImpliedVolatilityKF:
    """Estimate implied volatility using Kalman filter"""

    def __init__(self, dt=1/252):
        self.dt = dt
        self.kf = KalmanFilter(dim_x=2, dim_z=1)

        # State transition matrix: [vol, vol_change]
        self.kf.F = np.array([[1., dt],
                             [0., 1.]])

        # Observation matrix: we observe option price
        self.kf.H = np.array([[1., 0.]])

        # Process noise covariance
        q = 0.001
        self.kf.Q = np.array([[q*dt**3/3, q*dt**2/2],
                             [q*dt**2/2, q*dt]])

        # Observation noise
        self.kf.R = np.array([[0.01]])

        # Initial state
        self.kf.x = np.array([[0.2], [0.0]])  # Initial volatility 20%
        self.kf.P = np.eye(2) * 0.1

    def black_scholes_price(self, S, K, T, r, sigma, option_type='call'):
        """Black-Scholes option pricing formula"""
        d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
        d2 = d1 - sigma*np.sqrt(T)

        if option_type == 'call':
            price = S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)
        else:
            price = K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1)

        return price

    def option_vega(self, S, K, T, r, sigma):
        """Option Vega"""
        d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
        vega = S * norm.pdf(d1) * np.sqrt(T)
        return vega

    def update_volatility(self, market_price, S, K, T, r, option_type='call'):
        """Update volatility estimate"""
        # Prediction step
        self.kf.predict()

        # Current volatility estimate
        current_vol = max(self.kf.x[0, 0], 0.01)  # Avoid negative volatility

        # Theoretical option price
        theoretical_price = self.black_scholes_price(S, K, T, r, current_vol, option_type)

        # Vega for linearization
        vega = self.option_vega(S, K, T, r, current_vol)

        # Convert price difference to volatility difference
        vol_diff = (market_price - theoretical_price) / vega if vega > 0 else 0

        # Update step
        self.kf.update(vol_diff)

        return self.kf.x[0, 0], theoretical_price

# Example: Dynamic implied volatility estimation
def demonstrate_implied_volatility():
    np.random.seed(42)

    # Simulate option market data
    n_days = 100
    S0 = 100  # Stock price
    K = 100   # Strike price
    T = 0.25  # 3 months to expiry
    r = 0.05  # Risk-free rate

    # True volatility path (random walk)
    true_vol = np.zeros(n_days)
    true_vol[0] = 0.2
    for i in range(1, n_days):
        true_vol[i] = max(true_vol[i-1] + np.random.normal(0, 0.01), 0.05)

    # Simulate stock price path
    stock_prices = np.zeros(n_days)
    stock_prices[0] = S0
    for i in range(1, n_days):
        dt = 1/252
        stock_prices[i] = stock_prices[i-1] * np.exp(
            (r - 0.5*true_vol[i]**2)*dt + true_vol[i]*np.sqrt(dt)*np.random.normal()
        )

    # Generate market option prices (add noise)
    iv_kf = ImpliedVolatilityKF()
    estimated_vols = []
    theoretical_prices = []
    market_prices = []

    for i in range(n_days):
        S = stock_prices[i]
        T_remaining = T - i/252

        if T_remaining > 0:
            # True market price (calculated with true volatility + noise)
            true_price = iv_kf.black_scholes_price(S, K, T_remaining, r, true_vol[i])
            market_price = true_price + np.random.normal(0, 0.1)

            # Kalman filter estimation
            est_vol, theo_price = iv_kf.update_volatility(
                market_price, S, K, T_remaining, r
            )

            estimated_vols.append(est_vol)
            theoretical_prices.append(theo_price)
            market_prices.append(market_price)

    # Plot
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    # Stock price trend
    ax1.plot(stock_prices[:len(estimated_vols)])
    ax1.set_title('Stock Price Trend')
    ax1.set_ylabel('Stock Price')

    # Volatility estimation
    ax2.plot(true_vol[:len(estimated_vols)], label='True Volatility', alpha=0.7)
    ax2.plot(estimated_vols, label='Kalman Filter Estimate', alpha=0.7)
    ax2.set_title('Volatility Estimation')
    ax2.set_ylabel('Volatility')
    ax2.legend()

    # Option price
    ax3.plot(market_prices, label='Market Price', alpha=0.7)
    ax3.plot(theoretical_prices, label='Theoretical Price', alpha=0.7)
    ax3.set_title('Option Price')
    ax3.set_ylabel('Price')
    ax3.legend()

    # Estimation error
    vol_errors = np.array(estimated_vols) - true_vol[:len(estimated_vols)]
    ax4.plot(vol_errors)
    ax4.set_title('Volatility Estimation Error')
    ax4.set_ylabel('Error')
    ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)

    plt.tight_layout()
    plt.show()

    # Calculate performance metrics
    mse = np.mean(vol_errors**2)
    mae = np.mean(np.abs(vol_errors))

    print(f"Volatility Estimation Performance:")
    print(f"MSE: {mse:.6f}")
    print(f"MAE: {mae:.6f}")

    return estimated_vols, true_vol[:len(estimated_vols)]

demonstrate_implied_volatility()

1.2 Multi-Factor Option Pricing Model

For complex derivatives, we need to consider multiple risk factors:

class MultiFactorOptionKF:
    """Multi-factor option pricing Kalman filter model"""

    def __init__(self):
        # State vector: [spot_price, volatility, interest_rate, dividend_yield]
        self.n_factors = 4
        self.kf = KalmanFilter(dim_x=self.n_factors, dim_z=1)

        # State transition matrix (simplified random walk model)
        dt = 1/252
        self.kf.F = np.eye(self.n_factors)

        # Observation matrix: we observe option price
        self.kf.H = np.array([[1., 0., 0., 0.]])

        # Process noise covariance
        q_spot = 0.01      # Stock price noise
        q_vol = 0.001      # Volatility noise
        q_rate = 0.0001    # Interest rate noise
        q_div = 0.0001     # Dividend noise

        self.kf.Q = np.diag([q_spot, q_vol, q_rate, q_div])

        # Observation noise
        self.kf.R = np.array([[0.1]])

        # Initial state
        self.kf.x = np.array([[100.], [0.2], [0.05], [0.02]])
        self.kf.P = np.eye(self.n_factors) * 0.1

    def european_option_price(self, S, K, T, r, q, sigma, option_type='call'):
        """European option pricing with dividends"""
        d1 = (np.log(S/K) + (r - q + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
        d2 = d1 - sigma*np.sqrt(T)

        if option_type == 'call':
            price = S*np.exp(-q*T)*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)
        else:
            price = K*np.exp(-r*T)*norm.cdf(-d2) - S*np.exp(-q*T)*norm.cdf(-d1)

        return price

    def update_factors(self, market_option_price, K, T, option_type='call'):
        """Update multi-factor estimates"""
        # Prediction step
        self.kf.predict()

        # Current factor estimates
        S = max(self.kf.x[0, 0], 1.0)      # Stock price
        vol = max(self.kf.x[1, 0], 0.01)   # Volatility
        r = max(self.kf.x[2, 0], 0.0)      # Interest rate
        q = max(self.kf.x[3, 0], 0.0)      # Dividend yield

        # Theoretical option price
        theoretical_price = self.european_option_price(S, K, T, r, q, vol, option_type)

        # Price residual
        price_residual = market_option_price - theoretical_price

        # Update step (simplified to only update stock price)
        self.kf.update([price_residual])

        return {
            'spot_price': self.kf.x[0, 0],
            'volatility': self.kf.x[1, 0],
            'interest_rate': self.kf.x[2, 0],
            'dividend_yield': self.kf.x[3, 0],
            'theoretical_price': theoretical_price
        }

# Example: Multi-factor option pricing
mf_option = MultiFactorOptionKF()

2. Interest Rate Derivatives Modeling

2.1 Kalman Filter Implementation of Vasicek Interest Rate Model

class VasicekModelKF:
    """Kalman filter implementation of Vasicek interest rate model"""

    def __init__(self, dt=1/252):
        self.dt = dt
        self.kf = KalmanFilter(dim_x=3, dim_z=1)

        # State vector: [r(t), theta, kappa] (rate, long-term mean, mean reversion speed)
        # Simplified model, assuming sigma is known
        self.sigma = 0.02

        # Vasicek model: dr = kappa*(theta - r)*dt + sigma*dW
        # State transition matrix (linearized)
        self.kf.F = np.array([[1., 0., 0.],
                             [0., 1., 0.],
                             [0., 0., 1.]])

        # Observation matrix: we observe short-term interest rate
        self.kf.H = np.array([[1., 0., 0.]])

        # Process noise covariance
        self.kf.Q = np.array([[self.sigma**2 * dt, 0., 0.],
                             [0., 1e-6, 0.],
                             [0., 0., 1e-6]])

        # Observation noise
        self.kf.R = np.array([[1e-4]])

        # Initial state
        self.kf.x = np.array([[0.05], [0.05], [0.5]])  # r=5%, theta=5%, kappa=0.5
        self.kf.P = np.eye(3) * 0.01

    def update_state_transition(self):
        """Update state transition matrix (considering current parameter estimates)"""
        r = self.kf.x[0, 0]
        theta = self.kf.x[1, 0]
        kappa = self.kf.x[2, 0]

        # Update state transition matrix
        self.kf.F[0, 0] = 1 - kappa * self.dt
        self.kf.F[0, 1] = kappa * self.dt

    def zero_coupon_bond_price(self, T):
        """Zero-coupon bond pricing (Vasicek model)"""
        r = self.kf.x[0, 0]
        theta = self.kf.x[1, 0]
        kappa = self.kf.x[2, 0]

        if kappa == 0:
            B_T = T
            A_T = -0.5 * self.sigma**2 * T**2
        else:
            B_T = (1 - np.exp(-kappa * T)) / kappa
            A_T = (theta - self.sigma**2 / (2 * kappa**2)) * (B_T - T) - \
                  (self.sigma**2 / (4 * kappa)) * B_T**2

        price = np.exp(A_T - B_T * r)
        return price

    def update_interest_rate(self, observed_rate):
        """Update interest rate estimate"""
        # Update state transition matrix
        self.update_state_transition()

        # Prediction step
        self.kf.predict()

        # Update step
        self.kf.update([observed_rate])

        return {
            'rate': self.kf.x[0, 0],
            'theta': self.kf.x[1, 0],
            'kappa': self.kf.x[2, 0]
        }

# Example: Interest rate term structure modeling
def demonstrate_yield_curve_modeling():
    np.random.seed(42)

    # Simulate Vasicek process
    n_days = 252
    dt = 1/252
    true_kappa = 0.3
    true_theta = 0.04
    true_sigma = 0.02

    # True interest rate path
    rates = np.zeros(n_days)
    rates[0] = 0.03

    for i in range(1, n_days):
        dr = true_kappa * (true_theta - rates[i-1]) * dt + \
             true_sigma * np.sqrt(dt) * np.random.normal()
        rates[i] = rates[i-1] + dr

    # Kalman filter estimation
    vasicek_kf = VasicekModelKF()
    estimated_params = []

    for rate in rates:
        # Add observation noise
        observed_rate = rate + np.random.normal(0, 0.001)
        params = vasicek_kf.update_interest_rate(observed_rate)
        estimated_params.append(params)

    # Extract estimated parameters
    estimated_rates = [p['rate'] for p in estimated_params]
    estimated_kappa = [p['kappa'] for p in estimated_params]
    estimated_theta = [p['theta'] for p in estimated_params]

    # Plot
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    # Interest rate path
    ax1.plot(rates, label='True Rate', alpha=0.7)
    ax1.plot(estimated_rates, label='Estimated Rate', alpha=0.7)
    ax1.set_title('Interest Rate Path')
    ax1.set_ylabel('Rate')
    ax1.legend()

    # Mean reversion speed
    ax2.plot([true_kappa] * len(estimated_kappa), label=f'True kappa={true_kappa}', alpha=0.7)
    ax2.plot(estimated_kappa, label='Estimated kappa', alpha=0.7)
    ax2.set_title('Mean Reversion Speed (kappa)')
    ax2.set_ylabel('kappa')
    ax2.legend()

    # Long-term mean
    ax3.plot([true_theta] * len(estimated_theta), label=f'True theta={true_theta}', alpha=0.7)
    ax3.plot(estimated_theta, label='Estimated theta', alpha=0.7)
    ax3.set_title('Long-term Mean (theta)')
    ax3.set_ylabel('theta')
    ax3.legend()

    # Parameter estimation error
    kappa_errors = np.array(estimated_kappa) - true_kappa
    theta_errors = np.array(estimated_theta) - true_theta

    ax4.plot(kappa_errors, label='kappa error', alpha=0.7)
    ax4.plot(theta_errors, label='theta error', alpha=0.7)
    ax4.set_title('Parameter Estimation Error')
    ax4.set_ylabel('Error')
    ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)
    ax4.legend()

    plt.tight_layout()
    plt.show()

    # Calculate convergence performance
    final_kappa_error = abs(estimated_kappa[-1] - true_kappa)
    final_theta_error = abs(estimated_theta[-1] - true_theta)

    print(f"Parameter Estimation Performance:")
    print(f"Final kappa error: {final_kappa_error:.4f}")
    print(f"Final theta error: {final_theta_error:.4f}")

demonstrate_yield_curve_modeling()

2.2 Interest Rate Derivatives Pricing

class InterestRateDerivativesPricer:
    """Interest rate derivatives pricer"""

    def __init__(self, vasicek_model):
        self.model = vasicek_model

    def bond_option_price(self, T_option, T_bond, K, option_type='call'):
        """Bond option pricing (analytical solution under Vasicek model)"""
        r = self.model.kf.x[0, 0]
        theta = self.model.kf.x[1, 0]
        kappa = self.model.kf.x[2, 0]
        sigma = self.model.sigma

        # Bond option pricing parameters
        P_T = self.model.zero_coupon_bond_price(T_bond)
        P_t = self.model.zero_coupon_bond_price(T_option)

        if kappa == 0:
            v = sigma**2 * T_option * (T_bond - T_option)**2 / 3
        else:
            B_T = (1 - np.exp(-kappa * (T_bond - T_option))) / kappa
            v = (sigma**2 / (2 * kappa**2)) * B_T**2 * (1 - np.exp(-2 * kappa * T_option))

        if v <= 0:
            return 0

        h = (1 / np.sqrt(v)) * np.log(P_T / (P_t * K))

        if option_type == 'call':
            price = P_T * norm.cdf(h + 0.5 * np.sqrt(v)) - K * P_t * norm.cdf(h - 0.5 * np.sqrt(v))
        else:
            price = K * P_t * norm.cdf(-h + 0.5 * np.sqrt(v)) - P_T * norm.cdf(-h - 0.5 * np.sqrt(v))

        return price

    def interest_rate_swap_value(self, swap_rate, tenor_years, payment_freq=2):
        """Interest rate swap valuation"""
        # Calculate present value of fixed and floating legs
        n_payments = int(tenor_years * payment_freq)
        payment_dates = np.array([i/payment_freq for i in range(1, n_payments + 1)])

        # Fixed leg PV
        fixed_leg_pv = 0
        for t in payment_dates:
            bond_price = self.model.zero_coupon_bond_price(t)
            fixed_leg_pv += swap_rate * bond_price / payment_freq

        # Floating leg PV (approximate calculation)
        current_rate = self.model.kf.x[0, 0]
        floating_leg_pv = 0
        for t in payment_dates:
            bond_price = self.model.zero_coupon_bond_price(t)
            floating_leg_pv += current_rate * bond_price / payment_freq

        # Swap value (from the perspective of receiving fixed, paying floating)
        swap_value = floating_leg_pv - fixed_leg_pv

        return {
            'swap_value': swap_value,
            'fixed_leg_pv': fixed_leg_pv,
            'floating_leg_pv': floating_leg_pv
        }

# Example: Interest rate derivatives pricing
vasicek_model = VasicekModelKF()
derivatives_pricer = InterestRateDerivativesPricer(vasicek_model)

# Update model parameters
vasicek_model.update_interest_rate(0.03)

# Bond option pricing
bond_option_price = derivatives_pricer.bond_option_price(
    T_option=0.25,  # 3-month option
    T_bond=2.0,     # 2-year bond
    K=0.95,         # Strike price
    option_type='call'
)

print(f"Bond call option price: {bond_option_price:.4f}")

# Interest rate swap valuation
swap_value = derivatives_pricer.interest_rate_swap_value(
    swap_rate=0.04,    # 4% fixed rate
    tenor_years=5,     # 5-year tenor
    payment_freq=2     # Semi-annual payments
)

print(f"Interest rate swap value: {swap_value['swap_value']:.4f}")
print(f"Fixed leg PV: {swap_value['fixed_leg_pv']:.4f}")
print(f"Floating leg PV: {swap_value['floating_leg_pv']:.4f}")

3. Credit Risk Modeling

3.1 Structural Credit Risk Model

class StructuralCreditRiskKF:
    """Structural credit risk model (Merton model extension)"""

    def __init__(self, dt=1/252):
        self.dt = dt
        # State vector: [asset_value, asset_volatility, default_barrier]
        self.kf = KalmanFilter(dim_x=3, dim_z=2)

        # State transition matrix
        self.kf.F = np.eye(3)

        # Observation matrix: we observe stock price and bond price
        self.kf.H = np.array([[1., 0., 0.],   # Asset value affects stock price
                             [1., 0., -1.]])  # Asset value minus default barrier affects bond price

        # Process noise covariance
        asset_vol_noise = 0.01
        vol_vol_noise = 0.001
        barrier_noise = 0.001

        self.kf.Q = np.diag([asset_vol_noise, vol_vol_noise, barrier_noise])

        # Observation noise
        self.kf.R = np.diag([0.01, 0.005])  # Stock and bond price noise

        # Initial state
        self.kf.x = np.array([[100.], [0.3], [50.]])  # Asset value, volatility, default barrier
        self.kf.P = np.eye(3) * 0.1

    def merton_equity_value(self, asset_value, debt_value, T, r, asset_vol):
        """Merton model equity value"""
        d1 = (np.log(asset_value/debt_value) + (r + 0.5*asset_vol**2)*T) / (asset_vol*np.sqrt(T))
        d2 = d1 - asset_vol*np.sqrt(T)

        equity_value = asset_value*norm.cdf(d1) - debt_value*np.exp(-r*T)*norm.cdf(d2)
        return max(equity_value, 0)

    def merton_debt_value(self, asset_value, debt_value, T, r, asset_vol):
        """Merton model debt value"""
        d1 = (np.log(asset_value/debt_value) + (r + 0.5*asset_vol**2)*T) / (asset_vol*np.sqrt(T))
        d2 = d1 - asset_vol*np.sqrt(T)

        debt_value_current = debt_value*np.exp(-r*T)*norm.cdf(d2) + \
                           asset_value*norm.cdf(-d1)
        return debt_value_current

    def default_probability(self, T, r):
        """Default probability calculation"""
        asset_value = self.kf.x[0, 0]
        asset_vol = self.kf.x[1, 0]
        barrier = self.kf.x[2, 0]

        # Using first passage time method
        mu = r - 0.5*asset_vol**2

        if asset_value <= barrier:
            return 1.0

        # Default probability approximation formula
        d = (np.log(asset_value/barrier) + mu*T) / (asset_vol*np.sqrt(T))

        prob = norm.cdf(-d) + (barrier/asset_value)**(2*mu/asset_vol**2) * norm.cdf(
            -d + 2*mu*np.sqrt(T)/asset_vol
        )

        return min(prob, 1.0)

    def credit_spread(self, face_value, T, r):
        """Credit spread calculation"""
        asset_value = self.kf.x[0, 0]
        asset_vol = self.kf.x[1, 0]

        # Risky bond value
        risky_bond_value = self.merton_debt_value(asset_value, face_value, T, r, asset_vol)

        # Risk-free bond value
        risk_free_bond = face_value * np.exp(-r*T)

        # Credit spread
        if risky_bond_value > 0:
            spread = -np.log(risky_bond_value/risk_free_bond) / T - r
        else:
            spread = float('inf')

        return max(spread, 0)

    def update_credit_model(self, stock_price, bond_price, face_value, T, r):
        """Update credit risk model"""
        # Prediction step
        self.kf.predict()

        # Current state estimate
        asset_value = self.kf.x[0, 0]
        asset_vol = self.kf.x[1, 0]

        # Theoretical stock price and bond price
        theoretical_equity = self.merton_equity_value(asset_value, face_value, T, r, asset_vol)
        theoretical_debt = self.merton_debt_value(asset_value, face_value, T, r, asset_vol)

        # Observation vector
        observations = np.array([stock_price - theoretical_equity,
                               bond_price - theoretical_debt])

        # Update step
        self.kf.update(observations)

        return {
            'asset_value': self.kf.x[0, 0],
            'asset_volatility': self.kf.x[1, 0],
            'default_barrier': self.kf.x[2, 0],
            'default_probability': self.default_probability(T, r),
            'credit_spread': self.credit_spread(face_value, T, r)
        }

# Example: Credit risk modeling
def demonstrate_credit_risk_modeling():
    np.random.seed(42)

    # Simulate firm asset value path
    n_days = 252
    initial_asset_value = 100
    asset_vol = 0.3
    r = 0.05
    dt = 1/252

    # True asset value path
    asset_values = np.zeros(n_days)
    asset_values[0] = initial_asset_value

    for i in range(1, n_days):
        dA = asset_values[i-1] * (r*dt + asset_vol*np.sqrt(dt)*np.random.normal())
        asset_values[i] = asset_values[i-1] + dA

    # Debt face value and maturity
    face_value = 70
    T = 1.0  # 1-year debt

    # Credit risk model
    credit_model = StructuralCreditRiskKF()

    # Simulate observation data
    results = []

    for i, true_asset_value in enumerate(asset_values):
        T_remaining = T - i/252

        if T_remaining > 0:
            # Generate "observed" stock price and bond price
            true_equity = credit_model.merton_equity_value(
                true_asset_value, face_value, T_remaining, r, asset_vol
            )
            true_debt = credit_model.merton_debt_value(
                true_asset_value, face_value, T_remaining, r, asset_vol
            )

            # Add observation noise
            observed_stock = true_equity + np.random.normal(0, 0.5)
            observed_bond = true_debt + np.random.normal(0, 0.2)

            # Update model
            result = credit_model.update_credit_model(
                observed_stock, observed_bond, face_value, T_remaining, r
            )

            result['true_asset_value'] = true_asset_value
            result['day'] = i
            results.append(result)

    # Extract results
    days = [r['day'] for r in results]
    estimated_assets = [r['asset_value'] for r in results]
    true_assets = [r['true_asset_value'] for r in results]
    default_probs = [r['default_probability'] for r in results]
    credit_spreads = [r['credit_spread'] for r in results]

    # Plot
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    # Asset value estimation
    ax1.plot(days, true_assets, label='True Asset Value', alpha=0.7)
    ax1.plot(days, estimated_assets, label='Estimated Asset Value', alpha=0.7)
    ax1.axhline(y=face_value, color='r', linestyle='--', label='Debt Face Value', alpha=0.5)
    ax1.set_title('Asset Value Estimation')
    ax1.set_ylabel('Value')
    ax1.legend()

    # Default probability
    ax2.plot(days, default_probs)
    ax2.set_title('Default Probability')
    ax2.set_ylabel('Probability')
    ax2.set_ylim(0, 1)

    # Credit spread
    ax3.plot(days, credit_spreads)
    ax3.set_title('Credit Spread (bp)')
    ax3.set_ylabel('Spread')

    # Asset value estimation error
    asset_errors = np.array(estimated_assets) - np.array(true_assets)
    ax4.plot(days, asset_errors)
    ax4.set_title('Asset Value Estimation Error')
    ax4.set_ylabel('Error')
    ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)

    plt.tight_layout()
    plt.show()

    # Calculate performance metrics
    final_default_prob = default_probs[-1] if default_probs else 0
    avg_credit_spread = np.mean(credit_spreads) if credit_spreads else 0
    asset_rmse = np.sqrt(np.mean(asset_errors**2))

    print(f"Credit Risk Modeling Results:")
    print(f"Final default probability: {final_default_prob:.4f}")
    print(f"Average credit spread: {avg_credit_spread:.4f}")
    print(f"Asset value estimation RMSE: {asset_rmse:.4f}")

demonstrate_credit_risk_modeling()

4. Risk Measurement and Management

4.1 Dynamic VaR Calculation

class DynamicVaRKF:
    """Dynamic VaR calculation using Kalman filter"""

    def __init__(self, confidence_level=0.05):
        self.confidence_level = confidence_level
        # State vector: [return, volatility]
        self.kf = KalmanFilter(dim_x=2, dim_z=1)

        dt = 1/252

        # State transition matrix
        self.kf.F = np.array([[0., 0.],      # Returns are white noise
                             [0., 1.]])      # Volatility is random walk

        # Observation matrix: we observe returns
        self.kf.H = np.array([[1., 0.]])

        # Process noise covariance
        vol_of_vol = 0.1  # Volatility of volatility
        self.kf.Q = np.array([[0.01, 0.],
                             [0., vol_of_vol**2 * dt]])

        # Observation noise (very small, as we trust observed returns)
        self.kf.R = np.array([[1e-6]])

        # Initial state
        self.kf.x = np.array([[0.], [0.02]])  # Initial return 0%, volatility 2%
        self.kf.P = np.eye(2) * 0.01

    def update_volatility(self, return_):
        """Update volatility estimate"""
        # Prediction step
        self.kf.predict()

        # Update step
        self.kf.update([return_])

        # Update volatility from returns (via squared residuals)
        residual = return_ - self.kf.x[0, 0]
        self.kf.x[1, 0] = 0.95 * self.kf.x[1, 0] + 0.05 * abs(residual)

        return {
            'expected_return': self.kf.x[0, 0],
            'volatility': self.kf.x[1, 0]
        }

    def calculate_var(self, portfolio_value, holding_period=1):
        """Calculate VaR"""
        expected_return = self.kf.x[0, 0]
        volatility = self.kf.x[1, 0]

        # Adjust to holding period
        period_return = expected_return * holding_period
        period_vol = volatility * np.sqrt(holding_period)

        # VaR calculation (normal distribution assumption)
        var_percentile = norm.ppf(self.confidence_level)
        var_amount = portfolio_value * (var_percentile * period_vol - period_return)

        return {
            'var_amount': var_amount,
            'var_percentage': var_amount / portfolio_value,
            'expected_return': period_return,
            'volatility': period_vol
        }

    def calculate_conditional_var(self, portfolio_value, holding_period=1):
        """Calculate conditional VaR (CVaR/Expected Shortfall)"""
        var_result = self.calculate_var(portfolio_value, holding_period)

        expected_return = var_result['expected_return']
        volatility = var_result['volatility']

        # CVaR calculation
        var_percentile = norm.ppf(self.confidence_level)
        phi_var = norm.pdf(var_percentile)

        cvar_multiplier = phi_var / self.confidence_level
        cvar_amount = portfolio_value * (cvar_multiplier * volatility - expected_return)

        return {
            'cvar_amount': cvar_amount,
            'cvar_percentage': cvar_amount / portfolio_value,
            'var_amount': var_result['var_amount']
        }

# Example: Dynamic VaR calculation
def demonstrate_dynamic_var():
    np.random.seed(42)

    # Simulate stock return data
    n_days = 500
    true_vol = 0.02
    returns = []

    # GARCH-like simulation
    for i in range(n_days):
        if i == 0:
            vol_t = true_vol
        else:
            # Volatility clustering effect
            vol_t = 0.95 * vol_t + 0.05 * abs(returns[-1]) + 0.01 * np.random.normal()

        return_t = np.random.normal(0, vol_t)
        returns.append(return_t)

    # Portfolio value
    portfolio_value = 1000000  # 1 million

    # Dynamic VaR model
    var_model = DynamicVaRKF(confidence_level=0.05)  # 95% VaR

    # Calculate dynamic VaR
    var_results = []
    cvar_results = []
    volatility_estimates = []

    for return_ in returns:
        # Update model
        vol_result = var_model.update_volatility(return_)
        volatility_estimates.append(vol_result['volatility'])

        # Calculate VaR and CVaR
        var_result = var_model.calculate_var(portfolio_value)
        cvar_result = var_model.calculate_conditional_var(portfolio_value)

        var_results.append(var_result['var_amount'])
        cvar_results.append(cvar_result['cvar_amount'])

    # Calculate actual losses (for backtesting)
    portfolio_returns = np.array(returns) * portfolio_value
    actual_losses = -portfolio_returns  # Negative returns are losses

    # VaR backtesting
    var_breaches = actual_losses > np.array(var_results)
    var_breach_rate = np.mean(var_breaches)
    expected_breach_rate = 0.05

    # Plot
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Returns and volatility
    ax1.plot(returns, alpha=0.7, label='Returns')
    ax1_twin = ax1.twinx()
    ax1_twin.plot(volatility_estimates, color='red', alpha=0.7, label='Estimated Volatility')
    ax1.set_title('Returns and Volatility Estimation')
    ax1.set_ylabel('Returns')
    ax1_twin.set_ylabel('Volatility')
    ax1.legend(loc='upper left')
    ax1_twin.legend(loc='upper right')

    # VaR and CVaR
    ax2.plot(var_results, label='VaR (95%)', alpha=0.7)
    ax2.plot(cvar_results, label='CVaR (95%)', alpha=0.7)
    ax2.set_title('Dynamic Risk Measures')
    ax2.set_ylabel('Risk Amount')
    ax2.legend()

    # VaR backtesting
    ax3.plot(actual_losses, alpha=0.5, label='Actual Loss')
    ax3.plot(var_results, color='red', label='VaR Threshold', alpha=0.7)
    breach_points = np.where(var_breaches)[0]
    ax3.scatter(breach_points, actual_losses[breach_points],
               color='red', s=20, label='VaR Breach Points')
    ax3.set_title(f'VaR Backtesting (Breach Rate: {var_breach_rate:.2%})')
    ax3.set_ylabel('Loss Amount')
    ax3.legend()

    # Breach rate statistics
    window_size = 50
    rolling_breach_rates = []
    for i in range(window_size, len(var_breaches)):
        window_breaches = var_breaches[i-window_size:i]
        rolling_breach_rates.append(np.mean(window_breaches))

    ax4.plot(rolling_breach_rates, alpha=0.7)
    ax4.axhline(y=expected_breach_rate, color='red', linestyle='--',
               label=f'Expected Breach Rate: {expected_breach_rate:.1%}')
    ax4.set_title('Rolling VaR Breach Rate')
    ax4.set_ylabel('Breach Rate')
    ax4.legend()

    plt.tight_layout()
    plt.show()

    # Statistical results
    print(f"VaR Backtesting Results:")
    print(f"Total observations: {len(var_breaches)}")
    print(f"VaR breaches: {np.sum(var_breaches)}")
    print(f"Actual breach rate: {var_breach_rate:.2%}")
    print(f"Expected breach rate: {expected_breach_rate:.1%}")

    # Kupiec test (likelihood ratio test)
    n = len(var_breaches)
    x = np.sum(var_breaches)
    p = expected_breach_rate

    if x > 0 and x < n:
        lr_stat = 2 * (x * np.log(var_breach_rate/p) + (n-x) * np.log((1-var_breach_rate)/(1-p)))
        print(f"Kupiec LR statistic: {lr_stat:.4f}")
        print(f"Critical value (5%): 3.841")
        print(f"VaR model passes test: {'Yes' if lr_stat < 3.841 else 'No'}")

demonstrate_dynamic_var()

Chapter Summary

This chapter delved into the application of Kalman filtering in financial derivatives pricing and risk management:

  1. Option Pricing Applications:

    • Dynamic implied volatility estimation
    • Multi-factor option pricing models
    • Extensions of the Black-Scholes model
  2. Interest Rate Derivatives Modeling:

    • Vasicek interest rate model implementation
    • Interest rate term structure modeling
    • Bond option and interest rate swap pricing
  3. Credit Risk Modeling:

    • Structural credit risk models
    • Kalman filter implementation of Merton model
    • Default probability and credit spread calculation
  4. Risk Measurement and Management:

    • Dynamic VaR calculation
    • Conditional VaR (CVaR) estimation
    • VaR model backtesting methods

These applications demonstrate the powerful capabilities of Kalman filtering in handling the complexity of financial derivatives, particularly its advantages in time-varying parameters and uncertainty quantification.


Next Chapter Preview: Chapter 14 will cover “Algorithmic Trading and High-Frequency Data Processing,” exploring the application of Kalman filtering in high-frequency trading, market microstructure modeling, and real-time risk management.

🔄 正在渲染 Mermaid 图表...