学习目标
- 综合运用马尔科夫模型解决复杂金融问题
- 掌握大型金融系统的设计与实现
- 学习项目管理与团队协作
- 理解模型部署与运维管理
- 掌握性能优化与扩展策略
15.1 智能投资组合管理系统
15.1.1 项目概述
构建一个基于马尔科夫模型的智能投资组合管理系统，集成隐马尔科夫模型（HMM）市场状态识别、均值-方差组合优化与风险管理（VaR/ES、压力测试）等多种方法。
15.1.2 系统架构设计
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple, Optional
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class MarketState:
    """Snapshot of the market regime as reported by the regime detector.

    Instances are built by ``MarketRegimeDetector.predict_state``.
    """

    # Index of the most probable hidden state (argmax of the state posterior).
    state_id: int
    # Human-readable regime label, e.g. "熊市" / "震荡" / "牛市".
    regime: str
    # Standard deviation of the recent returns, scaled by sqrt(252).
    volatility: float
    # np.corrcoef of the recent return series.
    correlation_matrix: np.ndarray
    # Mean of the recent returns.
    # NOTE(review): predict_state currently stores a scalar here, not an array.
    expected_returns: np.ndarray
    # Posterior probability of ``state_id`` (0..1).
    confidence: float
@dataclass
class Portfolio:
    """A portfolio allocation together with the metrics it was chosen with.

    Instances are created by ``IntelligentPortfolioManager.rebalance_portfolio``.
    """

    # Weight per asset, in the same order as ``assets``.
    weights: np.ndarray
    # Asset symbols the weights refer to.
    assets: List[str]
    # Expected portfolio return (w . mu, with mu annualised by the manager).
    expected_return: float
    # Portfolio volatility sqrt(w' Sigma w), Sigma annualised by the manager.
    expected_risk: float
    # expected_return / expected_risk (no risk-free rate subtracted).
    sharpe_ratio: float
    # Creation time of this allocation.
    timestamp: datetime
class DataProvider(ABC):
    """Abstract base class for market-data sources.

    Subclasses must implement both coroutines below before they can be
    instantiated.
    """

    @abstractmethod
    async def get_prices(self, symbols: List[str],
                         start_date: datetime,
                         end_date: datetime) -> pd.DataFrame:
        """Return historical prices of ``symbols`` between the two dates.

        NOTE(review): callers treat the result as one price column per
        symbol, in ``symbols`` order — confirm for each implementation.
        """
        ...

    @abstractmethod
    async def get_market_data(self, symbols: List[str]) -> Dict:
        """Return the latest market data for ``symbols``.

        NOTE(review): callers read ``result[symbol]['price']`` — confirm
        for each implementation.
        """
        ...
class MarketRegimeDetector:
    """Market regime detector based on a Gaussian hidden Markov model.

    A ``GaussianHMM`` from ``hmmlearn`` is fitted to a 1-D return series.
    HMM state indices carry no meaning by themselves, so after fitting the
    states are ranked by their fitted mean return. The lowest-mean state is
    labelled "熊市" (bear) and the highest-mean state "牛市" (bull). Any
    states in between are labelled "震荡" (range-bound).
    """

    def __init__(self, n_states: int = 3, random_state: Optional[int] = None):
        """
        Args:
            n_states: number of hidden states of the HMM.
            random_state: seed forwarded to ``GaussianHMM`` for reproducible
                fits; ``None`` keeps hmmlearn's default behaviour.
        """
        self.n_states = n_states
        self.random_state = random_state
        self.transition_matrix = None
        self.state_probabilities = None
        # Fitted hmmlearn model; used to infer the state of new observations.
        self.model = None
        # Raw HMM state indices sorted by ascending mean return (None = identity).
        self.state_order = None

    def fit(self, returns: np.ndarray) -> None:
        """Fit the HMM on a 1-D array of returns.

        Stores the transition matrix, the posterior state probabilities of
        every training observation, and the mean-return ranking of states.
        """
        from hmmlearn import hmm

        observations = np.asarray(returns, dtype=float).reshape(-1, 1)
        model = hmm.GaussianHMM(
            n_components=self.n_states,
            covariance_type="full",
            random_state=self.random_state,
        )
        model.fit(observations)

        self.model = model
        self.transition_matrix = model.transmat_
        self.state_probabilities = model.predict_proba(observations)
        # HMM labels are arbitrary: rank states by fitted mean return so that
        # regime names are attached to the economically matching state.
        self.state_order = np.argsort(np.asarray(model.means_).ravel())

    def predict_state(self, recent_returns: np.ndarray) -> "MarketState":
        """Infer the current market regime from recent returns.

        When a fitted model is available, the state posterior is computed on
        the finite values of ``recent_returns``. Otherwise, or if no finite
        value remains, the posterior of the last training observation is used.

        Args:
            recent_returns: 1-D array of recent (aggregate) returns.

        Returns:
            MarketState: ``state_id`` is the raw HMM state index (it matches
            the rows/columns of ``transition_matrix``).

        Raises:
            ValueError: if the detector has not been fitted.
        """
        if self.state_probabilities is None:
            raise ValueError("模型未训练")

        values = np.asarray(recent_returns, dtype=float).ravel()
        finite = values[np.isfinite(values)]

        probabilities = self._current_probabilities(finite)
        current_state = int(np.argmax(probabilities))

        # Annualise daily volatility with the usual 252 trading days.
        volatility = np.std(finite) * np.sqrt(252)

        return MarketState(
            state_id=current_state,
            regime=self._regime_name(current_state),
            volatility=volatility,
            correlation_matrix=np.corrcoef(finite.reshape(1, -1)),
            expected_returns=np.mean(finite),
            confidence=float(np.max(probabilities)),
        )

    def _current_probabilities(self, observations: np.ndarray) -> np.ndarray:
        """Return the state posterior of the last observation (see predict_state)."""
        if self.model is not None and observations.size > 0:
            return self.model.predict_proba(observations.reshape(-1, 1))[-1]
        return self.state_probabilities[-1]

    def _regime_name(self, state: int) -> str:
        """Map a raw HMM state index to a regime label via its mean-return rank."""
        if self.state_order is None:
            order = np.arange(self.n_states)
        else:
            order = np.asarray(self.state_order)
        n = len(order)
        rank = int(np.flatnonzero(order == state)[0])
        if n > 1 and rank == 0:
            return "熊市"
        if n > 1 and rank == n - 1:
            return "牛市"
        return "震荡"
