Chapter 10: Testing and Debugging DAGs
Learning Objectives
- Write unit tests for DAGs, tasks, and custom operators
- Use `airflow dags test` and `airflow tasks test` commands
- Debug DAGs locally with IDE integration
- Validate DAG integrity and detect import errors
Knowledge Points
### 10.1 DAG Validation Tests
The most fundamental test is ensuring your DAGs load without errors:
"""
File: tests/test_dag_integrity.py
Test that all DAGs load correctly.
"""
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle
class TestDagIntegrity:
"""Test DAG loading and basic properties."""
@pytest.fixture(scope="class")
def dagbag(self):
"""Load all DAGs."""
return DagBag(dag_folder="dags/", include_examples=False)
def test_no_import_errors(self, dagbag):
"""Ensure no DAG import errors."""
assert len(dagbag.import_errors) == 0, (
f"DAG import errors: {dagbag.import_errors}"
)
def test_dags_loaded(self, dagbag):
"""Ensure at least one DAG is loaded."""
assert len(dagbag.dags) > 0, "No DAGs found!"
def test_dag_has_tasks(self, dagbag):
"""Ensure every DAG has at least one task."""
for dag_id, dag in dagbag.dags.items():
assert len(dag.tasks) > 0, f"DAG '{dag_id}' has no tasks"
    def test_dag_has_no_cycles(self, dagbag):
        """Ensure no DAG has circular dependencies."""
        for dag_id, dag in dagbag.dags.items():
            # Cycles are also caught at parse time, but explicit is better.
            # check_cycle() raises AirflowDagCycleException if a cycle exists.
            check_cycle(dag)
def test_specific_dag_exists(self, dagbag):
"""Ensure critical DAGs exist."""
expected_dags = ["weather_pipeline", "data_validation"]
for dag_id in expected_dags:
assert dag_id in dagbag.dags, f"Expected DAG '{dag_id}' not found"
def test_dag_tags(self, dagbag):
"""Ensure all DAGs have tags for organization."""
for dag_id, dag in dagbag.dags.items():
assert len(dag.tags) > 0, f"DAG '{dag_id}' has no tags"
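
    def test_import_time(self, dagbag):
        """Optional extra check (an addition, not part of the checks above):
        keep DAG files fast to import so the scheduler stays responsive.

        Assumes Airflow 2.x, where FileLoadStat.duration is a timedelta;
        the two-second threshold is an arbitrary project choice.
        """
        slow_files = [
            stat.file
            for stat in dagbag.dagbag_stats
            if stat.duration.total_seconds() > 2.0
        ]
        assert not slow_files, f"DAG files slow to import: {slow_files}"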
```

### 10.2 Testing Task Logic
When using the TaskFlow API, keep business logic in plain Python functions and call them from your tasks; the logic can then be unit-tested without any Airflow machinery:

```python
"""
File: tests/test_task_logic.py
Test individual task functions in isolation.
"""
import pytest
# Import the task functions directly
# When using TaskFlow, extract logic into testable functions
def convert_temperature(temp_f: float) -> float:
"""Convert Fahrenheit to Celsius."""
return round((temp_f - 32) * 5 / 9, 1)
def classify_weather(temp_c: float, humidity: float) -> str:
"""Classify weather conditions."""
if temp_c > 30:
classification = "hot"
elif temp_c > 15:
classification = "mild"
else:
classification = "cold"
if humidity > 70:
classification += "_humid"
return classification
def validate_record(record: dict) -> tuple:
"""Validate a single record. Returns (is_valid, errors)."""
errors = []
if not record.get("name"):
errors.append("missing name")
if "@" not in record.get("email", ""):
errors.append("invalid email")
if record.get("age", 0) < 0:
errors.append("negative age")
return len(errors) == 0, errors
class TestTemperatureConversion:
def test_freezing_point(self):
assert convert_temperature(32) == 0.0
def test_boiling_point(self):
assert convert_temperature(212) == 100.0
def test_body_temperature(self):
assert convert_temperature(98.6) == 37.0
def test_negative_temperature(self):
assert convert_temperature(-40) == -40.0
class TestWeatherClassification:
def test_hot_dry(self):
assert classify_weather(35, 50) == "hot"
def test_hot_humid(self):
assert classify_weather(35, 80) == "hot_humid"
def test_mild(self):
assert classify_weather(20, 50) == "mild"
def test_cold(self):
assert classify_weather(5, 40) == "cold"
def test_cold_humid(self):
assert classify_weather(5, 85) == "cold_humid"
class TestRecordValidation:
def test_valid_record(self):
record = {"name": "Alice", "email": "alice@example.com", "age": 30}
is_valid, errors = validate_record(record)
assert is_valid is True
assert errors == []
def test_missing_name(self):
record = {"name": "", "email": "test@example.com", "age": 25}
is_valid, errors = validate_record(record)
assert is_valid is False
assert "missing name" in errors
def test_invalid_email(self):
record = {"name": "Bob", "email": "invalid", "age": 25}
is_valid, errors = validate_record(record)
assert is_valid is False
assert "invalid email" in errors
def test_negative_age(self):
record = {"name": "Charlie", "email": "c@example.com", "age": -1}
is_valid, errors = validate_record(record)
assert is_valid is False
assert "negative age" in errors
def test_multiple_errors(self):
record = {"name": "", "email": "invalid", "age": -5}
is_valid, errors = validate_record(record)
assert is_valid is False
assert len(errors) == 3
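
# The per-value tests above can also be collapsed with pytest's parametrize feature
# (an optional addition; it reuses the same convert_temperature function).
@pytest.mark.parametrize(
    "temp_f, expected_c",
    [(32, 0.0), (212, 100.0), (98.6, 37.0), (-40, -40.0)],
)
def test_convert_temperature_parametrized(temp_f, expected_c):
    assert convert_temperature(temp_f) == expected_c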
```

### 10.3 Testing DAGs with Airflow CLI
The Airflow CLI lets you exercise whole DAGs or individual tasks without waiting for the scheduler:

```bash
# Test an entire DAG run (does not persist to DB)
airflow dags test weather_pipeline 2026-01-15
# Test a single task
airflow tasks test weather_pipeline extract_weather_data 2026-01-15
# Test with verbose output
airflow tasks test weather_pipeline extract_weather_data 2026-01-15 -v
# Check for DAG import errors
airflow dags list-import-errors
# Validate DAG file syntax
python dags/weather_pipeline.py
# Render Jinja templates for a task
airflow tasks render weather_pipeline extract_weather_data 2026-01-15
```

### 10.4 Testing Custom Operators
A custom operator is an ordinary Python class, so you can instantiate it directly in a test and call its execute() method with a stubbed context:

```python
"""
File: tests/test_custom_operator.py
"""
import pytest
# Example custom operator
from airflow.models.baseoperator import BaseOperator
class DataQualityOperator(BaseOperator):
"""Custom operator that checks data quality."""
def __init__(self, table: str, checks: list, conn_id: str = "default", **kwargs):
super().__init__(**kwargs)
self.table = table
self.checks = checks
self.conn_id = conn_id
def execute(self, context):
results = []
for check in self.checks:
# In practice: run SQL query
result = {"check": check["name"], "passed": True}
results.append(result)
failed = [r for r in results if not r["passed"]]
if failed:
raise ValueError(f"Quality checks failed: {failed}")
return results
class TestDataQualityOperator:
def test_operator_initialization(self):
"""Test operator is initialized correctly."""
op = DataQualityOperator(
task_id="quality_check",
table="analytics.sales",
checks=[{"name": "row_count", "sql": "SELECT COUNT(*) FROM {table}"}],
conn_id="test_db",
)
assert op.table == "analytics.sales"
assert len(op.checks) == 1
assert op.conn_id == "test_db"
def test_all_checks_pass(self):
"""Test when all quality checks pass."""
op = DataQualityOperator(
task_id="quality_check",
table="test_table",
checks=[
{"name": "not_null", "sql": "SELECT COUNT(*) FROM test WHERE id IS NULL"},
{"name": "unique", "sql": "SELECT COUNT(*) - COUNT(DISTINCT id) FROM test"},
],
)
context = {"ds": "2026-01-15"}
results = op.execute(context)
assert len(results) == 2
assert all(r["passed"] for r in results)
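
    def test_operator_registers_in_dag(self):
        """Optional extra check (an addition to the chapter's example): the
        operator attaches to a DAG like any built-in operator. The dag_id and
        start_date below are illustrative only."""
        from datetime import datetime

        from airflow import DAG

        with DAG(dag_id="quality_dag_unit_test", start_date=datetime(2026, 1, 1), schedule=None):
            op = DataQualityOperator(
                task_id="quality_check",
                table="analytics.sales",
                checks=[{"name": "row_count", "sql": "SELECT COUNT(*) FROM analytics.sales"}],
            )
        assert op.dag is not None
        assert "quality_check" in op.dag.task_ids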
```

### 10.5 Mocking External Dependencies
Tasks that call external services (HTTP APIs, S3, databases) should be tested against mocks so the suite runs quickly and offline:

```python
"""
File: tests/test_with_mocks.py
Test tasks that depend on external services.
"""
import pytest
from unittest.mock import patch, MagicMock
def fetch_api_data(endpoint: str, api_conn_id: str = "default") -> dict:
"""Function that calls an external API."""
from airflow.providers.http.hooks.http import HttpHook
hook = HttpHook(method="GET", http_conn_id=api_conn_id)
response = hook.run(endpoint)
return response.json()
def process_s3_file(bucket: str, key: str) -> dict:
"""Function that reads from S3."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
hook = S3Hook(aws_conn_id="aws_default")
content = hook.read_key(key, bucket)
return {"content": content, "size": len(content)}
class TestWithMocks:
@patch("airflow.providers.http.hooks.http.HttpHook")
def test_fetch_api_data(self, mock_hook_class):
"""Test API fetching with mocked HTTP hook."""
mock_response = MagicMock()
mock_response.json.return_value = {"status": "ok", "data": [1, 2, 3]}
mock_hook = MagicMock()
mock_hook.run.return_value = mock_response
mock_hook_class.return_value = mock_hook
result = fetch_api_data("/api/v1/data")
assert result["status"] == "ok"
assert len(result["data"]) == 3
mock_hook.run.assert_called_once_with("/api/v1/data")
@patch("airflow.providers.amazon.aws.hooks.s3.S3Hook")
def test_process_s3_file(self, mock_hook_class):
"""Test S3 file processing with mocked S3 hook."""
mock_hook = MagicMock()
mock_hook.read_key.return_value = '{"records": [1, 2, 3]}'
mock_hook_class.return_value = mock_hook
result = process_s3_file("my-bucket", "data/file.json")
assert result["size"] > 0
mock_hook.read_key.assert_called_once_with("data/file.json", "my-bucket")
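
# Airflow Variables are another external dependency worth mocking. The variable
# name and helper below are illustrative additions, not part of the chapter's DAGs.
def load_quality_threshold() -> int:
    """Read a numeric threshold from an Airflow Variable."""
    from airflow.models import Variable
    return int(Variable.get("quality_threshold", default_var=10))


@patch("airflow.models.Variable.get")
def test_load_quality_threshold(mock_get):
    """Variable.get is mocked, so no metadata database is required."""
    mock_get.return_value = "25"
    assert load_quality_threshold() == 25
    mock_get.assert_called_once_with("quality_threshold", default_var=10)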
```

### 10.6 Debugging Strategies
When a DAG misbehaves, structured logging and the Airflow task context are the first places to look:
```python
"""
File: dags/debug_example.py
Techniques for debugging DAGs.
"""
from airflow.decorators import dag, task
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dag(
dag_id="debug_example",
start_date=datetime(2026, 1, 1),
schedule=None,
catchup=False,
tags=["debug"],
)
def debug_example():
@task
def debug_with_logging():
"""Use Python logging for debugging."""
logger.debug("This is a debug message")
logger.info("This is an info message")
logger.warning("This is a warning message")
logger.error("This is an error message")
data = {"key": "value", "count": 42}
logger.info(f"Processing data: {data}")
return data
@task
def debug_with_context(**context):
"""Access Airflow context for debugging."""
# Print all available context variables
for key, value in sorted(context.items()):
logger.info(f"Context: {key} = {type(value).__name__}")
# Common context values
logger.info(f"DAG ID: {context['dag'].dag_id}")
logger.info(f"Task ID: {context['task'].task_id}")
logger.info(f"Execution Date: {context['ds']}")
logger.info(f"Run ID: {context['run_id']}")
debug_with_logging() >> debug_with_context()
debug_example()
```

```bash
# View task logs in the UI's log tab, or read the files under $AIRFLOW_HOME/logs/
# (note: airflow tasks test prints task logs directly to stdout)
# Run with increased verbosity
AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG airflow tasks test debug_example debug_with_logging 2026-01-15
# Check scheduler logs for parsing issues
tail -f $AIRFLOW_HOME/logs/scheduler/latest/*.log
```
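The learning objectives also mention debugging DAGs from an IDE. One way to do this, assuming Airflow 2.5+ (which added `DAG.test()`), is to make the DAG file runnable as a plain script so you can set breakpoints in task code and step through a whole run in a single process; a minimal sketch:
```python
# Hypothetical addition to the bottom of dags/debug_example.py (assumes Airflow 2.5+).
# dag.test() executes every task in the current Python process, without a scheduler
# or executor, so IDE breakpoints inside task functions are actually hit.
if __name__ == "__main__":
    debug_example().test()
```
Running the file under your IDE's debugger (or plain `python dags/debug_example.py`) then exercises the whole DAG interactively.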
Practice Exercise
Exercise: Build a Test Suite
"""
File: tests/conftest.py
Shared test fixtures.
"""
import pytest
from airflow.models import DagBag
@pytest.fixture(scope="session")
def dagbag():
return DagBag(dag_folder="dags/", include_examples=False)
@pytest.fixture
def sample_weather_data():
return [
{"city": "New York", "temp_f": 72, "humidity": 65},
{"city": "London", "temp_f": 59, "humidity": 80},
{"city": "Tokyo", "temp_f": 85, "humidity": 70},
]
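
# Optional fixture (an addition, not required by the exercise): expose a test HTTP
# connection via the AIRFLOW_CONN_<CONN_ID> environment variable so hooks can
# resolve it without a metadata database. The connection id and URI are placeholders.
@pytest.fixture
def weather_api_connection(monkeypatch):
    monkeypatch.setenv("AIRFLOW_CONN_WEATHER_API", "http://api.example.com")
    return "weather_api"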
"""
File: tests/test_weather_pipeline.py
Comprehensive test suite for the weather pipeline.
"""
import pytest
class TestWeatherPipeline:
def test_dag_exists(self, dagbag):
assert "weather_pipeline" in dagbag.dags
def test_task_count(self, dagbag):
dag = dagbag.dags["weather_pipeline"]
assert len(dag.tasks) == 5
def test_task_dependencies(self, dagbag):
dag = dagbag.dags["weather_pipeline"]
extract = dag.get_task("extract_weather_data")
convert = dag.get_task("convert_to_celsius")
# Check that convert depends on extract
assert extract.task_id in [t.task_id for t in convert.upstream_list]
def test_temperature_conversion(self, sample_weather_data):
for record in sample_weather_data:
temp_c = round((record["temp_f"] - 32) * 5 / 9, 1)
assert isinstance(temp_c, float)
assert -100 < temp_c < 100
def test_dag_schedule(self, dagbag):
dag = dagbag.dags["weather_pipeline"]
assert dag.schedule_interval is not None
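
    def test_all_dags_set_retries(self, dagbag):
        """Optional policy check (an addition; assumes the project convention
        that every DAG configures retries in default_args)."""
        for dag_id, dag in dagbag.dags.items():
            retries = dag.default_args.get("retries", 0)
            assert retries >= 1, f"DAG '{dag_id}' should set retries >= 1"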
```

```bash
# Run the test suite
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=dags --cov-report=term-missing
# Run specific test class
pytest tests/test_weather_pipeline.py::TestWeatherPipeline -v
```

Summary
In this chapter, you learned:
- DAG integrity tests catch import errors and structural issues early
- Extract business logic into pure functions for easy unit testing
- Use the Airflow CLI (`dags test`, `tasks test`) for integration testing
- Mock external dependencies (hooks, APIs) to test in isolation
- Effective debugging uses Python logging and Airflow context inspection