Chapter 13: Monte Carlo Simulation and Markov Chains
Haiyue
20min
Chapter 13: Monte Carlo Simulation and Markov Chains
Learning Objectives
- Understand Markov Chain Monte Carlo (MCMC) methods
- Implement Metropolis-Hastings algorithm
- Perform Bayesian estimation of financial models
- Apply MCMC for risk analysis
Knowledge Summary
1. Fundamentals of MCMC
Markov Chain Monte Carlo (MCMC) is a class of random simulation methods based on Markov chains, used for sampling from complex probability distributions.
Basic Idea: Construct a Markov chain whose stationary distribution is the target distribution $\pi(x)$; after the chain has converged, its states can be treated as (correlated) samples from $\pi(x)$.
(Mermaid diagram placeholder — the original flowchart did not render.)
2. Metropolis-Hastings Algorithm
Algorithm Steps:
- Generate a candidate value $x'$ from the proposal distribution $q(x' \mid x_t)$
- Calculate the acceptance probability: $\alpha = \min\left(1, \dfrac{\pi(x')\,q(x_t \mid x')}{\pi(x_t)\,q(x' \mid x_t)}\right)$
- Accept $x'$ with probability $\alpha$ (set $x_{t+1} = x'$); otherwise keep $x_{t+1} = x_t$
3. Bayesian Financial Modeling
Bayes’ Theorem: $p(\theta \mid D) = \dfrac{p(D \mid \theta)\,p(\theta)}{p(D)}$
Financial Applications:
- Parameter uncertainty quantification
- Model selection and averaging
- Confidence intervals for risk measures
Example Code
Example 1: Metropolis-Hastings Algorithm
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import pandas as pd
class MetropolisHastings:
    """Random-walk Metropolis-Hastings sampler.

    Draws samples from a (possibly unnormalized) target density given only
    its log-density, using a Gaussian random-walk proposal.
    """

    def __init__(self, target_log_density, proposal_std=1.0):
        """
        Parameters
        ----------
        target_log_density : callable
            Function x -> log pi(x), up to an additive constant.
        proposal_std : float
            Standard deviation of the Gaussian random-walk proposal.
        """
        self.target_log_density = target_log_density
        self.proposal_std = proposal_std
        self.samples = []          # filled by sample()
        self.acceptance_rate = 0   # filled by sample()

    def sample(self, n_samples, initial_value=0.0):
        """Run the chain for n_samples steps.

        Returns
        -------
        np.ndarray of shape (n_samples,) — the chain states (the current
        state is recorded every iteration, accepted or not).
        """
        current_x = initial_value
        current_log_density = self.target_log_density(current_x)
        accepted = 0
        samples = []
        for _ in range(n_samples):
            # Symmetric Gaussian proposal centred at the current state.
            proposal_x = current_x + np.random.normal(0, self.proposal_std)
            proposal_log_density = self.target_log_density(proposal_x)
            # Log acceptance ratio; the proposal is symmetric, so the
            # Hastings correction term cancels.
            log_alpha = proposal_log_density - current_log_density
            # Accept/reject in log space: avoids the overflow RuntimeWarning
            # that np.exp(log_alpha) emits when log_alpha is large.
            if log_alpha >= 0 or np.log(np.random.random()) < log_alpha:
                current_x = proposal_x
                current_log_density = proposal_log_density
                accepted += 1
            samples.append(current_x)
        self.samples = np.array(samples)
        self.acceptance_rate = accepted / n_samples
        return self.samples
def normal_log_density(x, mu=0, sigma=1):
    """Log-pdf of a normal distribution N(mu, sigma^2) evaluated at x."""
    z = (x - mu) / sigma
    return -0.5 * z * z - np.log(sigma * np.sqrt(2 * np.pi))


def mixture_log_density(x):
    """Log-pdf of the two-component mixture 0.3*N(-2,1) + 0.7*N(3,1.5)."""
    # (weight, mean, std) of each mixture component
    components = ((0.3, -2, 1), (0.7, 3, 1.5))
    log_terms = [np.log(w) + normal_log_density(x, m, s)
                 for w, m, s in components]
    # log(exp(a) + exp(b)) computed stably in log space
    return np.logaddexp(log_terms[0], log_terms[1])
# --- Example: sample the two-component mixture with random-walk MH ---
np.random.seed(42)  # reproducibility
sampler = MetropolisHastings(mixture_log_density, proposal_std=2.0)
samples = sampler.sample(10000, initial_value=0.0)

print(f"MCMC Sampling Results:")
print(f"Acceptance rate: {sampler.acceptance_rate:.3f}")
print(f"Sample mean: {np.mean(samples):.3f}")
print(f"Sample std dev: {np.std(samples):.3f}")

# Theoretical moments of the mixture 0.3*N(-2,1) + 0.7*N(3,1.5):
# mean = sum_i w_i*mu_i ; var = sum_i w_i*(sigma_i^2 + mu_i^2) - mean^2
theoretical_mean = 0.3 * (-2) + 0.7 * 3
theoretical_var = 0.3 * (1 + 4) + 0.7 * (1.5**2 + 9) - theoretical_mean**2
print(f"Theoretical mean: {theoretical_mean:.3f}")
print(f"Theoretical std dev: {np.sqrt(theoretical_var):.3f}")

# Visualize results
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Trace plot: visual check that the chain mixes between the two modes
axes[0, 0].plot(samples[:1000])
axes[0, 0].set_title('MCMC Trace (first 1000 samples)')
axes[0, 0].set_xlabel('Iteration')
axes[0, 0].set_ylabel('Parameter Value')
axes[0, 0].grid(True, alpha=0.3)

# Histogram of post-burn-in samples vs the exact target density
x_range = np.linspace(-6, 8, 1000)
theoretical_density = np.exp([mixture_log_density(x) for x in x_range])
axes[0, 1].hist(samples[1000:], bins=50, density=True, alpha=0.7,
                color='skyblue', label='MCMC samples')
axes[0, 1].plot(x_range, theoretical_density, 'r-', linewidth=2,
                label='Theoretical density')
axes[0, 1].set_title('Sample Distribution vs Theoretical Distribution')
axes[0, 1].set_xlabel('x')
axes[0, 1].set_ylabel('Density')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Autocorrelation function
def autocorrelation(x, max_lags=50):
    """Biased sample autocorrelation of x at lags 0..max_lags (rho_0 = 1)."""
    centered = x - np.mean(x)
    # Full cross-correlation covers lags -(n-1)..(n-1); keep lags >= 0.
    full = np.correlate(centered, centered, mode='full')
    nonneg = full[len(centered) - 1:]
    # Normalize by the lag-0 value so the result starts at exactly 1.
    return (nonneg / nonneg[0])[:max_lags + 1]
# Autocorrelation of the post-burn-in chain (first 1000 samples discarded)
autocorrs = autocorrelation(samples[1000:])
axes[1, 0].plot(autocorrs, 'b-o', markersize=3)
axes[1, 0].axhline(y=0, color='black', linestyle='-', alpha=0.5)
# 0.05 line: rough visual threshold for "negligible" autocorrelation
axes[1, 0].axhline(y=0.05, color='red', linestyle='--', alpha=0.7)
axes[1, 0].set_title('Sample Autocorrelation Function')
axes[1, 0].set_xlabel('Lag')
axes[1, 0].set_ylabel('Autocorrelation')
axes[1, 0].grid(True, alpha=0.3)
# Convergence diagnosis
def running_mean(x):
    """Cumulative mean of x: element i equals mean(x[:i+1])."""
    counts = np.arange(1, len(x) + 1)
    return np.cumsum(x) / counts
# Convergence check: the cumulative mean should settle near the
# theoretical mean as the sample size grows (first 100 draws discarded).
running_means = running_mean(samples[100:])
axes[1, 1].plot(running_means)
axes[1, 1].axhline(y=theoretical_mean, color='red', linestyle='--',
                   label=f'Theoretical mean: {theoretical_mean:.3f}')
axes[1, 1].set_title('Sample Mean Convergence')
axes[1, 1].set_xlabel('Sample Size')
axes[1, 1].set_ylabel('Cumulative Mean')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()
Example 2: Bayesian Markov Model
class BayesianMarkovModel:
    """Bayesian estimation of a discrete Markov chain's transition matrix.

    Places independent Dirichlet(alpha) priors on the rows of the matrix
    and samples the posterior with a random-walk Metropolis scheme.
    """

    def __init__(self, n_states):
        # Number of discrete states; observations are ints in 0..n_states-1.
        self.n_states = n_states
        # Filled by mcmc_sample(): shape (n_samples, n_states, n_states).
        self.posterior_samples = None

    def log_likelihood(self, transition_matrix, data):
        """Log-likelihood of the observed state sequence under the matrix.

        Returns -inf if any observed transition has zero probability.
        """
        log_lik = 0
        for t in range(len(data) - 1):
            current_state = data[t]
            next_state = data[t + 1]
            prob = transition_matrix[current_state, next_state]
            if prob > 0:
                log_lik += np.log(prob)
            else:
                # Observed transition is impossible under this matrix.
                return -np.inf
        return log_lik

    def log_prior(self, transition_matrix, alpha=1.0):
        """Log density of the Dirichlet(alpha) row priors, up to the
        normalizing constant (which cancels in MH acceptance ratios)."""
        log_prior = 0
        for i in range(self.n_states):
            for j in range(self.n_states):
                if transition_matrix[i, j] > 0:
                    log_prior += (alpha - 1) * np.log(transition_matrix[i, j])
                else:
                    return -np.inf
        return log_prior

    def log_posterior(self, transition_matrix, data, alpha=1.0):
        """Unnormalized log posterior = log-likelihood + log-prior."""
        log_lik = self.log_likelihood(transition_matrix, data)
        log_prior = self.log_prior(transition_matrix, alpha)
        return log_lik + log_prior

    def propose_transition_matrix(self, current_matrix, proposal_scale=0.1):
        """Propose a new matrix: Gaussian jitter in log space, then
        renormalize each row back onto the probability simplex.

        NOTE(review): after the renormalization this proposal is not
        symmetric, yet mcmc_sample() below applies no Hastings correction
        ratio q(x|x')/q(x'|x) — the sampled posterior may be slightly
        biased. Confirm, or add the correction term.
        """
        # Propose in log space, then normalize (1e-10 guards log(0))
        log_matrix = np.log(current_matrix + 1e-10)
        log_proposal = log_matrix + np.random.normal(0, proposal_scale,
                                                     (self.n_states, self.n_states))
        # Convert back to probability space and normalize each row
        proposal_matrix = np.exp(log_proposal)
        for i in range(self.n_states):
            proposal_matrix[i] = proposal_matrix[i] / np.sum(proposal_matrix[i])
        return proposal_matrix

    def mcmc_sample(self, data, n_samples=5000, burn_in=1000, alpha=1.0):
        """Sample the posterior over transition matrices.

        Returns
        -------
        (posterior_samples, acceptance_rate) where posterior_samples has
        shape (n_samples, n_states, n_states); the first burn_in draws
        are discarded.
        """
        # Initialize each row with a draw from the Dirichlet prior.
        current_matrix = np.random.dirichlet([alpha] * self.n_states, self.n_states)
        current_log_posterior = self.log_posterior(current_matrix, data, alpha)
        samples = []
        accepted = 0
        for i in range(n_samples + burn_in):
            # Propose new matrix (see NOTE in propose_transition_matrix)
            proposal_matrix = self.propose_transition_matrix(current_matrix)
            proposal_log_posterior = self.log_posterior(proposal_matrix, data, alpha)
            # Metropolis accept/reject in log space
            log_alpha = proposal_log_posterior - current_log_posterior
            if np.log(np.random.random()) < log_alpha:
                current_matrix = proposal_matrix
                current_log_posterior = proposal_log_posterior
                accepted += 1
            # Save sample (after burn-in); copy() since the matrix mutates
            if i >= burn_in:
                samples.append(current_matrix.copy())
        self.posterior_samples = np.array(samples)
        acceptance_rate = accepted / (n_samples + burn_in)
        return self.posterior_samples, acceptance_rate

    def posterior_summary(self):
        """Per-entry posterior mean, std, and central 95% interval.

        Raises
        ------
        ValueError
            If mcmc_sample() has not been run yet.
        """
        if self.posterior_samples is None:
            raise ValueError("Need to run MCMC sampling first")
        summary = {}
        for i in range(self.n_states):
            for j in range(self.n_states):
                param_name = f'P_{i}{j}'
                samples = self.posterior_samples[:, i, j]
                summary[param_name] = {
                    'mean': np.mean(samples),
                    'std': np.std(samples),
                    'q025': np.percentile(samples, 2.5),
                    'q975': np.percentile(samples, 97.5)
                }
        return summary
# --- Bayesian analysis example ---
# Generate test data from a known 2-state chain so the posterior can be
# compared against the true transition matrix.
np.random.seed(42)
true_P = np.array([[0.7, 0.3], [0.4, 0.6]])
n_obs = 200

# Simulate Markov chain
data = [0]  # Initial state
for _ in range(n_obs - 1):
    current_state = data[-1]
    next_state = np.random.choice(2, p=true_P[current_state])
    data.append(next_state)
data = np.array(data)

print(f"Bayesian Markov Model Analysis:")
print(f"True transition matrix:\n{true_P}")
print(f"Data length: {len(data)}")

# Create Bayesian model
bayes_model = BayesianMarkovModel(n_states=2)

# MCMC sampling
posterior_samples, acceptance_rate = bayes_model.mcmc_sample(
    data, n_samples=3000, burn_in=1000, alpha=1.0
)
print(f"\nMCMC Sampling Results:")
print(f"Acceptance rate: {acceptance_rate:.3f}")

# Posterior summary: compare each entry's posterior against the truth
posterior_summary = bayes_model.posterior_summary()
print(f"\nPosterior Distribution Summary:")
for param, stats in posterior_summary.items():
    # Parameter names are 'P_ij'; recover the (i, j) indices from the name
    i, j = int(param[2]), int(param[3])
    true_value = true_P[i, j]
    print(f"{param}: Mean={stats['mean']:.3f}, "
          f"95%CI=[{stats['q025']:.3f}, {stats['q975']:.3f}], "
          f"True value={true_value:.3f}")

# Visualize posterior distribution
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# One subplot per matrix entry: subplot (i, j) shows the posterior of P_ij
for idx, (param, stats) in enumerate(posterior_summary.items()):
    i, j = int(param[2]), int(param[3])
    samples = posterior_samples[:, i, j]
    ax = axes[i, j]
    ax.hist(samples, bins=50, density=True, alpha=0.7, color='lightblue')
    ax.axvline(stats['mean'], color='red', linestyle='-',
               label=f'Posterior mean: {stats["mean"]:.3f}')
    ax.axvline(true_P[i, j], color='green', linestyle='--',
               label=f'True value: {true_P[i, j]:.3f}')
    ax.set_title(f'Posterior Distribution of {param}')
    ax.set_xlabel('Parameter Value')
    ax.set_ylabel('Density')
    ax.legend()
    ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Posterior predictive check
def posterior_predictive_check(posterior_samples, observed_data, n_replications=1000):
    """Posterior predictive check with the proportion of state 0 as the
    test statistic.

    For each replication, draws one transition matrix from the posterior,
    simulates a chain of the same length as the data (same initial state 0),
    and records the replicated statistic.

    Returns
    -------
    (observed_stat, replicated_stats) — a float and an array of length
    n_replications.
    """
    n_samples = len(posterior_samples)
    n_obs = len(observed_data)
    observed_stat = np.mean(observed_data == 0)
    replicated_stats = []
    for _ in range(n_replications):
        # Draw one transition matrix from the posterior.
        idx = np.random.randint(n_samples)
        matrix = posterior_samples[idx]
        # Simulate a replicated chain of the same length, same initial state.
        chain = [0]
        for _ in range(n_obs - 1):
            state = chain[-1]
            chain.append(np.random.choice(2, p=matrix[state]))
        replicated_stats.append(np.mean(np.array(chain) == 0))
    return observed_stat, np.array(replicated_stats)
# Run the posterior predictive check against the MCMC posterior samples.
observed_stat, replicated_stats = posterior_predictive_check(
    posterior_samples, data, n_replications=1000
)

# Two-sided p-value: twice the smaller tail probability of the observed
# statistic under the posterior predictive distribution. Computed on its
# own line — a multi-line expression inside an f-string replacement field
# is a SyntaxError on Python < 3.12.
p_value = 2 * min(np.mean(replicated_stats >= observed_stat),
                  np.mean(replicated_stats <= observed_stat))

print(f"\nPosterior Predictive Check:")
print(f"Observed statistic (state 0 proportion): {observed_stat:.3f}")
print(f"Replicated statistic mean: {np.mean(replicated_stats):.3f}")
print(f"p-value (two-sided): {p_value:.3f}")

# Visualize posterior predictive check
plt.figure(figsize=(10, 6))
plt.hist(replicated_stats, bins=50, density=True, alpha=0.7,
         color='lightcoral', label='Posterior predictive distribution')
plt.axvline(observed_stat, color='blue', linestyle='-', linewidth=2,
            label=f'Observed: {observed_stat:.3f}')
plt.axvline(np.mean(replicated_stats), color='red', linestyle='--',
            label=f'Predicted mean: {np.mean(replicated_stats):.3f}')
plt.title('Posterior Predictive Check: Proportion of State 0')
plt.xlabel('State 0 Proportion')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
Example 3: MCMC Analysis of Financial Risk
class BayesianRiskModel:
    """Bayesian risk model: propagates posterior uncertainty over the
    regime transition matrix into Value-at-Risk estimates by simulation."""

    def __init__(self):
        # Reserved for storing MCMC draws; not populated by this class.
        self.samples = None

    def simulate_portfolio_returns(self, transition_matrix, state_params, n_periods=252):
        """Simulate one return path from a regime-switching model.

        Parameters
        ----------
        transition_matrix : (K, K) row-stochastic array of state transitions.
        state_params : sequence of (mu, sigma) pairs, one per state.
        n_periods : number of periods to simulate.

        Returns
        -------
        np.ndarray of shape (n_periods,)
        """
        returns = []
        current_state = 0  # always start in state 0
        for _ in range(n_periods):
            # Draw this period's return from the current regime.
            mu, sigma = state_params[current_state]
            returns.append(np.random.normal(mu, sigma))
            # Move to the next regime according to the transition row.
            current_state = np.random.choice(len(state_params),
                                             p=transition_matrix[current_state])
        return np.array(returns)

    def bayesian_var_estimation(self, posterior_samples, state_params,
                                confidence_level=0.05, n_simulations=1000,
                                n_periods=252):
        """Posterior distribution of VaR.

        For each posterior draw of the transition matrix, pools the returns
        of n_simulations simulated paths and takes the confidence_level
        percentile of the pooled returns as that draw's VaR.

        n_periods was previously hard-coded to 252 (one trading year); it
        is now a backward-compatible keyword parameter.

        Returns
        -------
        np.ndarray with one VaR estimate per posterior sample.
        """
        var_samples = []
        for transition_matrix in posterior_samples:
            # Multiple simulations for each posterior sample
            portfolio_returns = []
            for _ in range(n_simulations):
                returns = self.simulate_portfolio_returns(
                    transition_matrix, state_params, n_periods=n_periods
                )
                portfolio_returns.extend(returns)
            # Lower-tail percentile of the pooled returns
            var_samples.append(np.percentile(portfolio_returns,
                                             confidence_level * 100))
        return np.array(var_samples)
# --- Bayesian analysis of financial risk ---
risk_model = BayesianRiskModel()

# Regime parameters as (mean, std dev) of per-period returns
state_params = [
    (-0.01, 0.02),  # State 0: Bear market
    (0.005, 0.015)  # State 1: Bull market
]
print(f"\nBayesian Risk Analysis:")
print(f"State parameters: {state_params}")

# Propagate the transition-matrix posterior (from the previous example)
# into a posterior distribution over VaR.
var_samples = risk_model.bayesian_var_estimation(
    posterior_samples[:500],  # Use subset to speed up calculation
    state_params,
    confidence_level=0.05,
    n_simulations=100
)

print(f"\nVaR Analysis Results:")
print(f"5% VaR mean: {np.mean(var_samples):.4f}")
print(f"5% VaR std dev: {np.std(var_samples):.4f}")
print(f"VaR 95% confidence interval: [{np.percentile(var_samples, 2.5):.4f}, "
      f"{np.percentile(var_samples, 97.5):.4f}]")

# Visualize VaR uncertainty
plt.figure(figsize=(12, 8))

# Posterior histogram of the 5% VaR
plt.subplot(2, 2, 1)
plt.hist(var_samples, bins=30, density=True, alpha=0.7, color='lightgreen')
plt.axvline(np.mean(var_samples), color='red', linestyle='-',
            label=f'Mean: {np.mean(var_samples):.4f}')
plt.title('Posterior Distribution of 5% VaR')
plt.xlabel('VaR')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)

# VaR trace: first 100 posterior VaR draws
plt.subplot(2, 2, 2)
plt.plot(var_samples[:100])
plt.title('VaR Posterior Sample Trace')
plt.xlabel('Sample Number')
plt.ylabel('VaR')
plt.grid(True, alpha=0.3)

# VaR at different confidence levels (smaller subsets for speed)
confidence_levels = [0.01, 0.05, 0.1]
var_distributions = {}
for cl in confidence_levels:
    var_dist = risk_model.bayesian_var_estimation(
        posterior_samples[:100], state_params,
        confidence_level=cl, n_simulations=50
    )
    var_distributions[cl] = var_dist

plt.subplot(2, 2, 3)
for cl, var_dist in var_distributions.items():
    plt.hist(var_dist, bins=20, alpha=0.6, density=True,
             label=f'{cl*100}% VaR')
plt.title('VaR Distribution at Different Confidence Levels')
plt.xlabel('VaR')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)

