Chapter 06: DSPy Data Processing and Evaluation

Haiyue

Learning Objectives
  • Master dataset processing methods in DSPy
  • Learn the definition and use of evaluation metrics
  • Implement custom evaluation functions
  • Conduct comparative analysis of model performance
  • Design an A/B testing framework

Key Concepts

1. DSPy Dataset Processing

DSPy represents training and evaluation data as dspy.Example objects and provides utilities for loading, analyzing, splitting, and augmenting datasets in a variety of formats.
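
Before walking through the processor class below, here is a minimal sketch of the core primitive used throughout this chapter. A dspy.Example holds arbitrary named fields, and with_inputs(...) marks which of those fields are inputs; the remaining fields are treated as labels. The field names question and answer are just illustrative, and the labels() call assumes a recent DSPy version.

import dspy

# Arbitrary keyword fields, then mark which ones are inputs.
example = dspy.Example(
    question="What is machine learning?",
    answer="A branch of AI that lets computers learn from data."
).with_inputs("question")

print(example.inputs())   # only the input fields (question)
print(example.labels())   # the remaining fields (answer)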

Basic Dataset Operations

import dspy
import json
import pandas as pd
from typing import List, Dict, Any, Optional, Union
import random
import numpy as np
from pathlib import Path

class DSPyDataProcessor:
    """DSPy Data Processor"""
    
    def __init__(self):
        self.processed_datasets = {}
        self.statistics = {}
    
    def create_examples_from_dict(self, data_list: List[Dict]) -> List[dspy.Example]:
        """Create DSPy Examples from a list of dictionaries"""
        examples = []
        
        for item in data_list:
            # Create an Example object
            example = dspy.Example(**item)
            examples.append(example)
        
        return examples
    
    def create_examples_from_csv(self, 
                                csv_path: str,
                                input_columns: List[str],
                                output_columns: List[str]) -> List[dspy.Example]:
        """Create DSPy Examples from a CSV file"""
        
        df = pd.read_csv(csv_path)
        examples = []
        
        for _, row in df.iterrows():
            # Build example data
            example_data = {}
            
            # Add input fields
            for col in input_columns:
                if col in df.columns:
                    example_data[col] = row[col]
            
            # Add output fields
            for col in output_columns:
                if col in df.columns:
                    example_data[col] = row[col]
            
            # Create an Example and set the input fields
            example = dspy.Example(**example_data)
            example = example.with_inputs(*input_columns)
            
            examples.append(example)
        
        print(f"📊 Loaded {len(examples)} samples from CSV")
        return examples
    
    def create_examples_from_json(self, 
                                 json_path: str,
                                 input_key: str = "inputs",
                                 output_key: str = "outputs") -> List[dspy.Example]:
        """Create DSPy Examples from a JSON file"""
        
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        examples = []
        
        if isinstance(data, list):
            for item in data:
                example_data = {}
                
                # Process inputs
                if input_key in item:
                    if isinstance(item[input_key], dict):
                        example_data.update(item[input_key])
                    else:
                        example_data['input'] = item[input_key]
                
                # Process outputs
                if output_key in item:
                    if isinstance(item[output_key], dict):
                        example_data.update(item[output_key])
                    else:
                        example_data['output'] = item[output_key]
                
                # Process other fields
                for key, value in item.items():
                    if key not in [input_key, output_key]:
                        example_data[key] = value
                
                example = dspy.Example(**example_data)
                examples.append(example)
        
        print(f"📊 Loaded {len(examples)} samples from JSON")
        return examples
    
    def split_dataset(self, 
                     examples: List[dspy.Example],
                     train_ratio: float = 0.7,
                     dev_ratio: float = 0.15,
                     test_ratio: float = 0.15,
                     random_seed: int = 42) -> Dict[str, List[dspy.Example]]:
        """Split the dataset"""
        
        # Validate ratios
        total_ratio = train_ratio + dev_ratio + test_ratio
        if abs(total_ratio - 1.0) > 1e-6:
            raise ValueError("The sum of train, dev, and test ratios must be 1.0")
        
        # Set random seed
        random.seed(random_seed)
        
        # Shuffle the data randomly
        shuffled_examples = examples.copy()
        random.shuffle(shuffled_examples)
        
        # Calculate split points
        total_size = len(shuffled_examples)
        train_size = int(total_size * train_ratio)
        dev_size = int(total_size * dev_ratio)
        
        # Split the data
        train_set = shuffled_examples[:train_size]
        dev_set = shuffled_examples[train_size:train_size + dev_size]
        test_set = shuffled_examples[train_size + dev_size:]
        
        split_info = {
            'train': train_set,
            'dev': dev_set,
            'test': test_set
        }
        
        # Print split information
        print(f"📊 Dataset splitting completed:")
        print(f"   Training set: {len(train_set)} samples ({len(train_set)/total_size:.1%})")
        print(f"   Validation set: {len(dev_set)} samples ({len(dev_set)/total_size:.1%})")
        print(f"   Test set: {len(test_set)} samples ({len(test_set)/total_size:.1%})")
        
        return split_info
    
    def analyze_dataset(self, examples: List[dspy.Example], name: str = "dataset") -> Dict:
        """Analyze dataset statistics"""
        
        if not examples:
            return {'error': 'Empty dataset'}
        
        # Basic statistics
        stats = {
            'name': name,
            'total_samples': len(examples),
            'field_stats': {},
            'data_quality': {}
        }
        
        # Get all fields
        all_fields = set()
        for example in examples:
            all_fields.update(example.__dict__.keys())
        
        # Analyze each field
        for field in all_fields:
            field_values = []
            non_null_count = 0
            
            for example in examples:
                value = getattr(example, field, None)
                if value is not None and value != '':
                    field_values.append(value)
                    non_null_count += 1
            
            # Field statistics
            field_stat = {
                'total_count': len(examples),
                'non_null_count': non_null_count,
                'null_ratio': (len(examples) - non_null_count) / len(examples),
                'data_type': self.detect_data_type(field_values)
            }
            
            # Additional statistics for text fields
            if field_stat['data_type'] == 'text':
                if field_values:
                    lengths = [len(str(v)) for v in field_values]
                    field_stat.update({
                        'avg_length': sum(lengths) / len(lengths),
                        'min_length': min(lengths),
                        'max_length': max(lengths),
                        'total_chars': sum(lengths)
                    })
            
            stats['field_stats'][field] = field_stat
        
        # Data quality check
        stats['data_quality'] = self.check_data_quality(examples)
        
        self.statistics[name] = stats
        
        # Print summary of statistics
        self.print_dataset_summary(stats)
        
        return stats
    
    def detect_data_type(self, values: List) -> str:
        """Detect data type"""
        if not values:
            return 'unknown'
        
        sample_values = values[:100]  # Sample for detection
        
        # Check for numeric types
        numeric_count = 0
        for value in sample_values:
            try:
                float(str(value))
                numeric_count += 1
            except (ValueError, TypeError):
                pass
        
        if numeric_count / len(sample_values) > 0.8:
            return 'numeric'
        
        # Check for text types
        return 'text'
    
    def check_data_quality(self, examples: List[dspy.Example]) -> Dict:
        """Check data quality"""
        quality_issues = {
            'duplicate_count': 0,
            'missing_inputs': 0,
            'empty_outputs': 0,
            'unusual_lengths': 0
        }
        
        # Check for duplicates
        seen_inputs = set()
        for example in examples:
            input_str = str(example.inputs() if hasattr(example, 'inputs') else example.__dict__)
            if input_str in seen_inputs:
                quality_issues['duplicate_count'] += 1
            else:
                seen_inputs.add(input_str)
        
        # Check for missing and unusual values
        for example in examples:
            # Check for missing inputs
            try:
                inputs = example.inputs()
                if not any(inputs.values()):
                    quality_issues['missing_inputs'] += 1
            except Exception:
                quality_issues['missing_inputs'] += 1
            
            # Check for empty outputs
            for key, value in example.__dict__.items():
                if 'answer' in key.lower() or 'output' in key.lower():
                    if not value or len(str(value).strip()) == 0:
                        quality_issues['empty_outputs'] += 1
                        break
            
            # Check for unusual lengths
            for key, value in example.__dict__.items():
                if isinstance(value, str):
                    if len(value) > 10000 or len(value) < 2:  # Too long or too short
                        quality_issues['unusual_lengths'] += 1
                        break
        
        return quality_issues
    
    def print_dataset_summary(self, stats: Dict):
        """Print dataset summary"""
        print(f"\n📋 Dataset '{stats['name']}' Analysis Report")
        print("=" * 50)
        print(f"Total samples: {stats['total_samples']}")
        
        print(f"\n📊 Field Statistics:")
        for field, field_stat in stats['field_stats'].items():
            print(f"  {field}:")
            print(f"    Type: {field_stat['data_type']}")
            print(f"    Non-null rate: {(1-field_stat['null_ratio']):.1%}")
            
            if field_stat['data_type'] == 'text' and 'avg_length' in field_stat:
                print(f"    Average length: {field_stat['avg_length']:.1f} characters")
        
        print(f"\n🔍 Data Quality:")
        quality = stats['data_quality']
        print(f"  Duplicate samples: {quality['duplicate_count']}")
        print(f"  Missing inputs: {quality['missing_inputs']}")
        print(f"  Empty outputs: {quality['empty_outputs']}")
        print(f"  Unusual lengths: {quality['unusual_lengths']}")

# Data Augmentation Tool
class DataAugmentor:
    """Data Augmentor"""
    
    def __init__(self):
        self.augmentation_methods = {
            'paraphrase': self.paraphrase_augmentation,
            'synonym': self.synonym_replacement,
            'back_translation': self.back_translation,
            'template': self.template_based_augmentation
        }
    
    def augment_dataset(self, 
                       examples: List[dspy.Example], 
                       methods: List[str] = ['paraphrase'],
                       target_ratio: float = 1.5) -> List[dspy.Example]:
        """Augment the dataset"""
        
        original_size = len(examples)
        target_size = int(original_size * target_ratio)
        augmented_examples = examples.copy()
        
        print(f"🔄 Starting data augmentation: {original_size} -> {target_size}")
        
        while len(augmented_examples) < target_size:
            for method in methods:
                if method in self.augmentation_methods:
                    # Randomly select an original sample for augmentation
                    original_example = random.choice(examples)
                    augmented_example = self.augmentation_methods[method](original_example)
                    
                    if augmented_example:
                        augmented_examples.append(augmented_example)
                        
                        if len(augmented_examples) >= target_size:
                            break
        
        print(f"✅ Data augmentation completed: {len(augmented_examples)} samples")
        return augmented_examples[:target_size]
    
    def paraphrase_augmentation(self, example: dspy.Example) -> Optional[dspy.Example]:
        """Paraphrase augmentation"""
        # Use DSPy for paraphrasing
        # Inline signature with instructions (dspy.Signature(str, instructions) form);
        # ChainOfThought adds its own reasoning field automatically.
        paraphraser = dspy.ChainOfThought(
            dspy.Signature(
                "original_text -> paraphrased_text",
                "Paraphrase the given text, maintaining the original meaning but changing the expression."
            )
        )
        
        try:
            # Select a text field to paraphrase
            text_fields = []
            for key, value in example.__dict__.items():
                if isinstance(value, str) and len(value) > 10:
                    text_fields.append((key, value))
            
            if not text_fields:
                return None
            
            field_name, original_text = random.choice(text_fields)
            
            # Generate paraphrase
            result = paraphraser(original_text=original_text)
            
            # Create a new example
            new_example_data = example.__dict__.copy()
            new_example_data[field_name] = result.paraphrased_text
            
            new_example = dspy.Example(**new_example_data)
            
            # Maintain input/output settings
            if hasattr(example, '_input_keys'):
                new_example = new_example.with_inputs(*example._input_keys)
            
            return new_example
            
        except Exception as e:
            print(f"Paraphrase augmentation failed: {e}")
            return None
    
    def synonym_replacement(self, example: dspy.Example) -> Optional[dspy.Example]:
        """Synonym replacement"""
        # Simplified implementation of synonym replacement
        synonym_dict = {
            'good': ['nice', 'excellent', 'favorable'],
            'problem': ['issue', 'challenge', 'matter'],
            'method': ['way', 'approach', 'technique'],
            'important': ['critical', 'significant', 'vital'],
            'simple': ['easy', 'straightforward', 'convenient']
        }
        
        try:
            new_example_data = example.__dict__.copy()
            
            # Perform synonym replacement on text fields
            for key, value in new_example_data.items():
                if isinstance(value, str):
                    modified_text = value
                    for original, synonyms in synonym_dict.items():
                        if original in modified_text:
                            replacement = random.choice(synonyms)
                            modified_text = modified_text.replace(original, replacement, 1)
                    new_example_data[key] = modified_text
            
            new_example = dspy.Example(**new_example_data)
            
            if hasattr(example, '_input_keys'):
                new_example = new_example.with_inputs(*example._input_keys)
            
            return new_example
            
        except Exception as e:
            print(f"Synonym replacement failed: {e}")
            return None
    
    def back_translation(self, example: dspy.Example) -> Optional[dspy.Example]:
        """Back-translation augmentation"""
        # Simplified implementation, requires an external translation API in practice
        print("Back-translation requires an external translation service, skipping here")
        return None
    
    def template_based_augmentation(self, example: dspy.Example) -> Optional[dspy.Example]:
        """Template-based augmentation"""
        templates = {
            'question': [
                "Please tell me about {}",
                "Can you tell me about {}",
                "I want to know about {}",
                "What is {}?"
            ]
        }
        
        try:
            new_example_data = example.__dict__.copy()
            
            # Find the question field and apply a template
            for key, value in new_example_data.items():
                if 'question' in key.lower() and isinstance(value, str):
                    # Extract core content (simplified)
                    core_content = value.replace('What is', '').replace('?', '')
                    if core_content.strip():
                        template = random.choice(templates['question'])
                        new_example_data[key] = template.format(core_content.strip())
            
            new_example = dspy.Example(**new_example_data)
            
            if hasattr(example, '_input_keys'):
                new_example = new_example.with_inputs(*example._input_keys)
            
            return new_example
            
        except Exception as e:
            print(f"Template augmentation failed: {e}")
            return None

# Usage example
def demonstrate_data_processing():
    """Demonstrate data processing functionalities"""
    
    # Create sample data
    sample_data = [
        {
            'question': 'What is machine learning?',
            'answer': 'Machine learning is a branch of artificial intelligence that allows computers to learn from data.',
            'category': 'AI',
            'difficulty': 'basic'
        },
        {
            'question': 'What are the characteristics of deep learning?',
            'answer': 'Deep learning uses multi-layered neural networks to automatically extract features.',
            'category': 'AI',
            'difficulty': 'intermediate'
        },
        {
            'question': 'How to optimize a neural network?',
            'answer': 'Neural networks can be optimized by adjusting the learning rate, using regularization, batch normalization, etc.',
            'category': 'AI',
            'difficulty': 'advanced'
        }
    ]
    
    # Initialize data processor
    processor = DSPyDataProcessor()
    
    # Create Examples
    examples = processor.create_examples_from_dict(sample_data)
    
    # Set inputs/outputs
    examples = [ex.with_inputs('question') for ex in examples]
    
    # Analyze the dataset
    stats = processor.analyze_dataset(examples, "Sample Dataset")
    
    # Split the dataset
    splits = processor.split_dataset(examples, train_ratio=0.6, dev_ratio=0.2, test_ratio=0.2)
    
    # Data augmentation
    augmentor = DataAugmentor()
    augmented_train = augmentor.augment_dataset(
        splits['train'], 
        methods=['paraphrase', 'synonym'], 
        target_ratio=2.0
    )
    
    return {
        'original_examples': examples,
        'statistics': stats,
        'splits': splits,
        'augmented_train': augmented_train
    }

# Run the demonstration
# demo_results = demonstrate_data_processing()

2. Evaluation Metric System

In DSPy, an evaluation metric is simply a Python callable with the signature (example, prediction, trace=None) that returns a boolean or a numeric score, so multiple complementary metrics can be combined for a comprehensive assessment of model performance.
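
Beyond the hand-rolled evaluation system built below, note that DSPy also ships a built-in evaluation helper, dspy.Evaluate, which runs a program over a dev set with a metric function and can parallelize the calls. The sketch below is hedged: it assumes a configured language model, and the exact constructor arguments may differ slightly between DSPy versions.

import dspy

def answer_exact_match(example, prediction, trace=None):
    """Metric: True if the predicted answer matches the gold answer (case-insensitive)."""
    return str(prediction.answer).strip().lower() == str(example.answer).strip().lower()

# devset is a list of dspy.Example objects with .with_inputs('question') applied
# evaluate = dspy.Evaluate(devset=devset, metric=answer_exact_match,
#                          num_threads=4, display_progress=True)
# score = evaluate(my_program)  # average metric value over the dev set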

class DSPyEvaluationSystem:
    """DSPy Evaluation System"""
    
    def __init__(self):
        self.metrics = {}
        self.evaluation_results = {}
        
        # Register basic metrics
        self.register_basic_metrics()
    
    def register_basic_metrics(self):
        """Register basic evaluation metrics"""
        
        # Exact match
        self.metrics['exact_match'] = self.exact_match_metric
        
        # Contains match
        self.metrics['contains_match'] = self.contains_match_metric
        
        # Semantic similarity
        self.metrics['semantic_similarity'] = self.semantic_similarity_metric
        
        # BLEU score
        self.metrics['bleu_score'] = self.bleu_score_metric
        
        # ROUGE score
        self.metrics['rouge_score'] = self.rouge_score_metric
        
        # Custom score
        self.metrics['custom_score'] = self.custom_score_metric
    
    def exact_match_metric(self, example, prediction, trace=None) -> bool:
        """Exact match metric"""
        try:
            expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
            actual = getattr(prediction, 'answer', '') or getattr(prediction, 'output', '')
            
            return str(expected).strip().lower() == str(actual).strip().lower()
        except Exception:
            return False
    
    def contains_match_metric(self, example, prediction, trace=None) -> bool:
        """Contains match metric"""
        try:
            expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
            actual = getattr(prediction, 'answer', '') or getattr(prediction, 'output', '')
            
            expected_lower = str(expected).strip().lower()
            actual_lower = str(actual).strip().lower()
            
            return expected_lower in actual_lower or actual_lower in expected_lower
        except Exception:
            return False
    
    def semantic_similarity_metric(self, example, prediction, trace=None) -> float:
        """Semantic similarity metric"""
        try:
            expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
            actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
            
            # Simplified semantic similarity calculation (use more complex methods in practice)
            expected_words = set(expected.lower().split())
            actual_words = set(actual.lower().split())
            
            if not expected_words and not actual_words:
                return 1.0
            elif not expected_words or not actual_words:
                return 0.0
            
            intersection = len(expected_words.intersection(actual_words))
            union = len(expected_words.union(actual_words))
            
            return intersection / union if union > 0 else 0.0
            
        except Exception:
            return 0.0
    
    def bleu_score_metric(self, example, prediction, trace=None) -> float:
        """BLEU score metric"""
        try:
            from nltk.translate.bleu_score import sentence_bleu
            from nltk.tokenize import word_tokenize
            
            expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
            actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
            
            reference = [word_tokenize(expected.lower())]
            candidate = word_tokenize(actual.lower())
            
            score = sentence_bleu(reference, candidate)
            return score
            
        except ImportError:
            print("NLTK not installed, using simplified BLEU calculation")
            return self.simple_bleu(example, prediction)
        except Exception:
            return 0.0
    
    def simple_bleu(self, example, prediction) -> float:
        """Simplified BLEU calculation"""
        try:
            expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
            actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
            
            expected_words = expected.lower().split()
            actual_words = actual.lower().split()
            
            if not actual_words:
                return 0.0
            
            # 1-gram precision
            matches = sum(1 for word in actual_words if word in expected_words)
            precision = matches / len(actual_words)
            
            # Brevity penalty
            length_penalty = min(1.0, len(expected_words) / len(actual_words))
            
            return precision * length_penalty
            
        except Exception:
            return 0.0
    
    def rouge_score_metric(self, example, prediction, trace=None) -> float:
        """ROUGE score metric"""
        try:
            expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
            actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
            
            expected_words = expected.lower().split()
            actual_words = actual.lower().split()
            
            if not expected_words:
                return 1.0 if not actual_words else 0.0
            
            # ROUGE-1 (unigram overlap)
            overlap = sum(1 for word in expected_words if word in actual_words)
            recall = overlap / len(expected_words)
            precision = overlap / len(actual_words) if actual_words else 0.0
            
            # F1-score
            if recall + precision == 0:
                return 0.0
            
            f1 = 2 * (recall * precision) / (recall + precision)
            return f1
            
        except Exception:
            return 0.0
    
    def custom_score_metric(self, example, prediction, trace=None) -> float:
        """Custom scoring metric"""
        # Use a DSPy module for intelligent scoring
        # Inline signature with instructions; ChainOfThought adds its own reasoning field.
        scorer = dspy.ChainOfThought(
            dspy.Signature(
                "question, expected_answer, actual_answer -> score",
                """Evaluate the quality of the answer, considering the following factors:
                1. Accuracy: Is the answer correct?
                2. Completeness: Is the answer complete?
                3. Relevance: Is the answer relevant?
                4. Clarity: Is the answer clear and easy to understand?
                Provide a score from 0 to 10."""
            )
        )
        
        try:
            question = getattr(example, 'question', '') or getattr(example, 'input', '')
            expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
            actual = getattr(prediction, 'answer', '') or getattr(prediction, 'output', '')
            
            result = scorer(
                question=question,
                expected_answer=expected,
                actual_answer=actual
            )
            
            # Extract the numeric score
            import re
            score_match = re.search(r'(\d+(?:\.\d+)?)', str(result.score))
            if score_match:
                score = float(score_match.group(1))
                return min(max(score / 10.0, 0.0), 1.0)  # Normalize to 0-1
            
        except Exception as e:
            print(f"Custom scoring failed: {e}")
        
        return 0.5  # Default to a medium score
    
    def evaluate_program(self, 
                        program, 
                        test_examples: List[dspy.Example], 
                        metrics: List[str] = None, 
                        verbose: bool = True) -> Dict[str, Any]:
        """Evaluate program performance"""
        
        if metrics is None:
            metrics = ['exact_match', 'semantic_similarity']
        
        print(f"🔍 Starting evaluation, number of test samples: {len(test_examples)}")
        print(f"📊 Evaluation metrics: {', '.join(metrics)}")
        
        results = {
            'total_examples': len(test_examples),
            'successful_predictions': 0,
            'failed_predictions': 0,
            'metric_scores': {metric: [] for metric in metrics},
            'detailed_results': []
        }
        
        for i, example in enumerate(test_examples):
            if verbose and (i + 1) % 10 == 0:
                print(f"  Processing progress: {i + 1}/{len(test_examples)}")
            
            try:
                # Execute prediction
                prediction = program(**example.inputs())
                results['successful_predictions'] += 1
                
                # Calculate various metrics
                example_scores = {}
                for metric_name in metrics:
                    if metric_name in self.metrics:
                        score = self.metrics[metric_name](example, prediction)
                        example_scores[metric_name] = float(score)
                        results['metric_scores'][metric_name].append(float(score))
                
                # Record detailed results
                detailed_result = {
                    'example_id': i,
                    'input': example.inputs(),
                    'expected_output': {k: v for k, v in example.__dict__.items() 
                                      if k not in example.inputs()},
                    'actual_output': prediction.__dict__ if hasattr(prediction, '__dict__') 
                                   else str(prediction),
                    'scores': example_scores
                }
                
                results['detailed_results'].append(detailed_result)
                
            except Exception as e:
                results['failed_predictions'] += 1
                if verbose:
                    print(f"  Prediction failed (sample {i}): {e}")
                
                # Record failed result
                detailed_result = {
                    'example_id': i,
                    'input': example.inputs(),
                    'error': str(e),
                    'scores': {metric: 0.0 for metric in metrics}
                }
                
                results['detailed_results'].append(detailed_result)
                
                # Add a score of 0 for failed cases
                for metric_name in metrics:
                    results['metric_scores'][metric_name].append(0.0)
        
        # Calculate overall statistics
        results['summary'] = self.calculate_summary_statistics(results)
        
        # Print evaluation results
        if verbose:
            self.print_evaluation_results(results)
        
        return results
    
    def calculate_summary_statistics(self, results: Dict) -> Dict:
        """Calculate summary statistics"""
        summary = {
            'success_rate': results['successful_predictions'] / results['total_examples'],
            'metric_averages': {},
            'metric_std': {},
            'metric_ranges': {}
        }
        
        for metric_name, scores in results['metric_scores'].items():
            if scores:
                summary['metric_averages'][metric_name] = sum(scores) / len(scores)
                
                # Calculate standard deviation
                mean = summary['metric_averages'][metric_name]
                variance = sum((x - mean) ** 2 for x in scores) / len(scores)
                summary['metric_std'][metric_name] = variance ** 0.5
                
                # Calculate range
                summary['metric_ranges'][metric_name] = {
                    'min': min(scores),
                    'max': max(scores)
                }
        
        return summary
    
    def print_evaluation_results(self, results: Dict):
        """Print evaluation results"""
        print(f"\n📈 Evaluation Results Summary")
        print("=" * 50)
        
        summary = results['summary']
        
        print(f"Success rate: {summary['success_rate']:.1%} "
              f"({results['successful_predictions']}/{results['total_examples']})")
        
        print(f"\n📊 Metric Performance:")
        for metric_name, avg_score in summary['metric_averages'].items():
            std_score = summary['metric_std'][metric_name]
            range_info = summary['metric_ranges'][metric_name]
            
            print(f"  {metric_name}:")
            print(f"    Average: {avg_score:.3f}{std_score:.3f})")
            print(f"    Range: {range_info['min']:.3f} - {range_info['max']:.3f}")
    
    def compare_programs(self, 
                        programs: Dict[str, Any], 
                        test_examples: List[dspy.Example], 
                        metrics: List[str] = None) -> Dict:
        """Compare the performance of multiple programs"""
        
        print(f"🏆 Starting program performance comparison")
        print(f"Programs being compared: {list(programs.keys())}")
        
        comparison_results = {}
        
        for program_name, program in programs.items():
            print(f"\n📊 Evaluating program: {program_name}")
            
            results = self.evaluate_program(program, test_examples, metrics, verbose=False)
            comparison_results[program_name] = results
        
        # Generate comparison report
        comparison_summary = self.generate_comparison_report(comparison_results)
        
        return {
            'individual_results': comparison_results,
            'comparison_summary': comparison_summary
        }
    
    def generate_comparison_report(self, results: Dict) -> Dict:
        """Generate comparison report"""
        if not results:
            return {}
        
        metrics = list(next(iter(results.values()))['summary']['metric_averages'].keys())
        
        comparison = {
            'program_rankings': {},
            'best_program': {},
            'performance_gaps': {}
        }
        
        # Rank for each metric
        for metric in metrics:
            program_scores = []
            for program_name, program_results in results.items():
                avg_score = program_results['summary']['metric_averages'][metric]
                program_scores.append((program_name, avg_score))
            
            # Sort by score
            program_scores.sort(key=lambda x: x[1], reverse=True)
            comparison['program_rankings'][metric] = program_scores
            comparison['best_program'][metric] = program_scores[0]
            
            # Calculate performance gap
            if len(program_scores) > 1:
                best_score = program_scores[0][1]
                worst_score = program_scores[-1][1]
                comparison['performance_gaps'][metric] = best_score - worst_score
        
        # Print comparison results
        self.print_comparison_results(comparison)
        
        return comparison
    
    def print_comparison_results(self, comparison: Dict):
        """Print comparison results"""
        print(f"\n🏆 Program Performance Comparison Results")
        print("=" * 60)
        
        for metric, rankings in comparison['program_rankings'].items():
            print(f"\n📊 {metric} Rankings:")
            for i, (program_name, score) in enumerate(rankings, 1):
                print(f"  {i}. {program_name}: {score:.3f}")
            
            # Show performance gap
            if metric in comparison['performance_gaps']:
                gap = comparison['performance_gaps'][metric]
                print(f"  Performance gap: {gap:.3f}")
        
        print(f"\n🥇 Best Program for Each Metric:")
        for metric, (best_program, best_score) in comparison['best_program'].items():
            print(f"  {metric}: {best_program} ({best_score:.3f})")

# Usage example
def demonstrate_evaluation_system():
    """Demonstrate the evaluation system"""
    
    # Create test data
    test_examples = [
        dspy.Example(question="What is AI?", answer="Artificial Intelligence").with_inputs('question'),
        dspy.Example(question="What is ML?", answer="Machine Learning").with_inputs('question'),
    ]
    
    # Create mock programs
    class MockProgram1:
        def __call__(self, **kwargs):
            return dspy.Prediction(answer="Artificial intelligence is technology that simulates human intelligence")
    
    class MockProgram2:
        def __call__(self, **kwargs):
            return dspy.Prediction(answer="AI is artificial intelligence")
    
    # Initialize evaluation system
    evaluator = DSPyEvaluationSystem()
    
    # Single program evaluation
    program1 = MockProgram1()
    results = evaluator.evaluate_program(
        program1, 
        test_examples, 
        metrics=['exact_match', 'semantic_similarity', 'custom_score']
    )
    
    # Multi-program comparison
    programs = {
        'Program A': MockProgram1(),
        'Program B': MockProgram2()
    }
    
    comparison = evaluator.compare_programs(programs, test_examples)
    
    return results, comparison

# demo_eval_results = demonstrate_evaluation_system()

3. A/B Testing Framework

A/B testing compares a control variant and a treatment variant on randomly split traffic and uses statistical tests to judge whether an observed difference is real, making it an important method for evaluating different models or strategies.
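
Before launching an experiment it helps to estimate how many samples each arm needs, which is what the minimum_sample_size parameter below guards. The following sketch uses the standard two-proportion sample-size approximation at a 5% significance level and 80% power; the baseline rate and minimum detectable effect in the example call are illustrative assumptions.

import math

def required_samples_per_arm(baseline_rate: float, min_detectable_effect: float,
                             z_alpha: float = 1.96, z_beta: float = 0.84) -> int:
    """Rough per-arm sample size for a two-proportion test (alpha=0.05, power=0.8)."""
    p1 = baseline_rate
    p2 = baseline_rate + min_detectable_effect
    p_bar = (p1 + p2) / 2
    numerator = (z_alpha * math.sqrt(2 * p_bar * (1 - p_bar))
                 + z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    return math.ceil(numerator / (p2 - p1) ** 2)

# Example: detecting a 10-point lift over a 70% baseline success rate
# print(required_samples_per_arm(0.70, 0.10))  # roughly 300 samples per arm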

import time
from datetime import datetime
import statistics
from typing import Callable

class ABTestFramework:
    """A/B Testing Framework"""
    
    def __init__(self):
        self.experiments = {}
        self.results = {}
    
    def create_experiment(self, 
                         experiment_name: str,
                         control_program,
                         treatment_program,
                         traffic_split: float = 0.5,
                         success_metrics: List[str] = None,
                         minimum_sample_size: int = 100) -> str:
        """Create an A/B test experiment"""
        
        experiment_id = f"{experiment_name}_{int(time.time())}"
        
        self.experiments[experiment_id] = {
            'name': experiment_name,
            'control_program': control_program,
            'treatment_program': treatment_program,
            'traffic_split': traffic_split,
            'success_metrics': success_metrics or ['success_rate'],
            'minimum_sample_size': minimum_sample_size,
            'start_time': datetime.now(),
            'status': 'active',
            'control_results': [],
            'treatment_results': [],
            'control_count': 0,
            'treatment_count': 0
        }
        
        print(f"✅ Created A/B test experiment: {experiment_id}")
        print(f"   Control group program: {type(control_program).__name__}")
        print(f"   Treatment group program: {type(treatment_program).__name__}")
        print(f"   Traffic split: {traffic_split:.1%} to treatment group")
        
        return experiment_id
    
    def run_single_test(self, 
                       experiment_id: str, 
                       test_example: dspy.Example,
                       evaluator: DSPyEvaluationSystem) -> Dict:
        """Run a single test"""
        
        if experiment_id not in self.experiments:
            raise ValueError(f"Experiment {experiment_id} does not exist")
        
        experiment = self.experiments[experiment_id]
        
        # Decide which program to use (traffic split)
        use_treatment = random.random() < experiment['traffic_split']
        
        program = experiment['treatment_program'] if use_treatment else experiment['control_program']
        program_type = 'treatment' if use_treatment else 'control'
        
        # Execute prediction
        start_time = time.time()
        try:
            prediction = program(**test_example.inputs())
            execution_time = time.time() - start_time
            success = True
            error = None
        except Exception as e:
            execution_time = time.time() - start_time
            prediction = None
            success = False
            error = str(e)
        
        # Evaluate results
        evaluation_scores = {}
        if success and prediction:
            for metric in experiment['success_metrics']:
                if hasattr(evaluator, 'metrics') and metric in evaluator.metrics:
                    score = evaluator.metrics[metric](test_example, prediction)
                    evaluation_scores[metric] = score
        
        # Record results
        result = {
            'timestamp': datetime.now(),
            'program_type': program_type,
            'success': success,
            'execution_time': execution_time,
            'evaluation_scores': evaluation_scores,
            'error': error,
            'example_id': id(test_example)
        }
        
        # Add to the corresponding result list
        if program_type == 'treatment':
            experiment['treatment_results'].append(result)
            experiment['treatment_count'] += 1
        else:
            experiment['control_results'].append(result)
            experiment['control_count'] += 1
        
        return result
    
    def run_batch_test(self, 
                      experiment_id: str,
                      test_examples: List[dspy.Example],
                      evaluator: DSPyEvaluationSystem,
                      verbose: bool = True) -> Dict:
        """Run a batch test"""
        
        print(f"🧪 Running A/B test: {experiment_id}")
        print(f"📊 Number of test samples: {len(test_examples)}")
        
        batch_results = []
        
        for i, example in enumerate(test_examples):
            if verbose and (i + 1) % 20 == 0:
                print(f"  Progress: {i + 1}/{len(test_examples)}")
            
            result = self.run_single_test(experiment_id, example, evaluator)
            batch_results.append(result)
        
        # Check if the minimum sample size has been reached
        experiment = self.experiments[experiment_id]
        total_samples = experiment['control_count'] + experiment['treatment_count']
        
        if total_samples >= experiment['minimum_sample_size']:
            print(f"✅ Minimum sample size reached, statistical analysis can be performed")
        else:
            needed = experiment['minimum_sample_size'] - total_samples
            print(f"⏳ {needed} more samples needed to reach the minimum sample size")
        
        return {
            'batch_results': batch_results,
            'experiment_summary': self.get_experiment_summary(experiment_id)
        }
    
    def get_experiment_summary(self, experiment_id: str) -> Dict:
        """Get experiment summary"""
        
        if experiment_id not in self.experiments:
            return {}
        
        experiment = self.experiments[experiment_id]
        
        # Calculate control group statistics
        control_stats = self._calculate_group_statistics(experiment['control_results'])
        
        # Calculate treatment group statistics
        treatment_stats = self._calculate_group_statistics(experiment['treatment_results'])
        
        # Calculate statistical significance
        significance_results = self._calculate_statistical_significance(
            experiment['control_results'], 
            experiment['treatment_results'],
            experiment['success_metrics']
        )
        
        summary = {
            'experiment_name': experiment['name'],
            'experiment_id': experiment_id,
            'duration': datetime.now() - experiment['start_time'],
            'control_count': experiment['control_count'],
            'treatment_count': experiment['treatment_count'],
            'control_stats': control_stats,
            'treatment_stats': treatment_stats,
            'significance_results': significance_results,
            'recommendations': self._generate_recommendations(
                control_stats, treatment_stats, significance_results
            )
        }
        
        return summary
    
    def _calculate_group_statistics(self, results: List[Dict]) -> Dict:
        """Calculate group statistics"""
        
        if not results:
            return {
                'sample_size': 0,
                'success_rate': 0.0,
                'avg_execution_time': 0.0,
                'metric_averages': {}
            }
        
        # Basic statistics
        sample_size = len(results)
        successful_results = [r for r in results if r['success']]
        success_rate = len(successful_results) / sample_size
        
        # Execution time statistics
        execution_times = [r['execution_time'] for r in results]
        avg_execution_time = statistics.mean(execution_times)
        
        # Metric statistics
        metric_averages = {}
        
        if successful_results:
            # Collect all metrics
            all_metrics = set()
            for result in successful_results:
                all_metrics.update(result['evaluation_scores'].keys())
            
            for metric in all_metrics:
                metric_scores = []
                for result in successful_results:
                    if metric in result['evaluation_scores']:
                        metric_scores.append(result['evaluation_scores'][metric])
                
                if metric_scores:
                    metric_averages[metric] = {
                        'mean': statistics.mean(metric_scores),
                        'std': statistics.stdev(metric_scores) if len(metric_scores) > 1 else 0.0,
                        'min': min(metric_scores),
                        'max': max(metric_scores)
                    }
        
        return {
            'sample_size': sample_size,
            'success_rate': success_rate,
            'avg_execution_time': avg_execution_time,
            'execution_time_std': statistics.stdev(execution_times) if len(execution_times) > 1 else 0.0,
            'metric_averages': metric_averages
        }
    
    def _calculate_statistical_significance(self, 
                                          control_results: List[Dict], 
                                          treatment_results: List[Dict], 
                                          metrics: List[str]) -> Dict:
        """Calculate statistical significance"""
        
        significance_results = {}
        
        # Significance test for success rate
        significance_results['success_rate'] = self._two_proportion_z_test(
            control_results, treatment_results
        )
        
        # Significance test for each metric
        for metric in metrics:
            if metric != 'success_rate':
                significance_results[metric] = self._two_sample_t_test(
                    control_results, treatment_results, metric
                )
        
        return significance_results
    
    def _two_proportion_z_test(self, control_results: List[Dict], treatment_results: List[Dict]) -> Dict:
        """Two-proportion z-test"""
        
        if not control_results or not treatment_results:
            return {'test': 'two_proportion_z', 'p_value': 1.0, 'significant': False}
        
        # Calculate success rates
        n1 = len(control_results)
        n2 = len(treatment_results)
        x1 = sum(1 for r in control_results if r['success'])
        x2 = sum(1 for r in treatment_results if r['success'])
        
        p1 = x1 / n1
        p2 = x2 / n2
        
        # Pooled proportion
        p = (x1 + x2) / (n1 + n2)
        
        # Standard error
        se = (p * (1 - p) * (1/n1 + 1/n2)) ** 0.5
        
        if se == 0:
            return {'test': 'two_proportion_z', 'p_value': 1.0, 'significant': False}
        
        # Z-statistic
        z = (p2 - p1) / se
        
        # Simplified p-value calculation (use more accurate methods in practice)
        p_value = 2 * (1 - abs(z) / 2)  # Very simplified approximation
        p_value = max(0.0, min(1.0, p_value))
        
        return {
            'test': 'two_proportion_z',
            'z_statistic': z,
            'p_value': p_value,
            'significant': p_value < 0.05,
            'control_rate': p1,
            'treatment_rate': p2,
            'effect_size': p2 - p1
        }
    
    def _two_sample_t_test(self, control_results: List[Dict], treatment_results: List[Dict], metric: str) -> Dict:
        """Two-sample t-test"""
        
        # Extract metric data
        control_scores = []
        treatment_scores = []
        
        for result in control_results:
            if result['success'] and metric in result['evaluation_scores']:
                control_scores.append(result['evaluation_scores'][metric])
        
        for result in treatment_results:
            if result['success'] and metric in result['evaluation_scores']:
                treatment_scores.append(result['evaluation_scores'][metric])
        
        if len(control_scores) < 2 or len(treatment_scores) < 2:
            return {'test': 'two_sample_t', 'p_value': 1.0, 'significant': False}
        
        # Calculate statistics
        mean1 = statistics.mean(control_scores)
        mean2 = statistics.mean(treatment_scores)
        std1 = statistics.stdev(control_scores)
        std2 = statistics.stdev(treatment_scores)
        n1 = len(control_scores)
        n2 = len(treatment_scores)
        
        # Pooled standard deviation
        pooled_std = ((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2)
        pooled_std = pooled_std ** 0.5
        
        if pooled_std == 0:
            return {'test': 'two_sample_t', 'p_value': 1.0, 'significant': False}
        
        # t-statistic
        t = (mean2 - mean1) / (pooled_std * (1/n1 + 1/n2)**0.5)
        
        # Degrees of freedom
        df = n1 + n2 - 2
        
        # Simplified p-value calculation
        p_value = 2 * (1 - abs(t) / (df**0.5 + 2))  # Very simplified approximation
        p_value = max(0.0, min(1.0, p_value))
        
        return {
            'test': 'two_sample_t',
            't_statistic': t,
            'degrees_of_freedom': df,
            'p_value': p_value,
            'significant': p_value < 0.05,
            'control_mean': mean1,
            'treatment_mean': mean2,
            'effect_size': mean2 - mean1
        }
    
    def _generate_recommendations(self, 
                                 control_stats: Dict, 
                                 treatment_stats: Dict, 
                                 significance_results: Dict) -> List[str]:
        """Generate recommendations"""
        
        recommendations = []
        
        # Check sample size
        if control_stats['sample_size'] < 30 or treatment_stats['sample_size'] < 30:
            recommendations.append("⚠️ Sample size is small, consider collecting more data for higher statistical reliability")
        
        # Check success rate
        if 'success_rate' in significance_results:
            sig_result = significance_results['success_rate']
            if sig_result['significant']:
                if sig_result['effect_size'] > 0:
                    recommendations.append("✅ The treatment group's success rate is significantly higher than the control group's, recommend adopting the treatment group's solution")
                else:
                    recommendations.append("❌ The treatment group's success rate is significantly lower than the control group's, recommend sticking with the control group's solution")
            else:
                recommendations.append("➖ No significant difference in success rates between the treatment and control groups")
        
        # Check execution time
        if treatment_stats['avg_execution_time'] > control_stats['avg_execution_time'] * 1.2:
            recommendations.append("⏱️ The treatment group's execution time is noticeably longer, consider the performance cost")
        elif treatment_stats['avg_execution_time'] < control_stats['avg_execution_time'] * 0.8:
            recommendations.append("🚀 The treatment group's execution time is shorter, offering a performance advantage")
        
        # Check other metrics
        for metric, sig_result in significance_results.items():
            if metric != 'success_rate' and sig_result.get('significant', False):
                effect_size = sig_result.get('effect_size', 0)
                if effect_size > 0:
                    recommendations.append(f"📈 The treatment group performed significantly better on the {metric} metric")
                else:
                    recommendations.append(f"📉 The control group performed significantly better on the {metric} metric")
        
        if not recommendations:
            recommendations.append("🤔 No clear differences in experimental results, consider extending the test duration or redesigning the experiment")
        
        return recommendations
    
    def generate_report(self, experiment_id: str) -> str:
        """Generate an experiment report"""
        
        summary = self.get_experiment_summary(experiment_id)
        
        if not summary:
            return f"Experiment {experiment_id} does not exist or has no data"
        
        report = []
        report.append(f"📊 A/B Test Experiment Report")
        report.append("=" * 60)
        report.append(f"Experiment Name: {summary['experiment_name']}")
        report.append(f"Experiment ID: {summary['experiment_id']}")
        report.append(f"Test Duration: {summary['duration']}")
        report.append(f"Sample Size: Control {summary['control_count']}, Treatment {summary['treatment_count']}")
        
        report.append(f"\n📈 Control Group Performance:")
        control_stats = summary['control_stats']
        report.append(f"  Success Rate: {control_stats['success_rate']:.1%}")
        report.append(f"  Average Execution Time: {control_stats['avg_execution_time']:.3f}s")
        
        report.append(f"\n📈 Treatment Group Performance:")
        treatment_stats = summary['treatment_stats']
        report.append(f"  Success Rate: {treatment_stats['success_rate']:.1%}")
        report.append(f"  Average Execution Time: {treatment_stats['avg_execution_time']:.3f}s")
        
        report.append(f"\n🔬 Statistical Significance:")
        for metric, sig_result in summary['significance_results'].items():
            significance = "Significant" if sig_result.get('significant', False) else "Not Significant"
            p_value = sig_result.get('p_value', 1.0)
            report.append(f"  {metric}: {significance} (p={p_value:.3f})")
        
        report.append(f"\n💡 Recommendations:")
        for rec in summary['recommendations']:
            report.append(f"  {rec}")
        
        return "\n".join(report)

# Usage example
def demonstrate_ab_testing():
    """Demonstrate the A/B testing framework"""
    
    # Create mock programs
    class ControlProgram:
        def __call__(self, **kwargs):
            # Simulate a simpler answer
            time.sleep(0.1)  # Simulate execution time
            return dspy.Prediction(answer="Basic answer")
    
    class TreatmentProgram:
        def __call__(self, **kwargs):
            # Simulate a more complex but better answer
            time.sleep(0.15)  # Slightly longer execution time
            if random.random() > 0.1:  # 90% success rate
                return dspy.Prediction(answer="Detailed and accurate answer")
            else:
                raise Exception("Processing failed")
    
    # Create test data
    test_examples = [
        dspy.Example(question=f"Question {i}", answer="Standard answer").with_inputs('question')
        for i in range(50)
    ]
    
    # Initialize A/B testing framework and evaluator
    ab_test = ABTestFramework()
    evaluator = DSPyEvaluationSystem()
    
    # Create an experiment
    experiment_id = ab_test.create_experiment(
        experiment_name="Answer Quality Improvement Test",
        control_program=ControlProgram(),
        treatment_program=TreatmentProgram(),
        traffic_split=0.5,
        success_metrics=['exact_match', 'semantic_similarity'],
        minimum_sample_size=30
    )
    
    # Run a batch test
    batch_results = ab_test.run_batch_test(experiment_id, test_examples, evaluator)
    
    # Generate a report
    report = ab_test.generate_report(experiment_id)
    print(report)
    
    return batch_results, report

# demo_ab_results = demonstrate_ab_testing()
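
The p-values inside _two_proportion_z_test and _two_sample_t_test above are deliberately crude approximations. If adding a dependency is acceptable, a more accurate drop-in sketch computes the z-test p-value from the standard normal CDF via math.erf and delegates the t-test to scipy.stats; treat this as an optional refinement, not part of the framework above.

import math

def normal_two_sided_p(z: float) -> float:
    """Two-sided p-value for a z-statistic using the standard normal CDF."""
    return 2 * (1 - 0.5 * (1 + math.erf(abs(z) / math.sqrt(2))))

# For the t-test, SciPy (if installed) provides an exact p-value:
# from scipy import stats
# t_stat, p_value = stats.ttest_ind(treatment_scores, control_scores, equal_var=True)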

Best Practices

1. Data Quality Assurance

def data_quality_guidelines():
    """Data quality assurance guidelines"""
    
    guidelines = {
        'Data Collection': [
            'Ensure the reliability and diversity of data sources',
            'Establish standardized data collection processes',
            'Record the context and metadata of data collection'
        ],
        
        'Data Cleaning': [
            'Identify and handle duplicate data',
            'Check data integrity and consistency',
            'Handle outliers and noisy data'
        ],
        
        'Data Annotation': [
            'Establish clear annotation guidelines',
            'Implement multi-person annotation and quality checks',
            'Regularly evaluate annotation quality'
        ],
        
        'Data Splitting': [
            'Ensure the representativeness of training, validation, and test sets',
            'Avoid data leakage',
            'Consider the specifics of time-series data'
        ]
    }
    
    return guidelines

class DataQualityChecker:
    """Data Quality Checker"""
    
    def __init__(self):
        self.quality_checks = [
            self.check_duplicates,
            self.check_completeness,
            self.check_consistency,
            self.check_distribution
        ]
    
    def run_quality_checks(self, examples: List[dspy.Example]) -> Dict:
        """Run quality checks"""
        results = {}
        
        for check in self.quality_checks:
            check_name = check.__name__
            try:
                result = check(examples)
                results[check_name] = result
            except Exception as e:
                results[check_name] = {'error': str(e)}
        
        return results
    
    def check_duplicates(self, examples: List[dspy.Example]) -> Dict:
        """Check for duplicate data"""
        # TODO: Implement duplicate checking logic
        pass
    
    def check_completeness(self, examples: List[dspy.Example]) -> Dict:
        """Check data completeness"""
        # TODO: Implement completeness checking
        pass
    
    def check_consistency(self, examples: List[dspy.Example]) -> Dict:
        """Check data consistency"""
        # TODO: Implement consistency checking
        pass
    
    def check_distribution(self, examples: List[dspy.Example]) -> Dict:
        """Check data distribution"""
        # TODO: Implement distribution checking
        pass
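
The quality checks above are intentionally left as TODO placeholders. As one illustration, a minimal standalone sketch of the duplicate check might look like the following (reattach self if you fold it back into the class); the report keys are an assumption.

def check_duplicates(examples: List[dspy.Example]) -> Dict:
    """Minimal duplicate check: count Examples whose stringified inputs repeat."""
    seen = set()
    duplicates = 0
    for example in examples:
        key = str(example.inputs()) if hasattr(example, 'inputs') else str(example)
        if key in seen:
            duplicates += 1
        else:
            seen.add(key)
    return {
        'duplicate_count': duplicates,
        'duplicate_ratio': duplicates / len(examples) if examples else 0.0
    }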

2. Evaluation Best Practices

def evaluation_best_practices():
    """Evaluation best practices guidelines"""
    
    practices = {
        'Metric Selection': [
            'Select metrics that align with business goals',
            'Use multiple complementary evaluation metrics',
            'Consider task-specific evaluation criteria'
        ],
        
        'Test Set Design': [
            'Ensure the representativeness of the test set',
            'Include edge cases and difficult samples',
            'Regularly update the test set to reflect data drift'
        ],
        
        'Statistical Analysis': [
            'Report confidence intervals, not just point estimates',
            'Conduct significance tests',
            'Consider the problem of multiple comparisons'
        ],
        
        'Result Interpretation': [
            'Provide error analysis and case studies',
            'Consider performance across different user segments',
            'Analyze the limitations of the model'
        ]
    }
    
    return practices
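
To follow the "report confidence intervals, not just point estimates" practice listed above, a simple model-agnostic option is a percentile bootstrap over per-example metric scores. The 95% level and 1,000 resamples below are conventional choices, not requirements, and the commented usage assumes the results dictionary produced by evaluate_program in Section 2.

import random

def bootstrap_confidence_interval(scores, n_resamples: int = 1000,
                                  confidence: float = 0.95, seed: int = 42):
    """Percentile bootstrap CI for the mean of a list of per-example metric scores."""
    if not scores:
        return (0.0, 0.0)
    rng = random.Random(seed)
    means = []
    for _ in range(n_resamples):
        resample = [rng.choice(scores) for _ in scores]
        means.append(sum(resample) / len(resample))
    means.sort()
    lower_idx = int((1 - confidence) / 2 * n_resamples)
    upper_idx = int((1 + confidence) / 2 * n_resamples) - 1
    return means[lower_idx], means[upper_idx]

# scores = results['metric_scores']['semantic_similarity']
# low, high = bootstrap_confidence_interval(scores)
# print(f"semantic_similarity 95% CI for the mean: [{low:.3f}, {high:.3f}]")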

class EvaluationReportGenerator:
    """Evaluation Report Generator"""
    
    def generate_comprehensive_report(self, 
                                    evaluation_results: Dict,
                                    model_info: Dict = None,
                                    dataset_info: Dict = None) -> str:
        """Generate a comprehensive evaluation report"""
        
        report_sections = [
            self._generate_executive_summary(evaluation_results),
            self._generate_model_info_section(model_info),
            self._generate_dataset_info_section(dataset_info),
            self._generate_performance_metrics_section(evaluation_results),
            self._generate_error_analysis_section(evaluation_results),
            self._generate_recommendations_section(evaluation_results)
        ]
        
        return "\n\n".join(filter(None, report_sections))
    
    def _generate_executive_summary(self, results: Dict) -> str:
        """Generate an executive summary"""
        # TODO: Implement summary generation
        pass
    
    def _generate_model_info_section(self, model_info: Dict) -> str:
        """Generate the model information section"""
        # TODO: Implement model info generation
        pass
    
    # ... other report generation methods

By completing this chapter, you should have a complete picture of data processing and evaluation in DSPy. Good data handling and a scientific evaluation system are the foundation of high-quality AI systems. In practice, choose evaluation metrics that match your specific business needs and build a comprehensive quality assurance mechanism around them.