Macroeconomic Modeling and Policy Analysis
Chapter 15: Macroeconomic Modeling and Policy Analysis
- Master state-space modeling of macroeconomic variables
- Learn Kalman filter analysis of monetary policy transmission mechanisms
- Understand dynamic estimation methods for business cycles
- Master real-time assessment of fiscal policy effects
- Implement a complete macroeconomic forecasting framework
1. Macroeconomic State-Space Models
1.1 Dynamic Modeling of Economic Growth
Economic growth involves multiple interacting factors. We use Kalman filters to model these dynamic relationships:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter
from scipy import linalg
import warnings
warnings.filterwarnings('ignore')
class MacroeconomicGrowthModel:
    """State-space model for macroeconomic growth.

    Hidden state (column vector of length 5):
    ``[GDP_growth, productivity, capital, labor, TFP]``.
    Two values are observed per period: the GDP growth rate and the
    unemployment rate.  Filtering is delegated to the ``KalmanFilter``
    instance stored in ``self.kf``; every filtered state is appended to
    ``self.history``.
    """

    def __init__(self):
        """Configure the filter matrices, the initial state and the history."""
        # State vector: [GDP_growth, productivity, capital, labor, TFP]
        self.n_states = 5
        self.n_obs = 2  # Observe GDP growth rate and unemployment rate
        self.kf = KalmanFilter(dim_x=self.n_states, dim_z=self.n_obs)

        # The tutorial data are quarterly; the coefficients below are
        # per-step values (no explicit time-step parameter is used).

        # State transition matrix (simplified economic growth model)
        self.kf.F = np.array([
            [0.8, 0.3, 0.2, 0.1, 0.4],    # GDP growth
            [0.0, 0.95, 0.05, 0.0, 0.1],  # Productivity
            [0.05, 0.0, 0.98, 0.0, 0.0],  # Capital
            [0.0, 0.0, 0.0, 0.9, 0.0],    # Labor
            [0.0, 0.1, 0.0, 0.0, 0.99],   # Total Factor Productivity
        ])

        # Observation matrix
        self.kf.H = np.array([
            [1.0, 0.0, 0.0, 0.0, 0.0],   # Direct observation of GDP growth
            [0.0, 0.0, 0.0, -2.0, 0.0],  # Unemployment loads negatively on labor
        ])

        # Process noise covariance, one variance per state in state order:
        # GDP growth, productivity, capital, labor, TFP.
        self.kf.Q = np.diag([0.01, 0.005, 0.001, 0.002, 0.001])

        # Observation noise covariance (GDP growth, unemployment)
        self.kf.R = np.diag([0.005, 0.01])

        # Initial state and its covariance
        self.kf.x = np.array([[0.02], [1.0], [1.0], [0.95], [1.0]])
        self.kf.P = np.eye(self.n_states) * 0.01

        # Filtered states (plus the raw unemployment observation) per update
        self.history = {
            'gdp_growth': [],
            'productivity': [],
            'capital': [],
            'labor': [],
            'tfp': [],
            'unemployment': [],
        }

    def update_economy(self, gdp_growth_obs, unemployment_obs):
        """Run one predict/update cycle and record the filtered state.

        Args:
            gdp_growth_obs: Observed GDP growth rate for the period.
            unemployment_obs: Observed unemployment rate for the period.

        Returns:
            dict with the filtered value of each state component and
            ``'state_uncertainty'`` (the diagonal of the state covariance).
        """
        self.kf.predict()
        self.kf.update(np.array([gdp_growth_obs, unemployment_obs]))

        state = {
            'gdp_growth': self.kf.x[0, 0],
            'productivity': self.kf.x[1, 0],
            'capital': self.kf.x[2, 0],
            'labor': self.kf.x[3, 0],
            'tfp': self.kf.x[4, 0],
        }
        for name, value in state.items():
            self.history[name].append(value)
        # The unemployment history holds the observation, not a state.
        self.history['unemployment'].append(unemployment_obs)

        # Copy so the returned snapshot cannot alias the filter's covariance.
        state['state_uncertainty'] = np.diagonal(self.kf.P).copy()
        return state

    def forecast_growth(self, horizons=8):
        """Forecast the state ``horizons`` steps ahead without new data.

        The mean is propagated as ``x <- F x`` and the covariance as
        ``P <- F P F^T + Q``, one step per horizon, so the whole path costs
        a single pass instead of restarting the recursion for each horizon.
        The filter's own ``x`` and ``P`` are not modified.

        Args:
            horizons: Number of steps to forecast.

        Returns:
            list of dicts with ``'horizon'``, ``'gdp_growth_forecast'``,
            ``'productivity_forecast'`` and ``'forecast_std'`` (standard
            deviation of the GDP-growth forecast).
        """
        transition = self.kf.F
        process_noise = self.kf.Q
        x_forecast = self.kf.x.copy()
        P_forecast = self.kf.P.copy()

        forecasts = []
        for h in range(1, horizons + 1):
            x_forecast = transition @ x_forecast
            P_forecast = transition @ P_forecast @ transition.T + process_noise
            forecasts.append({
                'horizon': h,
                'gdp_growth_forecast': x_forecast[0, 0],
                'productivity_forecast': x_forecast[1, 0],
                'forecast_std': np.sqrt(P_forecast[0, 0]),
            })
        return forecasts

    def get_business_cycle_component(self):
        """Split the filtered GDP-growth history into trend and cycle.

        Returns:
            ``None`` when fewer than 8 updates have been recorded, otherwise
            a dict with ``'trend'``, ``'cycle'``, ``'cycle_amplitude'`` (the
            standard deviation of the cycle) and ``'current_cycle_position'``
            (the last cycle value).
        """
        if len(self.history['gdp_growth']) < 8:
            return None

        # Use Hodrick-Prescott filter to separate trend and cycle
        gdp_series = np.array(self.history['gdp_growth'])
        trend, cycle = self._hp_filter(gdp_series, lambda_param=1600)
        return {
            'trend': trend,
            'cycle': cycle,
            'cycle_amplitude': np.std(cycle),
            'current_cycle_position': cycle[-1] if len(cycle) > 0 else 0,
        }

    @staticmethod
    def _hp_filter(y, lambda_param=1600):
        """Hodrick-Prescott filter.

        Solves ``(I + lambda * D^T D) trend = y`` where ``D`` is the
        second-difference operator, and returns ``(trend, y - trend)``.
        Series shorter than 4 points are returned unchanged with a zero
        cycle.

        Args:
            y: One-dimensional series.
            lambda_param: Smoothing penalty on the trend's second difference.

        Returns:
            Tuple ``(trend, cycle)``.
        """
        n = len(y)
        if n < 4:
            return y, np.zeros_like(y)

        # Second-difference matrix: row i holds (1, -2, 1) at columns i..i+2.
        D = np.diff(np.eye(n), n=2, axis=0)
        trend = np.linalg.solve(np.eye(n) + lambda_param * D.T @ D, y)
        return trend, y - trend
