学习目标
- 掌握卡尔曼滤波在高频交易中的实时应用
- 学习市场微观结构建模方法
- 理解订单簿动态与价格发现机制
- 掌握延迟敏感的算法交易策略
- 实现高性能的实时数据处理系统
1. 高频数据的实时处理
1.1 实时价格滤波与降噪
高频交易中,价格数据包含大量噪声,需要进行实时滤波处理:
import numpy as np
import pandas as pd
from collections import deque
import time
from threading import Thread, Lock
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter
import warnings
warnings.filterwarnings('ignore')
class RealTimePriceFilter:
    """Real-time price filter based on a constant-acceleration Kalman model.

    The state vector is ``[price, velocity, acceleration]`` and only the
    price is observed.  Each call to :meth:`update_price` runs one
    predict/update cycle and appends the raw price, filtered price and
    timestamp to bounded history buffers.  Filter state and buffers are
    accessed under ``self.lock``.

    Args:
        alpha: Price smoothing parameter.  Stored on the instance; the
            Kalman recursion in this class does not read it.
        beta: Trend smoothing parameter.  Stored only, like ``alpha``.
        dt: Time step (seconds) assumed between consecutive observations.
        process_noise: Scale ``q`` of the process-noise covariance ``Q``.
        measurement_noise: Variance used for the 1x1 observation noise ``R``.
        initial_price: Price component of the initial state estimate.
        history_size: Maximum number of samples kept in each history buffer.

    Raises:
        ValueError: If ``dt`` is not strictly positive.
    """

    def __init__(self, alpha=0.1, beta=0.01, dt=0.001, process_noise=0.01,
                 measurement_noise=0.001, initial_price=100.0,
                 history_size=1000):
        if dt <= 0:
            raise ValueError("dt must be positive")

        self.alpha = alpha  # price smoothing parameter (kept for callers)
        self.beta = beta    # trend smoothing parameter (kept for callers)

        # Kalman filter over [price, velocity, acceleration], scalar observation.
        self.kf = KalmanFilter(dim_x=3, dim_z=1)

        # State transition: constant-acceleration kinematics over one step.
        self.kf.F = np.array([[1., dt, 0.5*dt**2],
                              [0., 1., dt],
                              [0., 0., 1.]])

        # Observation matrix: only the price component is measured.
        self.kf.H = np.array([[1., 0., 0.]])

        # Process-noise covariance scaled by q.
        q = process_noise
        self.kf.Q = np.array([[q*dt**4/4, q*dt**3/2, q*dt**2/2],
                              [q*dt**3/2, q*dt**2, q*dt],
                              [q*dt**2/2, q*dt, q]])

        # Observation-noise covariance.
        self.kf.R = np.array([[measurement_noise]])

        # Initial state estimate and its covariance.
        self.kf.x = np.array([[float(initial_price)], [0.], [0.]])
        self.kf.P = np.eye(3) * 0.1

        # Bounded history: the oldest samples are dropped automatically.
        self.raw_prices = deque(maxlen=history_size)
        self.filtered_prices = deque(maxlen=history_size)
        self.timestamps = deque(maxlen=history_size)

        # Serialises access to the filter state and history buffers.
        self.lock = Lock()

    def update_price(self, price, timestamp=None):
        """Feed one observed price through the filter.

        Args:
            price: Observed (noisy) price.
            timestamp: Time recorded with the sample; defaults to
                ``time.time()`` when omitted.  It is stored only and does
                not change the filter's time step.

        Returns:
            dict with keys ``'filtered_price'``, ``'velocity'``,
            ``'acceleration'`` (state estimate after the update) and
            ``'raw_price'`` (the input price).
        """
        if timestamp is None:
            timestamp = time.time()

        with self.lock:
            # Predict one step ahead, then correct with the new observation.
            self.kf.predict()
            self.kf.update([price])

            filtered = self.kf.x[0, 0]
            self.raw_prices.append(price)
            self.filtered_prices.append(filtered)
            self.timestamps.append(timestamp)

            return {
                'filtered_price': filtered,
                'velocity': self.kf.x[1, 0],
                'acceleration': self.kf.x[2, 0],
                'raw_price': price
            }

    def get_current_estimate(self):
        """Return the current state estimate.

        Returns:
            dict with keys ``'price'``, ``'velocity'`` and ``'acceleration'``.
        """
        with self.lock:
            return {
                'price': self.kf.x[0, 0],
                'velocity': self.kf.x[1, 0],
                'acceleration': self.kf.x[2, 0]
            }

    def get_price_trend(self, lookback=50):
        """Return the average filtered-price change per tick.

        The trend is the difference between the newest filtered price and
        the one ``lookback - 1`` ticks earlier, divided by the number of
        intervals between them.

        Args:
            lookback: Number of most recent filtered prices to span.

        Returns:
            The per-tick slope, or ``0`` when ``lookback`` is smaller than 2
            or fewer than ``lookback`` filtered prices are available.
        """
        with self.lock:
            # A slope needs at least two samples; also rejects lookback <= 0,
            # which would otherwise index the history in unintended ways.
            if lookback < 2 or len(self.filtered_prices) < lookback:
                return 0
            newest = self.filtered_prices[-1]
            oldest = self.filtered_prices[-lookback]

        # N samples span N - 1 intervals.
        return (newest - oldest) / (lookback - 1)
