Chapter 15: Real-World Project Cases and Comprehensive Applications

Author
76 min

Chapter 15: Real-World Project Cases and Comprehensive Applications

Learning Objectives

  1. Comprehensively apply Markov models to solve complex financial problems
  2. Master the design and implementation of large-scale financial systems
  3. Learn project management and team collaboration
  4. Understand model deployment and operations management
  5. Master performance optimization and scaling strategies

15.1 Intelligent Portfolio Management System

15.1.1 Project Overview

Build an intelligent portfolio management system based on Markov models, integrating multiple modeling methods.

[Diagram placeholder: the Mermaid chart for the project overview was still rendering when this page was exported.]

15.1.2 System Architecture Design

import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple, Optional
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MarketState:
    """Snapshot of the detected market regime plus summary statistics.

    Instances are produced by ``MarketRegimeDetector.predict_state``.
    """

    state_id: int  # index of the most probable hidden state
    regime: str  # human-readable label: "Bear", "Sideways" or "Bull"
    volatility: float  # std of the recent returns scaled by sqrt(252)
    correlation_matrix: np.ndarray  # np.corrcoef output for the recent returns
    expected_returns: np.ndarray  # mean of the recent returns
    confidence: float  # probability assigned to the most probable state

@dataclass
class Portfolio:
    """Result of one rebalancing run: target weights and their metrics."""

    weights: np.ndarray  # one weight per entry of ``assets``, same order
    assets: List[str]  # asset identifiers the weights refer to
    expected_return: float  # dot product of weights and expected returns
    expected_risk: float  # sqrt(w' * cov * w) for the chosen weights
    sharpe_ratio: float  # expected_return / expected_risk (0 when risk is 0)
    timestamp: datetime  # creation time of this portfolio

class DataProvider(ABC):
    """Interface every market-data source has to implement."""

    @abstractmethod
    async def get_prices(self, symbols: List[str],
                        start_date: datetime,
                        end_date: datetime) -> pd.DataFrame:
        """Return historical prices for ``symbols`` between the two dates."""

    @abstractmethod
    async def get_market_data(self, symbols: List[str]) -> Dict:
        """Return the latest market data for ``symbols``."""

class MarketRegimeDetector:
    """Detects market regimes with a Gaussian Hidden Markov Model.

    Hidden-state indices produced by an HMM carry no inherent order, so
    the states are labelled by the rank of their fitted mean return: the
    lowest-mean state is "Bear", the highest-mean state is "Bull" and
    every other state is "Sideways". This also keeps labelling valid for
    any ``n_states``.
    """

    def __init__(self, n_states: int = 3):
        """Create an untrained detector with ``n_states`` hidden states."""
        self.n_states = n_states
        self.transition_matrix = None
        self.state_probabilities = None
        # Fitted hmmlearn model and per-state labels; both set by ``fit``.
        self._model = None
        self._regime_labels = None

    @staticmethod
    def _label_states(state_means: np.ndarray) -> List[str]:
        """Map each hidden state to a regime name by rank of its mean.

        Args:
            state_means: Fitted state means, any shape that flattens to
                one value per state.

        Returns:
            One label per state, indexed by state id.
        """
        means = np.asarray(state_means, dtype=float).ravel()
        labels = ["Sideways"] * len(means)
        if len(means) >= 2:
            labels[int(np.argmin(means))] = "Bear"
            labels[int(np.argmax(means))] = "Bull"
        return labels

    def fit(self, returns: np.ndarray) -> None:
        """Trains a Hidden Markov Model on a one-dimensional return series.

        Stores the fitted model, its transition matrix, the in-sample state
        posteriors and the mean-ranked regime labels.
        """
        from hmmlearn import hmm

        observations = np.asarray(returns, dtype=float).reshape(-1, 1)

        model = hmm.GaussianHMM(
            n_components=self.n_states,
            covariance_type="full"
        )
        model.fit(observations)

        self._model = model
        self._regime_labels = self._label_states(model.means_)
        self.transition_matrix = model.transmat_
        self.state_probabilities = model.predict_proba(observations)

    def predict_state(self, recent_returns: np.ndarray) -> MarketState:
        """Predicts the current market state from ``recent_returns``.

        The state posterior is evaluated on the supplied window with the
        fitted model, so the answer reflects the recent data instead of the
        last training observation. When no fitted model is attached (for
        example ``state_probabilities`` was assigned directly) the stored
        posterior of the last training sample is used.

        Raises:
            ValueError: If the detector has not been trained.
        """
        if self.state_probabilities is None:
            raise ValueError("Model not trained")

        recent = np.asarray(recent_returns, dtype=float).ravel()
        # Drop NaN/inf (e.g. the first row of pct_change) before scoring.
        recent = recent[np.isfinite(recent)]

        model = getattr(self, "_model", None)
        if model is not None and recent.size > 0:
            posterior = model.predict_proba(recent.reshape(-1, 1))[-1]
        else:
            posterior = self.state_probabilities[-1]

        current_state = int(np.argmax(posterior))

        labels = getattr(self, "_regime_labels", None)
        if labels is None:
            # No fitted means available: fall back to index-based names.
            labels = ["Bear", "Sideways", "Bull"]
        regime = labels[current_state] if current_state < len(labels) else "Sideways"

        volatility = np.std(recent) * np.sqrt(252)

        return MarketState(
            state_id=current_state,
            regime=regime,
            volatility=volatility,
            # A single series is passed, so this is a degenerate 1x1 result.
            correlation_matrix=np.corrcoef(recent.reshape(1, -1)),
            expected_returns=np.mean(recent),
            confidence=float(np.max(posterior))
        )

class RiskManager:
    """Risk measurement helpers: historical VaR, expected shortfall, stress tests."""

    def __init__(self, max_portfolio_risk: float = 0.15):
        """Store the risk ceiling and the tail probability used for VaR."""
        self.max_portfolio_risk = max_portfolio_risk
        self.var_confidence = 0.05

    def calculate_var(self, portfolio_returns: np.ndarray) -> float:
        """Return the historical VaR as the lower-tail percentile of returns."""
        percentile_rank = self.var_confidence * 100
        return np.percentile(portfolio_returns, percentile_rank)

    def calculate_expected_shortfall(self, portfolio_returns: np.ndarray) -> float:
        """Return the mean of all returns at or below the VaR level."""
        threshold = self.calculate_var(portfolio_returns)
        in_tail = portfolio_returns <= threshold
        return np.mean(portfolio_returns[in_tail])

    def stress_test(self, weights: np.ndarray,
                   stress_scenarios: Dict[str, np.ndarray]) -> Dict[str, float]:
        """Return the portfolio return under each named scenario."""
        return {
            label: np.dot(weights, shocked_returns)
            for label, shocked_returns in stress_scenarios.items()
        }

class PortfolioOptimizer:
    """Long-only mean-variance portfolio optimizer."""

    def __init__(self, risk_aversion: float = 5.0):
        """Store the risk-aversion coefficient (lambda in the utility)."""
        self.risk_aversion = risk_aversion

    def mean_variance_optimization(self,
                                 expected_returns: np.ndarray,
                                 cov_matrix: np.ndarray,
                                 constraints: Optional[Dict] = None) -> np.ndarray:
        """Maximize U = mu'w - (lambda/2) w'Sigma w over long-only weights.

        Args:
            expected_returns: Expected return per asset.
            cov_matrix: Square covariance matrix matching ``expected_returns``.
            constraints: Accepted for interface compatibility; currently
                not used by the optimization.

        Returns:
            Weights in [0, 1] that sum to 1. An empty array is returned for
            an empty asset universe, and the equal-weight portfolio is
            returned when the solver reports failure.

        Raises:
            ValueError: If ``cov_matrix`` is not ``n x n`` for ``n`` assets.
        """
        from scipy.optimize import minimize

        mu = np.asarray(expected_returns, dtype=float).ravel()
        sigma = np.asarray(cov_matrix, dtype=float)
        n_assets = mu.size

        if n_assets == 0:
            return np.array([])
        if sigma.shape != (n_assets, n_assets):
            raise ValueError(
                f"cov_matrix shape {sigma.shape} does not match {n_assets} assets"
            )

        def objective(weights):
            # Negated utility, because scipy minimizes.
            portfolio_return = np.dot(weights, mu)
            portfolio_variance = np.dot(weights.T, np.dot(sigma, weights))
            return -(portfolio_return - 0.5 * self.risk_aversion * portfolio_variance)

        constraints_list = [
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}  # Weights sum to 1
        ]
        bounds = tuple((0, 1) for _ in range(n_assets))  # No short-selling
        initial_weights = np.ones(n_assets) / n_assets

        result = minimize(
            objective,
            initial_weights,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints_list
        )

        if not result.success:
            # An unconverged vector may violate the constraints; the
            # equal-weight start point is always feasible.
            logging.getLogger(__name__).warning(
                "Mean-variance optimization failed (%s); using equal weights",
                result.message,
            )
            return initial_weights

        # Remove tiny bound/sum violations left by the numerical solver.
        weights = np.clip(result.x, 0.0, 1.0)
        total = weights.sum()
        return weights / total if total > 0 else initial_weights