class RiskManager:
    """Risk-management module: historical VaR / ES and scenario stress tests."""

    def __init__(self, max_portfolio_risk: float = 0.15):
        """
        Args:
            max_portfolio_risk: risk ceiling that callers compare portfolio
                risk against.
        """
        self.max_portfolio_risk = max_portfolio_risk
        # Despite the name this is the tail probability: 0.05 -> 5th percentile.
        self.var_confidence = 0.05

    def calculate_var(self, portfolio_returns: np.ndarray) -> float:
        """Historical VaR: the ``var_confidence`` quantile of the returns.

        The result is expressed as a return (losses are negative), not as a
        positive loss amount.
        """
        tail_percentile = self.var_confidence * 100
        return np.percentile(portfolio_returns, tail_percentile)

    def calculate_expected_shortfall(self, portfolio_returns: np.ndarray) -> float:
        """Expected shortfall: mean of the returns at or below the VaR."""
        threshold = self.calculate_var(portfolio_returns)
        tail = portfolio_returns[portfolio_returns <= threshold]
        return np.mean(tail)

    def stress_test(self, weights: np.ndarray,
                    stress_scenarios: Dict[str, np.ndarray]) -> Dict[str, float]:
        """Portfolio return under each scenario's per-asset return vector.

        Args:
            weights: portfolio weights.
            stress_scenarios: scenario name -> per-asset returns.

        Returns:
            Scenario name -> ``weights . scenario_returns``.
        """
        return {
            name: np.dot(weights, shocked_returns)
            for name, shocked_returns in stress_scenarios.items()
        }
