Chapter 9: System Modeling and Parameter Tuning

Learning Objectives
  • Learn to establish appropriate state-space models
  • Master methods for selecting and tuning noise covariance matrices
  • Understand model validation and performance evaluation metrics

State-Space Model Design

Model Selection Principles

Choosing an appropriate state-space model is crucial for successful Kalman filtering:

Model Design Considerations
  1. Physical Meaning: State variables should have clear economic or financial interpretations
  2. Observability: The system must be observable
  3. Controllability: The system should be controllable (if there are control inputs)
  4. Simplicity: Avoid over-complication, follow Occam’s razor principle
  5. Computational Efficiency: Consider computational constraints for real-time applications

Observability Analysis

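A pair (F, H) with state dimension n is observable when the stacked observability matrix O = [H; H F; H F^2; ...; H F^(n-1)] has full column rank n. The helper below builds this matrix block by block and checks its rank numerically.
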
import numpy as np
from numpy.linalg import matrix_rank
import matplotlib.pyplot as plt

def check_observability(F, H):
    """
    Check system observability

    Parameters:
    F: State transition matrix
    H: Observation matrix

    Returns:
    is_observable: Boolean indicating if system is observable
    obs_matrix: Observability matrix
    """
    n = F.shape[0]  # State dimension

    # Build observability matrix
    obs_matrix = H.copy()
    H_F_power = H.copy()

    for i in range(1, n):
        H_F_power = H_F_power @ F
        obs_matrix = np.vstack([obs_matrix, H_F_power])

    # Check matrix rank
    rank = matrix_rank(obs_matrix)
    is_observable = (rank == n)

    return is_observable, obs_matrix

def observability_example():
    """
    Observability analysis example
    """
    # Example 1: Observable system
    F1 = np.array([[1, 1], [0, 1]])  # Constant velocity model
    H1 = np.array([[1, 0]])          # Only observe position

    obs1, obs_matrix1 = check_observability(F1, H1)
    print(f"System 1 observability: {obs1}")
    print(f"Observability matrix rank: {matrix_rank(obs_matrix1)}")

    # Example 2: Unobservable system
    F2 = np.array([[1, 1], [0, 1]])
    H2 = np.array([[0, 1]])          # Only observe velocity

    obs2, obs_matrix2 = check_observability(F2, H2)
    print(f"System 2 observability: {obs2}")
    print(f"Observability matrix rank: {matrix_rank(obs_matrix2)}")

# observability_example()
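
The design considerations above also list controllability. As a companion to check_observability, here is a minimal sketch of the dual check; it assumes a control-input matrix B, which none of the models in this chapter actually use.

def check_controllability(F, B):
    """
    Check system controllability (sketch; assumes a control-input matrix B)

    Parameters:
    F: State transition matrix
    B: Control input matrix

    Returns:
    is_controllable: Boolean indicating if system is controllable
    ctrl_matrix: Controllability matrix
    """
    n = F.shape[0]  # State dimension

    # Build controllability matrix [B, FB, F^2 B, ..., F^(n-1) B]
    ctrl_matrix = B.copy()
    F_power_B = B.copy()

    for i in range(1, n):
        F_power_B = F @ F_power_B
        ctrl_matrix = np.hstack([ctrl_matrix, F_power_B])

    # Full rank n means every state can be steered by the input
    is_controllable = (matrix_rank(ctrl_matrix) == n)

    return is_controllable, ctrl_matrix

# Example: with F = [[1, 1], [0, 1]] and B = [[0], [1]], the system is controllable.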

State Variable Selection

In financial modeling, state variable selection is crucial:

class ModelDesignFramework:
    """
    Model design framework
    """

    def __init__(self):
        self.model_candidates = {}
        self.evaluation_metrics = {}

    def add_model_candidate(self, name, state_vars, obs_vars, F, H, Q, R):
        """
        Add model candidate

        Parameters:
        name: Model name
        state_vars: State variable description list
        obs_vars: Observation variable description list
        F, H, Q, R: Model matrices
        """
        self.model_candidates[name] = {
            'state_vars': state_vars,
            'obs_vars': obs_vars,
            'F': F,
            'H': H,
            'Q': Q,
            'R': R,
            'dim_x': F.shape[0],
            'dim_z': H.shape[0]
        }

    def evaluate_model_complexity(self, name):
        """
        Evaluate model complexity
        """
        model = self.model_candidates[name]

        # Number of parameters
        n_params = (model['dim_x']**2 +  # F matrix
                   model['dim_x'] * model['dim_z'] +  # H matrix
                   model['dim_x'] * (model['dim_x'] + 1) // 2 +  # Q matrix (symmetric)
                   model['dim_z'] * (model['dim_z'] + 1) // 2)   # R matrix (symmetric)

        # Computational complexity (per step)
        comp_predict = model['dim_x']**3  # Covariance prediction
        comp_update = (model['dim_z']**3 +
                      model['dim_x'] * model['dim_z']**2)  # Kalman gain and update

        complexity = {
            'n_parameters': n_params,
            'comp_predict': comp_predict,
            'comp_update': comp_update,
            'total_per_step': comp_predict + comp_update
        }

        return complexity

# Example: Stock price models with different complexities
def stock_price_model_comparison():
    """
    Compare stock price models with different complexities
    """
    framework = ModelDesignFramework()

    # Model 1: Simple random walk
    F1 = np.array([[1]])
    H1 = np.array([[1]])
    Q1 = np.array([[0.01]])
    R1 = np.array([[0.001]])

    framework.add_model_candidate(
        'Simple_RW',
        ['log_price'],
        ['log_price_obs'],
        F1, H1, Q1, R1
    )

    # Model 2: Random walk with drift
    F2 = np.array([[1, 1], [0, 1]])
    H2 = np.array([[1, 0]])
    Q2 = np.array([[0.01, 0], [0, 0.0001]])
    R2 = np.array([[0.001]])

    framework.add_model_candidate(
        'RW_with_Drift',
        ['log_price', 'drift'],
        ['log_price_obs'],
        F2, H2, Q2, R2
    )

    # Model 3: Stochastic volatility model (schematic only: the latent
    # log-variance follows an AR(1) but does not feed back into the price
    # observation, so this layout is used purely for the complexity comparison)
    F3 = np.array([[1, 0], [0, 0.99]])
    H3 = np.array([[1, 0]])
    Q3 = np.array([[0, 0], [0, 0.01]])
    R3 = np.array([[0.001]])

    framework.add_model_candidate(
        'Stochastic_Vol',
        ['log_price', 'log_variance'],
        ['log_price_obs'],
        F3, H3, Q3, R3
    )

    # Evaluate complexity
    for name in framework.model_candidates:
        complexity = framework.evaluate_model_complexity(name)
        print(f"\nModel {name}:")
        print(f"  Number of parameters: {complexity['n_parameters']}")
        print(f"  Computational complexity per step: {complexity['total_per_step']}")

# stock_price_model_comparison()

Noise Covariance Tuning

Covariance Matrix Selection Principles

The setting of noise covariance matrices directly affects filtering performance:

Covariance Setting Guidelines

  • Process noise Q: Reflects model uncertainty (both effects are illustrated in the sketch after this list)
    • Q too small: the filter becomes over-confident and adapts slowly to change
    • Q too large: the estimates become unstable and track the measurement noise
  • Observation noise R: Reflects measurement uncertainty
    • R too small: observations are over-trusted and the filter can be misled by noise
    • R too large: observation information is largely ignored and filtering performance degrades
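
To make these trade-offs concrete, here is a minimal, self-contained sketch (not tied to the classes defined later in this chapter) that simulates a scalar random walk and runs a one-dimensional Kalman filter with R fixed at its true value while Q is swept across several orders of magnitude. Both under- and over-stated Q typically increase the estimation RMSE relative to the true value.

import numpy as np

def q_sensitivity_sweep(T=500, true_Q=0.01, true_R=0.005, seed=0):
    """Sweep Q on a simulated scalar random walk and report estimation RMSE."""
    rng = np.random.default_rng(seed)

    # Simulate x_t = x_{t-1} + w_t and z_t = x_t + v_t
    states = np.cumsum(rng.normal(0, np.sqrt(true_Q), T))
    observations = states + rng.normal(0, np.sqrt(true_R), T)

    for Q in [true_Q / 100, true_Q / 10, true_Q, true_Q * 10, true_Q * 100]:
        x, P = 0.0, 1.0
        estimates = []
        for z in observations:
            # Predict (F = 1, so only the covariance grows)
            P = P + Q
            # Update with the scalar Kalman gain
            K = P / (P + true_R)
            x = x + K * (z - x)
            P = (1 - K) * P
            estimates.append(x)
        rmse = np.sqrt(np.mean((np.array(estimates) - states)**2))
        print(f"Q = {Q:.4g}: RMSE = {rmse:.4f}")

# q_sensitivity_sweep()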

Parameter Tuning Methods

class ParameterOptimizer:
    """
    Kalman filter parameter optimizer
    """

    def __init__(self, kf_class, data):
        self.kf_class = kf_class
        self.data = data
        self.optimization_history = []

    def objective_function(self, params, method='likelihood'):
        """
        Objective function: Maximum likelihood or minimum prediction error

        Parameters:
        params: Parameter vector [Q_diag, R_diag]
        method: 'likelihood' or 'prediction_error'
        """
        try:
            # Create the filter first so its dimensions can be read
            kf = self.kf_class()
            n_q = kf.dim_x
            n_r = kf.dim_z

            # Parse parameters
            Q_diag = params[:n_q]
            R_diag = params[n_q:n_q + n_r]

            # Ensure positive definiteness of the diagonal covariances
            kf.Q = np.diag(np.maximum(Q_diag, 1e-8))
            kf.R = np.diag(np.maximum(R_diag, 1e-8))

            if method == 'likelihood':
                return self._negative_log_likelihood(kf)
            else:
                return self._prediction_error(kf)

        except Exception as e:
            return 1e10  # Return large value to indicate failure

    def _negative_log_likelihood(self, kf):
        """
        Calculate the negative log-likelihood of the data under the filter
        """
        log_likelihood = 0.0

        for obs in self.data:
            kf.predict()
            kf.update(obs)

            # Accumulate the per-step log-likelihood exposed by the filter
            if hasattr(kf, 'log_likelihood'):
                log_likelihood += float(kf.log_likelihood)

        return -log_likelihood

    def _prediction_error(self, kf):
        """
        Calculate the mean one-step-ahead prediction error
        """
        prediction_errors = []

        for obs in self.data:
            # Predict one step ahead and form the predicted observation
            kf.predict()
            predicted_obs = kf.H @ kf.x

            # One-step-ahead prediction error
            error = np.linalg.norm(obs - predicted_obs.flatten())
            prediction_errors.append(error)

            # Update with the new observation
            kf.update(obs)

        return np.mean(prediction_errors)

    def optimize(self, initial_params=None, method='likelihood'):
        """
        Optimize parameters
        """
        from scipy.optimize import minimize

        if initial_params is None:
            # Default initial parameters (instantiate once to query dimensions)
            kf_tmp = self.kf_class()
            initial_params = np.concatenate([
                0.01 * np.ones(kf_tmp.dim_x),  # Q diagonal elements
                0.01 * np.ones(kf_tmp.dim_z)   # R diagonal elements
            ])

        # Parameter bounds
        bounds = [(1e-8, 1.0) for _ in initial_params]

        # Optimize
        result = minimize(
            self.objective_function,
            initial_params,
            args=(method,),
            bounds=bounds,
            method='L-BFGS-B'
        )

        return result
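
Under the Gaussian assumptions of the Kalman filter, the likelihood being maximized above decomposes over the innovations: log L = -1/2 * sum_t [ log det(2*pi*S_t) + y_t' * inv(S_t) * y_t ], where y_t = z_t - H*x_t is the innovation at step t and S_t = H*P_t*H' + R is its predicted covariance. The SimpleKF class in the example below accumulates exactly the scalar version of this term at each update.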

def parameter_optimization_example():
    """
    Parameter optimization example
    """
    # Generate simulated data
    np.random.seed(42)
    T = 100
    true_Q = 0.01
    true_R = 0.005

    # Simulate random walk process
    true_states = [0]
    observations = []

    for t in range(1, T):
        # True state evolution
        true_states.append(true_states[-1] + np.random.normal(0, np.sqrt(true_Q)))

        # Observation
        obs = true_states[-1] + np.random.normal(0, np.sqrt(true_R))
        observations.append(obs)

    # Define simple Kalman filter class
    class SimpleKF:
        def __init__(self):
            self.dim_x = 1
            self.dim_z = 1
            self.x = np.array([[0]])
            self.P = np.array([[1]])
            self.F = np.array([[1]])
            self.H = np.array([[1]])
            self.Q = np.array([[0.01]])
            self.R = np.array([[0.01]])
            self.log_likelihood = 0

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            y = z - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T / S

            self.x = self.x + K * y
            self.P = (np.eye(1) - K @ self.H) @ self.P

            # Per-step Gaussian log-likelihood of the innovation (stored as a scalar)
            self.log_likelihood = float(-0.5 * (np.log(2 * np.pi * S) + y**2 / S))

    # Parameter optimization
    optimizer = ParameterOptimizer(SimpleKF, observations)
    result = optimizer.optimize(method='likelihood')

    optimal_Q = result.x[0]
    optimal_R = result.x[1]

    print(f"True parameters: Q={true_Q}, R={true_R}")
    print(f"Optimized parameters: Q={optimal_Q:.6f}, R={optimal_R:.6f}")
    print(f"Optimization successful: {result.success}")

# parameter_optimization_example()

Adaptive Noise Estimation

class AdaptiveNoiseEstimator:
    """
    Adaptive noise covariance estimation
    """

    def __init__(self, window_size=50):
        self.window_size = window_size
        self.innovation_history = []
        self.residual_history = []

    def update_noise_estimates(self, kf):
        """
        Update noise estimates based on the innovation sequence
        """
        # Save innovation (flattened to a 1-D vector per step)
        if hasattr(kf, 'y') and kf.y is not None:
            self.innovation_history.append(kf.y.flatten())

        # Maintain window size
        if len(self.innovation_history) > self.window_size:
            self.innovation_history.pop(0)

        # If enough history has accumulated, update the noise estimate
        if len(self.innovation_history) >= self.window_size:
            # Empirical innovation covariance over the window, shape (window, dim_z)
            innovations = np.array(self.innovation_history)

            # Theoretically, the innovation covariance equals H*P*H' + R,
            # so R can be estimated as the empirical covariance minus H*P*H'
            empirical_cov = np.atleast_2d(np.cov(innovations, rowvar=False))
            theoretical_cov = kf.H @ kf.P @ kf.H.T

            # Update R (simple moment-matching estimate)
            R_new = empirical_cov - theoretical_cov
            R_new = np.maximum(R_new, 0.001 * np.eye(kf.dim_z))  # Keep R positive definite

            # Exponential smoothing toward the new estimate
            alpha = 0.1
            kf.R = (1 - alpha) * kf.R + alpha * R_new

    def adaptive_Q_estimation(self, kf, prediction_errors):
        """
        Adaptive process noise estimation
        """
        if len(prediction_errors) >= self.window_size:
            recent_errors = prediction_errors[-self.window_size:]

            # Heuristic: if the mean recent error is large relative to its
            # variability, inflate Q; otherwise let Q decay slowly
            if np.mean(recent_errors) > np.std(recent_errors):
                kf.Q *= 1.1
            else:
                kf.Q *= 0.99

            # Ensure minimum value of Q
            kf.Q = np.maximum(kf.Q, 1e-6 * np.eye(kf.dim_x))

class AdaptiveKalmanFilter:
    """
    Kalman filter with adaptive noise estimation
    """

    def __init__(self, dim_x, dim_z, adaptive=True):
        self.dim_x = dim_x
        self.dim_z = dim_z
        self.adaptive = adaptive  # Set False to run as a fixed-parameter filter

        # Initialize Kalman filter parameters
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)
        self.F = np.eye(dim_x)
        self.H = np.eye(dim_z, dim_x)
        self.Q = 0.01 * np.eye(dim_x)
        self.R = 0.01 * np.eye(dim_z)

        # Adaptive estimator
        self.noise_estimator = AdaptiveNoiseEstimator()
        self.prediction_errors = []

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, z):
        """Update step"""
        z = z.reshape(-1, 1)

        # Standard Kalman update
        self.y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)

        self.x = self.x + K @ self.y
        self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P

        # Record prediction error
        pred_error = np.linalg.norm(self.y)
        self.prediction_errors.append(pred_error)

        # Adaptive noise estimation (skipped when adaptation is disabled)
        if self.adaptive:
            self.noise_estimator.update_noise_estimates(self)
            self.noise_estimator.adaptive_Q_estimation(self, self.prediction_errors)

def adaptive_filter_example():
    """
    Adaptive filter example
    """
    # Generate data with time-varying noise
    np.random.seed(42)
    T = 200

    true_states = [0]
    observations = []

    for t in range(1, T):
        # Time-varying process noise
        if t < 50:
            Q_t = 0.001
        elif t < 100:
            Q_t = 0.01  # Noise suddenly increases
        elif t < 150:
            Q_t = 0.1   # Noise increases further
        else:
            Q_t = 0.001  # Noise returns to small value

        # State evolution
        true_states.append(true_states[-1] + np.random.normal(0, np.sqrt(Q_t)))

        # Observation (fixed noise)
        obs = true_states[-1] + np.random.normal(0, 0.1)
        observations.append(obs)

    # Standard Kalman filter (adaptation disabled so Q stays fixed)
    standard_kf = AdaptiveKalmanFilter(1, 1, adaptive=False)
    standard_kf.Q = np.array([[0.01]])  # Fixed Q
    standard_estimates = []

    # Adaptive Kalman filter
    adaptive_kf = AdaptiveKalmanFilter(1, 1)
    adaptive_estimates = []
    Q_history = []

    for obs in observations:
        # Standard filter
        standard_kf.predict()
        standard_kf.update(np.array([obs]))
        standard_estimates.append(standard_kf.x[0, 0])

        # Adaptive filter
        adaptive_kf.predict()
        adaptive_kf.update(np.array([obs]))
        adaptive_estimates.append(adaptive_kf.x[0, 0])
        Q_history.append(adaptive_kf.Q[0, 0])

    # Plot results
    plt.figure(figsize=(15, 12))

    plt.subplot(3, 1, 1)
    plt.plot(true_states[1:], 'g-', label='True State', linewidth=2)
    plt.plot(observations, 'r.', label='Observations', alpha=0.5)
    plt.plot(standard_estimates, 'b--', label='Standard KF', linewidth=2)
    plt.plot(adaptive_estimates, 'orange', label='Adaptive KF', linewidth=2)
    plt.legend()
    plt.title('Adaptive vs Standard Kalman Filter')
    plt.ylabel('State Value')

    plt.subplot(3, 1, 2)
    standard_errors = np.abs(np.array(true_states[1:]) - np.array(standard_estimates))
    adaptive_errors = np.abs(np.array(true_states[1:]) - np.array(adaptive_estimates))
    plt.plot(standard_errors, 'b-', label='Standard KF Error')
    plt.plot(adaptive_errors, 'orange', label='Adaptive KF Error')
    plt.legend()
    plt.ylabel('Absolute Error')

    plt.subplot(3, 1, 3)
    plt.plot(Q_history, 'purple', label='Adaptive Q Estimate')
    plt.axhline(y=0.01, color='b', linestyle='--', label='Standard Q Value')
    plt.legend()
    plt.xlabel('Time Step')
    plt.ylabel('Q Value')

    plt.tight_layout()
    plt.show()

    # Calculate performance metrics
    standard_rmse = np.sqrt(np.mean(standard_errors**2))
    adaptive_rmse = np.sqrt(np.mean(adaptive_errors**2))

    print(f"Standard KF RMSE: {standard_rmse:.6f}")
    print(f"Adaptive KF RMSE: {adaptive_rmse:.6f}")
    print(f"Improvement: {(standard_rmse - adaptive_rmse) / standard_rmse * 100:.2f}%")

# adaptive_filter_example()

Model Validation Techniques

Residual Analysis

class ModelValidation:
    """
    Model validation tools
    """

    def __init__(self):
        self.validation_results = {}

    def innovation_analysis(self, innovations, S_matrices):
        """
        Innovation sequence analysis
        """
        # Standardize each innovation by its predicted standard deviation
        standardized_innovations = []
        for innov, S in zip(innovations, S_matrices):
            std_innov = np.asarray(innov).flatten() / np.sqrt(np.diag(S))
            standardized_innovations.append(std_innov)

        # Shape (n_steps, dim_z)
        standardized_innovations = np.array(standardized_innovations)

        # Statistical tests
        results = {
            'mean': np.mean(standardized_innovations, axis=0),
            'std': np.std(standardized_innovations, axis=0),
            'skewness': self._compute_skewness(standardized_innovations),
            'kurtosis': self._compute_kurtosis(standardized_innovations),
            'ljung_box_p': self._ljung_box_test(standardized_innovations),
            'normality_p': self._normality_test(standardized_innovations)
        }

        return results

    def _compute_skewness(self, data):
        """Calculate skewness"""
        from scipy.stats import skew
        return skew(data, axis=0)

    def _compute_kurtosis(self, data):
        """Calculate kurtosis"""
        from scipy.stats import kurtosis
        return kurtosis(data, axis=0)

    def _ljung_box_test(self, data, max_lag=10):
        """Ljung-Box test for serial correlation in the standardized innovations"""
        from scipy.stats import chi2
        n = data.shape[0]
        p_values = []
        for col in range(data.shape[1]):
            x = data[:, col] - np.mean(data[:, col])
            denom = np.sum(x**2)
            # Sample autocorrelations at lags 1..max_lag
            acf = np.array([np.sum(x[k:] * x[:-k]) / denom for k in range(1, max_lag + 1)])
            # Ljung-Box Q statistic and chi-squared p-value
            q_stat = n * (n + 2) * np.sum(acf**2 / (n - np.arange(1, max_lag + 1)))
            p_values.append(1 - chi2.cdf(q_stat, df=max_lag))
        return p_values

    def _normality_test(self, data):
        """Normality test"""
        from scipy.stats import jarque_bera
        p_values = []
        for col in range(data.shape[1]):
            _, p_val = jarque_bera(data[:, col])
            p_values.append(p_val)
        return p_values

    def cross_validation(self, kf_class, data, n_folds=5):
        """
        Cross-validation
        """
        n_samples = len(data)
        fold_size = n_samples // n_folds

        cv_scores = []

        for fold in range(n_folds):
            # Split data
            start_idx = fold * fold_size
            end_idx = (fold + 1) * fold_size if fold < n_folds - 1 else n_samples

            test_data = data[start_idx:end_idx]
            train_data = np.concatenate([data[:start_idx], data[end_idx:]])

            # Fit on the training data (in practice, estimate Q and R here,
            # e.g. with a routine like ParameterOptimizer; train_data is unused in this sketch)
            kf = kf_class()

            # Evaluate on test data
            predictions = []
            actuals = []

            for obs in test_data:
                kf.predict()
                pred = kf.H @ kf.x
                predictions.append(pred.flatten())

                kf.update(obs)
                actuals.append(obs)

            # Calculate out-of-sample score (flatten so shapes align element-wise)
            mse = np.mean((np.array(actuals).flatten() - np.array(predictions).flatten())**2)
            cv_scores.append(mse)

        return {
            'cv_scores': cv_scores,
            'mean_cv_score': np.mean(cv_scores),
            'std_cv_score': np.std(cv_scores)
        }

def model_validation_example():
    """
    Model validation example
    """
    # Generate test data
    np.random.seed(42)
    T = 200

    # Simple random walk
    true_states = [0]
    observations = []

    for t in range(1, T):
        true_states.append(true_states[-1] + np.random.normal(0, 0.1))
        obs = true_states[-1] + np.random.normal(0, 0.05)
        observations.append(obs)

    # Run filter and collect innovations
    class TestKF:
        def __init__(self):
            self.x = np.array([[0]])
            self.P = np.array([[1]])
            self.F = np.array([[1]])
            self.H = np.array([[1]])
            self.Q = np.array([[0.01]])
            self.R = np.array([[0.0025]])
            self.y = None
            self.S = None

        def predict(self):
            self.x = self.F @ self.x
            self.P = self.F @ self.P @ self.F.T + self.Q

        def update(self, z):
            z = np.array([[z]])
            self.y = z - self.H @ self.x
            self.S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T / self.S

            self.x = self.x + K * self.y
            self.P = (np.eye(1) - K @ self.H) @ self.P

    # Run filter and collect data
    kf = TestKF()
    innovations = []
    S_matrices = []

    for obs in observations:
        kf.predict()
        kf.update(obs)

        if kf.y is not None:
            innovations.append(kf.y.copy())
            S_matrices.append(kf.S.copy())

    # Validation analysis
    validator = ModelValidation()
    innovation_results = validator.innovation_analysis(innovations, S_matrices)

    print("Innovation sequence analysis results:")
    print(f"Mean: {innovation_results['mean']}")
    print(f"Standard deviation: {innovation_results['std']}")
    print(f"Skewness: {innovation_results['skewness']}")
    print(f"Kurtosis: {innovation_results['kurtosis']}")
    print(f"Normality test p-value: {innovation_results['normality_p']}")

    # Plot innovation sequence
    innovations_flat = [inn[0, 0] for inn in innovations]

    plt.figure(figsize=(12, 8))

    plt.subplot(2, 2, 1)
    plt.plot(innovations_flat)
    plt.title('Innovation Sequence')
    plt.xlabel('Time')
    plt.ylabel('Innovation Value')

    plt.subplot(2, 2, 2)
    plt.hist(innovations_flat, bins=30, density=True, alpha=0.7)
    plt.title('Innovation Distribution')
    plt.xlabel('Innovation Value')
    plt.ylabel('Density')

    plt.subplot(2, 2, 3)
    from scipy.stats import probplot
    probplot(innovations_flat, dist="norm", plot=plt)
    plt.title('Q-Q Plot')

    plt.subplot(2, 2, 4)
    # ACF plot (simplified version)
    lags = range(1, 21)
    acf_values = []
    for lag in lags:
        if len(innovations_flat) > lag:
            corr = np.corrcoef(innovations_flat[:-lag], innovations_flat[lag:])[0, 1]
            acf_values.append(corr)
        else:
            acf_values.append(0)

    plt.plot(lags, acf_values, 'bo-')
    plt.axhline(y=0, color='k', linestyle='-')
    conf_band = 1.96 / np.sqrt(len(innovations_flat))  # approximate 95% band for white noise
    plt.axhline(y=conf_band, color='r', linestyle='--', alpha=0.5)
    plt.axhline(y=-conf_band, color='r', linestyle='--', alpha=0.5)
    plt.title('Autocorrelation Function')
    plt.xlabel('Lag')
    plt.ylabel('Autocorrelation')

    plt.tight_layout()
    plt.show()

# model_validation_example()

Next Chapter Preview

In the next chapter, we will learn about the limitations and improvements of Kalman filtering, including robust filtering, adaptive algorithms, and solutions to filter divergence problems.