class IntelligentPortfolioManager:
    """Main class for the intelligent portfolio management system.

    Pulls prices from a ``DataProvider``, trains a ``MarketRegimeDetector``
    on the cross-sectional mean return, and combines a
    ``PortfolioOptimizer`` with a ``RiskManager`` to build and monitor
    portfolios. Asset columns of the price history are assumed to be in the
    same order as ``assets``.
    """

    def __init__(self,
                 data_provider: DataProvider,
                 assets: List[str],
                 initial_capital: float = 1000000):
        """Wire up the components; no data is fetched until ``initialize``."""

        self.data_provider = data_provider
        self.assets = assets
        self.initial_capital = initial_capital
        self.current_capital = initial_capital

        # Component initialization
        self.regime_detector = MarketRegimeDetector()
        self.risk_manager = RiskManager()
        self.optimizer = PortfolioOptimizer()

        # Historical data
        self.price_history = pd.DataFrame()
        self.portfolio_history = []
        self.performance_metrics = {}

        # Logger
        self.logger = logging.getLogger(__name__)

    async def initialize(self, lookback_days: int = 252):
        """Load ``lookback_days`` calendar days of prices and train the detector."""
        self.logger.info("Initializing intelligent portfolio management system...")

        # Get historical data
        end_date = datetime.now()
        start_date = end_date - timedelta(days=lookback_days)

        self.price_history = await self.data_provider.get_prices(
            self.assets, start_date, end_date
        )

        # Calculate returns
        returns = self.price_history.pct_change().dropna()

        # The market proxy is the equal-weighted mean return across assets.
        market_returns = returns.mean(axis=1).values
        self.regime_detector.fit(market_returns)

        self.logger.info("System initialization complete")

    @staticmethod
    def _portfolio_metrics(weights: np.ndarray,
                           expected_returns: np.ndarray,
                           cov_matrix: np.ndarray) -> Tuple[float, float, float]:
        """Return (expected return, risk, Sharpe ratio) for ``weights``."""
        portfolio_return = np.dot(weights, expected_returns)
        portfolio_risk = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
        sharpe_ratio = portfolio_return / portfolio_risk if portfolio_risk > 0 else 0
        return portfolio_return, portfolio_risk, sharpe_ratio

    async def rebalance_portfolio(self) -> Portfolio:
        """Rebalances the portfolio and appends the result to the history.

        Uses the last 30 rows of returns to detect the regime, adjust the
        expected returns and estimate the annualized covariance. If the
        optimized portfolio exceeds the risk ceiling, risk aversion is
        raised by 50% and the optimization is repeated once; the reported
        metrics always describe the weights that are returned.
        """
        self.logger.info("Starting portfolio rebalancing...")

        # The provider is still queried as before; its result is not used
        # by the optimization, which works from the stored price history.
        await self.data_provider.get_market_data(self.assets)

        # Calculate returns
        returns = self.price_history.pct_change().dropna()
        recent_returns = returns.tail(30)  # Last 30 days data

        # Detect market state
        market_state = self.regime_detector.predict_state(
            recent_returns.mean(axis=1).values
        )

        self.logger.info(f"Detected market state: {market_state.regime}")

        # Adjust expected returns based on market state
        expected_returns = self._adjust_returns_for_regime(
            recent_returns.mean().values, market_state
        )

        # Calculate covariance matrix
        cov_matrix = recent_returns.cov().values * 252  # Annualized

        # Portfolio optimization
        optimal_weights = self.optimizer.mean_variance_optimization(
            expected_returns, cov_matrix
        )
        portfolio_return, portfolio_risk, sharpe_ratio = self._portfolio_metrics(
            optimal_weights, expected_returns, cov_matrix
        )

        # Risk check
        if portfolio_risk > self.risk_manager.max_portfolio_risk:
            self.logger.warning(f"Portfolio risk too high: {portfolio_risk:.4f}")
            # NOTE: the higher risk aversion persists for later rebalances.
            self.optimizer.risk_aversion *= 1.5
            optimal_weights = self.optimizer.mean_variance_optimization(
                expected_returns, cov_matrix
            )
            # Recompute so the stored metrics match the new weights.
            portfolio_return, portfolio_risk, sharpe_ratio = self._portfolio_metrics(
                optimal_weights, expected_returns, cov_matrix
            )

        # Create new portfolio
        new_portfolio = Portfolio(
            weights=optimal_weights,
            assets=self.assets,
            expected_return=portfolio_return,
            expected_risk=portfolio_risk,
            sharpe_ratio=sharpe_ratio,
            timestamp=datetime.now()
        )

        self.portfolio_history.append(new_portfolio)

        self.logger.info(f"Portfolio rebalancing complete, Sharpe ratio: {sharpe_ratio:.4f}")

        return new_portfolio

    def _adjust_returns_for_regime(self,
                                 base_returns: np.ndarray,
                                 market_state: MarketState) -> np.ndarray:
        """Scale daily expected returns by regime and annualize them.

        The regime factor is blended with the unadjusted returns using the
        state confidence as the blend weight. Note that the factor is
        multiplicative, so a Bear factor below 1 shrinks negative returns
        toward zero as well as positive ones.
        """

        # Adjustment factors for different market states
        regime_adjustments = {
            "Bear": 0.7,
            "Sideways": 1.0,
            "Bull": 1.3
        }

        adjustment = regime_adjustments.get(market_state.regime, 1.0)
        adjusted_returns = base_returns * adjustment

        # Consider state confidence
        confidence_weight = market_state.confidence
        final_returns = (confidence_weight * adjusted_returns +
                        (1 - confidence_weight) * base_returns)

        return final_returns * 252  # Annualized

    async def monitor_and_alert(self, current_portfolio: Portfolio):
        """Monitors portfolio risk, logs alerts and returns the metrics.

        The portfolio value is marked to market by scaling each asset's
        capital allocation by its price change relative to the last row of
        the price history. VaR and expected shortfall are computed on the
        last 30 complete return rows.
        """

        # Get current prices
        current_data = await self.data_provider.get_market_data(self.assets)
        current_prices = np.array([current_data[asset]['price'] for asset in self.assets])

        # Calculate current portfolio value
        weights = np.asarray(current_portfolio.weights, dtype=float)
        allocation = weights * self.current_capital
        if not self.price_history.empty:
            reference_prices = self.price_history.iloc[-1].to_numpy(dtype=float)
            with np.errstate(divide='ignore', invalid='ignore'):
                price_relatives = current_prices / reference_prices
            # An unusable reference price leaves that allocation unchanged.
            price_relatives = np.where(np.isfinite(price_relatives), price_relatives, 1.0)
            portfolio_value = float(np.dot(allocation, price_relatives))
        else:
            portfolio_value = float(allocation.sum())

        # Risk monitoring; dropna removes the NaN row pct_change creates.
        recent_returns = self.price_history.pct_change().dropna().tail(30)
        portfolio_returns = np.dot(recent_returns.values, weights)

        var = self.risk_manager.calculate_var(portfolio_returns)
        es = self.risk_manager.calculate_expected_shortfall(portfolio_returns)

        # Alert conditions
        if var < -0.05:  # VaR exceeds 5%
            self.logger.warning(f"VaR Alert: {var:.4f}")

        if es < -0.08:  # ES exceeds 8%
            self.logger.warning(f"Expected Shortfall Alert: {es:.4f}")

        # Record monitoring metrics
        monitoring_data = {
            'timestamp': datetime.now(),
            'portfolio_value': portfolio_value,
            'var': var,
            'expected_shortfall': es,
            'market_regime': self.regime_detector.predict_state(
                recent_returns.mean(axis=1).values
            ).regime
        }

        return monitoring_data

    def generate_performance_report(self) -> Dict:
        """Generates a performance report from the portfolio history.

        Returns an empty dict when nothing has been rebalanced yet. The
        "returns" are the simplified differences between consecutive
        expected returns; with fewer than two portfolios every
        return-based metric is reported as 0.0.
        """

        if not self.portfolio_history:
            return {}

        # Simplified return calculation: change in expected return.
        expected = np.array(
            [portfolio.expected_return for portfolio in self.portfolio_history],
            dtype=float,
        )
        portfolio_returns = np.diff(expected)

        if portfolio_returns.size:
            total_return = np.sum(portfolio_returns)
            mean_return = np.mean(portfolio_returns)
            dispersion = np.std(portfolio_returns)
        else:
            total_return = mean_return = dispersion = 0.0

        # Guard against a zero-volatility series.
        sharpe_ratio = mean_return / dispersion * np.sqrt(252) if dispersion > 0 else 0.0

        report = {
            'total_return': total_return,
            'annualized_return': mean_return * 252,
            'volatility': dispersion * np.sqrt(252),
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': self._calculate_max_drawdown(portfolio_returns),
            'num_rebalances': len(self.portfolio_history),
            'current_portfolio': self.portfolio_history[-1]
        }

        return report

    def _calculate_max_drawdown(self, returns: np.ndarray) -> float:
        """Calculates maximum drawdown (<= 0); 0.0 for an empty series."""
        returns = np.asarray(returns, dtype=float)
        if returns.size == 0:
            return 0.0
        cumulative_returns = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative_returns)
        drawdown = (cumulative_returns - running_max) / running_max
        return np.min(drawdown)

# Usage example
async def main():
    """Run the portfolio manager end to end against simulated data."""

    class MockDataProvider(DataProvider):
        """In-memory provider that serves seeded random-walk prices."""

        async def get_prices(self, symbols, start_date, end_date):
            # One simulated geometric random walk per symbol, daily calendar.
            calendar = pd.date_range(start_date, end_date, freq='D')
            np.random.seed(42)
            n_days = len(calendar)
            simulated = {
                ticker: 100 * np.cumprod(1 + np.random.normal(0.0005, 0.02, n_days))
                for ticker in symbols
            }
            return pd.DataFrame(simulated, index=calendar)

        async def get_market_data(self, symbols):
            # A noisy quote around 100 for every requested symbol.
            quotes = {}
            for ticker in symbols:
                quotes[ticker] = {'price': 100 + np.random.normal(0, 5)}
            return quotes

    # Build the system around the mock provider.
    assets = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'NVDA']
    portfolio_manager = IntelligentPortfolioManager(
        data_provider=MockDataProvider(),
        assets=assets,
        initial_capital=1000000
    )

    await portfolio_manager.initialize()

    # Rebalance once and show the resulting portfolio.
    new_portfolio = await portfolio_manager.rebalance_portfolio()
    weights_by_asset = dict(zip(assets, new_portfolio.weights))
    print(f"New portfolio weights: {weights_by_asset}")
    print(f"Expected return: {new_portfolio.expected_return:.4f}")
    print(f"Expected risk: {new_portfolio.expected_risk:.4f}")
    print(f"Sharpe ratio: {new_portfolio.sharpe_ratio:.4f}")

    # Risk monitoring snapshot for the new portfolio.
    monitoring_data = await portfolio_manager.monitor_and_alert(new_portfolio)
    print(f"Monitoring data: {monitoring_data}")

    # Performance summary over the rebalancing history.
    report = portfolio_manager.generate_performance_report()
    print(f"Performance report: {report}")

# Run example
# asyncio.run(main())

15.2 Quantitative Trading Strategy System

15.2.1 Multi-Strategy Framework Design

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Union
from abc import ABC, abstractmethod
from enum import Enum
import asyncio
from dataclasses import dataclass
from datetime import datetime

class OrderType(Enum):
    """Direction carried by a trading signal."""

    BUY = "buy"
    SELL = "sell"
    HOLD = "hold"

class SignalStrength(Enum):
    """Coarse strength bucket of a trading signal.

    The integer values are also used as multipliers when
    ``QuantTradingSystem`` aggregates signals.
    """

    WEAK = 1
    MODERATE = 2
    STRONG = 3

