Chapter 05: Retrieval-Augmented Generation (RAG) with DSPy

Learning Objectives
  • Implement Retrieval-Augmented Generation in DSPy
  • Configure and use vector databases
  • Design retrieval strategies and evaluation metrics
  • Build multi-hop retrieval systems
  • Optimize the synergy between retrieval and generation

Key Concepts

1. RAG System Architecture Fundamentals

Retrieval-Augmented Generation (RAG) combines information retrieval with text generation, letting a language model ground its answers in an external knowledge base instead of relying only on the knowledge stored in its parameters.

Core Components of RAG Systems

import dspy
import numpy as np
from typing import List, Dict, Any, Optional
import json
from pathlib import Path

class RAGSystem:
    """Complete RAG system architecture"""

    def __init__(self,
                 vector_store=None,
                 embedding_model=None,
                 retrieval_k: int = 5,
                 rerank: bool = True):

        self.vector_store = vector_store
        self.embedding_model = embedding_model
        self.retrieval_k = retrieval_k
        self.rerank = rerank

        # Initialize DSPy components
        self.setup_dspy_modules()

    def setup_dspy_modules(self):
        """Setup DSPy modules"""

        # Query understanding and rewriting
        # (task instructions are attached to the signatures via dspy.Signature)
        self.query_rewriter = dspy.ChainOfThought(dspy.Signature(
            "original_query -> reasoning, rewritten_query",
            "Analyze the intent of the user query and rewrite it for better retrieval. Consider synonyms, related concepts, and search keywords."
        ))

        # Retrieval relevance judgment
        self.relevance_judge = dspy.ChainOfThought(dspy.Signature(
            "query, document -> reasoning, relevance_score",
            "Judge the relevance of the document to the query, provide a relevance score from 1-10 with detailed reasoning."
        ))

        # Answer generation
        self.answer_generator = dspy.ChainOfThought(dspy.Signature(
            "query, retrieved_contexts -> reasoning, answer",
            "Based on the retrieved relevant documents, generate an accurate and complete answer. Ensure the answer is fact-based."
        ))

        # Answer verification
        self.answer_verifier = dspy.ChainOfThought(dspy.Signature(
            "query, answer, contexts -> reasoning, verification_result",
            "Verify whether the generated answer is consistent with the retrieved documents and fully answers the question."
        ))

    def process_query(self, query: str) -> Dict[str, Any]:
        """Complete query processing pipeline"""

        # 1. Query rewriting
        rewritten = self.query_rewriter(original_query=query)

        # 2. Retrieve documents
        retrieved_docs = self.retrieve_documents(rewritten.rewritten_query)

        # 3. Rerank (if enabled)
        if self.rerank:
            retrieved_docs = self.rerank_documents(query, retrieved_docs)

        # 4. Generate answer
        contexts = self.prepare_contexts(retrieved_docs)
        answer_result = self.answer_generator(
            query=query,
            retrieved_contexts=contexts
        )

        # 5. Verify answer
        verification = self.answer_verifier(
            query=query,
            answer=answer_result.answer,
            contexts=contexts
        )

        return {
            'original_query': query,
            'rewritten_query': rewritten.rewritten_query,
            'retrieved_documents': retrieved_docs,
            'answer': answer_result.answer,
            'reasoning': answer_result.reasoning,
            'verification': verification.verification_result,
            'confidence_score': self.calculate_confidence(verification)
        }

    def retrieve_documents(self, query: str) -> List[Dict]:
        """Retrieve relevant documents"""
        if not self.vector_store:
            # Mock retrieval results
            return self.mock_retrieval(query)

        # Actual vector retrieval (assumes a vector store whose similarity_search
        # returns (document, score) pairs with page_content and metadata attributes)
        query_embedding = self.embedding_model.encode(query)
        results = self.vector_store.similarity_search(
            query_embedding,
            k=self.retrieval_k
        )

        return [
            {
                'content': doc.page_content,
                'metadata': doc.metadata,
                'score': score
            }
            for doc, score in results
        ]

    def rerank_documents(self, query: str, documents: List[Dict]) -> List[Dict]:
        """Rerank documents"""
        scored_docs = []

        for doc in documents:
            # Use DSPy module to calculate relevance score
            relevance_result = self.relevance_judge(
                query=query,
                document=doc['content']
            )

            try:
                # Extract a numeric score from the judge's free-text output
                import re
                score_match = re.search(r'(\d+(?:\.\d+)?)', relevance_result.relevance_score)
                score = float(score_match.group(1)) if score_match else 5.0
            except (AttributeError, TypeError, ValueError):
                score = 5.0  # fall back to a neutral score if parsing fails

            doc['rerank_score'] = score
            doc['rerank_reasoning'] = relevance_result.reasoning
            scored_docs.append(doc)

        # Sort by rerank score
        return sorted(scored_docs, key=lambda x: x['rerank_score'], reverse=True)

    def prepare_contexts(self, documents: List[Dict]) -> str:
        """Prepare context string"""
        contexts = []
        for i, doc in enumerate(documents, 1):
            context = f"Document {i}:\n{doc['content']}\n"
            if 'metadata' in doc and doc['metadata']:
                context += f"Source: {doc['metadata'].get('source', 'Unknown')}\n"
            contexts.append(context)

        return "\n".join(contexts)

    def calculate_confidence(self, verification_result) -> float:
        """Calculate confidence score"""
        # Simple keyword-based confidence estimate over the verifier's output text
        verdict = verification_result.verification_result.lower()

        if "high confidence" in verdict or "completely correct" in verdict:
            return 0.9
        elif "medium confidence" in verdict or "mostly correct" in verdict:
            return 0.7
        elif "low confidence" in verdict or "partially correct" in verdict:
            return 0.5
        else:
            return 0.3

    def mock_retrieval(self, query: str) -> List[Dict]:
        """Mock retrieval results for demonstration"""
        mock_docs = [
            {
                'content': 'Artificial intelligence is a branch of computer science dedicated to creating systems capable of performing tasks that typically require human intelligence.',
                'metadata': {'source': 'AI_textbook.pdf', 'page': 1},
                'score': 0.95
            },
            {
                'content': 'Machine learning is a subfield of artificial intelligence that focuses on developing algorithms that enable computers to learn from and improve using data.',
                'metadata': {'source': 'ML_guide.pdf', 'page': 12},
                'score': 0.89
            },
            {
                'content': 'Deep learning uses multi-layer neural networks to simulate how the human brain works, achieving major breakthroughs in image recognition and natural language processing.',
                'metadata': {'source': 'DL_research.pdf', 'page': 5},
                'score': 0.87
            }
        ]
        return mock_docs

# Usage example
def demonstrate_rag_system():
    """Demonstrate RAG system usage"""

    # Initialize RAG system
    rag = RAGSystem(retrieval_k=3, rerank=True)

    # Process query
    query = "What is artificial intelligence?"
    result = rag.process_query(query)

    print("RAG System Results:")
    print(f"Original Query: {result['original_query']}")
    print(f"Rewritten Query: {result['rewritten_query']}")
    print(f"Answer: {result['answer']}")
    print(f"Confidence: {result['confidence_score']:.2f}")
    print(f"Verification Result: {result['verification']}")

    return result

# result = demonstrate_rag_system()
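
The DSPy modules inside RAGSystem only produce real outputs once a language model is configured. A minimal setup might look like the sketch below; the model name and settings are illustrative assumptions, not requirements of the class:

import dspy

# Sketch: configure an LM before calling process_query (the model name is an example)
lm = dspy.LM("openai/gpt-4o-mini", max_tokens=1024)
dspy.configure(lm=lm)

rag = RAGSystem(retrieval_k=3, rerank=True)   # retrieval is still mocked here
result = rag.process_query("What is artificial intelligence?")
print(result['answer'])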

2. Vector Database Integration

Vector databases are core components of RAG systems, responsible for storing and retrieving document embeddings.

class VectorStoreManager:
    """Vector database manager"""

    def __init__(self,
                 store_type='faiss',  # 'faiss', 'chroma', 'pinecone'
                 embedding_dim=768,
                 distance_metric='cosine'):

        self.store_type = store_type
        self.embedding_dim = embedding_dim
        self.distance_metric = distance_metric
        self.vector_store = None
        self.documents = []
        self.embeddings = []

        self.initialize_store()

    def initialize_store(self):
        """Initialize vector store"""
        if self.store_type == 'faiss':
            self.initialize_faiss()
        elif self.store_type == 'chroma':
            self.initialize_chroma()
        else:
            raise ValueError(f"Unsupported store type: {self.store_type}")

    def initialize_faiss(self):
        """Initialize FAISS vector store"""
        try:
            import faiss

            # Create FAISS index
            if self.distance_metric == 'cosine':
                # Cosine similarity uses inner product, requires normalization
                self.index = faiss.IndexFlatIP(self.embedding_dim)
            else:
                # L2 distance
                self.index = faiss.IndexFlatL2(self.embedding_dim)

            print(f"FAISS index initialized, dimensions: {self.embedding_dim}")

        except ImportError:
            print("FAISS not installed, please run: pip install faiss-cpu")
            raise

    def initialize_chroma(self):
        """Initialize Chroma vector store"""
        try:
            import chromadb

            self.chroma_client = chromadb.Client()
            self.collection = self.chroma_client.create_collection(
                name="documents",
                metadata={"hnsw:space": self.distance_metric}
            )

            print("Chroma database initialized")

        except ImportError:
            print("Chroma not installed, please run: pip install chromadb")
            raise

    def add_documents(self, documents: List[str], metadatas: List[Dict] = None):
        """Add documents to vector store"""

        # Generate document embeddings
        embeddings = self.generate_embeddings(documents)

        if self.store_type == 'faiss':
            self.add_to_faiss(documents, embeddings, metadatas)
        elif self.store_type == 'chroma':
            self.add_to_chroma(documents, embeddings, metadatas)

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate text embeddings"""
        # Mock embeddings for demonstration only; production systems should use a real
        # embedding model (see the sentence-transformers sketch after the demo below)
        embeddings = []
        for text in texts:
            # Mock embedding generation
            embedding = np.random.rand(self.embedding_dim).astype('float32')

            # Normalize if using cosine similarity
            if self.distance_metric == 'cosine':
                embedding = embedding / np.linalg.norm(embedding)

            embeddings.append(embedding)

        return np.array(embeddings)

    def add_to_faiss(self, documents: List[str], embeddings: np.ndarray, metadatas: List[Dict]):
        """Add documents to FAISS"""
        # Add embeddings to the index (row order must stay aligned with self.documents)
        self.index.add(embeddings)

        # Save documents and metadata in the same order as the index
        for i, doc in enumerate(documents):
            self.documents.append({
                'content': doc,
                'metadata': metadatas[i] if metadatas and i < len(metadatas) else {},
                'id': len(self.documents)
            })

        print(f"Added {len(documents)} documents to FAISS")

    def add_to_chroma(self, documents: List[str], embeddings: np.ndarray, metadatas: List[Dict]):
        """Add documents to Chroma"""
        ids = [f"doc_{i}" for i in range(len(self.documents), len(self.documents) + len(documents))]

        self.collection.add(
            embeddings=embeddings.tolist(),
            documents=documents,
            metadatas=metadatas or [{}] * len(documents),
            ids=ids
        )

        self.documents.extend(documents)
        print(f"Added {len(documents)} documents to Chroma")

    def similarity_search(self, query: str, k: int = 5) -> List[Dict]:
        """Similarity search"""
        query_embedding = self.generate_embeddings([query])[0]

        if self.store_type == 'faiss':
            return self.search_faiss(query_embedding, k)
        elif self.store_type == 'chroma':
            return self.search_chroma(query_embedding, k)

    def search_faiss(self, query_embedding: np.ndarray, k: int) -> List[Dict]:
        """Search in FAISS"""
        scores, indices = self.index.search(query_embedding.reshape(1, -1), k)

        results = []
        for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
            if 0 <= idx < len(self.documents):  # FAISS returns -1 when fewer than k results exist
                doc = self.documents[idx]
                if isinstance(doc, dict):
                    results.append({
                        'content': doc['content'],
                        'metadata': doc['metadata'],
                        'score': float(score),
                        'rank': i + 1
                    })
                else:
                    results.append({
                        'content': doc,
                        'metadata': {},
                        'score': float(score),
                        'rank': i + 1
                    })

        return results

    def search_chroma(self, query_embedding: np.ndarray, k: int) -> List[Dict]:
        """Search in Chroma"""
        results = self.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=k
        )

        formatted_results = []
        for i in range(len(results['documents'][0])):
            formatted_results.append({
                'content': results['documents'][0][i],
                'metadata': results['metadatas'][0][i] if results['metadatas'][0] else {},
                'score': 1.0 - results['distances'][0][i],  # Convert distance to similarity
                'rank': i + 1
            })

        return formatted_results

    def get_statistics(self) -> Dict:
        """Get database statistics"""
        stats = {
            'total_documents': len(self.documents),
            'store_type': self.store_type,
            'embedding_dim': self.embedding_dim,
            'distance_metric': self.distance_metric
        }

        if self.store_type == 'faiss':
            stats['index_size'] = self.index.ntotal

        return stats
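
In-memory FAISS indexes are lost when the process exits. FAISS can serialize an index to disk with write_index/read_index; the helpers below are a sketch, and the file paths plus the JSON sidecar for documents are assumptions of this example:

import faiss
import json

def save_faiss_store(manager: VectorStoreManager, index_path: str, docs_path: str):
    """Persist the FAISS index and the parallel document list."""
    faiss.write_index(manager.index, index_path)
    with open(docs_path, "w", encoding="utf-8") as f:
        json.dump(manager.documents, f, ensure_ascii=False)

def load_faiss_store(manager: VectorStoreManager, index_path: str, docs_path: str):
    """Restore a previously saved index and document list into an existing manager."""
    manager.index = faiss.read_index(index_path)
    with open(docs_path, "r", encoding="utf-8") as f:
        manager.documents = json.load(f)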

# Advanced retrieval strategies
class AdvancedRetriever:
    """Advanced retriever supporting multiple retrieval strategies"""

    def __init__(self, vector_store: VectorStoreManager):
        self.vector_store = vector_store
        self.retrieval_strategies = {
            'semantic': self.semantic_search,
            'hybrid': self.hybrid_search,
            'multi_query': self.multi_query_search,
            'contextual': self.contextual_search
        }

    def semantic_search(self, query: str, k: int = 5) -> List[Dict]:
        """Semantic search"""
        return self.vector_store.similarity_search(query, k)

    def hybrid_search(self, query: str, k: int = 5, alpha: float = 0.7) -> List[Dict]:
        """Hybrid search (semantic + keyword)"""
        # Semantic search results
        semantic_results = self.semantic_search(query, k * 2)

        # Keyword search results
        keyword_results = self.keyword_search(query, k * 2)

        # Hybrid ranking
        combined_results = self.combine_search_results(
            semantic_results, keyword_results, alpha
        )

        return combined_results[:k]

    def keyword_search(self, query: str, k: int) -> List[Dict]:
        """Keyword search (simplified BM25)"""
        query_terms = query.lower().split()
        scored_docs = []

        for i, doc_data in enumerate(self.vector_store.documents):
            if isinstance(doc_data, dict):
                doc_text = doc_data['content'].lower()
                metadata = doc_data.get('metadata', {})
            else:
                doc_text = doc_data.lower()
                metadata = {}

            # Simplified BM25 scoring
            score = 0
            for term in query_terms:
                term_freq = doc_text.count(term)
                if term_freq > 0:
                    # Simplified BM25 formula
                    score += term_freq / (term_freq + 1.2) * 2.2

            if score > 0:
                scored_docs.append({
                    'content': doc_data['content'] if isinstance(doc_data, dict) else doc_data,
                    'metadata': metadata,
                    'score': score,
                    'rank': 0  # Will be set after sorting
                })

        # Sort by score
        scored_docs.sort(key=lambda x: x['score'], reverse=True)

        # Set ranks
        for i, doc in enumerate(scored_docs[:k]):
            doc['rank'] = i + 1

        return scored_docs[:k]

    def combine_search_results(self, semantic_results: List[Dict],
                              keyword_results: List[Dict],
                              alpha: float) -> List[Dict]:
        """Combine search results"""
        # Create document to score mapping
        doc_scores = {}

        # Semantic search results
        for result in semantic_results:
            doc_id = hash(result['content'])
            doc_scores[doc_id] = {
                'doc': result,
                'semantic_score': result['score'] * alpha,
                'keyword_score': 0
            }

        # Keyword search results
        for result in keyword_results:
            doc_id = hash(result['content'])
            if doc_id in doc_scores:
                doc_scores[doc_id]['keyword_score'] = result['score'] * (1 - alpha)
            else:
                doc_scores[doc_id] = {
                    'doc': result,
                    'semantic_score': 0,
                    'keyword_score': result['score'] * (1 - alpha)
                }

        # Calculate final scores
        final_results = []
        for doc_id, scores in doc_scores.items():
            final_score = scores['semantic_score'] + scores['keyword_score']
            doc = scores['doc'].copy()
            doc['score'] = final_score
            doc['semantic_score'] = scores['semantic_score']
            doc['keyword_score'] = scores['keyword_score']
            final_results.append(doc)

        # Sort
        final_results.sort(key=lambda x: x['score'], reverse=True)

        # Update ranks
        for i, doc in enumerate(final_results):
            doc['rank'] = i + 1

        return final_results

    def multi_query_search(self, query: str, k: int = 5) -> List[Dict]:
        """Multi-query search"""
        # Generate query variants
        query_variants = self.generate_query_variants(query)

        all_results = []
        for variant in query_variants:
            results = self.semantic_search(variant, k)
            all_results.extend(results)

        # Deduplicate and re-rank
        unique_results = self.deduplicate_results(all_results)
        return unique_results[:k]

    def generate_query_variants(self, query: str) -> List[str]:
        """Generate query variants"""
        # Simplified query variant generation
        variants = [query]

        # Add synonym substitutions (using simple rules here)
        synonyms = {
            'AI': ['artificial intelligence', 'machine intelligence'],
            'machine learning': ['ML', 'automated learning'],
            'deep learning': ['DL', 'neural network learning']
        }

        for original, syns in synonyms.items():
            if original in query:
                for syn in syns:
                    variants.append(query.replace(original, syn))

        return variants[:3]  # Limit number of variants

    def contextual_search(self, query: str, context: str = "", k: int = 5) -> List[Dict]:
        """Context-aware search"""
        # Combine query and context
        if context:
            enhanced_query = f"{context} {query}"
        else:
            enhanced_query = query

        return self.semantic_search(enhanced_query, k)

    def deduplicate_results(self, results: List[Dict]) -> List[Dict]:
        """Deduplicate search results"""
        seen_contents = set()
        unique_results = []

        for result in results:
            content_hash = hash(result['content'])
            if content_hash not in seen_contents:
                seen_contents.add(content_hash)
                unique_results.append(result)

        # Re-sort by score
        unique_results.sort(key=lambda x: x['score'], reverse=True)

        return unique_results
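
The keyword_search method above keeps only BM25's term-frequency saturation and drops the IDF and document-length terms. For comparison, a fuller BM25 scorer might look like the following sketch (k1=1.2 and b=0.75 are the conventional defaults, assumed here):

import math

def bm25_scores(query: str, docs: List[str], k1: float = 1.2, b: float = 0.75) -> List[float]:
    """Score each document against the query with standard BM25."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / max(n_docs, 1)
    query_terms = query.lower().split()

    scores = []
    for tokens in tokenized:
        score = 0.0
        for term in query_terms:
            tf = tokens.count(term)
            if tf == 0:
                continue
            df = sum(1 for t in tokenized if term in t)           # document frequency
            idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1)  # smoothed IDF
            score += idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * len(tokens) / avg_len))
        scores.append(score)
    return scores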

# Usage example
def demonstrate_vector_store():
    """Demonstrate vector store usage"""

    # Initialize vector store
    vector_store = VectorStoreManager(store_type='faiss', embedding_dim=768)

    # Prepare documents
    documents = [
        "Artificial intelligence is technology that simulates human intelligence",
        "Machine learning is an important branch of artificial intelligence",
        "Deep learning uses neural networks for learning",
        "Natural language processing enables machines to understand human language",
        "Computer vision enables machines to see and understand images"
    ]

    metadatas = [
        {"source": "AI_basics.pdf", "chapter": 1},
        {"source": "ML_guide.pdf", "chapter": 2},
        {"source": "DL_tutorial.pdf", "chapter": 1},
        {"source": "NLP_handbook.pdf", "chapter": 3},
        {"source": "CV_principles.pdf", "chapter": 1}
    ]

    # Add documents
    vector_store.add_documents(documents, metadatas)

    # Initialize advanced retriever
    retriever = AdvancedRetriever(vector_store)

    # Test different retrieval strategies
    query = "What is machine learning?"

    print("Semantic Search Results:")
    semantic_results = retriever.semantic_search(query, k=3)
    for result in semantic_results:
        print(f"  Score: {result['score']:.3f} - {result['content']}")

    print("\nHybrid Search Results:")
    hybrid_results = retriever.hybrid_search(query, k=3)
    for result in hybrid_results:
        print(f"  Score: {result['score']:.3f} - {result['content']}")

    return vector_store, retriever

# vector_store, retriever = demonstrate_vector_store()
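
The generate_embeddings method above returns random vectors purely for demonstration. A production setup would plug in a real embedding model; the sketch below assumes the sentence-transformers package and the all-MiniLM-L6-v2 model (384-dimensional), both of which are illustrative choices:

from sentence_transformers import SentenceTransformer  # assumption: pip install sentence-transformers

class SentenceTransformerEmbedder:
    """Drop-in replacement for the mock embedding generation."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", normalize: bool = True):
        self.model = SentenceTransformer(model_name)
        self.normalize = normalize  # normalized vectors suit cosine / inner-product indexes

    def encode(self, texts: List[str]) -> np.ndarray:
        embeddings = self.model.encode(texts, normalize_embeddings=self.normalize)
        return np.asarray(embeddings, dtype='float32')

# Usage sketch: match embedding_dim to the model (384 for MiniLM), then override
# generate_embeddings before adding documents.
# embedder = SentenceTransformerEmbedder()
# vector_store = VectorStoreManager(store_type='faiss', embedding_dim=384)
# vector_store.generate_embeddings = lambda texts: embedder.encode(texts)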

3. Multi-Hop Retrieval System

Multi-hop retrieval answers complex questions through multiple rounds of retrieval, where the results of each round inform the next query. It is particularly suited to questions that require integrating information from several sources.

class MultiHopRAG(dspy.Module):
    """Multi-hop retrieval system"""

    def __init__(self,
                 retriever,
                 max_hops: int = 3,
                 min_confidence: float = 0.7):

        super().__init__()
        self.retriever = retriever
        self.max_hops = max_hops
        self.min_confidence = min_confidence

        # DSPy modules (task instructions are attached to the signatures via dspy.Signature)
        self.question_decomposer = dspy.ChainOfThought(dspy.Signature(
            "complex_question -> reasoning, sub_questions",
            "Decompose complex questions into multiple simple sub-questions, each of which should be independently answerable."
        ))

        self.answer_synthesizer = dspy.ChainOfThought(dspy.Signature(
            "original_question, sub_answers -> reasoning, final_answer",
            "Based on the answers to sub-questions, synthesize a complete answer to the original question."
        ))

        self.hop_planner = dspy.ChainOfThought(dspy.Signature(
            "question, current_context -> reasoning, next_query",
            "Based on the current context and the question, decide the next retrieval query."
        ))

        self.termination_judge = dspy.ChainOfThought(dspy.Signature(
            "question, gathered_info -> reasoning, should_continue",
            "Judge whether enough information has been gathered to answer the question, output 'continue' or 'stop'."
        ))

    def forward(self, question: str):
        """Main multi-hop retrieval pipeline"""

        # Decompose question
        decomposition = self.question_decomposer(complex_question=question)
        sub_questions = self.parse_sub_questions(decomposition.sub_questions)

        print(f"Question decomposition result: {len(sub_questions)} sub-questions")
        for i, sq in enumerate(sub_questions, 1):
            print(f"  {i}. {sq}")

        # Multi-hop retrieval
        hop_results = []
        current_context = ""

        for hop in range(self.max_hops):
            print(f"\nHop {hop + 1} retrieval")

            # Decide next query
            if hop == 0:
                next_query = question
            else:
                hop_plan = self.hop_planner(
                    question=question,
                    current_context=current_context
                )
                next_query = hop_plan.next_query

            print(f"Query: {next_query}")

            # Execute retrieval
            retrieved_docs = self.retriever.semantic_search(next_query, k=5)

            # Update context
            new_context = self.format_retrieved_docs(retrieved_docs)
            current_context = self.update_context(current_context, new_context)

            hop_results.append({
                'hop': hop + 1,
                'query': next_query,
                'retrieved_docs': retrieved_docs,
                'context': new_context
            })

            # Judge whether to continue
            termination = self.termination_judge(
                question=question,
                gathered_info=current_context
            )

            print(f"Termination judgment: {termination.should_continue}")

            if "stop" in termination.should_continue.lower():
                print("Enough information gathered, stopping retrieval")
                break

        # Synthesize answer
        final_answer = self.synthesize_answer(question, hop_results, current_context)

        return dspy.Prediction(
            question=question,
            sub_questions=sub_questions,
            hop_results=hop_results,
            final_context=current_context,
            answer=final_answer.final_answer,
            reasoning=final_answer.reasoning
        )

    def parse_sub_questions(self, sub_questions_text: str) -> List[str]:
        """Parse sub-questions"""
        import re

        # Use regex to extract sub-questions
        questions = re.findall(r'\d+[.\)]?\s*(.+?)(?=\d+[.\)]|$)', sub_questions_text, re.MULTILINE)

        # Clean and filter
        cleaned_questions = []
        for q in questions:
            q = q.strip().rstrip('?') + '?'
            if len(q) > 5:  # Filter out too short questions
                cleaned_questions.append(q)

        return cleaned_questions[:5]  # Limit number of sub-questions

    def format_retrieved_docs(self, docs: List[Dict]) -> str:
        """Format retrieved documents"""
        formatted_docs = []

        for i, doc in enumerate(docs, 1):
            doc_text = f"Document {i}: {doc['content']}\n"
            if doc.get('metadata'):
                doc_text += f"Source: {doc['metadata'].get('source', 'Unknown')}\n"
            formatted_docs.append(doc_text)

        return "\n".join(formatted_docs)

    def update_context(self, current_context: str, new_context: str) -> str:
        """Update context, avoiding excessive length"""
        combined = current_context + "\n" + new_context if current_context else new_context

        # Simple length control
        max_length = 2000
        if len(combined) > max_length:
            # Keep the latest information
            combined = "..." + combined[-max_length:]

        return combined

    def synthesize_answer(self, question: str, hop_results: List[Dict], context: str):
        """Synthesize the final answer from the accumulated retrieval context"""

        # The accumulated context already contains the evidence gathered across all hops,
        # so it is passed directly as the `sub_answers` input of the synthesizer.
        synthesis_result = self.answer_synthesizer(
            original_question=question,
            sub_answers=context
        )

        return synthesis_result

# Adaptive RAG system
class AdaptiveRAG(dspy.Module):
    """Adaptive RAG system that selects the best strategy based on question type"""

    def __init__(self, retriever):
        super().__init__()
        self.retriever = retriever

        # Different RAG strategies
        self.simple_rag = self.create_simple_rag()
        self.multi_hop_rag = MultiHopRAG(retriever)

        # Question classifier
        self.question_classifier = dspy.ChainOfThought(dspy.Signature(
            "question -> reasoning, complexity, strategy",
            """Analyze question complexity and type:
            - Simple questions: Can be answered directly with a single retrieval
            - Complex questions: Require multi-step reasoning or multiple retrievals
            - Strategy selection: 'simple' or 'multi_hop'"""
        ))

    def create_simple_rag(self):
        """Create simple RAG module"""
        class SimpleRAG(dspy.Module):
            def __init__(self, retriever):
                super().__init__()
                self.retriever = retriever
                self.generator = dspy.ChainOfThought(
                    "question, context -> reasoning, answer"
                )

            def forward(self, question):
                docs = self.retriever.semantic_search(question, k=5)
                context = "\n".join([doc['content'] for doc in docs])
                result = self.generator(question=question, context=context)
                return dspy.Prediction(
                    answer=result.answer,
                    reasoning=result.reasoning,
                    retrieved_docs=docs
                )

        return SimpleRAG(self.retriever)

    def forward(self, question: str):
        """Adaptively select RAG strategy"""

        # Classify question
        classification = self.question_classifier(question=question)

        print(f"Question analysis: {classification.complexity}")
        print(f"Selected strategy: {classification.strategy}")

        # Select strategy based on classification
        if "multi_hop" in classification.strategy.lower() or "complex" in classification.complexity.lower():
            result = self.multi_hop_rag(question)
        else:
            result = self.simple_rag(question)

        # Add strategy information
        result.strategy_used = classification.strategy
        result.complexity_analysis = classification.complexity

        return result

# Usage examples and tests
def test_multi_hop_rag():
    """Test multi-hop RAG system"""

    # Mock vector store
    class MockRetriever:
        def semantic_search(self, query, k=5):
            # Mock different retrieval results for different queries
            mock_responses = {
                "history of artificial intelligence": [
                    {'content': 'The concept of artificial intelligence was first proposed by Alan Turing in 1950', 'score': 0.9, 'metadata': {'source': 'AI_history.pdf'}},
                    {'content': 'The 1956 Dartmouth conference marked the official birth of the AI field', 'score': 0.85, 'metadata': {'source': 'AI_timeline.pdf'}},
                ],
                "machine learning development": [
                    {'content': 'Machine learning began rapid development in the 1980s', 'score': 0.88, 'metadata': {'source': 'ML_development.pdf'}},
                    {'content': 'Deep learning achieved major breakthroughs in the 2010s', 'score': 0.92, 'metadata': {'source': 'DL_breakthrough.pdf'}},
                ],
                "default": [
                    {'content': f'Information related to "{query}"...', 'score': 0.7, 'metadata': {'source': 'general.pdf'}}
                ]
            }

            # Return appropriate results based on query
            for key, docs in mock_responses.items():
                if key in query.lower():
                    return docs

            return mock_responses["default"]

    # Initialize multi-hop RAG
    mock_retriever = MockRetriever()
    multi_hop = MultiHopRAG(mock_retriever, max_hops=2)

    # Test complex question
    complex_question = "What important stages has the historical development of artificial intelligence gone through, and what role has machine learning played in it?"

    print("Testing Multi-Hop RAG System")
    print(f"Question: {complex_question}")

    result = multi_hop(complex_question)

    print(f"\nFinal Answer: {result.answer}")
    print(f"\nRetrieved {len(result.hop_results)} hops")

    return result

# Execute test
# test_result = test_multi_hop_rag()
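
The AdaptiveRAG class defined above can be exercised in the same way. The sketch below routes one simple and one complex question through it; the questions are illustrative, the retriever argument is assumed to expose semantic_search, and a configured LM is required:

def demonstrate_adaptive_rag(retriever):
    """Route questions of different complexity through AdaptiveRAG."""
    adaptive = AdaptiveRAG(retriever)

    questions = [
        "What is deep learning?",  # likely classified as 'simple'
        "How did machine learning develop, and how does deep learning build on it?",  # likely 'multi_hop'
    ]

    for question in questions:
        print(f"\nQuestion: {question}")
        result = adaptive(question)
        print(f"Strategy used: {result.strategy_used}")
        print(f"Answer: {result.answer}")

    return adaptive

# adaptive = demonstrate_adaptive_rag(retriever)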

4. Retrieval Evaluation and Optimization

class RAGEvaluator:
    """RAG system evaluator"""

    def __init__(self):
        self.evaluation_metrics = {
            'retrieval_precision': self.calculate_retrieval_precision,
            'retrieval_recall': self.calculate_retrieval_recall,
            'answer_relevance': self.evaluate_answer_relevance,
            'answer_faithfulness': self.evaluate_answer_faithfulness,
            'answer_completeness': self.evaluate_answer_completeness
        }

    def evaluate_rag_system(self,
                           rag_system,
                           test_dataset: List[Dict],
                           ground_truth: List[Dict]) -> Dict:
        """Comprehensively evaluate RAG system"""

        results = []

        for i, test_case in enumerate(test_dataset):
            print(f"Evaluating test case {i+1}/{len(test_dataset)}")

            # Get RAG system output
            rag_output = rag_system.process_query(test_case['question'])

            # Calculate metrics
            case_metrics = {}

            for metric_name, metric_func in self.evaluation_metrics.items():
                try:
                    score = metric_func(
                        test_case,
                        rag_output,
                        ground_truth[i] if i < len(ground_truth) else {}
                    )
                    case_metrics[metric_name] = score
                except Exception as e:
                    print(f"Error calculating metric {metric_name}: {e}")
                    case_metrics[metric_name] = 0.0

            results.append({
                'question': test_case['question'],
                'rag_output': rag_output,
                'metrics': case_metrics
            })

        # Calculate overall statistics
        overall_metrics = self.calculate_overall_metrics(results)

        return {
            'individual_results': results,
            'overall_metrics': overall_metrics,
            'summary': self.generate_evaluation_summary(overall_metrics)
        }

    def calculate_retrieval_precision(self, test_case: Dict, rag_output: Dict, ground_truth: Dict) -> float:
        """Calculate retrieval precision"""
        retrieved_docs = rag_output.get('retrieved_documents', [])
        relevant_docs = ground_truth.get('relevant_documents', [])

        if not retrieved_docs:
            return 0.0

        relevant_retrieved = 0
        for doc in retrieved_docs:
            if any(rel_doc in doc['content'] for rel_doc in relevant_docs):
                relevant_retrieved += 1

        return relevant_retrieved / len(retrieved_docs)

    def calculate_retrieval_recall(self, test_case: Dict, rag_output: Dict, ground_truth: Dict) -> float:
        """Calculate retrieval recall"""
        retrieved_docs = rag_output.get('retrieved_documents', [])
        relevant_docs = ground_truth.get('relevant_documents', [])

        if not relevant_docs:
            return 1.0

        found_relevant = 0
        for rel_doc in relevant_docs:
            if any(rel_doc in doc['content'] for doc in retrieved_docs):
                found_relevant += 1

        return found_relevant / len(relevant_docs)

    def evaluate_answer_relevance(self, test_case: Dict, rag_output: Dict, ground_truth: Dict) -> float:
        """Evaluate answer relevance"""
        # Use DSPy module to evaluate answer relevance
        relevance_evaluator = dspy.ChainOfThought(dspy.Signature(
            "question, answer -> reasoning, relevance_score",
            "Evaluate the relevance of the answer to the question, provide a score from 0-1. 1 means completely relevant, 0 means completely irrelevant."
        ))

        try:
            result = relevance_evaluator(
                question=test_case['question'],
                answer=rag_output['answer']
            )

            # Extract numeric score
            import re
            score_match = re.search(r'(\d*\.?\d+)', result.relevance_score)
            if score_match:
                score = float(score_match.group(1))
                return min(max(score, 0.0), 1.0)  # Ensure within 0-1 range

        except Exception as e:
            print(f"Relevance evaluation failed: {e}")

        return 0.5  # Default medium relevance

    def evaluate_answer_faithfulness(self, test_case: Dict, rag_output: Dict, ground_truth: Dict) -> float:
        """Evaluate answer faithfulness (whether based on retrieved content)"""
        faithfulness_evaluator = dspy.ChainOfThought(dspy.Signature(
            "answer, retrieved_contexts -> reasoning, faithfulness_score",
            "Evaluate whether the answer is faithful to the retrieved contexts, provide a score from 0-1. 1 means completely faithful, 0 means completely divergent."
        ))

        try:
            contexts = "\n".join([doc['content'] for doc in rag_output.get('retrieved_documents', [])])

            result = faithfulness_evaluator(
                answer=rag_output['answer'],
                retrieved_contexts=contexts
            )

            import re
            score_match = re.search(r'(\d*\.?\d+)', result.faithfulness_score)
            if score_match:
                score = float(score_match.group(1))
                return min(max(score, 0.0), 1.0)

        except Exception as e:
            print(f"Faithfulness evaluation failed: {e}")

        return 0.5

    def evaluate_answer_completeness(self, test_case: Dict, rag_output: Dict, ground_truth: Dict) -> float:
        """Evaluate answer completeness"""
        expected_answer = ground_truth.get('expected_answer', '')
        actual_answer = rag_output['answer']

        if not expected_answer:
            return 0.5  # Return medium score when no reference answer

        # Simple completeness measure: check if key concepts are mentioned
        expected_concepts = set(expected_answer.lower().split())
        actual_concepts = set(actual_answer.lower().split())

        if not expected_concepts:
            return 1.0

        covered_concepts = len(expected_concepts & actual_concepts)
        return covered_concepts / len(expected_concepts)

    def calculate_overall_metrics(self, results: List[Dict]) -> Dict:
        """Calculate overall metrics"""
        overall = {}

        # Calculate average for each metric
        for metric_name in self.evaluation_metrics.keys():
            scores = [r['metrics'][metric_name] for r in results if metric_name in r['metrics']]
            if scores:
                overall[metric_name] = {
                    'mean': sum(scores) / len(scores),
                    'min': min(scores),
                    'max': max(scores),
                    'std': self.calculate_std(scores)
                }

        # Calculate composite score
        if overall:
            composite_scores = []
            for result in results:
                metrics = result['metrics']
                if metrics:
                    # Weighted average (weights can be adjusted as needed)
                    weights = {
                        'retrieval_precision': 0.2,
                        'retrieval_recall': 0.2,
                        'answer_relevance': 0.3,
                        'answer_faithfulness': 0.2,
                        'answer_completeness': 0.1
                    }

                    weighted_score = sum(
                        metrics.get(metric, 0) * weight
                        for metric, weight in weights.items()
                    )
                    composite_scores.append(weighted_score)

            if composite_scores:
                overall['composite_score'] = {
                    'mean': sum(composite_scores) / len(composite_scores),
                    'min': min(composite_scores),
                    'max': max(composite_scores),
                    'std': self.calculate_std(composite_scores)
                }

        return overall

    def calculate_std(self, scores: List[float]) -> float:
        """Calculate standard deviation"""
        if len(scores) < 2:
            return 0.0

        mean = sum(scores) / len(scores)
        variance = sum((x - mean) ** 2 for x in scores) / (len(scores) - 1)
        return variance ** 0.5

    def generate_evaluation_summary(self, overall_metrics: Dict) -> str:
        """Generate evaluation summary"""
        summary = []
        summary.append("RAG System Evaluation Summary")
        summary.append("=" * 40)

        if 'composite_score' in overall_metrics:
            composite = overall_metrics['composite_score']
            summary.append(f"Composite Score: {composite['mean']:.3f}{composite['std']:.3f})")

        summary.append("\nDetailed Metrics:")
        for metric_name, stats in overall_metrics.items():
            if metric_name != 'composite_score':
                summary.append(f"  {metric_name}: {stats['mean']:.3f}{stats['std']:.3f})")

        # Performance recommendations
        summary.append("\nOptimization Suggestions:")

        if overall_metrics.get('retrieval_precision', {}).get('mean', 0) < 0.6:
            summary.append("  - Low retrieval precision, consider optimizing retrieval strategy or reranking algorithm")

        if overall_metrics.get('retrieval_recall', {}).get('mean', 0) < 0.6:
            summary.append("  - Low retrieval recall, consider expanding retrieval range or improving query expansion")

        if overall_metrics.get('answer_faithfulness', {}).get('mean', 0) < 0.7:
            summary.append("  - Insufficient answer faithfulness, consider strengthening reliance on retrieved content")

        return "\n".join(summary)

# Usage example
def run_rag_evaluation():
    """Run RAG evaluation example"""

    # Prepare test data
    test_dataset = [
        {
            'question': 'What is machine learning?',
        },
        {
            'question': 'What are the main characteristics of deep learning?',
        }
    ]

    ground_truth = [
        {
            'expected_answer': 'Machine learning is a branch of artificial intelligence that enables computers to learn from data',
            'relevant_documents': ['Machine learning is a branch of artificial intelligence']
        },
        {
            'expected_answer': 'Deep learning uses multi-layer neural networks and can automatically extract features',
            'relevant_documents': ['Deep learning uses neural networks']
        }
    ]

    # Create mock RAG system
    class MockRAGSystem:
        def process_query(self, query):
            return {
                'answer': f'Answer about "{query}"...',
                'retrieved_documents': [
                    {'content': 'Related document content...', 'score': 0.8}
                ]
            }

    # Execute evaluation
    evaluator = RAGEvaluator()
    rag_system = MockRAGSystem()

    results = evaluator.evaluate_rag_system(rag_system, test_dataset, ground_truth)

    print(results['summary'])

    return results

# evaluation_results = run_rag_evaluation()
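
The same ideas extend from evaluation to optimization: a metric like answer completeness can drive a DSPy optimizer, tightening the link between retrieval and generation. The sketch below wraps a lexical-overlap check as a DSPy metric and compiles a RAG module with BootstrapFewShot; the training example and the rag_module variable are illustrative assumptions:

from dspy.teleprompt import BootstrapFewShot

def overlap_metric(example, prediction, trace=None):
    """Fraction of expected-answer tokens that appear in the predicted answer."""
    expected = set(example.expected_answer.lower().split())
    actual = set(prediction.answer.lower().split())
    return len(expected & actual) / max(len(expected), 1)

trainset = [
    dspy.Example(
        question="What is machine learning?",
        expected_answer="Machine learning is a branch of artificial intelligence that enables computers to learn from data",
    ).with_inputs("question"),
]

# `rag_module` is assumed to be a dspy.Module that takes `question`, e.g. AdaptiveRAG(retriever).
# optimizer = BootstrapFewShot(metric=overlap_metric, max_bootstrapped_demos=2)
# optimized_rag = optimizer.compile(rag_module, trainset=trainset)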

Practice Exercises

Exercise 1: Build Domain-Specific RAG System

class DomainSpecificRAG:
    """Domain-specific RAG system exercise"""

    def __init__(self, domain: str):
        self.domain = domain
        # TODO: Implement domain-specific RAG system
        pass

    def add_domain_documents(self, documents):
        """Add domain documents"""
        # TODO: Implement domain document processing
        pass

    def query_with_domain_context(self, query):
        """Query with domain context"""
        # TODO: Implement domain-related query processing
        pass

# Exercise tasks:
# 1. Choose a specific domain (e.g., medical, legal, technical)
# 2. Implement domain-specific document preprocessing
# 3. Design domain-related evaluation metrics

Exercise 2: Real-time RAG System

class RealTimeRAG:
    """Real-time RAG system exercise"""

    def __init__(self):
        # TODO: Implement real-time updating RAG system
        pass

    def update_knowledge_base(self, new_documents):
        """Update knowledge base in real-time"""
        # TODO: Implement incremental updates
        pass

    def handle_concurrent_queries(self, queries):
        """Handle concurrent queries"""
        # TODO: Implement concurrent processing
        pass

# Exercise tasks:
# 1. Implement incremental index updates
# 2. Design caching strategy
# 3. Optimize concurrent performance

Best Practices

1. Retrieval Optimization Strategies

def retrieval_optimization_guide():
    """Retrieval optimization strategy guide"""

    strategies = {
        'Query Optimization': [
            'Query expansion: Add synonyms and related terms',
            'Query rewriting: Decompose complex queries into simple ones',
            'Multi-query generation: Generate multiple query variants'
        ],

        'Retrieval Strategy': [
            'Hybrid retrieval: Combine semantic search with keyword search',
            'Multi-hop retrieval: Multiple rounds of retrieval for complex questions',
            'Context-aware retrieval: Consider conversation history and context'
        ],

        'Reranking Optimization': [
            'Relevance reranking: Use more precise relevance models',
            'Diversity reranking: Ensure result diversity',
            'Time weighting: Consider information timeliness'
        ],

        'Answer Generation': [
            'Context compression: Extract most relevant information snippets',
            'Multi-document fusion: Synthesize information from multiple documents',
            'Answer verification: Check answer factuality and consistency'
        ]
    }

    return strategies
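
One strategy from the guide above that has no code elsewhere in this chapter is context compression. A minimal lexical-overlap sketch is shown below; the sentence splitting and scoring heuristics are assumptions, and production systems often use an LM or a dedicated compressor instead:

def compress_context(query: str, documents: List[str], max_sentences: int = 5) -> str:
    """Keep only the sentences that overlap most with the query terms."""
    query_terms = set(query.lower().split())

    scored_sentences = []
    for doc in documents:
        for sentence in doc.split('.'):
            sentence = sentence.strip()
            if sentence:
                overlap = len(query_terms & set(sentence.lower().split()))
                scored_sentences.append((overlap, sentence))

    # Keep the highest-overlap sentences as the compressed context
    top = sorted(scored_sentences, key=lambda item: item[0], reverse=True)[:max_sentences]
    return ". ".join(sentence for _, sentence in top)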

# Performance monitoring
class RAGPerformanceMonitor:
    """RAG performance monitor"""

    def __init__(self):
        self.metrics = {
            'latency': [],
            'retrieval_accuracy': [],
            'answer_quality': []
        }

    def monitor_query_performance(self, query_func):
        """Decorator to monitor query performance"""
        import time
        import functools

        @functools.wraps(query_func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            result = query_func(*args, **kwargs)
            end_time = time.time()

            # Record latency
            latency = end_time - start_time
            self.metrics['latency'].append(latency)

            # Record other metrics
            self.record_performance_metrics(result)

            return result

        return wrapper

    def record_performance_metrics(self, result):
        """Record performance metrics"""
        # TODO: Implement metric recording logic
        pass

    def generate_performance_report(self):
        """Generate performance report"""
        if self.metrics['latency']:
            avg_latency = sum(self.metrics['latency']) / len(self.metrics['latency'])
            print(f"Average Latency: {avg_latency:.3f}s")

        # TODO: Generate more detailed report
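
A minimal usage sketch for the monitor above; the rag object is an assumption standing in for any of the systems built earlier in this chapter:

monitor = RAGPerformanceMonitor()

@monitor.monitor_query_performance
def answer_query(query: str):
    return rag.process_query(query)  # `rag` is assumed to be an already-built RAG system

# answer_query("What is machine learning?")
# monitor.generate_performance_report()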

Through this chapter, you should have mastered the complete approach to implementing retrieval-augmented generation systems in DSPy. RAG systems are an important component of modern AI applications, enabling language models to access external knowledge and generate more accurate answers. In practical applications, you should choose appropriate retrieval strategies and optimization methods based on specific business requirements and data characteristics.