Chapter 03: DSPy Predictors in Detail

Haiyue
17min

Chapter 03: DSPy Predictors in Detail

Learning Objectives
  • Learn Chain of Thought (CoT) predictors
  • Master the use of Retrieve predictors
  • Understand ChainOfThoughtWithHint predictors
  • Explore ReAct predictor applications
  • Methods for creating custom predictors

Key Concepts

1. Predictor System Overview

DSPy provides multiple predictors, each suitable for different reasoning scenarios:

Predictor TypeUse CasesCharacteristics
PredictBasic Q&A, classificationDirect input-to-output mapping
ChainOfThoughtComplex reasoningExplicit reasoning process
ProgramOfThoughtMathematical computationProgrammatic thinking
ReActReasoning requiring actionsReasoning-action loop
RetrieveKnowledge retrievalRetrieve relevant information

2. Chain of Thought (CoT) Predictor

CoT predictors improve performance on complex tasks through explicit reasoning steps.

Working Mechanism

  1. Decompose the problem into reasoning steps
  2. Show the thinking process step by step
  3. Derive the final answer based on reasoning

Applicable Scenarios

  • Mathematical problem solving
  • Logical reasoning
  • Complex text analysis
  • Multi-step tasks

3. Retrieve Predictor

Combines retrieval mechanisms to enhance generation capabilities, particularly suitable for knowledge-intensive tasks.

Core Components

  • Retriever: Retrieves relevant documents from the knowledge base
  • Generator: Generates answers based on retrieval results

4. ReAct Predictor

A predictor that combines Reasoning and Acting.

ReAct Loop

  1. Think: Analyze the current situation
  2. Act: Execute specific operations
  3. Observe: Obtain action results
  4. Repeat until task completion

Example Code

Chain of Thought Predictor

import dspy

# Configure model
lm = dspy.OpenAI(model="gpt-3.5-turbo", max_tokens=300)
dspy.settings.configure(lm=lm)

# Define math problem signature
class MathProblem(dspy.Signature):
    """Solve math word problems"""
    problem = dspy.InputField(desc="Math problem description")
    reasoning = dspy.OutputField(desc="Detailed solution steps")
    answer = dspy.OutputField(desc="Final numerical answer")

# Compare basic predictor vs CoT predictor
def compare_basic_and_cot():
    # Basic predictor
    basic_predictor = dspy.Predict(MathProblem)

    # CoT predictor
    cot_predictor = dspy.ChainOfThought(MathProblem)

    problem = "A class has 30 students, 60% of whom are girls. If 5 more boys join, what percentage of the total are boys now?"

    print("=== Basic Predictor Result ===")
    basic_result = basic_predictor(problem=problem)
    print(f"Reasoning: {basic_result.reasoning}")
    print(f"Answer: {basic_result.answer}")

    print("\n=== CoT Predictor Result ===")
    cot_result = cot_predictor(problem=problem)
    print(f"Reasoning: {cot_result.reasoning}")
    print(f"Answer: {cot_result.answer}")

# Advanced usage of CoT predictor
class AdvancedCoT(dspy.Module):
    def __init__(self):
        super().__init__()
        self.cot = dspy.ChainOfThought(
            "problem, context -> step_by_step_reasoning, final_answer"
        )

    def forward(self, problem, context=""):
        result = self.cot(problem=problem, context=context)

        # Post-process reasoning steps
        steps = self._parse_reasoning_steps(result.step_by_step_reasoning)

        return dspy.Prediction(
            reasoning_steps=steps,
            final_answer=result.final_answer,
            raw_reasoning=result.step_by_step_reasoning
        )

    def _parse_reasoning_steps(self, reasoning_text):
        """Parse reasoning steps"""
        lines = reasoning_text.split('\n')
        steps = []

        for line in lines:
            line = line.strip()
            if line and (line.startswith('Step') or line.startswith('Step')):
                steps.append(line)

        return steps

# Use advanced CoT
def demo_advanced_cot():
    advanced_cot = AdvancedCoT()

    result = advanced_cot(
        problem="Calculate compound interest: principal of 10,000 yuan, annual interest rate of 5%, total principal and interest after 3 years?",
        context="Compound interest formula: A = P(1 + r)^t"
    )

    print("Reasoning steps:")
    for i, step in enumerate(result.reasoning_steps, 1):
        print(f"  {i}. {step}")

    print(f"\nFinal answer: {result.final_answer}")

Retrieve Predictor

import dspy
from dspy.retrieve import ColBERTv2

# Configure retrieval model
retriever = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')
lm = dspy.OpenAI(model="gpt-3.5-turbo")

dspy.settings.configure(lm=lm, rm=retriever)