@dataclass
class TradingSignal:
    """A single trade recommendation emitted by a strategy."""

    symbol: str  # instrument the signal applies to
    signal_type: OrderType  # BUY, SELL or HOLD
    strength: SignalStrength  # bucketed strength derived from the confidence
    confidence: float  # confidence score assigned by the strategy
    price: float  # reference price at signal time
    quantity: int  # suggested number of units to trade
    timestamp: datetime  # when the signal was created
    strategy_name: str  # name of the strategy that produced the signal
    metadata: Optional[Dict] = None  # optional strategy-specific diagnostics

@dataclass
class Position:
    """An open holding in one instrument, as tracked by the trading system."""

    symbol: str  # instrument held
    quantity: int  # number of units currently held
    entry_price: float  # entry price per unit used for P&L calculations
    entry_time: datetime  # timestamp of the signal that opened the position
    current_price: float  # latest price applied via update_positions
    unrealized_pnl: float  # (current_price - entry_price) * quantity
    realized_pnl: float = 0.0  # P&L accumulated from sells of this position

class TradingStrategy(ABC):
    """Common interface and bookkeeping shared by all trading strategies."""

    def __init__(self, name: str):
        """Register the strategy under ``name``; it starts out active."""
        self.name = name
        self.is_active = True
        self.performance_metrics = {}

    @abstractmethod
    def generate_signal(self, market_data: pd.DataFrame) -> Optional[TradingSignal]:
        """Produce a trading signal for ``market_data``, or None."""

    @abstractmethod
    def update_parameters(self, market_regime: str):
        """Adapt the strategy parameters to ``market_regime``."""

class MarkovRegimeSwitchingStrategy(TradingStrategy):
    """Trading strategy based on Markov regime switching.

    Combines the detected market regime with a 20-period moving average and
    RSI into a signal strength in [0, 1]. The strength is compared with
    per-regime thresholds: at or above ``buy`` produces BUY, at or below
    ``sell`` produces SELL, anything in between is HOLD. Each ``buy``
    threshold must therefore be greater than its ``sell`` threshold,
    otherwise the HOLD band is unreachable.
    """

    def __init__(self, name: str = "MarkovRegimeSwitching"):
        """Create the strategy with an untrained three-state detector."""
        super().__init__(name)
        self.regime_detector = MarketRegimeDetector(n_states=3)
        self.current_regime = None
        # buy > sell in every regime so that a HOLD band exists.
        self.regime_thresholds = {
            "Bear": {"buy": 0.7, "sell": 0.3},
            "Sideways": {"buy": 0.6, "sell": 0.4},
            "Bull": {"buy": 0.6, "sell": 0.3}
        }

    def generate_signal(self, market_data: pd.DataFrame) -> Optional[TradingSignal]:
        """Generates a trading signal based on market state.

        Args:
            market_data: Frame with a ``close`` column.

        Returns:
            A signal, or None when fewer than 30 returns are available or
            signal generation fails.
        """

        # Calculate returns
        returns = market_data['close'].pct_change().dropna()

        if len(returns) < 30:
            return None

        try:
            # The detector starts untrained; fit it on first use so that
            # predict_state does not reject every request.
            if self.regime_detector.state_probabilities is None:
                self.regime_detector.fit(returns.values)

            # Detect current market state
            market_state = self.regime_detector.predict_state(returns.values[-30:])
            self.current_regime = market_state.regime

            # Calculate technical indicators
            current_price = market_data['close'].iloc[-1]
            sma_20 = market_data['close'].rolling(20).mean().iloc[-1]
            rsi = self._calculate_rsi(market_data['close'])

            # Generate signal based on state and indicators
            signal_strength = self._calculate_signal_strength(
                market_state, current_price, sma_20, rsi
            )

            # Decision logic
            thresholds = self.regime_thresholds[market_state.regime]

            if signal_strength >= thresholds["buy"]:
                signal_type = OrderType.BUY
                confidence = signal_strength
            elif signal_strength <= thresholds["sell"]:
                signal_type = OrderType.SELL
                confidence = 1 - signal_strength
            else:
                signal_type = OrderType.HOLD
                confidence = 0.5

            # Calculate position size
            quantity = self._calculate_position_size(market_state, confidence)

            return TradingSignal(
                symbol=market_data.index.name or "SYMBOL",
                signal_type=signal_type,
                strength=self._convert_to_signal_strength(confidence),
                confidence=confidence,
                price=current_price,
                quantity=quantity,
                timestamp=datetime.now(),
                strategy_name=self.name,
                metadata={
                    'regime': market_state.regime,
                    'regime_confidence': market_state.confidence,
                    'rsi': rsi,
                    'sma_ratio': current_price / sma_20
                }
            )

        except Exception as e:
            # Best effort: a failing strategy must not stop the system.
            print(f"Signal generation error: {e}")
            return None

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> float:
        """Calculates the RSI indicator from simple rolling averages.

        Returns the neutral value 50.0 when the RSI is undefined (too few
        prices, or no price movement inside the window).
        """
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        latest = rsi.iloc[-1]
        if not np.isfinite(latest):
            return 50.0
        return latest

    def _calculate_signal_strength(self, market_state, current_price, sma_20, rsi):
        """Calculates signal strength as a weighted blend clipped to [0, 1]."""

        # Price relative to moving average
        price_ratio = current_price / sma_20

        # RSI normalization
        rsi_normalized = rsi / 100

        # Market state weight
        regime_weights = {
            "Bear": 0.3,
            "Sideways": 0.5,
            "Bull": 0.8
        }

        regime_weight = regime_weights[market_state.regime]

        # Combined signal strength
        signal_strength = (
            0.4 * price_ratio +
            0.3 * rsi_normalized +
            0.3 * regime_weight
        )

        return np.clip(signal_strength, 0, 1)

    def _calculate_position_size(self, market_state, confidence):
        """Calculates position size from the regime and the confidence."""
        base_size = 100

        # Adjust based on market state
        regime_multipliers = {
            "Bear": 0.5,
            "Sideways": 0.8,
            "Bull": 1.2
        }

        multiplier = regime_multipliers[market_state.regime]

        # Adjust based on confidence
        confidence_multiplier = confidence * 2

        return int(base_size * multiplier * confidence_multiplier)

    def _convert_to_signal_strength(self, confidence):
        """Converts confidence to signal strength enum"""
        if confidence > 0.7:
            return SignalStrength.STRONG
        elif confidence > 0.5:
            return SignalStrength.MODERATE
        else:
            return SignalStrength.WEAK

    def update_parameters(self, market_regime: str):
        """Updates the thresholds of the given regime."""
        if market_regime == "Bear":
            # More conservative in bear markets: wider HOLD band.
            self.regime_thresholds["Bear"]["buy"] = 0.8
            self.regime_thresholds["Bear"]["sell"] = 0.2
        elif market_regime == "Bull":
            # More aggressive in bull markets
            # NOTE(review): 0.7 is a higher buy bar than the default 0.6,
            # which reads as less aggressive - confirm the intended value.
            self.regime_thresholds["Bull"]["buy"] = 0.7
            self.regime_thresholds["Bull"]["sell"] = 0.2

class MeanReversionStrategy(TradingStrategy):
    """Mean reversion strategy driven by the rolling z-score of the close."""

    def __init__(self, name: str = "MeanReversion", lookback_period: int = 20):
        """Create the strategy with a ``lookback_period``-row rolling window."""
        super().__init__(name)
        self.lookback_period = lookback_period
        self.threshold_std = 2.0

    def generate_signal(self, market_data: pd.DataFrame) -> Optional[TradingSignal]:
        """Generates mean reversion signals.

        Sells when the latest close is more than ``threshold_std`` rolling
        standard deviations above its rolling mean, buys when it is that far
        below, and holds otherwise. A flat or undefined window (zero or
        NaN standard deviation) is treated as a z-score of 0, i.e. HOLD.

        Returns:
            A signal, or None when fewer than ``lookback_period`` rows are
            available.
        """

        if len(market_data) < self.lookback_period:
            return None

        # Calculate moving average and standard deviation
        prices = market_data['close']
        sma = prices.rolling(self.lookback_period).mean()
        std = prices.rolling(self.lookback_period).std()

        current_price = prices.iloc[-1]
        current_sma = sma.iloc[-1]
        current_std = std.iloc[-1]

        # Calculate Z-score; dividing by a (near-)zero std would turn
        # rounding noise into an infinite z-score and a spurious trade.
        if not np.isfinite(current_std) or np.isclose(current_std, 0.0):
            z_score = 0.0
        else:
            z_score = (current_price - current_sma) / current_std

        # Generate signal
        if z_score > self.threshold_std:
            # Price too high, sell
            signal_type = OrderType.SELL
            confidence = min(abs(z_score) / 3, 1.0)
        elif z_score < -self.threshold_std:
            # Price too low, buy
            signal_type = OrderType.BUY
            confidence = min(abs(z_score) / 3, 1.0)
        else:
            signal_type = OrderType.HOLD
            confidence = 0.3

        return TradingSignal(
            symbol=market_data.index.name or "SYMBOL",
            signal_type=signal_type,
            strength=self._convert_to_signal_strength(confidence),
            confidence=confidence,
            price=current_price,
            quantity=int(100 * confidence),
            timestamp=datetime.now(),
            strategy_name=self.name,
            metadata={'z_score': z_score, 'sma': current_sma}
        )

    def _convert_to_signal_strength(self, confidence):
        """Converts confidence to signal strength enum"""
        if confidence > 0.7:
            return SignalStrength.STRONG
        elif confidence > 0.5:
            return SignalStrength.MODERATE
        else:
            return SignalStrength.WEAK

    def update_parameters(self, market_regime: str):
        """Updates the z-score threshold based on market state."""
        if market_regime == "Sideways":
            self.threshold_std = 1.5  # More sensitive in sideways markets
        else:
            self.threshold_std = 2.0  # Conservative otherwise