# Example: Macroeconomic growth modeling
def demonstrate_macro_growth_modeling():
    """Simulate 40 quarters, filter them, then forecast, plot and report.

    The simulated GDP growth is a 2% base rate plus a 16-quarter sine
    cycle and Gaussian noise, with scripted shocks at quarters 20 and 25.
    The random seed is fixed at 42.

    Returns:
        Tuple ``(df, forecast_df, cycle_analysis)``: per-quarter filter
        output, the 8-step forecast table, and the business-cycle
        decomposition dict (or ``None``).
    """
    print("Starting macroeconomic growth modeling demonstration...")

    model = MacroeconomicGrowthModel()

    # Simulation settings
    np.random.seed(42)
    n_quarters = 40            # 10 years of data
    base_growth = 0.02         # 2% base growth rate
    cycle_period = 16          # 4-year cycle
    cycle_size = 0.01
    # Scripted one-off shocks: financial crisis, then stimulus policy
    scripted_shocks = {20: -0.03, 25: 0.02}

    records = []
    for quarter in range(n_quarters):
        cyclical = cycle_size * np.sin(2 * np.pi * quarter / cycle_period)
        trend_part = base_growth + 0.001 * np.random.normal()
        # The random shock is drawn every quarter, even when a scripted
        # shock replaces it, so the random stream stays aligned.
        random_shock = 0.005 * np.random.normal()
        shock = scripted_shocks.get(quarter, random_shock)
        gdp_growth = trend_part + cyclical + shock

        # Unemployment rate (simplified relationship), limited to [2%, 15%]
        unemployment = 0.05 - 0.5 * gdp_growth + 0.01 * np.random.normal()
        unemployment = max(0.02, min(0.15, unemployment))

        estimate = model.update_economy(gdp_growth, unemployment)
        records.append({
            **estimate,
            'quarter': quarter,
            'true_gdp_growth': gdp_growth,
            'unemployment_obs': unemployment,
        })

    df = pd.DataFrame(records)
    forecast_df = pd.DataFrame(model.forecast_growth(horizons=8))
    cycle_analysis = model.get_business_cycle_component()

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    (ax1, ax2), (ax3, ax4) = axes
    quarters = df['quarter']

    # Panel 1: GDP growth tracking with the forecast and its 95% band
    ax1.plot(quarters, df['true_gdp_growth'], label='True GDP Growth', alpha=0.7)
    ax1.plot(quarters, df['gdp_growth'], label='Estimated GDP Growth', alpha=0.7)
    forecast_quarters = np.arange(len(df), len(df) + len(forecast_df))
    forecast_values = forecast_df['gdp_growth_forecast']
    half_width = 1.96 * forecast_df['forecast_std']
    ax1.plot(forecast_quarters, forecast_values, 'r--', label='Forecast', alpha=0.8)
    ax1.fill_between(forecast_quarters,
                     forecast_values - half_width,
                     forecast_values + half_width,
                     alpha=0.3, color='red', label='95% Confidence Interval')
    ax1.set_title('GDP Growth Rate Tracking and Forecast')
    ax1.set_ylabel('Growth Rate')

    # Panel 2: production factors
    for column, label in (('productivity', 'Productivity'),
                          ('capital', 'Capital'),
                          ('tfp', 'Total Factor Productivity')):
        ax2.plot(quarters, df[column], label=label, alpha=0.7)
    ax2.set_title('Production Factors Dynamics')
    ax2.set_ylabel('Index Value')

    # Panel 3: labor market
    ax3.plot(quarters, df['unemployment_obs'], label='Unemployment Rate', alpha=0.7)
    ax3.plot(quarters, df['labor'], label='Labor Index', alpha=0.7)
    ax3.set_title('Labor Market')
    ax3.set_ylabel('Rate/Index')

    for axis in (ax1, ax2, ax3):
        axis.legend()
        axis.grid(True, alpha=0.3)

    # Panel 4: business cycle (only when a decomposition is available)
    if cycle_analysis:
        amplitude = cycle_analysis['cycle_amplitude']
        ax4.plot(quarters, cycle_analysis['trend'], label='Growth Trend', alpha=0.7)
        ax4.plot(quarters, cycle_analysis['cycle'], label='Cycle Component', alpha=0.7)
        ax4.axhline(y=0, color='black', linestyle='--', alpha=0.5)
        ax4.set_title(f'Business Cycle Decomposition (Cycle Amplitude: {amplitude:.3f})')
        ax4.set_ylabel('Growth Rate Deviation')
        ax4.legend()
        ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Text report
    latest = df.iloc[-1]
    print(f"\nMacroeconomic modeling results:")
    print(f"Current GDP growth rate: {latest['gdp_growth']:.3f}")
    print(f"Current productivity: {latest['productivity']:.3f}")
    print(f"Current TFP: {latest['tfp']:.3f}")
    if cycle_analysis:
        print(f"Current cycle position: {cycle_analysis['current_cycle_position']:.3f}")
        print(f"Cycle amplitude: {cycle_analysis['cycle_amplitude']:.3f}")

    print(f"\nTwo-year growth forecast:")
    for number, row in enumerate(forecast_df.head(8).itertuples(index=False), start=1):
        print(f"Quarter {number}: {row.gdp_growth_forecast:.3f} ± {1.96*row.forecast_std:.3f}")

    return df, forecast_df, cycle_analysis


