Chapter 10: Testing and Debugging DAGs

Learning Objectives
  • Write unit tests for DAGs, tasks, and custom operators
  • Use airflow dags test and airflow tasks test commands
  • Debug DAGs locally with IDE integration
  • Validate DAG integrity and detect import errors

Knowledge Points

10.1 DAG Validation Tests

The most fundamental test is ensuring your DAGs load without errors:

"""
File: tests/test_dag_integrity.py
Test that all DAGs load correctly.
"""
import pytest
from airflow.models import DagBag


class TestDagIntegrity:
    """Test DAG loading and basic properties."""

    @pytest.fixture(scope="class")
    def dagbag(self):
        """Load all DAGs."""
        return DagBag(dag_folder="dags/", include_examples=False)

    def test_no_import_errors(self, dagbag):
        """Ensure no DAG import errors."""
        assert len(dagbag.import_errors) == 0, (
            f"DAG import errors: {dagbag.import_errors}"
        )

    def test_dags_loaded(self, dagbag):
        """Ensure at least one DAG is loaded."""
        assert len(dagbag.dags) > 0, "No DAGs found!"

    def test_dag_has_tasks(self, dagbag):
        """Ensure every DAG has at least one task."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tasks) > 0, f"DAG '{dag_id}' has no tasks"

    def test_dag_has_no_cycles(self, dagbag):
        """Ensure no DAG has circular dependencies."""
        from airflow.utils.dag_cycle_tester import check_cycle

        for dag_id, dag in dagbag.dags.items():
            # Airflow checks this implicitly at parse time, but explicit is
            # better; check_cycle raises AirflowDagCycleException on a cycle.
            check_cycle(dag)

    def test_specific_dag_exists(self, dagbag):
        """Ensure critical DAGs exist."""
        expected_dags = ["weather_pipeline", "data_validation"]
        for dag_id in expected_dags:
            assert dag_id in dagbag.dags, f"Expected DAG '{dag_id}' not found"

    def test_dag_tags(self, dagbag):
        """Ensure all DAGs have tags for organization."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tags) > 0, f"DAG '{dag_id}' has no tags"
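
If you prefer one pass/fail result per DAG (the loops above stop at the first failing DAG), a parametrized variant works well. A minimal sketch (the file name is illustrative):

"""
File: tests/test_dag_integrity_parametrized.py
One test case per DAG.
"""
import pytest
from airflow.models import DagBag

dagbag = DagBag(dag_folder="dags/", include_examples=False)


@pytest.mark.parametrize("dag_id", sorted(dagbag.dags))
def test_dag_is_well_formed(dag_id):
    dag = dagbag.dags[dag_id]
    assert dag.tasks, f"DAG '{dag_id}' has no tasks"
    assert dag.tags, f"DAG '{dag_id}' has no tags"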

10.2 Testing Task Logic

Extract business logic out of your task callables into pure functions; those functions can then be unit-tested without any Airflow machinery:

"""
File: tests/test_task_logic.py
Test individual task functions in isolation.
"""
import pytest


# In a real project, import these functions from your DAG module.
# With TaskFlow, keep business logic in plain functions so it stays testable.
def convert_temperature(temp_f: float) -> float:
    """Convert Fahrenheit to Celsius."""
    return round((temp_f - 32) * 5 / 9, 1)


def classify_weather(temp_c: float, humidity: float) -> str:
    """Classify weather conditions."""
    if temp_c > 30:
        classification = "hot"
    elif temp_c > 15:
        classification = "mild"
    else:
        classification = "cold"

    if humidity > 70:
        classification += "_humid"
    return classification


def validate_record(record: dict) -> tuple:
    """Validate a single record. Returns (is_valid, errors)."""
    errors = []
    if not record.get("name"):
        errors.append("missing name")
    if "@" not in record.get("email", ""):
        errors.append("invalid email")
    if record.get("age", 0) < 0:
        errors.append("negative age")
    return len(errors) == 0, errors


class TestTemperatureConversion:
    def test_freezing_point(self):
        assert convert_temperature(32) == 0.0

    def test_boiling_point(self):
        assert convert_temperature(212) == 100.0

    def test_body_temperature(self):
        assert convert_temperature(98.6) == 37.0

    def test_negative_temperature(self):
        assert convert_temperature(-40) == -40.0


class TestWeatherClassification:
    def test_hot_dry(self):
        assert classify_weather(35, 50) == "hot"

    def test_hot_humid(self):
        assert classify_weather(35, 80) == "hot_humid"

    def test_mild(self):
        assert classify_weather(20, 50) == "mild"

    def test_cold(self):
        assert classify_weather(5, 40) == "cold"

    def test_cold_humid(self):
        assert classify_weather(5, 85) == "cold_humid"


class TestRecordValidation:
    def test_valid_record(self):
        record = {"name": "Alice", "email": "alice@example.com", "age": 30}
        is_valid, errors = validate_record(record)
        assert is_valid is True
        assert errors == []

    def test_missing_name(self):
        record = {"name": "", "email": "test@example.com", "age": 25}
        is_valid, errors = validate_record(record)
        assert is_valid is False
        assert "missing name" in errors

    def test_invalid_email(self):
        record = {"name": "Bob", "email": "invalid", "age": 25}
        is_valid, errors = validate_record(record)
        assert is_valid is False
        assert "invalid email" in errors

    def test_negative_age(self):
        record = {"name": "Charlie", "email": "c@example.com", "age": -1}
        is_valid, errors = validate_record(record)
        assert is_valid is False
        assert "negative age" in errors

    def test_multiple_errors(self):
        record = {"name": "", "email": "invalid", "age": -5}
        is_valid, errors = validate_record(record)
        assert is_valid is False
        assert len(errors) == 3
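
Repetitive cases such as TestTemperatureConversion also collapse neatly into a single parametrized test; a sketch reusing the same expected values:

@pytest.mark.parametrize(
    "temp_f, expected_c",
    [(32, 0.0), (212, 100.0), (98.6, 37.0), (-40, -40.0)],
)
def test_convert_temperature_parametrized(temp_f, expected_c):
    assert convert_temperature(temp_f) == expected_c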

10.3 Testing DAGs with Airflow CLI

Beyond unit tests, the Airflow CLI can exercise real DAG and task code from your local environment:

# Test an entire DAG run (does not persist to DB)
airflow dags test weather_pipeline 2026-01-15

# Test a single task
airflow tasks test weather_pipeline extract_weather_data 2026-01-15

# Test with verbose output
airflow tasks test weather_pipeline extract_weather_data 2026-01-15 -v

# Check for DAG import errors
airflow dags list-import-errors

# Validate DAG file syntax
python dags/weather_pipeline.py

# Render Jinja templates for a task
airflow tasks render weather_pipeline extract_weather_data 2026-01-15
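
These commands also fit into CI. A minimal sketch that shells out to airflow dags test from pytest (it assumes Airflow and its metadata database are configured in the CI environment):

"""
File: tests/test_dag_run_cli.py
(illustrative file name)
"""
import subprocess


def test_weather_pipeline_full_run():
    """Run the whole DAG locally via the CLI; fail on a non-zero exit."""
    result = subprocess.run(
        ["airflow", "dags", "test", "weather_pipeline", "2026-01-15"],
        capture_output=True,
        text=True,
    )
    assert result.returncode == 0, result.stderr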

10.4 Testing Custom Operators

Custom operators are plain Python classes, so their constructor and execute() logic can be unit-tested without a running scheduler:

"""
File: tests/test_custom_operator.py
"""
import pytest
from unittest.mock import patch

from airflow.models.baseoperator import BaseOperator


# Example custom operator (in practice, import it from your plugins/package)


class DataQualityOperator(BaseOperator):
    """Custom operator that checks data quality."""

    def __init__(self, table: str, checks: list, conn_id: str = "default", **kwargs):
        super().__init__(**kwargs)
        self.table = table
        self.checks = checks
        self.conn_id = conn_id

    def execute(self, context):
        results = [
            {"check": check["name"], "passed": self.run_check(check)}
            for check in self.checks
        ]
        failed = [r for r in results if not r["passed"]]
        if failed:
            raise ValueError(f"Quality checks failed: {failed}")
        return results

    def run_check(self, check: dict) -> bool:
        """Evaluate one check. In practice: run check["sql"] via a DB hook."""
        # Kept separate from execute() so tests can patch it (see below).
        return True


class TestDataQualityOperator:

    def test_operator_initialization(self):
        """Test operator is initialized correctly."""
        op = DataQualityOperator(
            task_id="quality_check",
            table="analytics.sales",
            checks=[{"name": "row_count", "sql": "SELECT COUNT(*) FROM {table}"}],
            conn_id="test_db",
        )
        assert op.table == "analytics.sales"
        assert len(op.checks) == 1
        assert op.conn_id == "test_db"

    def test_all_checks_pass(self):
        """Test when all quality checks pass."""
        op = DataQualityOperator(
            task_id="quality_check",
            table="test_table",
            checks=[
                {"name": "not_null", "sql": "SELECT COUNT(*) FROM test WHERE id IS NULL"},
                {"name": "unique", "sql": "SELECT COUNT(*) - COUNT(DISTINCT id) FROM test"},
            ],
        )
        context = {"ds": "2026-01-15"}
        results = op.execute(context)
        assert len(results) == 2
        assert all(r["passed"] for r in results)

10.5 Mocking External Dependencies

Tasks that call external services should be tested against mocks, so the suite runs offline and deterministically:

"""
File: tests/test_with_mocks.py
Test tasks that depend on external services.
"""
import pytest
from unittest.mock import patch, MagicMock


def fetch_api_data(endpoint: str, api_conn_id: str = "default") -> dict:
    """Function that calls an external API."""
    from airflow.providers.http.hooks.http import HttpHook

    hook = HttpHook(method="GET", http_conn_id=api_conn_id)
    response = hook.run(endpoint)
    return response.json()


def process_s3_file(bucket: str, key: str) -> dict:
    """Function that reads from S3."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    hook = S3Hook(aws_conn_id="aws_default")
    content = hook.read_key(key, bucket)
    return {"content": content, "size": len(content)}


class TestWithMocks:

    @patch("airflow.providers.http.hooks.http.HttpHook")
    def test_fetch_api_data(self, mock_hook_class):
        """Test API fetching with mocked HTTP hook."""
        mock_response = MagicMock()
        mock_response.json.return_value = {"status": "ok", "data": [1, 2, 3]}

        mock_hook = MagicMock()
        mock_hook.run.return_value = mock_response
        mock_hook_class.return_value = mock_hook

        result = fetch_api_data("/api/v1/data")

        assert result["status"] == "ok"
        assert len(result["data"]) == 3
        mock_hook.run.assert_called_once_with("/api/v1/data")

    @patch("airflow.providers.amazon.aws.hooks.s3.S3Hook")
    def test_process_s3_file(self, mock_hook_class):
        """Test S3 file processing with mocked S3 hook."""
        mock_hook = MagicMock()
        mock_hook.read_key.return_value = '{"records": [1, 2, 3]}'
        mock_hook_class.return_value = mock_hook

        result = process_s3_file("my-bucket", "data/file.json")

        assert result["size"] > 0
        mock_hook.read_key.assert_called_once_with("data/file.json", "my-bucket")
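
A note on patch targets: the patches above work because each hook is imported inside the function under test, so the name is resolved after the patch is active. If your module imports a hook at the top level, patch the name where it is used instead. An illustrative sketch (the module path dags.weather_tasks is hypothetical):

    @patch("dags.weather_tasks.HttpHook")
    def test_with_top_level_import(self, mock_hook_class):
        # Suppose dags/weather_tasks.py does, at module level:
        #     from airflow.providers.http.hooks.http import HttpHook
        # Then HttpHook must be patched in dags.weather_tasks,
        # not in the provider package.
        from dags.weather_tasks import fetch_api_data  # hypothetical module

        mock_hook = MagicMock()
        mock_hook.run.return_value.json.return_value = {"status": "ok"}
        mock_hook_class.return_value = mock_hook

        assert fetch_api_data("/api/v1/data")["status"] == "ok"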


10.6 Debugging Strategies

When a task misbehaves, start with logging and an inspection of the runtime context:

"""
File: dags/debug_example.py
Techniques for debugging DAGs.
"""
from airflow.decorators import dag, task
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

@dag(
    dag_id="debug_example",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["debug"],
)
def debug_example():

    @task
    def debug_with_logging():
        """Use Python logging for debugging."""
        logger.debug("This is a debug message")
        logger.info("This is an info message")
        logger.warning("This is a warning message")
        logger.error("This is an error message")

        data = {"key": "value", "count": 42}
        logger.info(f"Processing data: {data}")
        return data

    @task
    def debug_with_context(**context):
        """Access Airflow context for debugging."""
        # Print all available context variables
        for key, value in sorted(context.items()):
            logger.info(f"Context: {key} = {type(value).__name__}")

        # Common context values
        logger.info(f"DAG ID: {context['dag'].dag_id}")
        logger.info(f"Task ID: {context['task'].task_id}")
        logger.info(f"Execution Date: {context['ds']}")
        logger.info(f"Run ID: {context['run_id']}")

    debug_with_logging() >> debug_with_context()

dag = debug_example()

# Task logs are written as files under $AIRFLOW_HOME/logs/
# (by default one file per DAG / run / task / attempt)
ls $AIRFLOW_HOME/logs/dag_id=debug_example/

# Run with increased verbosity
AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG airflow tasks test debug_example debug_with_logging 2026-01-15

# Check scheduler logs for parsing issues
tail -f $AIRFLOW_HOME/logs/scheduler/latest/*.log
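
For debugging in an IDE, Airflow 2.5+ provides dag.test(), which runs the entire DAG in a single local process, so breakpoints set in your editor are hit as usual. A minimal sketch, appended to the DAG file above and run directly (or under your IDE's debugger):

# At the bottom of dags/debug_example.py
if __name__ == "__main__":
    dag.test()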

Practice Exercise

Exercise: Build a Test Suite

Assemble a complete pytest suite for the weather pipeline: shared fixtures in conftest.py, structural and logic tests in a dedicated module, and a coverage run at the end.

"""
File: tests/conftest.py
Shared test fixtures.
"""
import pytest
from airflow.models import DagBag


@pytest.fixture(scope="session")
def dagbag():
    return DagBag(dag_folder="dags/", include_examples=False)


@pytest.fixture
def sample_weather_data():
    return [
        {"city": "New York", "temp_f": 72, "humidity": 65},
        {"city": "London", "temp_f": 59, "humidity": 80},
        {"city": "Tokyo", "temp_f": 85, "humidity": 70},
    ]
"""
File: tests/test_weather_pipeline.py
Comprehensive test suite for the weather pipeline.
"""
import pytest


class TestWeatherPipeline:

    def test_dag_exists(self, dagbag):
        assert "weather_pipeline" in dagbag.dags

    def test_task_count(self, dagbag):
        dag = dagbag.dags["weather_pipeline"]
        assert len(dag.tasks) == 5

    def test_task_dependencies(self, dagbag):
        dag = dagbag.dags["weather_pipeline"]

        extract = dag.get_task("extract_weather_data")
        convert = dag.get_task("convert_to_celsius")

        # Check that convert depends on extract
        assert extract.task_id in [t.task_id for t in convert.upstream_list]

    def test_temperature_conversion(self, sample_weather_data):
        for record in sample_weather_data:
            temp_c = round((record["temp_f"] - 32) * 5 / 9, 1)
            assert isinstance(temp_c, float)
            assert -100 < temp_c < 100

    def test_dag_schedule(self, dagbag):
        dag = dagbag.dags["weather_pipeline"]
        assert dag.schedule_interval is not None

# Run the test suite
pytest tests/ -v

# Run with coverage
pytest tests/ --cov=dags --cov-report=term-missing

# Run specific test class
pytest tests/test_weather_pipeline.py::TestWeatherPipeline -v

Summary

In this chapter, you learned:

  • DAG integrity tests catch import errors and structural issues early
  • Extract business logic into pure functions for easy unit testing
  • Use Airflow CLI (dags test, tasks test) for integration testing
  • Mock external dependencies (hooks, APIs) to test in isolation
  • Effective debugging uses Python logging and Airflow context inspection