class QuantTradingSystem:
    """Quantitative trading system.

    Runs the registered strategies, merges their signals per symbol,
    executes the resulting trades against a cash balance and reports
    performance. Realized P&L of fully closed positions is retained so it
    still shows up in the performance report.
    """

    def __init__(self, initial_capital: float = 1000000):
        """Start with ``initial_capital`` in cash and no positions."""
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.strategies: List[TradingStrategy] = []
        self.positions: Dict[str, Position] = {}
        self.trade_history = []
        self.performance_tracker = {}
        # Realized P&L carried over from positions that were closed out.
        self._closed_realized_pnl = 0.0

    def add_strategy(self, strategy: TradingStrategy):
        """Adds a trading strategy"""
        self.strategies.append(strategy)

    def remove_strategy(self, strategy_name: str):
        """Removes every strategy registered under ``strategy_name``."""
        self.strategies = [s for s in self.strategies if s.name != strategy_name]

    async def run_strategies(self, market_data: Dict[str, pd.DataFrame]) -> List[TradingSignal]:
        """Runs all active strategies on every symbol's data.

        A strategy that raises is reported and skipped; each returned
        signal is stamped with the symbol it was generated for.
        """
        all_signals = []

        for symbol, data in market_data.items():
            for strategy in self.strategies:
                if not strategy.is_active:
                    continue
                try:
                    signal = strategy.generate_signal(data)
                except Exception as e:
                    print(f"Strategy {strategy.name} failed: {e}")
                    continue
                if signal:
                    signal.symbol = symbol
                    all_signals.append(signal)

        return all_signals

    def aggregate_signals(self, signals: List[TradingSignal]) -> Dict[str, TradingSignal]:
        """Aggregates signals from multiple strategies into one per symbol."""
        symbol_signals = {}
        for signal in signals:
            symbol_signals.setdefault(signal.symbol, []).append(signal)

        return {
            symbol: self._aggregate_symbol_signals(signal_list)
            for symbol, signal_list in symbol_signals.items()
        }

    def _aggregate_symbol_signals(self, signals: List[TradingSignal]) -> TradingSignal:
        """Aggregates multiple signals for a single symbol.

        Each signal votes with ``confidence * strength.value``. The side
        with the larger vote wins if its vote exceeds 0.5, otherwise the
        result is HOLD. The resulting confidence is capped at 1.0 because
        strength multipliers can push the average vote above 1.
        """

        if len(signals) == 1:
            return signals[0]

        # Calculate weighted signal
        buy_weight = 0
        sell_weight = 0

        for signal in signals:
            weighted_confidence = signal.confidence * signal.strength.value

            if signal.signal_type == OrderType.BUY:
                buy_weight += weighted_confidence
            elif signal.signal_type == OrderType.SELL:
                sell_weight += weighted_confidence

        # Decide final signal
        if buy_weight > sell_weight and buy_weight > 0.5:
            final_signal_type = OrderType.BUY
            final_confidence = min(buy_weight / len(signals), 1.0)
        elif sell_weight > buy_weight and sell_weight > 0.5:
            final_signal_type = OrderType.SELL
            final_confidence = min(sell_weight / len(signals), 1.0)
        else:
            final_signal_type = OrderType.HOLD
            final_confidence = 0.3

        # Use basic information from the first signal
        base_signal = signals[0]

        return TradingSignal(
            symbol=base_signal.symbol,
            signal_type=final_signal_type,
            strength=self._convert_to_signal_strength(final_confidence),
            confidence=final_confidence,
            price=base_signal.price,
            quantity=int(np.mean([s.quantity for s in signals])),
            timestamp=datetime.now(),
            strategy_name="Aggregated",
            metadata={'contributing_strategies': [s.strategy_name for s in signals]}
        )

    def _convert_to_signal_strength(self, confidence):
        """Converts confidence to signal strength enum"""
        if confidence > 0.7:
            return SignalStrength.STRONG
        elif confidence > 0.5:
            return SignalStrength.MODERATE
        else:
            return SignalStrength.WEAK

    def execute_trades(self, signals: Dict[str, TradingSignal]):
        """Executes every non-HOLD signal; failures are reported and skipped."""

        for symbol, signal in signals.items():
            if signal.signal_type == OrderType.HOLD:
                continue

            try:
                self._execute_single_trade(signal)
            except Exception as e:
                print(f"Failed to execute trade for {symbol}: {e}")

    def _execute_single_trade(self, signal: TradingSignal):
        """Executes a single trade.

        Buys are filled only when cash covers the cost; sells only when the
        open position holds at least the requested quantity. Signals with a
        non-positive quantity are ignored. Unfilled signals are not
        recorded.
        """
        if signal.quantity <= 0:
            # Nothing to trade; avoids creating empty positions.
            return

        symbol = signal.symbol
        current_position = self.positions.get(symbol)

        if signal.signal_type == OrderType.BUY:
            # Buy logic
            cost = signal.price * signal.quantity

            if cost <= self.current_capital:
                if current_position:
                    # Increase position: volume-weighted average entry
                    # price, computed from the quantity held BEFORE the buy.
                    previous_quantity = current_position.quantity
                    new_quantity = previous_quantity + signal.quantity
                    current_position.entry_price = (
                        (current_position.entry_price * previous_quantity + cost) /
                        new_quantity
                    )
                    current_position.quantity = new_quantity
                else:
                    # New position
                    self.positions[symbol] = Position(
                        symbol=symbol,
                        quantity=signal.quantity,
                        entry_price=signal.price,
                        entry_time=signal.timestamp,
                        current_price=signal.price,
                        unrealized_pnl=0.0
                    )

                self.current_capital -= cost
                self._record_trade(signal, "EXECUTED")

        elif signal.signal_type == OrderType.SELL:
            # Sell logic
            if current_position and current_position.quantity >= signal.quantity:
                # Calculate realized P&L
                realized_pnl = (signal.price - current_position.entry_price) * signal.quantity
                current_position.realized_pnl += realized_pnl
                current_position.quantity -= signal.quantity

                # Update capital
                self.current_capital += signal.price * signal.quantity

                # If position is cleared, keep its realized P&L and remove it
                if current_position.quantity == 0:
                    self._closed_realized_pnl = (
                        getattr(self, "_closed_realized_pnl", 0.0) +
                        current_position.realized_pnl
                    )
                    del self.positions[symbol]

                self._record_trade(signal, "EXECUTED")

    def _record_trade(self, signal: TradingSignal, status: str):
        """Records a trade in ``trade_history``."""
        trade_record = {
            'timestamp': signal.timestamp,
            'symbol': signal.symbol,
            'signal_type': signal.signal_type.value,
            'price': signal.price,
            'quantity': signal.quantity,
            'strategy': signal.strategy_name,
            'confidence': signal.confidence,
            'status': status
        }

        self.trade_history.append(trade_record)

    def update_positions(self, current_prices: Dict[str, float]):
        """Marks open positions to ``current_prices``; unknown symbols are skipped."""
        for symbol, position in self.positions.items():
            if symbol in current_prices:
                position.current_price = current_prices[symbol]
                position.unrealized_pnl = (
                    (position.current_price - position.entry_price) * position.quantity
                )

    def calculate_performance(self) -> Dict:
        """Calculates system performance.

        ``realized_pnl`` covers open positions and positions that have been
        closed. Trade statistics are added only when trades were recorded.
        """
        open_positions = list(self.positions.values())

        total_realized_pnl = (
            getattr(self, "_closed_realized_pnl", 0.0) +
            sum(pos.realized_pnl for pos in open_positions)
        )
        total_unrealized_pnl = sum(pos.unrealized_pnl for pos in open_positions)

        total_value = self.current_capital + sum(
            pos.current_price * pos.quantity for pos in open_positions
        )

        total_return = (total_value - self.initial_capital) / self.initial_capital

        # Calculate trade statistics
        trade_df = pd.DataFrame(self.trade_history)

        performance = {
            'total_value': total_value,
            'cash': self.current_capital,
            'total_return': total_return,
            'realized_pnl': total_realized_pnl,
            'unrealized_pnl': total_unrealized_pnl,
            'num_trades': len(self.trade_history),
            'num_positions': len(self.positions),
            'positions': dict(self.positions)
        }

        if not trade_df.empty:
            performance.update({
                'avg_trade_size': trade_df['quantity'].mean(),
                'trade_success_rate': len(trade_df[trade_df['status'] == 'EXECUTED']) / len(trade_df)
            })

        return performance

# Usage example
def demo_quant_trading_system():
    """Run an end-to-end demo of the quantitative trading system.

    Builds a ``QuantTradingSystem`` with two strategies, generates seeded
    synthetic OHLCV data for three symbols, then runs one pass of signal
    generation, aggregation and trade execution and prints the resulting
    performance summary. The function returns nothing; all results are printed.
    """

    # Create trading system
    trading_system = QuantTradingSystem(initial_capital=1000000)

    # Add strategies
    markov_strategy = MarkovRegimeSwitchingStrategy()
    mean_reversion_strategy = MeanReversionStrategy()

    trading_system.add_strategy(markov_strategy)
    trading_system.add_strategy(mean_reversion_strategy)

    # Simulate market data (fixed seed so repeated runs draw the same numbers)
    np.random.seed(42)
    dates = pd.date_range('2023-01-01', periods=100, freq='D')

    symbols = ['AAPL', 'GOOGL', 'MSFT']
    market_data = {}

    for symbol in symbols:
        # Close prices: compounded daily returns drawn from N(0.001, 0.02), starting near 100
        prices = 100 * np.cumprod(1 + np.random.normal(0.001, 0.02, len(dates)))
        # Open/high/low are random perturbations of the close; high is never
        # below and low never above the close because of the np.abs() terms
        data = pd.DataFrame({
            'close': prices,
            'open': prices * (1 + np.random.normal(0, 0.005, len(dates))),
            'high': prices * (1 + np.abs(np.random.normal(0, 0.01, len(dates)))),
            'low': prices * (1 - np.abs(np.random.normal(0, 0.01, len(dates)))),
            'volume': np.random.randint(1000000, 5000000, len(dates))
        }, index=dates)
        market_data[symbol] = data

    print("=== Quantitative Trading System Demo ===")

    # Run strategies (simulate a single point in time) on the last 50 rows per symbol
    latest_data = {symbol: data.tail(50) for symbol, data in market_data.items()}

    async def run_demo():
        # Generate signals
        signals = await trading_system.run_strategies(latest_data)
        print(f"\nNumber of signals generated: {len(signals)}")

        for signal in signals:
            print(f"Strategy: {signal.strategy_name}, Symbol: {signal.symbol}, "
                  f"Signal: {signal.signal_type.value}, Confidence: {signal.confidence:.3f}")

        # Aggregate signals
        aggregated_signals = trading_system.aggregate_signals(signals)
        print(f"\nNumber of aggregated signals: {len(aggregated_signals)}")

        for symbol, signal in aggregated_signals.items():
            print(f"Final Signal - Symbol: {symbol}, Signal: {signal.signal_type.value}, "
                  f"Confidence: {signal.confidence:.3f}, Quantity: {signal.quantity}")

        # Execute trades
        trading_system.execute_trades(aggregated_signals)

        # Update position prices using the most recent close of each symbol
        current_prices = {symbol: data['close'].iloc[-1] for symbol, data in latest_data.items()}
        trading_system.update_positions(current_prices)

        # Calculate performance
        performance = trading_system.calculate_performance()

        print(f"\n=== System Performance ===")
        print(f"Total asset value: ${performance['total_value']:,.2f}")
        print(f"Cash: ${performance['cash']:,.2f}")
        print(f"Total return: {performance['total_return']:.2%}")
        print(f"Realized P&L: ${performance['realized_pnl']:,.2f}")
        print(f"Unrealized P&L: ${performance['unrealized_pnl']:,.2f}")
        print(f"Number of trades: {performance['num_trades']}")
        print(f"Number of positions: {performance['num_positions']}")

        if performance['num_positions'] > 0:
            print(f"\nCurrent positions:")
            for symbol, position in performance['positions'].items():
                print(f"  {symbol}: {position.quantity} shares, "
                      f"Cost: ${position.entry_price:.2f}, "
                      f"Current Price: ${position.current_price:.2f}, "
                      f"P&L: ${position.unrealized_pnl:,.2f}")

    # Run demo: run_strategies is awaited, so drive the coroutine with asyncio.run
    import asyncio
    asyncio.run(run_demo())

