Chapter 9: System Modeling and Parameter Tuning
Learning Objectives
- Learn to establish appropriate state-space models
- Master methods for selecting and tuning noise covariance matrices
- Understand model validation and performance evaluation metrics
State-Space Model Design
Model Selection Principles
Choosing an appropriate state-space model is crucial for successful Kalman filtering:
Model Design Considerations
- Physical Meaning: State variables should have clear economic or financial interpretations
- Observability: The system must be observable, i.e., every state variable can be inferred from the observation sequence (see the check below)
- Controllability: The system should be controllable if there are control inputs (a sketch of this check follows the observability example below)
- Simplicity: Avoid over-parameterization; follow Occam's razor
- Computational Efficiency: Consider computational constraints for real-time applications
Observability Analysis
import numpy as np
from numpy.linalg import matrix_rank
import matplotlib.pyplot as plt
def check_observability(F, H):
"""
Check system observability
Parameters:
F: State transition matrix
H: Observation matrix
Returns:
is_observable: Boolean indicating if system is observable
obs_matrix: Observability matrix
"""
n = F.shape[0] # State dimension
# Build observability matrix
obs_matrix = H.copy()
H_F_power = H.copy()
for i in range(1, n):
H_F_power = H_F_power @ F
obs_matrix = np.vstack([obs_matrix, H_F_power])
# Check matrix rank
rank = matrix_rank(obs_matrix)
is_observable = (rank == n)
return is_observable, obs_matrix
def observability_example():
"""
Observability analysis example
"""
# Example 1: Observable system
F1 = np.array([[1, 1], [0, 1]]) # Constant velocity model
H1 = np.array([[1, 0]]) # Only observe position
obs1, obs_matrix1 = check_observability(F1, H1)
print(f"System 1 observability: {obs1}")
print(f"Observability matrix rank: {matrix_rank(obs_matrix1)}")
# Example 2: Unobservable system
F2 = np.array([[1, 1], [0, 1]])
H2 = np.array([[0, 1]]) # Only observe velocity
obs2, obs_matrix2 = check_observability(F2, H2)
print(f"System 2 observability: {obs2}")
print(f"Observability matrix rank: {matrix_rank(obs_matrix2)}")
# observability_example()
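The controllability item in the design checklist can be verified in the same way. Below is a minimal sketch (not part of the original examples) that mirrors check_observability; it assumes a control-input matrix B, which this chapter's models do not otherwise use, and relies on the matrix_rank import above.
def check_controllability(F, B):
    """
    Check system controllability (sketch mirroring check_observability)
    Parameters:
        F: State transition matrix (n x n)
        B: Control input matrix (n x m), assumed for illustration
    Returns:
        is_controllable: Boolean indicating if the system is controllable
        ctrl_matrix: Controllability matrix [B, FB, F^2 B, ..., F^(n-1) B]
    """
    n = F.shape[0]  # State dimension
    ctrl_matrix = B.copy()
    F_power_B = B.copy()
    for _ in range(1, n):
        F_power_B = F @ F_power_B
        ctrl_matrix = np.hstack([ctrl_matrix, F_power_B])
    rank = matrix_rank(ctrl_matrix)
    return rank == n, ctrl_matrix
# Example: constant-velocity model with an acceleration input acting on velocity
# F = [[1, 1], [0, 1]], B = [[0], [1]]  ->  controllable (rank 2)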
State Variable Selection
In financial modeling, state variable selection is crucial:
class ModelDesignFramework:
"""
Model design framework
"""
def __init__(self):
self.model_candidates = {}
self.evaluation_metrics = {}
def add_model_candidate(self, name, state_vars, obs_vars, F, H, Q, R):
"""
Add model candidate
Parameters:
name: Model name
state_vars: State variable description list
obs_vars: Observation variable description list
F, H, Q, R: Model matrices
"""
self.model_candidates[name] = {
'state_vars': state_vars,
'obs_vars': obs_vars,
'F': F,
'H': H,
'Q': Q,
'R': R,
'dim_x': F.shape[0],
'dim_z': H.shape[0]
}
def evaluate_model_complexity(self, name):
"""
Evaluate model complexity
"""
model = self.model_candidates[name]
# Number of parameters
n_params = (model['dim_x']**2 + # F matrix
model['dim_x'] * model['dim_z'] + # H matrix
model['dim_x'] * (model['dim_x'] + 1) // 2 + # Q matrix (symmetric)
model['dim_z'] * (model['dim_z'] + 1) // 2) # R matrix (symmetric)
# Computational complexity (per step)
comp_predict = model['dim_x']**3 # Covariance prediction
comp_update = (model['dim_z']**3 +
model['dim_x'] * model['dim_z']**2) # Kalman gain and update
complexity = {
'n_parameters': n_params,
'comp_predict': comp_predict,
'comp_update': comp_update,
'total_per_step': comp_predict + comp_update
}
return complexity
# Example: Stock price models with different complexities
def stock_price_model_comparison():
"""
Compare stock price models with different complexities
"""
framework = ModelDesignFramework()
# Model 1: Simple random walk
F1 = np.array([[1]])
H1 = np.array([[1]])
Q1 = np.array([[0.01]])
R1 = np.array([[0.001]])
framework.add_model_candidate(
'Simple_RW',
['log_price'],
['log_price_obs'],
F1, H1, Q1, R1
)
# Model 2: Random walk with drift
F2 = np.array([[1, 1], [0, 1]])
H2 = np.array([[1, 0]])
Q2 = np.array([[0.01, 0], [0, 0.0001]])
R2 = np.array([[0.001]])
framework.add_model_candidate(
'RW_with_Drift',
['log_price', 'drift'],
['log_price_obs'],
F2, H2, Q2, R2
)
# Model 3: Stochastic volatility model
F3 = np.array([[1, 0], [0, 0.99]])
H3 = np.array([[1, 0]])
Q3 = np.array([[0, 0], [0, 0.01]])
R3 = np.array([[0.001]])
framework.add_model_candidate(
'Stochastic_Vol',
['log_price', 'log_variance'],
['log_price_obs'],
F3, H3, Q3, R3
)
# Evaluate complexity
for name in framework.model_candidates:
complexity = framework.evaluate_model_complexity(name)
print(f"\nModel {name}:")
print(f" Number of parameters: {complexity['n_parameters']}")
print(f" Computational complexity per step: {complexity['total_per_step']}")
# stock_price_model_comparison()
Noise Covariance Tuning
Covariance Matrix Selection Principles
The choice of the noise covariance matrices directly determines filtering performance:
Warning: Covariance Setting Guidelines
- Process noise Q: reflects model uncertainty
  - Q too small: the filter becomes over-confident in its model and adapts poorly to real changes
  - Q too large: the estimate chases the observations and becomes unstable and noisy
- Observation noise R: reflects measurement uncertainty
  - R too small: observations are over-trusted, so the filter is easily misled by measurement noise
  - R too large: observations are largely ignored, so the filter responds sluggishly; the sketch below illustrates both failure modes
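A minimal sketch (not from the chapter's example code, assuming a scalar random-walk model with known noise levels) that makes these guidelines concrete: the same filter is run with a well-chosen Q, a Q that is far too small, and a Q that is far too large, and the tracking error is compared.
import numpy as np

def run_scalar_kf(observations, Q, R, F=1.0, H=1.0):
    """Scalar Kalman filter; returns the sequence of posterior state estimates."""
    x, P = 0.0, 1.0
    estimates = []
    for z in observations:
        # Predict
        x, P = F * x, F * P * F + Q
        # Update
        S = H * P * H + R
        K = P * H / S
        x = x + K * (z - H * x)
        P = (1 - K * H) * P
        estimates.append(x)
    return np.array(estimates)

# Simulate a random walk with true process noise 0.01 and observation noise 0.05
rng = np.random.default_rng(0)
true_Q, true_R, T = 0.01, 0.05, 300
states = np.cumsum(rng.normal(0, np.sqrt(true_Q), T))
obs = states + rng.normal(0, np.sqrt(true_R), T)

for label, Q in [('Q well-tuned', 0.01), ('Q too small', 1e-6), ('Q too large', 1.0)]:
    est = run_scalar_kf(obs, Q, true_R)
    rmse = np.sqrt(np.mean((est - states) ** 2))
    print(f"{label:>14}: RMSE = {rmse:.4f}")
With Q too small the estimate lags behind genuine state changes; with Q too large it simply tracks the noisy observations; both extremes raise the RMSE relative to the well-tuned filter.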
Parameter Tuning Methods
class ParameterOptimizer:
"""
Kalman filter parameter optimizer
"""
def __init__(self, kf_class, data):
self.kf_class = kf_class
self.data = data
self.optimization_history = []
def objective_function(self, params, method='likelihood'):
"""
Objective function: Maximum likelihood or minimum prediction error
Parameters:
params: Parameter vector [Q_diag, R_diag]
method: 'likelihood' or 'prediction_error'
"""
try:
            # Create the filter first; dim_x and dim_z are set in the filter's
            # __init__, so they must be read from an instance, not the class
            kf = self.kf_class()
            n_q = kf.dim_x
            n_r = kf.dim_z
            # Parse parameters
            Q_diag = params[:n_q]
            R_diag = params[n_q:n_q + n_r]
            # Ensure positive definiteness
            Q = np.diag(np.maximum(Q_diag, 1e-8))
            R = np.diag(np.maximum(R_diag, 1e-8))
            kf.Q = Q
            kf.R = R
if method == 'likelihood':
return self._negative_log_likelihood(kf)
else:
return self._prediction_error(kf)
except Exception as e:
return 1e10 # Return large value to indicate failure
def _negative_log_likelihood(self, kf):
"""
Calculate negative log-likelihood
"""
log_likelihood = 0
for obs in self.data:
kf.predict()
kf.update(obs)
# Accumulate log-likelihood
if hasattr(kf, 'log_likelihood'):
log_likelihood += kf.log_likelihood
return -log_likelihood
def _prediction_error(self, kf):
"""
Calculate prediction error
"""
prediction_errors = []
        for i, obs in enumerate(self.data):
            # Predict once per step
            kf.predict()
            if i > 0:
                # One-step-ahead observation prediction
                predicted_obs = kf.H @ kf.x
                # Calculate prediction error
                error = np.linalg.norm(obs - predicted_obs.flatten())
                prediction_errors.append(error)
            # Update with the new observation
            kf.update(obs)
return np.mean(prediction_errors)
def optimize(self, initial_params=None, method='likelihood'):
"""
Optimize parameters
"""
from scipy.optimize import minimize
        if initial_params is None:
            # Default initial parameters (dimensions read from a filter instance,
            # since dim_x / dim_z are set in __init__)
            template = self.kf_class()
            initial_params = np.concatenate([
                0.01 * np.ones(template.dim_x),  # Q diagonal elements
                0.01 * np.ones(template.dim_z)   # R diagonal elements
            ])
# Parameter bounds
bounds = [(1e-8, 1.0) for _ in initial_params]
# Optimize
result = minimize(
self.objective_function,
initial_params,
args=(method,),
bounds=bounds,
method='L-BFGS-B'
)
return result
def parameter_optimization_example():
"""
Parameter optimization example
"""
# Generate simulated data
np.random.seed(42)
T = 100
true_Q = 0.01
true_R = 0.005
# Simulate random walk process
true_states = [0]
observations = []
for t in range(1, T):
# True state evolution
true_states.append(true_states[-1] + np.random.normal(0, np.sqrt(true_Q)))
# Observation
obs = true_states[-1] + np.random.normal(0, np.sqrt(true_R))
observations.append(obs)
# Define simple Kalman filter class
class SimpleKF:
def __init__(self):
self.dim_x = 1
self.dim_z = 1
self.x = np.array([[0]])
self.P = np.array([[1]])
self.F = np.array([[1]])
self.H = np.array([[1]])
self.Q = np.array([[0.01]])
self.R = np.array([[0.01]])
self.log_likelihood = 0
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / S
self.x = self.x + K * y
self.P = (np.eye(1) - K @ self.H) @ self.P
            # Calculate log-likelihood of this innovation (stored as a scalar)
            self.log_likelihood = float(-0.5 * (np.log(2 * np.pi * S) + y**2 / S))
# Parameter optimization
optimizer = ParameterOptimizer(SimpleKF, observations)
result = optimizer.optimize(method='likelihood')
optimal_Q = result.x[0]
optimal_R = result.x[1]
print(f"True parameters: Q={true_Q}, R={true_R}")
print(f"Optimized parameters: Q={optimal_Q:.6f}, R={optimal_R:.6f}")
print(f"Optimization successful: {result.success}")
# parameter_optimization_example()
Adaptive Noise Estimation
class AdaptiveNoiseEstimator:
"""
Adaptive noise covariance estimation
"""
def __init__(self, window_size=50):
self.window_size = window_size
self.innovation_history = []
self.residual_history = []
def update_noise_estimates(self, kf):
"""
Update noise estimates based on innovation sequence
"""
# Save innovation
if hasattr(kf, 'y') and kf.y is not None:
self.innovation_history.append(kf.y.copy())
# Maintain window size
if len(self.innovation_history) > self.window_size:
self.innovation_history.pop(0)
# If enough historical data, update noise estimate
        if len(self.innovation_history) >= self.window_size:
            # Estimate observation noise from the innovation sample covariance
            # (flatten each stored innovation column vector to a 1-D row)
            innovations = np.array([y.flatten() for y in self.innovation_history])
            # Theoretically, the innovation covariance equals H*P*H' + R,
            # so R can be estimated as the empirical covariance minus H*P*H'
            empirical_cov = np.atleast_2d(np.cov(innovations, rowvar=False))
            theoretical_cov = kf.H @ kf.P @ kf.H.T
            # Update R (simple covariance-matching estimate)
            R_new = empirical_cov - theoretical_cov
            R_new = np.maximum(R_new, 0.001 * np.eye(kf.dim_z))  # Keep R positive definite
            # Smooth (exponentially weighted) update
            alpha = 0.1
            kf.R = (1 - alpha) * kf.R + alpha * R_new
def adaptive_Q_estimation(self, kf, prediction_errors):
"""
Adaptive process noise estimation
"""
if len(prediction_errors) >= self.window_size:
recent_errors = prediction_errors[-self.window_size:]
            # Heuristic: if the mean recent error is large relative to its spread,
            # inflate Q; otherwise let it decay slowly
            if np.mean(recent_errors) > np.std(recent_errors):
                kf.Q *= 1.1
            else:
                kf.Q *= 0.99
# Ensure minimum value of Q
kf.Q = np.maximum(kf.Q, 1e-6 * np.eye(kf.dim_x))
class AdaptiveKalmanFilter:
"""
Kalman filter with adaptive noise estimation
"""
def __init__(self, dim_x, dim_z):
self.dim_x = dim_x
self.dim_z = dim_z
# Initialize Kalman filter parameters
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
self.F = np.eye(dim_x)
self.H = np.eye(dim_z, dim_x)
self.Q = 0.01 * np.eye(dim_x)
self.R = 0.01 * np.eye(dim_z)
# Adaptive estimator
self.noise_estimator = AdaptiveNoiseEstimator()
self.prediction_errors = []
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
"""Update step"""
z = z.reshape(-1, 1)
# Standard Kalman update
self.y = z - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + K @ self.y
self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P
# Record prediction error
pred_error = np.linalg.norm(self.y)
self.prediction_errors.append(pred_error)
# Adaptive noise estimation
self.noise_estimator.update_noise_estimates(self)
self.noise_estimator.adaptive_Q_estimation(self, self.prediction_errors)
def adaptive_filter_example():
"""
Adaptive filter example
"""
# Generate data with time-varying noise
np.random.seed(42)
T = 200
true_states = [0]
observations = []
for t in range(1, T):
# Time-varying process noise
if t < 50:
Q_t = 0.001
elif t < 100:
Q_t = 0.01 # Noise suddenly increases
elif t < 150:
Q_t = 0.1 # Noise increases further
else:
Q_t = 0.001 # Noise returns to small value
# State evolution
true_states.append(true_states[-1] + np.random.normal(0, np.sqrt(Q_t)))
# Observation (fixed noise)
obs = true_states[-1] + np.random.normal(0, 0.1)
observations.append(obs)
# Standard Kalman filter
standard_kf = AdaptiveKalmanFilter(1, 1)
standard_kf.Q = np.array([[0.01]]) # Fixed Q
standard_estimates = []
# Adaptive Kalman filter
adaptive_kf = AdaptiveKalmanFilter(1, 1)
adaptive_estimates = []
Q_history = []
    for obs in observations:
        # Standard filter: Q and R are reset after each update so they stay fixed,
        # because AdaptiveKalmanFilter.update() would otherwise adapt them too
        standard_kf.predict()
        standard_kf.update(np.array([obs]))
        standard_kf.Q = np.array([[0.01]])
        standard_kf.R = np.array([[0.01]])
        standard_estimates.append(standard_kf.x[0, 0])
# Adaptive filter
adaptive_kf.predict()
adaptive_kf.update(np.array([obs]))
adaptive_estimates.append(adaptive_kf.x[0, 0])
Q_history.append(adaptive_kf.Q[0, 0])
# Plot results
plt.figure(figsize=(15, 12))
plt.subplot(3, 1, 1)
plt.plot(true_states[1:], 'g-', label='True State', linewidth=2)
plt.plot(observations, 'r.', label='Observations', alpha=0.5)
plt.plot(standard_estimates, 'b--', label='Standard KF', linewidth=2)
plt.plot(adaptive_estimates, 'orange', label='Adaptive KF', linewidth=2)
plt.legend()
plt.title('Adaptive vs Standard Kalman Filter')
plt.ylabel('State Value')
plt.subplot(3, 1, 2)
standard_errors = np.abs(np.array(true_states[1:]) - np.array(standard_estimates))
adaptive_errors = np.abs(np.array(true_states[1:]) - np.array(adaptive_estimates))
plt.plot(standard_errors, 'b-', label='Standard KF Error')
plt.plot(adaptive_errors, 'orange', label='Adaptive KF Error')
plt.legend()
plt.ylabel('Absolute Error')
plt.subplot(3, 1, 3)
plt.plot(Q_history, 'purple', label='Adaptive Q Estimate')
plt.axhline(y=0.01, color='b', linestyle='--', label='Standard Q Value')
plt.legend()
plt.xlabel('Time Step')
plt.ylabel('Q Value')
plt.tight_layout()
plt.show()
# Calculate performance metrics
standard_rmse = np.sqrt(np.mean(standard_errors**2))
adaptive_rmse = np.sqrt(np.mean(adaptive_errors**2))
print(f"Standard KF RMSE: {standard_rmse:.6f}")
print(f"Adaptive KF RMSE: {adaptive_rmse:.6f}")
print(f"Improvement: {(standard_rmse - adaptive_rmse) / standard_rmse * 100:.2f}%")
# adaptive_filter_example()
Model Validation Techniques
Residual Analysis
class ModelValidation:
"""
Model validation tools
"""
def __init__(self):
self.validation_results = {}
def innovation_analysis(self, innovations, S_matrices):
"""
Innovation sequence analysis
"""
        # Flatten each innovation to a 1-D vector of length dim_z
        innovations = np.array([np.asarray(i).flatten() for i in innovations])
        # Standardized innovations (divide by the predicted innovation std dev)
        standardized_innovations = []
        for innov, S in zip(innovations, S_matrices):
            std_innov = innov / np.sqrt(np.diag(S))
            standardized_innovations.append(std_innov)
        standardized_innovations = np.array(standardized_innovations)
# Statistical tests
results = {
'mean': np.mean(standardized_innovations, axis=0),
'std': np.std(standardized_innovations, axis=0),
'skewness': self._compute_skewness(standardized_innovations),
'kurtosis': self._compute_kurtosis(standardized_innovations),
'ljung_box_p': self._ljung_box_test(standardized_innovations),
'normality_p': self._normality_test(standardized_innovations)
}
return results
def _compute_skewness(self, data):
"""Calculate skewness"""
from scipy.stats import skew
return skew(data, axis=0)
def _compute_kurtosis(self, data):
"""Calculate kurtosis"""
from scipy.stats import kurtosis
return kurtosis(data, axis=0)
    def _ljung_box_test(self, data):
        """Ljung-Box serial correlation test (direct implementation;
        statsmodels.stats.diagnostic.acorr_ljungbox offers a more complete one)"""
        from scipy.stats import chi2
        p_values = []
        n = data.shape[0]
        n_lags = min(10, max(1, n // 5))
        for col in range(data.shape[1]):
            x = data[:, col] - np.mean(data[:, col])
            denom = np.sum(x ** 2)
            # Sample autocorrelations up to n_lags
            acf = np.array([np.sum(x[:-k] * x[k:]) / denom for k in range(1, n_lags + 1)])
            # Ljung-Box statistic: Q = n(n+2) * sum_k acf_k^2 / (n - k)
            q_stat = n * (n + 2) * np.sum(acf ** 2 / (n - np.arange(1, n_lags + 1)))
            p_values.append(1 - chi2.cdf(q_stat, df=n_lags))
        return p_values
def _normality_test(self, data):
"""Normality test"""
from scipy.stats import jarque_bera
p_values = []
for col in range(data.shape[1]):
_, p_val = jarque_bera(data[:, col])
p_values.append(p_val)
return p_values
def cross_validation(self, kf_class, data, n_folds=5):
"""
Cross-validation
"""
n_samples = len(data)
fold_size = n_samples // n_folds
cv_scores = []
for fold in range(n_folds):
# Split data
start_idx = fold * fold_size
end_idx = (fold + 1) * fold_size if fold < n_folds - 1 else n_samples
test_data = data[start_idx:end_idx]
train_data = np.concatenate([data[:start_idx], data[end_idx:]])
# Train on training data
kf = kf_class()
# Parameter estimation process should be included here
# Evaluate on test data
predictions = []
actuals = []
for obs in test_data:
kf.predict()
pred = kf.H @ kf.x
predictions.append(pred.flatten())
kf.update(obs)
actuals.append(obs)
            # Calculate score (flatten so actuals and predictions align element-wise)
            mse = np.mean((np.array(actuals).flatten() - np.array(predictions).flatten()) ** 2)
cv_scores.append(mse)
return {
'cv_scores': cv_scores,
'mean_cv_score': np.mean(cv_scores),
'std_cv_score': np.std(cv_scores)
}
def model_validation_example():
"""
Model validation example
"""
# Generate test data
np.random.seed(42)
T = 200
# Simple random walk
true_states = [0]
observations = []
for t in range(1, T):
true_states.append(true_states[-1] + np.random.normal(0, 0.1))
obs = true_states[-1] + np.random.normal(0, 0.05)
observations.append(obs)
# Run filter and collect innovations
class TestKF:
def __init__(self):
self.x = np.array([[0]])
self.P = np.array([[1]])
self.F = np.array([[1]])
self.H = np.array([[1]])
self.Q = np.array([[0.01]])
self.R = np.array([[0.0025]])
self.y = None
self.S = None
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
z = np.array([[z]])
self.y = z - self.H @ self.x
self.S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T / self.S
self.x = self.x + K * self.y
self.P = (np.eye(1) - K @ self.H) @ self.P
# Run filter and collect data
kf = TestKF()
innovations = []
S_matrices = []
for obs in observations:
kf.predict()
kf.update(obs)
if kf.y is not None:
innovations.append(kf.y.copy())
S_matrices.append(kf.S.copy())
# Validation analysis
validator = ModelValidation()
innovation_results = validator.innovation_analysis(innovations, S_matrices)
print("Innovation sequence analysis results:")
print(f"Mean: {innovation_results['mean']}")
print(f"Standard deviation: {innovation_results['std']}")
print(f"Skewness: {innovation_results['skewness']}")
print(f"Kurtosis: {innovation_results['kurtosis']}")
print(f"Normality test p-value: {innovation_results['normality_p']}")
# Plot innovation sequence
innovations_flat = [inn[0, 0] for inn in innovations]
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(innovations_flat)
plt.title('Innovation Sequence')
plt.xlabel('Time')
plt.ylabel('Innovation Value')
plt.subplot(2, 2, 2)
plt.hist(innovations_flat, bins=30, density=True, alpha=0.7)
plt.title('Innovation Distribution')
plt.xlabel('Innovation Value')
plt.ylabel('Density')
plt.subplot(2, 2, 3)
from scipy.stats import probplot
probplot(innovations_flat, dist="norm", plot=plt)
plt.title('Q-Q Plot')
plt.subplot(2, 2, 4)
# ACF plot (simplified version)
lags = range(1, 21)
acf_values = []
for lag in lags:
if len(innovations_flat) > lag:
corr = np.corrcoef(innovations_flat[:-lag], innovations_flat[lag:])[0, 1]
acf_values.append(corr)
else:
acf_values.append(0)
plt.plot(lags, acf_values, 'bo-')
plt.axhline(y=0, color='k', linestyle='-')
    # Approximate 95% confidence bounds for a white-noise ACF: +/- 1.96 / sqrt(N)
    conf_bound = 1.96 / np.sqrt(len(innovations_flat))
    plt.axhline(y=conf_bound, color='r', linestyle='--', alpha=0.5)
    plt.axhline(y=-conf_bound, color='r', linestyle='--', alpha=0.5)
plt.title('Autocorrelation Function')
plt.xlabel('Lag')
plt.ylabel('Autocorrelation')
plt.tight_layout()
plt.show()
# model_validation_example()
Next Chapter Preview
In the next chapter, we will learn about the limitations and improvements of Kalman filtering, including robust filtering, adaptive algorithms, and solutions to filter divergence problems.