Chapter 11: Multi-Sensor Fusion and Distributed Filtering
Learning Objectives
- Master centralized and distributed fusion architectures
- Understand information filtering and covariance intersection methods
- Learn to handle asynchronous sensor data
Multi-Sensor Fusion Overview
Why Fusion Is Necessary
In modern financial systems, information comes from many heterogeneous sources:
Financial Data Sources
- Market Data: Prices, volume, order book
- News Data: Financial news, announcements, analyst reports
- Alternative Data: Social media, satellite imagery, credit card transactions
- Macro Data: Economic indicators, central bank policies, international trade
- Internal Data: Risk models, trading records, customer behavior
Fusion Architecture Classification
(Diagram: classification of fusion architectures — centralized fusion, where all raw measurements flow to a single filter, versus distributed fusion, where local filters exchange information. Both are developed in this chapter, along with covariance intersection and asynchronous variants.)
Centralized Kalman Filtering
Multi-Sensor Centralized Fusion
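A centralized architecture ships every sensor's raw measurement to a single filter. With N sensors observing the same state x through per-sensor models z_i = H_i x + v_i, v_i ~ N(0, R_i), the measurements are stacked into one augmented model:

z_aug = [z_1; z_2; ...; z_N], H_aug = [H_1; H_2; ...; H_N], R_aug = blockdiag(R_1, ..., R_N)

where the block-diagonal R_aug encodes the assumption that the sensors' measurement noises are mutually independent. A single Kalman update with (z_aug, H_aug, R_aug) then fuses all sensors at once, as implemented below.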
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import block_diag
import warnings
warnings.filterwarnings('ignore')
class CentralizedMultiSensorKF:
"""
Centralized multi-sensor Kalman filter
"""
def __init__(self, dim_x, sensors_config):
"""
Parameters:
dim_x: State dimension
sensors_config: Sensor configuration list, each element contains {'H': observation matrix, 'R': noise covariance}
"""
self.dim_x = dim_x
self.sensors = sensors_config
self.n_sensors = len(sensors_config)
# State and covariance
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# System matrices
self.F = np.eye(dim_x)
self.Q = np.eye(dim_x)
        # Precompute the full stacked observation model for reference;
        # update() rebuilds the stack from whichever sensors actually report
        self._build_augmented_matrices()
def _build_augmented_matrices(self):
"""Build augmented observation matrix and noise covariance"""
H_list = []
R_list = []
for sensor in self.sensors:
H_list.append(sensor['H'])
R_list.append(sensor['R'])
# Vertically stack observation matrices
self.H_aug = np.vstack(H_list)
# Block diagonalize noise covariance matrix
self.R_aug = block_diag(*R_list)
self.dim_z_aug = self.H_aug.shape[0]
def predict(self):
"""Prediction step"""
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, observations):
"""
Update step
Parameters:
observations: Observation list, in sensor order
"""
# Handle missing observations
available_sensors = []
available_observations = []
for i, obs in enumerate(observations):
if obs is not None:
available_sensors.append(i)
if np.isscalar(obs):
available_observations.append([obs])
else:
available_observations.append(obs)
if not available_sensors:
return # No available observations
# Build augmented matrices for available sensors
H_available = []
R_available = []
for sensor_idx in available_sensors:
H_available.append(self.sensors[sensor_idx]['H'])
R_available.append(self.sensors[sensor_idx]['R'])
H_aug = np.vstack(H_available)
R_aug = block_diag(*R_available)
# Flatten observation vector
z_aug = np.concatenate(available_observations).reshape(-1, 1)
# Standard Kalman update
y = z_aug - H_aug @ self.x
S = H_aug @ self.P @ H_aug.T + R_aug
K = self.P @ H_aug.T @ np.linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H_aug) @ self.P
def centralized_fusion_example():
"""
Centralized fusion example: Multiple financial data sources
"""
# State: [price trend, volatility]
dim_x = 2
# Three sensor configurations
sensors_config = [
{
'name': 'Market Price',
'H': np.array([[1, 0]]), # Directly observe price trend
'R': np.array([[0.1]])
},
{
'name': 'Technical Indicator',
'H': np.array([[0.8, 0.2]]), # Combination of trend and volatility
'R': np.array([[0.2]])
},
{
'name': 'Sentiment Indicator',
'H': np.array([[0.6, -0.3]]), # Inverse volatility impact
'R': np.array([[0.5]])
}
]
# Create centralized filter
kf = CentralizedMultiSensorKF(dim_x, sensors_config)
# Set system dynamics
kf.F = np.array([[0.95, 0.1], [0, 0.9]]) # Price trend decay, volatility persistence
kf.Q = np.array([[0.01, 0], [0, 0.005]])
# Initialize
kf.x = np.array([[0], [0.2]])
kf.P = np.array([[1, 0], [0, 0.1]])
# Simulate data
np.random.seed(42)
T = 100
true_states = []
all_observations = []
estimates = []
# True state evolution
true_state = np.array([0, 0.2])
for t in range(T):
# True state update
true_state = kf.F @ true_state + np.random.multivariate_normal([0, 0], kf.Q)
true_states.append(true_state.copy())
# Generate observations (some may be missing)
observations = []
for i, sensor in enumerate(sensors_config):
# Simulate sensor failure (20% probability of missing)
if np.random.random() > 0.2:
true_obs = sensor['H'] @ true_state
noisy_obs = true_obs + np.random.multivariate_normal(
np.zeros(sensor['R'].shape[0]), sensor['R'])
observations.append(noisy_obs[0] if len(noisy_obs) == 1 else noisy_obs)
else:
observations.append(None) # Missing observation
all_observations.append(observations)
# Filter update
kf.predict()
kf.update(observations)
estimates.append(kf.x.flatten().copy())
# Plot results
true_states = np.array(true_states)
estimates = np.array(estimates)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Price trend
ax1.plot(true_states[:, 0], 'g-', linewidth=3, label='True Price Trend')
ax1.plot(estimates[:, 0], 'b-', linewidth=2, label='Fusion Estimate')
# Plot observations from each sensor (when available)
colors = ['red', 'orange', 'purple']
for i, sensor in enumerate(sensors_config):
sensor_obs = []
sensor_times = []
for t, obs_list in enumerate(all_observations):
if obs_list[i] is not None:
# Infer trend component from observation (simplified)
if sensor['H'][0, 0] != 0:
trend_obs = obs_list[i] / sensor['H'][0, 0]
sensor_obs.append(trend_obs)
sensor_times.append(t)
if sensor_obs:
ax1.scatter(sensor_times, sensor_obs, c=colors[i], alpha=0.6,
s=20, label=f"{sensor['name']} Observations")
ax1.set_ylabel('Price Trend')
ax1.set_title('Multi-Sensor Fusion: Price Trend Estimation')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Volatility
ax2.plot(true_states[:, 1], 'g-', linewidth=3, label='True Volatility')
ax2.plot(estimates[:, 1], 'b-', linewidth=2, label='Fusion Estimate')
ax2.set_xlabel('Time')
ax2.set_ylabel('Volatility')
ax2.set_title('Volatility Estimation')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Calculate performance
trend_rmse = np.sqrt(np.mean((true_states[:, 0] - estimates[:, 0])**2))
vol_rmse = np.sqrt(np.mean((true_states[:, 1] - estimates[:, 1])**2))
print(f"Price Trend RMSE: {trend_rmse:.4f}")
print(f"Volatility RMSE: {vol_rmse:.4f}")
# Sensor availability statistics
availability = []
for i in range(len(sensors_config)):
available_count = sum(1 for obs_list in all_observations if obs_list[i] is not None)
availability.append(available_count / len(all_observations))
print(f"{sensors_config[i]['name']} Availability: {availability[i]:.1%}")
# Run example
# centralized_fusion_example()
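Stacking all sensors into one large update is mathematically equivalent to applying the available sensors one at a time, as long as their measurement noises are independent. A minimal sketch of the sequential alternative (the helper name sequential_update is illustrative; it operates on a CentralizedMultiSensorKF instance from above):

def sequential_update(kf, observations):
    """Apply each available sensor's update in turn; equivalent to the
    stacked update when the measurement noises are independent."""
    for sensor, obs in zip(kf.sensors, observations):
        if obs is None:
            continue  # skip missing observations
        z = np.atleast_1d(obs).reshape(-1, 1)
        H, R = sensor['H'], sensor['R']
        y = z - H @ kf.x                   # innovation
        S = H @ kf.P @ H.T + R             # innovation covariance
        K = kf.P @ H.T @ np.linalg.inv(S)  # Kalman gain
        kf.x = kf.x + K @ y
        kf.P = (np.eye(kf.dim_x) - K @ H) @ kf.P

Sequential processing avoids inverting one large stacked innovation covariance and degrades gracefully when sensors drop out.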
Distributed Kalman Filtering
Information Filter Form
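The information filter is the dual of the Kalman filter: instead of the state x and covariance P it propagates the information matrix Y = P^(-1) and the information vector y = P^(-1) x. Its appeal for distributed fusion is that a measurement (z, H, R) enters purely additively:

Y <- Y + H^T R^(-1) H, y <- y + H^T R^(-1) z

so the contributions of independent nodes can simply be summed.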
class InformationFilter:
"""
Information filter (dual form of Kalman filter)
"""
def __init__(self, dim_x):
self.dim_x = dim_x
# Information matrix and information vector
self.Y = np.eye(dim_x) # Information matrix Y = P^(-1)
self.y = np.zeros((dim_x, 1)) # Information vector y = P^(-1) * x
# System matrices
self.F = np.eye(dim_x)
self.Q = np.eye(dim_x)
@property
def x(self):
"""State estimate: x = Y^(-1) * y"""
try:
return np.linalg.inv(self.Y) @ self.y
except np.linalg.LinAlgError:
return np.zeros((self.dim_x, 1))
@property
def P(self):
"""Covariance matrix: P = Y^(-1)"""
try:
return np.linalg.inv(self.Y)
except np.linalg.LinAlgError:
return np.eye(self.dim_x) * 1e6
def predict(self):
"""Information filter prediction"""
# Convert to covariance form for prediction
P_prev = self.P
x_prev = self.x
# Standard prediction
x_pred = self.F @ x_prev
P_pred = self.F @ P_prev @ self.F.T + self.Q
# Convert back to information form
self.Y = np.linalg.inv(P_pred)
self.y = self.Y @ x_pred
    def update(self, z, H, R):
        """Information filter update"""
        z = z.reshape(-1, 1)
        # Information contributions of this measurement
        R_inv = np.linalg.inv(R)
        i_vec = H.T @ R_inv @ z  # information vector contribution
        I_mat = H.T @ R_inv @ H  # information matrix contribution
        # Fusion in information form is simply additive
        self.y += i_vec
        self.Y += I_mat
def fuse_information(self, other_y, other_Y):
"""
Fuse with another information filter
Parameters:
other_y: Other node's information vector
other_Y: Other node's information matrix
"""
self.y += other_y
self.Y += other_Y
class DistributedKalmanFilter:
"""
Distributed Kalman filter network
"""
def __init__(self, n_nodes, dim_x):
self.n_nodes = n_nodes
self.dim_x = dim_x
# Create information filter nodes
self.nodes = [InformationFilter(dim_x) for _ in range(n_nodes)]
# Communication topology (fully connected)
self.communication_graph = np.ones((n_nodes, n_nodes)) - np.eye(n_nodes)
def distributed_update(self, observations, H_matrices, R_matrices):
"""
Distributed update step
Parameters:
observations: Observation list for each node
H_matrices: Observation matrix list for each node
R_matrices: Noise covariance list for each node
"""
# Step 1: Each node independently predicts and updates
for i, (obs, H, R) in enumerate(zip(observations, H_matrices, R_matrices)):
self.nodes[i].predict()
if obs is not None:
self.nodes[i].update(obs, H, R)
# Step 2: Information exchange and fusion
self._exchange_information()
    def _exchange_information(self):
        """Information exchange (simplified one-round consensus averaging)"""
        # Collect information from all nodes
        all_y = [node.y.copy() for node in self.nodes]
        all_Y = [node.Y.copy() for node in self.nodes]
        # Average the information across nodes. Because every node carries
        # its own copy of the common prior, averaging keeps the prior at
        # full weight while shrinking each measurement contribution by 1/n,
        # so this scheme is conservative relative to centralized fusion
        avg_y = sum(all_y) / self.n_nodes
        avg_Y = sum(all_Y) / self.n_nodes
        # Broadcast the average back to all nodes (a real network would run
        # several rounds of neighbor-to-neighbor averaging instead)
        for node in self.nodes:
            node.y = avg_y.copy()
            node.Y = avg_Y.copy()
    def get_consensus_estimate(self):
        """Get the consensus estimate"""
        # After the exchange step every node holds the same information,
        # so the average below is just that common estimate
        all_estimates = [node.x for node in self.nodes]
        return np.mean(all_estimates, axis=0)
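The one-round global average above assumes every node can reach every other node. On a sparser network, the same effect is approximated by repeating local averaging with neighbors. A minimal sketch of one such round (the function name consensus_round, the adjacency argument, and the step size epsilon are illustrative assumptions, not part of the class above):

def consensus_round(nodes, adjacency, epsilon=0.3):
    """One round of neighbor-to-neighbor information averaging.
    nodes: list of InformationFilter objects
    adjacency: (n, n) 0/1 matrix; adjacency[i, j] = 1 if node i hears node j
    epsilon: step size; should satisfy epsilon < 1 / (max node degree)
    """
    new_y = [node.y.copy() for node in nodes]
    new_Y = [node.Y.copy() for node in nodes]
    for i, node in enumerate(nodes):
        for j, other in enumerate(nodes):
            if adjacency[i, j]:
                # Move node i's information toward neighbor j's
                new_y[i] += epsilon * (other.y - node.y)
                new_Y[i] += epsilon * (other.Y - node.Y)
    for node, y_new, Y_new in zip(nodes, new_y, new_Y):
        node.y, node.Y = y_new, Y_new

Repeated rounds converge to the network-wide average on a connected graph (for small enough epsilon), approaching the behavior of _exchange_information without any global communication.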
def distributed_fusion_example():
"""
Distributed fusion example
"""
# System setup
dim_x = 2
n_nodes = 3
dkf = DistributedKalmanFilter(n_nodes, dim_x)
# Set system matrices for each node
F = np.array([[1.0, 1.0], [0, 0.95]])
Q = np.array([[0.01, 0], [0, 0.01]])
for node in dkf.nodes:
node.F = F
node.Q = Q
# Observation configuration for each node
H_configs = [
np.array([[1, 0]]), # Node 1: observe position
np.array([[0, 1]]), # Node 2: observe velocity
np.array([[1, 1]]) # Node 3: observe position+velocity
]
R_configs = [
np.array([[0.1]]),
np.array([[0.2]]),
np.array([[0.15]])
]
# Simulate data
np.random.seed(42)
T = 50
true_states = []
distributed_estimates = []
# Initial true state
true_state = np.array([0, 1])
for t in range(T):
# True state evolution
true_state = F @ true_state + np.random.multivariate_normal([0, 0], Q)
true_states.append(true_state.copy())
# Generate observations for each node
observations = []
for i in range(n_nodes):
if np.random.random() > 0.1: # 90% probability of observation
true_obs = H_configs[i] @ true_state
noisy_obs = true_obs + np.random.multivariate_normal(
np.zeros(R_configs[i].shape[0]), R_configs[i])
observations.append(noisy_obs)
else:
observations.append(None)
# Distributed update
dkf.distributed_update(observations, H_configs, R_configs)
# Get consensus estimate
consensus_estimate = dkf.get_consensus_estimate()
distributed_estimates.append(consensus_estimate.flatten())
# Comparison: Centralized vs Distributed
# Centralized version
centralized_kf = CentralizedMultiSensorKF(dim_x, [
{'H': H_configs[0], 'R': R_configs[0]},
{'H': H_configs[1], 'R': R_configs[1]},
{'H': H_configs[2], 'R': R_configs[2]}
])
centralized_kf.F = F
centralized_kf.Q = Q
centralized_estimates = []
# Re-run centralized (using same random seed data)
np.random.seed(42)
true_state = np.array([0, 1])
for t in range(T):
true_state = F @ true_state + np.random.multivariate_normal([0, 0], Q)
observations = []
for i in range(n_nodes):
if np.random.random() > 0.1:
true_obs = H_configs[i] @ true_state
noisy_obs = true_obs + np.random.multivariate_normal(
np.zeros(R_configs[i].shape[0]), R_configs[i])
observations.append(noisy_obs[0] if len(noisy_obs) == 1 else noisy_obs)
else:
observations.append(None)
centralized_kf.predict()
centralized_kf.update(observations)
centralized_estimates.append(centralized_kf.x.flatten().copy())
# Plot comparison results
true_states = np.array(true_states)
distributed_estimates = np.array(distributed_estimates)
centralized_estimates = np.array(centralized_estimates)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Position comparison
ax1.plot(true_states[:, 0], 'g-', linewidth=3, label='True Position')
ax1.plot(centralized_estimates[:, 0], 'b-', linewidth=2, label='Centralized Estimate')
ax1.plot(distributed_estimates[:, 0], 'r--', linewidth=2, label='Distributed Estimate')
ax1.set_ylabel('Position')
ax1.set_title('Centralized vs Distributed Fusion: Position Estimation')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Velocity comparison
ax2.plot(true_states[:, 1], 'g-', linewidth=3, label='True Velocity')
ax2.plot(centralized_estimates[:, 1], 'b-', linewidth=2, label='Centralized Estimate')
ax2.plot(distributed_estimates[:, 1], 'r--', linewidth=2, label='Distributed Estimate')
ax2.set_xlabel('Time')
ax2.set_ylabel('Velocity')
ax2.set_title('Velocity Estimation')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Performance comparison
cent_pos_rmse = np.sqrt(np.mean((true_states[:, 0] - centralized_estimates[:, 0])**2))
dist_pos_rmse = np.sqrt(np.mean((true_states[:, 0] - distributed_estimates[:, 0])**2))
cent_vel_rmse = np.sqrt(np.mean((true_states[:, 1] - centralized_estimates[:, 1])**2))
dist_vel_rmse = np.sqrt(np.mean((true_states[:, 1] - distributed_estimates[:, 1])**2))
print("Performance Comparison:")
print(f"Position RMSE - Centralized: {cent_pos_rmse:.4f}, Distributed: {dist_pos_rmse:.4f}")
print(f"Velocity RMSE - Centralized: {cent_vel_rmse:.4f}, Distributed: {dist_vel_rmse:.4f}")
# Run example
# distributed_fusion_example()
Covariance Intersection Method
CI Fusion Algorithm
When the cross-correlation between the estimates being fused is unknown (for example, when the models share data or a common prior), treating them as independent produces overconfident fused covariances. Covariance Intersection (CI) provides a conservative but provably consistent fusion:
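For two estimates (x1, P1) and (x2, P2) and a weight omega in [0, 1], CI computes

P_f^(-1) = omega * P1^(-1) + (1 - omega) * P2^(-1)
x_f = P_f * (omega * P1^(-1) * x1 + (1 - omega) * P2^(-1) * x2)

and the result is consistent for any (unknown) cross-covariance between the inputs. The weight is usually chosen to minimize trace(P_f) or det(P_f); the implementation below minimizes the trace.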
class CovarianceIntersection:
"""
Covariance intersection fusion method
"""
def __init__(self):
pass
def fuse_two_estimates(self, x1, P1, x2, P2, omega=None):
"""
Fuse two estimates
Parameters:
x1, P1: Mean and covariance of first estimate
x2, P2: Mean and covariance of second estimate
omega: Weight parameter [0,1], automatically optimized if None
Returns:
x_fused, P_fused: Fused estimate
"""
if omega is None:
omega = self._optimize_omega(P1, P2)
# CI fusion formula
P1_inv = np.linalg.inv(P1)
P2_inv = np.linalg.inv(P2)
P_fused_inv = omega * P1_inv + (1 - omega) * P2_inv
P_fused = np.linalg.inv(P_fused_inv)
x_fused = P_fused @ (omega * P1_inv @ x1 + (1 - omega) * P2_inv @ x2)
return x_fused, P_fused
def _optimize_omega(self, P1, P2):
"""
Optimize weight parameter to minimize fusion covariance trace
Parameters:
P1, P2: Two covariance matrices
Returns:
optimal_omega: Optimal weight
"""
        def trace_objective(omega):
            try:
                P1_inv = np.linalg.inv(P1)
                P2_inv = np.linalg.inv(P2)
                P_fused_inv = omega * P1_inv + (1 - omega) * P2_inv
                P_fused = np.linalg.inv(P_fused_inv)
                return np.trace(P_fused)
            except np.linalg.LinAlgError:
                return 1e10  # heavily penalize weights that make the fusion singular
        # Bounded scalar minimization of the fused-covariance trace over [0, 1]
        from scipy.optimize import minimize_scalar
        result = minimize_scalar(trace_objective, bounds=(0, 1), method='bounded')
        return result.x
def fuse_multiple_estimates(self, estimates_list):
"""
Fuse multiple estimates
Parameters:
estimates_list: List of estimates, each element is (x, P) tuple
Returns:
x_fused, P_fused: Fused estimate
"""
if len(estimates_list) < 2:
return estimates_list[0] if estimates_list else (None, None)
        # Sequential pairwise fusion (order-dependent; jointly optimizing one
        # weight per estimate would give a slightly tighter result)
        x_fused, P_fused = estimates_list[0]
        for i in range(1, len(estimates_list)):
            x_i, P_i = estimates_list[i]
            x_fused, P_fused = self.fuse_two_estimates(x_fused, P_fused, x_i, P_i)
        return x_fused, P_fused
return x_fused, P_fused
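A quick sanity check on two one-dimensional estimates (illustrative numbers) shows CI's conservatism: with unknown correlation it cannot squeeze extra precision out of the second estimate, so the optimal weight sits at an endpoint and the fused variance equals the tighter input:

ci = CovarianceIntersection()
xa, Pa = np.array([[1.0]]), np.array([[0.5]])
xb, Pb = np.array([[1.4]]), np.array([[0.8]])
x_f, P_f = ci.fuse_two_estimates(xa, Pa, xb, Pb)
print(x_f.ravel(), P_f.ravel())  # approximately [1.0] and [0.5]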
def ci_fusion_example():
"""
Covariance intersection fusion example
"""
# Create CI fuser
ci = CovarianceIntersection()
# Simulated scenario: Three different financial models estimate same asset
np.random.seed(42)
T = 100
# True state: [price, volatility]
true_states = []
true_state = np.array([100, 0.2])
# Three model estimates
model1_estimates = [] # Technical analysis model
model2_estimates = [] # Fundamental analysis model
model3_estimates = [] # Sentiment analysis model
ci_estimates = []
for t in range(T):
# True state evolution
true_state[0] *= (1 + np.random.normal(0.001, true_state[1] / np.sqrt(252)))
true_state[1] = 0.8 * true_state[1] + 0.2 * 0.2 + np.random.normal(0, 0.01)
true_state[1] = max(true_state[1], 0.05) # Minimum volatility
true_states.append(true_state.copy())
        # Model 1: Technical analysis (sensitive to price, insensitive to volatility)
        x1 = true_state + np.random.multivariate_normal([0, 0], [[1, 0], [0, 0.01]])
        # Reported covariance matches the sampling covariance and is
        # positive definite, so np.linalg.inv in the CI step is safe
        P1 = np.array([[1, 0], [0, 0.01]])
# Model 2: Fundamental analysis (sensitive to long-term trends)
x2 = true_state + np.random.multivariate_normal([0, 0], [[4, 0], [0, 0.005]])
P2 = np.array([[4, 0], [0, 0.005]])
        # Model 3: Sentiment analysis (sensitive to volatility); the cross
        # term is kept small so the covariance stays positive definite
        x3 = true_state + np.random.multivariate_normal([0, 0], [[2, 0.1], [0.1, 0.02]])
        P3 = np.array([[2, 0.1], [0.1, 0.02]])
model1_estimates.append(x1)
model2_estimates.append(x2)
model3_estimates.append(x3)
# CI fusion
estimates_list = [(x1.reshape(-1, 1), P1), (x2.reshape(-1, 1), P2), (x3.reshape(-1, 1), P3)]
x_ci, P_ci = ci.fuse_multiple_estimates(estimates_list)
ci_estimates.append(x_ci.flatten())
# Convert to arrays
true_states = np.array(true_states)
model1_estimates = np.array(model1_estimates)
model2_estimates = np.array(model2_estimates)
model3_estimates = np.array(model3_estimates)
ci_estimates = np.array(ci_estimates)
# Plot results
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Price estimation
ax1.plot(true_states[:, 0], 'g-', linewidth=3, label='True Price')
ax1.plot(model1_estimates[:, 0], 'r:', linewidth=1, alpha=0.7, label='Technical Analysis')
ax1.plot(model2_estimates[:, 0], 'b:', linewidth=1, alpha=0.7, label='Fundamental Analysis')
ax1.plot(model3_estimates[:, 0], 'm:', linewidth=1, alpha=0.7, label='Sentiment Analysis')
ax1.plot(ci_estimates[:, 0], 'k-', linewidth=2, label='CI Fusion')
ax1.set_ylabel('Price')
ax1.set_title('Price Estimation Comparison')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Volatility estimation
ax2.plot(true_states[:, 1], 'g-', linewidth=3, label='True Volatility')
ax2.plot(model1_estimates[:, 1], 'r:', linewidth=1, alpha=0.7, label='Technical Analysis')
ax2.plot(model2_estimates[:, 1], 'b:', linewidth=1, alpha=0.7, label='Fundamental Analysis')
ax2.plot(model3_estimates[:, 1], 'm:', linewidth=1, alpha=0.7, label='Sentiment Analysis')
ax2.plot(ci_estimates[:, 1], 'k-', linewidth=2, label='CI Fusion')
ax2.set_ylabel('Volatility')
ax2.set_title('Volatility Estimation Comparison')
ax2.legend()
ax2.grid(True, alpha=0.3)
# Price error
ax3.plot(np.abs(true_states[:, 0] - model1_estimates[:, 0]), 'r:', label='Technical Analysis Error')
ax3.plot(np.abs(true_states[:, 0] - model2_estimates[:, 0]), 'b:', label='Fundamental Error')
ax3.plot(np.abs(true_states[:, 0] - model3_estimates[:, 0]), 'm:', label='Sentiment Analysis Error')
ax3.plot(np.abs(true_states[:, 0] - ci_estimates[:, 0]), 'k-', linewidth=2, label='CI Fusion Error')
ax3.set_ylabel('Price Absolute Error')
ax3.set_title('Price Estimation Error')
ax3.legend()
ax3.grid(True, alpha=0.3)
# Volatility error
ax4.plot(np.abs(true_states[:, 1] - model1_estimates[:, 1]), 'r:', label='Technical Analysis Error')
ax4.plot(np.abs(true_states[:, 1] - model2_estimates[:, 1]), 'b:', label='Fundamental Error')
ax4.plot(np.abs(true_states[:, 1] - model3_estimates[:, 1]), 'm:', label='Sentiment Analysis Error')
ax4.plot(np.abs(true_states[:, 1] - ci_estimates[:, 1]), 'k-', linewidth=2, label='CI Fusion Error')
ax4.set_xlabel('Time')
ax4.set_ylabel('Volatility Absolute Error')
ax4.set_title('Volatility Estimation Error')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Performance statistics
models = {
'Technical Analysis': model1_estimates,
'Fundamental Analysis': model2_estimates,
'Sentiment Analysis': model3_estimates,
'CI Fusion': ci_estimates
}
print("Performance Comparison (RMSE):")
print("-" * 40)
for name, estimates in models.items():
price_rmse = np.sqrt(np.mean((true_states[:, 0] - estimates[:, 0])**2))
vol_rmse = np.sqrt(np.mean((true_states[:, 1] - estimates[:, 1])**2))
print(f"{name:12} - Price: {price_rmse:.3f}, Volatility: {vol_rmse:.4f}")
# Run example
# ci_fusion_example()
Asynchronous Data Processing
Time Synchronization and Interpolation
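When sensors report at irregular times, the filter keeps a single clock: each arriving measurement first triggers a prediction from the current filter time t to the measurement stamp t + dt,

x(t+dt) = F(dt) x(t), P(t+dt) = F(dt) P(t) F(dt)^T + Q(dt)

with F(dt) = F^dt and Q(dt) ≈ Q * dt for the one-step matrices used below, followed by a standard update at that time. Queries between update times are answered by interpolating the stored state history.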
class AsynchronousDataFusion:
"""
Asynchronous data fusion processor
"""
def __init__(self, dim_x, buffer_size=100):
self.dim_x = dim_x
self.buffer_size = buffer_size
# Data buffer
self.data_buffer = {} # sensor_id -> [(timestamp, observation, H, R), ...]
# State history
self.state_history = [] # [(timestamp, x, P), ...]
# Current state
self.current_time = 0
self.x = np.zeros((dim_x, 1))
self.P = np.eye(dim_x)
# System parameters
self.F = np.eye(dim_x)
self.Q = np.eye(dim_x)
def add_observation(self, sensor_id, timestamp, observation, H, R):
"""
Add asynchronous observation
Parameters:
sensor_id: Sensor ID
timestamp: Timestamp
observation: Observation value
H: Observation matrix
R: Observation noise covariance
"""
if sensor_id not in self.data_buffer:
self.data_buffer[sensor_id] = []
self.data_buffer[sensor_id].append((timestamp, observation, H, R))
# Maintain buffer size
if len(self.data_buffer[sensor_id]) > self.buffer_size:
self.data_buffer[sensor_id].pop(0)
# Process newly arrived data
self._process_pending_observations()
def _process_pending_observations(self):
"""Process pending observations"""
# Collect all pending observations
all_observations = []
for sensor_id, buffer in self.data_buffer.items():
for obs_data in buffer:
                timestamp, observation, H, R = obs_data
                # Only measurements newer than the filter time are queued;
                # late (out-of-sequence) arrivals stay in the buffer and would
                # need a dedicated OOSM scheme to be applied retroactively
                if timestamp > self.current_time:
                    all_observations.append((timestamp, sensor_id, observation, H, R))
if not all_observations:
return
# Sort by timestamp
all_observations.sort(key=lambda x: x[0])
# Process one by one
for timestamp, sensor_id, observation, H, R in all_observations:
self._process_single_observation(timestamp, observation, H, R)
# Remove processed observations
self.data_buffer[sensor_id] = [
obs for obs in self.data_buffer[sensor_id]
if obs[0] != timestamp
]
def _process_single_observation(self, timestamp, observation, H, R):
"""Process single observation"""
# Predict to observation time
dt = timestamp - self.current_time
if dt > 0:
self._predict_to_time(dt)
# Update
observation = np.array(observation).reshape(-1, 1)
y = observation - H @ self.x
S = H @ self.P @ H.T + R
K = self.P @ H.T @ np.linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(self.dim_x) - K @ H) @ self.P
# Update time
self.current_time = timestamp
# Save state history
self.state_history.append((timestamp, self.x.copy(), self.P.copy()))
    def _predict_to_time(self, dt):
        """Predict the state forward by dt time units"""
        from scipy.linalg import fractional_matrix_power
        # F is a one-step transition matrix, so predicting over dt steps
        # means raising it to the power dt; fractional powers handle
        # observations that arrive between integer time steps
        F_dt = np.real(fractional_matrix_power(self.F, dt))
        Q_dt = self.Q * dt  # first-order approximation of accumulated noise
        self.x = F_dt @ self.x
        self.P = F_dt @ self.P @ F_dt.T + Q_dt
def get_state_at_time(self, query_time):
"""Get state estimate at specified time (interpolation)"""
if not self.state_history:
return self.x, self.P
# Find nearest historical states
before_states = [(t, x, P) for t, x, P in self.state_history if t <= query_time]
after_states = [(t, x, P) for t, x, P in self.state_history if t > query_time]
        if not before_states:
            # Extrapolate into the past (simplified: a single inverse step,
            # however far back query_time lies)
            t_after, x_after, P_after = after_states[0]
            F_inv = np.linalg.inv(self.F)
            x_interp = F_inv @ x_after
            P_interp = F_inv @ P_after @ F_inv.T
            return x_interp, P_interp
        elif not after_states:
            # Extrapolate into the future (simplified: one forward step, with
            # process noise scaled by the elapsed time)
            t_before, x_before, P_before = before_states[-1]
            dt = query_time - t_before
            Q_dt = self.Q * dt
            x_interp = self.F @ x_before
            P_interp = self.F @ P_before @ self.F.T + Q_dt
            return x_interp, P_interp
        else:
            # Interpolate between the bracketing stored states
            t_before, x_before, P_before = before_states[-1]
            t_after, x_after, P_after = after_states[0]
            # Simple linear interpolation of mean and covariance
            # (a fixed-interval smoother would be more principled)
            alpha = (query_time - t_before) / (t_after - t_before)
            x_interp = (1 - alpha) * x_before + alpha * x_after
            P_interp = (1 - alpha) * P_before + alpha * P_after
            return x_interp, P_interp
def asynchronous_fusion_example():
"""
Asynchronous fusion example
"""
# Create asynchronous fuser
adf = AsynchronousDataFusion(dim_x=2)
# Set system parameters
adf.F = np.array([[1, 1], [0, 0.95]])
adf.Q = np.array([[0.01, 0], [0, 0.01]])
# Initialize
adf.x = np.array([[0], [1]])
adf.P = np.array([[1, 0], [0, 1]])
# Simulate asynchronous data streams
np.random.seed(42)
# Sensor configuration
sensors = {
'sensor_A': {'H': np.array([[1, 0]]), 'R': np.array([[0.1]]), 'rate': 1.0},
'sensor_B': {'H': np.array([[0, 1]]), 'R': np.array([[0.2]]), 'rate': 0.5},
'sensor_C': {'H': np.array([[1, 1]]), 'R': np.array([[0.15]]), 'rate': 0.3}
}
# Generate true state sequence
true_states = []
timestamps = []
true_state = np.array([0, 1])
T = 100
for t in range(T):
true_state = adf.F @ true_state + np.random.multivariate_normal([0, 0], adf.Q)
true_states.append(true_state.copy())
timestamps.append(t)
# Generate asynchronous observations
all_observations = []
for sensor_id, config in sensors.items():
H, R, rate = config['H'], config['R'], config['rate']
# Generate sensor timestamps
sensor_times = []
t = 0
while t < T:
t += np.random.exponential(1.0 / rate) # Poisson process
if t < T:
sensor_times.append(t)
# Generate observations for each time point
for obs_time in sensor_times:
# Interpolate to get true state at that time
if obs_time <= T - 1:
idx = int(obs_time)
alpha = obs_time - idx
if idx + 1 < len(true_states):
true_state_interp = (1 - alpha) * true_states[idx] + alpha * true_states[idx + 1]
else:
true_state_interp = true_states[idx]
# Generate observation
true_obs = H @ true_state_interp
noisy_obs = true_obs + np.random.multivariate_normal(np.zeros(R.shape[0]), R)
all_observations.append((obs_time, sensor_id, noisy_obs, H, R))
# Sort by time and add to fuser
all_observations.sort(key=lambda x: x[0])
for obs_time, sensor_id, observation, H, R in all_observations:
adf.add_observation(sensor_id, obs_time, observation, H, R)
# Get estimates at synchronized time points
synchronized_estimates = []
for t in range(T):
x_est, P_est = adf.get_state_at_time(t)
synchronized_estimates.append(x_est.flatten())
synchronized_estimates = np.array(synchronized_estimates)
# Plot results
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))
# State estimation
ax1.plot(timestamps, np.array(true_states)[:, 0], 'g-', linewidth=3, label='True Position')
ax1.plot(timestamps, synchronized_estimates[:, 0], 'b-', linewidth=2, label='Async Fusion Estimate')
# Mark observation times
colors = {'sensor_A': 'red', 'sensor_B': 'orange', 'sensor_C': 'purple'}
for obs_time, sensor_id, observation, H, R in all_observations[:50]: # Only show first 50
if H[0, 0] > 0: # If observation contains position information
ax1.scatter(obs_time, observation[0] / H[0, 0], c=colors[sensor_id],
s=20, alpha=0.6)
ax1.set_ylabel('Position')
ax1.set_title('Asynchronous Multi-Sensor Fusion: Position Estimation')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Velocity estimation
ax2.plot(timestamps, np.array(true_states)[:, 1], 'g-', linewidth=3, label='True Velocity')
ax2.plot(timestamps, synchronized_estimates[:, 1], 'b-', linewidth=2, label='Async Fusion Estimate')
ax2.set_ylabel('Velocity')
ax2.set_title('Velocity Estimation')
ax2.legend()
ax2.grid(True, alpha=0.3)
# Observation time distribution
sensor_obs_times = {sensor: [] for sensor in sensors.keys()}
for obs_time, sensor_id, _, _, _ in all_observations:
sensor_obs_times[sensor_id].append(obs_time)
for i, (sensor_id, obs_times) in enumerate(sensor_obs_times.items()):
ax3.scatter(obs_times, [i] * len(obs_times), c=colors[sensor_id],
label=f'{sensor_id} ({len(obs_times)} obs)', alpha=0.7)
ax3.set_xlabel('Time')
ax3.set_ylabel('Sensor')
ax3.set_title('Asynchronous Observation Time Distribution')
ax3.legend()
ax3.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Performance evaluation
pos_rmse = np.sqrt(np.mean((np.array(true_states)[:, 0] - synchronized_estimates[:, 0])**2))
vel_rmse = np.sqrt(np.mean((np.array(true_states)[:, 1] - synchronized_estimates[:, 1])**2))
print(f"Asynchronous Fusion Performance:")
print(f"Position RMSE: {pos_rmse:.4f}")
print(f"Velocity RMSE: {vel_rmse:.4f}")
# Sensor statistics
total_observations = len(all_observations)
print(f"\nSensor Observation Statistics:")
for sensor_id in sensors.keys():
count = sum(1 for obs in all_observations if obs[1] == sensor_id)
print(f"{sensor_id}: {count} observations ({count/total_observations:.1%})")
# Run example
# asynchronous_fusion_example()
Next Chapter Preview
In the next chapter, we will learn about applications of Kalman filtering in portfolio management, including dynamic portfolio optimization, risk factor estimation, and asset allocation strategies.