# 实时数据流模拟器
class HighFrequencyDataSimulator:
    """Simulated high-frequency price feed that pushes ticks to subscribers.

    A background thread advances ``current_price`` as a Gaussian random walk
    with occasional jumps, adds observation noise to each tick and calls
    every subscriber with ``(noisy_price, timestamp)``.

    Args:
        initial_price: Starting value of the underlying price.
        volatility: Volatility used to scale the per-tick price change.
    """

    def __init__(self, initial_price=100, volatility=0.02):
        self.current_price = initial_price
        self.volatility = volatility
        self.is_running = False
        self.subscribers = []
        # Created by start_simulation(); None until the first start.
        self.simulation_thread = None

    def add_subscriber(self, callback):
        """Register ``callback(price, timestamp)`` to receive every tick."""
        self.subscribers.append(callback)

    def start_simulation(self, duration=60, tick_interval=0.001):
        """Start producing ticks on a background thread.

        If a simulation thread is still alive the call is a no-op, so two
        producers never mutate ``current_price`` at the same time.

        Args:
            duration: Maximum run time in seconds.
            tick_interval: Sleep between ticks in seconds; also the time
                step used to scale the random-walk increment.

        Raises:
            ValueError: If ``tick_interval`` is negative.
        """
        if tick_interval < 0:
            # Fail in the caller instead of inside the worker thread.
            raise ValueError("tick_interval must be non-negative")

        running = self.simulation_thread
        if running is not None and running.is_alive():
            return

        self.is_running = True
        sqrt_dt = np.sqrt(tick_interval)  # loop-invariant

        def simulate():
            # Monotonic clock for elapsed time: unaffected by wall-clock jumps.
            started = time.monotonic()
            try:
                while self.is_running and (time.monotonic() - started) < duration:
                    # Random-walk increment of the underlying price.
                    price_change = np.random.normal(0, self.volatility * sqrt_dt)

                    # Microstructure (observation) noise for this tick.
                    microstructure_noise = np.random.normal(0, 0.001)

                    # Occasional jump: 0.1% probability per tick.
                    if np.random.random() < 0.001:
                        price_change += np.random.normal(0, 0.01)

                    self.current_price += price_change
                    noisy_price = self.current_price + microstructure_noise

                    # Notify a snapshot of the subscribers so registering a
                    # new one during the run cannot disturb this iteration.
                    timestamp = time.time()
                    for callback in tuple(self.subscribers):
                        callback(noisy_price, timestamp)

                    time.sleep(tick_interval)
            finally:
                # Keep the flag accurate when the run ends on its own.
                self.is_running = False

        # Run the producer loop on its own thread.
        self.simulation_thread = Thread(target=simulate)
        self.simulation_thread.start()

    def stop_simulation(self):
        """Ask the producer to stop and wait for its thread to finish.

        Safe to call when no simulation was ever started.
        """
        self.is_running = False
        thread = self.simulation_thread
        if thread is not None:
            thread.join()
