Chapter 13: Monte Carlo Simulation and Markov Chains


Learning Objectives
  • Understand Markov Chain Monte Carlo (MCMC) methods
  • Implement Metropolis-Hastings algorithm
  • Perform Bayesian estimation of financial models
  • Apply MCMC for risk analysis

Knowledge Summary

1. Fundamentals of MCMC

Markov Chain Monte Carlo (MCMC) is a class of stochastic simulation methods that construct a Markov chain in order to draw samples from complex probability distributions.

Basic Idea: Construct a Markov chain $\{X_t\}$ such that its stationary distribution is the target distribution $\pi(x)$: $\lim_{t \to \infty} P(X_t \in A) = \int_A \pi(x)\,dx$
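
To make the stationary-distribution condition concrete, here is a minimal sketch for a two-state chain; the transition matrix values are illustrative assumptions, not taken from the chapter's examples. The stationary distribution is the left eigenvector of the transition matrix with eigenvalue 1, and iterating the chain from any starting distribution converges to it.

import numpy as np

# Illustrative 2-state transition matrix (rows sum to 1); values are assumptions.
P = np.array([[0.9, 0.1],
              [0.5, 0.5]])

# The stationary distribution pi satisfies pi = pi @ P, i.e. it is the left
# eigenvector of P associated with eigenvalue 1.
eigvals, eigvecs = np.linalg.eig(P.T)
pi = np.real(eigvecs[:, np.argmax(np.isclose(eigvals, 1.0))])
pi = pi / pi.sum()
print("Stationary distribution:", pi)        # approximately [0.833, 0.167]

# Cross-check: iterating the chain from any starting distribution converges to pi.
dist = np.array([1.0, 0.0])
for _ in range(100):
    dist = dist @ P
print("Distribution after 100 steps:", dist)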


2. Metropolis-Hastings Algorithm

Algorithm Steps:

  1. Generate a candidate value $x'$ from the proposal distribution $q(x'|x)$
  2. Calculate the acceptance probability: $\alpha(x, x') = \min\left(1, \frac{\pi(x')\,q(x|x')}{\pi(x)\,q(x'|x)}\right)$
  3. Accept $x'$ with probability $\alpha$; otherwise keep $x$
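
For a symmetric proposal such as the Gaussian random walk used in Example 1 below, $q(x'|x) = q(x|x')$, so the acceptance probability simplifies to $\alpha(x, x') = \min\left(1, \frac{\pi(x')}{\pi(x)}\right)$, or in log form $\log\alpha = \min\left(0, \log\pi(x') - \log\pi(x)\right)$, which is exactly the quantity the sampler computes.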

3. Bayesian Financial Modeling

Bayes’ Theorem: $p(\theta|y) = \frac{p(y|\theta)\,p(\theta)}{p(y)} \propto p(y|\theta)\,p(\theta)$

Financial Applications:

  • Parameter uncertainty quantification
  • Model selection and averaging
  • Credible intervals for risk measures
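
Before turning to MCMC, the proportional form of Bayes' theorem can be illustrated with a case where the posterior is available in closed form. The sketch below is a minimal normal-normal conjugate update for the mean of asset returns with known variance; all prior parameters and data values are illustrative assumptions, not part of the chapter's examples.

import numpy as np

# Illustrative data: "observed" daily returns with a known standard deviation.
np.random.seed(0)
y = np.random.normal(0.001, 0.02, size=250)
sigma2 = 0.02 ** 2                      # assumed known return variance

# Prior on the mean return: theta ~ N(mu0, tau0^2)  (assumed values)
mu0, tau0_sq = 0.0, 0.01 ** 2

# Normal-normal conjugacy: the posterior of theta is also normal.
n = len(y)
post_var = 1.0 / (1.0 / tau0_sq + n / sigma2)
post_mean = post_var * (mu0 / tau0_sq + y.sum() / sigma2)

print(f"Posterior mean of theta: {post_mean:.5f}")
print(f"Posterior std of theta:  {np.sqrt(post_var):.5f}")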

Example Code

Example 1: Metropolis-Hastings Algorithm

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import pandas as pd

class MetropolisHastings:
    """Metropolis-Hastings sampler"""

    def __init__(self, target_log_density, proposal_std=1.0):
        self.target_log_density = target_log_density
        self.proposal_std = proposal_std
        self.samples = []
        self.acceptance_rate = 0

    def sample(self, n_samples, initial_value=0.0):
        """Execute MCMC sampling"""
        current_x = initial_value
        current_log_density = self.target_log_density(current_x)

        accepted = 0
        samples = []

        for i in range(n_samples):
            # Propose new state
            proposal_x = current_x + np.random.normal(0, self.proposal_std)
            proposal_log_density = self.target_log_density(proposal_x)

            # Calculate acceptance probability (log scale)
            log_alpha = proposal_log_density - current_log_density
            alpha = np.exp(min(0.0, log_alpha))  # clip in log space to avoid overflow

            # Accept or reject
            if np.random.random() < alpha:
                current_x = proposal_x
                current_log_density = proposal_log_density
                accepted += 1

            samples.append(current_x)

        self.samples = np.array(samples)
        self.acceptance_rate = accepted / n_samples
        return self.samples

def normal_log_density(x, mu=0, sigma=1):
    """Log density of normal distribution"""
    return -0.5 * ((x - mu) / sigma)**2 - np.log(sigma * np.sqrt(2 * np.pi))

def mixture_log_density(x):
    """Log density of mixture normal distribution"""
    # Mixture of two normals: 0.3*N(-2,1) + 0.7*N(3,1.5)
    log_p1 = np.log(0.3) + normal_log_density(x, -2, 1)
    log_p2 = np.log(0.7) + normal_log_density(x, 3, 1.5)
    return np.logaddexp(log_p1, log_p2)

# Example: Sample mixture normal distribution
np.random.seed(42)
sampler = MetropolisHastings(mixture_log_density, proposal_std=2.0)
samples = sampler.sample(10000, initial_value=0.0)

print(f"MCMC Sampling Results:")
print(f"Acceptance rate: {sampler.acceptance_rate:.3f}")
print(f"Sample mean: {np.mean(samples):.3f}")
print(f"Sample std dev: {np.std(samples):.3f}")

# Calculate theoretical values
theoretical_mean = 0.3 * (-2) + 0.7 * 3
theoretical_var = 0.3 * (1 + 4) + 0.7 * (1.5**2 + 9) - theoretical_mean**2
print(f"Theoretical mean: {theoretical_mean:.3f}")
print(f"Theoretical std dev: {np.sqrt(theoretical_var):.3f}")

# Visualize results
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Trace plot
axes[0, 0].plot(samples[:1000])
axes[0, 0].set_title('MCMC Trace (first 1000 samples)')
axes[0, 0].set_xlabel('Iteration')
axes[0, 0].set_ylabel('Parameter Value')
axes[0, 0].grid(True, alpha=0.3)

# Sample distribution vs theoretical distribution
x_range = np.linspace(-6, 8, 1000)
theoretical_density = np.exp([mixture_log_density(x) for x in x_range])

axes[0, 1].hist(samples[1000:], bins=50, density=True, alpha=0.7,
               color='skyblue', label='MCMC samples')
axes[0, 1].plot(x_range, theoretical_density, 'r-', linewidth=2,
               label='Theoretical density')
axes[0, 1].set_title('Sample Distribution vs Theoretical Distribution')
axes[0, 1].set_xlabel('x')
axes[0, 1].set_ylabel('Density')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Autocorrelation function
def autocorrelation(x, max_lags=50):
    n = len(x)
    x = x - np.mean(x)
    autocorrs = np.correlate(x, x, mode='full')
    autocorrs = autocorrs[n-1:]
    autocorrs = autocorrs / autocorrs[0]
    return autocorrs[:max_lags+1]

autocorrs = autocorrelation(samples[1000:])
axes[1, 0].plot(autocorrs, 'b-o', markersize=3)
axes[1, 0].axhline(y=0, color='black', linestyle='-', alpha=0.5)
axes[1, 0].axhline(y=0.05, color='red', linestyle='--', alpha=0.7)
axes[1, 0].set_title('Sample Autocorrelation Function')
axes[1, 0].set_xlabel('Lag')
axes[1, 0].set_ylabel('Autocorrelation')
axes[1, 0].grid(True, alpha=0.3)

# Convergence diagnosis
def running_mean(x):
    return np.cumsum(x) / np.arange(1, len(x) + 1)

running_means = running_mean(samples[100:])
axes[1, 1].plot(running_means)
axes[1, 1].axhline(y=theoretical_mean, color='red', linestyle='--',
                  label=f'Theoretical mean: {theoretical_mean:.3f}')
axes[1, 1].set_title('Sample Mean Convergence')
axes[1, 1].set_xlabel('Sample Size')
axes[1, 1].set_ylabel('Cumulative Mean')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()
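
The acceptance rate and mixing of the sampler depend strongly on proposal_std: very small steps are almost always accepted but explore the target slowly, while very large steps are rarely accepted. A rough sweep using the classes defined above can guide the choice; the grid of scales here is an arbitrary assumption.

# Sensitivity of the Example 1 sampler to the proposal scale (grid is arbitrary).
for scale in [0.1, 0.5, 2.0, 10.0]:
    tuner = MetropolisHastings(mixture_log_density, proposal_std=scale)
    draws = tuner.sample(5000, initial_value=0.0)
    print(f"proposal_std={scale:5.1f}  acceptance rate={tuner.acceptance_rate:.3f}  "
          f"post-burn-in mean={np.mean(draws[500:]):.3f}")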

Example 2: Bayesian Markov Model

class BayesianMarkovModel:
    """Bayesian Markov model"""

    def __init__(self, n_states):
        self.n_states = n_states
        self.posterior_samples = None

    def log_likelihood(self, transition_matrix, data):
        """Calculate log-likelihood"""
        log_lik = 0
        for t in range(len(data) - 1):
            current_state = data[t]
            next_state = data[t + 1]
            prob = transition_matrix[current_state, next_state]
            if prob > 0:
                log_lik += np.log(prob)
            else:
                return -np.inf
        return log_lik

    def log_prior(self, transition_matrix, alpha=1.0):
        """Log density of Dirichlet prior"""
        log_prior = 0
        for i in range(self.n_states):
            for j in range(self.n_states):
                if transition_matrix[i, j] > 0:
                    log_prior += (alpha - 1) * np.log(transition_matrix[i, j])
                else:
                    return -np.inf
        return log_prior

    def log_posterior(self, transition_matrix, data, alpha=1.0):
        """Log posterior density"""
        log_lik = self.log_likelihood(transition_matrix, data)
        log_prior = self.log_prior(transition_matrix, alpha)
        return log_lik + log_prior

    def propose_transition_matrix(self, current_matrix, proposal_scale=0.1):
        """Propose new transition matrix"""
        # Propose in log space, then normalize
        log_matrix = np.log(current_matrix + 1e-10)
        log_proposal = log_matrix + np.random.normal(0, proposal_scale,
                                                   (self.n_states, self.n_states))

        # Convert back to probability space and normalize
        proposal_matrix = np.exp(log_proposal)
        for i in range(self.n_states):
            proposal_matrix[i] = proposal_matrix[i] / np.sum(proposal_matrix[i])

        return proposal_matrix

    def mcmc_sample(self, data, n_samples=5000, burn_in=1000, alpha=1.0):
        """MCMC sampling of posterior distribution"""
        # Initialize
        current_matrix = np.random.dirichlet([alpha] * self.n_states, self.n_states)
        current_log_posterior = self.log_posterior(current_matrix, data, alpha)

        samples = []
        accepted = 0

        for i in range(n_samples + burn_in):
            # Propose new matrix
            proposal_matrix = self.propose_transition_matrix(current_matrix)
            proposal_log_posterior = self.log_posterior(proposal_matrix, data, alpha)

            # Metropolis step (the log-space random-walk proposal followed by row
            # normalization is treated as symmetric here, so the Hastings correction
            # ratio is omitted for simplicity)
            log_alpha = proposal_log_posterior - current_log_posterior
            if np.log(np.random.random()) < log_alpha:
                current_matrix = proposal_matrix
                current_log_posterior = proposal_log_posterior
                accepted += 1

            # Save sample (after burn-in)
            if i >= burn_in:
                samples.append(current_matrix.copy())

        self.posterior_samples = np.array(samples)
        acceptance_rate = accepted / (n_samples + burn_in)

        return self.posterior_samples, acceptance_rate

    def posterior_summary(self):
        """Posterior distribution summary statistics"""
        if self.posterior_samples is None:
            raise ValueError("Need to run MCMC sampling first")

        summary = {}
        for i in range(self.n_states):
            for j in range(self.n_states):
                param_name = f'P_{i}{j}'
                samples = self.posterior_samples[:, i, j]
                summary[param_name] = {
                    'mean': np.mean(samples),
                    'std': np.std(samples),
                    'q025': np.percentile(samples, 2.5),
                    'q975': np.percentile(samples, 97.5)
                }

        return summary

# Bayesian analysis example
# Generate test data
np.random.seed(42)
true_P = np.array([[0.7, 0.3], [0.4, 0.6]])
n_obs = 200

# Simulate Markov chain
data = [0]  # Initial state
for _ in range(n_obs - 1):
    current_state = data[-1]
    next_state = np.random.choice(2, p=true_P[current_state])
    data.append(next_state)

data = np.array(data)

print(f"Bayesian Markov Model Analysis:")
print(f"True transition matrix:\n{true_P}")
print(f"Data length: {len(data)}")

# Create Bayesian model
bayes_model = BayesianMarkovModel(n_states=2)

# MCMC sampling
posterior_samples, acceptance_rate = bayes_model.mcmc_sample(
    data, n_samples=3000, burn_in=1000, alpha=1.0
)

print(f"\nMCMC Sampling Results:")
print(f"Acceptance rate: {acceptance_rate:.3f}")

# Posterior summary
posterior_summary = bayes_model.posterior_summary()
print(f"\nPosterior Distribution Summary:")
for param, stats in posterior_summary.items():
    i, j = int(param[2]), int(param[3])
    true_value = true_P[i, j]
    print(f"{param}: Mean={stats['mean']:.3f}, "
          f"95%CI=[{stats['q025']:.3f}, {stats['q975']:.3f}], "
          f"True value={true_value:.3f}")

# Visualize posterior distribution
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Posterior distribution for each parameter
for idx, (param, stats) in enumerate(posterior_summary.items()):
    i, j = int(param[2]), int(param[3])
    samples = posterior_samples[:, i, j]

    ax = axes[i, j]
    ax.hist(samples, bins=50, density=True, alpha=0.7, color='lightblue')
    ax.axvline(stats['mean'], color='red', linestyle='-',
              label=f'Posterior mean: {stats["mean"]:.3f}')
    ax.axvline(true_P[i, j], color='green', linestyle='--',
              label=f'True value: {true_P[i, j]:.3f}')
    ax.set_title(f'Posterior Distribution of {param}')
    ax.set_xlabel('Parameter Value')
    ax.set_ylabel('Density')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Posterior predictive check
def posterior_predictive_check(posterior_samples, observed_data, n_replications=1000):
    """Posterior predictive check"""
    n_samples = len(posterior_samples)
    n_obs = len(observed_data)

    # Test statistic: proportion of state 0
    observed_stat = np.mean(observed_data == 0)

    replicated_stats = []

    for _ in range(n_replications):
        # Randomly select a posterior sample
        sample_idx = np.random.randint(n_samples)
        transition_matrix = posterior_samples[sample_idx]

        # Simulate data
        sim_data = [0]  # Same initial state
        for t in range(n_obs - 1):
            current_state = sim_data[-1]
            next_state = np.random.choice(2, p=transition_matrix[current_state])
            sim_data.append(next_state)

        # Calculate statistic
        sim_stat = np.mean(np.array(sim_data) == 0)
        replicated_stats.append(sim_stat)

    return observed_stat, np.array(replicated_stats)

observed_stat, replicated_stats = posterior_predictive_check(
    posterior_samples, data, n_replications=1000
)

print(f"\nPosterior Predictive Check:")
print(f"Observed statistic (state 0 proportion): {observed_stat:.3f}")
print(f"Replicated statistic mean: {np.mean(replicated_stats):.3f}")
print(f"p-value (two-sided): {2 * min(np.mean(replicated_stats >= observed_stat),
                       np.mean(replicated_stats <= observed_stat)):.3f}")

# Visualize posterior predictive check
plt.figure(figsize=(10, 6))
plt.hist(replicated_stats, bins=50, density=True, alpha=0.7,
         color='lightcoral', label='Posterior predictive distribution')
plt.axvline(observed_stat, color='blue', linestyle='-', linewidth=2,
           label=f'Observed: {observed_stat:.3f}')
plt.axvline(np.mean(replicated_stats), color='red', linestyle='--',
           label=f'Predicted mean: {np.mean(replicated_stats):.3f}')
plt.title('Posterior Predictive Check: Proportion of State 0')
plt.xlabel('State 0 Proportion')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
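
For this particular model the MCMC output can be checked against an exact answer: with an independent Dirichlet(alpha) prior on each row and a multinomial likelihood for the observed transitions, each row's posterior is Dirichlet(alpha + transition counts). The sketch below uses the data and alpha from above as a sanity check on the sampler; it is not part of the original example.

# Closed-form conjugate check: row i of the posterior is Dirichlet(alpha + counts_i).
alpha = 1.0
counts = np.zeros((2, 2))
for t in range(len(data) - 1):
    counts[data[t], data[t + 1]] += 1

conjugate_post_mean = (counts + alpha) / (counts + alpha).sum(axis=1, keepdims=True)
print("Conjugate posterior mean:\n", np.round(conjugate_post_mean, 3))
print("MCMC posterior mean:\n", np.round(posterior_samples.mean(axis=0), 3))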

Example 3: MCMC Analysis of Financial Risk

class BayesianRiskModel:
    """Bayesian risk model"""

    def __init__(self):
        self.samples = None

    def simulate_portfolio_returns(self, transition_matrix, state_params, n_periods=252):
        """Simulate portfolio returns"""
        returns = []
        current_state = 0  # Initial state

        for _ in range(n_periods):
            # Return parameters for current state
            mu, sigma = state_params[current_state]

            # Generate return
            period_return = np.random.normal(mu, sigma)
            returns.append(period_return)

            # Transition to next state
            next_state = np.random.choice(len(state_params),
                                        p=transition_matrix[current_state])
            current_state = next_state

        return np.array(returns)

    def bayesian_var_estimation(self, posterior_samples, state_params,
                               confidence_level=0.05, n_simulations=1000):
        """Bayesian VaR estimation"""
        var_samples = []

        for transition_matrix in posterior_samples:
            # Multiple simulations for each posterior sample
            portfolio_returns = []
            for _ in range(n_simulations):
                returns = self.simulate_portfolio_returns(
                    transition_matrix, state_params, n_periods=252
                )
                portfolio_returns.extend(returns)

            # Calculate VaR
            var = np.percentile(portfolio_returns, confidence_level * 100)
            var_samples.append(var)

        return np.array(var_samples)

# Bayesian analysis of financial risk
risk_model = BayesianRiskModel()

# Define state parameters (mean, std dev)
state_params = [
    (-0.01, 0.02),  # State 0: Bear market
    (0.005, 0.015)  # State 1: Bull market
]

print(f"\nBayesian Risk Analysis:")
print(f"State parameters: {state_params}")

# Use previous posterior samples for VaR estimation
var_samples = risk_model.bayesian_var_estimation(
    posterior_samples[:500],  # Use subset to speed up calculation
    state_params,
    confidence_level=0.05,
    n_simulations=100
)

print(f"\nVaR Analysis Results:")
print(f"5% VaR mean: {np.mean(var_samples):.4f}")
print(f"5% VaR std dev: {np.std(var_samples):.4f}")
print(f"VaR 95% confidence interval: [{np.percentile(var_samples, 2.5):.4f}, "
      f"{np.percentile(var_samples, 97.5):.4f}]")

# Visualize VaR uncertainty
plt.figure(figsize=(12, 8))

plt.subplot(2, 2, 1)
plt.hist(var_samples, bins=30, density=True, alpha=0.7, color='lightgreen')
plt.axvline(np.mean(var_samples), color='red', linestyle='-',
           label=f'Mean: {np.mean(var_samples):.4f}')
plt.title('Posterior Distribution of 5% VaR')
plt.xlabel('VaR')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)

# VaR trace
plt.subplot(2, 2, 2)
plt.plot(var_samples[:100])
plt.title('VaR Posterior Sample Trace')
plt.xlabel('Sample Number')
plt.ylabel('VaR')
plt.grid(True, alpha=0.3)

# VaR at different confidence levels
confidence_levels = [0.01, 0.05, 0.1]
var_distributions = {}

for cl in confidence_levels:
    var_dist = risk_model.bayesian_var_estimation(
        posterior_samples[:100], state_params,
        confidence_level=cl, n_simulations=50
    )
    var_distributions[cl] = var_dist

plt.subplot(2, 2, 3)
for cl, var_dist in var_distributions.items():
    plt.hist(var_dist, bins=20, alpha=0.6, density=True,
             label=f'{cl*100}% VaR')

plt.title('VaR Distribution at Different Confidence Levels')
plt.xlabel('VaR')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)

# VaR convergence
plt.subplot(2, 2, 4)
cumulative_means = np.cumsum(var_samples) / np.arange(1, len(var_samples) + 1)
plt.plot(cumulative_means)
plt.axhline(y=np.mean(var_samples), color='red', linestyle='--',
           label=f'Final mean: {np.mean(var_samples):.4f}')
plt.title('VaR Estimation Convergence')
plt.xlabel('Sample Size')
plt.ylabel('Cumulative Mean')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nMCMC Financial Modeling Summary:")
print(f"1. Bayesian methods can quantify parameter uncertainty")
print(f"2. Posterior distribution provides complete inference information")
print(f"3. Confidence intervals for risk measures are more reliable")
print(f"4. Model selection can be performed via Bayes factors")

Theoretical Analysis

MCMC Convergence Theory

Ergodicity Theorem: If the Markov chain is aperiodic and irreducible and has stationary distribution $\pi$, then $\lim_{t \to \infty} \|P^t(x, \cdot) - \pi(\cdot)\|_{TV} = 0$

Central Limit Theorem: $\sqrt{n}\,(\bar{f}_n - E_\pi[f]) \xrightarrow{d} N(0, \sigma_f^2)$, where $\bar{f}_n = \frac{1}{n}\sum_{t=1}^{n} f(X_t)$
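
Here $\sigma_f^2 = \mathrm{Var}_\pi(f)\,(1 + 2\sum_{k \ge 1}\rho_k)$, so autocorrelation inflates the Monte Carlo error relative to the usual $s/\sqrt{n}$ formula. Below is a rough sketch of the resulting standard error, reusing the samples and the autocorrelation() helper from Example 1 (the lag cutoff of 50 is an arbitrary truncation).

# Rough Monte Carlo standard error for the Example 1 posterior mean.
chain = samples[1000:]                       # post-burn-in draws from Example 1
rho = autocorrelation(chain, max_lags=50)    # truncation at lag 50 is arbitrary
iact = 1 + 2 * np.sum(rho[1:])               # integrated autocorrelation time
ess = len(chain) / iact                      # effective sample size
mcse = np.std(chain) * np.sqrt(iact / len(chain))
print(f"Integrated autocorrelation time: {iact:.1f}")
print(f"Effective sample size: {ess:.0f}")
print(f"MC standard error of the mean: {mcse:.4f}")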

Bayesian Computation

Marginal Likelihood: $p(y) = \int p(y|\theta)\,p(\theta)\,d\theta$

Posterior Predictive Distribution: $p(y^* \mid y) = \int p(y^* \mid \theta)\,p(\theta \mid y)\,d\theta$

Mathematical Formula Summary

  1. Metropolis-Hastings Acceptance Probability: $\alpha = \min\left(1, \frac{\pi(x')\,q(x|x')}{\pi(x)\,q(x'|x)}\right)$

  2. Gibbs Sampling Conditional Distribution: $p(x_i \mid x_{-i}) \propto p(x)$ (a minimal Gibbs sketch follows this list)

  3. Effective Sample Size: $ESS = \frac{n}{1 + 2\sum_{k=1}^{\infty}\rho_k}$

  4. Bayesian Information Criterion: $BIC = -2\log p(y|\hat{\theta}) + k\log n$
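
The Gibbs sketch referenced in item 2: a minimal two-variable Gibbs sampler for a bivariate standard normal with correlation rho (the parameter values are illustrative assumptions). Each full conditional is a univariate normal, so every draw is accepted and no Metropolis step is needed.

import numpy as np

# Minimal Gibbs sampler for a bivariate standard normal with correlation rho.
# Full conditionals: x1 | x2 ~ N(rho * x2, 1 - rho^2), and symmetrically for x2.
rho = 0.8                          # illustrative correlation
n_draws = 5000
x1, x2 = 0.0, 0.0                  # arbitrary starting point
draws = np.empty((n_draws, 2))

rng = np.random.default_rng(0)
for t in range(n_draws):
    x1 = rng.normal(rho * x2, np.sqrt(1 - rho ** 2))
    x2 = rng.normal(rho * x1, np.sqrt(1 - rho ** 2))
    draws[t] = (x1, x2)

print("Sample correlation:", np.corrcoef(draws[1000:].T)[0, 1])   # close to 0.8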

MCMC Application Considerations
  • Convergence diagnostics are a critical step in any MCMC analysis
  • A sufficiently long burn-in period is needed to remove the influence of the initial value
  • Autocorrelation between draws reduces the effective sample size
  • Running multiple chains from overdispersed starting points helps diagnose convergence (see the R-hat sketch below)
  • Computational cost is high, so accuracy must be balanced against efficiency
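
As a concrete version of the multiple-chain diagnostic, the sketch below computes the Gelman-Rubin statistic $\hat{R}$ (R-hat) from several independent runs of the Example 1 sampler started at overdispersed points; values close to 1 suggest, but do not prove, convergence. The number of chains, chain length, and starting points are arbitrary choices.

# Gelman-Rubin R-hat from several independent chains of the Example 1 sampler.
def gelman_rubin(chains):
    chains = np.asarray(chains)                # shape: (n_chains, n_draws)
    n = chains.shape[1]
    chain_means = chains.mean(axis=1)
    B = n * chain_means.var(ddof=1)            # between-chain variance
    W = chains.var(axis=1, ddof=1).mean()      # mean within-chain variance
    var_hat = (n - 1) / n * W + B / n          # pooled variance estimate
    return np.sqrt(var_hat / W)

chains = []
for start in [-5.0, 0.0, 5.0, 10.0]:           # overdispersed starting values
    mh = MetropolisHastings(mixture_log_density, proposal_std=2.0)
    chains.append(mh.sample(5000, initial_value=start)[1000:])   # drop burn-in

print(f"R-hat: {gelman_rubin(chains):.3f}")    # values near 1.0 indicate mixing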