macro_results, macro_forecasts, cycle_info = demonstrate_macro_growth_modeling()
1.2 Inflation Dynamics Modeling
class InflationDynamicsModel:
    """State-space model for inflation dynamics.

    Hidden state (column vector of length 4):
    ``[core_inflation, demand_shock, supply_shock, expectations]``.
    Two values are observed per period: headline inflation (modelled as
    core + demand shock + supply shock) and core inflation.
    """

    def __init__(self):
        """Configure the filter matrices, the initial state and the histories."""
        # State vector: [core_inflation, demand_shock, supply_shock, expectations]
        self.kf = KalmanFilter(dim_x=4, dim_z=2)

        # State transition matrix (extended Phillips curve)
        self.kf.F = np.array([
            [0.7, 0.3, 0.2, 0.3],  # Core inflation
            [0.0, 0.8, 0.0, 0.0],  # Demand shock
            [0.0, 0.0, 0.6, 0.0],  # Supply shock
            [0.1, 0.0, 0.0, 0.9],  # Inflation expectations
        ])

        # Observation matrix: observe headline and core inflation
        self.kf.H = np.array([
            [1.0, 1.0, 1.0, 0.0],  # Headline = core + demand + supply shocks
            [1.0, 0.0, 0.0, 0.0],  # Core inflation direct observation
        ])

        # Process and observation noise covariances
        self.kf.Q = np.diag([0.001, 0.005, 0.01, 0.001])
        self.kf.R = np.diag([0.002, 0.001])

        # Initial state: core inflation and expectations at the 2% target
        self.kf.x = np.array([[0.02], [0.0], [0.0], [0.02]])
        self.kf.P = np.eye(4) * 0.01

        # One entry per update: raw inputs and the filtered decomposition
        self.inflation_history = []
        self.decomposition_history = []

    def update_inflation(self, headline_inflation, core_inflation, unemployment_gap=0):
        """Run one predict/update cycle and return the filtered decomposition.

        Args:
            headline_inflation: Observed headline inflation.
            core_inflation: Observed core inflation.
            unemployment_gap: Labor-market slack; it sets the coefficient
                ``F[1, 0]`` to ``-0.5 * unemployment_gap`` before the
                prediction step.  The assignment persists on ``self.kf.F``
                until the next call.

        Returns:
            dict with the filtered ``'core_inflation'``, ``'demand_shock'``,
            ``'supply_shock'``, ``'expectations'`` and ``'total_predicted'``
            (core + demand shock + supply shock).
        """
        # Unemployment gap effect on the demand shock (the tutorial labels
        # this link "Okun's law").
        self.kf.F[1, 0] = -0.5 * unemployment_gap

        self.kf.predict()
        self.kf.update(np.array([headline_inflation, core_inflation]))

        core, demand, supply, expectations = (self.kf.x[i, 0] for i in range(4))
        decomposition = {
            'core_inflation': core,
            'demand_shock': demand,
            'supply_shock': supply,
            'expectations': expectations,
            'total_predicted': core + demand + supply,
        }

        self.inflation_history.append({
            'headline': headline_inflation,
            'core': core_inflation,
            'unemployment_gap': unemployment_gap,
        })
        self.decomposition_history.append(decomposition)
        return decomposition

    def forecast_inflation(self, horizons=12, policy_shock=None):
        """Project the state forward, optionally under a monetary policy shock.

        The projection uses the current ``self.kf.F`` (including the most
        recent unemployment-gap coefficient) and does not modify the
        filter's state.

        Args:
            horizons: Number of steps to forecast.
            policy_shock: Optional dict with ``'type'``
                (``'monetary_tightening'`` or ``'monetary_easing'``),
                ``'magnitude'`` and ``'duration'``.  For each of the first
                ``duration`` steps the demand shock is shifted by
                ``0.1 * magnitude`` (down for tightening, up for easing)
                before the transition is applied.

        Returns:
            list of dicts with ``'horizon'``, ``'total_inflation'``,
            ``'core_inflation'``, ``'demand_component'``,
            ``'supply_component'`` and ``'expectations'``.
        """
        forecasts = []
        state = self.kf.x.copy()

        for h in range(1, horizons + 1):
            if policy_shock and h <= policy_shock.get('duration', 0):
                # Monetary policy shock affects demand
                if policy_shock['type'] == 'monetary_tightening':
                    state[1, 0] -= policy_shock['magnitude'] * 0.1
                elif policy_shock['type'] == 'monetary_easing':
                    state[1, 0] += policy_shock['magnitude'] * 0.1

            state = self.kf.F @ state
            forecasts.append({
                'horizon': h,
                'total_inflation': state[0, 0] + state[1, 0] + state[2, 0],
                'core_inflation': state[0, 0],
                'demand_component': state[1, 0],
                'supply_component': state[2, 0],
                'expectations': state[3, 0],
            })
        return forecasts

    def calculate_inflation_persistence(self):
        """Estimate persistence of filtered core inflation.

        Persistence is the lag-1 autocorrelation of the filtered core
        inflation series; the half-life is ``-ln 2 / ln|persistence|``.

        Returns:
            ``None`` when fewer than 12 updates are recorded, otherwise a
            dict with ``'persistence'`` and ``'half_life'``.  The half-life
            is ``inf`` whenever ``|persistence|`` is not strictly between 0
            and 1 (no decay, undefined correlation, or zero correlation);
            in particular a persistence of exactly 1 no longer produces
            ``-inf`` through a division by ``log(1) == 0``.
        """
        if len(self.decomposition_history) < 12:
            return None

        core_series = [d['core_inflation'] for d in self.decomposition_history]
        persistence = np.corrcoef(core_series[:-1], core_series[1:])[0, 1]

        magnitude = abs(persistence)
        if 0 < magnitude < 1:
            half_life = -np.log(2) / np.log(magnitude)
        else:
            half_life = np.inf

        return {
            'persistence': persistence,
            'half_life': half_life,
        }