# Define RAG signature
class RAGSignature(dspy.Signature):
    """Retrieval-based question answering"""
    question = dspy.InputField(desc="User question")
    context = dspy.InputField(desc="Retrieved relevant documents")
    answer = dspy.OutputField(desc="Answer based on context")

# Simple RAG module
class SimpleRAG(dspy.Module):
    def __init__(self, k=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)
        self.generate_answer = dspy.ChainOfThought(RAGSignature)

    def forward(self, question):
        # 1. Retrieve relevant documents
        context = self.retrieve(question).passages

        # 2. Generate answer based on context
        result = self.generate_answer(
            question=question,
            context=context
        )

        return dspy.Prediction(
            answer=result.answer,
            context=context,
            reasoning=getattr(result, 'reasoning', '')
        )

# Advanced RAG module
class AdvancedRAG(dspy.Module):
    def __init__(self, k=5):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)
        self.rerank = dspy.Predict(
            "question, passages -> relevant_passages"
        )
        self.generate = dspy.ChainOfThought(RAGSignature)

    def forward(self, question):
        # 1. Initial retrieval
        initial_context = self.retrieve(question).passages

        # 2. Re-ranking
        reranked = self.rerank(
            question=question,
            passages=initial_context
        )

        # 3. Generate answer
        result = self.generate(
            question=question,
            context=reranked.relevant_passages
        )

        return dspy.Prediction(
            answer=result.answer,
            context=initial_context,
            reranked_context=reranked.relevant_passages,
            reasoning=getattr(result, 'reasoning', '')
        )

# Use retrieve predictor
def demo_retrieve_predictor():
    rag = SimpleRAG(k=3)

    question = "What is artificial intelligence?"
    result = rag(question=question)

    print(f"Question: {question}")
    print(f"\nRetrieved context:")
    for i, passage in enumerate(result.context, 1):
        print(f"  {i}. {passage[:100]}...")

    print(f"\nAnswer: {result.answer}")
    if result.reasoning:
        print(f"Reasoning: {result.reasoning}")

ReAct Predictor

# ReAct signature definition
class ReActSignature(dspy.Signature):
    """ReAct reasoning-action pattern"""
    query = dspy.InputField(desc="Problem to solve")
    thought = dspy.OutputField(desc="Current thinking")
    action = dspy.OutputField(desc="Action to execute")
    observation = dspy.InputField(desc="Observation result of action")
    answer = dspy.OutputField(desc="Final answer")

# Tool definitions
class Calculator:
    """Calculator tool"""
    def calculate(self, expression):
        try:
            result = eval(expression)
            return f"Calculation result: {result}"
        except Exception as e:
            return f"Calculation error: {e}"

class WebSearcher:
    """Web search tool (simulated)"""
    def search(self, query):
        # Simulate search results
        mock_results = {
            "artificial intelligence": "Artificial intelligence is a branch of computer science...",
            "machine learning": "Machine learning is a subfield of artificial intelligence...",
            "deep learning": "Deep learning is a method of machine learning..."
        }

        for key in mock_results:
            if key in query.lower():
                return mock_results[key]

        return "No relevant information found"

# ReAct module implementation
class ReActAgent(dspy.Module):
    def __init__(self, max_iterations=5):
        super().__init__()
        self.max_iterations = max_iterations
        self.think_act = dspy.ChainOfThought(
            "query, history -> thought, action"
        )
        self.final_answer = dspy.ChainOfThought(
            "query, history -> answer"
        )

        # Tools
        self.tools = {
            "calculator": Calculator(),
            "search": WebSearcher()
        }

    def forward(self, query):
        history = []

        for i in range(self.max_iterations):
            # Think and act
            history_text = self._format_history(history)
            result = self.think_act(
                query=query,
                history=history_text
            )

            thought = result.thought
            action = result.action

            print(f"Round {i+1}:")
            print(f"  Thought: {thought}")
            print(f"  Action: {action}")

            # Execute action
            observation = self._execute_action(action)
            print(f"  Observation: {observation}")

            # Record history
            history.append({
                'thought': thought,
                'action': action,
                'observation': observation
            })

            # Check if completed
            if "complete" in observation.lower() or "answer is" in observation.lower():
                break

        # Generate final answer
        history_text = self._format_history(history)
        final_result = self.final_answer(
            query=query,
            history=history_text
        )

        return dspy.Prediction(
            answer=final_result.answer,
            history=history,
            iterations=len(history)
        )

    def _format_history(self, history):
        """Format history"""
        formatted = []
        for entry in history:
            formatted.append(f"Thought: {entry['thought']}")
            formatted.append(f"Action: {entry['action']}")
            formatted.append(f"Observation: {entry['observation']}")
            formatted.append("---")

        return "\n".join(formatted)

    def _execute_action(self, action):
        """Execute action"""
        action = action.strip()

        if action.lower().startswith("calculate"):
            # Extract expression
            expression = action.lower().replace("calculate", "").strip()
            return self.tools["calculator"].calculate(expression)

        elif action.lower().startswith("search"):
            # Extract search query
            query = action.lower().replace("search", "").strip()
            return self.tools["search"].search(query)

        elif "complete" in action.lower():
            return "Task complete"

        else:
            return "Invalid action"

