Chapter 14: Integrating Machine Learning with Markov Models
Haiyue
Learning Objectives
- Combine deep learning with Markov models
- Implement Neural Hidden Markov Models
- Apply Markov Decision Processes in reinforcement learning
- Build intelligent portfolio management systems
Summary of Key Concepts
1. Neural Markov Models
Neural Markov models combine neural networks with Markov models, pairing the representational power of deep learning with the sequence-modeling strengths of Markov models.
Neural HMM Architecture:
(Diagram placeholder: observations feed an LSTM encoder whose hidden states drive an emission-probability network and a transition-probability network; the hidden-state sequence is then inferred from these distributions.)
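In probabilistic terms, the model keeps the standard HMM factorization and lets neural networks parameterize its conditional distributions. A compact statement of the joint likelihood assumed by Example 1 below (the notation, including $b_\theta$ and $a_\phi$, is ours):
$$p(o_{1:T}, z_{1:T}) = \pi(z_1)\, b_\theta(o_1 \mid z_1, h_1) \prod_{t=2}^{T} a_\phi(z_t \mid z_{t-1}, h_t)\, b_\theta(o_t \mid z_t, h_t)$$
Here $z_t$ is the hidden state, $o_t$ the observation, and $h_t$ the LSTM hidden state; $b_\theta$ is the emission network and $a_\phi$ the transition network in the code below.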
2. Markov Decision Process (MDP)
In reinforcement learning, an MDP provides the decision-making framework for the agent; its core elements are (formalized right after this list):
- State Space: all possible market states
- Action Space: all possible trading actions
- Transition Probability: the state transition function $P(s' \mid s, a)$
- Reward Function: the immediate reward $R(s, a)$
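Together with a discount factor $\gamma$ (the gamma attribute of the agents below), these elements form the tuple $(\mathcal{S}, \mathcal{A}, P, R, \gamma)$, and the agent seeks a policy that maximizes the expected discounted return:
$$\pi^* = \arg\max_\pi\, \mathbb{E}_\pi\!\left[\sum_{k=0}^{\infty} \gamma^k\, r_{t+k+1}\right]$$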
3. Deep Q-Network (DQN)
Bellman Equation:
$$Q^*(s, a) = \mathbb{E}\!\left[r_{t+1} + \gamma \max_{a'} Q^*(s_{t+1}, a') \,\middle|\, s_t = s,\ a_t = a\right]$$
Neural Network Approximation:
$$Q(s, a; \theta) \approx Q^*(s, a), \qquad L(\theta) = \mathbb{E}\!\left[\big(r + \gamma \max_{a'} Q(s', a'; \theta) - Q(s, a; \theta)\big)^2\right]$$
Example Code
Example 1: Neural Hidden Markov Model
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
class NeuralHMM(nn.Module):
"""Neural Hidden Markov Model"""
def __init__(self, input_dim, hidden_dim, n_states, sequence_length):
super(NeuralHMM, self).__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.n_states = n_states
self.sequence_length = sequence_length
# LSTM encoder
self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
# Emission probability network
self.emission_net = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, input_dim),
nn.Softmax(dim=-1)
)
        # Transition probability network (outputs unnormalised logits;
        # rows are normalised with a softmax in forward())
        self.transition_net = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_states * n_states)
        )
# Initial state distribution
self.initial_dist = nn.Parameter(torch.ones(n_states) / n_states)
def forward(self, observations):
"""Forward pass"""
batch_size, seq_len, _ = observations.shape
# LSTM encoding
lstm_out, _ = self.lstm(observations)
        # Calculate emission probabilities (softmax over input_dim; this simplified
        # example treats it as a per-state distribution, which relies on
        # input_dim == n_states here)
        emission_probs = self.emission_net(lstm_out)
        # Calculate transition probabilities (softmax per row, so each source
        # state's outgoing probabilities sum to 1)
        transition_logits = self.transition_net(lstm_out[:, :-1]).view(
            batch_size, seq_len - 1, self.n_states, self.n_states
        )
        transition_probs = torch.softmax(transition_logits, dim=-1)
return emission_probs, transition_probs
def viterbi_decode(self, observations):
"""Viterbi decoding for the most probable state sequence"""
with torch.no_grad():
emission_probs, transition_probs = self.forward(observations)
batch_size, seq_len, _ = observations.shape
# Viterbi algorithm
log_probs = torch.log(emission_probs + 1e-8)
log_transitions = torch.log(transition_probs + 1e-8)
# Initialization
viterbi_scores = torch.log(self.initial_dist.unsqueeze(0)) + log_probs[:, 0]
viterbi_path = []
# Forward process
for t in range(1, seq_len):
scores = viterbi_scores.unsqueeze(-1) + log_transitions[:, t-1]
best_prev_states = torch.argmax(scores, dim=1)
viterbi_scores = torch.gather(scores, 1, best_prev_states.unsqueeze(1)).squeeze(1) + log_probs[:, t]
viterbi_path.append(best_prev_states)
# Backtracking
best_last_states = torch.argmax(viterbi_scores, dim=1)
states = [best_last_states]
for t in range(len(viterbi_path) - 1, -1, -1):
best_last_states = torch.gather(viterbi_path[t], 1, best_last_states.unsqueeze(1)).squeeze(1)
states.append(best_last_states)
return torch.stack(states[::-1], dim=1)
def create_synthetic_data(n_samples=1000, seq_length=50, n_features=3):
"""Create synthetic time series data"""
# True HMM parameters
true_states = 3
true_transitions = torch.tensor([
[0.7, 0.2, 0.1],
[0.3, 0.5, 0.2],
[0.2, 0.3, 0.5]
])
data = []
state_sequences = []
for _ in range(n_samples):
# Simulate state sequence
states = [0] # Initial state
for t in range(seq_length - 1):
current_state = states[-1]
next_state = torch.multinomial(true_transitions[current_state], 1).item()
states.append(next_state)
states = torch.tensor(states)
state_sequences.append(states)
# Generate observations based on states
observations = torch.zeros(seq_length, n_features)
for t in range(seq_length):
if states[t] == 0:
observations[t] = torch.normal(torch.tensor([0.0, 0.0, 0.0]), 0.5)
elif states[t] == 1:
observations[t] = torch.normal(torch.tensor([2.0, -1.0, 1.0]), 0.5)
else:
observations[t] = torch.normal(torch.tensor([-1.0, 2.0, -0.5]), 0.5)
data.append(observations)
return torch.stack(data), torch.stack(state_sequences)
# Generate training data
torch.manual_seed(42)
train_data, true_states = create_synthetic_data(n_samples=500, seq_length=30, n_features=3)
print(f"Neural HMM Training:")
print(f"Training data shape: {train_data.shape}")
print(f"True states shape: {true_states.shape}")
# Create model
model = NeuralHMM(input_dim=3, hidden_dim=32, n_states=3, sequence_length=30)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop
def train_neural_hmm(model, data, n_epochs=100):
losses = []
for epoch in range(n_epochs):
optimizer.zero_grad()
# Forward pass
emission_probs, transition_probs = model(data)
# Calculate negative log-likelihood loss (simplified)
batch_size, seq_len, n_features = data.shape
        # Emission loss: negative log of the dot product between the predicted
        # emission distribution and the observation (simplified surrogate;
        # clamped so the log stays well-defined when observations are negative)
        emission_loss = -torch.sum(torch.log(torch.clamp(
            torch.sum(emission_probs * data, dim=-1), min=1e-8)))
# Transition loss (encouraging smooth transitions)
transition_loss = -torch.sum(torch.log(torch.diagonal(transition_probs, dim1=-2, dim2=-1) + 1e-8))
total_loss = emission_loss + 0.1 * transition_loss
total_loss.backward()
optimizer.step()
losses.append(total_loss.item())
if epoch % 20 == 0:
print(f'Epoch {epoch}, Loss: {total_loss.item():.4f}')
return losses
# Train the model
losses = train_neural_hmm(model, train_data, n_epochs=100)
# Predict state sequence
model.eval()
with torch.no_grad():
predicted_states = model.viterbi_decode(train_data[:10])
print(f"\nExample Prediction Results (first 10 steps of the first 5 sequences):")
for i in range(5):
print(f"Sequence {i}:")
print(f" True states: {true_states[i, :10].tolist()}")
print(f" Predicted states: {predicted_states[i, :10].tolist()}")
accuracy = (true_states[i] == predicted_states[i]).float().mean()
print(f" Accuracy: {accuracy:.3f}")
Example 2: Markov Decision Process with Reinforcement Learning
class MarkovDecisionProcess:
"""Markov Decision Process Environment"""
def __init__(self, n_states=10, n_actions=3):
self.n_states = n_states
self.n_actions = n_actions
self.current_state = 0
# Randomly generate transition probability matrix
self.transition_probs = np.random.dirichlet(
np.ones(n_states), size=(n_states, n_actions)
)
# Reward function
self.rewards = np.random.normal(0, 1, (n_states, n_actions))
# Target states (high reward)
self.terminal_states = [n_states - 1]
self.rewards[self.terminal_states, :] = 10.0
def reset(self):
"""Reset the environment"""
self.current_state = np.random.randint(0, self.n_states // 2)
return self.current_state
def step(self, action):
"""Execute an action"""
# Get reward
reward = self.rewards[self.current_state, action]
# State transition
next_state = np.random.choice(
self.n_states,
p=self.transition_probs[self.current_state, action]
)
# Check if terminated
done = next_state in self.terminal_states
self.current_state = next_state
return next_state, reward, done
class DQNAgent:
"""Deep Q-Network Agent"""
def __init__(self, state_dim, action_dim, lr=0.001):
self.state_dim = state_dim
self.action_dim = action_dim
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
self.gamma = 0.95
# Q-network
self.q_network = nn.Sequential(
nn.Linear(state_dim, 64),
nn.ReLU(),
nn.Linear(64, 64),
nn.ReLU(),
nn.Linear(64, action_dim)
)
self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
self.memory = []
self.memory_size = 10000
def get_action(self, state):
"""Select action (ε-greedy policy)"""
if np.random.random() < self.epsilon:
return np.random.randint(self.action_dim)
state_tensor = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_network(state_tensor)
return q_values.argmax().item()
def remember(self, state, action, reward, next_state, done):
"""Store experience"""
if len(self.memory) >= self.memory_size:
self.memory.pop(0)
self.memory.append((state, action, reward, next_state, done))
def replay(self, batch_size=32):
"""Experience replay training"""
if len(self.memory) < batch_size:
return
batch = np.random.choice(len(self.memory), batch_size, replace=False)
states = torch.FloatTensor([self.memory[i][0] for i in batch])
actions = torch.LongTensor([self.memory[i][1] for i in batch])
rewards = torch.FloatTensor([self.memory[i][2] for i in batch])
next_states = torch.FloatTensor([self.memory[i][3] for i in batch])
dones = torch.BoolTensor([self.memory[i][4] for i in batch])
current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))
next_q_values = self.q_network(next_states).max(1)[0].detach()
target_q_values = rewards + (self.gamma * next_q_values * ~dones)
loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def state_to_vector(state, n_states):
"""Convert state to one-hot vector"""
vector = np.zeros(n_states)
vector[state] = 1.0
return vector
# Create environment and agent
env = MarkovDecisionProcess(n_states=10, n_actions=3)
agent = DQNAgent(state_dim=10, action_dim=3)
print(f"Reinforcement Learning MDP Training:")
print(f"State space size: {env.n_states}")
print(f"Action space size: {env.n_actions}")
# Train the agent
episodes = 1000
scores = []
for episode in range(episodes):
state = env.reset()
state_vector = state_to_vector(state, env.n_states)
total_reward = 0
steps = 0
max_steps = 100
while steps < max_steps:
action = agent.get_action(state_vector)
next_state, reward, done = env.step(action)
next_state_vector = state_to_vector(next_state, env.n_states)
agent.remember(state_vector, action, reward, next_state_vector, done)
agent.replay()
state_vector = next_state_vector
total_reward += reward
steps += 1
if done:
break
scores.append(total_reward)
if episode % 100 == 0:
avg_score = np.mean(scores[-100:])
print(f'Episode {episode}, Average Score: {avg_score:.2f}, Epsilon: {agent.epsilon:.3f}')
# Visualize training results
plt.figure(figsize=(15, 10))
# Learning curve
plt.subplot(2, 2, 1)
window = 50
moving_avg = [np.mean(scores[max(0, i-window):i+1]) for i in range(len(scores))]
plt.plot(scores, alpha=0.3, label='Episode Score')
plt.plot(moving_avg, label=f'Moving Average ({window})')
plt.title('Reinforcement Learning Training Curve')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.legend()
plt.grid(True, alpha=0.3)
# Q-value visualization
plt.subplot(2, 2, 2)
with torch.no_grad():
q_values = []
for state in range(env.n_states):
state_vector = state_to_vector(state, env.n_states)
q_vals = agent.q_network(torch.FloatTensor(state_vector)).numpy()
q_values.append(q_vals)
q_values = np.array(q_values)
im = plt.imshow(q_values.T, cmap='viridis', aspect='auto')
plt.colorbar(im)
plt.title('Learned Q-values')
plt.xlabel('State')
plt.ylabel('Action')
# Policy visualization
plt.subplot(2, 2, 3)
policy = np.argmax(q_values, axis=1)
plt.bar(range(env.n_states), policy, alpha=0.7)
plt.title('Learned Policy')
plt.xlabel('State')
plt.ylabel('Optimal Action')
plt.grid(True, alpha=0.3, axis='y')
# Epsilon decay
plt.subplot(2, 2, 4)
# Reconstructed decay schedule for plotting (epsilon actually decays once per
# replay step, so this per-episode curve is only an approximation)
epsilons = [max(agent.epsilon_min, 1.0 * (0.995 ** i)) for i in range(episodes)]
plt.plot(epsilons)
plt.title('Exploration Rate (Epsilon) Decay')
plt.xlabel('Episode')
plt.ylabel('Epsilon')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\nTraining complete!")
print(f"Final average reward: {np.mean(scores[-100:]):.2f}")
print(f"Learned policy: {policy}")
Example 3: Intelligent Portfolio Management
class PortfolioMDP:
"""Portfolio Management MDP Environment"""
def __init__(self, n_assets=3, initial_capital=10000):
self.n_assets = n_assets
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.current_weights = np.ones(n_assets) / n_assets
self.price_history = []
# Markov states for asset returns
self.market_states = 3 # Bull, Sideways, Bear markets
self.current_market_state = 1
# State transition matrix
self.state_transitions = np.array([
[0.6, 0.3, 0.1], # Bull market
[0.3, 0.4, 0.3], # Sideways market
[0.1, 0.4, 0.5] # Bear market
])
# Asset return parameters for each state
self.asset_returns = {
0: np.array([0.015, 0.012, 0.008]), # Bull market returns
1: np.array([0.005, 0.003, 0.002]), # Sideways market returns
2: np.array([-0.005, -0.002, 0.001]) # Bear market returns
}
self.asset_volatilities = {
0: np.array([0.15, 0.12, 0.08]),
1: np.array([0.20, 0.15, 0.10]),
2: np.array([0.25, 0.20, 0.12])
}
def reset(self):
"""Reset the environment"""
self.current_capital = self.initial_capital
self.current_weights = np.ones(self.n_assets) / self.n_assets
self.current_market_state = 1
self.price_history = []
return self._get_state()
def _get_state(self):
"""Get current state representation"""
# State includes: current weights, market state, historical returns
market_state_vector = np.zeros(self.market_states)
market_state_vector[self.current_market_state] = 1.0
if len(self.price_history) >= 5:
recent_returns = np.array(self.price_history[-5:]).flatten()
else:
recent_returns = np.zeros(5 * self.n_assets)
state = np.concatenate([
self.current_weights,
market_state_vector,
recent_returns[:15] # Limit state dimension
])
return state
def step(self, action):
"""Execute portfolio adjustment action"""
# Action interpretation: weight adjustment vector
action = np.clip(action, -0.1, 0.1) # Limit adjustment magnitude
new_weights = self.current_weights + action
new_weights = np.clip(new_weights, 0.0, 1.0)
new_weights = new_weights / np.sum(new_weights) # Normalize
# Calculate transaction cost
transaction_cost = 0.001 * np.sum(np.abs(new_weights - self.current_weights))
# Market state transition
self.current_market_state = np.random.choice(
self.market_states,
p=self.state_transitions[self.current_market_state]
)
# Generate asset returns
mean_returns = self.asset_returns[self.current_market_state]
volatilities = self.asset_volatilities[self.current_market_state]
asset_returns = np.random.normal(mean_returns, volatilities)
# Calculate portfolio return
portfolio_return = np.sum(new_weights * asset_returns) - transaction_cost
# Update capital and weights
self.current_capital *= (1 + portfolio_return)
self.current_weights = new_weights
self.price_history.append(asset_returns)
# Calculate reward (based on Sharpe ratio and absolute return)
if len(self.price_history) >= 20:
recent_returns = [np.sum(self.current_weights * ret) for ret in self.price_history[-20:]]
sharpe_ratio = np.mean(recent_returns) / (np.std(recent_returns) + 1e-6)
reward = portfolio_return + 0.1 * sharpe_ratio
else:
reward = portfolio_return
return self._get_state(), reward, False # No termination condition
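Before training an agent on this environment, a quick random-policy rollout is a useful sanity check that the dynamics and reward behave sensibly; a minimal sketch (the check_* names are ours):
# Sanity check: run the environment with small random weight adjustments
check_env = PortfolioMDP(n_assets=3, initial_capital=10000)
check_state = check_env.reset()
for _ in range(50):
    random_action = np.random.uniform(-0.05, 0.05, size=3)
    check_state, check_reward, _ = check_env.step(random_action)
print(f"Capital after 50 random steps: {check_env.current_capital:.2f}")
print(f"Weights after 50 random steps: {np.round(check_env.current_weights, 3)}")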
class PortfolioAgent:
"""Portfolio Management Agent"""
def __init__(self, state_dim, action_dim):
self.state_dim = state_dim
self.action_dim = action_dim
# Actor network (policy network)
self.actor = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, action_dim),
nn.Tanh() # Output range [-1, 1]
)
# Critic network (value network)
self.critic = nn.Sequential(
nn.Linear(state_dim + action_dim, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=0.001)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=0.002)
self.noise_std = 0.1
self.gamma = 0.99
def get_action(self, state, add_noise=True):
"""Get action"""
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state_tensor).squeeze().numpy()
if add_noise:
noise = np.random.normal(0, self.noise_std, size=action.shape)
action = action + noise
return np.clip(action, -1, 1) * 0.1 # Scale to [-0.1, 0.1]
def train(self, experiences):
"""Train the agent"""
states, actions, rewards, next_states = experiences
states = torch.FloatTensor(states)
actions = torch.FloatTensor(actions)
rewards = torch.FloatTensor(rewards)
next_states = torch.FloatTensor(next_states)
# Train Critic
with torch.no_grad():
next_actions = self.actor(next_states)
target_q = rewards + self.gamma * self.critic(
torch.cat([next_states, next_actions], dim=1)
).squeeze()
current_q = self.critic(torch.cat([states, actions], dim=1)).squeeze()
critic_loss = nn.MSELoss()(current_q, target_q)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# Train Actor
predicted_actions = self.actor(states)
actor_loss = -self.critic(torch.cat([states, predicted_actions], dim=1)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
return critic_loss.item(), actor_loss.item()
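PortfolioAgent is a simplified DDPG-style actor-critic: the critic bootstraps its TD target from the online networks, with no target copies. A common stabilization, not part of the chapter's agent, is Polyak-averaged target networks; a minimal sketch of the update rule (make_targets and soft_update are our names):
import copy
def make_targets(agent):
    """Create frozen copies of the actor and critic for computing TD targets."""
    return copy.deepcopy(agent.actor), copy.deepcopy(agent.critic)
def soft_update(online_net, target_net, tau=0.005):
    """Polyak averaging: target <- tau * online + (1 - tau) * target."""
    with torch.no_grad():
        for p, p_targ in zip(online_net.parameters(), target_net.parameters()):
            p_targ.mul_(1.0 - tau).add_(tau * p)
# Usage sketch:
#   target_actor, target_critic = make_targets(agent)
#   after each agent.train(...) call:
#   soft_update(agent.actor, target_actor); soft_update(agent.critic, target_critic)
# and target_q inside train() would then be computed with target_actor / target_critic.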
# Create portfolio environment and agent
env = PortfolioMDP(n_assets=3, initial_capital=10000)
state_dim = len(env._get_state())
agent = PortfolioAgent(state_dim=state_dim, action_dim=3)
print(f"Intelligent Portfolio Management:")
print(f"State dimension: {state_dim}")
print(f"Action dimension: 3 (corresponding to weight adjustments for 3 assets)")
# Training loop
episodes = 500
episode_rewards = []
portfolio_values = []
memory = []
for episode in range(episodes):
state = env.reset()
episode_reward = 0
episode_portfolio_values = [env.current_capital]
for step in range(100): # 100 steps per episode
action = agent.get_action(state)
next_state, reward, done = env.step(action)
memory.append((state, action, reward, next_state))
if len(memory) > 10000:
memory.pop(0)
state = next_state
episode_reward += reward
episode_portfolio_values.append(env.current_capital)
# Train the agent
if len(memory) >= 32:
batch_indices = np.random.choice(len(memory), 32, replace=False)
batch = [memory[i] for i in batch_indices]
states = np.array([exp[0] for exp in batch])
actions = np.array([exp[1] for exp in batch])
rewards = np.array([exp[2] for exp in batch])
next_states = np.array([exp[3] for exp in batch])
agent.train((states, actions, rewards, next_states))
episode_rewards.append(episode_reward)
portfolio_values.append(episode_portfolio_values)
if episode % 50 == 0:
avg_reward = np.mean(episode_rewards[-50:])
final_capital = env.current_capital
print(f'Episode {episode}, Avg Reward: {avg_reward:.4f}, Final Capital: {final_capital:.2f}')
# Visualize results
plt.figure(figsize=(15, 10))
# Training rewards
plt.subplot(2, 2, 1)
window = 20
moving_avg_rewards = [np.mean(episode_rewards[max(0, i-window):i+1]) for i in range(len(episode_rewards))]
plt.plot(episode_rewards, alpha=0.3, label='Episode Reward')
plt.plot(moving_avg_rewards, label=f'Moving Average ({window})')
plt.title('Training Reward Curve')
plt.xlabel('Episode')
plt.ylabel('Episode Reward')
plt.legend()
plt.grid(True, alpha=0.3)
# Portfolio value for the last few episodes
plt.subplot(2, 2, 2)
for i in range(-5, 0):
plt.plot(portfolio_values[i], alpha=0.7, label=f'Episode {len(portfolio_values) + i}')
plt.title('Portfolio Value for Last 5 Episodes')
plt.xlabel('Step')
plt.ylabel('Portfolio Value')
plt.legend()
plt.grid(True, alpha=0.3)
# Final weight distribution
plt.subplot(2, 2, 3)
final_weights = env.current_weights
plt.pie(final_weights, labels=[f'Asset {i+1}' for i in range(len(final_weights))],
autopct='%1.1f%%', startangle=90)
plt.title('Final Portfolio Weight Distribution')
# Cumulative return comparison
plt.subplot(2, 2, 4)
# Equal-weight benchmark (note: this rollout draws a different random market path
# than the agent's final episode, so the comparison is indicative only)
env_test = PortfolioMDP(n_assets=3, initial_capital=10000)
state = env_test.reset()
equal_weight_returns = [env_test.current_capital]  # start from initial capital so lengths match the agent's path
for _ in range(100):
action = np.array([0.0, 0.0, 0.0]) # No adjustment, maintain equal weights
next_state, reward, done = env_test.step(action)
equal_weight_returns.append(env_test.current_capital)
state = next_state
# Agent's strategy
agent_returns = portfolio_values[-1]
plt.plot(equal_weight_returns, label='Equal-Weight Benchmark', linewidth=2)
plt.plot(agent_returns, label='Agent Strategy', linewidth=2)
plt.title('Strategy Return Comparison')
plt.xlabel('Step')
plt.ylabel('Portfolio Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\nPortfolio Management Results:")
print(f"Final portfolio value: {env.current_capital:.2f}")
print(f"Total return: {(env.current_capital/env.initial_capital - 1)*100:.2f}%")
print(f"Final weight allocation: {env.current_weights}")
Mathematical Formulas Summary
- Neural Network HMM:
  - Emission Probability: $b_\theta(o_t \mid z_t = j) = \big[\mathrm{softmax}(f_\theta(h_t))\big]_j$
  - Transition Probability: $a_\phi(z_t = j \mid z_{t-1} = i) = \big[\mathrm{softmax}(g_\phi(h_t))\big]_{ij}$
- Reinforcement Learning:
  - Q-function: $Q^\pi(s, a) = \mathbb{E}_\pi\!\left[\sum_{k=0}^{\infty} \gamma^k r_{t+k+1} \,\middle|\, s_t = s,\ a_t = a\right]$
  - Policy Gradient: $\nabla_\theta J(\theta) = \mathbb{E}_\pi\!\left[\nabla_\theta \log \pi_\theta(a \mid s)\, Q^\pi(s, a)\right]$
- Actor-Critic:
  - Actor Update: $\theta \leftarrow \theta + \alpha_a\, \nabla_\theta\, Q_\phi\big(s, \mu_\theta(s)\big)$
  - Critic Update: $\phi \leftarrow \phi - \alpha_c\, \nabla_\phi \big(r + \gamma\, Q_\phi(s', \mu_\theta(s')) - Q_\phi(s, a)\big)^2$
Practical Advice
- Neural networks require sufficient data and appropriate regularization
- Reinforcement learning requires careful design of reward functions
- Hyperparameter tuning greatly affects performance
- Consider model interpretability and stability
- Pay special attention to risk control in financial applications