Chapter 3: Writing Your First DAG

Learning Objectives
  • Create a basic DAG with the TaskFlow API
  • Understand DAG parameters and configuration
  • Use the @dag and @task decorators
  • Run and monitor your DAG in the Web UI

Knowledge Points

3.1 The TaskFlow API

The TaskFlow API is the recommended way to write DAGs in Airflow 3.x. It uses the @dag and @task decorators to turn plain Python functions into DAGs and tasks, and it passes data between tasks for you, which makes DAG authoring more concise than the traditional operator-based style.

"""
File: dags/hello_airflow.py
Purpose: Your first DAG using TaskFlow API
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="hello_airflow",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["beginner"],
)
def hello_airflow():
    """My first Airflow DAG."""

    @task
    def say_hello():
        print("Hello, Airflow 3.x!")
        return "greeting_sent"

    @task
    def say_goodbye(status: str):
        print(f"Previous status: {status}")
        print("Goodbye, Airflow!")

    result = say_hello()
    say_goodbye(result)

hello_airflow()
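
The hello_airflow() call at the bottom of the file matters: the @dag decorator turns the function into a DAG factory, and the DAG object only exists once that function is invoked while the file is parsed. If you capture the returned DAG instead of discarding it, you can also debug the DAG locally. A minimal sketch, assuming a recent Airflow version where DAG.test() is available and a local metadata database has been initialized:

# Hedged sketch: replace the bare hello_airflow() call with this to allow
# in-process debugging. DAG.test() executes one DAG run directly, without
# a scheduler or worker.
dag_object = hello_airflow()

if __name__ == "__main__":
    dag_object.test()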

3.2 Traditional Approach vs TaskFlow

Traditional approach (still supported):

from airflow import DAG
# In Airflow 3.x, PythonOperator ships with the standard provider
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

def extract_fn():
    return {"data": [1, 2, 3]}

def process_fn(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract")
    print(f"Processing: {data}")

with DAG(
    dag_id="traditional_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_fn,
    )
    process = PythonOperator(
        task_id="process",
        python_callable=process_fn,
    )
    extract >> process

TaskFlow approach (recommended):

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="taskflow_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def taskflow_dag():

    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def process(data: dict):
        print(f"Processing: {data}")

    process(extract())

taskflow_dag()
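
In the TaskFlow version, calling process(extract()) both passes the data and records the dependency, so there is no explicit extract >> process line. When two decorated tasks do not exchange data, you can still chain them with >>, because each call returns a reference to the underlying task. A minimal sketch (the dag_id and task names below are illustrative, not part of the examples above):

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="explicit_dependency_demo",  # illustrative dag_id
    start_date=datetime(2026, 1, 1),
    schedule=None,  # manual trigger only
    catchup=False,
)
def explicit_dependency_demo():

    @task
    def refresh_cache():
        print("Cache refreshed")

    @task
    def notify():
        print("Refresh complete")

    # No return value flows between these tasks, so declare the ordering explicitly.
    refresh_cache() >> notify()

explicit_dependency_demo()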

3.3 DAG File Structure Best Practices

dags/
├── __init__.py
├── etl/
│   ├── __init__.py
│   ├── sales_etl.py
│   └── user_etl.py
├── ml/
│   ├── __init__.py
│   └── model_training.py
├── common/
│   ├── __init__.py
│   └── utils.py
└── config/
    └── pipeline_config.yaml
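
Keeping shared helpers under common/ avoids copy-pasting the same logic into every DAG file, and because Airflow adds the DAG folder to the Python path, those modules can be imported directly from DAG code. A minimal sketch, where the helper fahrenheit_to_celsius and its use in etl/sales_etl.py are hypothetical:

# File: dags/common/utils.py (hypothetical shared helper)
def fahrenheit_to_celsius(temp_f: float) -> float:
    """Convert a Fahrenheit temperature to Celsius, rounded to one decimal."""
    return round((temp_f - 32) * 5 / 9, 1)


# File: dags/etl/sales_etl.py (usage inside a task)
# from common.utils import fahrenheit_to_celsius
# temp_c = fahrenheit_to_celsius(72)  # -> 22.2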

3.4 Comprehensive DAG Example

"""
File: dags/weather_pipeline.py
Purpose: A practical ETL pipeline that processes weather data
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import json

# Default arguments applied to all tasks
default_args = {
    "owner": "data_team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": False,
}

@dag(
    dag_id="weather_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="0 6 * * *",  # Every day at 6 AM
    catchup=False,
    default_args=default_args,
    tags=["etl", "weather"],
    doc_md="""
    ## Weather Data Pipeline
    This DAG extracts weather data, transforms it, and loads it into storage.
    - **Schedule**: Daily at 6 AM
    - **Owner**: data_team
    """,
)
def weather_pipeline():

    @task
    def extract_weather_data():
        """Simulate extracting weather data from an API."""
        raw_data = [
            {"city": "New York", "temp_f": 72, "humidity": 65, "wind_mph": 12},
            {"city": "London", "temp_f": 59, "humidity": 80, "wind_mph": 8},
            {"city": "Tokyo", "temp_f": 85, "humidity": 70, "wind_mph": 5},
            {"city": "Sydney", "temp_f": 68, "humidity": 55, "wind_mph": 15},
        ]
        print(f"Extracted {len(raw_data)} weather records")
        return raw_data

    @task
    def convert_to_celsius(weather_data: list):
        """Convert temperatures from Fahrenheit to Celsius."""
        for record in weather_data:
            record["temp_c"] = round((record["temp_f"] - 32) * 5 / 9, 1)
            record["wind_kph"] = round(record["wind_mph"] * 1.60934, 1)
        print(f"Converted {len(weather_data)} records to metric units")
        return weather_data

    @task
    def classify_weather(weather_data: list):
        """Add weather classification based on conditions."""
        for record in weather_data:
            if record["temp_c"] > 30:
                record["classification"] = "hot"
            elif record["temp_c"] > 15:
                record["classification"] = "mild"
            else:
                record["classification"] = "cold"

            if record["humidity"] > 70:
                record["classification"] += "_humid"
        print(f"Classified {len(weather_data)} records")
        return weather_data

    @task
    def generate_report(weather_data: list):
        """Generate a summary report."""
        avg_temp = sum(r["temp_c"] for r in weather_data) / len(weather_data)
        hottest = max(weather_data, key=lambda x: x["temp_c"])
        coldest = min(weather_data, key=lambda x: x["temp_c"])

        report = {
            "date": datetime.now().isoformat(),
            "total_cities": len(weather_data),
            "avg_temperature_c": round(avg_temp, 1),
            "hottest_city": hottest["city"],
            "coldest_city": coldest["city"],
        }
        print(f"Report: {json.dumps(report, indent=2)}")
        return report

    @task
    def save_results(weather_data: list, report: dict):
        """Simulate saving results to a data store."""
        print(f"Saving {len(weather_data)} records to database...")
        print(f"Saving report: avg temp = {report['avg_temperature_c']}°C")
        print("All data saved successfully!")

    # Define the pipeline flow
    raw_data = extract_weather_data()
    metric_data = convert_to_celsius(raw_data)
    classified_data = classify_weather(metric_data)
    report = generate_report(classified_data)
    save_results(classified_data, report)

weather_pipeline()
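
Every value in default_args applies to each task in the pipeline, but an individual task can override it by passing the same keyword to its decorator. A minimal sketch (the retries=5 and retry_delay values are illustrative, not part of the pipeline above):

from airflow.decorators import task
from datetime import timedelta

# Per-task keyword arguments take precedence over the DAG's default_args.
@task(retries=5, retry_delay=timedelta(minutes=1))
def extract_weather_data():
    """Same extraction task as above, but with its own retry policy."""
    ...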

3.5 Running and Monitoring Your DAG

# Validate DAG syntax (no import errors)
airflow dags list-import-errors

# Debug-run a full DAG for a given logical date (runs tasks in a single local process)
airflow dags test weather_pipeline 2026-01-15

# Trigger a real DAG run that the scheduler will pick up
airflow dags trigger weather_pipeline

# Monitor task status
airflow tasks states-for-dag-run weather_pipeline <run_id>

Monitoring in the Web UI:

  1. Navigate to the DAGs page
  2. Click on weather_pipeline
  3. Switch to Graph view to see task dependencies
  4. Switch to Grid view to see historical run status
  5. Click on a task instance to view logs

Practice Exercises

Exercise 1: Build a Simple Notification Pipeline

"""
File: dags/notification_pipeline.py
Create a pipeline that:
1. Checks system health
2. Formats a status message
3. Sends a notification (simulated)
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="notification_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="*/30 * * * *",  # Every 30 minutes
    catchup=False,
    tags=["exercise", "notifications"],
)
def notification_pipeline():

    @task
    def check_system_health():
        """Check various system metrics."""
        health = {
            "cpu_usage": 45.2,
            "memory_usage": 62.8,
            "disk_usage": 71.5,
            "active_connections": 128,
            "status": "healthy",
        }
        if health["cpu_usage"] > 90 or health["memory_usage"] > 90:
            health["status"] = "warning"
        return health

    @task
    def format_message(health: dict):
        """Format the health data into a readable message."""
        status_label = "OK" if health["status"] == "healthy" else "WARNING"
        message = (
            f"System Status: {status_label}\n"
            f"CPU: {health['cpu_usage']}%\n"
            f"Memory: {health['memory_usage']}%\n"
            f"Disk: {health['disk_usage']}%\n"
            f"Connections: {health['active_connections']}"
        )
        return {"message": message, "status": health["status"]}

    @task
    def send_notification(notification: dict):
        """Simulate sending a notification."""
        print(f"Sending notification (status: {notification['status']}):")
        print(notification["message"])
        print("Notification sent successfully!")

    health_data = check_system_health()
    formatted = format_message(health_data)
    send_notification(formatted)

notification_pipeline()

Exercise 2: Data Validation Pipeline

"""
File: dags/data_validation.py
Create a pipeline that validates incoming data quality.
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="data_validation",
    start_date=datetime(2026, 1, 1),
    schedule="@hourly",
    catchup=False,
    tags=["exercise", "validation"],
)
def data_validation():

    @task
    def fetch_records():
        """Simulate fetching records from a source."""
        return [
            {"id": 1, "name": "Alice", "email": "alice@example.com", "age": 30},
            {"id": 2, "name": "", "email": "bob@example.com", "age": 25},
            {"id": 3, "name": "Charlie", "email": "invalid-email", "age": -5},
            {"id": 4, "name": "Diana", "email": "diana@example.com", "age": 28},
        ]

    @task
    def validate_records(records: list):
        """Validate each record and categorize as valid/invalid."""
        valid = []
        invalid = []
        for record in records:
            errors = []
            if not record.get("name"):
                errors.append("missing name")
            if "@" not in record.get("email", ""):
                errors.append("invalid email")
            if record.get("age", 0) < 0:
                errors.append("negative age")
            if errors:
                record["errors"] = errors
                invalid.append(record)
            else:
                valid.append(record)
        return {"valid": valid, "invalid": invalid}

    @task
    def report_results(validation: dict):
        """Report validation results."""
        total = len(validation["valid"]) + len(validation["invalid"])
        print(f"Total records: {total}")
        print(f"Valid records: {len(validation['valid'])}")
        print(f"Invalid records: {len(validation['invalid'])}")
        for record in validation["invalid"]:
            print(f"  ID {record['id']}: {', '.join(record['errors'])}")

    records = fetch_records()
    results = validate_records(records)
    report_results(results)

data_validation()

Summary

In this chapter, you learned:

  • The TaskFlow API is the recommended approach for writing DAGs in Airflow 3.x
  • How to use @dag and @task decorators to define workflows
  • Best practices for DAG file organization
  • How to build a complete ETL pipeline with multiple tasks
  • How to run, trigger, and monitor DAGs using the CLI and the Web UI