# Example: Inflation dynamics modeling
def demonstrate_inflation_dynamics():
    """Simulate 60 months of inflation data, filter, forecast, plot, report.

    Simulated inflation is a 2% target plus a 12-month seasonal sine, a
    six-month oil supply shock starting at month 24, and Gaussian demand
    noise.  The random seed is fixed at 42.

    The observed core inflation is stored under ``'core_inflation_obs'``.
    Previously it shared the key ``'core_inflation'`` with the filtered
    estimate and was silently overwritten by it, so the observed series
    never reached the DataFrame.  ``'core_inflation'`` keeps holding the
    filtered estimate, as it effectively always did.

    Returns:
        Tuple ``(df, forecast_df, persistence_info)``.
    """
    print("Starting inflation dynamics modeling demonstration...")

    # Create inflation model
    inflation_model = InflationDynamicsModel()

    # Simulate inflation data
    np.random.seed(42)
    n_months = 60  # 5 years of monthly data
    results = []

    # Base inflation parameters
    target_inflation = 0.02
    oil_shock_month = 24  # Oil shock

    for t in range(n_months):
        # Generate base inflation
        base_inflation = target_inflation + 0.003 * np.sin(2 * np.pi * t / 12)

        # Supply shock (oil prices): decaying pulse during the shock window,
        # small Gaussian noise otherwise.
        if oil_shock_month <= t < oil_shock_month + 6:
            supply_shock = 0.02 * np.exp(-(t - oil_shock_month) * 0.3)
        else:
            supply_shock = 0.001 * np.random.normal()

        # Demand shock
        demand_shock = 0.002 * np.random.normal()

        # Unemployment gap
        unemployment_gap = 0.5 * np.sin(2 * np.pi * t / 24) + 0.2 * np.random.normal()

        # Calculate observed inflation
        headline_inflation = base_inflation + supply_shock + demand_shock
        core_inflation = base_inflation + 0.5 * demand_shock

        # Update model
        decomp = inflation_model.update_inflation(
            headline_inflation, core_inflation, unemployment_gap
        )

        results.append({
            'month': t,
            'headline_inflation': headline_inflation,
            # Distinct key: decomp also carries 'core_inflation' (estimate).
            'core_inflation_obs': core_inflation,
            'unemployment_gap': unemployment_gap,
            **decomp,
        })

    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Forecast inflation (including policy shock)
    policy_shock = {
        'type': 'monetary_tightening',
        'magnitude': 0.5,  # 50 basis points
        'duration': 6,
    }
    forecasts = inflation_model.forecast_inflation(horizons=12, policy_shock=policy_shock)
    forecast_df = pd.DataFrame(forecasts)

    # Calculate inflation persistence
    persistence_info = inflation_model.calculate_inflation_persistence()

    # Plot
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Observed series versus the model's reconstruction of headline inflation
    ax1.plot(df['month'], df['headline_inflation'], label='Headline Inflation', alpha=0.8)
    ax1.plot(df['month'], df['core_inflation_obs'], label='Core Inflation', alpha=0.8)
    ax1.plot(df['month'], df['total_predicted'], label='Model Prediction', linestyle='--', alpha=0.8)

    # Add forecast
    forecast_months = np.arange(len(df), len(df) + len(forecast_df))
    ax1.plot(forecast_months, forecast_df['total_inflation'], 'r--',
             label='Future Forecast', alpha=0.8)
    ax1.axhline(y=target_inflation, color='black', linestyle=':', alpha=0.5, label='Inflation Target')
    ax1.set_title('Inflation Dynamics Tracking and Forecast')
    ax1.set_ylabel('Inflation Rate')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Inflation component decomposition (filtered estimates)
    ax2.plot(df['month'], df['core_inflation'], label='Core Component', alpha=0.7)
    ax2.plot(df['month'], df['demand_shock'], label='Demand Shock', alpha=0.7)
    ax2.plot(df['month'], df['supply_shock'], label='Supply Shock', alpha=0.7)
    ax2.axhline(y=0, color='black', linestyle='--', alpha=0.5)
    ax2.set_title('Inflation Component Decomposition')
    ax2.set_ylabel('Contribution')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Inflation expectations (filtered estimates)
    ax3.plot(df['month'], df['expectations'], label='Inflation Expectations', alpha=0.7)
    ax3.plot(df['month'], df['core_inflation'], label='Core Inflation', alpha=0.7)
    ax3.axhline(y=target_inflation, color='black', linestyle=':', alpha=0.5, label='Target')
    ax3.set_title('Inflation Expectations Anchoring')
    ax3.set_ylabel('Inflation Rate')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Forecast decomposition; the band is a fixed +/-0.005 illustration,
    # not a model-derived interval.
    ax4.plot(forecast_df['horizon'], forecast_df['total_inflation'], label='Total Forecast', alpha=0.8)
    ax4.plot(forecast_df['horizon'], forecast_df['core_inflation'], label='Core Forecast', alpha=0.8)
    ax4.fill_between(forecast_df['horizon'],
                     forecast_df['total_inflation'] - 0.005,
                     forecast_df['total_inflation'] + 0.005,
                     alpha=0.3, label='Uncertainty Band')
    ax4.axhline(y=target_inflation, color='black', linestyle=':', alpha=0.5, label='Target')
    ax4.set_title('Inflation Forecast under Policy Shock')
    ax4.set_ylabel('Inflation Rate')
    ax4.set_xlabel('Forecast Horizon (Months)')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Analysis results: observed headline/core, then filtered components
    print(f"\nInflation dynamics analysis results:")
    print(f"Current headline inflation: {df['headline_inflation'].iloc[-1]:.3f}")
    print(f"Current core inflation: {df['core_inflation_obs'].iloc[-1]:.3f}")
    print(f"Current demand shock: {df['demand_shock'].iloc[-1]:.3f}")
    print(f"Current supply shock: {df['supply_shock'].iloc[-1]:.3f}")
    print(f"Current inflation expectations: {df['expectations'].iloc[-1]:.3f}")

    if persistence_info:
        print(f"\nInflation persistence analysis:")
        print(f"Core inflation autocorrelation: {persistence_info['persistence']:.3f}")
        print(f"Half-life: {persistence_info['half_life']:.1f} months")

    # Only the first six months (the policy-shock window) are listed.
    print(f"\n12-month inflation forecast (with policy shock):")
    for i in range(min(6, len(forecast_df))):
        forecast = forecast_df.iloc[i]
        print(f"Month {i+1}: Total inflation {forecast['total_inflation']:.3f}, "
              f"Core inflation {forecast['core_inflation']:.3f}")

    return df, forecast_df, persistence_info