# VaR convergence: cumulative mean of the posterior VaR draws
plt.subplot(2, 2, 4)
cumulative_means = np.cumsum(var_samples) / np.arange(1, len(var_samples) + 1)
plt.plot(cumulative_means)
plt.axhline(y=np.mean(var_samples), color='red', linestyle='--',
            label=f'Final mean: {np.mean(var_samples):.4f}')
plt.title('VaR Estimation Convergence')
plt.xlabel('Sample Size')
plt.ylabel('Cumulative Mean')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nMCMC Financial Modeling Summary:")
print(f"1. Bayesian methods can quantify parameter uncertainty")
print(f"2. Posterior distribution provides complete inference information")
print(f"3. Confidence intervals for risk measures are more reliable")
print(f"4. Model selection can be performed via Bayes factors")
Theoretical Analysis
MCMC Convergence Theory
Ergodicity Theorem: If the Markov chain is aperiodic, irreducible, and a stationary distribution $\pi$ exists, then the ergodic average converges: $\frac{1}{n}\sum_{t=1}^{n} f(X_t) \xrightarrow{\text{a.s.}} \mathbb{E}_{\pi}[f(X)]$
Central Limit Theorem: $\sqrt{n}\left(\bar{f}_n - \mathbb{E}_{\pi}[f]\right) \xrightarrow{d} N(0, \sigma_f^2)$, where $\sigma_f^2$ accounts for the autocorrelation of the chain
Bayesian Computation
Marginal Likelihood: $p(D) = \int p(D \mid \theta)\,p(\theta)\,d\theta$
Posterior Predictive Distribution: $p(\tilde{y} \mid D) = \int p(\tilde{y} \mid \theta)\,p(\theta \mid D)\,d\theta$
Mathematical Formula Summary
- Metropolis-Hastings Acceptance Probability: $\alpha = \min\left(1, \dfrac{\pi(x')\,q(x \mid x')}{\pi(x)\,q(x' \mid x)}\right)$
- Gibbs Sampling Conditional Distribution: $x_i^{(t+1)} \sim p\left(x_i \mid x_1^{(t+1)}, \ldots, x_{i-1}^{(t+1)}, x_{i+1}^{(t)}, \ldots, x_d^{(t)}\right)$
- Effective Sample Size: $\mathrm{ESS} = \dfrac{n}{1 + 2\sum_{k=1}^{\infty} \rho_k}$
- Bayesian Information Criterion: $\mathrm{BIC} = k \ln n - 2 \ln \hat{L}$
MCMC Application Considerations
- Convergence diagnosis is a critical step in MCMC
- Need sufficient burn-in period to remove initial value influence
- Autocorrelation reduces effective sample size
- Multiple chain runs help diagnose convergence
- Computational complexity is high, need to balance accuracy and efficiency