class PortfolioOptimizer:
    """Long-only mean-variance portfolio optimiser (scipy SLSQP)."""

    def __init__(self, risk_aversion: float = 5.0):
        """
        Args:
            risk_aversion: lambda in U(w) = mu'w - (lambda/2) w' Sigma w.
        """
        self.risk_aversion = risk_aversion

    def mean_variance_optimization(self,
                                   expected_returns: np.ndarray,
                                   cov_matrix: np.ndarray,
                                   constraints: Dict = None) -> np.ndarray:
        """Maximise U(w) = mu'w - (lambda/2) w' Sigma w.

        Built-in constraints are sum(w) = 1 and 0 <= w_i <= 1 (no short
        selling).

        Args:
            expected_returns: expected return per asset (length n).
            cov_matrix: n x n covariance matrix, in the same units/horizon as
                ``expected_returns``.
            constraints: optional extra scipy-style constraint(s). Accepts a
                single dict such as ``{'type': 'eq' | 'ineq', 'fun': ...}`` or
                a sequence of such dicts. They are added to the budget
                constraint.

        Returns:
            Optimal weights (length n). If SLSQP does not converge, a warning
            is logged and the solver's last iterate is returned.

        Raises:
            ValueError: if ``expected_returns`` is empty.
        """
        from scipy.optimize import minimize

        mu = np.asarray(expected_returns, dtype=float)
        sigma = np.asarray(cov_matrix, dtype=float)
        n_assets = len(mu)
        if n_assets == 0:
            raise ValueError("expected_returns 不能为空")

        def negative_utility(weights):
            # minimize() minimises, so return -U(w).
            portfolio_return = np.dot(weights, mu)
            portfolio_variance = np.dot(weights, np.dot(sigma, weights))
            return -(portfolio_return - 0.5 * self.risk_aversion * portfolio_variance)

        # Budget constraint: weights sum to 1.
        constraints_list = [
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
        ]
        if constraints is not None:
            # A scipy constraint is itself a dict; also accept a sequence of them.
            if isinstance(constraints, dict):
                constraints_list.append(constraints)
            else:
                constraints_list.extend(constraints)

        bounds = tuple((0, 1) for _ in range(n_assets))  # long-only
        initial_weights = np.ones(n_assets) / n_assets

        result = minimize(
            negative_utility,
            initial_weights,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints_list
        )
        if not result.success:
            logging.getLogger(__name__).warning(
                "均值-方差优化未收敛: %s", result.message
            )
        return result.x
