Chapter 06: DSPy Data Processing and Evaluation
Haiyue
Learning Objectives
- Master dataset processing methods in DSPy
- Learn the definition and use of evaluation metrics
- Implement custom evaluation functions
- Conduct comparative analysis of model performance
- Design an A/B testing framework
Key Concepts
1. DSPy Dataset Processing
DSPy represents training and evaluation data as dspy.Example objects and provides tools for loading and preprocessing datasets in a variety of formats.
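At the core is dspy.Example, a lightweight record type: you pass arbitrary keyword fields at construction time and mark the input fields with with_inputs(); the remaining fields are treated as labels. A minimal sketch:
import dspy

# A single record with arbitrary keyword fields
example = dspy.Example(
    question="What is machine learning?",
    answer="A branch of AI that lets computers learn from data.",
)

# Mark which fields are inputs; the rest are treated as labels
example = example.with_inputs("question")

print(example.inputs())   # an Example containing only the input fields
print(example.labels())   # an Example containing the remaining label fields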
Basic Dataset Operations
import dspy
import json
import pandas as pd
from typing import List, Dict, Any, Optional, Union
import random
import numpy as np
from pathlib import Path
class DSPyDataProcessor:
"""DSPy Data Processor"""
def __init__(self):
self.processed_datasets = {}
self.statistics = {}
def create_examples_from_dict(self, data_list: List[Dict]) -> List[dspy.Example]:
"""Create DSPy Examples from a list of dictionaries"""
examples = []
for item in data_list:
# Create an Example object
example = dspy.Example(**item)
examples.append(example)
return examples
def create_examples_from_csv(self,
csv_path: str,
input_columns: List[str],
output_columns: List[str]) -> List[dspy.Example]:
"""Create DSPy Examples from a CSV file"""
df = pd.read_csv(csv_path)
examples = []
for _, row in df.iterrows():
# Build example data
example_data = {}
# Add input fields
for col in input_columns:
if col in df.columns:
example_data[col] = row[col]
# Add output fields
for col in output_columns:
if col in df.columns:
example_data[col] = row[col]
# Create an Example and set the input fields
example = dspy.Example(**example_data)
example = example.with_inputs(*input_columns)
examples.append(example)
print(f"📊 Loaded {len(examples)} samples from CSV")
return examples
def create_examples_from_json(self,
json_path: str,
input_key: str = "inputs",
output_key: str = "outputs") -> List[dspy.Example]:
"""Create DSPy Examples from a JSON file"""
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
examples = []
if isinstance(data, list):
for item in data:
example_data = {}
# Process inputs
if input_key in item:
if isinstance(item[input_key], dict):
example_data.update(item[input_key])
else:
example_data['input'] = item[input_key]
# Process outputs
if output_key in item:
if isinstance(item[output_key], dict):
example_data.update(item[output_key])
else:
example_data['output'] = item[output_key]
# Process other fields
for key, value in item.items():
if key not in [input_key, output_key]:
example_data[key] = value
example = dspy.Example(**example_data)
examples.append(example)
print(f"📊 Loaded {len(examples)} samples from JSON")
return examples
def split_dataset(self,
examples: List[dspy.Example],
train_ratio: float = 0.7,
dev_ratio: float = 0.15,
test_ratio: float = 0.15,
random_seed: int = 42) -> Dict[str, List[dspy.Example]]:
"""Split the dataset"""
# Validate ratios
total_ratio = train_ratio + dev_ratio + test_ratio
if abs(total_ratio - 1.0) > 1e-6:
raise ValueError("The sum of train, dev, and test ratios must be 1.0")
# Set random seed
random.seed(random_seed)
# Shuffle the data randomly
shuffled_examples = examples.copy()
random.shuffle(shuffled_examples)
# Calculate split points
total_size = len(shuffled_examples)
train_size = int(total_size * train_ratio)
dev_size = int(total_size * dev_ratio)
# Split the data
train_set = shuffled_examples[:train_size]
dev_set = shuffled_examples[train_size:train_size + dev_size]
test_set = shuffled_examples[train_size + dev_size:]
split_info = {
'train': train_set,
'dev': dev_set,
'test': test_set
}
# Print split information
print(f"📊 Dataset splitting completed:")
print(f" Training set: {len(train_set)} samples ({len(train_set)/total_size:.1%})")
print(f" Validation set: {len(dev_set)} samples ({len(dev_set)/total_size:.1%})")
print(f" Test set: {len(test_set)} samples ({len(test_set)/total_size:.1%})")
return split_info
def analyze_dataset(self, examples: List[dspy.Example], name: str = "dataset") -> Dict:
"""Analyze dataset statistics"""
if not examples:
return {'error': 'Empty dataset'}
# Basic statistics
stats = {
'name': name,
'total_samples': len(examples),
'field_stats': {},
'data_quality': {}
}
# Get all fields
all_fields = set()
for example in examples:
all_fields.update(example.keys())  # dspy.Example exposes its fields via keys()/toDict(), not __dict__
# Analyze each field
for field in all_fields:
field_values = []
non_null_count = 0
for example in examples:
value = getattr(example, field, None)
if value is not None and value != '':
field_values.append(value)
non_null_count += 1
# Field statistics
field_stat = {
'total_count': len(examples),
'non_null_count': non_null_count,
'null_ratio': (len(examples) - non_null_count) / len(examples),
'data_type': self.detect_data_type(field_values)
}
# Additional statistics for text fields
if field_stat['data_type'] == 'text':
if field_values:
lengths = [len(str(v)) for v in field_values]
field_stat.update({
'avg_length': sum(lengths) / len(lengths),
'min_length': min(lengths),
'max_length': max(lengths),
'total_chars': sum(lengths)
})
stats['field_stats'][field] = field_stat
# Data quality check
stats['data_quality'] = self.check_data_quality(examples)
self.statistics[name] = stats
# Print summary of statistics
self.print_dataset_summary(stats)
return stats
def detect_data_type(self, values: List) -> str:
"""Detect data type"""
if not values:
return 'unknown'
sample_values = values[:100] # Sample for detection
# Check for numeric types
numeric_count = 0
for value in sample_values:
try:
float(str(value))
numeric_count += 1
except (ValueError, TypeError):
pass
if numeric_count / len(sample_values) > 0.8:
return 'numeric'
# Check for text types
return 'text'
def check_data_quality(self, examples: List[dspy.Example]) -> Dict:
"""Check data quality"""
quality_issues = {
'duplicate_count': 0,
'missing_inputs': 0,
'empty_outputs': 0,
'unusual_lengths': 0
}
# Check for duplicates
seen_inputs = set()
for example in examples:
input_str = str(example.inputs().toDict() if hasattr(example, 'inputs') else example.toDict())
if input_str in seen_inputs:
quality_issues['duplicate_count'] += 1
else:
seen_inputs.add(input_str)
# Check for missing and unusual values
for example in examples:
# Check for missing inputs
try:
inputs = example.inputs()
if not any(inputs.values()):
quality_issues['missing_inputs'] += 1
except:
quality_issues['missing_inputs'] += 1
# Check for empty outputs
for key, value in example.toDict().items():
if 'answer' in key.lower() or 'output' in key.lower():
if not value or len(str(value).strip()) == 0:
quality_issues['empty_outputs'] += 1
break
# Check for unusual lengths
for key, value in example.toDict().items():
if isinstance(value, str):
if len(value) > 10000 or len(value) < 2: # Too long or too short
quality_issues['unusual_lengths'] += 1
break
return quality_issues
def print_dataset_summary(self, stats: Dict):
"""Print dataset summary"""
print(f"\n📋 Dataset '{stats['name']}' Analysis Report")
print("=" * 50)
print(f"Total samples: {stats['total_samples']}")
print(f"\n📊 Field Statistics:")
for field, field_stat in stats['field_stats'].items():
print(f" {field}:")
print(f" Type: {field_stat['data_type']}")
print(f" Non-null rate: {(1-field_stat['null_ratio']):.1%}")
if field_stat['data_type'] == 'text' and 'avg_length' in field_stat:
print(f" Average length: {field_stat['avg_length']:.1f} characters")
print(f"\n🔍 Data Quality:")
quality = stats['data_quality']
print(f" Duplicate samples: {quality['duplicate_count']}")
print(f" Missing inputs: {quality['missing_inputs']}")
print(f" Empty outputs: {quality['empty_outputs']}")
print(f" Unusual lengths: {quality['unusual_lengths']}")
# Data Augmentation Tool
class DataAugmentor:
"""Data Augmentor"""
def __init__(self):
self.augmentation_methods = {
'paraphrase': self.paraphrase_augmentation,
'synonym': self.synonym_replacement,
'back_translation': self.back_translation,
'template': self.template_based_augmentation
}
def augment_dataset(self,
examples: List[dspy.Example],
methods: List[str] = ['paraphrase'],
target_ratio: float = 1.5) -> List[dspy.Example]:
"""Augment the dataset"""
original_size = len(examples)
target_size = int(original_size * target_ratio)
augmented_examples = examples.copy()
print(f"🔄 Starting data augmentation: {original_size} -> {target_size}")
while len(augmented_examples) < target_size:
for method in methods:
if method in self.augmentation_methods:
# Randomly select an original sample for augmentation
original_example = random.choice(examples)
augmented_example = self.augmentation_methods[method](original_example)
if augmented_example:
augmented_examples.append(augmented_example)
if len(augmented_examples) >= target_size:
break
print(f"✅ Data augmentation completed: {len(augmented_examples)} samples")
return augmented_examples[:target_size]
def paraphrase_augmentation(self, example: dspy.Example) -> Optional[dspy.Example]:
"""Paraphrase augmentation"""
# Use DSPy for paraphrasing
# ChainOfThought adds a `reasoning` field automatically, so the signature only declares the target output;
# the instructions are attached to the Signature rather than passed to ChainOfThought
paraphraser = dspy.ChainOfThought(dspy.Signature(
"original_text -> paraphrased_text",
"Paraphrase the given text, maintaining the original meaning but changing the expression."
))
try:
# Select a text field to paraphrase
text_fields = []
for key, value in example.toDict().items():
if isinstance(value, str) and len(value) > 10:
text_fields.append((key, value))
if not text_fields:
return None
field_name, original_text = random.choice(text_fields)
# Generate paraphrase
result = paraphraser(original_text=original_text)
# Create a new example
new_example_data = example.toDict()  # toDict() returns a copy of the example's fields
new_example_data[field_name] = result.paraphrased_text
new_example = dspy.Example(**new_example_data)
# Maintain input/output settings
if getattr(example, '_input_keys', None):  # guard against with_inputs() never having been called
new_example = new_example.with_inputs(*example._input_keys)
return new_example
except Exception as e:
print(f"Paraphrase augmentation failed: {e}")
return None
def synonym_replacement(self, example: dspy.Example) -> Optional[dspy.Example]:
"""Synonym replacement"""
# Simplified implementation of synonym replacement
synonym_dict = {
'good': ['nice', 'excellent', 'favorable'],
'problem': ['issue', 'challenge', 'matter'],
'method': ['way', 'approach', 'technique'],
'important': ['critical', 'significant', 'vital'],
'simple': ['easy', 'straightforward', 'convenient']
}
try:
new_example_data = example.toDict()
# Perform synonym replacement on text fields
for key, value in new_example_data.items():
if isinstance(value, str):
modified_text = value
for original, synonyms in synonym_dict.items():
if original in modified_text:
replacement = random.choice(synonyms)
modified_text = modified_text.replace(original, replacement, 1)
new_example_data[key] = modified_text
new_example = dspy.Example(**new_example_data)
if getattr(example, '_input_keys', None):
new_example = new_example.with_inputs(*example._input_keys)
return new_example
except Exception as e:
print(f"Synonym replacement failed: {e}")
return None
def back_translation(self, example: dspy.Example) -> Optional[dspy.Example]:
"""Back-translation augmentation"""
# Simplified implementation, requires an external translation API in practice
print("Back-translation requires an external translation service, skipping here")
return None
def template_based_augmentation(self, example: dspy.Example) -> Optional[dspy.Example]:
"""Template-based augmentation"""
templates = {
'question': [
"Please tell me about {}",
"Can you tell me about {}",
"I want to know about {}",
"What is {}?"
]
}
try:
new_example_data = example.toDict()
# Find the question field and apply a template
for key, value in new_example_data.items():
if 'question' in key.lower() and isinstance(value, str):
# Extract core content (simplified)
core_content = value.replace('What is', '').replace('？', '').replace('?', '')  # strip both full-width and ASCII question marks
if core_content.strip():
template = random.choice(templates['question'])
new_example_data[key] = template.format(core_content.strip())
new_example = dspy.Example(**new_example_data)
if getattr(example, '_input_keys', None):
new_example = new_example.with_inputs(*example._input_keys)
return new_example
except Exception as e:
print(f"Template augmentation failed: {e}")
return None
# Usage example
def demonstrate_data_processing():
"""Demonstrate data processing functionalities"""
# Create sample data
sample_data = [
{
'question': 'What is machine learning?',
'answer': 'Machine learning is a branch of artificial intelligence that allows computers to learn from data.',
'category': 'AI',
'difficulty': 'basic'
},
{
'question': 'What are the characteristics of deep learning?',
'answer': 'Deep learning uses multi-layered neural networks to automatically extract features.',
'category': 'AI',
'difficulty': 'intermediate'
},
{
'question': 'How to optimize a neural network?',
'answer': 'Neural networks can be optimized by adjusting the learning rate, using regularization, batch normalization, etc.',
'category': 'AI',
'difficulty': 'advanced'
}
]
# Initialize data processor
processor = DSPyDataProcessor()
# Create Examples
examples = processor.create_examples_from_dict(sample_data)
# Set inputs/outputs
examples = [ex.with_inputs('question') for ex in examples]
# Analyze the dataset
stats = processor.analyze_dataset(examples, "Sample Dataset")
# Split the dataset
splits = processor.split_dataset(examples, train_ratio=0.6, dev_ratio=0.2, test_ratio=0.2)
# Data augmentation
augmentor = DataAugmentor()
augmented_train = augmentor.augment_dataset(
splits['train'],
methods=['paraphrase', 'synonym'],
target_ratio=2.0
)
return {
'original_examples': examples,
'statistics': stats,
'splits': splits,
'augmented_train': augmented_train
}
# Run the demonstration
# demo_results = demonstrate_data_processing()
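Besides building Examples by hand, DSPy ships dataset loaders under dspy.datasets. A minimal sketch using the HotPotQA loader (this assumes the Hugging Face datasets package is installed; the sizes and seeds below are arbitrary):
from dspy.datasets import HotPotQA

# Download a small slice of HotPotQA as dspy.Example objects
dataset = HotPotQA(train_seed=1, train_size=20, eval_seed=2023, dev_size=50, test_size=0)

# Mark 'question' as the input field on every example
trainset = [x.with_inputs("question") for x in dataset.train]
devset = [x.with_inputs("question") for x in dataset.dev]

print(len(trainset), len(devset))
print(trainset[0])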
2. Evaluation Metric System
In DSPy, an evaluation metric is simply a function that takes an example, a prediction, and an optional trace and returns a score (a bool or a number), which makes it easy to combine multiple complementary metrics into a comprehensive assessment of model performance.
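DSPy also ships a built-in evaluation runner, dspy.evaluate.Evaluate, which accepts any metric with the (example, prediction, trace=None) signature used throughout this chapter. A minimal sketch, assuming a language model has already been configured via dspy.configure:
import dspy
from dspy.evaluate import Evaluate

def exact_match(example, pred, trace=None):
    # Metrics return a bool or a numeric score
    return example.answer.strip().lower() == pred.answer.strip().lower()

devset = [
    dspy.Example(question="What is AI?", answer="Artificial Intelligence").with_inputs("question"),
    dspy.Example(question="What is ML?", answer="Machine Learning").with_inputs("question"),
]

program = dspy.Predict("question -> answer")

evaluate = Evaluate(devset=devset, metric=exact_match, num_threads=4, display_progress=True)
score = evaluate(program)   # aggregate metric score over the dev set
print(score)
The custom evaluation system below uses the same metric signature while adding per-example detail, summary statistics, and multi-program comparison.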
class DSPyEvaluationSystem:
"""DSPy Evaluation System"""
def __init__(self):
self.metrics = {}
self.evaluation_results = {}
# Register basic metrics
self.register_basic_metrics()
def register_basic_metrics(self):
"""Register basic evaluation metrics"""
# Exact match
self.metrics['exact_match'] = self.exact_match_metric
# Contains match
self.metrics['contains_match'] = self.contains_match_metric
# Semantic similarity
self.metrics['semantic_similarity'] = self.semantic_similarity_metric
# BLEU score
self.metrics['bleu_score'] = self.bleu_score_metric
# ROUGE score
self.metrics['rouge_score'] = self.rouge_score_metric
# Custom score
self.metrics['custom_score'] = self.custom_score_metric
def exact_match_metric(self, example, prediction, trace=None) -> bool:
"""Exact match metric"""
try:
expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
actual = getattr(prediction, 'answer', '') or getattr(prediction, 'output', '')
return str(expected).strip().lower() == str(actual).strip().lower()
except Exception:
return False
def contains_match_metric(self, example, prediction, trace=None) -> bool:
"""Contains match metric"""
try:
expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
actual = getattr(prediction, 'answer', '') or getattr(prediction, 'output', '')
expected_lower = str(expected).strip().lower()
actual_lower = str(actual).strip().lower()
return expected_lower in actual_lower or actual_lower in expected_lower
except Exception:
return False
def semantic_similarity_metric(self, example, prediction, trace=None) -> float:
"""Semantic similarity metric"""
try:
expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
# Simplified semantic similarity calculation (use more complex methods in practice)
expected_words = set(expected.lower().split())
actual_words = set(actual.lower().split())
if not expected_words and not actual_words:
return 1.0
elif not expected_words or not actual_words:
return 0.0
intersection = len(expected_words.intersection(actual_words))
union = len(expected_words.union(actual_words))
return intersection / union if union > 0 else 0.0
except Exception:
return 0.0
def bleu_score_metric(self, example, prediction, trace=None) -> float:
"""BLEU score metric"""
try:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.tokenize import word_tokenize
expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
reference = [word_tokenize(expected.lower())]
candidate = word_tokenize(actual.lower())
score = sentence_bleu(reference, candidate, smoothing_function=SmoothingFunction().method1)  # smoothing avoids zero scores on short answers
return score
except ImportError:
print("NLTK not installed, using simplified BLEU calculation")
return self.simple_bleu(example, prediction)
except Exception:
return 0.0
def simple_bleu(self, example, prediction) -> float:
"""Simplified BLEU calculation"""
try:
expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
expected_words = expected.lower().split()
actual_words = actual.lower().split()
if not actual_words:
return 0.0
# 1-gram precision
matches = sum(1 for word in actual_words if word in expected_words)
precision = matches / len(actual_words)
# Brevity penalty: penalize candidates that are shorter than the reference
if not expected_words: return 0.0
length_penalty = min(1.0, len(actual_words) / len(expected_words))
return precision * length_penalty
except Exception:
return 0.0
def rouge_score_metric(self, example, prediction, trace=None) -> float:
"""ROUGE score metric"""
try:
expected = str(getattr(example, 'answer', '') or getattr(example, 'output', ''))
actual = str(getattr(prediction, 'answer', '') or getattr(prediction, 'output', ''))
expected_words = expected.lower().split()
actual_words = actual.lower().split()
if not expected_words:
return 1.0 if not actual_words else 0.0
# ROUGE-1 (unigram overlap)
overlap = sum(1 for word in expected_words if word in actual_words)
recall = overlap / len(expected_words)
precision = overlap / len(actual_words) if actual_words else 0.0
# F1-score
if recall + precision == 0:
return 0.0
f1 = 2 * (recall * precision) / (recall + precision)
return f1
except Exception:
return 0.0
def custom_score_metric(self, example, prediction, trace=None) -> float:
"""Custom scoring metric"""
# Use a DSPy module for intelligent scoring
# ChainOfThought adds a `reasoning` field automatically; the instructions are attached to the Signature
scorer = dspy.ChainOfThought(dspy.Signature(
"question, expected_answer, actual_answer -> score",
"""Evaluate the quality of the answer, considering the following factors:
1. Accuracy: Is the answer correct?
2. Completeness: Is the answer complete?
3. Relevance: Is the answer relevant?
4. Clarity: Is the answer clear and easy to understand?
Provide a score from 0 to 10."""
))
try:
question = getattr(example, 'question', '') or getattr(example, 'input', '')
expected = getattr(example, 'answer', '') or getattr(example, 'output', '')
actual = getattr(prediction, 'answer', '') or getattr(prediction, 'output', '')
result = scorer(
question=question,
expected_answer=expected,
actual_answer=actual
)
# Extract the numeric score
import re
score_match = re.search(r'(\d+(?:\.\d+)?)', result.score)
if score_match:
score = float(score_match.group(1))
return min(max(score / 10.0, 0.0), 1.0) # Normalize to 0-1
except Exception as e:
print(f"Custom scoring failed: {e}")
return 0.5 # Default to a medium score
def evaluate_program(self,
program,
test_examples: List[dspy.Example],
metrics: List[str] = None,
verbose: bool = True) -> Dict[str, Any]:
"""Evaluate program performance"""
if metrics is None:
metrics = ['exact_match', 'semantic_similarity']
print(f"🔍 Starting evaluation, number of test samples: {len(test_examples)}")
print(f"📊 Evaluation metrics: {', '.join(metrics)}")
results = {
'total_examples': len(test_examples),
'successful_predictions': 0,
'failed_predictions': 0,
'metric_scores': {metric: [] for metric in metrics},
'detailed_results': []
}
for i, example in enumerate(test_examples):
if verbose and (i + 1) % 10 == 0:
print(f" Processing progress: {i + 1}/{len(test_examples)}")
try:
# Execute prediction
prediction = program(**example.inputs())
results['successful_predictions'] += 1
# Calculate various metrics
example_scores = {}
for metric_name in metrics:
if metric_name in self.metrics:
score = self.metrics[metric_name](example, prediction)
example_scores[metric_name] = float(score)
results['metric_scores'][metric_name].append(float(score))
# Record detailed results
detailed_result = {
'example_id': i,
'input': example.inputs(),
'expected_output': {k: v for k, v in example.toDict().items()
if k not in example.inputs().keys()},
'actual_output': prediction.toDict() if hasattr(prediction, 'toDict')
else str(prediction),
'scores': example_scores
}
results['detailed_results'].append(detailed_result)
except Exception as e:
results['failed_predictions'] += 1
if verbose:
print(f" Prediction failed (sample {i}): {e}")
# Record failed result
detailed_result = {
'example_id': i,
'input': example.inputs(),
'error': str(e),
'scores': {metric: 0.0 for metric in metrics}
}
results['detailed_results'].append(detailed_result)
# Add a score of 0 for failed cases
for metric_name in metrics:
results['metric_scores'][metric_name].append(0.0)
# Calculate overall statistics
results['summary'] = self.calculate_summary_statistics(results)
# Print evaluation results
if verbose:
self.print_evaluation_results(results)
return results
def calculate_summary_statistics(self, results: Dict) -> Dict:
"""Calculate summary statistics"""
summary = {
'success_rate': results['successful_predictions'] / results['total_examples'],
'metric_averages': {},
'metric_std': {},
'metric_ranges': {}
}
for metric_name, scores in results['metric_scores'].items():
if scores:
summary['metric_averages'][metric_name] = sum(scores) / len(scores)
# Calculate standard deviation
mean = summary['metric_averages'][metric_name]
variance = sum((x - mean) ** 2 for x in scores) / len(scores)
summary['metric_std'][metric_name] = variance ** 0.5
# Calculate range
summary['metric_ranges'][metric_name] = {
'min': min(scores),
'max': max(scores)
}
return summary
def print_evaluation_results(self, results: Dict):
"""Print evaluation results"""
print(f"\n📈 Evaluation Results Summary")
print("=" * 50)
summary = results['summary']
print(f"Success rate: {summary['success_rate']:.1%} "
f"({results['successful_predictions']}/{results['total_examples']})")
print(f"\n📊 Metric Performance:")
for metric_name, avg_score in summary['metric_averages'].items():
std_score = summary['metric_std'][metric_name]
range_info = summary['metric_ranges'][metric_name]
print(f" {metric_name}:")
print(f" Average: {avg_score:.3f} (±{std_score:.3f})")
print(f" Range: {range_info['min']:.3f} - {range_info['max']:.3f}")
def compare_programs(self,
programs: Dict[str, Any],
test_examples: List[dspy.Example],
metrics: List[str] = None) -> Dict:
"""Compare the performance of multiple programs"""
print(f"🏆 Starting program performance comparison")
print(f"Programs being compared: {list(programs.keys())}")
comparison_results = {}
for program_name, program in programs.items():
print(f"\n📊 Evaluating program: {program_name}")
results = self.evaluate_program(program, test_examples, metrics, verbose=False)
comparison_results[program_name] = results
# Generate comparison report
comparison_summary = self.generate_comparison_report(comparison_results)
return {
'individual_results': comparison_results,
'comparison_summary': comparison_summary
}
def generate_comparison_report(self, results: Dict) -> Dict:
"""Generate comparison report"""
if not results:
return {}
metrics = list(next(iter(results.values()))['summary']['metric_averages'].keys())
comparison = {
'program_rankings': {},
'best_program': {},
'performance_gaps': {}
}
# Rank for each metric
for metric in metrics:
program_scores = []
for program_name, program_results in results.items():
avg_score = program_results['summary']['metric_averages'][metric]
program_scores.append((program_name, avg_score))
# Sort by score
program_scores.sort(key=lambda x: x[1], reverse=True)
comparison['program_rankings'][metric] = program_scores
comparison['best_program'][metric] = program_scores[0]
# Calculate performance gap
if len(program_scores) > 1:
best_score = program_scores[0][1]
worst_score = program_scores[-1][1]
comparison['performance_gaps'][metric] = best_score - worst_score
# Print comparison results
self.print_comparison_results(comparison)
return comparison
def print_comparison_results(self, comparison: Dict):
"""Print comparison results"""
print(f"\n🏆 Program Performance Comparison Results")
print("=" * 60)
for metric, rankings in comparison['program_rankings'].items():
print(f"\n📊 {metric} Rankings:")
for i, (program_name, score) in enumerate(rankings, 1):
print(f" {i}. {program_name}: {score:.3f}")
# Show performance gap
if metric in comparison['performance_gaps']:
gap = comparison['performance_gaps'][metric]
print(f" Performance gap: {gap:.3f}")
print(f"\n🥇 Best Program for Each Metric:")
for metric, (best_program, best_score) in comparison['best_program'].items():
print(f" {metric}: {best_program} ({best_score:.3f})")
# Usage example
def demonstrate_evaluation_system():
"""Demonstrate the evaluation system"""
# Create test data
test_examples = [
dspy.Example(question="What is AI?", answer="Artificial Intelligence").with_inputs('question'),
dspy.Example(question="What is ML?", answer="Machine Learning").with_inputs('question'),
]
# Create mock programs
class MockProgram1:
def __call__(self, **kwargs):
return dspy.Prediction(answer="Artificial intelligence is technology that simulates human intelligence")
class MockProgram2:
def __call__(self, **kwargs):
return dspy.Prediction(answer="AI is artificial intelligence")
# Initialize evaluation system
evaluator = DSPyEvaluationSystem()
# Single program evaluation
program1 = MockProgram1()
results = evaluator.evaluate_program(
program1,
test_examples,
metrics=['exact_match', 'semantic_similarity', 'custom_score']
)
# Multi-program comparison
programs = {
'Program A': MockProgram1(),
'Program B': MockProgram2()
}
comparison = evaluator.compare_programs(programs, test_examples)
return results, comparison
# demo_eval_results = demonstrate_evaluation_system()
3. A/B Testing Framework
A/B testing compares a control variant and a treatment variant on split traffic, and is an important method for determining whether a new model or strategy actually outperforms the current one.
import time
from datetime import datetime
import statistics
from typing import Callable
class ABTestFramework:
"""A/B Testing Framework"""
def __init__(self):
self.experiments = {}
self.results = {}
def create_experiment(self,
experiment_name: str,
control_program,
treatment_program,
traffic_split: float = 0.5,
success_metrics: List[str] = None,
minimum_sample_size: int = 100) -> str:
"""Create an A/B test experiment"""
experiment_id = f"{experiment_name}_{int(time.time())}"
self.experiments[experiment_id] = {
'name': experiment_name,
'control_program': control_program,
'treatment_program': treatment_program,
'traffic_split': traffic_split,
'success_metrics': success_metrics or ['success_rate'],
'minimum_sample_size': minimum_sample_size,
'start_time': datetime.now(),
'status': 'active',
'control_results': [],
'treatment_results': [],
'control_count': 0,
'treatment_count': 0
}
print(f"✅ Created A/B test experiment: {experiment_id}")
print(f" Control group program: {type(control_program).__name__}")
print(f" Treatment group program: {type(treatment_program).__name__}")
print(f" Traffic split: {traffic_split:.1%} to treatment group")
return experiment_id
def run_single_test(self,
experiment_id: str,
test_example: dspy.Example,
evaluator: DSPyEvaluationSystem) -> Dict:
"""Run a single test"""
if experiment_id not in self.experiments:
raise ValueError(f"Experiment {experiment_id} does not exist")
experiment = self.experiments[experiment_id]
# Decide which program to use (traffic split)
use_treatment = random.random() < experiment['traffic_split']
program = experiment['treatment_program'] if use_treatment else experiment['control_program']
program_type = 'treatment' if use_treatment else 'control'
# Execute prediction
start_time = time.time()
try:
prediction = program(**test_example.inputs())
execution_time = time.time() - start_time
success = True
error = None
except Exception as e:
execution_time = time.time() - start_time
prediction = None
success = False
error = str(e)
# Evaluate results
evaluation_scores = {}
if success and prediction:
for metric in experiment['success_metrics']:
if hasattr(evaluator, 'metrics') and metric in evaluator.metrics:
score = evaluator.metrics[metric](test_example, prediction)
evaluation_scores[metric] = score
# Record results
result = {
'timestamp': datetime.now(),
'program_type': program_type,
'success': success,
'execution_time': execution_time,
'evaluation_scores': evaluation_scores,
'error': error,
'example_id': id(test_example)
}
# Add to the corresponding result list
if program_type == 'treatment':
experiment['treatment_results'].append(result)
experiment['treatment_count'] += 1
else:
experiment['control_results'].append(result)
experiment['control_count'] += 1
return result
def run_batch_test(self,
experiment_id: str,
test_examples: List[dspy.Example],
evaluator: DSPyEvaluationSystem,
verbose: bool = True) -> Dict:
"""Run a batch test"""
print(f"🧪 Running A/B test: {experiment_id}")
print(f"📊 Number of test samples: {len(test_examples)}")
batch_results = []
for i, example in enumerate(test_examples):
if verbose and (i + 1) % 20 == 0:
print(f" Progress: {i + 1}/{len(test_examples)}")
result = self.run_single_test(experiment_id, example, evaluator)
batch_results.append(result)
# Check if the minimum sample size has been reached
experiment = self.experiments[experiment_id]
total_samples = experiment['control_count'] + experiment['treatment_count']
if total_samples >= experiment['minimum_sample_size']:
print(f"✅ Minimum sample size reached, statistical analysis can be performed")
else:
needed = experiment['minimum_sample_size'] - total_samples
print(f"⏳ {needed} more samples needed to reach the minimum sample size")
return {
'batch_results': batch_results,
'experiment_summary': self.get_experiment_summary(experiment_id)
}
def get_experiment_summary(self, experiment_id: str) -> Dict:
"""Get experiment summary"""
if experiment_id not in self.experiments:
return {}
experiment = self.experiments[experiment_id]
# Calculate control group statistics
control_stats = self._calculate_group_statistics(experiment['control_results'])
# Calculate treatment group statistics
treatment_stats = self._calculate_group_statistics(experiment['treatment_results'])
# Calculate statistical significance
significance_results = self._calculate_statistical_significance(
experiment['control_results'],
experiment['treatment_results'],
experiment['success_metrics']
)
summary = {
'experiment_name': experiment['name'],
'experiment_id': experiment_id,
'duration': datetime.now() - experiment['start_time'],
'control_count': experiment['control_count'],
'treatment_count': experiment['treatment_count'],
'control_stats': control_stats,
'treatment_stats': treatment_stats,
'significance_results': significance_results,
'recommendations': self._generate_recommendations(
control_stats, treatment_stats, significance_results
)
}
return summary
def _calculate_group_statistics(self, results: List[Dict]) -> Dict:
"""Calculate group statistics"""
if not results:
return {
'sample_size': 0,
'success_rate': 0.0,
'avg_execution_time': 0.0,
'metric_averages': {}
}
# Basic statistics
sample_size = len(results)
successful_results = [r for r in results if r['success']]
success_rate = len(successful_results) / sample_size
# Execution time statistics
execution_times = [r['execution_time'] for r in results]
avg_execution_time = statistics.mean(execution_times)
# Metric statistics
metric_averages = {}
if successful_results:
# Collect all metrics
all_metrics = set()
for result in successful_results:
all_metrics.update(result['evaluation_scores'].keys())
for metric in all_metrics:
metric_scores = []
for result in successful_results:
if metric in result['evaluation_scores']:
metric_scores.append(result['evaluation_scores'][metric])
if metric_scores:
metric_averages[metric] = {
'mean': statistics.mean(metric_scores),
'std': statistics.stdev(metric_scores) if len(metric_scores) > 1 else 0.0,
'min': min(metric_scores),
'max': max(metric_scores)
}
return {
'sample_size': sample_size,
'success_rate': success_rate,
'avg_execution_time': avg_execution_time,
'execution_time_std': statistics.stdev(execution_times) if len(execution_times) > 1 else 0.0,
'metric_averages': metric_averages
}
def _calculate_statistical_significance(self,
control_results: List[Dict],
treatment_results: List[Dict],
metrics: List[str]) -> Dict:
"""Calculate statistical significance"""
significance_results = {}
# Significance test for success rate
significance_results['success_rate'] = self._two_proportion_z_test(
control_results, treatment_results
)
# Significance test for each metric
for metric in metrics:
if metric != 'success_rate':
significance_results[metric] = self._two_sample_t_test(
control_results, treatment_results, metric
)
return significance_results
def _two_proportion_z_test(self, control_results: List[Dict], treatment_results: List[Dict]) -> Dict:
"""Two-proportion z-test"""
if not control_results or not treatment_results:
return {'test': 'two_proportion_z', 'p_value': 1.0, 'significant': False}
# Calculate success rates
n1 = len(control_results)
n2 = len(treatment_results)
x1 = sum(1 for r in control_results if r['success'])
x2 = sum(1 for r in treatment_results if r['success'])
p1 = x1 / n1
p2 = x2 / n2
# Pooled proportion
p = (x1 + x2) / (n1 + n2)
# Standard error
se = (p * (1 - p) * (1/n1 + 1/n2)) ** 0.5
if se == 0:
return {'test': 'two_proportion_z', 'p_value': 1.0, 'significant': False}
# Z-statistic
z = (p2 - p1) / se
# Simplified p-value calculation (see the SciPy-based sketch at the end of this section for the exact test)
p_value = 2 * (1 - abs(z) / 2) # Very rough linear approximation
p_value = max(0.0, min(1.0, p_value))
return {
'test': 'two_proportion_z',
'z_statistic': z,
'p_value': p_value,
'significant': p_value < 0.05,
'control_rate': p1,
'treatment_rate': p2,
'effect_size': p2 - p1
}
def _two_sample_t_test(self, control_results: List[Dict], treatment_results: List[Dict], metric: str) -> Dict:
"""Two-sample t-test"""
# Extract metric data
control_scores = []
treatment_scores = []
for result in control_results:
if result['success'] and metric in result['evaluation_scores']:
control_scores.append(result['evaluation_scores'][metric])
for result in treatment_results:
if result['success'] and metric in result['evaluation_scores']:
treatment_scores.append(result['evaluation_scores'][metric])
if len(control_scores) < 2 or len(treatment_scores) < 2:
return {'test': 'two_sample_t', 'p_value': 1.0, 'significant': False}
# Calculate statistics
mean1 = statistics.mean(control_scores)
mean2 = statistics.mean(treatment_scores)
std1 = statistics.stdev(control_scores)
std2 = statistics.stdev(treatment_scores)
n1 = len(control_scores)
n2 = len(treatment_scores)
# Pooled standard deviation
pooled_std = ((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2)
pooled_std = pooled_std ** 0.5
if pooled_std == 0:
return {'test': 'two_sample_t', 'p_value': 1.0, 'significant': False}
# t-statistic
t = (mean2 - mean1) / (pooled_std * (1/n1 + 1/n2)**0.5)
# Degrees of freedom
df = n1 + n2 - 2
# Simplified p-value calculation
p_value = 2 * (1 - abs(t) / (df**0.5 + 2)) # Very simplified approximation
p_value = max(0.0, min(1.0, p_value))
return {
'test': 'two_sample_t',
't_statistic': t,
'degrees_of_freedom': df,
'p_value': p_value,
'significant': p_value < 0.05,
'control_mean': mean1,
'treatment_mean': mean2,
'effect_size': mean2 - mean1
}
def _generate_recommendations(self,
control_stats: Dict,
treatment_stats: Dict,
significance_results: Dict) -> List[str]:
"""Generate recommendations"""
recommendations = []
# Check sample size
if control_stats['sample_size'] < 30 or treatment_stats['sample_size'] < 30:
recommendations.append("⚠️ Sample size is small, consider collecting more data for higher statistical reliability")
# Check success rate
if 'success_rate' in significance_results:
sig_result = significance_results['success_rate']
if sig_result['significant']:
if sig_result['effect_size'] > 0:
recommendations.append("✅ The treatment group's success rate is significantly higher than the control group's, recommend adopting the treatment group's solution")
else:
recommendations.append("❌ The treatment group's success rate is significantly lower than the control group's, recommend sticking with the control group's solution")
else:
recommendations.append("➖ No significant difference in success rates between the treatment and control groups")
# Check execution time
if treatment_stats['avg_execution_time'] > control_stats['avg_execution_time'] * 1.2:
recommendations.append("⏱️ The treatment group's execution time is noticeably longer, consider the performance cost")
elif treatment_stats['avg_execution_time'] < control_stats['avg_execution_time'] * 0.8:
recommendations.append("🚀 The treatment group's execution time is shorter, offering a performance advantage")
# Check other metrics
for metric, sig_result in significance_results.items():
if metric != 'success_rate' and sig_result.get('significant', False):
effect_size = sig_result.get('effect_size', 0)
if effect_size > 0:
recommendations.append(f"📈 The treatment group performed significantly better on the {metric} metric")
else:
recommendations.append(f"📉 The control group performed significantly better on the {metric} metric")
if not recommendations:
recommendations.append("🤔 No clear differences in experimental results, consider extending the test duration or redesigning the experiment")
return recommendations
def generate_report(self, experiment_id: str) -> str:
"""Generate an experiment report"""
summary = self.get_experiment_summary(experiment_id)
if not summary:
return f"Experiment {experiment_id} does not exist or has no data"
report = []
report.append(f"📊 A/B Test Experiment Report")
report.append("=" * 60)
report.append(f"Experiment Name: {summary['experiment_name']}")
report.append(f"Experiment ID: {summary['experiment_id']}")
report.append(f"Test Duration: {summary['duration']}")
report.append(f"Sample Size: Control {summary['control_count']}, Treatment {summary['treatment_count']}")
report.append(f"\n📈 Control Group Performance:")
control_stats = summary['control_stats']
report.append(f" Success Rate: {control_stats['success_rate']:.1%}")
report.append(f" Average Execution Time: {control_stats['avg_execution_time']:.3f}s")
report.append(f"\n📈 Treatment Group Performance:")
treatment_stats = summary['treatment_stats']
report.append(f" Success Rate: {treatment_stats['success_rate']:.1%}")
report.append(f" Average Execution Time: {treatment_stats['avg_execution_time']:.3f}s")
report.append(f"\n🔬 Statistical Significance:")
for metric, sig_result in summary['significance_results'].items():
significance = "Significant" if sig_result.get('significant', False) else "Not Significant"
p_value = sig_result.get('p_value', 1.0)
report.append(f" {metric}: {significance} (p={p_value:.3f})")
report.append(f"\n💡 Recommendations:")
for rec in summary['recommendations']:
report.append(f" {rec}")
return "\n".join(report)
# Usage example
def demonstrate_ab_testing():
"""Demonstrate the A/B testing framework"""
# Create mock programs
class ControlProgram:
def __call__(self, **kwargs):
# Simulate a simpler answer
time.sleep(0.1) # Simulate execution time
return dspy.Prediction(answer="Basic answer")
class TreatmentProgram:
def __call__(self, **kwargs):
# Simulate a more complex but better answer
time.sleep(0.15) # Slightly longer execution time
if random.random() > 0.1: # 90% success rate
return dspy.Prediction(answer="Detailed and accurate answer")
else:
raise Exception("Processing failed")
# Create test data
test_examples = [
dspy.Example(question=f"Question {i}", answer="Standard answer").with_inputs('question')
for i in range(50)
]
# Initialize A/B testing framework and evaluator
ab_test = ABTestFramework()
evaluator = DSPyEvaluationSystem()
# Create an experiment
experiment_id = ab_test.create_experiment(
experiment_name="Answer Quality Improvement Test",
control_program=ControlProgram(),
treatment_program=TreatmentProgram(),
traffic_split=0.5,
success_metrics=['exact_match', 'semantic_similarity'],
minimum_sample_size=30
)
# Run a batch test
batch_results = ab_test.run_batch_test(experiment_id, test_examples, evaluator)
# Generate a report
report = ab_test.generate_report(experiment_id)
print(report)
return batch_results, report
# demo_ab_results = demonstrate_ab_testing()
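The p-value formulas in _two_proportion_z_test and _two_sample_t_test above are deliberately rough. If SciPy is available, the standard tests can be computed exactly; the helpers below are an illustrative sketch (their names are not part of the framework above):
from scipy import stats

def proportion_z_test(x1, n1, x2, n2):
    """Two-proportion z-test from success counts and group sizes."""
    p = (x1 + x2) / (n1 + n2)                      # pooled proportion
    se = (p * (1 - p) * (1 / n1 + 1 / n2)) ** 0.5  # pooled standard error
    if se == 0:
        return 0.0, 1.0
    z = (x2 / n2 - x1 / n1) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))     # two-sided p-value
    return z, p_value

def welch_t_test(control_scores, treatment_scores):
    """Welch's t-test on per-example metric scores (no equal-variance assumption)."""
    t, p_value = stats.ttest_ind(treatment_scores, control_scores, equal_var=False)
    return t, p_value

# Example usage with made-up counts and scores
print(proportion_z_test(x1=40, n1=50, x2=46, n2=50))
print(welch_t_test([0.6, 0.7, 0.65, 0.8], [0.75, 0.8, 0.85, 0.9]))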
Best Practices
1. Data Quality Assurance
def data_quality_guidelines():
"""Data quality assurance guidelines"""
guidelines = {
'Data Collection': [
'Ensure the reliability and diversity of data sources',
'Establish standardized data collection processes',
'Record the context and metadata of data collection'
],
'Data Cleaning': [
'Identify and handle duplicate data',
'Check data integrity and consistency',
'Handle outliers and noisy data'
],
'Data Annotation': [
'Establish clear annotation guidelines',
'Implement multi-person annotation and quality checks',
'Regularly evaluate annotation quality'
],
'Data Splitting': [
'Ensure the representativeness of training, validation, and test sets',
'Avoid data leakage',
'Consider the specifics of time-series data'
]
}
return guidelines
class DataQualityChecker:
"""Data Quality Checker"""
def __init__(self):
self.quality_checks = [
self.check_duplicates,
self.check_completeness,
self.check_consistency,
self.check_distribution
]
def run_quality_checks(self, examples: List[dspy.Example]) -> Dict:
"""Run quality checks"""
results = {}
for check in self.quality_checks:
check_name = check.__name__
try:
result = check(examples)
results[check_name] = result
except Exception as e:
results[check_name] = {'error': str(e)}
return results
def check_duplicates(self, examples: List[dspy.Example]) -> Dict:
"""Check for duplicate data"""
# TODO: Implement duplicate checking logic
pass
def check_completeness(self, examples: List[dspy.Example]) -> Dict:
"""Check data completeness"""
# TODO: Implement completeness checking
pass
def check_consistency(self, examples: List[dspy.Example]) -> Dict:
"""Check data consistency"""
# TODO: Implement consistency checking
pass
def check_distribution(self, examples: List[dspy.Example]) -> Dict:
"""Check data distribution"""
# TODO: Implement distribution checking
pass
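The four checks above are left as exercises. As a reference point, check_duplicates might look like the following sketch, which keys duplicates on the input fields just as DSPyDataProcessor.check_data_quality does:
def check_duplicates(self, examples: List[dspy.Example]) -> Dict:
    """Check for duplicate data, keyed on the input fields."""
    seen = {}
    duplicates = 0
    for example in examples:
        key = str(example.inputs().toDict() if hasattr(example, 'inputs') else example.toDict())
        seen[key] = seen.get(key, 0) + 1
        if seen[key] > 1:
            duplicates += 1
    return {
        'duplicate_count': duplicates,
        'unique_inputs': len(seen),
        'duplicate_ratio': duplicates / len(examples) if examples else 0.0,
    }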
2. Evaluation Best Practices
def evaluation_best_practices():
"""Evaluation best practices guidelines"""
practices = {
'Metric Selection': [
'Select metrics that align with business goals',
'Use multiple complementary evaluation metrics',
'Consider task-specific evaluation criteria'
],
'Test Set Design': [
'Ensure the representativeness of the test set',
'Include edge cases and difficult samples',
'Regularly update the test set to reflect data drift'
],
'Statistical Analysis': [
'Report confidence intervals, not just point estimates',
'Conduct significance tests',
'Consider the problem of multiple comparisons'
],
'Result Interpretation': [
'Provide error analysis and case studies',
'Consider performance across different user segments',
'Analyze the limitations of the model'
]
}
return practices
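"Report confidence intervals, not just point estimates" can be done without extra dependencies using a percentile bootstrap over per-example metric scores (for instance, the lists stored in results['metric_scores']); a minimal sketch:
import random

def bootstrap_confidence_interval(scores, n_resamples=1000, confidence=0.95, seed=42):
    """Percentile bootstrap CI for the mean of a list of per-example scores."""
    rng = random.Random(seed)
    means = []
    for _ in range(n_resamples):
        resample = [rng.choice(scores) for _ in scores]   # resample with replacement
        means.append(sum(resample) / len(resample))
    means.sort()
    lower = means[int((1 - confidence) / 2 * n_resamples)]
    upper = means[min(n_resamples - 1, int((1 + confidence) / 2 * n_resamples))]
    return lower, upper

# Example: 95% CI for an exact-match accuracy of 0.7 measured on 10 samples
print(bootstrap_confidence_interval([1, 0, 1, 1, 0, 1, 1, 0, 1, 1]))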
class EvaluationReportGenerator:
"""Evaluation Report Generator"""
def generate_comprehensive_report(self,
evaluation_results: Dict,
model_info: Dict = None,
dataset_info: Dict = None) -> str:
"""Generate a comprehensive evaluation report"""
report_sections = [
self._generate_executive_summary(evaluation_results),
self._generate_model_info_section(model_info),
self._generate_dataset_info_section(dataset_info),
self._generate_performance_metrics_section(evaluation_results),
self._generate_error_analysis_section(evaluation_results),
self._generate_recommendations_section(evaluation_results)
]
return "\n\n".join(filter(None, report_sections))
def _generate_executive_summary(self, results: Dict) -> str:
"""Generate an executive summary"""
# TODO: Implement summary generation
pass
def _generate_model_info_section(self, model_info: Dict) -> str:
"""Generate the model information section"""
# TODO: Implement model info generation
pass
# ... other report generation methods
By completing this chapter, you should have a working command of data processing and evaluation in DSPy. Good data processing and a scientific evaluation system are the foundation of high-quality AI systems. In practice, choose evaluation metrics that match your specific business needs and establish a comprehensive quality assurance mechanism around them.