Chapter 08: DSPy Language Model Integration
Haiyue
Learning Objectives
- Configure different language model backends
- Learn OpenAI, Claude, and local model integration
- Implement multi-model collaboration and switching
- Master model configuration and parameter tuning
- Understand model selection strategies
Key Concepts
1. Language Model Backend Configuration
DSPy supports multiple language model backends, providing a unified interface for using different model services.
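The quickest way to try a backend is DSPy's unified LM interface. The snippet below is a minimal sketch assuming a recent DSPy release (roughly 2.5 and later), where dspy.LM accepts LiteLLM-style "provider/model" strings; older releases use provider-specific classes such as dspy.OpenAI, which the configuration manager below also demonstrates. The model name is only an example.
import dspy

# Minimal sketch, assuming a recent DSPy release where dspy.LM accepts
# "provider/model" strings; the model name here is an example.
lm = dspy.LM("openai/gpt-4o-mini", max_tokens=500, temperature=0.3)
dspy.configure(lm=lm)  # make this model the default for all DSPy modules

qa = dspy.Predict("question -> answer")
print(qa(question="What does a unified LM interface abstract away?").answer)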
Basic Model Configuration
import dspy
import os
from typing import Dict, Any, List, Optional, Union, Callable
import json
import time
import asyncio
from abc import ABC, abstractmethod
class BaseModelConfig:
"""Base model configuration class"""
def __init__(self,
model_name: str,
api_key: str = None,
api_base: str = None,
max_tokens: int = 1000,
temperature: float = 0.7,
**kwargs):
self.model_name = model_name
self.api_key = api_key
self.api_base = api_base
self.max_tokens = max_tokens
self.temperature = temperature
self.additional_params = kwargs
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
return {
'model_name': self.model_name,
'api_key': self.api_key,
'api_base': self.api_base,
'max_tokens': self.max_tokens,
'temperature': self.temperature,
**self.additional_params
}
class ModelConfigManager:
"""Model configuration manager"""
def __init__(self):
self.configs = {}
self.active_models = {}
self.model_metrics = {}
def add_openai_config(self,
config_name: str,
model_name: str = "gpt-3.5-turbo",
api_key: str = None,
**kwargs) -> BaseModelConfig:
"""Add OpenAI configuration"""
if not api_key:
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
raise ValueError("OpenAI API key is required")
config = BaseModelConfig(
model_name=model_name,
api_key=api_key,
api_base=kwargs.get('api_base', 'https://api.openai.com/v1'),
**kwargs
)
self.configs[config_name] = {
'type': 'openai',
'config': config,
'status': 'configured'
}
print(f"✅ OpenAI model configured: {config_name} ({model_name})")
return config
def add_anthropic_config(self,
config_name: str,
model_name: str = "claude-3-sonnet-20240229",
api_key: str = None,
**kwargs) -> BaseModelConfig:
"""Add Anthropic (Claude) configuration"""
if not api_key:
api_key = os.getenv('ANTHROPIC_API_KEY')
if not api_key:
raise ValueError("Anthropic API key is required")
config = BaseModelConfig(
model_name=model_name,
api_key=api_key,
api_base=kwargs.get('api_base', 'https://api.anthropic.com'),
**kwargs
)
self.configs[config_name] = {
'type': 'anthropic',
'config': config,
'status': 'configured'
}
print(f"✅ Anthropic model configured: {config_name} ({model_name})")
return config
def add_local_model_config(self,
config_name: str,
model_path: str,
model_type: str = "huggingface",
**kwargs) -> BaseModelConfig:
"""Add local model configuration"""
config = BaseModelConfig(
model_name=model_path,
api_base="local",
model_type=model_type,
**kwargs
)
self.configs[config_name] = {
'type': 'local',
'config': config,
'status': 'configured'
}
print(f"✅ Local model configured: {config_name} ({model_path})")
return config
def initialize_model(self, config_name: str) -> dspy.LM:
"""Initialize model"""
if config_name not in self.configs:
raise ValueError(f"Configuration {config_name} does not exist")
config_info = self.configs[config_name]
config = config_info['config']
model_type = config_info['type']
try:
if model_type == 'openai':
model = dspy.OpenAI(
model=config.model_name,
api_key=config.api_key,
api_base=config.api_base,
max_tokens=config.max_tokens,
temperature=config.temperature
)
elif model_type == 'anthropic':
                # Note: verify that your installed DSPy version supports Anthropic natively;
                # a custom wrapper is used here for demonstration purposes
model = self.create_anthropic_model(config)
elif model_type == 'local':
model = self.create_local_model(config)
else:
raise ValueError(f"Unsupported model type: {model_type}")
self.active_models[config_name] = model
self.configs[config_name]['status'] = 'active'
print(f"🚀 Model activated: {config_name}")
return model
except Exception as e:
self.configs[config_name]['status'] = 'failed'
print(f"❌ Model initialization failed: {config_name} - {str(e)}")
raise
def create_anthropic_model(self, config: BaseModelConfig):
"""Create Anthropic model (custom implementation)"""
class AnthropicModel(dspy.LM):
def __init__(self, config):
self.config = config
self.history = []
def basic_request(self, prompt: str, **kwargs):
                # A real implementation would call the Anthropic Messages API here;
                # for demonstration, return a mock response
return [f"Mock Anthropic response: {prompt[:50]}..."]
def __call__(self, prompt, **kwargs):
return self.basic_request(prompt, **kwargs)
return AnthropicModel(config)
def create_local_model(self, config: BaseModelConfig):
"""Create local model"""
class LocalModel(dspy.LM):
def __init__(self, config):
self.config = config
self.model = None
self.tokenizer = None
self._load_model()
def _load_model(self):
# Simulate local model loading
print(f"📥 Loading local model: {self.config.model_name}")
                # A real implementation would load the model and tokenizer here (e.g., via transformers)
pass
def basic_request(self, prompt: str, **kwargs):
# Simulate local model inference
return [f"Local model response: {prompt[:30]}..."]
def __call__(self, prompt, **kwargs):
return self.basic_request(prompt, **kwargs)
return LocalModel(config)
def get_model(self, config_name: str) -> dspy.LM:
"""Get model instance"""
if config_name not in self.active_models:
return self.initialize_model(config_name)
return self.active_models[config_name]
def list_configs(self) -> Dict[str, Dict]:
"""List all configurations"""
return {
name: {
'type': info['type'],
'status': info['status'],
'model_name': info['config'].model_name
}
for name, info in self.configs.items()
}
def switch_default_model(self, config_name: str):
"""Switch default model"""
model = self.get_model(config_name)
dspy.settings.configure(lm=model)
print(f"🔄 Default model switched to: {config_name}")
# Usage example
def demonstrate_model_configuration():
"""Demonstrate model configuration"""
config_manager = ModelConfigManager()
# Configure multiple models
try:
# OpenAI configuration
config_manager.add_openai_config(
"gpt-3.5",
model_name="gpt-3.5-turbo",
temperature=0.5,
max_tokens=500
)
config_manager.add_openai_config(
"gpt-4",
model_name="gpt-4",
temperature=0.3,
max_tokens=1000
)
# Anthropic configuration
config_manager.add_anthropic_config(
"claude-3-sonnet",
model_name="claude-3-sonnet-20240229"
)
# Local model configuration
config_manager.add_local_model_config(
"local-llama",
model_path="/path/to/llama-model"
)
except ValueError as e:
print(f"⚠️ Configuration skipped (missing API key): {e}")
# List configurations
configs = config_manager.list_configs()
print(f"\n📋 Configured models:")
for name, info in configs.items():
print(f" {name}: {info['type']} - {info['model_name']} ({info['status']})")
return config_manager
# demo_config_manager = demonstrate_model_configuration()
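A short usage sketch (it assumes a valid OPENAI_API_KEY in the environment): once a configuration has been activated, switch_default_model wires it into dspy.settings so ordinary DSPy modules pick it up automatically.
# Usage sketch: requires a valid OPENAI_API_KEY so the "gpt-3.5" config exists.
config_manager = demonstrate_model_configuration()
config_manager.switch_default_model("gpt-3.5")

summarize = dspy.Predict("text -> summary")
result = summarize(text="DSPy separates program structure from the language model backend.")
print(result.summary)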
2. Multi-Model Collaboration Strategy
In complex applications, multiple models often need to collaborate to complete tasks.
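At the simplest level, DSPy can scope a model to a single block of calls with dspy.context, which avoids mutating the global default. The sketch below assumes a config_manager like the one built in the previous section, with "gpt-3.5" and "gpt-4" configured; it is an illustration of per-step switching, not the orchestrator's implementation.
# Sketch: per-step model switching with dspy.context (configuration names
# assume the ModelConfigManager setup from the previous section).
draft_lm = config_manager.get_model("gpt-3.5")
review_lm = config_manager.get_model("gpt-4")

write = dspy.Predict("topic -> draft")
review = dspy.Predict("draft -> critique")

with dspy.context(lm=draft_lm):   # cheaper model produces the draft
    draft = write(topic="benefits of multi-model pipelines").draft
with dspy.context(lm=review_lm):  # stronger model reviews it
    critique = review(draft=draft).critique
print(critique)
The MultiModelOrchestrator below builds on the same idea, adding capability-based routing, fallback chains, and usage statistics.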
class MultiModelOrchestrator:
"""Multi-model orchestrator"""
def __init__(self, config_manager: ModelConfigManager):
self.config_manager = config_manager
self.model_capabilities = {}
self.routing_rules = {}
self.fallback_chains = {}
def define_model_capabilities(self,
config_name: str,
capabilities: List[str],
strengths: Dict[str, float] = None,
cost_per_token: float = 0.0):
"""Define model capabilities"""
self.model_capabilities[config_name] = {
'capabilities': capabilities,
'strengths': strengths or {},
'cost_per_token': cost_per_token,
'usage_stats': {
'total_requests': 0,
'successful_requests': 0,
'avg_response_time': 0.0,
'total_cost': 0.0
}
}
print(f"📝 Model capabilities defined: {config_name}")
print(f" Capabilities: {', '.join(capabilities)}")
def add_routing_rule(self,
task_type: str,
model_selector: Callable[[Dict], str]):
"""Add routing rule"""
self.routing_rules[task_type] = model_selector
print(f"🛤️ Routing rule added: {task_type}")
def set_fallback_chain(self,
primary_model: str,
fallback_models: List[str]):
"""Set fallback chain"""
self.fallback_chains[primary_model] = fallback_models
print(f"🔄 Fallback chain set: {primary_model} -> {fallback_models}")
def route_request(self,
task_type: str,
context: Dict[str, Any] = None) -> str:
"""Route request to appropriate model"""
if task_type in self.routing_rules:
selected_model = self.routing_rules[task_type](context or {})
print(f"🎯 Task routed: {task_type} -> {selected_model}")
return selected_model
# Default routing logic
return self.default_model_selection(task_type, context)
def default_model_selection(self,
task_type: str,
context: Dict[str, Any]) -> str:
"""Default model selection logic"""
# Capability-based matching
suitable_models = []
for model_name, info in self.model_capabilities.items():
if task_type in info['capabilities']:
strength = info['strengths'].get(task_type, 0.5)
cost = info['cost_per_token']
# Calculate composite score (adjust weights as needed)
score = strength * 0.7 - cost * 0.3
suitable_models.append((model_name, score))
if suitable_models:
# Select model with highest score
selected_model = max(suitable_models, key=lambda x: x[1])[0]
return selected_model
# If no suitable model, return first available model
available_models = list(self.model_capabilities.keys())
return available_models[0] if available_models else "default"
def execute_with_fallback(self,
model_name: str,
task_func: Callable,
*args, **kwargs) -> Dict[str, Any]:
"""Execute task with fallback support"""
models_to_try = [model_name]
# Add fallback models
if model_name in self.fallback_chains:
models_to_try.extend(self.fallback_chains[model_name])
last_error = None
for attempt, current_model in enumerate(models_to_try):
try:
print(f"🔄 Trying model: {current_model} (attempt {attempt + 1})")
# Get model instance
model = self.config_manager.get_model(current_model)
# Set model and execute task
dspy.settings.configure(lm=model)
start_time = time.time()
result = task_func(*args, **kwargs)
execution_time = time.time() - start_time
# Update statistics
self.update_usage_stats(current_model, True, execution_time)
return {
'result': result,
'model_used': current_model,
'attempt': attempt + 1,
'execution_time': execution_time,
'success': True
}
except Exception as e:
last_error = e
print(f"❌ Model {current_model} execution failed: {str(e)}")
# Update statistics
self.update_usage_stats(current_model, False, 0.0)
if attempt < len(models_to_try) - 1:
print(f"⏭️ Trying next model...")
continue
else:
print(f"🚨 All models failed")
break
return {
'result': None,
'model_used': None,
'attempt': len(models_to_try),
'error': str(last_error),
'success': False
}
def update_usage_stats(self,
model_name: str,
success: bool,
execution_time: float):
"""Update usage statistics"""
if model_name not in self.model_capabilities:
return
stats = self.model_capabilities[model_name]['usage_stats']
stats['total_requests'] += 1
if success:
stats['successful_requests'] += 1
# Update average response time
if stats['total_requests'] > 1:
current_avg = stats['avg_response_time']
stats['avg_response_time'] = (
(current_avg * (stats['total_requests'] - 1) + execution_time)
/ stats['total_requests']
)
else:
stats['avg_response_time'] = execution_time
def get_performance_report(self) -> Dict[str, Any]:
"""Get performance report"""
report = {
'models': {},
'summary': {
'total_requests': 0,
'total_successful': 0,
'avg_success_rate': 0.0
}
}
total_requests = 0
total_successful = 0
for model_name, info in self.model_capabilities.items():
stats = info['usage_stats']
success_rate = (
stats['successful_requests'] / stats['total_requests']
if stats['total_requests'] > 0 else 0.0
)
report['models'][model_name] = {
'total_requests': stats['total_requests'],
'success_rate': success_rate,
'avg_response_time': stats['avg_response_time'],
'total_cost': stats['total_cost']
}
total_requests += stats['total_requests']
total_successful += stats['successful_requests']
if total_requests > 0:
report['summary']['total_requests'] = total_requests
report['summary']['total_successful'] = total_successful
report['summary']['avg_success_rate'] = total_successful / total_requests
return report
class SpecializedModelEnsemble:
"""Specialized model ensemble"""
def __init__(self, orchestrator: MultiModelOrchestrator):
self.orchestrator = orchestrator
# Define specialized models
self.setup_specialized_models()
def setup_specialized_models(self):
"""Set up specialized models"""
# Define model capabilities
self.orchestrator.define_model_capabilities(
"gpt-4",
capabilities=["reasoning", "analysis", "complex_questions"],
strengths={"reasoning": 0.9, "analysis": 0.85, "complex_questions": 0.9},
cost_per_token=0.03
)
self.orchestrator.define_model_capabilities(
"gpt-3.5",
capabilities=["general", "summarization", "simple_questions"],
strengths={"general": 0.8, "summarization": 0.85, "simple_questions": 0.8},
cost_per_token=0.002
)
self.orchestrator.define_model_capabilities(
"claude-3-sonnet",
capabilities=["creative_writing", "analysis", "long_context"],
strengths={"creative_writing": 0.9, "analysis": 0.85, "long_context": 0.95},
cost_per_token=0.015
)
# Set routing rules
self.orchestrator.add_routing_rule(
"complex_reasoning",
lambda ctx: self.select_reasoning_model(ctx)
)
self.orchestrator.add_routing_rule(
"text_generation",
lambda ctx: self.select_generation_model(ctx)
)
# Set fallback chains
self.orchestrator.set_fallback_chain("gpt-4", ["claude-3-sonnet", "gpt-3.5"])
self.orchestrator.set_fallback_chain("claude-3-sonnet", ["gpt-4", "gpt-3.5"])
def select_reasoning_model(self, context: Dict[str, Any]) -> str:
"""Select reasoning model"""
complexity = context.get('complexity', 'medium')
budget = context.get('budget', 'medium')
if complexity == 'high' and budget == 'high':
return "gpt-4"
elif complexity == 'high' and budget == 'medium':
return "claude-3-sonnet"
else:
return "gpt-3.5"
def select_generation_model(self, context: Dict[str, Any]) -> str:
"""Select generation model"""
text_type = context.get('text_type', 'general')
length = context.get('length', 'medium')
if text_type == 'creative' or length == 'long':
return "claude-3-sonnet"
elif text_type == 'technical':
return "gpt-4"
else:
return "gpt-3.5"
# Usage example
def demonstrate_multi_model_orchestration():
"""Demonstrate multi-model orchestration"""
# Assume config manager exists
config_manager = ModelConfigManager()
# Add some mock configurations (real API keys needed in actual use)
try:
config_manager.add_openai_config("gpt-4", "gpt-4")
config_manager.add_openai_config("gpt-3.5", "gpt-3.5-turbo")
config_manager.add_anthropic_config("claude-3-sonnet")
except ValueError:
print("⚠️ Skipping actual model configuration (demo mode)")
# Create orchestrator and ensemble
orchestrator = MultiModelOrchestrator(config_manager)
ensemble = SpecializedModelEnsemble(orchestrator)
# Test task routing
test_contexts = [
{
'task_type': 'complex_reasoning',
'context': {'complexity': 'high', 'budget': 'high'}
},
{
'task_type': 'text_generation',
'context': {'text_type': 'creative', 'length': 'long'}
}
]
for test in test_contexts:
selected_model = orchestrator.route_request(
test['task_type'],
test['context']
)
print(f"Task type: {test['task_type']}")
print(f"Context: {test['context']}")
print(f"Selected model: {selected_model}\n")
return orchestrator, ensemble
# demo_orchestration = demonstrate_multi_model_orchestration()
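Continuing from the orchestrator created above (and assuming at least one configuration has a valid API key), a DSPy program can be wrapped as a task function so that execute_with_fallback retries it on the fallback chain when the primary model fails.
# Usage sketch: the "gpt-4" -> ["claude-3-sonnet", "gpt-3.5"] chain was set
# in SpecializedModelEnsemble above.
def answer_question(question: str):
    qa = dspy.Predict("question -> answer")
    return qa(question=question)

outcome = orchestrator.execute_with_fallback(
    "gpt-4",
    answer_question,
    "What are the trade-offs between large and small language models?"
)
if outcome['success']:
    print(f"Answered by {outcome['model_used']} in {outcome['execution_time']:.2f}s")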
3. Model Performance Monitoring and Optimization
To use models efficiently in production, you need comprehensive performance monitoring.
class ModelPerformanceMonitor:
"""Model performance monitor"""
def __init__(self):
self.metrics = {}
self.alerts = {}
self.thresholds = {
'response_time': 5.0, # 5 seconds
'error_rate': 0.05, # 5%
'cost_per_request': 0.1 # $0.1
}
def track_request(self,
model_name: str,
request_info: Dict[str, Any]):
"""Track individual request"""
if model_name not in self.metrics:
self.metrics[model_name] = {
'requests': [],
'hourly_stats': {},
'daily_stats': {}
}
# Add timestamp
request_info['timestamp'] = time.time()
self.metrics[model_name]['requests'].append(request_info)
# Check alert conditions
self.check_alerts(model_name, request_info)
def check_alerts(self, model_name: str, request_info: Dict[str, Any]):
"""Check alert conditions"""
# Response time alert
if request_info.get('response_time', 0) > self.thresholds['response_time']:
self.trigger_alert(
model_name,
'high_response_time',
f"Response time too high: {request_info['response_time']:.2f}s"
)
# Error rate alert
recent_requests = self.get_recent_requests(model_name, minutes=10)
if recent_requests:
error_rate = sum(1 for r in recent_requests if not r.get('success', True)) / len(recent_requests)
if error_rate > self.thresholds['error_rate']:
self.trigger_alert(
model_name,
'high_error_rate',
f"Error rate too high: {error_rate:.1%}"
)
def trigger_alert(self, model_name: str, alert_type: str, message: str):
"""Trigger alert"""
alert_key = f"{model_name}_{alert_type}"
current_time = time.time()
# Avoid duplicate alerts (no repeat within 5 minutes)
if alert_key in self.alerts:
last_alert_time = self.alerts[alert_key]['last_triggered']
if current_time - last_alert_time < 300: # 5 minutes
return
self.alerts[alert_key] = {
'model_name': model_name,
'alert_type': alert_type,
'message': message,
'last_triggered': current_time
}
print(f"🚨 Alert: {model_name} - {message}")
def get_recent_requests(self, model_name: str, minutes: int = 60) -> List[Dict]:
"""Get recent request records"""
if model_name not in self.metrics:
return []
cutoff_time = time.time() - (minutes * 60)
recent_requests = [
req for req in self.metrics[model_name]['requests']
if req['timestamp'] > cutoff_time
]
return recent_requests
def calculate_model_statistics(self, model_name: str, hours: int = 24) -> Dict[str, Any]:
"""Calculate model statistics"""
recent_requests = self.get_recent_requests(model_name, minutes=hours * 60)
if not recent_requests:
return {'status': 'no_data'}
# Basic statistics
total_requests = len(recent_requests)
successful_requests = sum(1 for r in recent_requests if r.get('success', True))
failed_requests = total_requests - successful_requests
# Response time statistics
response_times = [r.get('response_time', 0) for r in recent_requests if 'response_time' in r]
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
# Cost statistics
total_cost = sum(r.get('cost', 0) for r in recent_requests)
avg_cost_per_request = total_cost / total_requests if total_requests > 0 else 0
# Token statistics
total_tokens = sum(r.get('tokens_used', 0) for r in recent_requests)
avg_tokens_per_request = total_tokens / total_requests if total_requests > 0 else 0
return {
'status': 'active',
'time_window_hours': hours,
'total_requests': total_requests,
'successful_requests': successful_requests,
'failed_requests': failed_requests,
'success_rate': successful_requests / total_requests if total_requests > 0 else 0,
'avg_response_time': avg_response_time,
'total_cost': total_cost,
'avg_cost_per_request': avg_cost_per_request,
'total_tokens': total_tokens,
'avg_tokens_per_request': avg_tokens_per_request
}
def generate_performance_report(self, model_names: List[str] = None) -> str:
"""Generate performance report"""
if model_names is None:
model_names = list(self.metrics.keys())
report_lines = []
report_lines.append("📊 Model Performance Monitoring Report")
report_lines.append("=" * 50)
for model_name in model_names:
stats = self.calculate_model_statistics(model_name)
if stats['status'] == 'no_data':
report_lines.append(f"\n🔍 {model_name}: No data")
continue
report_lines.append(f"\n🤖 {model_name}:")
report_lines.append(f" Total requests: {stats['total_requests']}")
report_lines.append(f" Success rate: {stats['success_rate']:.1%}")
report_lines.append(f" Avg response time: {stats['avg_response_time']:.2f}s")
report_lines.append(f" Total cost: ${stats['total_cost']:.4f}")
report_lines.append(f" Avg cost per request: ${stats['avg_cost_per_request']:.4f}")
report_lines.append(f" Avg tokens: {stats['avg_tokens_per_request']:.0f}")
# Add active alerts
if self.alerts:
report_lines.append(f"\n🚨 Active Alerts:")
for alert_key, alert_info in self.alerts.items():
report_lines.append(f" {alert_info['model_name']}: {alert_info['message']}")
return "\n".join(report_lines)
class AdaptiveModelSelector:
"""Adaptive model selector"""
def __init__(self,
monitor: ModelPerformanceMonitor,
orchestrator: MultiModelOrchestrator):
self.monitor = monitor
self.orchestrator = orchestrator
self.selection_history = []
self.learning_rate = 0.1
def select_optimal_model(self,
task_type: str,
context: Dict[str, Any]) -> str:
"""Select optimal model based on historical performance"""
# Get candidate models
candidate_models = self.get_candidate_models(task_type)
if not candidate_models:
return "gpt-3.5" # Default model
# Calculate composite score for each model
model_scores = {}
for model_name in candidate_models:
score = self.calculate_model_score(model_name, context)
model_scores[model_name] = score
# Select model with highest score
best_model = max(model_scores, key=model_scores.get)
# Record selection history
selection_record = {
'timestamp': time.time(),
'task_type': task_type,
'context': context,
'selected_model': best_model,
'candidate_models': candidate_models,
'model_scores': model_scores
}
self.selection_history.append(selection_record)
print(f"🎯 Adaptive selection: {task_type} -> {best_model}")
print(f" Candidate model scores: {model_scores}")
return best_model
def get_candidate_models(self, task_type: str) -> List[str]:
"""Get candidate models for task type"""
candidate_models = []
for model_name, info in self.orchestrator.model_capabilities.items():
if task_type in info['capabilities']:
candidate_models.append(model_name)
return candidate_models
def calculate_model_score(self,
model_name: str,
context: Dict[str, Any]) -> float:
"""Calculate model composite score"""
# Get historical performance statistics
stats = self.monitor.calculate_model_statistics(model_name, hours=24)
if stats['status'] == 'no_data':
return 0.5 # Default medium score
# Performance factor (success rate, response time)
performance_factor = (
stats['success_rate'] * 0.4 +
(1.0 / max(stats['avg_response_time'], 0.1)) * 0.1
)
# Cost factor
cost_factor = 1.0 / (1.0 + stats['avg_cost_per_request'] * 10)
# Task adaptation factor
model_info = self.orchestrator.model_capabilities.get(model_name, {})
task_type = context.get('task_type', 'general')
task_strength = model_info.get('strengths', {}).get(task_type, 0.5)
# Composite score
total_score = (
performance_factor * 0.4 +
cost_factor * 0.3 +
task_strength * 0.3
)
return total_score
def update_model_performance(self,
model_name: str,
task_result: Dict[str, Any]):
"""Update model performance data"""
# Extract performance metrics
request_info = {
'success': task_result.get('success', False),
'response_time': task_result.get('execution_time', 0),
'tokens_used': task_result.get('tokens_used', 0),
'cost': task_result.get('cost', 0),
'task_type': task_result.get('task_type', 'unknown')
}
# Submit to monitor
self.monitor.track_request(model_name, request_info)
        # Learning adjustment (a simple incremental update of the strength estimates)
self.adjust_selection_strategy(model_name, task_result)
def adjust_selection_strategy(self,
model_name: str,
task_result: Dict[str, Any]):
"""Adjust selection strategy"""
# Adjust model capability assessment based on results
success = task_result.get('success', False)
task_type = task_result.get('task_type', 'unknown')
if model_name in self.orchestrator.model_capabilities:
strengths = self.orchestrator.model_capabilities[model_name]['strengths']
if task_type in strengths:
# Update strength assessment based on results
current_strength = strengths[task_type]
if success:
# Slightly increase score on success
new_strength = min(1.0, current_strength + self.learning_rate * 0.1)
else:
# Slightly decrease score on failure
new_strength = max(0.1, current_strength - self.learning_rate * 0.1)
strengths[task_type] = new_strength
# Usage example
def demonstrate_performance_monitoring():
"""Demonstrate performance monitoring"""
monitor = ModelPerformanceMonitor()
# Simulate some request data
models_data = {
'gpt-4': [
{'success': True, 'response_time': 2.5, 'tokens_used': 150, 'cost': 0.045},
{'success': True, 'response_time': 3.2, 'tokens_used': 200, 'cost': 0.06},
{'success': False, 'response_time': 8.0, 'tokens_used': 0, 'cost': 0.0},
],
'gpt-3.5': [
{'success': True, 'response_time': 1.8, 'tokens_used': 180, 'cost': 0.0036},
{'success': True, 'response_time': 1.5, 'tokens_used': 160, 'cost': 0.0032},
{'success': True, 'response_time': 2.0, 'tokens_used': 190, 'cost': 0.0038},
]
}
# Submit monitoring data
for model_name, requests in models_data.items():
for request in requests:
monitor.track_request(model_name, request)
# Generate performance report
report = monitor.generate_performance_report()
print(report)
return monitor
# demo_monitoring = demonstrate_performance_monitoring()
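In practice the monitor is fed from live traffic rather than canned data. The sketch below times each DSPy call and reports the outcome through track_request; token and cost fields are omitted here and would come from your provider's usage metadata.
def monitored_call(monitor: ModelPerformanceMonitor, model_name: str, predictor, **inputs):
    """Run a predictor and record timing/outcome in the monitor (sketch)."""
    start = time.time()
    try:
        prediction = predictor(**inputs)
        monitor.track_request(model_name, {'success': True, 'response_time': time.time() - start})
        return prediction
    except Exception:
        monitor.track_request(model_name, {'success': False, 'response_time': time.time() - start})
        raise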
4. Model Parameter Tuning
Different tasks require different model parameter configurations to achieve optimal results.
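As a small illustration of why this matters: the same backend behaves very differently under different decoding parameters, so it is common to keep one configuration per task profile rather than a single global default (the model name below is an example).
# One configuration per task profile, built on BaseModelConfig from earlier.
factual_config = BaseModelConfig("gpt-3.5-turbo", temperature=0.1, max_tokens=300)    # extraction / QA
creative_config = BaseModelConfig("gpt-3.5-turbo", temperature=0.9, max_tokens=1200)  # brainstorming

print(factual_config.to_dict()['temperature'])   # 0.1
print(creative_config.to_dict()['temperature'])  # 0.9
The optimizer below automates the search for such profiles against a task-specific evaluation metric.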
class ModelParameterOptimizer:
"""Model parameter optimizer"""
def __init__(self):
self.parameter_history = {}
self.optimization_results = {}
# Define parameter search space
self.parameter_space = {
'temperature': [0.0, 0.3, 0.5, 0.7, 0.9, 1.0],
'max_tokens': [100, 250, 500, 1000, 2000],
'top_p': [0.8, 0.9, 0.95, 1.0],
'frequency_penalty': [0.0, 0.1, 0.2, 0.5],
'presence_penalty': [0.0, 0.1, 0.2, 0.5]
}
def optimize_parameters(self,
model_config_name: str,
task_type: str,
test_examples: List[dspy.Example],
evaluation_metric: Callable,
max_iterations: int = 10) -> Dict[str, Any]:
"""Optimize model parameters"""
print(f"🔧 Starting parameter optimization: {model_config_name} - {task_type}")
best_params = None
best_score = 0.0
optimization_history = []
        # Simple random search over the parameter space (a grid or Bayesian search could be substituted)
for iteration in range(max_iterations):
print(f"\n🔄 Optimization iteration {iteration + 1}/{max_iterations}")
# Generate parameter combination
if iteration == 0:
                # Start from the default parameters
params = self.get_default_parameters()
else:
                # Later iterations randomly perturb the best parameters found so far
params = self.generate_parameter_combination(
best_params if best_params else {}
)
print(f"📋 Testing parameters: {params}")
# Evaluate parameter combination
score = self.evaluate_parameter_combination(
model_config_name,
params,
test_examples,
evaluation_metric
)
optimization_history.append({
'iteration': iteration + 1,
'parameters': params.copy(),
'score': score
})
print(f"📊 Score: {score:.3f}")
# Update best parameters
if score > best_score:
best_score = score
best_params = params.copy()
print(f"🏆 Better parameters found! Score: {score:.3f}")
# Save optimization results
optimization_key = f"{model_config_name}_{task_type}"
self.optimization_results[optimization_key] = {
'best_parameters': best_params,
'best_score': best_score,
'optimization_history': optimization_history,
'total_iterations': max_iterations
}
print(f"\n✅ Parameter optimization completed")
print(f"🎯 Best score: {best_score:.3f}")
print(f"🔧 Best parameters: {best_params}")
return self.optimization_results[optimization_key]
def get_default_parameters(self) -> Dict[str, Any]:
"""Get default parameters"""
return {
'temperature': 0.7,
'max_tokens': 500,
'top_p': 0.9,
'frequency_penalty': 0.0,
'presence_penalty': 0.0
}
def generate_parameter_combination(self,
base_params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Generate parameter combination"""
import random
if base_params is None:
base_params = self.get_default_parameters()
new_params = base_params.copy()
# Randomly select 1-2 parameters to adjust
params_to_adjust = random.sample(
list(self.parameter_space.keys()),
random.randint(1, 2)
)
for param_name in params_to_adjust:
if param_name in self.parameter_space:
new_params[param_name] = random.choice(
self.parameter_space[param_name]
)
return new_params
def evaluate_parameter_combination(self,
model_config_name: str,
parameters: Dict[str, Any],
test_examples: List[dspy.Example],
evaluation_metric: Callable) -> float:
"""Evaluate parameter combination"""
# Create parameterized model configuration
test_model = self.create_parameterized_model(model_config_name, parameters)
# Temporarily set model
original_model = dspy.settings.lm
dspy.settings.configure(lm=test_model)
try:
scores = []
# Evaluate on test samples
for example in test_examples[:20]: # Limit test sample count
try:
# Create simple test program
predictor = dspy.Predict("question -> answer")
prediction = predictor(**example.inputs())
score = evaluation_metric(example, prediction)
scores.append(float(score))
except Exception as e:
print(f"⚠️ Sample evaluation failed: {e}")
scores.append(0.0)
# Calculate average score
average_score = sum(scores) / len(scores) if scores else 0.0
finally:
# Restore original model
dspy.settings.configure(lm=original_model)
return average_score
def create_parameterized_model(self,
model_config_name: str,
parameters: Dict[str, Any]):
"""Create parameterized model"""
        # In a real system this would build an LM from the configuration manager;
        # a mock model is returned here for demonstration purposes
class ParameterizedModel(dspy.LM):
def __init__(self, config_name, params):
self.config_name = config_name
self.parameters = params
def basic_request(self, prompt, **kwargs):
# Simulate parameterized request
return [f"Parameterized response (temp={self.parameters.get('temperature', 0.7)}): {prompt[:30]}..."]
def __call__(self, prompt, **kwargs):
return self.basic_request(prompt, **kwargs)
return ParameterizedModel(model_config_name, parameters)
def get_optimized_parameters(self,
model_config_name: str,
task_type: str) -> Dict[str, Any]:
"""Get optimized parameters"""
optimization_key = f"{model_config_name}_{task_type}"
if optimization_key in self.optimization_results:
return self.optimization_results[optimization_key]['best_parameters']
return self.get_default_parameters()
def apply_optimized_parameters(self,
model_config: BaseModelConfig,
task_type: str):
"""Apply optimized parameters to model configuration"""
optimized_params = self.get_optimized_parameters(
model_config.model_name,
task_type
)
# Update configuration parameters
for param_name, param_value in optimized_params.items():
if hasattr(model_config, param_name):
setattr(model_config, param_name, param_value)
else:
model_config.additional_params[param_name] = param_value
print(f"✅ Optimized parameters applied to {model_config.model_name}")
class TaskSpecificOptimizer:
"""Task-specific optimizer"""
def __init__(self, parameter_optimizer: ModelParameterOptimizer):
self.parameter_optimizer = parameter_optimizer
self.task_templates = {}
def register_task_template(self,
task_type: str,
template_config: Dict[str, Any]):
"""Register task template"""
self.task_templates[task_type] = template_config
print(f"📝 Task template registered: {task_type}")
def optimize_for_task(self,
model_name: str,
task_type: str,
training_data: List[dspy.Example]) -> Dict[str, Any]:
"""Optimize model for specific task"""
print(f"🎯 Optimizing model for task: {task_type}")
# Get task template
if task_type in self.task_templates:
template = self.task_templates[task_type]
base_params = template.get('base_parameters', {})
print(f"📋 Using task template: {base_params}")
else:
base_params = {}
# Define evaluation metric
def task_evaluation_metric(example, prediction):
if task_type == 'summarization':
return self.evaluate_summarization(example, prediction)
elif task_type == 'question_answering':
return self.evaluate_qa(example, prediction)
elif task_type == 'creative_writing':
return self.evaluate_creativity(example, prediction)
else:
return self.evaluate_general(example, prediction)
# Execute parameter optimization
optimization_result = self.parameter_optimizer.optimize_parameters(
model_name,
task_type,
training_data,
task_evaluation_metric,
max_iterations=5 # Reduce iterations to save time
)
return optimization_result
def evaluate_summarization(self, example, prediction) -> float:
"""Evaluate summarization quality"""
# Simplified summarization evaluation
expected = getattr(example, 'summary', '') or getattr(example, 'answer', '')
actual = getattr(prediction, 'answer', '') or str(prediction)
# Moderate length score
length_score = 0.5
if 50 <= len(actual) <= 200:
length_score = 1.0
elif len(actual) < 20 or len(actual) > 400:
length_score = 0.0
# Content similarity score (simplified)
expected_words = set(expected.lower().split())
actual_words = set(actual.lower().split())
if expected_words and actual_words:
overlap = len(expected_words & actual_words)
union = len(expected_words | actual_words)
similarity_score = overlap / union if union > 0 else 0.0
else:
similarity_score = 0.0
return (length_score * 0.3 + similarity_score * 0.7)
def evaluate_qa(self, example, prediction) -> float:
"""Evaluate question answering quality"""
expected = getattr(example, 'answer', '')
actual = getattr(prediction, 'answer', '') or str(prediction)
# Simple containment match
if expected.lower() in actual.lower() or actual.lower() in expected.lower():
return 1.0
else:
return 0.0
def evaluate_creativity(self, example, prediction) -> float:
"""Evaluate creative writing quality"""
actual = getattr(prediction, 'answer', '') or str(prediction)
# Length score
length_score = min(len(actual) / 500, 1.0) # Encourage longer creative content
# Vocabulary diversity score
words = actual.lower().split()
unique_words = len(set(words))
diversity_score = unique_words / len(words) if words else 0.0
return (length_score * 0.4 + diversity_score * 0.6)
def evaluate_general(self, example, prediction) -> float:
"""General evaluation"""
expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
actual = getattr(prediction, 'answer', '') or str(prediction)
if not expected or not actual:
return 0.5
# Simple similarity evaluation
expected_words = set(expected.lower().split())
actual_words = set(actual.lower().split())
if expected_words and actual_words:
overlap = len(expected_words & actual_words)
return overlap / max(len(expected_words), len(actual_words))
return 0.0
# Usage example
def demonstrate_parameter_optimization():
"""Demonstrate parameter optimization"""
optimizer = ModelParameterOptimizer()
task_optimizer = TaskSpecificOptimizer(optimizer)
# Register task template
task_optimizer.register_task_template(
'summarization',
{
'base_parameters': {
'temperature': 0.3,
'max_tokens': 200
},
'evaluation_criteria': ['length', 'coherence', 'coverage']
}
)
# Create test data
test_examples = [
dspy.Example(
text="Artificial intelligence is a branch of computer science...",
summary="AI is a computer science branch"
).with_inputs('text'),
dspy.Example(
text="Machine learning allows computers to learn from data through algorithms...",
summary="Machine learning enables computers to learn from data"
).with_inputs('text')
]
# Optimize parameters for summarization task
result = task_optimizer.optimize_for_task(
'gpt-3.5',
'summarization',
test_examples
)
print(f"\n🎊 Optimization results:")
print(f"Best parameters: {result['best_parameters']}")
print(f"Best score: {result['best_score']:.3f}")
return optimizer, task_optimizer
# demo_optimization = demonstrate_parameter_optimization()
Practice Exercises
Exercise 1: Implement Custom Model Adapter
class CustomModelAdapter:
"""Custom model adapter exercise"""
def __init__(self, model_endpoint: str):
self.model_endpoint = model_endpoint
def adapt_custom_api(self):
"""Adapt custom API"""
# TODO: Implement custom API adaptation
pass
def implement_retry_logic(self):
"""Implement retry logic"""
# TODO: Implement smart retry mechanism
pass
def add_rate_limiting(self):
"""Add rate limiting"""
# TODO: Implement API call rate control
pass
# Exercise tasks:
# 1. Choose a third-party API and implement DSPy adapter
# 2. Add error handling and retry mechanism
# 3. Implement request rate limiting
Exercise 2: Build Model Performance Benchmark
class ModelBenchmark:
"""Model performance benchmark exercise"""
def __init__(self):
self.benchmark_suites = {}
self.results = {}
def create_benchmark_suite(self, suite_name: str, tasks: List[Dict]):
"""Create benchmark suite"""
# TODO: Implement benchmark suite
pass
def run_benchmark(self, models: List[str], suite_name: str):
"""Run benchmark"""
# TODO: Implement automated benchmarking
pass
def generate_comparison_report(self):
"""Generate comparison report"""
# TODO: Implement detailed performance comparison report
pass
# Exercise tasks:
# 1. Design multi-dimensional performance evaluation metrics
# 2. Implement automated benchmark testing process
# 3. Generate visualized performance comparison report
Best Practices
1. Model Selection Strategy
def model_selection_guidelines():
"""Model selection guidelines"""
guidelines = {
'Task Complexity Matching': [
'Use lightweight models for simple tasks',
'Use high-performance models for complex reasoning',
'Choose generation-focused models for creative tasks'
],
'Cost-Benefit Optimization': [
'Select appropriate models based on budget constraints',
'Implement smart caching to reduce duplicate calls',
'Use traffic allocation to optimize costs'
],
'Performance-Latency Balance': [
'Prioritize response speed for real-time applications',
'Prioritize accuracy for batch processing tasks',
'Implement multi-tier fallback strategies'
],
'Reliability Assurance': [
'Set up multiple model alternatives',
'Implement health checks and monitoring',
'Establish exception handling mechanisms'
]
}
return guidelines
class ModelGovernance:
"""Model governance framework"""
def __init__(self):
self.policies = {}
self.compliance_checks = []
self.audit_logs = []
def define_usage_policy(self, policy_name: str, rules: Dict[str, Any]):
"""Define usage policy"""
self.policies[policy_name] = {
'rules': rules,
'created_at': time.time(),
'active': True
}
def enforce_policies(self, request_context: Dict[str, Any]) -> bool:
"""Enforce policy checks"""
for policy_name, policy in self.policies.items():
if policy['active']:
if not self.check_policy_compliance(request_context, policy['rules']):
self.log_policy_violation(policy_name, request_context)
return False
return True
def check_policy_compliance(self, context: Dict, rules: Dict) -> bool:
"""Check policy compliance"""
        # Placeholder: implement real checks here (e.g., input-length limits, model allowlists)
return True
def log_policy_violation(self, policy_name: str, context: Dict):
"""Log policy violation"""
violation_log = {
'timestamp': time.time(),
'policy': policy_name,
'context': context,
'action': 'blocked'
}
self.audit_logs.append(violation_log)
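A brief usage sketch of the governance hooks above; the rule contents are illustrative, and check_policy_compliance is still a stub that you would extend with real checks such as input-length limits or a model allowlist.
governance = ModelGovernance()
governance.define_usage_policy(
    "default_usage",
    {"max_input_chars": 4000, "allowed_models": ["gpt-3.5", "gpt-4"]}  # illustrative rules
)

request_context = {"model": "gpt-4", "input": "Summarize this quarterly report..."}
if governance.enforce_policies(request_context):
    print("Request allowed under current policies")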
2. Security and Privacy Protection
import re

class ModelSecurityManager:
"""Model security manager"""
def __init__(self):
self.security_policies = {}
self.data_filters = []
self.audit_enabled = True
def add_input_filter(self, filter_func: Callable[[str], bool]):
"""Add input filter"""
self.data_filters.append(filter_func)
def sanitize_input(self, user_input: str) -> str:
"""Sanitize user input"""
sanitized_input = user_input
# Remove potential injection attempts
injection_patterns = [
r'<script.*?</script>',
r'javascript:',
r'eval\s*\(',
r'exec\s*\('
]
for pattern in injection_patterns:
sanitized_input = re.sub(pattern, '', sanitized_input, flags=re.IGNORECASE)
# Apply custom filters
for filter_func in self.data_filters:
if not filter_func(sanitized_input):
raise ValueError("Input failed security check")
return sanitized_input
def protect_sensitive_output(self, model_output: str) -> str:
"""Protect sensitive output"""
protected_output = model_output
# Remove potential sensitive information
sensitive_patterns = [
(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '****-****-****-****'), # Credit card
(r'\b\d{3}-\d{2}-\d{4}\b', '***-**-****'), # SSN
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '****@****.***')  # Email
]
for pattern, replacement in sensitive_patterns:
protected_output = re.sub(pattern, replacement, protected_output)
return protected_output
def audit_request(self, request_info: Dict[str, Any]):
"""Audit request"""
if self.audit_enabled:
audit_entry = {
'timestamp': time.time(),
'user_id': request_info.get('user_id', 'anonymous'),
'model_used': request_info.get('model', 'unknown'),
'input_length': len(request_info.get('input', '')),
'output_length': len(request_info.get('output', '')),
'success': request_info.get('success', False)
}
# Log to audit trail
print(f"📝 Audit record: {audit_entry}")
Through this chapter, you should have mastered how to integrate and manage multiple language models in DSPy. These skills can help you build more flexible, efficient, and reliable AI application systems.