class IntelligentPortfolioManager:
    """Main class of the intelligent portfolio-management system.

    Combines four components:

    - a ``DataProvider`` for price history and quotes;
    - a ``MarketRegimeDetector`` for the HMM market regime;
    - a ``PortfolioOptimizer`` for mean-variance weights;
    - a ``RiskManager`` for the risk limit and VaR / ES.

    Weights are positional: they follow the column order of
    ``price_history``, which is expected to match ``assets``.
    """

    # How many times rebalance_portfolio re-optimises with a higher risk
    # aversion when the optimal portfolio exceeds the risk limit.
    max_risk_retries: int = 1
    # Factor by which the risk aversion grows on each of those retries.
    risk_aversion_step: float = 1.5

    def __init__(self,
                 data_provider: "DataProvider",
                 assets: List[str],
                 initial_capital: float = 1000000):
        """
        Args:
            data_provider: source of price history and latest quotes.
            assets: asset symbols managed by the system.
            initial_capital: starting capital (currency units).
        """
        self.data_provider = data_provider
        self.assets = assets
        self.initial_capital = initial_capital
        self.current_capital = initial_capital

        # Components
        self.regime_detector = MarketRegimeDetector()
        self.risk_manager = RiskManager()
        self.optimizer = PortfolioOptimizer()

        # History
        self.price_history = pd.DataFrame()
        self.portfolio_history = []
        self.performance_metrics = {}

        self.logger = logging.getLogger(__name__)

    async def initialize(self, lookback_days: int = 252):
        """Load price history and fit the regime detector.

        Args:
            lookback_days: history window length in calendar days
                (``timedelta(days=...)``), ending now.
        """
        self.logger.info("正在初始化智能投资组合管理系统...")

        end_date = datetime.now()
        start_date = end_date - timedelta(days=lookback_days)
        self.price_history = await self.data_provider.get_prices(
            self.assets, start_date, end_date
        )

        returns = self.price_history.pct_change().dropna()
        # The regime model is fitted on the equal-weighted average return.
        market_returns = returns.mean(axis=1).values
        self.regime_detector.fit(market_returns)

        self.logger.info("系统初始化完成")

    async def rebalance_portfolio(self) -> "Portfolio":
        """Compute a new optimal portfolio from the last 30 return observations.

        If the optimal portfolio's risk exceeds
        ``risk_manager.max_portfolio_risk``, the optimisation is repeated up
        to ``max_risk_retries`` times. Each retry uses a risk aversion higher
        by ``risk_aversion_step``. The optimiser's own ``risk_aversion`` is
        restored afterwards. The metrics stored in the returned ``Portfolio``
        always describe the weights actually returned.

        Returns:
            The new Portfolio (also appended to ``portfolio_history``).
        """
        self.logger.info("开始投资组合再平衡...")

        # NOTE(review): latest quotes are fetched but not used below, and
        # price_history is not updated — either merge them or drop this call.
        await self.data_provider.get_market_data(self.assets)

        returns = self.price_history.pct_change().dropna()
        recent_returns = returns.tail(30)  # last 30 observations

        market_state = self.regime_detector.predict_state(
            recent_returns.mean(axis=1).values
        )
        self.logger.info(f"检测到市场状态: {market_state.regime}")

        expected_returns = self._adjust_returns_for_regime(
            recent_returns.mean().values, market_state
        )
        cov_matrix = recent_returns.cov().values * 252  # annualised

        optimal_weights = self.optimizer.mean_variance_optimization(
            expected_returns, cov_matrix
        )
        portfolio_return, portfolio_risk, sharpe_ratio = self._portfolio_metrics(
            optimal_weights, expected_returns, cov_matrix
        )

        if portfolio_risk > self.risk_manager.max_portfolio_risk:
            self.logger.warning(f"组合风险过高: {portfolio_risk:.4f}")
            # Raise the risk aversion only for this rebalance; previously it
            # was multiplied permanently and compounded on every call.
            base_aversion = self.optimizer.risk_aversion
            try:
                for attempt in range(1, self.max_risk_retries + 1):
                    self.optimizer.risk_aversion = (
                        base_aversion * self.risk_aversion_step ** attempt
                    )
                    optimal_weights = self.optimizer.mean_variance_optimization(
                        expected_returns, cov_matrix
                    )
                    # Recompute so the stored metrics match the new weights.
                    portfolio_return, portfolio_risk, sharpe_ratio = self._portfolio_metrics(
                        optimal_weights, expected_returns, cov_matrix
                    )
                    if portfolio_risk <= self.risk_manager.max_portfolio_risk:
                        break
            finally:
                self.optimizer.risk_aversion = base_aversion

        new_portfolio = Portfolio(
            weights=optimal_weights,
            assets=self.assets,
            expected_return=portfolio_return,
            expected_risk=portfolio_risk,
            sharpe_ratio=sharpe_ratio,
            timestamp=datetime.now()
        )
        self.portfolio_history.append(new_portfolio)

        self.logger.info(f"投资组合再平衡完成,夏普比率: {sharpe_ratio:.4f}")
        return new_portfolio

    @staticmethod
    def _portfolio_metrics(weights: np.ndarray,
                           expected_returns: np.ndarray,
                           cov_matrix: np.ndarray) -> Tuple[float, float, float]:
        """Return (expected return, volatility, return / volatility) of ``weights``.

        The ratio is 0.0 when volatility is zero. No risk-free rate is
        subtracted.
        """
        portfolio_return = float(np.dot(weights, expected_returns))
        portfolio_risk = float(np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))))
        sharpe_ratio = portfolio_return / portfolio_risk if portfolio_risk > 0 else 0.0
        return portfolio_return, portfolio_risk, sharpe_ratio

    def _adjust_returns_for_regime(self,
                                   base_returns: np.ndarray,
                                   market_state: "MarketState") -> np.ndarray:
        """Scale expected returns by a regime factor, blended by confidence.

        The result is ``base * (c * a + (1 - c))``, annualised with 252 days.
        Here ``a`` is the regime factor and ``c`` is the state confidence.

        NOTE(review): for negative base returns the bear factor (0.7) makes
        them *less* negative — confirm this is the intended modelling.
        """
        regime_adjustments = {
            "熊市": 0.7,
            "震荡": 1.0,
            "牛市": 1.3
        }
        adjustment = regime_adjustments.get(market_state.regime, 1.0)
        adjusted_returns = base_returns * adjustment

        # Trust the regime adjustment only as much as the state confidence.
        confidence_weight = market_state.confidence
        final_returns = (confidence_weight * adjusted_returns +
                         (1 - confidence_weight) * base_returns)

        return final_returns * 252  # annualised

    async def monitor_and_alert(self, current_portfolio: "Portfolio"):
        """Value the portfolio, compute VaR / ES and log threshold breaches.

        Returns:
            dict with keys ``timestamp``, ``portfolio_value``, ``var``,
            ``expected_shortfall`` and ``market_regime``.
        """
        current_data = await self.data_provider.get_market_data(self.assets)
        current_prices = np.array(
            [current_data[asset]['price'] for asset in self.assets], dtype=float
        )

        # Mark to market: weights * capital are assumed to have been invested
        # at the last prices in price_history (the prices rebalance used), so
        # each position scales with its price change since then.
        # NOTE(review): assumes price_history columns are in ``assets`` order.
        reference_prices = self.price_history.iloc[-1].to_numpy(dtype=float)
        position_values = (current_portfolio.weights * self.current_capital
                           * current_prices / reference_prices)
        portfolio_value = float(np.sum(position_values))

        # Risk monitoring on the last 30 valid return rows.
        recent_returns = self.price_history.pct_change().dropna().tail(30)
        portfolio_returns = np.dot(recent_returns.values, current_portfolio.weights)

        var = self.risk_manager.calculate_var(portfolio_returns)
        es = self.risk_manager.calculate_expected_shortfall(portfolio_returns)

        if var < -0.05:  # VaR worse than -5%
            self.logger.warning(f"VaR预警: {var:.4f}")
        if es < -0.08:  # ES worse than -8%
            self.logger.warning(f"期望损失预警: {es:.4f}")

        monitoring_data = {
            'timestamp': datetime.now(),
            'portfolio_value': portfolio_value,
            'var': var,
            'expected_shortfall': es,
            'market_regime': self.regime_detector.predict_state(
                recent_returns.mean(axis=1).values
            ).regime
        }
        return monitoring_data

    def generate_performance_report(self) -> Dict:
        """Summarise the rebalance history.

        Simplification: the "return" series is the change in *expected*
        return between consecutive rebalances, not realised P&L. Annualisation
        assumes one rebalance per trading day (252 per year).

        Returns:
            ``{}`` when there is no history. Otherwise a dict of metrics;
            with fewer than two rebalances all return-based metrics are 0.0.
        """
        if not self.portfolio_history:
            return {}

        portfolio_returns = np.array([
            curr.expected_return - prev.expected_return
            for prev, curr in zip(self.portfolio_history, self.portfolio_history[1:])
        ], dtype=float)

        if portfolio_returns.size:
            mean_return = float(np.mean(portfolio_returns))
            std_return = float(np.std(portfolio_returns))
        else:
            mean_return = std_return = 0.0
        sharpe = mean_return / std_return * np.sqrt(252) if std_return > 0 else 0.0

        report = {
            'total_return': float(np.sum(portfolio_returns)),
            'annualized_return': mean_return * 252,
            'volatility': std_return * np.sqrt(252),
            'sharpe_ratio': sharpe,
            'max_drawdown': self._calculate_max_drawdown(portfolio_returns),
            'num_rebalances': len(self.portfolio_history),
            'current_portfolio': self.portfolio_history[-1]
        }
        return report

    def _calculate_max_drawdown(self, returns: np.ndarray) -> float:
        """Maximum drawdown (<= 0) of the compounded return series.

        Wealth starts at 1.0, so a loss in the very first period counts as a
        drawdown. An empty series returns 0.0.
        """
        returns = np.asarray(returns, dtype=float)
        if returns.size == 0:
            return 0.0
        wealth = np.concatenate(([1.0], np.cumprod(1 + returns)))
        running_max = np.maximum.accumulate(wealth)
        drawdown = (wealth - running_max) / running_max
        return float(np.min(drawdown))