# Run demo
# NOTE: this call sits at module level, so the demo executes whenever the
# module is run or imported.
demo_quant_trading_system()

15.3 Risk Management and Compliance Monitoring System

15.3.1 Real-time Risk Monitoring

import asyncio
import inspect
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Callable

import numpy as np
import pandas as pd

class RiskLevel(Enum):
    """Severity attached to a ``RiskAlert`` (see ``RiskAlert.level``)."""
    LOW = "low"
    MEDIUM = "medium"      # used for leverage, liquidity and tracking-error breaches
    HIGH = "high"          # used for VaR and single-position concentration breaches
    CRITICAL = "critical"  # used for max-drawdown breaches

class AlertType(Enum):
    """Category of a ``RiskAlert``; the values double as keys of the alert summary."""
    CONCENTRATION = "concentration"  # largest position weight above its limit
    VAR_BREACH = "var_breach"        # portfolio VaR above its limit
    DRAWDOWN = "drawdown"            # max drawdown above its limit
    LEVERAGE = "leverage"            # leverage ratio above its limit
    LIQUIDITY = "liquidity"          # liquidity ratio below its minimum
    COMPLIANCE = "compliance"        # raised by the risk monitor for tracking-error breaches

@dataclass
class RiskAlert:
    """A single breach of a risk limit.

    Attributes are the alert category and severity, a human-readable message,
    the observed value and the limit it was compared against, an optional
    symbol, the creation time and optional free-form metadata.
    """
    alert_type: AlertType
    level: RiskLevel
    message: str
    current_value: float
    threshold: float
    symbol: Optional[str] = None
    # default_factory stamps every alert when it is created. A plain
    # ``datetime.now()`` default would be evaluated once, at class definition,
    # and shared by all alerts, which breaks time-window filtering of alerts.
    timestamp: datetime = field(default_factory=datetime.now)
    # None means "no metadata supplied"
    metadata: Optional[Dict] = None

@dataclass
class RiskMetrics:
    # Snapshot of portfolio risk figures; the notes below describe how
    # RealTimeRiskMonitor._calculate_risk_metrics populates each field.
    portfolio_var: float                  # magnitude of the 5th-percentile portfolio return
    portfolio_es: float                   # magnitude of the mean return at or below that percentile
    max_drawdown: float                   # magnitude of the deepest drop from a running peak
    sharpe_ratio: float                   # mean / std of returns, scaled by sqrt(252)
    concentration_risk: Dict[str, float]  # keys: herfindahl_index, max_position_weight, top5_concentration
    leverage_ratio: float                 # sum of absolute position weights
    liquidity_ratio: float                # currently a fixed placeholder value
    beta: float                           # sensitivity to the benchmark; 1.0 when no usable benchmark
    tracking_error: float                 # annualised std of portfolio minus benchmark returns

class RiskLimits:
    """Risk limits configuration.

    Every limit can be overridden by keyword; calling ``RiskLimits()`` with no
    arguments yields the same values as before. All limits are fractions
    (``0.05`` means 5%) except ``max_leverage``, which is a plain ratio.
    """

    def __init__(self, *,
                 max_portfolio_var: float = 0.05,
                 max_position_var: float = 0.02,
                 max_single_position: float = 0.10,
                 max_sector_exposure: float = 0.25,
                 max_leverage: float = 2.0,
                 min_liquidity_ratio: float = 0.05,
                 max_drawdown: float = 0.15,
                 max_tracking_error: float = 0.03):
        # VaR limits
        self.max_portfolio_var = max_portfolio_var  # 5% by default
        self.max_position_var = max_position_var    # 2% by default

        # Concentration limits
        self.max_single_position = max_single_position  # Single position not exceeding 10%
        self.max_sector_exposure = max_sector_exposure  # Single sector not exceeding 25%

        # Leverage limits
        self.max_leverage = max_leverage

        # Liquidity limits
        self.min_liquidity_ratio = min_liquidity_ratio  # At least 5% cash

        # Drawdown limits
        self.max_drawdown = max_drawdown  # Max drawdown 15%

        # Tracking error limits
        self.max_tracking_error = max_tracking_error