# Use ReAct predictor
def demo_react_predictor():
    react_agent = ReActAgent(max_iterations=3)

    query = "What is deep learning? Please search for relevant information first, then provide a detailed explanation"
    result = react_agent(query=query)

    print(f"\nQuestion: {query}")
    print(f"Total {result.iterations} rounds of reasoning executed")
    print(f"Final answer: {result.answer}")

Custom Predictors

# Custom predictor base class
class CustomPredictor(dspy.Module):
    def __init__(self, signature, strategy="default"):
        super().__init__()
        self.signature = signature
        self.strategy = strategy

        if strategy == "cot":
            self.predictor = dspy.ChainOfThought(signature)
        elif strategy == "multi_shot":
            self.predictor = dspy.Predict(signature)
        else:
            self.predictor = dspy.Predict(signature)

    def forward(self, **kwargs):
        if self.strategy == "multi_shot":
            # Multi-sample strategy
            results = []
            for _ in range(3):  # Generate 3 candidate answers
                result = self.predictor(**kwargs)
                results.append(result)

            # Select best answer (simple voting)
            return self._select_best_answer(results)
        else:
            return self.predictor(**kwargs)

    def _select_best_answer(self, results):
        # Simple majority voting
        return results[0]  # Simplified implementation

# Specialized predictor: Sentiment Analysis
class SentimentAnalyzer(dspy.Module):
    def __init__(self):
        super().__init__()

        # Multi-stage processing
        self.preprocess = dspy.Predict(
            "text -> cleaned_text, features"
        )
        self.analyze = dspy.ChainOfThought(
            "cleaned_text, features -> sentiment, confidence, reasoning"
        )
        self.postprocess = dspy.Predict(
            "sentiment, confidence, reasoning -> final_sentiment, score"
        )

    def forward(self, text):
        # Preprocessing
        prep_result = self.preprocess(text=text)

        # Analysis
        analysis = self.analyze(
            cleaned_text=prep_result.cleaned_text,
            features=prep_result.features
        )

        # Post-processing
        final_result = self.postprocess(
            sentiment=analysis.sentiment,
            confidence=analysis.confidence,
            reasoning=analysis.reasoning
        )

        return dspy.Prediction(
            sentiment=final_result.final_sentiment,
            score=final_result.score,
            reasoning=analysis.reasoning,
            features=prep_result.features
        )

# Use custom predictor
def demo_custom_predictor():
    # Sentiment analyzer
    analyzer = SentimentAnalyzer()

    text = "The weather is so nice today, I'm feeling very happy!"
    result = analyzer(text=text)

    print(f"Text: {text}")
    print(f"Sentiment: {result.sentiment}")
    print(f"Score: {result.score}")
    print(f"Reasoning: {result.reasoning}")
    print(f"Features: {result.features}")

Practical Exercises

Exercise 1: CoT Optimization

Optimize Chain of Thought predictor for domain-specific problems.

class DomainSpecificCoT(dspy.Module):
    """Domain-optimized CoT predictor"""
    def __init__(self, domain):
        super().__init__()
        # Implement your optimization logic
        pass

Exercise 2: RAG Enhancement

Implement a multi-source retrieval RAG system.

class MultiSourceRAG(dspy.Module):
    """Multi-source retrieval system"""
    def __init__(self):
        super().__init__()
        # Implement multi-source retrieval logic
        pass

Exercise 3: ReAct Tool Integration

Add more tools to the ReAct predictor.

class AdvancedReActAgent(dspy.Module):
    """Enhanced ReAct agent"""
    def __init__(self):
        super().__init__()
        # Add more tools
        pass
Performance Tips
  • Choose appropriate predictors based on task complexity
  • CoT predictors consume more tokens, pay attention to cost control
  • Retrieve predictors need proper retriever configuration
  • ReAct predictors may require multiple interaction rounds, set reasonable iteration limits
Best Practices
  • Design appropriate signatures for each predictor type
  • Consider combining multiple predictors for complex tasks
  • Thoroughly test predictor performance on different inputs
  • Consider caching mechanisms to improve performance

Chapter Summary

This chapter detailed various DSPy predictors:

  1. Chain of Thought: Improve complex task performance through explicit reasoning
  2. Retrieve: Combine retrieval to enhance generation capabilities
  3. ReAct: Reasoning-action loop for handling tasks requiring tools
  4. Custom Predictors: Customized solutions for specific needs

Mastering the usage of these predictors will lay a solid foundation for building efficient language model applications.