# Usage example
async def main():
    """Run the whole workflow once against simulated market data.

    Initialises the system, rebalances, runs one monitoring pass and prints
    the performance report.
    """

    class MockDataProvider(DataProvider):
        """Simulated data provider for the demo (random-walk prices)."""

        def __init__(self, seed: int = 42):
            self._seed = seed
            # Private generator, so the demo no longer reseeds NumPy's global RNG.
            self._rng = np.random.default_rng(seed)

        async def get_prices(self, symbols, start_date, end_date):
            # Simulated daily prices: geometric random walk starting at 100.
            dates = pd.date_range(start_date, end_date, freq='D')
            # Fresh generator with the same seed -> identical history on every call.
            rng = np.random.default_rng(self._seed)
            data = {
                symbol: 100 * np.cumprod(1 + rng.normal(0.0005, 0.02, len(dates)))
                for symbol in symbols
            }
            return pd.DataFrame(data, index=dates)

        async def get_market_data(self, symbols):
            # Simulated real-time quotes.
            return {symbol: {'price': 100 + self._rng.normal(0, 5)} for symbol in symbols}

    # Set up the system
    assets = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'NVDA']
    data_provider = MockDataProvider()
    portfolio_manager = IntelligentPortfolioManager(
        data_provider=data_provider,
        assets=assets,
        initial_capital=1000000
    )

    # Load history and fit the regime model
    await portfolio_manager.initialize()

    # Rebalance
    new_portfolio = await portfolio_manager.rebalance_portfolio()
    print(f"新投资组合权重: {dict(zip(assets, new_portfolio.weights))}")
    print(f"预期收益率: {new_portfolio.expected_return:.4f}")
    print(f"预期风险: {new_portfolio.expected_risk:.4f}")
    print(f"夏普比率: {new_portfolio.sharpe_ratio:.4f}")

    # Monitoring
    monitoring_data = await portfolio_manager.monitor_and_alert(new_portfolio)
    print(f"监控数据: {monitoring_data}")

    # Report
    report = portfolio_manager.generate_performance_report()
    print(f"绩效报告: {report}")


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
10/2/25 · About 25 min