class RealTimeRiskMonitor:
    """Real-time risk monitoring system"""

    def __init__(self, risk_limits: RiskLimits):
        self.risk_limits = risk_limits
        self.alerts: List[RiskAlert] = []
        self.risk_history = []
        self.logger = logging.getLogger(__name__)

        # Callbacks
        self.alert_callbacks: List[Callable] = []

    def add_alert_callback(self, callback: Callable):
        """Adds an alert callback function"""
        self.alert_callbacks.append(callback)

    async def monitor_portfolio(self,
                              positions: Dict[str, Position],
                              market_data: Dict[str, pd.DataFrame],
                              benchmark_returns: Optional[pd.Series] = None) -> RiskMetrics:
        """Monitors portfolio risk"""

        try:
            # Calculate risk metrics
            risk_metrics = await self._calculate_risk_metrics(
                positions, market_data, benchmark_returns
            )

            # Check risk limits
            alerts = self._check_risk_limits(risk_metrics, positions)

            # Handle alerts
            for alert in alerts:
                await self._handle_alert(alert)

            # Record risk history
            self.risk_history.append({
                'timestamp': datetime.now(),
                'metrics': risk_metrics
            })

            return risk_metrics

        except Exception as e:
            self.logger.error(f"Risk monitoring error: {e}")
            raise

    async def _calculate_risk_metrics(self,
                                    positions: Dict[str, Position],
                                    market_data: Dict[str, pd.DataFrame],
                                    benchmark_returns: Optional[pd.Series] = None) -> RiskMetrics:
        """Calculates risk metrics from current positions and price history.

        Args:
            positions: Held positions keyed by symbol; each needs
                ``current_price`` and ``quantity``.
            market_data: Price frames keyed by symbol; the ``'close'`` column
                is used to derive simple returns.
            benchmark_returns: Optional benchmark return series. It is used for
                beta and tracking error only when it has at least as many
                observations as the portfolio return series.

        Returns:
            A ``RiskMetrics`` instance. All-zero metrics are returned when there
            are no positions, the portfolio has no market value, or no position
            has usable price history.
        """

        if not positions:
            return RiskMetrics(0, 0, 0, 0, {}, 0, 0, 0, 0)

        # Calculate position weights
        total_value = sum(pos.current_price * pos.quantity for pos in positions.values())
        if total_value == 0:
            # Weights are undefined for a portfolio without market value
            return RiskMetrics(0, 0, 0, 0, {}, 0, 0, 0, 0)
        weights = {symbol: (pos.current_price * pos.quantity) / total_value
                  for symbol, pos in positions.items()}

        # Collect return series only for symbols that have price history, so
        # the weight vector built below lines up with the returns matrix.
        # Positions without history contribute no return.
        symbols = []
        return_series = []
        for symbol in positions:
            if symbol not in market_data:
                continue
            returns = market_data[symbol]['close'].pct_change().dropna().values[-252:]  # Last year's data
            if len(returns) == 0:
                continue
            symbols.append(symbol)
            return_series.append(returns)

        if not return_series:
            return RiskMetrics(0, 0, 0, 0, {}, 0, 0, 0, 0)

        # Trim every series to the shortest history so the matrix is rectangular
        window = min(len(returns) for returns in return_series)
        returns_matrix = np.array([returns[-window:] for returns in return_series]).T
        weights_array = np.array([weights[symbol] for symbol in symbols])

        # Calculate portfolio returns
        portfolio_returns = np.dot(returns_matrix, weights_array)

        # VaR calculation (5th percentile of the return distribution)
        portfolio_var = np.percentile(portfolio_returns, 5)
        portfolio_es = np.mean(portfolio_returns[portfolio_returns <= portfolio_var])

        # Max drawdown
        cumulative_returns = np.cumprod(1 + portfolio_returns)
        running_max = np.maximum.accumulate(cumulative_returns)
        drawdown = (cumulative_returns - running_max) / running_max
        max_drawdown = np.min(drawdown)

        # Sharpe ratio (no risk-free rate); 0.0 when returns have no dispersion
        volatility = np.std(portfolio_returns)
        if volatility > 0:
            sharpe_ratio = np.mean(portfolio_returns) / volatility * np.sqrt(252)
        else:
            sharpe_ratio = 0.0

        # Concentration risk
        concentration_risk = self._calculate_concentration_risk(weights)

        # Leverage ratio (simplified)
        leverage_ratio = sum(abs(w) for w in weights.values())

        # Liquidity ratio (simplified assumption)
        liquidity_ratio = 0.05  # Simplified assumption

        # Beta and tracking error need a benchmark that covers the whole
        # portfolio return window; otherwise the defaults below are kept.
        beta = 1.0  # Simplified assumption
        tracking_error = 0.0
        if benchmark_returns is not None and len(benchmark_returns) >= len(portfolio_returns):
            benchmark_aligned = benchmark_returns.values[-len(portfolio_returns):]

            if len(portfolio_returns) > 1:
                covariance = np.cov(portfolio_returns, benchmark_aligned)[0][1]
                # np.cov uses ddof=1, so the variance must use ddof=1 as well
                benchmark_variance = np.var(benchmark_aligned, ddof=1)
                beta = covariance / benchmark_variance if benchmark_variance > 0 else 1.0

            tracking_diff = portfolio_returns - benchmark_aligned
            tracking_error = np.std(tracking_diff) * np.sqrt(252)

        return RiskMetrics(
            portfolio_var=abs(portfolio_var),
            portfolio_es=abs(portfolio_es),
            max_drawdown=abs(max_drawdown),
            sharpe_ratio=sharpe_ratio,
            concentration_risk=concentration_risk,
            leverage_ratio=leverage_ratio,
            liquidity_ratio=liquidity_ratio,
            beta=beta,
            tracking_error=tracking_error
        )

    def _calculate_concentration_risk(self, weights: Dict[str, float]) -> Dict[str, float]:
        """Calculates concentration risk"""

        # Herfindahl Index
        herfindahl_index = sum(w**2 for w in weights.values())

        # Max position weight
        max_weight = max(weights.values()) if weights else 0

        # Top 5 position weights
        sorted_weights = sorted(weights.values(), reverse=True)
        top5_weight = sum(sorted_weights[:5]) if len(sorted_weights) >= 5 else sum(sorted_weights)

        return {
            'herfindahl_index': herfindahl_index,
            'max_position_weight': max_weight,
            'top5_concentration': top5_weight
        }

    def _check_risk_limits(self,
                          risk_metrics: RiskMetrics,
                          positions: Dict[str, Position]) -> List[RiskAlert]:
        """Checks risk limits"""

        alerts = []

        # Check VaR limit
        if risk_metrics.portfolio_var > self.risk_limits.max_portfolio_var:
            alerts.append(RiskAlert(
                alert_type=AlertType.VAR_BREACH,
                level=RiskLevel.HIGH,
                message=f"Portfolio VaR limit breached",
                current_value=risk_metrics.portfolio_var,
                threshold=self.risk_limits.max_portfolio_var
            ))

        # Check max drawdown
        if risk_metrics.max_drawdown > self.risk_limits.max_drawdown:
            alerts.append(RiskAlert(
                alert_type=AlertType.DRAWDOWN,
                level=RiskLevel.CRITICAL,
                message=f"Max drawdown limit breached",
                current_value=risk_metrics.max_drawdown,
                threshold=self.risk_limits.max_drawdown
            ))

        # Check leverage ratio
        if risk_metrics.leverage_ratio > self.risk_limits.max_leverage:
            alerts.append(RiskAlert(
                alert_type=AlertType.LEVERAGE,
                level=RiskLevel.MEDIUM,
                message=f"Leverage ratio limit breached",
                current_value=risk_metrics.leverage_ratio,
                threshold=self.risk_limits.max_leverage
            ))

        # Check liquidity
        if risk_metrics.liquidity_ratio < self.risk_limits.min_liquidity_ratio:
            alerts.append(RiskAlert(
                alert_type=AlertType.LIQUIDITY,
                level=RiskLevel.MEDIUM,
                message=f"Insufficient liquidity",
                current_value=risk_metrics.liquidity_ratio,
                threshold=self.risk_limits.min_liquidity_ratio
            ))

        # Check concentration risk
        max_position_weight = risk_metrics.concentration_risk.get('max_position_weight', 0)
        if max_position_weight > self.risk_limits.max_single_position:
            alerts.append(RiskAlert(
                alert_type=AlertType.CONCENTRATION,
                level=RiskLevel.HIGH,
                message=f"Single position concentration too high",
                current_value=max_position_weight,
                threshold=self.risk_limits.max_single_position
            ))

        # Check tracking error
        if risk_metrics.tracking_error > self.risk_limits.max_tracking_error:
            alerts.append(RiskAlert(
                alert_type=AlertType.COMPLIANCE,
                level=RiskLevel.MEDIUM,
                message=f"Tracking error limit breached",
                current_value=risk_metrics.tracking_error,
                threshold=self.risk_limits.max_tracking_error
            ))

        return alerts

    async def _handle_alert(self, alert: RiskAlert):
        """Handles an alert"""

        # Log alert
        self.alerts.append(alert)
        self.logger.warning(f"Risk Alert: {alert.message} - {alert.current_value:.4f} > {alert.threshold:.4f}")

        # Invoke callback functions
        for callback in self.alert_callbacks:
            try:
                await callback(alert)
            except Exception as e:
                self.logger.error(f"Alert callback error: {e}")

    def get_risk_report(self) -> Dict:
        """Generates a risk report"""

        if not self.risk_history:
            return {}

        latest_metrics = self.risk_history[-1]['metrics']

        # Calculate risk trends
        if len(self.risk_history) > 1:
            prev_metrics = self.risk_history[-2]['metrics']
            var_trend = latest_metrics.portfolio_var - prev_metrics.portfolio_var
            drawdown_trend = latest_metrics.max_drawdown - prev_metrics.max_drawdown
        else:
            var_trend = 0
            drawdown_trend = 0

        # Summarize alerts
        alert_summary = {}
        for alert_type in AlertType:
            alert_summary[alert_type.value] = len([
                a for a in self.alerts
                if a.alert_type == alert_type and
                a.timestamp > datetime.now() - timedelta(days=1)
            ])

        return {
            'timestamp': datetime.now(),
            'current_metrics': {
                'portfolio_var': latest_metrics.portfolio_var,
                'portfolio_es': latest_metrics.portfolio_es,
                'max_drawdown': latest_metrics.max_drawdown,
                'sharpe_ratio': latest_metrics.sharpe_ratio,
                'leverage_ratio': latest_metrics.leverage_ratio,
                'concentration_risk': latest_metrics.concentration_risk
            },
            'risk_trends': {
                'var_change': var_trend,
                'drawdown_change': drawdown_trend
            },
            'alert_summary': alert_summary,
            'risk_score': self._calculate_risk_score(latest_metrics),
            'recommendations': self._generate_recommendations(latest_metrics)
        }

    def _calculate_risk_score(self, metrics: RiskMetrics) -> float:
        """Calculates comprehensive risk score (0-100, higher = riskier)"""

        score = 0

        # VaR score
        var_score = min(metrics.portfolio_var / self.risk_limits.max_portfolio_var * 30, 30)
        score += var_score

        # Drawdown score
        drawdown_score = min(metrics.max_drawdown / self.risk_limits.max_drawdown * 25, 25)
        score += drawdown_score

        # Leverage score
        leverage_score = min(metrics.leverage_ratio / self.risk_limits.max_leverage * 20, 20)
        score += leverage_score

        # Concentration score
        max_weight = metrics.concentration_risk.get('max_position_weight', 0)
        concentration_score = min(max_weight / self.risk_limits.max_single_position * 15, 15)
        score += concentration_score

        # Liquidity score
        liquidity_score = max(10 - metrics.liquidity_ratio / self.risk_limits.min_liquidity_ratio * 10, 0)
        score += liquidity_score

        return min(score, 100)

    def _generate_recommendations(self, metrics: RiskMetrics) -> List[str]:
        """Generates risk management recommendations"""

        recommendations = []

        if metrics.portfolio_var > self.risk_limits.max_portfolio_var * 0.8:
            recommendations.append("Suggest reducing overall portfolio risk exposure")

        if metrics.max_drawdown > self.risk_limits.max_drawdown * 0.7:
            recommendations.append("Consider implementing stop-loss strategies or hedging measures")

        max_weight = metrics.concentration_risk.get('max_position_weight', 0)
        if max_weight > self.risk_limits.max_single_position * 0.8:
            recommendations.append("Suggest diversifying holdings to reduce single asset concentration")

        if metrics.leverage_ratio > self.risk_limits.max_leverage * 0.8:
            recommendations.append("Suggest reducing leverage levels")

        if metrics.liquidity_ratio < self.risk_limits.min_liquidity_ratio * 1.2:
            recommendations.append("Suggest increasing cash or highly liquid asset proportion")

        if metrics.sharpe_ratio < 0.5:
            recommendations.append("Current risk-adjusted return is low, suggest optimizing investment strategy")

        return recommendations

# Compliance monitoring system
class ComplianceMonitor:
    """Compliance monitoring system.

    Holds a list of named rules and evaluates them against trading data,
    keeping a running record of every violation found.
    """

    def __init__(self):
        # Registered rules: dicts with 'name', 'check' and 'severity'
        self.compliance_rules = []
        # Every violation found by any check_compliance() call so far
        self.violations = []

    def add_rule(self, rule_name: str, check_function: Callable, severity: str = "medium"):
        """Adds a compliance rule.

        Args:
            rule_name: Name reported with violations of this rule.
            check_function: Callable taking the trading data and returning,
                directly or via an awaitable, a dict with a ``'compliant'``
                flag, a ``'description'`` and optionally ``'data'``.
            severity: Severity label copied into violations.
        """
        self.compliance_rules.append({
            'name': rule_name,
            'check': check_function,
            'severity': severity
        })

    async def check_compliance(self, trading_data: Dict) -> List[Dict]:
        """Checks compliance of ``trading_data`` against every registered rule.

        Returns:
            The violations found by this call, in rule order. They are also
            appended to ``self.violations``. A rule whose check raises is
            logged and skipped, so it produces no violation.
        """
        violations = []

        for rule in self.compliance_rules:
            try:
                result = rule['check'](trading_data)
                # Accept both sync and async check functions; awaiting a plain
                # dict used to raise and silently disable the rule
                if inspect.isawaitable(result):
                    result = await result

                if not result['compliant']:
                    violation = {
                        'rule_name': rule['name'],
                        'severity': rule['severity'],
                        'description': result['description'],
                        'timestamp': datetime.now(),
                        'data': result.get('data', {})
                    }
                    violations.append(violation)
                    self.violations.append(violation)
            except Exception as e:
                logging.error(f"Compliance check error {rule['name']}: {e}")

        return violations