# 示例:实时价格滤波
def demonstrate_realtime_filtering(duration=10, tick_interval=0.001):
    """Run the simulated feed through the price filter and plot the result.

    A ``HighFrequencyDataSimulator`` pushes ticks into a
    ``RealTimePriceFilter``; every tick's raw price and filtered state are
    collected, converted to a DataFrame and shown in a 2x2 figure (raw vs
    filtered price, velocity, acceleration, summary statistics).

    Args:
        duration: How long the simulation runs, in seconds.
        tick_interval: Interval between simulated ticks, in seconds.

    Returns:
        pandas.DataFrame with columns ``timestamp``, ``raw_price``,
        ``filtered_price``, ``velocity`` and ``acceleration``.  The frame is
        empty (and nothing is plotted) when no tick was collected.
    """
    print("开始实时价格滤波演示...")

    price_filter = RealTimePriceFilter()
    data_simulator = HighFrequencyDataSimulator(initial_price=100, volatility=0.02)

    # One record per tick, appended from the simulator thread.
    results = []

    def price_callback(price, timestamp):
        result = price_filter.update_price(price, timestamp)
        results.append({
            'timestamp': timestamp,
            'raw_price': price,
            'filtered_price': result['filtered_price'],
            'velocity': result['velocity'],
            'acceleration': result['acceleration']
        })

    data_simulator.add_subscriber(price_callback)
    data_simulator.start_simulation(duration=duration, tick_interval=tick_interval)

    # Wait for the producer thread itself instead of sleeping a fixed time;
    # the timeout keeps the wait bounded, then stop_simulation() cleans up.
    data_simulator.simulation_thread.join(timeout=duration + 1)
    data_simulator.stop_simulation()

    print(f"收集到 {len(results)} 个数据点")

    # Explicit columns so an empty run still yields a well-formed frame.
    columns = ['timestamp', 'raw_price', 'filtered_price', 'velocity', 'acceleration']
    df = pd.DataFrame(results, columns=columns)
    if df.empty:
        return df
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    # Raw vs filtered price.
    ax1.plot(df.index, df['raw_price'], alpha=0.3, label='原始价格')
    ax1.plot(df.index, df['filtered_price'], label='滤波价格', linewidth=2)
    ax1.set_title('价格滤波效果')
    ax1.set_ylabel('价格')
    ax1.legend()

    # Estimated velocity.
    ax2.plot(df.index, df['velocity'])
    ax2.set_title('价格变化速度')
    ax2.set_ylabel('速度')
    ax2.axhline(y=0, color='r', linestyle='--', alpha=0.5)

    # Estimated acceleration.
    ax3.plot(df.index, df['acceleration'])
    ax3.set_title('价格加速度')
    ax3.set_ylabel('加速度')
    ax3.axhline(y=0, color='r', linestyle='--', alpha=0.5)

    # Summary statistics rendered as text in the fourth panel.
    noise_reduction = np.std(df['raw_price']) / np.std(df['filtered_price'])
    correlation = np.corrcoef(df['raw_price'], df['filtered_price'])[0, 1]

    ax4.text(0.1, 0.8, f'噪声降低倍数: {noise_reduction:.2f}', transform=ax4.transAxes)
    ax4.text(0.1, 0.7, f'相关系数: {correlation:.4f}', transform=ax4.transAxes)
    ax4.text(0.1, 0.6, f'数据点数: {len(df)}', transform=ax4.transAxes)
    # Rate is relative to the requested run time, not a hard-coded 10 s.
    ax4.text(0.1, 0.5, f'处理频率: {len(df)/duration:.0f} Hz', transform=ax4.transAxes)
    ax4.set_title('滤波性能统计')
    ax4.set_xlim(0, 1)
    ax4.set_ylim(0, 1)

    plt.tight_layout()
    plt.show()

    return df
# Run the demo when this script is executed and keep the DataFrame it returns.
filtered_data = demonstrate_realtime_filtering()
12/19/24 · About 17 min