inflation_results, inflation_forecasts, persistence_stats = demonstrate_inflation_dynamics()
2. Monetary Policy Transmission Mechanism Analysis
2.1 Modeling Interest Rate Transmission Channels
class MonetaryTransmissionModel:
    """Monetary policy transmission mechanism model.

    Hidden state (column vector of length 5):
    ``[policy_rate, short_rate, long_rate, credit_spread, bank_lending]``.
    Three values are observed per period: the short-term rate, the
    long-term rate and the credit spread.
    """

    def __init__(self):
        """Configure the filter matrices, the initial state and the histories."""
        # State vector: [policy_rate, short_rate, long_rate, credit_spread, bank_lending]
        self.kf = KalmanFilter(dim_x=5, dim_z=3)

        # State transition matrix (interest rate transmission chain)
        self.kf.F = np.array([
            [0.95, 0.0, 0.0, 0.0, 0.0],    # Policy rate
            [0.8, 0.9, 0.0, 0.0, 0.0],     # Short-term rate
            [0.2, 0.7, 0.95, 0.0, 0.0],    # Long-term rate
            [0.1, 0.1, 0.1, 0.85, 0.0],    # Credit spread
            [0.0, -0.3, -0.5, -0.4, 0.9],  # Bank lending
        ])

        # Observation matrix: short-term rate, long-term rate, credit spread
        self.kf.H = np.array([
            [0.0, 1.0, 0.0, 0.0, 0.0],  # Short-term rate
            [0.0, 0.0, 1.0, 0.0, 0.0],  # Long-term rate
            [0.0, 0.0, 0.0, 1.0, 0.0],  # Credit spread
        ])

        # Process and observation noise covariances
        self.kf.Q = np.diag([0.0001, 0.001, 0.002, 0.005, 0.01])
        self.kf.R = np.diag([0.0005, 0.001, 0.002])

        # Initial state and its covariance
        self.kf.x = np.array([[0.025], [0.025], [0.035], [0.01], [1.0]])
        self.kf.P = np.eye(5) * 0.001

        # One entry per policy change / per filter update
        self.policy_history = []
        self.transmission_history = []

    def implement_policy_change(self, policy_rate_change, announcement_effect=0):
        """Apply a policy move directly to the filter state and log it.

        Args:
            policy_rate_change: Amount added to the policy-rate state.
            announcement_effect: Forward-guidance effect; half of it is
                added to the long-rate state when non-zero.
        """
        self.kf.x[0, 0] += policy_rate_change

        # Forward guidance effect (affects long-term rates)
        if announcement_effect != 0:
            self.kf.x[2, 0] += announcement_effect * 0.5

        self.policy_history.append({
            'policy_change': policy_rate_change,
            'announcement_effect': announcement_effect,
            'new_policy_rate': self.kf.x[0, 0],
        })

    def update_transmission(self, short_rate_obs, long_rate_obs, credit_spread_obs):
        """Run one predict/update cycle and return the filtered state.

        Returns:
            dict with the five filtered state components plus
            ``'short_transmission'`` and ``'long_transmission'``: the gap
            between the short/long rate and the policy rate divided by
            0.01, or 0 when the policy rate is within 1e-6 of zero.
        """
        self.kf.predict()
        self.kf.update(np.array([short_rate_obs, long_rate_obs, credit_spread_obs]))

        policy, short, long_, spread, lending = (self.kf.x[i, 0] for i in range(5))
        has_policy_rate = abs(policy) > 1e-6
        transmission_state = {
            'policy_rate': policy,
            'short_rate': short,
            'long_rate': long_,
            'credit_spread': spread,
            'bank_lending': lending,
            'short_transmission': (short - policy) / 0.01 if has_policy_rate else 0,
            'long_transmission': (long_ - policy) / 0.01 if has_policy_rate else 0,
        }
        self.transmission_history.append(transmission_state)
        return transmission_state

    def calculate_policy_effectiveness(self, target_variable='bank_lending'):
        """Summarize how ``target_variable`` responded to the policy rate.

        Args:
            target_variable: Key of ``transmission_history`` entries to
                analyze.

        Returns:
            ``None`` when fewer than 12 updates are recorded, otherwise a
            dict with ``'elasticity'`` (mean target change over mean policy
            change), ``'transmission_lag'`` and ``'effectiveness_score'``
            (``|elasticity| * exp(-0.1 * |lag|)``).  The elasticity is 0
            when policy changes are essentially constant or average to
            zero; the latter case used to divide by zero.
        """
        if len(self.transmission_history) < 12:
            return None

        policy_rates = [t['policy_rate'] for t in self.transmission_history]
        target_values = [t[target_variable] for t in self.transmission_history]

        policy_changes = np.diff(policy_rates)
        target_changes = np.diff(target_values)

        elasticity = 0
        if len(policy_changes) > 0 and np.std(policy_changes) > 1e-6:
            mean_policy_change = np.mean(policy_changes)
            # Guard the actual denominator, not only the spread of changes.
            if abs(mean_policy_change) > 1e-12:
                elasticity = np.mean(target_changes) / mean_policy_change

        # Estimate the lag once and reuse it for the score.
        lag = self._estimate_transmission_lag(policy_rates, target_values)
        return {
            'elasticity': elasticity,
            'transmission_lag': lag,
            'effectiveness_score': abs(elasticity) * np.exp(-0.1 * abs(lag)),
        }

    def _estimate_transmission_lag(self, policy_series, target_series, max_lag=6):
        """Return the lag (0..max_lag) with the strongest policy/target correlation.

        For each candidate ``lag`` the policy series is correlated with the
        target series ``lag`` periods later; lag 0 is the contemporaneous
        correlation.  (The shift used to be ``lag + 1`` while ``lag`` was
        reported, so results were understated by one period.)

        Returns 0 when the series has fewer than ``max_lag + 2`` points or
        when no correlation is defined.
        """
        n = len(policy_series)
        if n < max_lag + 2:
            return 0

        policy = np.asarray(policy_series, dtype=float)
        target = np.asarray(target_series, dtype=float)

        best_corr = 0
        best_lag = 0
        for lag in range(max_lag + 1):
            corr = np.corrcoef(policy[:n - lag], target[lag:])[0, 1]
            # A NaN correlation (constant window) never compares greater,
            # so it is skipped.
            if abs(corr) > abs(best_corr):
                best_corr = corr
                best_lag = lag
        return best_lag
