Chapter 08: DSPy Language Model Integration

Haiyue

Learning Objectives
  • Configure different language model backends
  • Learn OpenAI, Claude, and local model integration
  • Implement multi-model collaboration and switching
  • Master model configuration and parameter tuning
  • Understand model selection strategies

Key Concepts

1. Language Model Backend Configuration

DSPy supports multiple language model backends behind a unified interface, so the same program can be pointed at OpenAI, Anthropic (Claude), or a locally hosted model without rewriting its modules.
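
Before diving into the custom configuration layer below, note that recent DSPy releases (2.5+) also expose a unified dspy.LM client that takes a provider-prefixed model name and reads API keys from the usual environment variables. A minimal sketch, assuming such a version is installed (the model names are illustrative):

import dspy

# Provider-prefixed names ("openai/...", "anthropic/...", "ollama_chat/...") are
# resolved through LiteLLM; OPENAI_API_KEY etc. are read from the environment.
lm = dspy.LM("openai/gpt-4o-mini", max_tokens=500, temperature=0.7)
dspy.configure(lm=lm)

qa = dspy.Predict("question -> answer")
print(qa(question="What does a unified LM interface buy us?").answer)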

Basic Model Configuration

import dspy
import os
import time
from typing import Dict, Any, List, Callable

class BaseModelConfig:
    """Base model configuration class"""

    def __init__(self,
                 model_name: str,
                 api_key: str = None,
                 api_base: str = None,
                 max_tokens: int = 1000,
                 temperature: float = 0.7,
                 **kwargs):

        self.model_name = model_name
        self.api_key = api_key
        self.api_base = api_base
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.additional_params = kwargs

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            'model_name': self.model_name,
            'api_key': self.api_key,
            'api_base': self.api_base,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            **self.additional_params
        }

class ModelConfigManager:
    """Model configuration manager"""

    def __init__(self):
        self.configs = {}
        self.active_models = {}
        self.model_metrics = {}

    def add_openai_config(self,
                         config_name: str,
                         model_name: str = "gpt-3.5-turbo",
                         api_key: str = None,
                         **kwargs) -> BaseModelConfig:
        """Add OpenAI configuration"""

        if not api_key:
            api_key = os.getenv('OPENAI_API_KEY')

        if not api_key:
            raise ValueError("OpenAI API key is required")

        # Pop api_base from kwargs so it is not passed to BaseModelConfig twice
        config = BaseModelConfig(
            model_name=model_name,
            api_key=api_key,
            api_base=kwargs.pop('api_base', 'https://api.openai.com/v1'),
            **kwargs
        )

        self.configs[config_name] = {
            'type': 'openai',
            'config': config,
            'status': 'configured'
        }

        print(f"✅ OpenAI model configured: {config_name} ({model_name})")
        return config

    def add_anthropic_config(self,
                            config_name: str,
                            model_name: str = "claude-3-sonnet-20240229",
                            api_key: str = None,
                            **kwargs) -> BaseModelConfig:
        """Add Anthropic (Claude) configuration"""

        if not api_key:
            api_key = os.getenv('ANTHROPIC_API_KEY')

        if not api_key:
            raise ValueError("Anthropic API key is required")

        # Pop api_base from kwargs so it is not passed to BaseModelConfig twice
        config = BaseModelConfig(
            model_name=model_name,
            api_key=api_key,
            api_base=kwargs.pop('api_base', 'https://api.anthropic.com'),
            **kwargs
        )

        self.configs[config_name] = {
            'type': 'anthropic',
            'config': config,
            'status': 'configured'
        }

        print(f"✅ Anthropic model configured: {config_name} ({model_name})")
        return config

    def add_local_model_config(self,
                              config_name: str,
                              model_path: str,
                              model_type: str = "huggingface",
                              **kwargs) -> BaseModelConfig:
        """Add local model configuration"""

        config = BaseModelConfig(
            model_name=model_path,
            api_base="local",
            model_type=model_type,
            **kwargs
        )

        self.configs[config_name] = {
            'type': 'local',
            'config': config,
            'status': 'configured'
        }

        print(f"✅ Local model configured: {config_name} ({model_path})")
        return config

    def initialize_model(self, config_name: str) -> dspy.LM:
        """Initialize model"""

        if config_name not in self.configs:
            raise ValueError(f"Configuration {config_name} does not exist")

        config_info = self.configs[config_name]
        config = config_info['config']
        model_type = config_info['type']

        try:
            if model_type == 'openai':
                # Older DSPy releases expose dspy.OpenAI; newer ones use dspy.LM("openai/<model>")
                model = dspy.OpenAI(
                    model=config.model_name,
                    api_key=config.api_key,
                    api_base=config.api_base,
                    max_tokens=config.max_tokens,
                    temperature=config.temperature
                )

            elif model_type == 'anthropic':
                # Note: check whether your DSPy version ships a native Anthropic/Claude
                # client; otherwise fall back to a custom wrapper like the one below
                model = self.create_anthropic_model(config)

            elif model_type == 'local':
                model = self.create_local_model(config)

            else:
                raise ValueError(f"Unsupported model type: {model_type}")

            self.active_models[config_name] = model
            self.configs[config_name]['status'] = 'active'

            print(f"🚀 Model activated: {config_name}")
            return model

        except Exception as e:
            self.configs[config_name]['status'] = 'failed'
            print(f"❌ Model initialization failed: {config_name} - {str(e)}")
            raise

    def create_anthropic_model(self, config: BaseModelConfig):
        """Create Anthropic model (custom implementation)"""

        class AnthropicModel(dspy.LM):
            def __init__(self, config):
                super().__init__(model=config.model_name)
                self.config = config
                self.history = []

            def basic_request(self, prompt: str, **kwargs):
                # A real implementation would call the Anthropic Messages API here;
                # for demonstration, return a mock response
                return [f"Mock Anthropic response: {prompt[:50]}..."]

            def __call__(self, prompt, **kwargs):
                return self.basic_request(prompt, **kwargs)

        return AnthropicModel(config)

    def create_local_model(self, config: BaseModelConfig):
        """Create local model"""

        class LocalModel(dspy.LM):
            def __init__(self, config):
                super().__init__(model=config.model_name)
                self.config = config
                self.model = None
                self.tokenizer = None
                self._load_model()

            def _load_model(self):
                # Simulate local model loading
                print(f"📥 Loading local model: {self.config.model_name}")
                # A real implementation would load the weights and tokenizer here
                # (e.g. via transformers, or by connecting to an Ollama/vLLM server)
                pass

            def basic_request(self, prompt: str, **kwargs):
                # Simulate local model inference
                return [f"Local model response: {prompt[:30]}..."]

            def __call__(self, prompt, **kwargs):
                return self.basic_request(prompt, **kwargs)

        return LocalModel(config)

    def get_model(self, config_name: str) -> dspy.LM:
        """Get model instance"""
        if config_name not in self.active_models:
            return self.initialize_model(config_name)

        return self.active_models[config_name]

    def list_configs(self) -> Dict[str, Dict]:
        """List all configurations"""
        return {
            name: {
                'type': info['type'],
                'status': info['status'],
                'model_name': info['config'].model_name
            }
            for name, info in self.configs.items()
        }

    def switch_default_model(self, config_name: str):
        """Switch default model"""
        model = self.get_model(config_name)
        dspy.settings.configure(lm=model)
        print(f"🔄 Default model switched to: {config_name}")

# Usage example
def demonstrate_model_configuration():
    """Demonstrate model configuration"""

    config_manager = ModelConfigManager()

    # Configure multiple models
    try:
        # OpenAI configuration
        config_manager.add_openai_config(
            "gpt-3.5",
            model_name="gpt-3.5-turbo",
            temperature=0.5,
            max_tokens=500
        )

        config_manager.add_openai_config(
            "gpt-4",
            model_name="gpt-4",
            temperature=0.3,
            max_tokens=1000
        )

        # Anthropic configuration
        config_manager.add_anthropic_config(
            "claude-3-sonnet",
            model_name="claude-3-sonnet-20240229"
        )

        # Local model configuration
        config_manager.add_local_model_config(
            "local-llama",
            model_path="/path/to/llama-model"
        )

    except ValueError as e:
        print(f"⚠️ Configuration skipped (missing API key): {e}")

    # List configurations
    configs = config_manager.list_configs()
    print(f"\n📋 Configured models:")
    for name, info in configs.items():
        print(f"  {name}: {info['type']} - {info['model_name']} ({info['status']})")

    return config_manager

# demo_config_manager = demonstrate_model_configuration()
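
A short usage sketch for the manager above (it assumes a valid OPENAI_API_KEY and the older dspy.OpenAI client used in this chapter):

# Hypothetical end-to-end usage of ModelConfigManager
manager = demonstrate_model_configuration()

# Activate one configuration and make it DSPy's default LM
manager.switch_default_model("gpt-3.5")

# Any DSPy module now runs against the selected backend
qa = dspy.Predict("question -> answer")
print(qa(question="In one sentence, what is DSPy?").answer)

# Switch to a stronger model for a harder prompt, then switch back as needed
manager.switch_default_model("gpt-4")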

2. Multi-Model Collaboration Strategy

In complex applications, several models often need to collaborate on a single task: each request is routed to the model best suited for it, and failed calls fall back to alternative models.

class MultiModelOrchestrator:
    """Multi-model orchestrator"""

    def __init__(self, config_manager: ModelConfigManager):
        self.config_manager = config_manager
        self.model_capabilities = {}
        self.routing_rules = {}
        self.fallback_chains = {}

    def define_model_capabilities(self,
                                 config_name: str,
                                 capabilities: List[str],
                                 strengths: Dict[str, float] = None,
                                 cost_per_token: float = 0.0):
        """Define model capabilities"""

        self.model_capabilities[config_name] = {
            'capabilities': capabilities,
            'strengths': strengths or {},
            'cost_per_token': cost_per_token,
            'usage_stats': {
                'total_requests': 0,
                'successful_requests': 0,
                'avg_response_time': 0.0,
                'total_cost': 0.0
            }
        }

        print(f"📝 Model capabilities defined: {config_name}")
        print(f"   Capabilities: {', '.join(capabilities)}")

    def add_routing_rule(self,
                        task_type: str,
                        model_selector: Callable[[Dict], str]):
        """Add routing rule"""

        self.routing_rules[task_type] = model_selector
        print(f"🛤️ Routing rule added: {task_type}")

    def set_fallback_chain(self,
                          primary_model: str,
                          fallback_models: List[str]):
        """Set fallback chain"""

        self.fallback_chains[primary_model] = fallback_models
        print(f"🔄 Fallback chain set: {primary_model} -> {fallback_models}")

    def route_request(self,
                     task_type: str,
                     context: Dict[str, Any] = None) -> str:
        """Route request to appropriate model"""

        if task_type in self.routing_rules:
            selected_model = self.routing_rules[task_type](context or {})
            print(f"🎯 Task routed: {task_type} -> {selected_model}")
            return selected_model

        # Default routing logic
        return self.default_model_selection(task_type, context)

    def default_model_selection(self,
                               task_type: str,
                               context: Dict[str, Any]) -> str:
        """Default model selection logic"""

        # Capability-based matching
        suitable_models = []

        for model_name, info in self.model_capabilities.items():
            if task_type in info['capabilities']:
                strength = info['strengths'].get(task_type, 0.5)
                cost = info['cost_per_token']

                # Calculate composite score (adjust weights as needed)
                score = strength * 0.7 - cost * 0.3

                suitable_models.append((model_name, score))

        if suitable_models:
            # Select model with highest score
            selected_model = max(suitable_models, key=lambda x: x[1])[0]
            return selected_model

        # If no suitable model, return first available model
        available_models = list(self.model_capabilities.keys())
        return available_models[0] if available_models else "default"

    def execute_with_fallback(self,
                             model_name: str,
                             task_func: Callable,
                             *args, **kwargs) -> Dict[str, Any]:
        """Execute task with fallback support"""

        models_to_try = [model_name]

        # Add fallback models
        if model_name in self.fallback_chains:
            models_to_try.extend(self.fallback_chains[model_name])

        last_error = None

        for attempt, current_model in enumerate(models_to_try):
            try:
                print(f"🔄 Trying model: {current_model} (attempt {attempt + 1})")

                # Get model instance
                model = self.config_manager.get_model(current_model)

                # Set model and execute task
                dspy.settings.configure(lm=model)
                start_time = time.time()

                result = task_func(*args, **kwargs)

                execution_time = time.time() - start_time

                # Update statistics
                self.update_usage_stats(current_model, True, execution_time)

                return {
                    'result': result,
                    'model_used': current_model,
                    'attempt': attempt + 1,
                    'execution_time': execution_time,
                    'success': True
                }

            except Exception as e:
                last_error = e
                print(f"❌ Model {current_model} execution failed: {str(e)}")

                # Update statistics
                self.update_usage_stats(current_model, False, 0.0)

                if attempt < len(models_to_try) - 1:
                    print(f"⏭️ Trying next model...")
                    continue
                else:
                    print(f"🚨 All models failed")
                    break

        return {
            'result': None,
            'model_used': None,
            'attempt': len(models_to_try),
            'error': str(last_error),
            'success': False
        }

    def update_usage_stats(self,
                          model_name: str,
                          success: bool,
                          execution_time: float):
        """Update usage statistics"""

        if model_name not in self.model_capabilities:
            return

        stats = self.model_capabilities[model_name]['usage_stats']

        stats['total_requests'] += 1
        if success:
            stats['successful_requests'] += 1

        # Update average response time
        if stats['total_requests'] > 1:
            current_avg = stats['avg_response_time']
            stats['avg_response_time'] = (
                (current_avg * (stats['total_requests'] - 1) + execution_time)
                / stats['total_requests']
            )
        else:
            stats['avg_response_time'] = execution_time

    def get_performance_report(self) -> Dict[str, Any]:
        """Get performance report"""

        report = {
            'models': {},
            'summary': {
                'total_requests': 0,
                'total_successful': 0,
                'avg_success_rate': 0.0
            }
        }

        total_requests = 0
        total_successful = 0

        for model_name, info in self.model_capabilities.items():
            stats = info['usage_stats']

            success_rate = (
                stats['successful_requests'] / stats['total_requests']
                if stats['total_requests'] > 0 else 0.0
            )

            report['models'][model_name] = {
                'total_requests': stats['total_requests'],
                'success_rate': success_rate,
                'avg_response_time': stats['avg_response_time'],
                'total_cost': stats['total_cost']
            }

            total_requests += stats['total_requests']
            total_successful += stats['successful_requests']

        if total_requests > 0:
            report['summary']['total_requests'] = total_requests
            report['summary']['total_successful'] = total_successful
            report['summary']['avg_success_rate'] = total_successful / total_requests

        return report

class SpecializedModelEnsemble:
    """Specialized model ensemble"""

    def __init__(self, orchestrator: MultiModelOrchestrator):
        self.orchestrator = orchestrator

        # Define specialized models
        self.setup_specialized_models()

    def setup_specialized_models(self):
        """Set up specialized models"""

        # Define model capabilities
        self.orchestrator.define_model_capabilities(
            "gpt-4",
            capabilities=["reasoning", "analysis", "complex_questions"],
            strengths={"reasoning": 0.9, "analysis": 0.85, "complex_questions": 0.9},
            cost_per_token=0.03
        )

        self.orchestrator.define_model_capabilities(
            "gpt-3.5",
            capabilities=["general", "summarization", "simple_questions"],
            strengths={"general": 0.8, "summarization": 0.85, "simple_questions": 0.8},
            cost_per_token=0.002
        )

        self.orchestrator.define_model_capabilities(
            "claude-3-sonnet",
            capabilities=["creative_writing", "analysis", "long_context"],
            strengths={"creative_writing": 0.9, "analysis": 0.85, "long_context": 0.95},
            cost_per_token=0.015
        )

        # Set routing rules
        self.orchestrator.add_routing_rule(
            "complex_reasoning",
            lambda ctx: self.select_reasoning_model(ctx)
        )

        self.orchestrator.add_routing_rule(
            "text_generation",
            lambda ctx: self.select_generation_model(ctx)
        )

        # Set fallback chains
        self.orchestrator.set_fallback_chain("gpt-4", ["claude-3-sonnet", "gpt-3.5"])
        self.orchestrator.set_fallback_chain("claude-3-sonnet", ["gpt-4", "gpt-3.5"])

    def select_reasoning_model(self, context: Dict[str, Any]) -> str:
        """Select reasoning model"""

        complexity = context.get('complexity', 'medium')
        budget = context.get('budget', 'medium')

        if complexity == 'high' and budget == 'high':
            return "gpt-4"
        elif complexity == 'high' and budget == 'medium':
            return "claude-3-sonnet"
        else:
            return "gpt-3.5"

    def select_generation_model(self, context: Dict[str, Any]) -> str:
        """Select generation model"""

        text_type = context.get('text_type', 'general')
        length = context.get('length', 'medium')

        if text_type == 'creative' or length == 'long':
            return "claude-3-sonnet"
        elif text_type == 'technical':
            return "gpt-4"
        else:
            return "gpt-3.5"

# Usage example
def demonstrate_multi_model_orchestration():
    """Demonstrate multi-model orchestration"""

    # Assume config manager exists
    config_manager = ModelConfigManager()

    # Add some mock configurations (real API keys needed in actual use)
    try:
        config_manager.add_openai_config("gpt-4", "gpt-4")
        config_manager.add_openai_config("gpt-3.5", "gpt-3.5-turbo")
        config_manager.add_anthropic_config("claude-3-sonnet")
    except ValueError:
        print("⚠️ Skipping actual model configuration (demo mode)")

    # Create orchestrator and ensemble
    orchestrator = MultiModelOrchestrator(config_manager)
    ensemble = SpecializedModelEnsemble(orchestrator)

    # Test task routing
    test_contexts = [
        {
            'task_type': 'complex_reasoning',
            'context': {'complexity': 'high', 'budget': 'high'}
        },
        {
            'task_type': 'text_generation',
            'context': {'text_type': 'creative', 'length': 'long'}
        }
    ]

    for test in test_contexts:
        selected_model = orchestrator.route_request(
            test['task_type'],
            test['context']
        )

        print(f"Task type: {test['task_type']}")
        print(f"Context: {test['context']}")
        print(f"Selected model: {selected_model}\n")

    return orchestrator, ensemble

# demo_orchestration = demonstrate_multi_model_orchestration()
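
As a usage sketch (demo mode, so the underlying calls are mocked unless real keys are configured), any callable can serve as the task function; execute_with_fallback configures each candidate model as the default LM before invoking it:

orchestrator, ensemble = demonstrate_multi_model_orchestration()

def answer_question(question: str) -> str:
    # Uses whichever LM execute_with_fallback has just configured as the default
    predictor = dspy.Predict("question -> answer")
    return predictor(question=question).answer

outcome = orchestrator.execute_with_fallback(
    "gpt-4",  # primary model; falls back per set_fallback_chain on failure
    answer_question,
    question="Why might a multi-model system need fallback chains?"
)

if outcome['success']:
    print(f"{outcome['model_used']} answered in {outcome['execution_time']:.2f}s")
else:
    print(f"All models failed: {outcome['error']}")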

3. Model Performance Monitoring and Optimization

Efficient model usage depends on comprehensive performance monitoring: tracking latency, error rates, token usage, and cost per request, and raising alerts when thresholds are exceeded.

class ModelPerformanceMonitor:
    """Model performance monitor"""

    def __init__(self):
        self.metrics = {}
        self.alerts = {}
        self.thresholds = {
            'response_time': 5.0,  # 5 seconds
            'error_rate': 0.05,    # 5%
            'cost_per_request': 0.1  # $0.1
        }

    def track_request(self,
                     model_name: str,
                     request_info: Dict[str, Any]):
        """Track individual request"""

        if model_name not in self.metrics:
            self.metrics[model_name] = {
                'requests': [],
                'hourly_stats': {},
                'daily_stats': {}
            }

        # Add timestamp
        request_info['timestamp'] = time.time()

        self.metrics[model_name]['requests'].append(request_info)

        # Check alert conditions
        self.check_alerts(model_name, request_info)

    def check_alerts(self, model_name: str, request_info: Dict[str, Any]):
        """Check alert conditions"""

        # Response time alert
        if request_info.get('response_time', 0) > self.thresholds['response_time']:
            self.trigger_alert(
                model_name,
                'high_response_time',
                f"Response time too high: {request_info['response_time']:.2f}s"
            )

        # Error rate alert
        recent_requests = self.get_recent_requests(model_name, minutes=10)
        if recent_requests:
            error_rate = sum(1 for r in recent_requests if not r.get('success', True)) / len(recent_requests)
            if error_rate > self.thresholds['error_rate']:
                self.trigger_alert(
                    model_name,
                    'high_error_rate',
                    f"Error rate too high: {error_rate:.1%}"
                )

    def trigger_alert(self, model_name: str, alert_type: str, message: str):
        """Trigger alert"""

        alert_key = f"{model_name}_{alert_type}"
        current_time = time.time()

        # Avoid duplicate alerts (no repeat within 5 minutes)
        if alert_key in self.alerts:
            last_alert_time = self.alerts[alert_key]['last_triggered']
            if current_time - last_alert_time < 300:  # 5 minutes
                return

        self.alerts[alert_key] = {
            'model_name': model_name,
            'alert_type': alert_type,
            'message': message,
            'last_triggered': current_time
        }

        print(f"🚨 Alert: {model_name} - {message}")

    def get_recent_requests(self, model_name: str, minutes: int = 60) -> List[Dict]:
        """Get recent request records"""

        if model_name not in self.metrics:
            return []

        cutoff_time = time.time() - (minutes * 60)
        recent_requests = [
            req for req in self.metrics[model_name]['requests']
            if req['timestamp'] > cutoff_time
        ]

        return recent_requests

    def calculate_model_statistics(self, model_name: str, hours: int = 24) -> Dict[str, Any]:
        """Calculate model statistics"""

        recent_requests = self.get_recent_requests(model_name, minutes=hours * 60)

        if not recent_requests:
            return {'status': 'no_data'}

        # Basic statistics
        total_requests = len(recent_requests)
        successful_requests = sum(1 for r in recent_requests if r.get('success', True))
        failed_requests = total_requests - successful_requests

        # Response time statistics
        response_times = [r.get('response_time', 0) for r in recent_requests if 'response_time' in r]
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0

        # Cost statistics
        total_cost = sum(r.get('cost', 0) for r in recent_requests)
        avg_cost_per_request = total_cost / total_requests if total_requests > 0 else 0

        # Token statistics
        total_tokens = sum(r.get('tokens_used', 0) for r in recent_requests)
        avg_tokens_per_request = total_tokens / total_requests if total_requests > 0 else 0

        return {
            'status': 'active',
            'time_window_hours': hours,
            'total_requests': total_requests,
            'successful_requests': successful_requests,
            'failed_requests': failed_requests,
            'success_rate': successful_requests / total_requests if total_requests > 0 else 0,
            'avg_response_time': avg_response_time,
            'total_cost': total_cost,
            'avg_cost_per_request': avg_cost_per_request,
            'total_tokens': total_tokens,
            'avg_tokens_per_request': avg_tokens_per_request
        }

    def generate_performance_report(self, model_names: List[str] = None) -> str:
        """Generate performance report"""

        if model_names is None:
            model_names = list(self.metrics.keys())

        report_lines = []
        report_lines.append("📊 Model Performance Monitoring Report")
        report_lines.append("=" * 50)

        for model_name in model_names:
            stats = self.calculate_model_statistics(model_name)

            if stats['status'] == 'no_data':
                report_lines.append(f"\n🔍 {model_name}: No data")
                continue

            report_lines.append(f"\n🤖 {model_name}:")
            report_lines.append(f"   Total requests: {stats['total_requests']}")
            report_lines.append(f"   Success rate: {stats['success_rate']:.1%}")
            report_lines.append(f"   Avg response time: {stats['avg_response_time']:.2f}s")
            report_lines.append(f"   Total cost: ${stats['total_cost']:.4f}")
            report_lines.append(f"   Avg cost per request: ${stats['avg_cost_per_request']:.4f}")
            report_lines.append(f"   Avg tokens: {stats['avg_tokens_per_request']:.0f}")

        # Add active alerts
        if self.alerts:
            report_lines.append(f"\n🚨 Active Alerts:")
            for alert_key, alert_info in self.alerts.items():
                report_lines.append(f"   {alert_info['model_name']}: {alert_info['message']}")

        return "\n".join(report_lines)

class AdaptiveModelSelector:
    """Adaptive model selector"""

    def __init__(self,
                 monitor: ModelPerformanceMonitor,
                 orchestrator: MultiModelOrchestrator):

        self.monitor = monitor
        self.orchestrator = orchestrator
        self.selection_history = []
        self.learning_rate = 0.1

    def select_optimal_model(self,
                           task_type: str,
                           context: Dict[str, Any]) -> str:
        """Select optimal model based on historical performance"""

        # Get candidate models
        candidate_models = self.get_candidate_models(task_type)

        if not candidate_models:
            return "gpt-3.5"  # Default model

        # Calculate composite score for each model
        model_scores = {}

        for model_name in candidate_models:
            score = self.calculate_model_score(model_name, context)
            model_scores[model_name] = score

        # Select model with highest score
        best_model = max(model_scores, key=model_scores.get)

        # Record selection history
        selection_record = {
            'timestamp': time.time(),
            'task_type': task_type,
            'context': context,
            'selected_model': best_model,
            'candidate_models': candidate_models,
            'model_scores': model_scores
        }

        self.selection_history.append(selection_record)

        print(f"🎯 Adaptive selection: {task_type} -> {best_model}")
        print(f"   Candidate model scores: {model_scores}")

        return best_model

    def get_candidate_models(self, task_type: str) -> List[str]:
        """Get candidate models for task type"""

        candidate_models = []

        for model_name, info in self.orchestrator.model_capabilities.items():
            if task_type in info['capabilities']:
                candidate_models.append(model_name)

        return candidate_models

    def calculate_model_score(self,
                             model_name: str,
                             context: Dict[str, Any]) -> float:
        """Calculate model composite score"""

        # Get historical performance statistics
        stats = self.monitor.calculate_model_statistics(model_name, hours=24)

        if stats['status'] == 'no_data':
            return 0.5  # Default medium score

        # Performance factor (success rate, response time)
        performance_factor = (
            stats['success_rate'] * 0.4 +
            (1.0 / max(stats['avg_response_time'], 0.1)) * 0.1
        )

        # Cost factor
        cost_factor = 1.0 / (1.0 + stats['avg_cost_per_request'] * 10)

        # Task adaptation factor
        model_info = self.orchestrator.model_capabilities.get(model_name, {})
        task_type = context.get('task_type', 'general')
        task_strength = model_info.get('strengths', {}).get(task_type, 0.5)

        # Composite score
        total_score = (
            performance_factor * 0.4 +
            cost_factor * 0.3 +
            task_strength * 0.3
        )

        return total_score

    def update_model_performance(self,
                                model_name: str,
                                task_result: Dict[str, Any]):
        """Update model performance data"""

        # Extract performance metrics
        request_info = {
            'success': task_result.get('success', False),
            'response_time': task_result.get('execution_time', 0),
            'tokens_used': task_result.get('tokens_used', 0),
            'cost': task_result.get('cost', 0),
            'task_type': task_result.get('task_type', 'unknown')
        }

        # Submit to monitor
        self.monitor.track_request(model_name, request_info)

        # Learning adjustment (simple reinforcement learning)
        self.adjust_selection_strategy(model_name, task_result)

    def adjust_selection_strategy(self,
                                 model_name: str,
                                 task_result: Dict[str, Any]):
        """Adjust selection strategy"""

        # Adjust model capability assessment based on results
        success = task_result.get('success', False)
        task_type = task_result.get('task_type', 'unknown')

        if model_name in self.orchestrator.model_capabilities:
            strengths = self.orchestrator.model_capabilities[model_name]['strengths']

            if task_type in strengths:
                # Update strength assessment based on results
                current_strength = strengths[task_type]

                if success:
                    # Slightly increase score on success
                    new_strength = min(1.0, current_strength + self.learning_rate * 0.1)
                else:
                    # Slightly decrease score on failure
                    new_strength = max(0.1, current_strength - self.learning_rate * 0.1)

                strengths[task_type] = new_strength

# Usage example
def demonstrate_performance_monitoring():
    """Demonstrate performance monitoring"""

    monitor = ModelPerformanceMonitor()

    # Simulate some request data
    models_data = {
        'gpt-4': [
            {'success': True, 'response_time': 2.5, 'tokens_used': 150, 'cost': 0.045},
            {'success': True, 'response_time': 3.2, 'tokens_used': 200, 'cost': 0.06},
            {'success': False, 'response_time': 8.0, 'tokens_used': 0, 'cost': 0.0},
        ],
        'gpt-3.5': [
            {'success': True, 'response_time': 1.8, 'tokens_used': 180, 'cost': 0.0036},
            {'success': True, 'response_time': 1.5, 'tokens_used': 160, 'cost': 0.0032},
            {'success': True, 'response_time': 2.0, 'tokens_used': 190, 'cost': 0.0038},
        ]
    }

    # Submit monitoring data
    for model_name, requests in models_data.items():
        for request in requests:
            monitor.track_request(model_name, request)

    # Generate performance report
    report = monitor.generate_performance_report()
    print(report)

    return monitor

# demo_monitoring = demonstrate_performance_monitoring()
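
The monitor and orchestrator can then be wired into the adaptive selector. A brief sketch reusing the demo objects above (the task result values are illustrative):

monitor = demonstrate_performance_monitoring()
orchestrator, ensemble = demonstrate_multi_model_orchestration()

selector = AdaptiveModelSelector(monitor, orchestrator)

# Pick a model for a reasoning task based on recorded performance, cost, and strengths
chosen = selector.select_optimal_model(
    "reasoning",
    {"task_type": "reasoning", "complexity": "high"}
)

# Feed the outcome back so future selections can adapt
selector.update_model_performance(chosen, {
    "success": True,
    "execution_time": 2.1,
    "tokens_used": 180,
    "cost": 0.054,
    "task_type": "reasoning",
})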

4. Model Parameter Tuning

Different tasks call for different parameter settings (temperature, max tokens, sampling and penalty values), and tuning them systematically can meaningfully improve results.

class ModelParameterOptimizer:
    """Model parameter optimizer"""

    def __init__(self):
        self.parameter_history = {}
        self.optimization_results = {}

        # Define parameter search space
        self.parameter_space = {
            'temperature': [0.0, 0.3, 0.5, 0.7, 0.9, 1.0],
            'max_tokens': [100, 250, 500, 1000, 2000],
            'top_p': [0.8, 0.9, 0.95, 1.0],
            'frequency_penalty': [0.0, 0.1, 0.2, 0.5],
            'presence_penalty': [0.0, 0.1, 0.2, 0.5]
        }

    def optimize_parameters(self,
                           model_config_name: str,
                           task_type: str,
                           test_examples: List[dspy.Example],
                           evaluation_metric: Callable,
                           max_iterations: int = 10) -> Dict[str, Any]:
        """Optimize model parameters"""

        print(f"🔧 Starting parameter optimization: {model_config_name} - {task_type}")

        best_params = None
        best_score = 0.0
        optimization_history = []

        # Search the parameter space by perturbing the best combination found so far
        for iteration in range(max_iterations):
            print(f"\n🔄 Optimization iteration {iteration + 1}/{max_iterations}")

            # Generate parameter combination
            if iteration == 0:
                # First time use default parameters
                params = self.get_default_parameters()
            else:
                # Subsequent iterations randomly perturb the best parameters found so far
                params = self.generate_parameter_combination(
                    best_params if best_params else {}
                )

            print(f"📋 Testing parameters: {params}")

            # Evaluate parameter combination
            score = self.evaluate_parameter_combination(
                model_config_name,
                params,
                test_examples,
                evaluation_metric
            )

            optimization_history.append({
                'iteration': iteration + 1,
                'parameters': params.copy(),
                'score': score
            })

            print(f"📊 Score: {score:.3f}")

            # Update best parameters
            if score > best_score:
                best_score = score
                best_params = params.copy()
                print(f"🏆 Better parameters found! Score: {score:.3f}")

        # Save optimization results
        optimization_key = f"{model_config_name}_{task_type}"
        self.optimization_results[optimization_key] = {
            'best_parameters': best_params,
            'best_score': best_score,
            'optimization_history': optimization_history,
            'total_iterations': max_iterations
        }

        print(f"\n✅ Parameter optimization completed")
        print(f"🎯 Best score: {best_score:.3f}")
        print(f"🔧 Best parameters: {best_params}")

        return self.optimization_results[optimization_key]

    def get_default_parameters(self) -> Dict[str, Any]:
        """Get default parameters"""
        return {
            'temperature': 0.7,
            'max_tokens': 500,
            'top_p': 0.9,
            'frequency_penalty': 0.0,
            'presence_penalty': 0.0
        }

    def generate_parameter_combination(self,
                                      base_params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Generate parameter combination"""
        import random

        if base_params is None:
            base_params = self.get_default_parameters()

        new_params = base_params.copy()

        # Randomly select 1-2 parameters to adjust
        params_to_adjust = random.sample(
            list(self.parameter_space.keys()),
            random.randint(1, 2)
        )

        for param_name in params_to_adjust:
            if param_name in self.parameter_space:
                new_params[param_name] = random.choice(
                    self.parameter_space[param_name]
                )

        return new_params

    def evaluate_parameter_combination(self,
                                      model_config_name: str,
                                      parameters: Dict[str, Any],
                                      test_examples: List[dspy.Example],
                                      evaluation_metric: Callable) -> float:
        """Evaluate parameter combination"""

        # Create parameterized model configuration
        test_model = self.create_parameterized_model(model_config_name, parameters)

        # Temporarily set model
        original_model = dspy.settings.lm
        dspy.settings.configure(lm=test_model)

        try:
            scores = []

            # Evaluate on test samples
            for example in test_examples[:20]:  # Limit test sample count
                try:
                    # Create a simple test program; in practice its signature should
                    # match the example's input/output fields
                    predictor = dspy.Predict("question -> answer")
                    prediction = predictor(**example.inputs())

                    score = evaluation_metric(example, prediction)
                    scores.append(float(score))

                except Exception as e:
                    print(f"⚠️ Sample evaluation failed: {e}")
                    scores.append(0.0)

            # Calculate average score
            average_score = sum(scores) / len(scores) if scores else 0.0

        finally:
            # Restore original model
            dspy.settings.configure(lm=original_model)

        return average_score

    def create_parameterized_model(self,
                                   model_config_name: str,
                                   parameters: Dict[str, Any]):
        """Create parameterized model"""

        # In a real system this would build the model from the stored configuration;
        # for demonstration purposes, return a mock model

        class ParameterizedModel(dspy.LM):
            def __init__(self, config_name, params):
                super().__init__(model=config_name)
                self.config_name = config_name
                self.parameters = params

            def basic_request(self, prompt, **kwargs):
                # Simulate parameterized request
                return [f"Parameterized response (temp={self.parameters.get('temperature', 0.7)}): {prompt[:30]}..."]

            def __call__(self, prompt, **kwargs):
                return self.basic_request(prompt, **kwargs)

        return ParameterizedModel(model_config_name, parameters)

    def get_optimized_parameters(self,
                                model_config_name: str,
                                task_type: str) -> Dict[str, Any]:
        """Get optimized parameters"""

        optimization_key = f"{model_config_name}_{task_type}"

        if optimization_key in self.optimization_results:
            return self.optimization_results[optimization_key]['best_parameters']

        return self.get_default_parameters()

    def apply_optimized_parameters(self,
                                  model_config: BaseModelConfig,
                                  task_type: str):
        """Apply optimized parameters to model configuration"""

        optimized_params = self.get_optimized_parameters(
            model_config.model_name,
            task_type
        )

        # Update configuration parameters
        for param_name, param_value in optimized_params.items():
            if hasattr(model_config, param_name):
                setattr(model_config, param_name, param_value)
            else:
                model_config.additional_params[param_name] = param_value

        print(f"✅ Optimized parameters applied to {model_config.model_name}")

class TaskSpecificOptimizer:
    """Task-specific optimizer"""

    def __init__(self, parameter_optimizer: ModelParameterOptimizer):
        self.parameter_optimizer = parameter_optimizer
        self.task_templates = {}

    def register_task_template(self,
                              task_type: str,
                              template_config: Dict[str, Any]):
        """Register task template"""

        self.task_templates[task_type] = template_config
        print(f"📝 Task template registered: {task_type}")

    def optimize_for_task(self,
                         model_name: str,
                         task_type: str,
                         training_data: List[dspy.Example]) -> Dict[str, Any]:
        """Optimize model for specific task"""

        print(f"🎯 Optimizing model for task: {task_type}")

        # Get task template
        if task_type in self.task_templates:
            template = self.task_templates[task_type]
            base_params = template.get('base_parameters', {})

            print(f"📋 Using task template: {base_params}")
        else:
            base_params = {}

        # Define evaluation metric
        def task_evaluation_metric(example, prediction):
            if task_type == 'summarization':
                return self.evaluate_summarization(example, prediction)
            elif task_type == 'question_answering':
                return self.evaluate_qa(example, prediction)
            elif task_type == 'creative_writing':
                return self.evaluate_creativity(example, prediction)
            else:
                return self.evaluate_general(example, prediction)

        # Execute parameter optimization
        optimization_result = self.parameter_optimizer.optimize_parameters(
            model_name,
            task_type,
            training_data,
            task_evaluation_metric,
            max_iterations=5  # Reduce iterations to save time
        )

        return optimization_result

    def evaluate_summarization(self, example, prediction) -> float:
        """Evaluate summarization quality"""
        # Simplified summarization evaluation
        expected = getattr(example, 'summary', '') or getattr(example, 'answer', '')
        actual = getattr(prediction, 'answer', '') or str(prediction)

        # Moderate length score
        length_score = 0.5
        if 50 <= len(actual) <= 200:
            length_score = 1.0
        elif len(actual) < 20 or len(actual) > 400:
            length_score = 0.0

        # Content similarity score (simplified)
        expected_words = set(expected.lower().split())
        actual_words = set(actual.lower().split())

        if expected_words and actual_words:
            overlap = len(expected_words & actual_words)
            union = len(expected_words | actual_words)
            similarity_score = overlap / union if union > 0 else 0.0
        else:
            similarity_score = 0.0

        return (length_score * 0.3 + similarity_score * 0.7)

    def evaluate_qa(self, example, prediction) -> float:
        """Evaluate question answering quality"""
        expected = getattr(example, 'answer', '')
        actual = getattr(prediction, 'answer', '') or str(prediction)

        # Simple containment match
        if expected.lower() in actual.lower() or actual.lower() in expected.lower():
            return 1.0
        else:
            return 0.0

    def evaluate_creativity(self, example, prediction) -> float:
        """Evaluate creative writing quality"""
        actual = getattr(prediction, 'answer', '') or str(prediction)

        # Length score
        length_score = min(len(actual) / 500, 1.0)  # Encourage longer creative content

        # Vocabulary diversity score
        words = actual.lower().split()
        unique_words = len(set(words))
        diversity_score = unique_words / len(words) if words else 0.0

        return (length_score * 0.4 + diversity_score * 0.6)

    def evaluate_general(self, example, prediction) -> float:
        """General evaluation"""
        expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
        actual = getattr(prediction, 'answer', '') or str(prediction)

        if not expected or not actual:
            return 0.5

        # Simple similarity evaluation
        expected_words = set(expected.lower().split())
        actual_words = set(actual.lower().split())

        if expected_words and actual_words:
            overlap = len(expected_words & actual_words)
            return overlap / max(len(expected_words), len(actual_words))

        return 0.0

# Usage example
def demonstrate_parameter_optimization():
    """Demonstrate parameter optimization"""

    optimizer = ModelParameterOptimizer()
    task_optimizer = TaskSpecificOptimizer(optimizer)

    # Register task template
    task_optimizer.register_task_template(
        'summarization',
        {
            'base_parameters': {
                'temperature': 0.3,
                'max_tokens': 200
            },
            'evaluation_criteria': ['length', 'coherence', 'coverage']
        }
    )

    # Create test data
    test_examples = [
        dspy.Example(
            text="Artificial intelligence is a branch of computer science...",
            summary="AI is a computer science branch"
        ).with_inputs('text'),
        dspy.Example(
            text="Machine learning allows computers to learn from data through algorithms...",
            summary="Machine learning enables computers to learn from data"
        ).with_inputs('text')
    ]

    # Optimize parameters for summarization task
    result = task_optimizer.optimize_for_task(
        'gpt-3.5',
        'summarization',
        test_examples
    )

    print(f"\n🎊 Optimization results:")
    print(f"Best parameters: {result['best_parameters']}")
    print(f"Best score: {result['best_score']:.3f}")

    return optimizer, task_optimizer

# demo_optimization = demonstrate_parameter_optimization()
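
The tuned values can then be pushed back into a model configuration before it is (re)initialized. A brief sketch reusing the objects above:

optimizer, task_optimizer = demonstrate_parameter_optimization()

summarizer_config = BaseModelConfig(model_name="gpt-3.5", temperature=0.7, max_tokens=500)
optimizer.apply_optimized_parameters(summarizer_config, "summarization")

print(summarizer_config.to_dict())  # now carries the tuned temperature / max_tokens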

Practice Exercises

Exercise 1: Implement Custom Model Adapter

class CustomModelAdapter:
    """Custom model adapter exercise"""

    def __init__(self, model_endpoint: str):
        self.model_endpoint = model_endpoint

    def adapt_custom_api(self):
        """Adapt custom API"""
        # TODO: Implement custom API adaptation
        pass

    def implement_retry_logic(self):
        """Implement retry logic"""
        # TODO: Implement smart retry mechanism
        pass

    def add_rate_limiting(self):
        """Add rate limiting"""
        # TODO: Implement API call rate control
        pass

# Exercise tasks:
# 1. Choose a third-party API and implement DSPy adapter
# 2. Add error handling and retry mechanism
# 3. Implement request rate limiting
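
A minimal sketch of the retry and rate-limiting pieces, assuming a generic HTTP completion endpoint (the URL and payload format are placeholders for whichever API you choose):

import time
import random
import requests  # assumed available; any HTTP client works

class SimpleRetryingClient:
    """Illustrative retry + rate limiting around a generic completion endpoint"""

    def __init__(self, endpoint: str, max_retries: int = 3, min_interval: float = 0.5):
        self.endpoint = endpoint
        self.max_retries = max_retries
        self.min_interval = min_interval  # seconds between calls (crude rate limit)
        self._last_call = 0.0

    def complete(self, prompt: str) -> str:
        for attempt in range(self.max_retries):
            # Rate limiting: wait until the minimum interval has elapsed
            wait = self.min_interval - (time.time() - self._last_call)
            if wait > 0:
                time.sleep(wait)
            self._last_call = time.time()

            try:
                resp = requests.post(self.endpoint, json={"prompt": prompt}, timeout=30)
                resp.raise_for_status()
                return resp.json().get("text", "")
            except requests.RequestException:
                # Exponential backoff with jitter before retrying
                time.sleep((2 ** attempt) + random.random())

        raise RuntimeError("Request failed after all retries")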

Exercise 2: Build Model Performance Benchmark

class ModelBenchmark:
    """Model performance benchmark exercise"""

    def __init__(self):
        self.benchmark_suites = {}
        self.results = {}

    def create_benchmark_suite(self, suite_name: str, tasks: List[Dict]):
        """Create benchmark suite"""
        # TODO: Implement benchmark suite
        pass

    def run_benchmark(self, models: List[str], suite_name: str):
        """Run benchmark"""
        # TODO: Implement automated benchmarking
        pass

    def generate_comparison_report(self):
        """Generate comparison report"""
        # TODO: Implement detailed performance comparison report
        pass

# Exercise tasks:
# 1. Design multi-dimensional performance evaluation metrics
# 2. Implement automated benchmark testing process
# 3. Generate visualized performance comparison report
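
A starting-point sketch for the runner, assuming the ModelConfigManager from earlier in this chapter (the scoring here is a stand-in for your own metrics):

def run_simple_benchmark(config_manager, model_names, tasks):
    """Time each model on each task and collect accuracy/latency (sketch)"""
    results = {}
    for name in model_names:
        dspy.settings.configure(lm=config_manager.get_model(name))
        predictor = dspy.Predict("question -> answer")

        records = []
        for task in tasks:  # tasks: list of {"question": ..., "expected": ...}
            start = time.time()
            try:
                answer = predictor(question=task["question"]).answer
                correct = task["expected"].lower() in answer.lower()
            except Exception:
                correct = False
            records.append({"latency": time.time() - start, "correct": correct})

        results[name] = {
            "accuracy": sum(r["correct"] for r in records) / len(records) if records else 0.0,
            "avg_latency": sum(r["latency"] for r in records) / len(records) if records else 0.0,
        }
    return results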

Best Practices

1. Model Selection Strategy

def model_selection_guidelines():
    """Model selection guidelines"""

    guidelines = {
        'Task Complexity Matching': [
            'Use lightweight models for simple tasks',
            'Use high-performance models for complex reasoning',
            'Choose generation-focused models for creative tasks'
        ],

        'Cost-Benefit Optimization': [
            'Select appropriate models based on budget constraints',
            'Implement smart caching to reduce duplicate calls',
            'Use traffic allocation to optimize costs'
        ],

        'Performance-Latency Balance': [
            'Prioritize response speed for real-time applications',
            'Prioritize accuracy for batch processing tasks',
            'Implement multi-tier fallback strategies'
        ],

        'Reliability Assurance': [
            'Set up multiple model alternatives',
            'Implement health checks and monitoring',
            'Establish exception handling mechanisms'
        ]
    }

    return guidelines
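
One of the cost guidelines above, caching duplicate calls, can be sketched with a small in-memory wrapper keyed on the prompt and parameters (illustrative only; production setups usually add TTLs and persistence, and recent dspy.LM clients already cache identical requests by default):

import hashlib
import json

class CachedLMWrapper:
    """Avoid repeated charges for identical prompts by memoizing responses (sketch)"""

    def __init__(self, lm):
        self.lm = lm
        self._cache = {}

    def __call__(self, prompt: str, **kwargs):
        key = hashlib.sha256(
            (prompt + json.dumps(kwargs, sort_keys=True, default=str)).encode()
        ).hexdigest()

        if key not in self._cache:
            self._cache[key] = self.lm(prompt, **kwargs)
        return self._cache[key]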

class ModelGovernance:
    """Model governance framework"""

    def __init__(self):
        self.policies = {}
        self.compliance_checks = []
        self.audit_logs = []

    def define_usage_policy(self, policy_name: str, rules: Dict[str, Any]):
        """Define usage policy"""
        self.policies[policy_name] = {
            'rules': rules,
            'created_at': time.time(),
            'active': True
        }

    def enforce_policies(self, request_context: Dict[str, Any]) -> bool:
        """Enforce policy checks"""
        for policy_name, policy in self.policies.items():
            if policy['active']:
                if not self.check_policy_compliance(request_context, policy['rules']):
                    self.log_policy_violation(policy_name, request_context)
                    return False
        return True

    def check_policy_compliance(self, context: Dict, rules: Dict) -> bool:
        """Check policy compliance"""
        # Implement specific policy check logic
        return True

    def log_policy_violation(self, policy_name: str, context: Dict):
        """Log policy violation"""
        violation_log = {
            'timestamp': time.time(),
            'policy': policy_name,
            'context': context,
            'action': 'blocked'
        }
        self.audit_logs.append(violation_log)

2. Security and Privacy Protection

import re

class ModelSecurityManager:
    """Model security manager"""

    def __init__(self):
        self.security_policies = {}
        self.data_filters = []
        self.audit_enabled = True

    def add_input_filter(self, filter_func: Callable[[str], bool]):
        """Add input filter"""
        self.data_filters.append(filter_func)

    def sanitize_input(self, user_input: str) -> str:
        """Sanitize user input"""
        sanitized_input = user_input

        # Remove sensitive information

        # Remove potential injection attempts
        injection_patterns = [
            r'<script.*?</script>',
            r'javascript:',
            r'eval\s*\(',
            r'exec\s*\('
        ]

        for pattern in injection_patterns:
            sanitized_input = re.sub(pattern, '', sanitized_input, flags=re.IGNORECASE)

        # Apply custom filters
        for filter_func in self.data_filters:
            if not filter_func(sanitized_input):
                raise ValueError("Input failed security check")

        return sanitized_input

    def protect_sensitive_output(self, model_output: str) -> str:
        """Protect sensitive output"""
        protected_output = model_output

        # Remove potential sensitive information
        sensitive_patterns = [
            (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '****-****-****-****'),  # Credit card
            (r'\b\d{3}-\d{2}-\d{4}\b', '***-**-****'),  # SSN
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '****@****.***')  # Email
        ]

        for pattern, replacement in sensitive_patterns:
            protected_output = re.sub(pattern, replacement, protected_output)

        return protected_output

    def audit_request(self, request_info: Dict[str, Any]):
        """Audit request"""
        if self.audit_enabled:
            audit_entry = {
                'timestamp': time.time(),
                'user_id': request_info.get('user_id', 'anonymous'),
                'model_used': request_info.get('model', 'unknown'),
                'input_length': len(request_info.get('input', '')),
                'output_length': len(request_info.get('output', '')),
                'success': request_info.get('success', False)
            }

            # Log to audit trail
            print(f"📝 Audit record: {audit_entry}")
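
A short usage sketch tying the security pieces together (the model call itself is a placeholder):

security = ModelSecurityManager()
security.add_input_filter(lambda text: len(text) < 4000)  # reject oversized inputs

user_prompt = security.sanitize_input("Summarize this email from alice@example.com")
# ... call the configured model with user_prompt here (placeholder) ...
raw_output = "Contact alice@example.com for details."
safe_output = security.protect_sensitive_output(raw_output)

security.audit_request({
    'user_id': 'demo-user',
    'model': 'gpt-3.5',
    'input': user_prompt,
    'output': safe_output,
    'success': True,
})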

This chapter covered how to integrate and manage multiple language models in DSPy: configuring backends, orchestrating multi-model collaboration, monitoring performance, and tuning parameters. These skills help you build more flexible, efficient, and reliable AI applications.