# Usage example
async def demo_risk_monitoring():
    """Run a demo of the risk monitoring system.

    Builds a ``RealTimeRiskMonitor`` with default ``RiskLimits``, registers a
    printing alert callback, monitors two hand-made positions against seeded
    synthetic price data and prints the metrics and the risk report. As a
    coroutine it has to be awaited or driven with ``asyncio.run``.
    """

    # Create risk limits
    risk_limits = RiskLimits()

    # Create risk monitor
    risk_monitor = RealTimeRiskMonitor(risk_limits)

    # Add alert callback (a coroutine function that just prints the alert)
    async def alert_handler(alert: RiskAlert):
        print(f"[{alert.level.value.upper()}] {alert.alert_type.value}: {alert.message}")
        print(f"Current value: {alert.current_value:.4f}, Threshold: {alert.threshold:.4f}")

    risk_monitor.add_alert_callback(alert_handler)

    # Simulate position data
    positions = {
        'AAPL': Position(
            symbol='AAPL',
            quantity=1000,
            entry_price=150.0,
            entry_time=datetime.now(),
            current_price=155.0,
            unrealized_pnl=5000.0
        ),
        'GOOGL': Position(
            symbol='GOOGL',
            quantity=500,
            entry_price=2800.0,
            entry_time=datetime.now(),
            current_price=2850.0,
            unrealized_pnl=25000.0
        )
    }

    # Simulate market data (fixed seed so repeated runs draw the same numbers)
    np.random.seed(42)
    dates = pd.date_range('2023-01-01', periods=252, freq='D')

    market_data = {}
    for symbol in positions.keys():
        # NOTE: the synthetic closes start near 100 and are unrelated to the
        # position prices above; only their returns feed the risk metrics
        prices = 100 * np.cumprod(1 + np.random.normal(0.001, 0.03, len(dates)))  # High volatility
        market_data[symbol] = pd.DataFrame({
            'close': prices
        }, index=dates)

    print("=== Risk Monitoring System Demo ===")

    # Execute risk monitoring (no benchmark series is passed)
    risk_metrics = await risk_monitor.monitor_portfolio(positions, market_data)

    print(f"\n=== Risk Metrics ===")
    print(f"Portfolio VaR: {risk_metrics.portfolio_var:.4f}")
    print(f"Expected Shortfall ES: {risk_metrics.portfolio_es:.4f}")
    print(f"Max Drawdown: {risk_metrics.max_drawdown:.4f}")
    print(f"Sharpe Ratio: {risk_metrics.sharpe_ratio:.4f}")
    print(f"Leverage Ratio: {risk_metrics.leverage_ratio:.4f}")
    print(f"Max Position Weight: {risk_metrics.concentration_risk['max_position_weight']:.4f}")

    # Generate risk report
    risk_report = risk_monitor.get_risk_report()
    print(f"\n=== Risk Report ===")
    print(f"Risk Score: {risk_report['risk_score']:.1f}/100")
    print(f"Alert Statistics: {risk_report['alert_summary']}")
    print(f"Recommended Actions:")
    for rec in risk_report['recommendations']:
        print(f"  - {rec}")

# Run demo
# asyncio.run(demo_risk_monitoring())

15.4 Project Implementation and Deployment

15.4.1 System Architecture and Deployment

"""
Production Environment Deployment Architecture Design
"""

import yaml
import json
from typing import Dict, List
from dataclasses import dataclass, asdict

@dataclass
class DatabaseConfig:
    """Connection settings for the database."""
    host: str
    port: int
    database: str
    username: str
    password_env: str  # Name of the environment variable holding the password, not the password itself
    pool_size: int = 10
    ssl_enabled: bool = True

@dataclass
class RedisConfig:
    """Connection settings for Redis."""
    host: str
    port: int
    password_env: str  # Name of the environment variable holding the password
    db: int = 0
    cluster_enabled: bool = False

@dataclass
class KafkaConfig:
    """Connection settings for Kafka."""
    bootstrap_servers: List[str]  # "host:port" entries
    security_protocol: str = "SASL_SSL"
    sasl_mechanism: str = "PLAIN"
    # Names of the environment variables that hold the credentials
    username_env: str = "KAFKA_USERNAME"
    password_env: str = "KAFKA_PASSWORD"

@dataclass
class MonitoringConfig:
    """Endpoints of the monitoring stack plus the log level."""
    prometheus_endpoint: str
    grafana_endpoint: str
    alertmanager_endpoint: str
    log_level: str = "INFO"

@dataclass
class SystemConfig:
    """Top-level configuration bundling the service configs with application and risk settings."""
    environment: str  # dev, staging, prod
    database: DatabaseConfig
    redis: RedisConfig
    kafka: KafkaConfig
    monitoring: MonitoringConfig

    # Application configuration
    api_port: int = 8000
    max_workers: int = 4
    request_timeout: int = 30  # presumably seconds; the unit is not stated here

    # Risk parameters
    max_portfolio_risk: float = 0.05
    max_position_size: float = 0.10
    rebalance_frequency: str = "daily"

def generate_production_config() -> SystemConfig:
    """Build the configuration used for the production environment.

    Passwords are never embedded: each service config only names the
    environment variable the password is read from.
    """
    database = DatabaseConfig(
        host="postgres-cluster.internal",
        port=5432,
        database="quant_trading",
        username="trading_user",
        password_env="DB_PASSWORD",
        pool_size=20,
        ssl_enabled=True
    )

    redis = RedisConfig(
        host="redis-cluster.internal",
        port=6379,
        password_env="REDIS_PASSWORD",
        db=0,
        cluster_enabled=True
    )

    # Three brokers; the credential variable names keep the KafkaConfig defaults
    brokers = [
        "kafka-1.internal:9092",
        "kafka-2.internal:9092",
        "kafka-3.internal:9092"
    ]
    kafka = KafkaConfig(
        bootstrap_servers=brokers,
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-256"
    )

    monitoring = MonitoringConfig(
        prometheus_endpoint="http://prometheus.monitoring:9090",
        grafana_endpoint="http://grafana.monitoring:3000",
        alertmanager_endpoint="http://alertmanager.monitoring:9093",
        log_level="INFO"
    )

    # Production uses tighter risk limits than the SystemConfig defaults
    return SystemConfig(
        environment="production",
        database=database,
        redis=redis,
        kafka=kafka,
        monitoring=monitoring,
        api_port=8000,
        max_workers=8,
        request_timeout=60,
        max_portfolio_risk=0.03,
        max_position_size=0.08,
        rebalance_frequency="hourly"
    )

def save_config_as_yaml(config: SystemConfig, filename: str):
    """Write ``config`` to ``filename`` as block-style YAML, encoded as UTF-8."""
    # Nested dataclasses are converted to nested dicts before the file is opened
    payload = asdict(config)

    with open(filename, 'w', encoding='utf-8') as stream:
        yaml.dump(
            payload,
            stream,
            default_flow_style=False,
            allow_unicode=True,
        )

# Docker Compose configuration for the whole stack: API, strategy engine, risk
# monitor, PostgreSQL, Redis, Kafka/ZooKeeper, Prometheus and Grafana.
# ${...} placeholders are resolved by Docker Compose from the environment.
# Fix: the quant-api `depends_on` entry for kafka was written `-kafka`, which
# is not a valid YAML list item; it is now `- kafka`.
DOCKER_COMPOSE_TEMPLATE = """
version: '3.8'

services:
  # Quantitative trading API service
  quant-api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8000:8000"
    environment:
      - ENV=production
      - DB_PASSWORD=${DB_PASSWORD}
      - REDIS_PASSWORD=${REDIS_PASSWORD}
      - KAFKA_USERNAME=${KAFKA_USERNAME}
      - KAFKA_PASSWORD=${KAFKA_PASSWORD}
    depends_on:
      - postgres
      - redis
      - kafka
    networks:
      - quant-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Strategy engine service
  strategy-engine:
    build:
      context: .
      dockerfile: Dockerfile.strategy
    environment:
      - ENV=production
      - DB_PASSWORD=${DB_PASSWORD}
      - REDIS_PASSWORD=${REDIS_PASSWORD}
    depends_on:
      - postgres
      - redis
    networks:
      - quant-network
    restart: unless-stopped

  # Risk monitoring service
  risk-monitor:
    build:
      context: .
      dockerfile: Dockerfile.risk
    environment:
      - ENV=production
      - DB_PASSWORD=${DB_PASSWORD}
      - REDIS_PASSWORD=${REDIS_PASSWORD}
    depends_on:
      - postgres
      - redis
    networks:
      - quant-network
    restart: unless-stopped

  # PostgreSQL database
  postgres:
    image: postgres:14
    environment:
      - POSTGRES_DB=quant_trading
      - POSTGRES_USER=trading_user
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - quant-network
    restart: unless-stopped

  # Redis cache
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis_data:/data
    networks:
      - quant-network
    restart: unless-stopped

  # Kafka message queue
  kafka:
    image: confluentinc/cp-kafka:latest
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
    networks:
      - quant-network
    restart: unless-stopped

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - quant-network
    restart: unless-stopped

  # Prometheus monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - quant-network
    restart: unless-stopped

  # Grafana visualization
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - quant-network
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:

networks:
  quant-network:
    driver: bridge
"""

# Kubernetes deployment configuration: three manifests in one multi-document
# YAML string separated by `---`: the API Deployment, a LoadBalancer Service
# mapping port 80 to container port 8000, and a HorizontalPodAutoscaler
# (2-10 replicas) driven by CPU and memory utilization.
# Passwords are read from the `db-secret` and `redis-secret` Secrets.
KUBERNETES_DEPLOYMENT = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: quant-trading-api
  labels:
    app: quant-trading-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: quant-trading-api
  template:
    metadata:
      labels:
        app: quant-trading-api
    spec:
      containers:
      - name: api
        image: quant-trading:latest
        ports:
        - containerPort: 8000
        env:
        - name: ENV
          value: "production"
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: password
        - name: REDIS_PASSWORD
          valueFrom:
            secretKeyRef:
              name: redis-secret
              key: password
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: quant-trading-api-service
spec:
  selector:
    app: quant-trading-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: quant-trading-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: quant-trading-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
"""

# Dockerfile for the API image.
# Two fixes compared with the previous template:
#   * curl is installed, because the HEALTHCHECK below calls it and the slim
#     base image does not ship it (the check could never succeed).
#   * the literal is a raw string, so the trailing backslashes reach the
#     generated Dockerfile as line continuations instead of being consumed by
#     Python's own backslash-newline escape.
DOCKERFILE_API = r"""
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Start application
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""

# CI/CD Pipeline configuration (GitHub Actions): test -> build-and-push -> deploy.
# Fix: the deploy job now checks out the repository first. Each job starts on a
# fresh runner, so without a checkout the manifest paths it passes to
# azure/k8s-deploy do not exist.
# NOTE(review): the deploy step also lists k8s/service.yaml, which
# save_deployment_files() does not write (the Service is part of
# k8s/deployment.yaml); confirm the manifest list against the repository.
GITHUB_ACTIONS_WORKFLOW = """
name: Deploy Quant Trading System

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Run tests
      run: |
        pytest tests/ --cov=./ --cov-report=xml

    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3

  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write

    steps:
    - name: Checkout repository
      uses: actions/checkout@v3

    - name: Log in to Container Registry
      uses: docker/login-action@v2
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}

    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v4
      with:
        images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

    - name: Build and push Docker image
      uses: docker/build-push-action@v3
      with:
        context: .
        push: true
        tags: ${{ steps.meta.outputs.tags }}
        labels: ${{ steps.meta.outputs.labels }}

  deploy:
    needs: build-and-push
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
    - name: Checkout repository
      uses: actions/checkout@v3

    - name: Deploy to Kubernetes
      uses: azure/k8s-deploy@v1
      with:
        manifests: |
          k8s/deployment.yaml
          k8s/service.yaml
        images: |
          ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
"""