# Example: Monetary policy transmission analysis
def demonstrate_monetary_transmission():
    """Simulate a 48-month policy cycle, filter it, then plot and report.

    Four scripted policy moves (two hikes, two cuts) are applied to the
    model.  Market observations are generated from the model's own current
    state plus Gaussian noise, with extra credit-spread and long-rate
    effects during months 28-35.  The random seed is fixed at 42.

    Returns:
        Tuple ``(df, effectiveness)``: per-period filter output and the
        policy-effectiveness summary for bank lending (or ``None``).
    """
    print("Starting monetary policy transmission mechanism analysis...")

    model = MonetaryTransmissionModel()

    np.random.seed(42)
    n_periods = 48  # 4 years of monthly data

    # Policy scenarios
    policy_events = [
        {'period': 12, 'change': 0.0025, 'announcement': 0.001},    # 25bp rate hike
        {'period': 15, 'change': 0.0025, 'announcement': 0.0005},   # Another 25bp hike
        {'period': 30, 'change': -0.005, 'announcement': -0.002},   # 50bp cut (crisis)
        {'period': 33, 'change': -0.0025, 'announcement': -0.001},  # Another 25bp cut
    ]
    # Event periods are unique, so a period-keyed lookup matches the scan.
    event_at = {event['period']: event for event in policy_events}

    rows = []
    for period in range(n_periods):
        scheduled = event_at.get(period)
        if scheduled is not None:
            model.implement_policy_change(scheduled['change'], scheduled['announcement'])

        # Market observations: current model state plus noise, drawn in
        # the order short rate, long rate, credit spread.
        state = model.kf.x
        short_obs = state[1, 0] + 0.001 * np.random.normal()
        long_obs = state[2, 0] + 0.002 * np.random.normal()
        spread_obs = state[3, 0] + 0.001 * np.random.normal()

        # Special effects during financial crisis
        if 28 <= period <= 35:
            spread_obs += 0.01 * np.exp(-(period-30)**2/8)  # Credit crunch
            long_obs -= 0.005 * (35-period)/7               # Flight to safety

        filtered = model.update_transmission(short_obs, long_obs, spread_obs)
        rows.append({
            'period': period,
            'policy_implemented': period in event_at,
            **filtered,
        })

    df = pd.DataFrame(rows)
    effectiveness = model.calculate_policy_effectiveness('bank_lending')

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    (ax1, ax2), (ax3, ax4) = axes
    periods = df['period']

    # Panel 1: interest rate chain with policy change markers
    ax1.plot(periods, df['policy_rate'], label='Policy Rate', linewidth=2)
    ax1.plot(periods, df['short_rate'], label='Short-term Rate', alpha=0.8)
    ax1.plot(periods, df['long_rate'], label='Long-term Rate', alpha=0.8)
    for event in policy_events:
        ax1.axvline(x=event['period'], color='red', linestyle='--', alpha=0.5)
    ax1.set_title('Monetary Policy Transmission: Interest Rate Chain')
    ax1.set_ylabel('Interest Rate')

    # Panel 2: credit channel
    ax2.plot(periods, df['credit_spread'], label='Credit Spread', alpha=0.8)
    ax2.plot(periods, df['bank_lending'], label='Bank Lending Index', alpha=0.8)
    ax2.set_title('Monetary Policy Transmission: Credit Channel')
    ax2.set_ylabel('Spread/Index')

    # Panel 3: transmission efficiency
    ax3.plot(periods, df['short_transmission'], label='Short-term Transmission Efficiency', alpha=0.8)
    ax3.plot(periods, df['long_transmission'], label='Long-term Transmission Efficiency', alpha=0.8)
    ax3.axhline(y=1.0, color='black', linestyle=':', alpha=0.5, label='Full Transmission')
    ax3.set_title('Transmission Efficiency Analysis')
    ax3.set_ylabel('Transmission Ratio')

    for axis in (ax1, ax2, ax3):
        axis.legend()
        axis.grid(True, alpha=0.3)

    # Panel 4: variance ratio of lending to policy rate over the trailing
    # 12 periods (window excludes the current period).
    if len(df) >= 12:
        rolling_effectiveness = []
        for end in range(12, len(df)):
            window = df.iloc[end-12:end]
            policy_var = np.var(window['policy_rate'])
            lending_var = np.var(window['bank_lending'])
            rolling_effectiveness.append(lending_var / (policy_var + 1e-6))
        ax4.plot(periods.iloc[12:], rolling_effectiveness, alpha=0.8)
        ax4.set_title('Rolling Policy Effectiveness')
        ax4.set_ylabel('Effectiveness Score')
        ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Text report
    latest = df.iloc[-1]
    print(f"\nMonetary policy transmission analysis results:")
    print(f"Current policy rate: {latest['policy_rate']:.3f}")
    print(f"Current short-term rate: {latest['short_rate']:.3f}")
    print(f"Current long-term rate: {latest['long_rate']:.3f}")
    print(f"Current credit spread: {latest['credit_spread']:.3f}")
    print(f"Current bank lending index: {latest['bank_lending']:.3f}")

    if effectiveness:
        print(f"\nPolicy effectiveness analysis:")
        print(f"Bank lending elasticity: {effectiveness['elasticity']:.3f}")
        print(f"Transmission lag: {effectiveness['transmission_lag']} periods")
        print(f"Overall effectiveness score: {effectiveness['effectiveness_score']:.3f}")

    # Average transmission efficiency over the whole sample
    avg_short_transmission = np.mean(df['short_transmission'])
    avg_long_transmission = np.mean(df['long_transmission'])
    print(f"\nAverage transmission efficiency:")
    print(f"Short-term rate transmission: {avg_short_transmission:.3f}")
    print(f"Long-term rate transmission: {avg_long_transmission:.3f}")

    return df, effectiveness


