Macroeconomic Modeling and Policy Analysis

Chapter 15: Macroeconomic Modeling and Policy Analysis

Learning Objectives
  • Master state-space modeling of macroeconomic variables
  • Learn Kalman filter analysis of monetary policy transmission mechanisms
  • Understand dynamic estimation methods for business cycles
  • Master real-time assessment of fiscal policy effects
  • Implement a complete macroeconomic forecasting framework

1. Macroeconomic State-Space Models

1.1 Dynamic Modeling of Economic Growth

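Economic growth involves multiple interacting factors. We use a Kalman filter to track five latent states (GDP growth, productivity, capital, labor, and total factor productivity) from two observed series (the GDP growth rate and the unemployment rate):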

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter
from scipy import linalg
import warnings
warnings.filterwarnings('ignore')

class MacroeconomicGrowthModel:
    """State-space model for macroeconomic growth"""

    def __init__(self):
        # State vector: [GDP_growth, productivity, capital, labor, TFP]
        self.n_states = 5
        self.n_obs = 2  # Observe GDP growth rate and unemployment rate

        self.kf = KalmanFilter(dim_x=self.n_states, dim_z=self.n_obs)

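        # Quarterly data: each predict/update cycle advances the model by one quarter,
        # and the coefficients in F below are already expressed per quarter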

        # State transition matrix (simplified economic growth model)
        self.kf.F = np.array([
            [0.8, 0.3, 0.2, 0.1, 0.4],    # GDP growth
            [0.0, 0.95, 0.05, 0.0, 0.1],  # Productivity
            [0.05, 0.0, 0.98, 0.0, 0.0],  # Capital
            [0.0, 0.0, 0.0, 0.9, 0.0],    # Labor
            [0.0, 0.1, 0.0, 0.0, 0.99]    # Total Factor Productivity
        ])

        # Observation matrix
        self.kf.H = np.array([
            [1.0, 0.0, 0.0, 0.0, 0.0],     # Direct observation of GDP growth
            [0.0, 0.0, 0.0, -2.0, 0.0]     # Unemployment rate negatively correlated with labor
        ])

        # Process noise covariance
        q_gdp = 0.01        # GDP growth noise
        q_prod = 0.005      # Productivity noise
        q_capital = 0.001   # Capital noise
        q_labor = 0.002     # Labor noise
        q_tfp = 0.001       # TFP noise

        self.kf.Q = np.diag([q_gdp, q_prod, q_capital, q_labor, q_tfp])

        # Observation noise covariance
        self.kf.R = np.diag([0.005, 0.01])  # GDP and unemployment observation noise

        # Initial state
        self.kf.x = np.array([[0.02], [1.0], [1.0], [0.95], [1.0]])  # Initial values
        self.kf.P = np.eye(self.n_states) * 0.01

        # Data storage
        self.history = {
            'gdp_growth': [],
            'productivity': [],
            'capital': [],
            'labor': [],
            'tfp': [],
            'unemployment': []
        }

    def update_economy(self, gdp_growth_obs, unemployment_obs):
        """Update economic state"""
        # Prediction step
        self.kf.predict()

        # Update step
        observations = np.array([gdp_growth_obs, unemployment_obs])
        self.kf.update(observations)

        # Store historical data
        self.history['gdp_growth'].append(self.kf.x[0, 0])
        self.history['productivity'].append(self.kf.x[1, 0])
        self.history['capital'].append(self.kf.x[2, 0])
        self.history['labor'].append(self.kf.x[3, 0])
        self.history['tfp'].append(self.kf.x[4, 0])
        self.history['unemployment'].append(unemployment_obs)

        return {
            'gdp_growth': self.kf.x[0, 0],
            'productivity': self.kf.x[1, 0],
            'capital': self.kf.x[2, 0],
            'labor': self.kf.x[3, 0],
            'tfp': self.kf.x[4, 0],
            'state_uncertainty': np.diagonal(self.kf.P)
        }

    def forecast_growth(self, horizons=8):
        """Forecast future economic growth"""
        forecasts = []

        # Save current state
        x_current = self.kf.x.copy()
        P_current = self.kf.P.copy()

        for h in range(1, horizons + 1):
            # h-step-ahead state forecast: F^h applied to the current estimate
            x_forecast = np.linalg.matrix_power(self.kf.F, h) @ x_current
            P_forecast = P_current.copy()

            # Propagate the covariance h steps to get the forecast uncertainty
            for _ in range(h):
                P_forecast = self.kf.F @ P_forecast @ self.kf.F.T + self.kf.Q

            forecasts.append({
                'horizon': h,
                'gdp_growth_forecast': x_forecast[0, 0],
                'productivity_forecast': x_forecast[1, 0],
                'forecast_std': np.sqrt(P_forecast[0, 0])
            })

        return forecasts

    def get_business_cycle_component(self):
        """Extract business cycle component"""
        if len(self.history['gdp_growth']) < 8:
            return None

        # Use Hodrick-Prescott filter to separate trend and cycle
        gdp_series = np.array(self.history['gdp_growth'])
        trend, cycle = self._hp_filter(gdp_series, lambda_param=1600)

        return {
            'trend': trend,
            'cycle': cycle,
            'cycle_amplitude': np.std(cycle),
            'current_cycle_position': cycle[-1] if len(cycle) > 0 else 0
        }

    @staticmethod
    def _hp_filter(y, lambda_param=1600):
        """Hodrick-Prescott filter"""
        n = len(y)
        if n < 4:
            return y, np.zeros_like(y)

        # Construct second-order difference matrix
        D = np.zeros((n-2, n))
        for i in range(n-2):
            D[i, i] = 1
            D[i, i+1] = -2
            D[i, i+2] = 1

        # Calculate trend
        I = np.eye(n)
        trend = np.linalg.solve(I + lambda_param * D.T @ D, y)
        cycle = y - trend

        return trend, cycle

# Example: Macroeconomic growth modeling
def demonstrate_macro_growth_modeling():
    print("Starting macroeconomic growth modeling demonstration...")

    # Create model
    macro_model = MacroeconomicGrowthModel()

    # Simulate quarterly economic data
    np.random.seed(42)
    n_quarters = 40  # 10 years of data

    # Generate true economic data (simplified simulation)
    true_growth_trend = 0.02  # 2% base growth rate
    cycle_length = 16  # 4-year cycle
    cycle_amplitude = 0.01

    results = []

    for t in range(n_quarters):
        # Generate cyclical GDP growth
        cycle_component = cycle_amplitude * np.sin(2 * np.pi * t / cycle_length)
        trend_component = true_growth_trend + 0.001 * np.random.normal()
        shock = 0.005 * np.random.normal()

        # Add external shocks
        if t == 20:  # Financial crisis
            shock = -0.03
        elif t == 25:  # Stimulus policy
            shock = 0.02

        gdp_growth = trend_component + cycle_component + shock

        # Unemployment rate (simplified relationship)
        unemployment = 0.05 - 0.5 * gdp_growth + 0.01 * np.random.normal()
        unemployment = max(0.02, min(0.15, unemployment))  # Limit to reasonable range

        # Update model
        result = macro_model.update_economy(gdp_growth, unemployment)
        result['quarter'] = t
        result['true_gdp_growth'] = gdp_growth
        result['unemployment_obs'] = unemployment
        results.append(result)

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Perform economic forecasts
    forecasts = macro_model.forecast_growth(horizons=8)
    forecast_df = pd.DataFrame(forecasts)

    # Business cycle analysis
    cycle_analysis = macro_model.get_business_cycle_component()

    # Plot analysis
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # GDP growth rate
    ax1.plot(df['quarter'], df['true_gdp_growth'], label='True GDP Growth', alpha=0.7)
    ax1.plot(df['quarter'], df['gdp_growth'], label='Estimated GDP Growth', alpha=0.7)

    # Add forecasts
    forecast_quarters = np.arange(len(df), len(df) + len(forecast_df))
    forecast_values = forecast_df['gdp_growth_forecast']
    forecast_std = forecast_df['forecast_std']

    ax1.plot(forecast_quarters, forecast_values, 'r--', label='Forecast', alpha=0.8)
    ax1.fill_between(forecast_quarters,
                     forecast_values - 1.96 * forecast_std,
                     forecast_values + 1.96 * forecast_std,
                     alpha=0.3, color='red', label='95% Confidence Interval')

    ax1.set_title('GDP Growth Rate Tracking and Forecast')
    ax1.set_ylabel('Growth Rate')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Production factors
    ax2.plot(df['quarter'], df['productivity'], label='Productivity', alpha=0.7)
    ax2.plot(df['quarter'], df['capital'], label='Capital', alpha=0.7)
    ax2.plot(df['quarter'], df['tfp'], label='Total Factor Productivity', alpha=0.7)
    ax2.set_title('Production Factors Dynamics')
    ax2.set_ylabel('Index Value')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Unemployment rate
    ax3.plot(df['quarter'], df['unemployment_obs'], label='Unemployment Rate', alpha=0.7)
    ax3.plot(df['quarter'], df['labor'], label='Labor Index', alpha=0.7)
    ax3.set_title('Labor Market')
    ax3.set_ylabel('Rate/Index')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Business cycle
    if cycle_analysis:
        ax4.plot(df['quarter'], cycle_analysis['trend'], label='Growth Trend', alpha=0.7)
        ax4.plot(df['quarter'], cycle_analysis['cycle'], label='Cycle Component', alpha=0.7)
        ax4.axhline(y=0, color='black', linestyle='--', alpha=0.5)
        ax4.set_title(f'Business Cycle Decomposition (Cycle Amplitude: {cycle_analysis["cycle_amplitude"]:.3f})')
        ax4.set_ylabel('Growth Rate Deviation')
        ax4.legend()
        ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Print analysis results
    print(f"\nMacroeconomic modeling results:")
    print(f"Current GDP growth rate: {df['gdp_growth'].iloc[-1]:.3f}")
    print(f"Current productivity: {df['productivity'].iloc[-1]:.3f}")
    print(f"Current TFP: {df['tfp'].iloc[-1]:.3f}")

    if cycle_analysis:
        print(f"Current cycle position: {cycle_analysis['current_cycle_position']:.3f}")
        print(f"Cycle amplitude: {cycle_analysis['cycle_amplitude']:.3f}")

    print(f"\nTwo-year growth forecast:")
    for i in range(min(8, len(forecast_df))):
        forecast = forecast_df.iloc[i]
        print(f"Quarter {i+1}: {forecast['gdp_growth_forecast']:.3f} ± {1.96*forecast['forecast_std']:.3f}")

    return df, forecast_df, cycle_analysis

macro_results, macro_forecasts, cycle_info = demonstrate_macro_growth_modeling()
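
The `predict()` and `update()` calls used in `update_economy` hide the filter recursion itself. As a minimal sketch, assuming a linear-Gaussian model with the same F, H, Q, R naming as above, one filter step can be written in plain NumPy. The function name `kalman_step` and the one-state toy model are illustrative assumptions, not part of filterpy or of the growth model above.

```python
import numpy as np

def kalman_step(x, P, z, F, H, Q, R):
    """One predict/update cycle of a linear Kalman filter (illustrative sketch)."""
    # Predict: propagate the state mean and covariance one period ahead
    x_pred = F @ x
    P_pred = F @ P @ F.T + Q

    # Update: correct the prediction using the new observation
    innovation = z - H @ x_pred               # observation minus its prediction
    S = H @ P_pred @ H.T + R                  # innovation covariance
    K = P_pred @ H.T @ np.linalg.inv(S)       # Kalman gain
    x_new = x_pred + K @ innovation
    P_new = (np.eye(len(x)) - K @ H) @ P_pred
    return x_new, P_new

# Toy model (assumed for illustration): one AR(1) growth state observed with noise
F = np.array([[0.8]])
H = np.array([[1.0]])
Q = np.array([[0.01]])
R = np.array([[0.005]])
x = np.array([[0.02]])
P = np.array([[0.01]])

x_new, P_new = kalman_step(x, P, np.array([[0.03]]), F, H, Q, R)
print(f"Updated state: {x_new[0, 0]:.4f}, updated variance: {P_new[0, 0]:.4f}")
```

The updated estimate lies between the model prediction (0.016) and the observation (0.03), and the gain K decides how far it moves toward the data. Larger R pulls the estimate back toward the prediction; larger Q pushes it toward the observation.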

1.2 Inflation Dynamics Modeling

class InflationDynamicsModel:
    """State-space model for inflation dynamics"""

    def __init__(self):
        # State vector: [core_inflation, demand_shock, supply_shock, expectations]
        self.kf = KalmanFilter(dim_x=4, dim_z=2)

        # State transition matrix (extended Phillips curve)
        self.kf.F = np.array([
            [0.7, 0.3, 0.2, 0.3],    # Core inflation
            [0.0, 0.8, 0.0, 0.0],    # Demand shock
            [0.0, 0.0, 0.6, 0.0],    # Supply shock
            [0.1, 0.0, 0.0, 0.9]     # Inflation expectations
        ])

        # Observation matrix: observe headline and core inflation
        self.kf.H = np.array([
            [1.0, 1.0, 1.0, 0.0],   # Headline inflation = core + demand + supply shocks
            [1.0, 0.0, 0.0, 0.0]    # Core inflation direct observation
        ])

        # Process noise covariance
        self.kf.Q = np.diag([0.001, 0.005, 0.01, 0.001])

        # Observation noise covariance
        self.kf.R = np.diag([0.002, 0.001])

        # Initial state
        self.kf.x = np.array([[0.02], [0.0], [0.0], [0.02]])  # 2% inflation target
        self.kf.P = np.eye(4) * 0.01

        # Historical data
        self.inflation_history = []
        self.decomposition_history = []

    def update_inflation(self, headline_inflation, core_inflation, unemployment_gap=0):
        """Update inflation state"""
        # Introduce unemployment gap effect on demand shock
        # (a Phillips-curve-style link between labor market slack and price pressure)
        self.kf.F[1, 0] = -0.5 * unemployment_gap

        # Predict and update
        self.kf.predict()
        observations = np.array([headline_inflation, core_inflation])
        self.kf.update(observations)

        # Decompose inflation components
        decomposition = {
            'core_inflation': self.kf.x[0, 0],
            'demand_shock': self.kf.x[1, 0],
            'supply_shock': self.kf.x[2, 0],
            'expectations': self.kf.x[3, 0],
            'total_predicted': self.kf.x[0, 0] + self.kf.x[1, 0] + self.kf.x[2, 0]
        }

        self.inflation_history.append({
            'headline': headline_inflation,
            'core': core_inflation,
            'unemployment_gap': unemployment_gap
        })
        self.decomposition_history.append(decomposition)

        return decomposition

    def forecast_inflation(self, horizons=12, policy_shock=None):
        """Forecast inflation path"""
        forecasts = []

        # Save current state
        x_current = self.kf.x.copy()
        F_current = self.kf.F.copy()

        for h in range(1, horizons + 1):
            # Consider policy shocks
            if policy_shock and h <= policy_shock.get('duration', 0):
                # Monetary policy shock affects demand
                if policy_shock['type'] == 'monetary_tightening':
                    x_current[1, 0] -= policy_shock['magnitude'] * 0.1
                elif policy_shock['type'] == 'monetary_easing':
                    x_current[1, 0] += policy_shock['magnitude'] * 0.1

            # Predict state
            x_forecast = self.kf.F @ x_current
            x_current = x_forecast

            total_inflation = x_forecast[0, 0] + x_forecast[1, 0] + x_forecast[2, 0]

            forecasts.append({
                'horizon': h,
                'total_inflation': total_inflation,
                'core_inflation': x_forecast[0, 0],
                'demand_component': x_forecast[1, 0],
                'supply_component': x_forecast[2, 0],
                'expectations': x_forecast[3, 0]
            })

        return forecasts

    def calculate_inflation_persistence(self):
        """Calculate inflation persistence"""
        if len(self.inflation_history) < 12:
            return None

        # Calculate autocorrelation of core inflation
        core_series = [d['core_inflation'] for d in self.decomposition_history]
        persistence = np.corrcoef(core_series[:-1], core_series[1:])[0, 1]

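        # The half-life is finite only for 0 < |persistence| < 1
        rho = abs(persistence)
        half_life = -np.log(2) / np.log(rho) if 0 < rho < 1 else (np.inf if rho >= 1 else 0.0)
        return {
            'persistence': persistence,
            'half_life': half_life
        }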

# Example: Inflation dynamics modeling
def demonstrate_inflation_dynamics():
    print("Starting inflation dynamics modeling demonstration...")

    # Create inflation model
    inflation_model = InflationDynamicsModel()

    # Simulate inflation data
    np.random.seed(42)
    n_months = 60  # 5 years of monthly data

    results = []

    # Base inflation parameters
    target_inflation = 0.02
    oil_shock_month = 24  # Oil shock
    policy_response_month = 30  # Policy response

    for t in range(n_months):
        # Generate base inflation
        base_inflation = target_inflation + 0.003 * np.sin(2 * np.pi * t / 12)

        # Supply shock (oil prices)
        if t >= oil_shock_month and t < oil_shock_month + 6:
            supply_shock = 0.02 * np.exp(-(t - oil_shock_month) * 0.3)
        else:
            supply_shock = 0.001 * np.random.normal()

        # Demand shock
        demand_shock = 0.002 * np.random.normal()

        # Unemployment gap
        unemployment_gap = 0.5 * np.sin(2 * np.pi * t / 24) + 0.2 * np.random.normal()

        # Calculate observed inflation
        headline_inflation = base_inflation + supply_shock + demand_shock
        core_inflation = base_inflation + 0.5 * demand_shock

        # Update model
        decomp = inflation_model.update_inflation(
            headline_inflation, core_inflation, unemployment_gap
        )

        result = {
            'month': t,
            'headline_inflation': headline_inflation,
            'core_inflation': core_inflation,
            'unemployment_gap': unemployment_gap,
            **decomp
        }
        results.append(result)

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Forecast inflation (including policy shock)
    policy_shock = {
        'type': 'monetary_tightening',
        'magnitude': 0.005,  # 50 basis points, in the same decimal units as the inflation series
        'duration': 6
    }

    forecasts = inflation_model.forecast_inflation(horizons=12, policy_shock=policy_shock)
    forecast_df = pd.DataFrame(forecasts)

    # Calculate inflation persistence
    persistence_info = inflation_model.calculate_inflation_persistence()

    # Plot
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Inflation decomposition
    ax1.plot(df['month'], df['headline_inflation'], label='Headline Inflation', alpha=0.8)
    ax1.plot(df['month'], df['core_inflation'], label='Core Inflation', alpha=0.8)
    ax1.plot(df['month'], df['total_predicted'], label='Model Prediction', linestyle='--', alpha=0.8)

    # Add forecast
    forecast_months = np.arange(len(df), len(df) + len(forecast_df))
    ax1.plot(forecast_months, forecast_df['total_inflation'], 'r--',
             label='Future Forecast', alpha=0.8)

    ax1.axhline(y=target_inflation, color='black', linestyle=':', alpha=0.5, label='Inflation Target')
    ax1.set_title('Inflation Dynamics Tracking and Forecast')
    ax1.set_ylabel('Inflation Rate')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Inflation component decomposition
    ax2.plot(df['month'], df['core_inflation'], label='Core Component', alpha=0.7)
    ax2.plot(df['month'], df['demand_shock'], label='Demand Shock', alpha=0.7)
    ax2.plot(df['month'], df['supply_shock'], label='Supply Shock', alpha=0.7)
    ax2.axhline(y=0, color='black', linestyle='--', alpha=0.5)
    ax2.set_title('Inflation Component Decomposition')
    ax2.set_ylabel('Contribution')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Inflation expectations
    ax3.plot(df['month'], df['expectations'], label='Inflation Expectations', alpha=0.7)
    ax3.plot(df['month'], df['core_inflation'], label='Core Inflation', alpha=0.7)
    ax3.axhline(y=target_inflation, color='black', linestyle=':', alpha=0.5, label='Target')
    ax3.set_title('Inflation Expectations Anchoring')
    ax3.set_ylabel('Inflation Rate')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Forecast decomposition
    ax4.plot(forecast_df['horizon'], forecast_df['total_inflation'], label='Total Forecast', alpha=0.8)
    ax4.plot(forecast_df['horizon'], forecast_df['core_inflation'], label='Core Forecast', alpha=0.8)
    ax4.fill_between(forecast_df['horizon'],
                     forecast_df['total_inflation'] - 0.005,
                     forecast_df['total_inflation'] + 0.005,
                     alpha=0.3, label='Uncertainty Band')
    ax4.axhline(y=target_inflation, color='black', linestyle=':', alpha=0.5, label='Target')
    ax4.set_title('Inflation Forecast under Policy Shock')
    ax4.set_ylabel('Inflation Rate')
    ax4.set_xlabel('Forecast Horizon (Months)')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Analysis results
    print(f"\nInflation dynamics analysis results:")
    print(f"Current headline inflation: {df['headline_inflation'].iloc[-1]:.3f}")
    print(f"Current core inflation: {df['core_inflation'].iloc[-1]:.3f}")
    print(f"Current demand shock: {df['demand_shock'].iloc[-1]:.3f}")
    print(f"Current supply shock: {df['supply_shock'].iloc[-1]:.3f}")
    print(f"Current inflation expectations: {df['expectations'].iloc[-1]:.3f}")

    if persistence_info:
        print(f"\nInflation persistence analysis:")
        print(f"Core inflation autocorrelation: {persistence_info['persistence']:.3f}")
        print(f"Half-life: {persistence_info['half_life']:.1f} months")

    print(f"\n12-month inflation forecast with policy shock (first 6 months shown):")
    for i in range(min(6, len(forecast_df))):
        forecast = forecast_df.iloc[i]
        print(f"Month {i+1}: Total inflation {forecast['total_inflation']:.3f}, "
              f"Core inflation {forecast['core_inflation']:.3f}")

    return df, forecast_df, persistence_info

inflation_results, inflation_forecasts, persistence_stats = demonstrate_inflation_dynamics()
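
The half-life reported by `calculate_inflation_persistence` follows from treating core inflation as an AR(1) process: if a shock shrinks by a factor ρ each period, it falls to half its size after ln(0.5)/ln(ρ) periods. The sketch below checks that logic on simulated data. The helper `ar1_half_life`, the persistence of 0.9, and the noise scale are assumptions chosen for illustration.

```python
import numpy as np

def ar1_half_life(rho):
    """Periods until an AR(1) shock decays to half its initial size."""
    rho = abs(rho)
    if rho >= 1.0:
        return np.inf      # shocks never die out
    if rho == 0.0:
        return 0.0         # shocks vanish within one period
    return np.log(0.5) / np.log(rho)

# Simulate an AR(1) series with known persistence, then recover it
rng = np.random.default_rng(0)
true_rho = 0.9
series = np.zeros(5000)
for t in range(1, len(series)):
    series[t] = true_rho * series[t - 1] + rng.normal(scale=0.001)

# Lag-1 autocorrelation, the same estimator used in the class above
estimated_rho = np.corrcoef(series[:-1], series[1:])[0, 1]
print(f"Estimated persistence: {estimated_rho:.3f}")
print(f"Implied half-life: {ar1_half_life(estimated_rho):.1f} periods")
```

Because the half-life is a steep function of ρ near 1, small estimation errors in the autocorrelation translate into large changes in the implied half-life, so results from short samples such as 60 months should be read with caution.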

2. Monetary Policy Transmission Mechanism Analysis

2.1 Modeling Interest Rate Transmission Channels

class MonetaryTransmissionModel:
    """Monetary policy transmission mechanism model"""

    def __init__(self):
        # State vector: [policy_rate, short_rate, long_rate, credit_spread, bank_lending]
        self.kf = KalmanFilter(dim_x=5, dim_z=3)

        # State transition matrix (interest rate transmission chain)
        self.kf.F = np.array([
            [0.95, 0.0, 0.0, 0.0, 0.0],   # Policy rate
            [0.8, 0.9, 0.0, 0.0, 0.0],    # Short-term rate
            [0.2, 0.7, 0.95, 0.0, 0.0],   # Long-term rate
            [0.1, 0.1, 0.1, 0.85, 0.0],   # Credit spread
            [0.0, -0.3, -0.5, -0.4, 0.9]  # Bank lending
        ])

        # Observation matrix: observe short-term rate, long-term rate, and credit spread
        self.kf.H = np.array([
            [0.0, 1.0, 0.0, 0.0, 0.0],   # Short-term rate
            [0.0, 0.0, 1.0, 0.0, 0.0],   # Long-term rate
            [0.0, 0.0, 0.0, 1.0, 0.0]    # Credit spread
        ])

        # Process noise covariance
        self.kf.Q = np.diag([0.0001, 0.001, 0.002, 0.005, 0.01])

        # Observation noise covariance
        self.kf.R = np.diag([0.0005, 0.001, 0.002])

        # Initial state
        self.kf.x = np.array([[0.025], [0.025], [0.035], [0.01], [1.0]])
        self.kf.P = np.eye(5) * 0.001

        # Policy history
        self.policy_history = []
        self.transmission_history = []

    def implement_policy_change(self, policy_rate_change, announcement_effect=0):
        """Implement monetary policy change"""
        # Update policy rate
        self.kf.x[0, 0] += policy_rate_change

        # Forward guidance effect (affects long-term rates)
        if announcement_effect != 0:
            self.kf.x[2, 0] += announcement_effect * 0.5

        # Record policy change
        self.policy_history.append({
            'policy_change': policy_rate_change,
            'announcement_effect': announcement_effect,
            'new_policy_rate': self.kf.x[0, 0]
        })

    def update_transmission(self, short_rate_obs, long_rate_obs, credit_spread_obs):
        """Update transmission mechanism state"""
        # Predict and update
        self.kf.predict()
        observations = np.array([short_rate_obs, long_rate_obs, credit_spread_obs])
        self.kf.update(observations)

        # Calculate transmission efficiency as the ratio of each market rate
        # to the policy rate (1.0 corresponds to one-for-one pass-through)
        transmission_state = {
            'policy_rate': self.kf.x[0, 0],
            'short_rate': self.kf.x[1, 0],
            'long_rate': self.kf.x[2, 0],
            'credit_spread': self.kf.x[3, 0],
            'bank_lending': self.kf.x[4, 0],
            'short_transmission': self.kf.x[1, 0] / self.kf.x[0, 0] if abs(self.kf.x[0, 0]) > 1e-6 else 0,
            'long_transmission': self.kf.x[2, 0] / self.kf.x[0, 0] if abs(self.kf.x[0, 0]) > 1e-6 else 0
        }

        self.transmission_history.append(transmission_state)
        return transmission_state

    def calculate_policy_effectiveness(self, target_variable='bank_lending'):
        """Calculate policy effectiveness"""
        if len(self.transmission_history) < 12:
            return None

        # Analyze target variable response to policy rate
        policy_rates = [t['policy_rate'] for t in self.transmission_history]
        target_values = [t[target_variable] for t in self.transmission_history]

        # Calculate elasticity as the OLS slope of target changes on policy-rate changes
        # (a ratio of mean changes is unstable when the policy rate ends near where it started)
        policy_changes = np.diff(policy_rates)
        target_changes = np.diff(target_values)

        if len(policy_changes) > 1 and np.std(policy_changes) > 1e-6:
            elasticity = np.cov(policy_changes, target_changes)[0, 1] / np.var(policy_changes, ddof=1)
        else:
            elasticity = 0

        # Estimate the lag once and reuse it
        lag = self._estimate_transmission_lag(policy_rates, target_values)

        return {
            'elasticity': elasticity,
            'transmission_lag': lag,
            'effectiveness_score': abs(elasticity) * np.exp(-0.1 * abs(lag))
        }

    def _estimate_transmission_lag(self, policy_series, target_series, max_lag=6):
        """Estimate transmission lag as the shift with the strongest cross-correlation"""
        if len(policy_series) < max_lag + 2:
            return 0

        policy = np.asarray(policy_series)
        target = np.asarray(target_series)
        best_corr = 0
        best_lag = 0

        for lag in range(max_lag + 1):
            # Correlate the policy rate at time t with the target at time t + lag,
            # so the returned value is the actual shift in periods
            leading = policy[:len(policy) - lag]
            lagged = target[lag:]
            corr = np.corrcoef(leading, lagged)[0, 1]
            if abs(corr) > abs(best_corr):
                best_corr = corr
                best_lag = lag

        return best_lag

# Example: Monetary policy transmission analysis
def demonstrate_monetary_transmission():
    print("Starting monetary policy transmission mechanism analysis...")

    # Create transmission model
    transmission_model = MonetaryTransmissionModel()

    # Simulate policy cycle
    np.random.seed(42)
    n_periods = 48  # 4 years of monthly data

    results = []

    # Policy scenarios
    policy_events = [
        {'period': 12, 'change': 0.0025, 'announcement': 0.001},  # 25bp rate hike
        {'period': 15, 'change': 0.0025, 'announcement': 0.0005}, # Another 25bp hike
        {'period': 30, 'change': -0.005, 'announcement': -0.002}, # 50bp cut (crisis)
        {'period': 33, 'change': -0.0025, 'announcement': -0.001}, # Another 25bp cut
    ]

    for t in range(n_periods):
        # Check for policy events
        for event in policy_events:
            if t == event['period']:
                transmission_model.implement_policy_change(
                    event['change'], event['announcement']
                )

        # Generate market observation data
        base_short = transmission_model.kf.x[1, 0]
        base_long = transmission_model.kf.x[2, 0]
        base_spread = transmission_model.kf.x[3, 0]

        # Add market noise
        short_rate_obs = base_short + 0.001 * np.random.normal()
        long_rate_obs = base_long + 0.002 * np.random.normal()
        credit_spread_obs = base_spread + 0.001 * np.random.normal()

        # Special effects during financial crisis
        if 28 <= t <= 35:
            credit_spread_obs += 0.01 * np.exp(-(t-30)**2/8)  # Credit crunch
            long_rate_obs -= 0.005 * (35-t)/7  # Flight to safety

        # Update transmission state
        transmission_state = transmission_model.update_transmission(
            short_rate_obs, long_rate_obs, credit_spread_obs
        )

        result = {
            'period': t,
            'policy_implemented': any(t == event['period'] for event in policy_events),
            **transmission_state
        }
        results.append(result)

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Calculate policy effectiveness
    effectiveness = transmission_model.calculate_policy_effectiveness('bank_lending')

    # Plot analysis
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Interest rate transmission chain
    ax1.plot(df['period'], df['policy_rate'], label='Policy Rate', linewidth=2)
    ax1.plot(df['period'], df['short_rate'], label='Short-term Rate', alpha=0.8)
    ax1.plot(df['period'], df['long_rate'], label='Long-term Rate', alpha=0.8)

    # Mark policy change points
    policy_periods = [event['period'] for event in policy_events]
    for period in policy_periods:
        ax1.axvline(x=period, color='red', linestyle='--', alpha=0.5)

    ax1.set_title('Monetary Policy Transmission: Interest Rate Chain')
    ax1.set_ylabel('Interest Rate')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Credit transmission
    ax2.plot(df['period'], df['credit_spread'], label='Credit Spread', alpha=0.8)
    ax2.plot(df['period'], df['bank_lending'], label='Bank Lending Index', alpha=0.8)
    ax2.set_title('Monetary Policy Transmission: Credit Channel')
    ax2.set_ylabel('Spread/Index')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Transmission efficiency
    ax3.plot(df['period'], df['short_transmission'], label='Short-term Transmission Efficiency', alpha=0.8)
    ax3.plot(df['period'], df['long_transmission'], label='Long-term Transmission Efficiency', alpha=0.8)
    ax3.axhline(y=1.0, color='black', linestyle=':', alpha=0.5, label='Full Transmission')
    ax3.set_title('Transmission Efficiency Analysis')
    ax3.set_ylabel('Transmission Ratio')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Policy effectiveness over time
    if len(df) >= 12:
        rolling_effectiveness = []
        for i in range(12, len(df)):
            # Calculate rolling window policy effectiveness
            window_data = df.iloc[i-12:i]
            policy_var = np.var(window_data['policy_rate'])
            lending_var = np.var(window_data['bank_lending'])
            effectiveness_score = lending_var / (policy_var + 1e-6)
            rolling_effectiveness.append(effectiveness_score)

        ax4.plot(df['period'].iloc[12:], rolling_effectiveness, alpha=0.8)
        ax4.set_title('Rolling Policy Effectiveness')
        ax4.set_ylabel('Effectiveness Score')
        ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Analysis results
    print(f"\nMonetary policy transmission analysis results:")
    print(f"Current policy rate: {df['policy_rate'].iloc[-1]:.3f}")
    print(f"Current short-term rate: {df['short_rate'].iloc[-1]:.3f}")
    print(f"Current long-term rate: {df['long_rate'].iloc[-1]:.3f}")
    print(f"Current credit spread: {df['credit_spread'].iloc[-1]:.3f}")
    print(f"Current bank lending index: {df['bank_lending'].iloc[-1]:.3f}")

    if effectiveness:
        print(f"\nPolicy effectiveness analysis:")
        print(f"Bank lending elasticity: {effectiveness['elasticity']:.3f}")
        print(f"Transmission lag: {effectiveness['transmission_lag']} periods")
        print(f"Overall effectiveness score: {effectiveness['effectiveness_score']:.3f}")

    # Calculate average transmission efficiency
    avg_short_transmission = np.mean(df['short_transmission'])
    avg_long_transmission = np.mean(df['long_transmission'])

    print(f"\nAverage transmission efficiency:")
    print(f"Short-term rate transmission: {avg_short_transmission:.3f}")
    print(f"Long-term rate transmission: {avg_long_transmission:.3f}")

    return df, effectiveness

transmission_results, transmission_effectiveness = demonstrate_monetary_transmission()
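
The transition matrix F in `MonetaryTransmissionModel` already implies how a policy move travels down the chain: the response h periods after a one-off shock is F^h applied to the shock vector. The following sketch traces a 25bp policy-rate shock through the deterministic part of the model only, ignoring process and observation noise. The 12-period horizon and the variable names are assumptions made for illustration.

```python
import numpy as np

# Same transition matrix as MonetaryTransmissionModel above
# States: [policy_rate, short_rate, long_rate, credit_spread, bank_lending]
F = np.array([
    [0.95, 0.0, 0.0, 0.0, 0.0],
    [0.8, 0.9, 0.0, 0.0, 0.0],
    [0.2, 0.7, 0.95, 0.0, 0.0],
    [0.1, 0.1, 0.1, 0.85, 0.0],
    [0.0, -0.3, -0.5, -0.4, 0.9]
])

# One-off 25bp shock to the policy rate at h = 0
shock = np.zeros(5)
shock[0] = 0.0025

# Row h holds the response of every state h periods after the shock
horizons = 12
responses = np.array([np.linalg.matrix_power(F, h) @ shock
                      for h in range(horizons + 1)])

# Period in which the bank lending response is largest in absolute value
peak_lending_lag = int(np.argmax(np.abs(responses[:, 4])))
print(f"Short-rate response after 1 period: {responses[1, 1]:.5f}")
print(f"Bank lending response after 2 periods: {responses[2, 4]:.5f}")
print(f"Largest lending response within {horizons} periods at h = {peak_lending_lag}")
```

Bank lending does not react until the second period because it depends only on lagged market rates, which is the kind of delay `_estimate_transmission_lag` tries to recover from the filtered series.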

3. Fiscal Policy Effect Assessment

3.1 Dynamic Estimation of Fiscal Multipliers

class FiscalMultiplierModel:
    """Fiscal multiplier dynamic estimation model"""

    def __init__(self):
        # State vector: [output_gap, fiscal_impulse, automatic_stabilizer, multiplier]
        self.kf = KalmanFilter(dim_x=4, dim_z=2)

        # State transition matrix
        self.kf.F = np.array([
            [0.8, 1.0, 0.5, 0.0],    # Output gap
            [0.0, 0.7, 0.0, 0.0],    # Fiscal impulse
            [-0.3, 0.0, 0.9, 0.0],   # Automatic stabilizer
            [0.0, 0.0, 0.0, 0.95]    # Fiscal multiplier (not linked to the other states or to H, so it decays from its prior)
        ])

        # Observation matrix: observe output gap and fiscal balance
        self.kf.H = np.array([
            [1.0, 0.0, 0.0, 0.0],    # Output gap
            [0.0, 1.0, 1.0, 0.0]     # Fiscal balance = impulse + automatic stabilizer
        ])

        # Process noise covariance
        self.kf.Q = np.diag([0.01, 0.005, 0.002, 0.001])

        # Observation noise covariance
        self.kf.R = np.diag([0.005, 0.003])

        # Initial state
        self.kf.x = np.array([[0.0], [0.0], [0.0], [1.2]])  # Initial multiplier 1.2
        self.kf.P = np.eye(4) * 0.01

        # Historical data
        self.fiscal_history = []

    def implement_fiscal_policy(self, fiscal_stimulus, policy_type='spending'):
        """Implement fiscal policy"""
        # Adjust transmission mechanism based on policy type
        if policy_type == 'spending':
            # Government spending directly affects output
            self.kf.F[0, 1] = 1.0
        elif policy_type == 'tax_cut':
            # Tax cuts affect output through consumption, milder effect
            self.kf.F[0, 1] = 0.7
        elif policy_type == 'investment':
            # Infrastructure investment, higher long-term multiplier
            self.kf.F[0, 1] = 1.3
        else:
            # Any other type (e.g. 'consolidation') resets to the baseline
            # pass-through instead of inheriting the previous policy's value
            self.kf.F[0, 1] = 1.0

        # Update fiscal impulse
        self.kf.x[1, 0] = fiscal_stimulus

        return {
            'fiscal_impulse': fiscal_stimulus,
            'policy_type': policy_type,
            'expected_multiplier': self.kf.x[3, 0]
        }

    def update_fiscal_effects(self, output_gap_obs, fiscal_balance_obs,
                            economic_cycle='normal'):
        """Update fiscal policy effects"""
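        # Scale the multiplier's process noise by cycle phase. This changes how
        # uncertain the multiplier is assumed to be, not its level. The scale is
        # applied to a fixed baseline so it does not compound across calls.
        base_q_multiplier = 0.001  # matches Q[3, 3] set in __init__
        cycle_scale = {'recession': 1.5, 'expansion': 0.7}
        self.kf.Q[3, 3] = base_q_multiplier * cycle_scale.get(economic_cycle, 1.0)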

        # Predict and update
        self.kf.predict()
        observations = np.array([output_gap_obs, fiscal_balance_obs])
        self.kf.update(observations)

        # Calculate instant multiplier
        current_multiplier = self.kf.x[3, 0]
        fiscal_impulse = self.kf.x[1, 0]

        fiscal_state = {
            'output_gap': self.kf.x[0, 0],
            'fiscal_impulse': fiscal_impulse,
            'automatic_stabilizer': self.kf.x[2, 0],
            'fiscal_multiplier': current_multiplier,
            'fiscal_contribution': fiscal_impulse * current_multiplier,
            'economic_cycle': economic_cycle
        }

        self.fiscal_history.append(fiscal_state)
        return fiscal_state

    def estimate_multiplier_by_state(self):
        """Estimate multiplier by economic state"""
        if len(self.fiscal_history) < 12:
            return None

        # Analyze multipliers under different economic states
        recession_periods = [h for h in self.fiscal_history if h['economic_cycle'] == 'recession']
        normal_periods = [h for h in self.fiscal_history if h['economic_cycle'] == 'normal']
        expansion_periods = [h for h in self.fiscal_history if h['economic_cycle'] == 'expansion']

        results = {}

        for state, periods in [('recession', recession_periods),
                              ('normal', normal_periods),
                              ('expansion', expansion_periods)]:
            if len(periods) > 0:
                multipliers = [p['fiscal_multiplier'] for p in periods]
                results[state] = {
                    'avg_multiplier': np.mean(multipliers),
                    'std_multiplier': np.std(multipliers),
                    'observations': len(periods)
                }

        return results

    def forecast_fiscal_impact(self, planned_policies, horizons=8):
        """Forecast fiscal policy impact"""
        forecasts = []

        # Work on a copy so the filter's own state is left untouched
        x_current = self.kf.x.copy()
        cumulative_impact = 0.0

        for h in range(1, horizons + 1):
            # Apply planned policies
            if h <= len(planned_policies):
                policy = planned_policies[h-1]
                x_current[1, 0] = policy.get('fiscal_impulse', 0)

                # Adjust based on policy type
                if policy.get('type') == 'investment':
                    x_current[3, 0] *= 1.1  # Higher investment multiplier

            # Predict state
            x_forecast = self.kf.F @ x_current
            x_current = x_forecast

            # Running sum of impulse x multiplier over horizons 1..h
            cumulative_impact += x_forecast[1, 0] * x_forecast[3, 0]

            forecasts.append({
                'horizon': h,
                'output_gap_forecast': x_forecast[0, 0],
                'fiscal_multiplier_forecast': x_forecast[3, 0],
                'cumulative_impact': cumulative_impact
            })

        return forecasts

# Example: Fiscal policy effect assessment
def demonstrate_fiscal_policy_analysis():
    print("Starting fiscal policy effect assessment...")

    # Create fiscal model
    fiscal_model = FiscalMultiplierModel()

    # Simulate economic cycle and fiscal policy
    np.random.seed(42)
    n_quarters = 32  # 8 years of quarterly data

    results = []

    # Define economic cycle
    cycle_phases = ['expansion'] * 12 + ['recession'] * 6 + ['normal'] * 8 + ['expansion'] * 6

    # Fiscal policy events
    fiscal_events = [
        {'quarter': 8, 'stimulus': 0.02, 'type': 'tax_cut'},      # Tax cut
        {'quarter': 12, 'stimulus': 0.05, 'type': 'spending'},    # Crisis spending
        {'quarter': 14, 'stimulus': 0.03, 'type': 'investment'},  # Infrastructure investment
        {'quarter': 24, 'stimulus': -0.02, 'type': 'consolidation'} # Fiscal consolidation
    ]

    for t in range(n_quarters):
        # Economic cycle
        cycle_phase = cycle_phases[t] if t < len(cycle_phases) else 'normal'

        # Check fiscal policy events
        fiscal_stimulus = 0
        policy_type = 'none'

        for event in fiscal_events:
            if t == event['quarter']:
                fiscal_stimulus = event['stimulus']
                policy_type = event['type']
                fiscal_model.implement_fiscal_policy(fiscal_stimulus, policy_type)

        # Generate observation data
        # Base output gap
        if cycle_phase == 'recession':
            base_output_gap = -0.03 + 0.01 * np.random.normal()
        elif cycle_phase == 'expansion':
            base_output_gap = 0.02 + 0.005 * np.random.normal()
        else:
            base_output_gap = 0.005 * np.random.normal()

        # Fiscal balance (including cyclical factors)
        cyclical_balance = -0.3 * base_output_gap  # Automatic stabilizer
        fiscal_balance = fiscal_stimulus + cyclical_balance + 0.005 * np.random.normal()

        # Update model
        fiscal_state = fiscal_model.update_fiscal_effects(
            base_output_gap, fiscal_balance, cycle_phase
        )

        result = {
            'quarter': t,
            'cycle_phase': cycle_phase,
            'policy_implemented': policy_type != 'none',
            'policy_type': policy_type,
            **fiscal_state
        }
        results.append(result)

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Estimate multiplier by state
    multiplier_by_state = fiscal_model.estimate_multiplier_by_state()

    # Future policy planning
    planned_policies = [
        {'fiscal_impulse': 0.01, 'type': 'investment'},
        {'fiscal_impulse': 0.015, 'type': 'investment'},
        {'fiscal_impulse': 0.01, 'type': 'spending'},
        {'fiscal_impulse': 0.005, 'type': 'spending'}
    ]

    forecasts = fiscal_model.forecast_fiscal_impact(planned_policies)
    forecast_df = pd.DataFrame(forecasts)

    # Plot analysis
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Output gap and fiscal impulse
    ax1.plot(df['quarter'], df['output_gap'], label='Output Gap', alpha=0.8)
    ax1.plot(df['quarter'], df['fiscal_impulse'], label='Fiscal Impulse', alpha=0.8)

    # Mark different economic cycles
    for i, phase in enumerate(df['cycle_phase']):
        color = {'recession': 'red', 'expansion': 'green', 'normal': 'blue'}[phase]
        ax1.axvspan(i-0.4, i+0.4, alpha=0.1, color=color)

    ax1.set_title('Output Gap and Fiscal Policy')
    ax1.set_ylabel('Ratio')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Fiscal multiplier changes
    ax2.plot(df['quarter'], df['fiscal_multiplier'], label='Fiscal Multiplier', alpha=0.8)

    # Add average multiplier lines
    if multiplier_by_state:
        for state, info in multiplier_by_state.items():
            ax2.axhline(y=info['avg_multiplier'],
                       linestyle='--', alpha=0.6,
                       label=f'{state} avg: {info["avg_multiplier"]:.2f}')

    ax2.set_title('Dynamic Fiscal Multiplier')
    ax2.set_ylabel('Multiplier')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Fiscal contribution decomposition
    ax3.plot(df['quarter'], df['fiscal_contribution'], label='Fiscal Contribution', alpha=0.8)
    ax3.plot(df['quarter'], df['automatic_stabilizer'], label='Automatic Stabilizer', alpha=0.8)
    ax3.axhline(y=0, color='black', linestyle='--', alpha=0.5)
    ax3.set_title('Fiscal Policy Contribution to Growth')
    ax3.set_ylabel('Contribution')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Policy effect forecast
    forecast_quarters = np.arange(len(df), len(df) + len(forecast_df))
    ax4.plot(forecast_quarters, forecast_df['output_gap_forecast'],
             label='Output Gap Forecast', alpha=0.8)
    ax4.plot(forecast_quarters, forecast_df['cumulative_impact'],
             label='Cumulative Fiscal Impact', alpha=0.8)
    ax4.set_title('Fiscal Policy Effect Forecast')
    ax4.set_ylabel('Impact')
    ax4.set_xlabel('Quarter')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Analysis results
    print("\nFiscal policy effect analysis:")
    print(f"Current output gap: {df['output_gap'].iloc[-1]:.3f}")
    print(f"Current fiscal multiplier: {df['fiscal_multiplier'].iloc[-1]:.2f}")
    print(f"Current fiscal contribution: {df['fiscal_contribution'].iloc[-1]:.3f}")

    if multiplier_by_state:
        print("\nFiscal multipliers by economic state:")
        for state, info in multiplier_by_state.items():
            print(f"{state}: {info['avg_multiplier']:.2f} ± {info['std_multiplier']:.2f} "
                  f"(observations: {info['observations']})")

    print("\nExpected effects of planned policies:")
    total_impact = forecast_df['cumulative_impact'].iloc[-1]
    avg_multiplier = forecast_df['fiscal_multiplier_forecast'].mean()
    print(f"Cumulative impact: {total_impact:.3f}")
    print(f"Average expected multiplier: {avg_multiplier:.2f}")

    return df, forecast_df, multiplier_by_state

fiscal_results, fiscal_forecasts, multiplier_analysis = demonstrate_fiscal_policy_analysis()
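
Before reading much into the estimated multiplier path, it may be worth checking whether the multiplier state can be recovered from the two observed series at all. The following is a minimal sketch of a standard observability rank check, using the `F` and `H` values copied from `FiscalMultiplierModel.__init__`. The helper `observability_matrix` is a hypothetical name introduced here for illustration; it is not part of `filterpy`.

```python
import numpy as np

# Matrices copied from FiscalMultiplierModel.__init__
# State order: [output_gap, fiscal_impulse, automatic_stabilizer, multiplier]
F = np.array([
    [0.8, 1.0, 0.5, 0.0],
    [0.0, 0.7, 0.0, 0.0],
    [-0.3, 0.0, 0.9, 0.0],
    [0.0, 0.0, 0.0, 0.95],
])
H = np.array([
    [1.0, 0.0, 0.0, 0.0],
    [0.0, 1.0, 1.0, 0.0],
])


def observability_matrix(F, H):
    """Stack H, HF, HF^2, ..., HF^(n-1) (hypothetical helper for this sketch)."""
    n = F.shape[0]
    blocks = [H @ np.linalg.matrix_power(F, k) for k in range(n)]
    return np.vstack(blocks)


O = observability_matrix(F, H)
rank = np.linalg.matrix_rank(O)

# Column 3 of O describes how the multiplier state shows up in the observations
multiplier_column_is_zero = bool(np.allclose(O[:, 3], 0.0))

print(f"Observability rank: {rank} of {F.shape[0]} states")
print(f"Multiplier column all zero: {multiplier_column_is_zero}")
```

With these matrices the rank is 3 rather than 4 and the multiplier column is identically zero, which suggests that the observations carry no information about the multiplier in this specification. Under that reading, the plotted multiplier reflects the assumed 0.95 persistence applied to the initial value of 1.2, so the per-state averages should be interpreted with that caveat in mind.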

Chapter Summary

This chapter applied Kalman filters to macroeconomic modeling and policy analysis:

  1. Macroeconomic Modeling:

    • Dynamic state-space models for economic growth
    • Inflation dynamics and component decomposition
    • Real-time identification of business cycles
  2. Monetary Policy Analysis:

    • Modeling interest rate transmission mechanisms
    • Assessment of policy effectiveness
    • Analysis of transmission lags
  3. Fiscal Policy Assessment:

    • Dynamic fiscal multiplier estimation
    • Analysis of policy type differences
    • Economic cycle dependence
  4. Policy Tool Applications:

    • Real-time policy effect monitoring
    • Policy shock identification
    • Forward-looking policy simulation

These applications show where Kalman filters are most useful in macroeconomic analysis: handling multivariate dynamic systems, quantifying policy effects, and producing economic forecasts.


Next Chapter Preview: Chapter 16, “Modern Developments and Practical Applications in Financial Econometrics,” is the final chapter of the course. It draws together the techniques from the earlier chapters and explores recent developments and practical applications of Kalman filters.