def save_deployment_files():
    """Write all deployment artifacts to disk, relative to the working directory.

    Files produced, in order:
      * config/production.yaml       - via save_config_as_yaml()
      * docker-compose.yml           - DOCKER_COMPOSE_TEMPLATE
      * k8s/deployment.yaml          - KUBERNETES_DEPLOYMENT
      * Dockerfile.api               - DOCKERFILE_API
      * .github/workflows/deploy.yml - GITHUB_ACTIONS_WORKFLOW

    Parent directories are created when missing, so the function also works
    in a freshly cloned or empty directory. Existing files are overwritten.
    """
    import os

    # Save configuration.
    # NOTE(review): save_config_as_yaml() is defined elsewhere; it is not
    # visible here whether it creates the target directory, so create it.
    config = generate_production_config()
    os.makedirs("config", exist_ok=True)
    save_config_as_yaml(config, "config/production.yaml")

    # Destination path -> template text. Dict order is the write order.
    generated_files = {
        "docker-compose.yml": DOCKER_COMPOSE_TEMPLATE,
        "k8s/deployment.yaml": KUBERNETES_DEPLOYMENT,
        "Dockerfile.api": DOCKERFILE_API,
        ".github/workflows/deploy.yml": GITHUB_ACTIONS_WORKFLOW,
    }

    for path, content in generated_files.items():
        parent = os.path.dirname(path)
        if parent:
            # open(..., "w") does not create missing directories.
            os.makedirs(parent, exist_ok=True)
        # Explicit UTF-8 so the output does not depend on the platform locale.
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

    print("Deployment files generated successfully")

# Performance monitoring and alerting configuration.
#
# Prometheus server configuration template:
#   * global scrape and rule-evaluation interval of 15s;
#   * alert rules are loaded from "alert_rules.yml";
#   * alerts are sent to the Alertmanager at alertmanager:9093;
#   * three scrape jobs (quant-api:8000, strategy-engine:8001,
#     risk-monitor:8002), each exposing /metrics and overriding the
#     global scrape interval.
PROMETHEUS_CONFIG = """
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

scrape_configs:
  - job_name: 'quant-trading-api'
    static_configs:
      - targets: ['quant-api:8000']
    metrics_path: /metrics
    scrape_interval: 10s

  - job_name: 'strategy-engine'
    static_configs:
      - targets: ['strategy-engine:8001']
    metrics_path: /metrics
    scrape_interval: 30s

  - job_name: 'risk-monitor'
    static_configs:
      - targets: ['risk-monitor:8002']
    metrics_path: /metrics
    scrape_interval: 10s
"""

# Prometheus alerting rules template with four alerts: HighErrorRate,
# HighLatency, RiskLimitBreach and SystemDown.
#
# HighErrorRate is expressed as a ratio (5xx requests / all requests, per
# job) so that the 0.1 threshold really means "10% of requests fail", as the
# alert description states. Comparing the bare rate() of 5xx requests with
# 0.1 would instead test "more than 0.1 failing requests per second".
ALERT_RULES = """
groups:
- name: quant_trading_alerts
  rules:
  - alert: HighErrorRate
    expr: sum by (job) (rate(http_requests_total{status=~"5.."}[5m])) / sum by (job) (rate(http_requests_total[5m])) > 0.1
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is above 10% for 5 minutes"

  - alert: HighLatency
    expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High latency detected"
      description: "95th percentile latency is above 1 second"

  - alert: RiskLimitBreach
    expr: portfolio_var > 0.05
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Portfolio VaR limit breached"
      description: "Portfolio VaR is above 5%"

  - alert: SystemDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Service is down"
      description: "Service {{ $labels.instance }} is down"
"""

if __name__ == "__main__":
    import os

    save_deployment_files()

    # Save monitoring configuration. The directory is created here so these
    # writes do not rely on an earlier step having made it; open(..., "w")
    # raises FileNotFoundError when the parent directory is missing.
    os.makedirs("config", exist_ok=True)

    with open("config/prometheus.yml", "w", encoding="utf-8") as f:
        f.write(PROMETHEUS_CONFIG)

    with open("config/alert_rules.yml", "w", encoding="utf-8") as f:
        f.write(ALERT_RULES)

15.5 Learning Summary and Outlook

15.5.1 Project Achievements Summary

Through this chapter, we have successfully built a complete quantitative trading system based on Markov models, including:

[Diagram placeholder: the Mermaid chart summarizing the project achievements did not render in this export.]

15.5.2 Key Technical Highlights

"""
Core Technology Summary
"""

class TechnicalSummary:
    """Catalogue of the project's technologies, checklist and headline metrics.

    All data is hard-coded; the class only groups it behind three accessors:
    the ``key_components`` attribute, ``generate_implementation_checklist()``
    and ``calculate_project_metrics()``.
    """

    def __init__(self):
        # Built per instance so every object owns its own mapping and lists.
        components = {}

        components["Model Technologies"] = [
            "Markov Chain Theory and Applications",
            "Hidden Markov Model Implementation",
            "Regime Switching Models",
            "Monte Carlo Simulation",
            "Machine Learning Integration",
        ]
        components["System Architecture"] = [
            "Microservices Architecture Design",
            "Event-Driven Architecture",
            "Real-time Data Processing",
            "Distributed Computing",
            "Containerized Deployment",
        ]
        components["Risk Management"] = [
            "Real-time Risk Monitoring",
            "VaR and ES Calculation",
            "Stress Testing Framework",
            "Compliance Checking System",
            "Alerting Mechanism",
        ]
        components["Performance Optimization"] = [
            "Algorithm Optimization",
            "Parallel Computing",
            "Caching Strategies",
            "Database Optimization",
            "Network Optimization",
        ]

        self.key_components = components

    def generate_implementation_checklist(self) -> Dict[str, List[str]]:
        """Return the implementation checklist as phase name -> list of items.

        A new dictionary is built on every call; phases appear in project
        order, from data preparation through deployment operations.
        """
        phases = (
            ("Data Preparation", [
                "✓ Establish data acquisition pipelines",
                "✓ Data cleaning and preprocessing",
                "✓ Data quality monitoring",
                "✓ Backup and recovery mechanisms",
            ]),
            ("Model Development", [
                "✓ Markov model implementation",
                "✓ Parameter estimation and validation",
                "✓ Model performance evaluation",
                "✓ A/B testing framework",
            ]),
            ("System Integration", [
                "✓ API interface design",
                "✓ Message queue integration",
                "✓ Database design",
                "✓ Caching mechanisms",
            ]),
            ("Risk Control", [
                "✓ Risk limit setting",
                "✓ Real-time monitoring deployment",
                "✓ Alerting system configuration",
                "✓ Contingency plan development",
            ]),
            ("Deployment Operations", [
                "✓ Environment configuration",
                "✓ Monitoring deployment",
                "✓ Logging system",
                "✓ Backup strategy",
            ]),
        )
        return dict(phases)

    def calculate_project_metrics(self) -> Dict[str, float]:
        """Return the (simulated, hard-coded) project outcome metrics.

        Values are fractions or plain ratios, except "Average Response Time",
        which is expressed in seconds.
        """
        outcome = {}
        outcome["Code Coverage"] = 0.85
        outcome["System Availability"] = 0.999
        outcome["Average Response Time"] = 0.1  # seconds
        outcome["Risk Control Effectiveness"] = 0.92
        outcome["Return-Risk Ratio"] = 1.8
        outcome["User Satisfaction"] = 0.88
        return outcome

def generate_final_report():
    """Print the final project report to stdout.

    The report contains, in order: the project outcome metrics, the
    implementation checklist, the core technical components, and three fixed
    bullet lists (project highlights, business value, future directions).

    Metric formatting:
      * metrics measured in seconds are printed with an "s" unit;
      * other float values below 1 are printed as percentages;
      * everything else is printed as-is.

    Returns:
        None. All output goes through print().
    """
    summary = TechnicalSummary()

    print("=== Markov Model Quantitative Trading System Project Summary ===\n")

    print("📊 Project Outcome Metrics:")
    # Durations must bypass the "< 1 means percentage" heuristic below;
    # otherwise a 0.1 second response time is reported as "10.0%".
    duration_metrics = {"Average Response Time"}
    for metric, value in summary.calculate_project_metrics().items():
        if metric in duration_metrics:
            print(f"  {metric}: {value:g} s")
        elif isinstance(value, float) and value < 1:
            print(f"  {metric}: {value:.1%}")
        else:
            print(f"  {metric}: {value}")

    print("\n📋 Implementation Checklist:")
    for category, items in summary.generate_implementation_checklist().items():
        print(f"\n{category}:")
        for item in items:
            print(f"  {item}")

    print("\n🎯 Core Technical Components:")
    for category, components in summary.key_components.items():
        print(f"\n{category}:")
        for component in components:
            print(f"  • {component}")

    # Fixed closing sections: (heading, bullet marker, entries).
    closing_sections = [
        (
            "\n🚀 Project Highlights:",
            "⭐",
            [
                "Complete end-to-end quantitative trading system",
                "Intelligent strategies based on Markov models",
                "Real-time risk monitoring and alerting",
                "High-availability microservices architecture",
                "Comprehensive performance monitoring system",
                "Automated deployment and operations",
            ],
        ),
        (
            "\n📈 Business Value:",
            "💰",
            [
                "Improve investment return-risk ratio",
                "Reduce manual trading costs",
                "Enhance risk control capabilities",
                "Improve asset management efficiency",
                "Support scalable operations",
                "Meet compliance and regulatory requirements",
            ],
        ),
        (
            "\n🔮 Future Development Directions:",
            "🚀",
            [
                "Deep learning model integration",
                "Multi-asset class expansion",
                "Real-time decision optimization",
                "ESG factor integration",
                "Blockchain technology application",
                "Quantum computing exploration",
            ],
        ),
    ]

    for heading, marker, entries in closing_sections:
        print(heading)
        for entry in entries:
            print(f"  {marker} {entry}")

# Generate final report: this module-level call runs as soon as the snippet
# is executed and prints the whole summary to stdout.
generate_final_report()

Learning Reflection

Through Chapter 15, I have gained a deep understanding of how to translate Markov model theory into practical financial application systems. This comprehensive project has enabled me to master:

  1. System Design Capability: The entire process from requirements analysis to architecture design.
  2. Technology Integration Capability: Combining different technology stacks into one coherent system.
  3. Risk Management Awareness: The importance of risk control in financial systems.
  4. Engineering Practice Experience: Code quality and deployment operations.
  5. Project Management Skills: Team collaboration and progress control.

This system is not only a showcase of technology but also a typical case study combining theory and practice, laying a solid foundation for future career development in financial technology.