transmission_results, transmission_effectiveness = demonstrate_monetary_transmission()
3. Fiscal Policy Effect Assessment
3.1 Dynamic Estimation of Fiscal Multipliers
class FiscalMultiplierModel:
    """Fiscal multiplier dynamic estimation model.

    Hidden state (column vector of length 4):
    ``[output_gap, fiscal_impulse, automatic_stabilizer, multiplier]``.
    Two values are observed per period: the output gap and the fiscal
    balance (modelled as impulse + automatic stabilizer).
    """

    # Coefficient of the fiscal impulse in the output-gap equation (F[0, 1])
    # for each recognized policy type.
    _OUTPUT_RESPONSE = {
        'spending': 1.0,    # Government spending directly affects output
        'tax_cut': 0.7,     # Tax cuts work through consumption, milder effect
        'investment': 1.3,  # Infrastructure investment, higher multiplier
    }

    # Scale applied to the multiplier's baseline process noise per cycle
    # phase; any other phase uses the baseline unchanged.
    _CYCLE_NOISE_SCALE = {
        'recession': 1.5,
        'expansion': 0.7,
    }

    def __init__(self):
        """Configure the filter matrices, the initial state and the history."""
        # State vector: [output_gap, fiscal_impulse, automatic_stabilizer, multiplier]
        self.kf = KalmanFilter(dim_x=4, dim_z=2)

        # State transition matrix
        self.kf.F = np.array([
            [0.8, 1.0, 0.5, 0.0],   # Output gap
            [0.0, 0.7, 0.0, 0.0],   # Fiscal impulse
            [-0.3, 0.0, 0.9, 0.0],  # Automatic stabilizer
            [0.0, 0.0, 0.0, 0.95],  # Fiscal multiplier
        ])

        # Observation matrix: observe output gap and fiscal balance
        self.kf.H = np.array([
            [1.0, 0.0, 0.0, 0.0],  # Output gap
            [0.0, 1.0, 1.0, 0.0],  # Fiscal balance = impulse + automatic stabilizer
        ])

        # Process and observation noise covariances
        self.kf.Q = np.diag([0.01, 0.005, 0.002, 0.001])
        self.kf.R = np.diag([0.005, 0.003])

        # Baseline multiplier process noise; cycle adjustments are always
        # computed from this value so they cannot accumulate across calls.
        self._base_multiplier_noise = float(self.kf.Q[3, 3])

        # Initial state (multiplier starts at 1.2) and its covariance
        self.kf.x = np.array([[0.0], [0.0], [0.0], [1.2]])
        self.kf.P = np.eye(4) * 0.01

        # One filtered-state dict per update
        self.fiscal_history = []

    def implement_fiscal_policy(self, fiscal_stimulus, policy_type='spending'):
        """Set the fiscal impulse state and the output response for a policy.

        Args:
            fiscal_stimulus: Value written to the fiscal-impulse state.
            policy_type: ``'spending'``, ``'tax_cut'`` or ``'investment'``
                select the ``F[0, 1]`` coefficient; any other value leaves
                the coefficient set by the previous policy in place.

        Returns:
            dict with ``'fiscal_impulse'``, ``'policy_type'`` and
            ``'expected_multiplier'`` (the current multiplier state).
        """
        response = self._OUTPUT_RESPONSE.get(policy_type)
        if response is not None:
            self.kf.F[0, 1] = response

        self.kf.x[1, 0] = fiscal_stimulus

        return {
            'fiscal_impulse': fiscal_stimulus,
            'policy_type': policy_type,
            'expected_multiplier': self.kf.x[3, 0],
        }

    def update_fiscal_effects(self, output_gap_obs, fiscal_balance_obs,
                              economic_cycle='normal'):
        """Run one predict/update cycle under the given cycle phase.

        The cycle phase only changes the multiplier's process noise
        ``Q[3, 3]``: 1.5x the baseline in a recession, 0.7x in an expansion
        and the baseline otherwise.  The value is set from the baseline on
        every call; it used to be multiplied in place, so the noise grew or
        shrank geometrically with each consecutive period.

        Args:
            output_gap_obs: Observed output gap.
            fiscal_balance_obs: Observed fiscal balance.
            economic_cycle: ``'recession'``, ``'expansion'`` or any other
                label (treated as normal).

        Returns:
            dict with the filtered state components,
            ``'fiscal_contribution'`` (impulse x multiplier) and the
            ``'economic_cycle'`` label.
        """
        scale = self._CYCLE_NOISE_SCALE.get(economic_cycle, 1.0)
        self.kf.Q[3, 3] = self._base_multiplier_noise * scale

        self.kf.predict()
        self.kf.update(np.array([output_gap_obs, fiscal_balance_obs]))

        current_multiplier = self.kf.x[3, 0]
        fiscal_impulse = self.kf.x[1, 0]
        fiscal_state = {
            'output_gap': self.kf.x[0, 0],
            'fiscal_impulse': fiscal_impulse,
            'automatic_stabilizer': self.kf.x[2, 0],
            'fiscal_multiplier': current_multiplier,
            'fiscal_contribution': fiscal_impulse * current_multiplier,
            'economic_cycle': economic_cycle,
        }
        self.fiscal_history.append(fiscal_state)
        return fiscal_state

    def estimate_multiplier_by_state(self):
        """Summarize the filtered multiplier separately for each cycle phase.

        Returns:
            ``None`` when fewer than 12 updates are recorded, otherwise a
            dict keyed by ``'recession'``, ``'normal'`` and ``'expansion'``
            (phases with no observations are omitted), each holding
            ``'avg_multiplier'``, ``'std_multiplier'`` and
            ``'observations'``.
        """
        if len(self.fiscal_history) < 12:
            return None

        results = {}
        for state in ('recession', 'normal', 'expansion'):
            multipliers = [h['fiscal_multiplier'] for h in self.fiscal_history
                           if h['economic_cycle'] == state]
            if multipliers:
                results[state] = {
                    'avg_multiplier': np.mean(multipliers),
                    'std_multiplier': np.std(multipliers),
                    'observations': len(multipliers),
                }
        return results

    def forecast_fiscal_impact(self, planned_policies, horizons=8):
        """Project the state forward under a sequence of planned policies.

        The filter's own state is not modified.

        Args:
            planned_policies: Sequence of dicts, one per horizon starting
                at horizon 1.  ``'fiscal_impulse'`` (default 0) overwrites
                the impulse state before the step; ``'type' ==
                'investment'`` scales the multiplier state by 1.1.
            horizons: Number of steps to forecast.

        Returns:
            list of dicts with ``'horizon'``, ``'output_gap_forecast'``,
            ``'fiscal_multiplier_forecast'`` and ``'cumulative_impact'``:
            the running sum over horizons of forecast impulse x forecast
            multiplier.  (This used to be ``h`` times the current-step
            product, which is not a cumulative sum.)
        """
        forecasts = []
        state = self.kf.x.copy()
        cumulative_impact = 0.0

        for h in range(1, horizons + 1):
            if h <= len(planned_policies):
                policy = planned_policies[h - 1]
                state[1, 0] = policy.get('fiscal_impulse', 0)
                if policy.get('type') == 'investment':
                    state[3, 0] *= 1.1  # Higher investment multiplier

            state = self.kf.F @ state
            cumulative_impact += state[1, 0] * state[3, 0]

            forecasts.append({
                'horizon': h,
                'output_gap_forecast': state[0, 0],
                'fiscal_multiplier_forecast': state[3, 0],
                'cumulative_impact': cumulative_impact,
            })
        return forecasts
