Financial Derivatives Pricing and Risk Management
Chapter 13: Financial Derivatives Pricing and Risk Management
- Master the application of Kalman filtering in option pricing
- Learn dynamic modeling methods for interest rate derivatives
- Understand Kalman filter methods for credit risk modeling
- Master volatility modeling and risk measurement techniques
- Implement a complete framework for derivatives pricing
1. Kalman Filter in Option Pricing
1.1 Dynamic Estimation of Implied Volatility
In option pricing, volatility is the most critical parameter. We can use Kalman filtering to dynamically estimate implied volatility:
import numpy as np
import pandas as pd
from scipy.stats import norm
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter
import warnings
warnings.filterwarnings('ignore')
class ImpliedVolatilityKF:
"""Estimate implied volatility using Kalman filter"""
def __init__(self, dt=1/252):
self.dt = dt
self.kf = KalmanFilter(dim_x=2, dim_z=1)
# State transition matrix: [vol, vol_change]
self.kf.F = np.array([[1., dt],
[0., 1.]])
# Observation matrix: we observe option price
self.kf.H = np.array([[1., 0.]])
# Process noise covariance
q = 0.001
self.kf.Q = np.array([[q*dt**3/3, q*dt**2/2],
[q*dt**2/2, q*dt]])
# Observation noise
self.kf.R = np.array([[0.01]])
# Initial state
self.kf.x = np.array([[0.2], [0.0]]) # Initial volatility 20%
self.kf.P = np.eye(2) * 0.1
def black_scholes_price(self, S, K, T, r, sigma, option_type='call'):
"""Black-Scholes option pricing formula"""
d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
d2 = d1 - sigma*np.sqrt(T)
if option_type == 'call':
price = S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)
else:
price = K*np.exp(-r*T)*norm.cdf(-d2) - S*norm.cdf(-d1)
return price
def option_vega(self, S, K, T, r, sigma):
"""Option Vega"""
d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
vega = S * norm.pdf(d1) * np.sqrt(T)
return vega
def update_volatility(self, market_price, S, K, T, r, option_type='call'):
"""Update volatility estimate"""
# Prediction step
self.kf.predict()
# Current volatility estimate
current_vol = max(self.kf.x[0, 0], 0.01) # Avoid negative volatility
# Theoretical option price
theoretical_price = self.black_scholes_price(S, K, T, r, current_vol, option_type)
# Vega for linearization
vega = self.option_vega(S, K, T, r, current_vol)
# Convert price difference to volatility difference
vol_diff = (market_price - theoretical_price) / vega if vega > 0 else 0
# Update step
self.kf.update(vol_diff)
return self.kf.x[0, 0], theoretical_price
# Example: Dynamic implied volatility estimation
def demonstrate_implied_volatility():
np.random.seed(42)
# Simulate option market data
n_days = 100
S0 = 100 # Stock price
K = 100 # Strike price
T = 0.25 # 3 months to expiry
r = 0.05 # Risk-free rate
# True volatility path (random walk)
true_vol = np.zeros(n_days)
true_vol[0] = 0.2
for i in range(1, n_days):
true_vol[i] = max(true_vol[i-1] + np.random.normal(0, 0.01), 0.05)
# Simulate stock price path
stock_prices = np.zeros(n_days)
stock_prices[0] = S0
for i in range(1, n_days):
dt = 1/252
stock_prices[i] = stock_prices[i-1] * np.exp(
(r - 0.5*true_vol[i]**2)*dt + true_vol[i]*np.sqrt(dt)*np.random.normal()
)
# Generate market option prices (add noise)
iv_kf = ImpliedVolatilityKF()
estimated_vols = []
theoretical_prices = []
market_prices = []
for i in range(n_days):
S = stock_prices[i]
T_remaining = T - i/252
if T_remaining > 0:
# True market price (calculated with true volatility + noise)
true_price = iv_kf.black_scholes_price(S, K, T_remaining, r, true_vol[i])
market_price = true_price + np.random.normal(0, 0.1)
# Kalman filter estimation
est_vol, theo_price = iv_kf.update_volatility(
market_price, S, K, T_remaining, r
)
estimated_vols.append(est_vol)
theoretical_prices.append(theo_price)
market_prices.append(market_price)
# Plot
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
# Stock price trend
ax1.plot(stock_prices[:len(estimated_vols)])
ax1.set_title('Stock Price Trend')
ax1.set_ylabel('Stock Price')
# Volatility estimation
ax2.plot(true_vol[:len(estimated_vols)], label='True Volatility', alpha=0.7)
ax2.plot(estimated_vols, label='Kalman Filter Estimate', alpha=0.7)
ax2.set_title('Volatility Estimation')
ax2.set_ylabel('Volatility')
ax2.legend()
# Option price
ax3.plot(market_prices, label='Market Price', alpha=0.7)
ax3.plot(theoretical_prices, label='Theoretical Price', alpha=0.7)
ax3.set_title('Option Price')
ax3.set_ylabel('Price')
ax3.legend()
# Estimation error
vol_errors = np.array(estimated_vols) - true_vol[:len(estimated_vols)]
ax4.plot(vol_errors)
ax4.set_title('Volatility Estimation Error')
ax4.set_ylabel('Error')
ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()
# Calculate performance metrics
mse = np.mean(vol_errors**2)
mae = np.mean(np.abs(vol_errors))
print(f"Volatility Estimation Performance:")
print(f"MSE: {mse:.6f}")
print(f"MAE: {mae:.6f}")
return estimated_vols, true_vol[:len(estimated_vols)]
demonstrate_implied_volatility()
1.2 Multi-Factor Option Pricing Model
For complex derivatives, we need to consider multiple risk factors:
class MultiFactorOptionKF:
"""Multi-factor option pricing Kalman filter model"""
def __init__(self):
# State vector: [spot_price, volatility, interest_rate, dividend_yield]
self.n_factors = 4
self.kf = KalmanFilter(dim_x=self.n_factors, dim_z=1)
# State transition matrix (simplified random walk model)
dt = 1/252
self.kf.F = np.eye(self.n_factors)
# Observation matrix: we observe option price
self.kf.H = np.array([[1., 0., 0., 0.]])
# Process noise covariance
q_spot = 0.01 # Stock price noise
q_vol = 0.001 # Volatility noise
q_rate = 0.0001 # Interest rate noise
q_div = 0.0001 # Dividend noise
self.kf.Q = np.diag([q_spot, q_vol, q_rate, q_div])
# Observation noise
self.kf.R = np.array([[0.1]])
# Initial state
self.kf.x = np.array([[100.], [0.2], [0.05], [0.02]])
self.kf.P = np.eye(self.n_factors) * 0.1
def european_option_price(self, S, K, T, r, q, sigma, option_type='call'):
"""European option pricing with dividends"""
d1 = (np.log(S/K) + (r - q + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
d2 = d1 - sigma*np.sqrt(T)
if option_type == 'call':
price = S*np.exp(-q*T)*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)
else:
price = K*np.exp(-r*T)*norm.cdf(-d2) - S*np.exp(-q*T)*norm.cdf(-d1)
return price
def update_factors(self, market_option_price, K, T, option_type='call'):
"""Update multi-factor estimates"""
# Prediction step
self.kf.predict()
# Current factor estimates
S = max(self.kf.x[0, 0], 1.0) # Stock price
vol = max(self.kf.x[1, 0], 0.01) # Volatility
r = max(self.kf.x[2, 0], 0.0) # Interest rate
q = max(self.kf.x[3, 0], 0.0) # Dividend yield
# Theoretical option price
theoretical_price = self.european_option_price(S, K, T, r, q, vol, option_type)
# Price residual
price_residual = market_option_price - theoretical_price
# Update step (simplified to only update stock price)
self.kf.update([price_residual])
return {
'spot_price': self.kf.x[0, 0],
'volatility': self.kf.x[1, 0],
'interest_rate': self.kf.x[2, 0],
'dividend_yield': self.kf.x[3, 0],
'theoretical_price': theoretical_price
}
# Example: Multi-factor option pricing
mf_option = MultiFactorOptionKF()
2. Interest Rate Derivatives Modeling
2.1 Kalman Filter Implementation of Vasicek Interest Rate Model
class VasicekModelKF:
"""Kalman filter implementation of Vasicek interest rate model"""
def __init__(self, dt=1/252):
self.dt = dt
self.kf = KalmanFilter(dim_x=3, dim_z=1)
# State vector: [r(t), theta, kappa] (rate, long-term mean, mean reversion speed)
# Simplified model, assuming sigma is known
self.sigma = 0.02
# Vasicek model: dr = kappa*(theta - r)*dt + sigma*dW
# State transition matrix (linearized)
self.kf.F = np.array([[1., 0., 0.],
[0., 1., 0.],
[0., 0., 1.]])
# Observation matrix: we observe short-term interest rate
self.kf.H = np.array([[1., 0., 0.]])
# Process noise covariance
self.kf.Q = np.array([[self.sigma**2 * dt, 0., 0.],
[0., 1e-6, 0.],
[0., 0., 1e-6]])
# Observation noise
self.kf.R = np.array([[1e-4]])
# Initial state
self.kf.x = np.array([[0.05], [0.05], [0.5]]) # r=5%, theta=5%, kappa=0.5
self.kf.P = np.eye(3) * 0.01
def update_state_transition(self):
"""Update state transition matrix (considering current parameter estimates)"""
r = self.kf.x[0, 0]
theta = self.kf.x[1, 0]
kappa = self.kf.x[2, 0]
# Update state transition matrix
self.kf.F[0, 0] = 1 - kappa * self.dt
self.kf.F[0, 1] = kappa * self.dt
def zero_coupon_bond_price(self, T):
"""Zero-coupon bond pricing (Vasicek model)"""
r = self.kf.x[0, 0]
theta = self.kf.x[1, 0]
kappa = self.kf.x[2, 0]
if kappa == 0:
B_T = T
A_T = -0.5 * self.sigma**2 * T**2
else:
B_T = (1 - np.exp(-kappa * T)) / kappa
A_T = (theta - self.sigma**2 / (2 * kappa**2)) * (B_T - T) - \
(self.sigma**2 / (4 * kappa)) * B_T**2
price = np.exp(A_T - B_T * r)
return price
def update_interest_rate(self, observed_rate):
"""Update interest rate estimate"""
# Update state transition matrix
self.update_state_transition()
# Prediction step
self.kf.predict()
# Update step
self.kf.update([observed_rate])
return {
'rate': self.kf.x[0, 0],
'theta': self.kf.x[1, 0],
'kappa': self.kf.x[2, 0]
}
# Example: Interest rate term structure modeling
def demonstrate_yield_curve_modeling():
np.random.seed(42)
# Simulate Vasicek process
n_days = 252
dt = 1/252
true_kappa = 0.3
true_theta = 0.04
true_sigma = 0.02
# True interest rate path
rates = np.zeros(n_days)
rates[0] = 0.03
for i in range(1, n_days):
dr = true_kappa * (true_theta - rates[i-1]) * dt + \
true_sigma * np.sqrt(dt) * np.random.normal()
rates[i] = rates[i-1] + dr
# Kalman filter estimation
vasicek_kf = VasicekModelKF()
estimated_params = []
for rate in rates:
# Add observation noise
observed_rate = rate + np.random.normal(0, 0.001)
params = vasicek_kf.update_interest_rate(observed_rate)
estimated_params.append(params)
# Extract estimated parameters
estimated_rates = [p['rate'] for p in estimated_params]
estimated_kappa = [p['kappa'] for p in estimated_params]
estimated_theta = [p['theta'] for p in estimated_params]
# Plot
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
# Interest rate path
ax1.plot(rates, label='True Rate', alpha=0.7)
ax1.plot(estimated_rates, label='Estimated Rate', alpha=0.7)
ax1.set_title('Interest Rate Path')
ax1.set_ylabel('Rate')
ax1.legend()
# Mean reversion speed
ax2.plot([true_kappa] * len(estimated_kappa), label=f'True kappa={true_kappa}', alpha=0.7)
ax2.plot(estimated_kappa, label='Estimated kappa', alpha=0.7)
ax2.set_title('Mean Reversion Speed (kappa)')
ax2.set_ylabel('kappa')
ax2.legend()
# Long-term mean
ax3.plot([true_theta] * len(estimated_theta), label=f'True theta={true_theta}', alpha=0.7)
ax3.plot(estimated_theta, label='Estimated theta', alpha=0.7)
ax3.set_title('Long-term Mean (theta)')
ax3.set_ylabel('theta')
ax3.legend()
# Parameter estimation error
kappa_errors = np.array(estimated_kappa) - true_kappa
theta_errors = np.array(estimated_theta) - true_theta
ax4.plot(kappa_errors, label='kappa error', alpha=0.7)
ax4.plot(theta_errors, label='theta error', alpha=0.7)
ax4.set_title('Parameter Estimation Error')
ax4.set_ylabel('Error')
ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)
ax4.legend()
plt.tight_layout()
plt.show()
# Calculate convergence performance
final_kappa_error = abs(estimated_kappa[-1] - true_kappa)
final_theta_error = abs(estimated_theta[-1] - true_theta)
print(f"Parameter Estimation Performance:")
print(f"Final kappa error: {final_kappa_error:.4f}")
print(f"Final theta error: {final_theta_error:.4f}")
demonstrate_yield_curve_modeling()
2.2 Interest Rate Derivatives Pricing
class InterestRateDerivativesPricer:
"""Interest rate derivatives pricer"""
def __init__(self, vasicek_model):
self.model = vasicek_model
def bond_option_price(self, T_option, T_bond, K, option_type='call'):
"""Bond option pricing (analytical solution under Vasicek model)"""
r = self.model.kf.x[0, 0]
theta = self.model.kf.x[1, 0]
kappa = self.model.kf.x[2, 0]
sigma = self.model.sigma
# Bond option pricing parameters
P_T = self.model.zero_coupon_bond_price(T_bond)
P_t = self.model.zero_coupon_bond_price(T_option)
if kappa == 0:
v = sigma**2 * T_option * (T_bond - T_option)**2 / 3
else:
B_T = (1 - np.exp(-kappa * (T_bond - T_option))) / kappa
v = (sigma**2 / (2 * kappa**2)) * B_T**2 * (1 - np.exp(-2 * kappa * T_option))
if v <= 0:
return 0
h = (1 / np.sqrt(v)) * np.log(P_T / (P_t * K))
if option_type == 'call':
price = P_T * norm.cdf(h + 0.5 * np.sqrt(v)) - K * P_t * norm.cdf(h - 0.5 * np.sqrt(v))
else:
price = K * P_t * norm.cdf(-h + 0.5 * np.sqrt(v)) - P_T * norm.cdf(-h - 0.5 * np.sqrt(v))
return price
def interest_rate_swap_value(self, swap_rate, tenor_years, payment_freq=2):
"""Interest rate swap valuation"""
# Calculate present value of fixed and floating legs
n_payments = int(tenor_years * payment_freq)
payment_dates = np.array([i/payment_freq for i in range(1, n_payments + 1)])
# Fixed leg PV
fixed_leg_pv = 0
for t in payment_dates:
bond_price = self.model.zero_coupon_bond_price(t)
fixed_leg_pv += swap_rate * bond_price / payment_freq
# Floating leg PV (approximate calculation)
current_rate = self.model.kf.x[0, 0]
floating_leg_pv = 0
for t in payment_dates:
bond_price = self.model.zero_coupon_bond_price(t)
floating_leg_pv += current_rate * bond_price / payment_freq
# Swap value (from the perspective of receiving fixed, paying floating)
swap_value = floating_leg_pv - fixed_leg_pv
return {
'swap_value': swap_value,
'fixed_leg_pv': fixed_leg_pv,
'floating_leg_pv': floating_leg_pv
}
# Example: Interest rate derivatives pricing
vasicek_model = VasicekModelKF()
derivatives_pricer = InterestRateDerivativesPricer(vasicek_model)
# Update model parameters
vasicek_model.update_interest_rate(0.03)
# Bond option pricing
bond_option_price = derivatives_pricer.bond_option_price(
T_option=0.25, # 3-month option
T_bond=2.0, # 2-year bond
K=0.95, # Strike price
option_type='call'
)
print(f"Bond call option price: {bond_option_price:.4f}")
# Interest rate swap valuation
swap_value = derivatives_pricer.interest_rate_swap_value(
swap_rate=0.04, # 4% fixed rate
tenor_years=5, # 5-year tenor
payment_freq=2 # Semi-annual payments
)
print(f"Interest rate swap value: {swap_value['swap_value']:.4f}")
print(f"Fixed leg PV: {swap_value['fixed_leg_pv']:.4f}")
print(f"Floating leg PV: {swap_value['floating_leg_pv']:.4f}")
3. Credit Risk Modeling
3.1 Structural Credit Risk Model
class StructuralCreditRiskKF:
"""Structural credit risk model (Merton model extension)"""
def __init__(self, dt=1/252):
self.dt = dt
# State vector: [asset_value, asset_volatility, default_barrier]
self.kf = KalmanFilter(dim_x=3, dim_z=2)
# State transition matrix
self.kf.F = np.eye(3)
# Observation matrix: we observe stock price and bond price
self.kf.H = np.array([[1., 0., 0.], # Asset value affects stock price
[1., 0., -1.]]) # Asset value minus default barrier affects bond price
# Process noise covariance
asset_vol_noise = 0.01
vol_vol_noise = 0.001
barrier_noise = 0.001
self.kf.Q = np.diag([asset_vol_noise, vol_vol_noise, barrier_noise])
# Observation noise
self.kf.R = np.diag([0.01, 0.005]) # Stock and bond price noise
# Initial state
self.kf.x = np.array([[100.], [0.3], [50.]]) # Asset value, volatility, default barrier
self.kf.P = np.eye(3) * 0.1
def merton_equity_value(self, asset_value, debt_value, T, r, asset_vol):
"""Merton model equity value"""
d1 = (np.log(asset_value/debt_value) + (r + 0.5*asset_vol**2)*T) / (asset_vol*np.sqrt(T))
d2 = d1 - asset_vol*np.sqrt(T)
equity_value = asset_value*norm.cdf(d1) - debt_value*np.exp(-r*T)*norm.cdf(d2)
return max(equity_value, 0)
def merton_debt_value(self, asset_value, debt_value, T, r, asset_vol):
"""Merton model debt value"""
d1 = (np.log(asset_value/debt_value) + (r + 0.5*asset_vol**2)*T) / (asset_vol*np.sqrt(T))
d2 = d1 - asset_vol*np.sqrt(T)
debt_value_current = debt_value*np.exp(-r*T)*norm.cdf(d2) + \
asset_value*norm.cdf(-d1)
return debt_value_current
def default_probability(self, T, r):
"""Default probability calculation"""
asset_value = self.kf.x[0, 0]
asset_vol = self.kf.x[1, 0]
barrier = self.kf.x[2, 0]
# Using first passage time method
mu = r - 0.5*asset_vol**2
if asset_value <= barrier:
return 1.0
# Default probability approximation formula
d = (np.log(asset_value/barrier) + mu*T) / (asset_vol*np.sqrt(T))
prob = norm.cdf(-d) + (barrier/asset_value)**(2*mu/asset_vol**2) * norm.cdf(
-d + 2*mu*np.sqrt(T)/asset_vol
)
return min(prob, 1.0)
def credit_spread(self, face_value, T, r):
"""Credit spread calculation"""
asset_value = self.kf.x[0, 0]
asset_vol = self.kf.x[1, 0]
# Risky bond value
risky_bond_value = self.merton_debt_value(asset_value, face_value, T, r, asset_vol)
# Risk-free bond value
risk_free_bond = face_value * np.exp(-r*T)
# Credit spread
if risky_bond_value > 0:
spread = -np.log(risky_bond_value/risk_free_bond) / T - r
else:
spread = float('inf')
return max(spread, 0)
def update_credit_model(self, stock_price, bond_price, face_value, T, r):
"""Update credit risk model"""
# Prediction step
self.kf.predict()
# Current state estimate
asset_value = self.kf.x[0, 0]
asset_vol = self.kf.x[1, 0]
# Theoretical stock price and bond price
theoretical_equity = self.merton_equity_value(asset_value, face_value, T, r, asset_vol)
theoretical_debt = self.merton_debt_value(asset_value, face_value, T, r, asset_vol)
# Observation vector
observations = np.array([stock_price - theoretical_equity,
bond_price - theoretical_debt])
# Update step
self.kf.update(observations)
return {
'asset_value': self.kf.x[0, 0],
'asset_volatility': self.kf.x[1, 0],
'default_barrier': self.kf.x[2, 0],
'default_probability': self.default_probability(T, r),
'credit_spread': self.credit_spread(face_value, T, r)
}
# Example: Credit risk modeling
def demonstrate_credit_risk_modeling():
np.random.seed(42)
# Simulate firm asset value path
n_days = 252
initial_asset_value = 100
asset_vol = 0.3
r = 0.05
dt = 1/252
# True asset value path
asset_values = np.zeros(n_days)
asset_values[0] = initial_asset_value
for i in range(1, n_days):
dA = asset_values[i-1] * (r*dt + asset_vol*np.sqrt(dt)*np.random.normal())
asset_values[i] = asset_values[i-1] + dA
# Debt face value and maturity
face_value = 70
T = 1.0 # 1-year debt
# Credit risk model
credit_model = StructuralCreditRiskKF()
# Simulate observation data
results = []
for i, true_asset_value in enumerate(asset_values):
T_remaining = T - i/252
if T_remaining > 0:
# Generate "observed" stock price and bond price
true_equity = credit_model.merton_equity_value(
true_asset_value, face_value, T_remaining, r, asset_vol
)
true_debt = credit_model.merton_debt_value(
true_asset_value, face_value, T_remaining, r, asset_vol
)
# Add observation noise
observed_stock = true_equity + np.random.normal(0, 0.5)
observed_bond = true_debt + np.random.normal(0, 0.2)
# Update model
result = credit_model.update_credit_model(
observed_stock, observed_bond, face_value, T_remaining, r
)
result['true_asset_value'] = true_asset_value
result['day'] = i
results.append(result)
# Extract results
days = [r['day'] for r in results]
estimated_assets = [r['asset_value'] for r in results]
true_assets = [r['true_asset_value'] for r in results]
default_probs = [r['default_probability'] for r in results]
credit_spreads = [r['credit_spread'] for r in results]
# Plot
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
# Asset value estimation
ax1.plot(days, true_assets, label='True Asset Value', alpha=0.7)
ax1.plot(days, estimated_assets, label='Estimated Asset Value', alpha=0.7)
ax1.axhline(y=face_value, color='r', linestyle='--', label='Debt Face Value', alpha=0.5)
ax1.set_title('Asset Value Estimation')
ax1.set_ylabel('Value')
ax1.legend()
# Default probability
ax2.plot(days, default_probs)
ax2.set_title('Default Probability')
ax2.set_ylabel('Probability')
ax2.set_ylim(0, 1)
# Credit spread
ax3.plot(days, credit_spreads)
ax3.set_title('Credit Spread (bp)')
ax3.set_ylabel('Spread')
# Asset value estimation error
asset_errors = np.array(estimated_assets) - np.array(true_assets)
ax4.plot(days, asset_errors)
ax4.set_title('Asset Value Estimation Error')
ax4.set_ylabel('Error')
ax4.axhline(y=0, color='r', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()
# Calculate performance metrics
final_default_prob = default_probs[-1] if default_probs else 0
avg_credit_spread = np.mean(credit_spreads) if credit_spreads else 0
asset_rmse = np.sqrt(np.mean(asset_errors**2))
print(f"Credit Risk Modeling Results:")
print(f"Final default probability: {final_default_prob:.4f}")
print(f"Average credit spread: {avg_credit_spread:.4f}")
print(f"Asset value estimation RMSE: {asset_rmse:.4f}")
demonstrate_credit_risk_modeling()
4. Risk Measurement and Management
4.1 Dynamic VaR Calculation
class DynamicVaRKF:
"""Dynamic VaR calculation using Kalman filter"""
def __init__(self, confidence_level=0.05):
self.confidence_level = confidence_level
# State vector: [return, volatility]
self.kf = KalmanFilter(dim_x=2, dim_z=1)
dt = 1/252
# State transition matrix
self.kf.F = np.array([[0., 0.], # Returns are white noise
[0., 1.]]) # Volatility is random walk
# Observation matrix: we observe returns
self.kf.H = np.array([[1., 0.]])
# Process noise covariance
vol_of_vol = 0.1 # Volatility of volatility
self.kf.Q = np.array([[0.01, 0.],
[0., vol_of_vol**2 * dt]])
# Observation noise (very small, as we trust observed returns)
self.kf.R = np.array([[1e-6]])
# Initial state
self.kf.x = np.array([[0.], [0.02]]) # Initial return 0%, volatility 2%
self.kf.P = np.eye(2) * 0.01
def update_volatility(self, return_):
"""Update volatility estimate"""
# Prediction step
self.kf.predict()
# Update step
self.kf.update([return_])
# Update volatility from returns (via squared residuals)
residual = return_ - self.kf.x[0, 0]
self.kf.x[1, 0] = 0.95 * self.kf.x[1, 0] + 0.05 * abs(residual)
return {
'expected_return': self.kf.x[0, 0],
'volatility': self.kf.x[1, 0]
}
def calculate_var(self, portfolio_value, holding_period=1):
"""Calculate VaR"""
expected_return = self.kf.x[0, 0]
volatility = self.kf.x[1, 0]
# Adjust to holding period
period_return = expected_return * holding_period
period_vol = volatility * np.sqrt(holding_period)
# VaR calculation (normal distribution assumption)
var_percentile = norm.ppf(self.confidence_level)
var_amount = portfolio_value * (var_percentile * period_vol - period_return)
return {
'var_amount': var_amount,
'var_percentage': var_amount / portfolio_value,
'expected_return': period_return,
'volatility': period_vol
}
def calculate_conditional_var(self, portfolio_value, holding_period=1):
"""Calculate conditional VaR (CVaR/Expected Shortfall)"""
var_result = self.calculate_var(portfolio_value, holding_period)
expected_return = var_result['expected_return']
volatility = var_result['volatility']
# CVaR calculation
var_percentile = norm.ppf(self.confidence_level)
phi_var = norm.pdf(var_percentile)
cvar_multiplier = phi_var / self.confidence_level
cvar_amount = portfolio_value * (cvar_multiplier * volatility - expected_return)
return {
'cvar_amount': cvar_amount,
'cvar_percentage': cvar_amount / portfolio_value,
'var_amount': var_result['var_amount']
}
# Example: Dynamic VaR calculation
def demonstrate_dynamic_var():
np.random.seed(42)
# Simulate stock return data
n_days = 500
true_vol = 0.02
returns = []
# GARCH-like simulation
for i in range(n_days):
if i == 0:
vol_t = true_vol
else:
# Volatility clustering effect
vol_t = 0.95 * vol_t + 0.05 * abs(returns[-1]) + 0.01 * np.random.normal()
return_t = np.random.normal(0, vol_t)
returns.append(return_t)
# Portfolio value
portfolio_value = 1000000 # 1 million
# Dynamic VaR model
var_model = DynamicVaRKF(confidence_level=0.05) # 95% VaR
# Calculate dynamic VaR
var_results = []
cvar_results = []
volatility_estimates = []
for return_ in returns:
# Update model
vol_result = var_model.update_volatility(return_)
volatility_estimates.append(vol_result['volatility'])
# Calculate VaR and CVaR
var_result = var_model.calculate_var(portfolio_value)
cvar_result = var_model.calculate_conditional_var(portfolio_value)
var_results.append(var_result['var_amount'])
cvar_results.append(cvar_result['cvar_amount'])
# Calculate actual losses (for backtesting)
portfolio_returns = np.array(returns) * portfolio_value
actual_losses = -portfolio_returns # Negative returns are losses
# VaR backtesting
var_breaches = actual_losses > np.array(var_results)
var_breach_rate = np.mean(var_breaches)
expected_breach_rate = 0.05
# Plot
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Returns and volatility
ax1.plot(returns, alpha=0.7, label='Returns')
ax1_twin = ax1.twinx()
ax1_twin.plot(volatility_estimates, color='red', alpha=0.7, label='Estimated Volatility')
ax1.set_title('Returns and Volatility Estimation')
ax1.set_ylabel('Returns')
ax1_twin.set_ylabel('Volatility')
ax1.legend(loc='upper left')
ax1_twin.legend(loc='upper right')
# VaR and CVaR
ax2.plot(var_results, label='VaR (95%)', alpha=0.7)
ax2.plot(cvar_results, label='CVaR (95%)', alpha=0.7)
ax2.set_title('Dynamic Risk Measures')
ax2.set_ylabel('Risk Amount')
ax2.legend()
# VaR backtesting
ax3.plot(actual_losses, alpha=0.5, label='Actual Loss')
ax3.plot(var_results, color='red', label='VaR Threshold', alpha=0.7)
breach_points = np.where(var_breaches)[0]
ax3.scatter(breach_points, actual_losses[breach_points],
color='red', s=20, label='VaR Breach Points')
ax3.set_title(f'VaR Backtesting (Breach Rate: {var_breach_rate:.2%})')
ax3.set_ylabel('Loss Amount')
ax3.legend()
# Breach rate statistics
window_size = 50
rolling_breach_rates = []
for i in range(window_size, len(var_breaches)):
window_breaches = var_breaches[i-window_size:i]
rolling_breach_rates.append(np.mean(window_breaches))
ax4.plot(rolling_breach_rates, alpha=0.7)
ax4.axhline(y=expected_breach_rate, color='red', linestyle='--',
label=f'Expected Breach Rate: {expected_breach_rate:.1%}')
ax4.set_title('Rolling VaR Breach Rate')
ax4.set_ylabel('Breach Rate')
ax4.legend()
plt.tight_layout()
plt.show()
# Statistical results
print(f"VaR Backtesting Results:")
print(f"Total observations: {len(var_breaches)}")
print(f"VaR breaches: {np.sum(var_breaches)}")
print(f"Actual breach rate: {var_breach_rate:.2%}")
print(f"Expected breach rate: {expected_breach_rate:.1%}")
# Kupiec test (likelihood ratio test)
n = len(var_breaches)
x = np.sum(var_breaches)
p = expected_breach_rate
if x > 0 and x < n:
lr_stat = 2 * (x * np.log(var_breach_rate/p) + (n-x) * np.log((1-var_breach_rate)/(1-p)))
print(f"Kupiec LR statistic: {lr_stat:.4f}")
print(f"Critical value (5%): 3.841")
print(f"VaR model passes test: {'Yes' if lr_stat < 3.841 else 'No'}")
demonstrate_dynamic_var()
Chapter Summary
This chapter delved into the application of Kalman filtering in financial derivatives pricing and risk management:
-
Option Pricing Applications:
- Dynamic implied volatility estimation
- Multi-factor option pricing models
- Extensions of the Black-Scholes model
-
Interest Rate Derivatives Modeling:
- Vasicek interest rate model implementation
- Interest rate term structure modeling
- Bond option and interest rate swap pricing
-
Credit Risk Modeling:
- Structural credit risk models
- Kalman filter implementation of Merton model
- Default probability and credit spread calculation
-
Risk Measurement and Management:
- Dynamic VaR calculation
- Conditional VaR (CVaR) estimation
- VaR model backtesting methods
These applications demonstrate the powerful capabilities of Kalman filtering in handling the complexity of financial derivatives, particularly its advantages in time-varying parameters and uncertainty quantification.
Next Chapter Preview: Chapter 14 will cover “Algorithmic Trading and High-Frequency Data Processing,” exploring the application of Kalman filtering in high-frequency trading, market microstructure modeling, and real-time risk management.