Chapter 11: Multi-Sensor Fusion and Distributed Filtering

Learning Objectives
  • Master centralized and distributed fusion architectures
  • Understand information filtering and covariance intersection methods
  • Learn to handle asynchronous sensor data

Multi-Sensor Fusion Overview

Necessity of Fusion

In modern financial systems, information sources are diverse:

Financial Data Sources
  • Market Data: Prices, volume, order book
  • News Data: Financial news, announcements, analyst reports
  • Alternative Data: Social media, satellite imagery, credit card transactions
  • Macro Data: Economic indicators, central bank policies, international trade
  • Internal Data: Risk models, trading records, customer behavior

Fusion Architecture Classification

Fusion architectures are commonly grouped into three classes: centralized fusion, where all raw observations are sent to a single filter (covered next); distributed fusion, where each node runs a local filter and exchanges information summaries; and hybrid schemes that combine the two. When the correlation between local estimates is unknown, covariance intersection (introduced later in this chapter) offers a consistent fallback.

Centralized Kalman Filtering

Multi-Sensor Centralized Fusion

import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import block_diag
import warnings
warnings.filterwarnings('ignore')

class CentralizedMultiSensorKF:
    """
    Centralized multi-sensor Kalman filter
    """

    def __init__(self, dim_x, sensors_config):
        """
        Parameters:
        dim_x: State dimension
        sensors_config: Sensor configuration list, each element contains {'H': observation matrix, 'R': noise covariance}
        """
        self.dim_x = dim_x
        self.sensors = sensors_config
        self.n_sensors = len(sensors_config)

        # State and covariance
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)

        # System matrices
        self.F = np.eye(dim_x)
        self.Q = np.eye(dim_x)

        # Build augmented observation matrix and noise covariance
        self._build_augmented_matrices()

    def _build_augmented_matrices(self):
        """Build augmented observation matrix and noise covariance"""
        H_list = []
        R_list = []

        for sensor in self.sensors:
            H_list.append(sensor['H'])
            R_list.append(sensor['R'])

        # Vertically stack observation matrices
        self.H_aug = np.vstack(H_list)

        # Block diagonalize noise covariance matrix
        self.R_aug = block_diag(*R_list)

        self.dim_z_aug = self.H_aug.shape[0]

    def predict(self):
        """Prediction step"""
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, observations):
        """
        Update step

        Parameters:
        observations: Observation list, in sensor order
        """
        # Handle missing observations
        available_sensors = []
        available_observations = []

        for i, obs in enumerate(observations):
            if obs is not None:
                available_sensors.append(i)
                if np.isscalar(obs):
                    available_observations.append([obs])
                else:
                    available_observations.append(obs)

        if not available_sensors:
            return  # No available observations

        # Build augmented matrices for available sensors
        H_available = []
        R_available = []

        for sensor_idx in available_sensors:
            H_available.append(self.sensors[sensor_idx]['H'])
            R_available.append(self.sensors[sensor_idx]['R'])

        H_aug = np.vstack(H_available)
        R_aug = block_diag(*R_available)

        # Flatten observation vector
        z_aug = np.concatenate(available_observations).reshape(-1, 1)

        # Standard Kalman update
        y = z_aug - H_aug @ self.x
        S = H_aug @ self.P @ H_aug.T + R_aug
        K = self.P @ H_aug.T @ np.linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ H_aug) @ self.P

def centralized_fusion_example():
    """
    Centralized fusion example: Multiple financial data sources
    """
    # State: [price trend, volatility]
    dim_x = 2

    # Three sensor configurations
    sensors_config = [
        {
            'name': 'Market Price',
            'H': np.array([[1, 0]]),  # Directly observe price trend
            'R': np.array([[0.1]])
        },
        {
            'name': 'Technical Indicator',
            'H': np.array([[0.8, 0.2]]),  # Combination of trend and volatility
            'R': np.array([[0.2]])
        },
        {
            'name': 'Sentiment Indicator',
            'H': np.array([[0.6, -0.3]]),  # Inverse volatility impact
            'R': np.array([[0.5]])
        }
    ]

    # Create centralized filter
    kf = CentralizedMultiSensorKF(dim_x, sensors_config)

    # Set system dynamics
    kf.F = np.array([[0.95, 0.1], [0, 0.9]])  # Price trend decay, volatility persistence
    kf.Q = np.array([[0.01, 0], [0, 0.005]])

    # Initialize
    kf.x = np.array([[0], [0.2]])
    kf.P = np.array([[1, 0], [0, 0.1]])

    # Simulate data
    np.random.seed(42)
    T = 100

    true_states = []
    all_observations = []
    estimates = []

    # True state evolution
    true_state = np.array([0, 0.2])

    for t in range(T):
        # True state update
        true_state = kf.F @ true_state + np.random.multivariate_normal([0, 0], kf.Q)
        true_states.append(true_state.copy())

        # Generate observations (some may be missing)
        observations = []

        for i, sensor in enumerate(sensors_config):
            # Simulate sensor failure (20% probability of missing)
            if np.random.random() > 0.2:
                true_obs = sensor['H'] @ true_state
                noisy_obs = true_obs + np.random.multivariate_normal(
                    np.zeros(sensor['R'].shape[0]), sensor['R'])
                observations.append(noisy_obs[0] if len(noisy_obs) == 1 else noisy_obs)
            else:
                observations.append(None)  # Missing observation

        all_observations.append(observations)

        # Filter update
        kf.predict()
        kf.update(observations)
        estimates.append(kf.x.flatten().copy())

    # Plot results
    true_states = np.array(true_states)
    estimates = np.array(estimates)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    # Price trend
    ax1.plot(true_states[:, 0], 'g-', linewidth=3, label='True Price Trend')
    ax1.plot(estimates[:, 0], 'b-', linewidth=2, label='Fusion Estimate')

    # Plot observations from each sensor (when available)
    colors = ['red', 'orange', 'purple']
    for i, sensor in enumerate(sensors_config):
        sensor_obs = []
        sensor_times = []
        for t, obs_list in enumerate(all_observations):
            if obs_list[i] is not None:
                # Infer trend component from observation (simplified)
                if sensor['H'][0, 0] != 0:
                    trend_obs = obs_list[i] / sensor['H'][0, 0]
                    sensor_obs.append(trend_obs)
                    sensor_times.append(t)

        if sensor_obs:
            ax1.scatter(sensor_times, sensor_obs, c=colors[i], alpha=0.6,
                       s=20, label=f"{sensor['name']} Observations")

    ax1.set_ylabel('Price Trend')
    ax1.set_title('Multi-Sensor Fusion: Price Trend Estimation')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Volatility
    ax2.plot(true_states[:, 1], 'g-', linewidth=3, label='True Volatility')
    ax2.plot(estimates[:, 1], 'b-', linewidth=2, label='Fusion Estimate')
    ax2.set_xlabel('Time')
    ax2.set_ylabel('Volatility')
    ax2.set_title('Volatility Estimation')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Calculate performance
    trend_rmse = np.sqrt(np.mean((true_states[:, 0] - estimates[:, 0])**2))
    vol_rmse = np.sqrt(np.mean((true_states[:, 1] - estimates[:, 1])**2))

    print(f"Price Trend RMSE: {trend_rmse:.4f}")
    print(f"Volatility RMSE: {vol_rmse:.4f}")

    # Sensor availability statistics
    availability = []
    for i in range(len(sensors_config)):
        available_count = sum(1 for obs_list in all_observations if obs_list[i] is not None)
        availability.append(available_count / len(all_observations))
        print(f"{sensors_config[i]['name']} Availability: {availability[i]:.1%}")

# Run example
# centralized_fusion_example()
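
Because the stacked noise covariance R_aug is block diagonal, processing the sensors one at a time with standard single-sensor updates yields exactly the same posterior as the augmented update. The sketch below verifies that equivalence numerically; it reuses the CentralizedMultiSensorKF class above, and sequential_vs_stacked_demo is an illustrative helper, not part of the chapter's API.

import copy

def sequential_vs_stacked_demo():
    """Check that per-sensor sequential updates match the stacked update"""
    sensors = [{'H': np.array([[1.0, 0.0]]), 'R': np.array([[0.1]])},
               {'H': np.array([[0.0, 1.0]]), 'R': np.array([[0.2]])}]
    kf = CentralizedMultiSensorKF(2, sensors)
    kf.predict()
    z = [0.5, -0.3]  # one measurement per sensor

    # Stacked (augmented) update
    kf_stacked = copy.deepcopy(kf)
    kf_stacked.update(z)

    # Sequential per-sensor updates on the same prior
    kf_seq = copy.deepcopy(kf)
    for s, zi in zip(sensors, z):
        y = np.array([[zi]]) - s['H'] @ kf_seq.x
        S = s['H'] @ kf_seq.P @ s['H'].T + s['R']
        K = kf_seq.P @ s['H'].T @ np.linalg.inv(S)
        kf_seq.x = kf_seq.x + K @ y
        kf_seq.P = (np.eye(2) - K @ s['H']) @ kf_seq.P

    print(np.allclose(kf_stacked.x, kf_seq.x))  # True
    print(np.allclose(kf_stacked.P, kf_seq.P))  # True

# sequential_vs_stacked_demo()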

Distributed Kalman Filtering

Information Filter Form

The information filter carries the information matrix Y = P^(-1) and information vector y = P^(-1) x instead of (x, P). Its appeal for fusion is that the measurement update becomes purely additive: each sensor contributes i = H^T R^(-1) z and I = H^T R^(-1) H, so independent contributions can simply be summed.

class InformationFilter:
    """
    Information filter (dual form of Kalman filter)
    """

    def __init__(self, dim_x):
        self.dim_x = dim_x

        # Information matrix and information vector
        self.Y = np.eye(dim_x)  # Information matrix Y = P^(-1)
        self.y = np.zeros((dim_x, 1))  # Information vector y = P^(-1) * x

        # System matrices
        self.F = np.eye(dim_x)
        self.Q = np.eye(dim_x)

    @property
    def x(self):
        """State estimate: x = Y^(-1) * y"""
        try:
            return np.linalg.inv(self.Y) @ self.y
        except np.linalg.LinAlgError:
            return np.zeros((self.dim_x, 1))

    @property
    def P(self):
        """Covariance matrix: P = Y^(-1)"""
        try:
            return np.linalg.inv(self.Y)
        except np.linalg.LinAlgError:
            return np.eye(self.dim_x) * 1e6

    def predict(self):
        """Information filter prediction"""
        # Prediction is not additive in information space, so convert to
        # covariance form, run the standard prediction, then convert back
        P_prev = self.P
        x_prev = self.x

        # Standard prediction
        x_pred = self.F @ x_prev
        P_pred = self.F @ P_prev @ self.F.T + self.Q

        # Convert back to information form
        self.Y = np.linalg.inv(P_pred)
        self.y = self.Y @ x_pred

    def update(self, z, H, R):
        """Information filter update"""
        z = z.reshape(-1, 1)

        # Information contribution
        R_inv = np.linalg.inv(R)
        i = H.T @ R_inv @ z  # Information vector contribution
        I = H.T @ R_inv @ H  # Information matrix contribution

        # Information fusion
        self.y += i
        self.Y += I

    def fuse_information(self, other_y, other_Y):
        """
        Fuse with another information filter

        Parameters:
        other_y: Other node's information vector
        other_Y: Other node's information matrix
        """
        self.y += other_y
        self.Y += other_Y

class DistributedKalmanFilter:
    """
    Distributed Kalman filter network
    """

    def __init__(self, n_nodes, dim_x):
        self.n_nodes = n_nodes
        self.dim_x = dim_x

        # Create information filter nodes
        self.nodes = [InformationFilter(dim_x) for _ in range(n_nodes)]

        # Communication topology (fully connected). Kept for reference: the
        # simplified exchange below broadcasts globally rather than using it.
        self.communication_graph = np.ones((n_nodes, n_nodes)) - np.eye(n_nodes)

    def distributed_update(self, observations, H_matrices, R_matrices):
        """
        Distributed update step

        Parameters:
        observations: Observation list for each node
        H_matrices: Observation matrix list for each node
        R_matrices: Noise covariance list for each node
        """
        # Step 1: Each node independently predicts and updates
        for i, (obs, H, R) in enumerate(zip(observations, H_matrices, R_matrices)):
            self.nodes[i].predict()

            if obs is not None:
                self.nodes[i].update(obs, H, R)

        # Step 2: Information exchange and fusion
        self._exchange_information()

    def _exchange_information(self):
        """Information exchange (simplified one-round consensus)"""
        # Collect information from all nodes
        all_y = [node.y.copy() for node in self.nodes]
        all_Y = [node.Y.copy() for node in self.nodes]

        # Sum the local information. Note that each node's Y still contains
        # its own copy of the common prior, so the prior is counted n times
        global_y = sum(all_y)
        global_Y = sum(all_Y)

        # Average back to every node: the consensus mean Y^(-1) y is unchanged
        # by the 1/n scaling, and the averaging keeps the covariance
        # conservative. Real networks run multiple gossip rounds over the
        # communication graph instead of this one-shot global broadcast.
        for node in self.nodes:
            node.y = global_y / self.n_nodes
            node.Y = global_Y / self.n_nodes

    def get_consensus_estimate(self):
        """Get consensus estimate"""
        # After the exchange above all nodes hold identical information,
        # so the average equals any single node's estimate
        all_estimates = [node.x for node in self.nodes]
        return np.mean(all_estimates, axis=0)

def distributed_fusion_example():
    """
    Distributed fusion example
    """
    # System setup
    dim_x = 2
    n_nodes = 3

    dkf = DistributedKalmanFilter(n_nodes, dim_x)

    # Set system matrices for each node
    F = np.array([[1.0, 1.0], [0, 0.95]])
    Q = np.array([[0.01, 0], [0, 0.01]])

    for node in dkf.nodes:
        node.F = F
        node.Q = Q

    # Observation configuration for each node
    H_configs = [
        np.array([[1, 0]]),      # Node 1: observe position
        np.array([[0, 1]]),      # Node 2: observe velocity
        np.array([[1, 1]])       # Node 3: observe position+velocity
    ]

    R_configs = [
        np.array([[0.1]]),
        np.array([[0.2]]),
        np.array([[0.15]])
    ]

    # Simulate data
    np.random.seed(42)
    T = 50

    true_states = []
    distributed_estimates = []

    # Initial true state
    true_state = np.array([0, 1])

    for t in range(T):
        # True state evolution
        true_state = F @ true_state + np.random.multivariate_normal([0, 0], Q)
        true_states.append(true_state.copy())

        # Generate observations for each node
        observations = []
        for i in range(n_nodes):
            if np.random.random() > 0.1:  # 90% probability of observation
                true_obs = H_configs[i] @ true_state
                noisy_obs = true_obs + np.random.multivariate_normal(
                    np.zeros(R_configs[i].shape[0]), R_configs[i])
                observations.append(noisy_obs)
            else:
                observations.append(None)

        # Distributed update
        dkf.distributed_update(observations, H_configs, R_configs)

        # Get consensus estimate
        consensus_estimate = dkf.get_consensus_estimate()
        distributed_estimates.append(consensus_estimate.flatten())

    # Comparison: Centralized vs Distributed
    # Centralized version
    centralized_kf = CentralizedMultiSensorKF(dim_x, [
        {'H': H_configs[0], 'R': R_configs[0]},
        {'H': H_configs[1], 'R': R_configs[1]},
        {'H': H_configs[2], 'R': R_configs[2]}
    ])

    centralized_kf.F = F
    centralized_kf.Q = Q

    centralized_estimates = []

    # Re-run centralized (using same random seed data)
    np.random.seed(42)
    true_state = np.array([0, 1])

    for t in range(T):
        true_state = F @ true_state + np.random.multivariate_normal([0, 0], Q)

        observations = []
        for i in range(n_nodes):
            if np.random.random() > 0.1:
                true_obs = H_configs[i] @ true_state
                noisy_obs = true_obs + np.random.multivariate_normal(
                    np.zeros(R_configs[i].shape[0]), R_configs[i])
                observations.append(noisy_obs[0] if len(noisy_obs) == 1 else noisy_obs)
            else:
                observations.append(None)

        centralized_kf.predict()
        centralized_kf.update(observations)
        centralized_estimates.append(centralized_kf.x.flatten().copy())

    # Plot comparison results
    true_states = np.array(true_states)
    distributed_estimates = np.array(distributed_estimates)
    centralized_estimates = np.array(centralized_estimates)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    # Position comparison
    ax1.plot(true_states[:, 0], 'g-', linewidth=3, label='True Position')
    ax1.plot(centralized_estimates[:, 0], 'b-', linewidth=2, label='Centralized Estimate')
    ax1.plot(distributed_estimates[:, 0], 'r--', linewidth=2, label='Distributed Estimate')
    ax1.set_ylabel('Position')
    ax1.set_title('Centralized vs Distributed Fusion: Position Estimation')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Velocity comparison
    ax2.plot(true_states[:, 1], 'g-', linewidth=3, label='True Velocity')
    ax2.plot(centralized_estimates[:, 1], 'b-', linewidth=2, label='Centralized Estimate')
    ax2.plot(distributed_estimates[:, 1], 'r--', linewidth=2, label='Distributed Estimate')
    ax2.set_xlabel('Time')
    ax2.set_ylabel('Velocity')
    ax2.set_title('Velocity Estimation')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Performance comparison
    cent_pos_rmse = np.sqrt(np.mean((true_states[:, 0] - centralized_estimates[:, 0])**2))
    dist_pos_rmse = np.sqrt(np.mean((true_states[:, 0] - distributed_estimates[:, 0])**2))

    cent_vel_rmse = np.sqrt(np.mean((true_states[:, 1] - centralized_estimates[:, 1])**2))
    dist_vel_rmse = np.sqrt(np.mean((true_states[:, 1] - distributed_estimates[:, 1])**2))

    print("Performance Comparison:")
    print(f"Position RMSE - Centralized: {cent_pos_rmse:.4f}, Distributed: {dist_pos_rmse:.4f}")
    print(f"Velocity RMSE - Centralized: {cent_vel_rmse:.4f}, Distributed: {dist_vel_rmse:.4f}")

# Run example
# distributed_fusion_example()
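
When two information-filter nodes start from the same prior, naively adding their full information vectors and matrices double-counts that shared prior. A common correction (often called the channel-filter idea) subtracts the common prior once. Below is a minimal sketch assuming the InformationFilter class above; common_prior_fusion_demo is an illustrative helper, not part of the chapter's API.

def common_prior_fusion_demo():
    """Fuse two nodes that share a prior without double-counting it"""
    prior_Y, prior_y = np.eye(2), np.zeros((2, 1))

    nodes = [InformationFilter(2) for _ in range(2)]
    for node in nodes:
        node.Y, node.y = prior_Y.copy(), prior_y.copy()

    H = [np.array([[1.0, 0.0]]), np.array([[0.0, 1.0]])]
    R = [np.array([[0.1]]), np.array([[0.2]])]
    z = [np.array([0.5]), np.array([-0.3])]

    # Each node absorbs only its own measurement
    for node, Hi, Ri, zi in zip(nodes, H, R, z):
        node.update(zi, Hi, Ri)

    # Fuse: add both nodes' information, subtract the shared prior once
    Y_fused = nodes[0].Y + nodes[1].Y - prior_Y
    y_fused = nodes[0].y + nodes[1].y - prior_y
    x_fused = np.linalg.inv(Y_fused) @ y_fused

    # Reference: a single filter that saw both measurements
    ref = InformationFilter(2)
    ref.Y, ref.y = prior_Y.copy(), prior_y.copy()
    ref.update(z[0], H[0], R[0])
    ref.update(z[1], H[1], R[1])

    print(np.allclose(x_fused, ref.x))  # True

# common_prior_fusion_demo()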

Covariance Intersection Method

CI Fusion Algorithm

When the cross-correlation between two estimates is unknown, Covariance Intersection (CI) provides a conservative but provably consistent fusion rule. For a weight omega in [0, 1]:

P_f^(-1) = omega * P1^(-1) + (1 - omega) * P2^(-1)
x_f = P_f * (omega * P1^(-1) * x1 + (1 - omega) * P2^(-1) * x2)

The weight is typically chosen to minimize trace(P_f) or det(P_f); the implementation below minimizes the trace.

class CovarianceIntersection:
    """
    Covariance intersection fusion method
    """

    def __init__(self):
        pass

    def fuse_two_estimates(self, x1, P1, x2, P2, omega=None):
        """
        Fuse two estimates

        Parameters:
        x1, P1: Mean and covariance of first estimate
        x2, P2: Mean and covariance of second estimate
        omega: Weight parameter [0,1], automatically optimized if None

        Returns:
        x_fused, P_fused: Fused estimate
        """
        if omega is None:
            omega = self._optimize_omega(P1, P2)

        # CI fusion formula
        P1_inv = np.linalg.inv(P1)
        P2_inv = np.linalg.inv(P2)

        P_fused_inv = omega * P1_inv + (1 - omega) * P2_inv
        P_fused = np.linalg.inv(P_fused_inv)

        x_fused = P_fused @ (omega * P1_inv @ x1 + (1 - omega) * P2_inv @ x2)

        return x_fused, P_fused

    def _optimize_omega(self, P1, P2):
        """
        Optimize weight parameter to minimize fusion covariance trace

        Parameters:
        P1, P2: Two covariance matrices

        Returns:
        optimal_omega: Optimal weight
        """
        def trace_objective(omega):
            try:
                P1_inv = np.linalg.inv(P1)
                P2_inv = np.linalg.inv(P2)
                P_fused_inv = omega * P1_inv + (1 - omega) * P2_inv
                P_fused = np.linalg.inv(P_fused_inv)
                return np.trace(P_fused)
            except np.linalg.LinAlgError:
                return 1e10  # Penalize singular combinations

        # Bounded scalar minimization (golden-section/Brent via fminbound)
        from scipy.optimize import minimize_scalar

        result = minimize_scalar(trace_objective, bounds=(0, 1), method='bounded')
        return result.x

    def fuse_multiple_estimates(self, estimates_list):
        """
        Fuse multiple estimates

        Parameters:
        estimates_list: List of estimates, each element is (x, P) tuple

        Returns:
        x_fused, P_fused: Fused estimate
        """
        if len(estimates_list) < 2:
            return estimates_list[0] if estimates_list else (None, None)

        # Sequential pairwise fusion: consistent, though jointly optimizing
        # the weights over all estimates at once would give a tighter result
        x_fused, P_fused = estimates_list[0]

        for i in range(1, len(estimates_list)):
            x_i, P_i = estimates_list[i]
            x_fused, P_fused = self.fuse_two_estimates(x_fused, P_fused, x_i, P_i)

        return x_fused, P_fused

def ci_fusion_example():
    """
    Covariance intersection fusion example
    """
    # Create CI fuser
    ci = CovarianceIntersection()

    # Simulated scenario: Three different financial models estimate same asset
    np.random.seed(42)
    T = 100

    # True state: [price, volatility]
    true_states = []
    true_state = np.array([100, 0.2])

    # Three model estimates
    model1_estimates = []  # Technical analysis model
    model2_estimates = []  # Fundamental analysis model
    model3_estimates = []  # Sentiment analysis model

    ci_estimates = []

    for t in range(T):
        # True state evolution
        true_state[0] *= (1 + np.random.normal(0.001, true_state[1] / np.sqrt(252)))
        true_state[1] = 0.8 * true_state[1] + 0.2 * 0.2 + np.random.normal(0, 0.01)
        true_state[1] = max(true_state[1], 0.05)  # Minimum volatility
        true_states.append(true_state.copy())

        # Model 1: Technical analysis (sensitive to price, insensitive to volatility)
        # Cross-covariance kept small so P1 stays positive definite
        P1 = np.array([[1, 0.05], [0.05, 0.01]])
        x1 = true_state + np.random.multivariate_normal([0, 0], P1)

        # Model 2: Fundamental analysis (sensitive to long-term trends)
        P2 = np.array([[4, 0], [0, 0.005]])
        x2 = true_state + np.random.multivariate_normal([0, 0], P2)

        # Model 3: Sentiment analysis (sensitive to volatility)
        # Cross-covariance kept small so P3 stays positive definite
        P3 = np.array([[2, 0.1], [0.1, 0.02]])
        x3 = true_state + np.random.multivariate_normal([0, 0], P3)

        model1_estimates.append(x1)
        model2_estimates.append(x2)
        model3_estimates.append(x3)

        # CI fusion
        estimates_list = [(x1.reshape(-1, 1), P1), (x2.reshape(-1, 1), P2), (x3.reshape(-1, 1), P3)]
        x_ci, P_ci = ci.fuse_multiple_estimates(estimates_list)

        ci_estimates.append(x_ci.flatten())

    # Convert to arrays
    true_states = np.array(true_states)
    model1_estimates = np.array(model1_estimates)
    model2_estimates = np.array(model2_estimates)
    model3_estimates = np.array(model3_estimates)
    ci_estimates = np.array(ci_estimates)

    # Plot results
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

    # Price estimation
    ax1.plot(true_states[:, 0], 'g-', linewidth=3, label='True Price')
    ax1.plot(model1_estimates[:, 0], 'r:', linewidth=1, alpha=0.7, label='Technical Analysis')
    ax1.plot(model2_estimates[:, 0], 'b:', linewidth=1, alpha=0.7, label='Fundamental Analysis')
    ax1.plot(model3_estimates[:, 0], 'm:', linewidth=1, alpha=0.7, label='Sentiment Analysis')
    ax1.plot(ci_estimates[:, 0], 'k-', linewidth=2, label='CI Fusion')
    ax1.set_ylabel('Price')
    ax1.set_title('Price Estimation Comparison')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Volatility estimation
    ax2.plot(true_states[:, 1], 'g-', linewidth=3, label='True Volatility')
    ax2.plot(model1_estimates[:, 1], 'r:', linewidth=1, alpha=0.7, label='Technical Analysis')
    ax2.plot(model2_estimates[:, 1], 'b:', linewidth=1, alpha=0.7, label='Fundamental Analysis')
    ax2.plot(model3_estimates[:, 1], 'm:', linewidth=1, alpha=0.7, label='Sentiment Analysis')
    ax2.plot(ci_estimates[:, 1], 'k-', linewidth=2, label='CI Fusion')
    ax2.set_ylabel('Volatility')
    ax2.set_title('Volatility Estimation Comparison')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Price error
    ax3.plot(np.abs(true_states[:, 0] - model1_estimates[:, 0]), 'r:', label='Technical Analysis Error')
    ax3.plot(np.abs(true_states[:, 0] - model2_estimates[:, 0]), 'b:', label='Fundamental Error')
    ax3.plot(np.abs(true_states[:, 0] - model3_estimates[:, 0]), 'm:', label='Sentiment Analysis Error')
    ax3.plot(np.abs(true_states[:, 0] - ci_estimates[:, 0]), 'k-', linewidth=2, label='CI Fusion Error')
    ax3.set_ylabel('Price Absolute Error')
    ax3.set_title('Price Estimation Error')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Volatility error
    ax4.plot(np.abs(true_states[:, 1] - model1_estimates[:, 1]), 'r:', label='Technical Analysis Error')
    ax4.plot(np.abs(true_states[:, 1] - model2_estimates[:, 1]), 'b:', label='Fundamental Error')
    ax4.plot(np.abs(true_states[:, 1] - model3_estimates[:, 1]), 'm:', label='Sentiment Analysis Error')
    ax4.plot(np.abs(true_states[:, 1] - ci_estimates[:, 1]), 'k-', linewidth=2, label='CI Fusion Error')
    ax4.set_xlabel('Time')
    ax4.set_ylabel('Volatility Absolute Error')
    ax4.set_title('Volatility Estimation Error')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Performance statistics
    models = {
        'Technical Analysis': model1_estimates,
        'Fundamental Analysis': model2_estimates,
        'Sentiment Analysis': model3_estimates,
        'CI Fusion': ci_estimates
    }

    print("Performance Comparison (RMSE):")
    print("-" * 40)
    for name, estimates in models.items():
        price_rmse = np.sqrt(np.mean((true_states[:, 0] - estimates[:, 0])**2))
        vol_rmse = np.sqrt(np.mean((true_states[:, 1] - estimates[:, 1])**2))
        print(f"{name:12} - Price: {price_rmse:.3f}, Volatility: {vol_rmse:.4f}")

# Run example
# ci_fusion_example()
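
A useful sanity check on CI: because it never assumes independence, fusing two copies of the same estimate leaves the covariance unchanged, whereas the naive independent-fusion rule P_f^(-1) = P1^(-1) + P2^(-1) would wrongly halve it. A minimal sketch using the CovarianceIntersection class above; ci_consistency_check is an illustrative helper.

def ci_consistency_check():
    """CI does not manufacture confidence from duplicated information"""
    ci = CovarianceIntersection()
    x = np.array([[1.0], [2.0]])
    P = np.array([[0.5, 0.1], [0.1, 0.3]])

    x_ci, P_ci = ci.fuse_two_estimates(x, P, x, P)
    print(np.allclose(P_ci, P))  # True: duplicated input, no spurious gain

    # Naive independent fusion is overconfident for the same inputs
    P_naive = np.linalg.inv(np.linalg.inv(P) + np.linalg.inv(P))
    print(np.allclose(P_naive, P / 2))  # True

# ci_consistency_check()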

Asynchronous Data Processing

Time Synchronization and Interpolation

class AsynchronousDataFusion:
    """
    Asynchronous data fusion processor
    """

    def __init__(self, dim_x, buffer_size=100):
        self.dim_x = dim_x
        self.buffer_size = buffer_size

        # Data buffer
        self.data_buffer = {}  # sensor_id -> [(timestamp, observation, H, R), ...]

        # State history
        self.state_history = []  # [(timestamp, x, P), ...]

        # Current state
        self.current_time = 0
        self.x = np.zeros((dim_x, 1))
        self.P = np.eye(dim_x)

        # System parameters
        self.F = np.eye(dim_x)
        self.Q = np.eye(dim_x)

    def add_observation(self, sensor_id, timestamp, observation, H, R):
        """
        Add asynchronous observation

        Parameters:
        sensor_id: Sensor ID
        timestamp: Timestamp
        observation: Observation value
        H: Observation matrix
        R: Observation noise covariance
        """
        if sensor_id not in self.data_buffer:
            self.data_buffer[sensor_id] = []

        self.data_buffer[sensor_id].append((timestamp, observation, H, R))

        # Maintain buffer size
        if len(self.data_buffer[sensor_id]) > self.buffer_size:
            self.data_buffer[sensor_id].pop(0)

        # Process newly arrived data
        self._process_pending_observations()

    def _process_pending_observations(self):
        """Process pending observations"""
        # Collect all pending observations. Anything time-stamped at or before
        # current_time is a late (out-of-sequence) arrival and is not applied;
        # handling those properly requires retrodiction/OOSM techniques.
        all_observations = []
        for sensor_id, buffer in self.data_buffer.items():
            for obs_data in buffer:
                timestamp, observation, H, R = obs_data
                if timestamp > self.current_time:
                    all_observations.append((timestamp, sensor_id, observation, H, R))

        if not all_observations:
            return

        # Sort by timestamp
        all_observations.sort(key=lambda x: x[0])

        # Process one by one
        for timestamp, sensor_id, observation, H, R in all_observations:
            self._process_single_observation(timestamp, observation, H, R)

            # Remove processed observations
            self.data_buffer[sensor_id] = [
                obs for obs in self.data_buffer[sensor_id]
                if obs[0] != timestamp
            ]

    def _process_single_observation(self, timestamp, observation, H, R):
        """Process single observation"""
        # Predict to observation time
        dt = timestamp - self.current_time
        if dt > 0:
            self._predict_to_time(dt)

        # Update
        observation = np.array(observation).reshape(-1, 1)
        y = observation - H @ self.x
        S = H @ self.P @ H.T + R
        K = self.P @ H.T @ np.linalg.inv(S)

        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ H) @ self.P

        # Update time
        self.current_time = timestamp

        # Save state history
        self.state_history.append((timestamp, self.x.copy(), self.P.copy()))

    def _predict_to_time(self, dt):
        """Predict to specified time"""
        # Discretize the transition matrix: integer dt uses an exact matrix
        # power, fractional dt a fractional matrix power (assumes F is
        # well-behaved, e.g. no negative real eigenvalues)
        from scipy.linalg import fractional_matrix_power
        if dt == int(dt):
            F_dt = np.linalg.matrix_power(self.F, int(dt))
        else:
            F_dt = np.real(fractional_matrix_power(self.F, dt))
        Q_dt = self.Q * dt

        self.x = F_dt @ self.x
        self.P = F_dt @ self.P @ F_dt.T + Q_dt

    def get_state_at_time(self, query_time):
        """Get state estimate at specified time (interpolation)"""
        if not self.state_history:
            return self.x, self.P

        # Find nearest historical states
        before_states = [(t, x, P) for t, x, P in self.state_history if t <= query_time]
        after_states = [(t, x, P) for t, x, P in self.state_history if t > query_time]

        if not before_states:
            # Extrapolate backward from the earliest stored state (simplified:
            # one inverse-dynamics step regardless of the time gap)
            t_after, x_after, P_after = after_states[0]
            F_inv = np.linalg.inv(self.F)
            x_interp = F_inv @ x_after
            P_interp = F_inv @ P_after @ F_inv.T
            return x_interp, P_interp

        elif not after_states:
            # Extrapolate forward from the latest stored state
            t_before, x_before, P_before = before_states[-1]
            dt = query_time - t_before
            if dt == 0:
                return x_before, P_before
            F_dt = self.F  # Simplified: one dynamics step regardless of dt
            Q_dt = self.Q * dt
            x_interp = F_dt @ x_before
            P_interp = F_dt @ P_before @ F_dt.T + Q_dt
            return x_interp, P_interp

        else:
            # Interpolate between the bracketing states
            t_before, x_before, P_before = before_states[-1]
            t_after, x_after, P_after = after_states[0]

            # Simple linear interpolation (a fixed-interval smoother would be
            # the statistically correct alternative)
            alpha = (query_time - t_before) / (t_after - t_before)
            x_interp = (1 - alpha) * x_before + alpha * x_after
            P_interp = (1 - alpha) * P_before + alpha * P_after

            return x_interp, P_interp

def asynchronous_fusion_example():
    """
    Asynchronous fusion example
    """
    # Create asynchronous fuser
    adf = AsynchronousDataFusion(dim_x=2)

    # Set system parameters
    adf.F = np.array([[1, 1], [0, 0.95]])
    adf.Q = np.array([[0.01, 0], [0, 0.01]])

    # Initialize
    adf.x = np.array([[0], [1]])
    adf.P = np.array([[1, 0], [0, 1]])

    # Simulate asynchronous data streams
    np.random.seed(42)

    # Sensor configuration
    sensors = {
        'sensor_A': {'H': np.array([[1, 0]]), 'R': np.array([[0.1]]), 'rate': 1.0},
        'sensor_B': {'H': np.array([[0, 1]]), 'R': np.array([[0.2]]), 'rate': 0.5},
        'sensor_C': {'H': np.array([[1, 1]]), 'R': np.array([[0.15]]), 'rate': 0.3}
    }

    # Generate true state sequence
    true_states = []
    timestamps = []
    true_state = np.array([0, 1])

    T = 100
    for t in range(T):
        true_state = adf.F @ true_state + np.random.multivariate_normal([0, 0], adf.Q)
        true_states.append(true_state.copy())
        timestamps.append(t)

    # Generate asynchronous observations
    all_observations = []

    for sensor_id, config in sensors.items():
        H, R, rate = config['H'], config['R'], config['rate']

        # Generate sensor timestamps
        sensor_times = []
        t = 0
        while t < T:
            t += np.random.exponential(1.0 / rate)  # Poisson process
            if t < T:
                sensor_times.append(t)

        # Generate observations for each time point
        for obs_time in sensor_times:
            # Interpolate to get true state at that time
            if obs_time <= T - 1:
                idx = int(obs_time)
                alpha = obs_time - idx
                if idx + 1 < len(true_states):
                    true_state_interp = (1 - alpha) * true_states[idx] + alpha * true_states[idx + 1]
                else:
                    true_state_interp = true_states[idx]

                # Generate observation
                true_obs = H @ true_state_interp
                noisy_obs = true_obs + np.random.multivariate_normal(np.zeros(R.shape[0]), R)

                all_observations.append((obs_time, sensor_id, noisy_obs, H, R))

    # Sort by time and add to fuser
    all_observations.sort(key=lambda x: x[0])

    for obs_time, sensor_id, observation, H, R in all_observations:
        adf.add_observation(sensor_id, obs_time, observation, H, R)

    # Get estimates at synchronized time points
    synchronized_estimates = []
    for t in range(T):
        x_est, P_est = adf.get_state_at_time(t)
        synchronized_estimates.append(x_est.flatten())

    synchronized_estimates = np.array(synchronized_estimates)

    # Plot results
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))

    # State estimation
    ax1.plot(timestamps, np.array(true_states)[:, 0], 'g-', linewidth=3, label='True Position')
    ax1.plot(timestamps, synchronized_estimates[:, 0], 'b-', linewidth=2, label='Async Fusion Estimate')

    # Mark observation times
    colors = {'sensor_A': 'red', 'sensor_B': 'orange', 'sensor_C': 'purple'}
    for obs_time, sensor_id, observation, H, R in all_observations[:50]:  # Only show first 50
        if H[0, 0] > 0:  # If observation contains position information
            ax1.scatter(obs_time, observation[0] / H[0, 0], c=colors[sensor_id],
                       s=20, alpha=0.6)

    ax1.set_ylabel('Position')
    ax1.set_title('Asynchronous Multi-Sensor Fusion: Position Estimation')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Velocity estimation
    ax2.plot(timestamps, np.array(true_states)[:, 1], 'g-', linewidth=3, label='True Velocity')
    ax2.plot(timestamps, synchronized_estimates[:, 1], 'b-', linewidth=2, label='Async Fusion Estimate')
    ax2.set_ylabel('Velocity')
    ax2.set_title('Velocity Estimation')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Observation time distribution
    sensor_obs_times = {sensor: [] for sensor in sensors.keys()}
    for obs_time, sensor_id, _, _, _ in all_observations:
        sensor_obs_times[sensor_id].append(obs_time)

    for i, (sensor_id, obs_times) in enumerate(sensor_obs_times.items()):
        ax3.scatter(obs_times, [i] * len(obs_times), c=colors[sensor_id],
                   label=f'{sensor_id} ({len(obs_times)} obs)', alpha=0.7)

    ax3.set_xlabel('Time')
    ax3.set_ylabel('Sensor')
    ax3.set_title('Asynchronous Observation Time Distribution')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Performance evaluation
    pos_rmse = np.sqrt(np.mean((np.array(true_states)[:, 0] - synchronized_estimates[:, 0])**2))
    vel_rmse = np.sqrt(np.mean((np.array(true_states)[:, 1] - synchronized_estimates[:, 1])**2))

    print(f"Asynchronous Fusion Performance:")
    print(f"Position RMSE: {pos_rmse:.4f}")
    print(f"Velocity RMSE: {vel_rmse:.4f}")

    # Sensor statistics
    total_observations = len(all_observations)
    print(f"\nSensor Observation Statistics:")
    for sensor_id in sensors.keys():
        count = sum(1 for obs in all_observations if obs[1] == sensor_id)
        print(f"{sensor_id}: {count} observations ({count/total_observations:.1%})")

# Run example
# asynchronous_fusion_example()
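
A minimal usage sketch for AsynchronousDataFusion (the sensor names and values are illustrative): two sensors report at irregular, interleaved times. Note that an observation time-stamped at or before the filter's current time counts as late and is not applied, matching the class's forward-only processing.

def async_usage_sketch():
    """Feed interleaved, irregularly timed observations to the fuser"""
    adf = AsynchronousDataFusion(dim_x=2)
    adf.F = np.array([[1.0, 1.0], [0.0, 0.95]])
    adf.Q = np.eye(2) * 0.01

    H_pos, R_pos = np.array([[1.0, 0.0]]), np.array([[0.1]])
    H_vel, R_vel = np.array([[0.0, 1.0]]), np.array([[0.2]])

    adf.add_observation('pos', 0.7, np.array([0.4]), H_pos, R_pos)
    adf.add_observation('vel', 1.3, np.array([0.9]), H_vel, R_vel)
    # Late arrival: time 1.0 is already behind current_time, so it is ignored
    adf.add_observation('pos', 1.0, np.array([0.6]), H_pos, R_pos)

    x_now, _ = adf.get_state_at_time(1.3)
    print(x_now.ravel())

# async_usage_sketch()
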
Next Chapter Preview

In the next chapter, we will learn about applications of Kalman filtering in portfolio management, including dynamic portfolio optimization, risk factor estimation, and asset allocation strategies.