# Example: Fiscal policy effect assessment
def demonstrate_fiscal_policy_analysis():
    """Run the fiscal policy demo: simulate, estimate, forecast, plot, report.

    Simulates 32 quarters with a fixed random seed, feeding synthetic output
    gap and fiscal balance observations into a ``FiscalMultiplierModel``,
    then estimates multipliers per economic state, forecasts a four-step
    policy plan, draws a 2x2 figure and prints a short text report.

    Returns:
        A tuple ``(df, forecast_df, multiplier_by_state)``: the per-quarter
        results as a DataFrame, the forecasts as a DataFrame, and whatever
        ``estimate_multiplier_by_state`` returned.
    """
    print("Starting fiscal policy effect assessment...")

    model = FiscalMultiplierModel()
    np.random.seed(42)  # fixed seed keeps the simulated path reproducible
    total_quarters = 32  # 8 years of quarterly data

    # One phase label per simulated quarter.
    phase_schedule = (
        ['expansion'] * 12 + ['recession'] * 6 + ['normal'] * 8 + ['expansion'] * 6
    )
    # Discretionary policy actions, keyed by the quarter they take effect.
    policy_events = [
        {'quarter': 8, 'stimulus': 0.02, 'type': 'tax_cut'},          # Tax cut
        {'quarter': 12, 'stimulus': 0.05, 'type': 'spending'},        # Crisis spending
        {'quarter': 14, 'stimulus': 0.03, 'type': 'investment'},      # Infrastructure investment
        {'quarter': 24, 'stimulus': -0.02, 'type': 'consolidation'},  # Fiscal consolidation
    ]

    rows = []
    for quarter in range(total_quarters):
        phase = phase_schedule[quarter] if quarter < len(phase_schedule) else 'normal'

        # Apply any policy event scheduled for this quarter.
        stimulus, policy_kind = 0, 'none'
        for event in policy_events:
            if event['quarter'] != quarter:
                continue
            stimulus, policy_kind = event['stimulus'], event['type']
            model.implement_fiscal_policy(stimulus, policy_kind)

        # Synthetic output gap: exactly one random draw per quarter, scaled
        # and shifted according to the phase.
        gap_shock = np.random.normal()
        if phase == 'recession':
            output_gap_obs = -0.03 + 0.01 * gap_shock
        elif phase == 'expansion':
            output_gap_obs = 0.02 + 0.005 * gap_shock
        else:
            output_gap_obs = 0.005 * gap_shock

        # Fiscal balance = discretionary stimulus + cyclical part + noise.
        stabilizer_part = -0.3 * output_gap_obs  # Automatic stabilizer
        balance_obs = stimulus + stabilizer_part + 0.005 * np.random.normal()

        estimated = model.update_fiscal_effects(output_gap_obs, balance_obs, phase)

        row = {
            'quarter': quarter,
            'cycle_phase': phase,
            'policy_implemented': policy_kind != 'none',
            'policy_type': policy_kind,
        }
        row.update(estimated)
        rows.append(row)

    df = pd.DataFrame(rows)

    multiplier_by_state = model.estimate_multiplier_by_state()

    # Future policy plan: two investment quarters followed by two spending quarters.
    planned_policies = [
        {'fiscal_impulse': 0.01, 'type': 'investment'},
        {'fiscal_impulse': 0.015, 'type': 'investment'},
        {'fiscal_impulse': 0.01, 'type': 'spending'},
        {'fiscal_impulse': 0.005, 'type': 'spending'},
    ]
    forecast_df = pd.DataFrame(model.forecast_fiscal_impact(planned_policies))

    def _decorate(ax, title, ylabel, xlabel=None):
        # Shared title/axis-label/legend/grid styling for every panel.
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        if xlabel is not None:
            ax.set_xlabel(xlabel)
        ax.legend()
        ax.grid(True, alpha=0.3)

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
    quarters = df['quarter']

    # Panel 1: output gap and fiscal impulse, shaded by cycle phase.
    ax1.plot(quarters, df['output_gap'], label='Output Gap', alpha=0.8)
    ax1.plot(quarters, df['fiscal_impulse'], label='Fiscal Impulse', alpha=0.8)
    phase_colors = {'recession': 'red', 'expansion': 'green', 'normal': 'blue'}
    for i, phase in enumerate(df['cycle_phase']):
        ax1.axvspan(i - 0.4, i + 0.4, alpha=0.1, color=phase_colors[phase])
    _decorate(ax1, 'Output Gap and Fiscal Policy', 'Ratio')

    # Panel 2: estimated multiplier with per-state averages as dashed lines.
    ax2.plot(quarters, df['fiscal_multiplier'], label='Fiscal Multiplier', alpha=0.8)
    if multiplier_by_state:
        for state, info in multiplier_by_state.items():
            ax2.axhline(
                y=info['avg_multiplier'],
                linestyle='--',
                alpha=0.6,
                label=f'{state} avg: {info["avg_multiplier"]:.2f}',
            )
    _decorate(ax2, 'Dynamic Fiscal Multiplier', 'Multiplier')

    # Panel 3: fiscal contribution against the automatic stabilizer.
    ax3.plot(quarters, df['fiscal_contribution'], label='Fiscal Contribution', alpha=0.8)
    ax3.plot(quarters, df['automatic_stabilizer'], label='Automatic Stabilizer', alpha=0.8)
    ax3.axhline(y=0, color='black', linestyle='--', alpha=0.5)
    _decorate(ax3, 'Fiscal Policy Contribution to Growth', 'Contribution')

    # Panel 4: forecasts, plotted on quarters following the simulated sample.
    n_hist = len(df)
    forecast_quarters = np.arange(n_hist, n_hist + len(forecast_df))
    ax4.plot(forecast_quarters, forecast_df['output_gap_forecast'],
             label='Output Gap Forecast', alpha=0.8)
    ax4.plot(forecast_quarters, forecast_df['cumulative_impact'],
             label='Cumulative Fiscal Impact', alpha=0.8)
    _decorate(ax4, 'Fiscal Policy Effect Forecast', 'Impact', xlabel='Quarter')

    plt.tight_layout()
    plt.show()

    # Text report: latest estimates first.
    latest = df.iloc[-1]
    print(f"\nFiscal policy effect analysis:")
    print(f"Current output gap: {latest['output_gap']:.3f}")
    print(f"Current fiscal multiplier: {latest['fiscal_multiplier']:.2f}")
    print(f"Current fiscal contribution: {latest['fiscal_contribution']:.3f}")

    if multiplier_by_state:
        print(f"\nFiscal multipliers by economic state:")
        for state, info in multiplier_by_state.items():
            print(f"{state}: {info['avg_multiplier']:.2f} ± {info['std_multiplier']:.2f} "
                  f"(observations: {info['observations']})")

    print(f"\nExpected effects of planned policies:")
    total_impact = forecast_df['cumulative_impact'].iloc[-1]
    avg_multiplier = forecast_df['fiscal_multiplier_forecast'].mean()
    print(f"Cumulative impact: {total_impact:.3f}")
    print(f"Average expected multiplier: {avg_multiplier:.2f}")

    return df, forecast_df, multiplier_by_state
# Run the demonstration and keep its three return values: the per-quarter
# results DataFrame, the forecast DataFrame, and the per-state multiplier summary.
fiscal_results, fiscal_forecasts, multiplier_analysis = demonstrate_fiscal_policy_analysis()
Chapter Summary
This chapter explored in depth the application of Kalman filters in macroeconomic modeling and policy analysis:
- Macroeconomic Modeling:
  - Dynamic state-space models for economic growth
  - Inflation dynamics and component decomposition
  - Real-time identification of business cycles
- Monetary Policy Analysis:
  - Modeling interest rate transmission mechanisms
  - Assessment of policy effectiveness
  - Analysis of transmission lags
- Fiscal Policy Assessment:
  - Dynamic fiscal multiplier estimation
  - Analysis of policy type differences
  - Economic cycle dependence
- Policy Tool Applications:
  - Real-time policy effect monitoring
  - Policy shock identification
  - Forward-looking policy simulation
These applications demonstrate the powerful capabilities of Kalman filters in macroeconomic analysis, particularly in handling multivariate dynamic systems, quantifying policy effects, and producing economic forecasts.
Next Chapter Preview: Chapter 16 will cover “Modern Developments and Practical Applications in Financial Econometrics,” which is the final chapter of the course. It will comprehensively apply all the techniques learned previously and explore the latest developments and practical application cases of Kalman filters.