Chapter 3: Writing Your First DAG
Learning Objectives
- Create a basic DAG with the TaskFlow API
- Understand DAG parameters and configuration
- Use the @dag and @task decorators
- Run and monitor your DAG in the Web UI
Knowledge Points
3.1 The TaskFlow API (Recommended Approach)
The TaskFlow API is the recommended way to write DAGs in Airflow 3.x. It uses Python decorators to turn plain Python functions into tasks, and the return values passed between those functions become both the data hand-off and the task dependencies, so you write far less boilerplate than with explicit operators and XCom calls.
"""
File: dags/hello_airflow.py
Purpose: Your first DAG using TaskFlow API
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="hello_airflow",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["beginner"],
)
def hello_airflow():
"""My first Airflow DAG."""
@task
def say_hello():
print("Hello, Airflow 3.x!")
return "greeting_sent"
@task
def say_goodbye(status: str):
print(f"Previous status: {status}")
print("Goodbye, Airflow!")
result = say_hello()
say_goodbye(result)
hello_airflow()
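Here the return value of say_hello() is handed to say_goodbye() automatically (Airflow stores it as an XCom), and that hand-off also defines the task order. A task can expose several values at once by returning a dict with multiple_outputs=True. The sketch below is illustrative only; the DAG id and task names do not appear elsewhere in this chapter:
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="hello_multiple_outputs",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def hello_multiple_outputs():
    @task(multiple_outputs=True)
    def build_greeting():
        # Each dict key becomes its own XCom entry
        return {"greeting": "Hello", "target": "Airflow 3.x"}

    @task
    def announce(greeting: str, target: str):
        print(f"{greeting}, {target}!")

    parts = build_greeting()
    announce(parts["greeting"], parts["target"])

hello_multiple_outputs()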
3.2 Traditional Approach vs TaskFlow
Traditional approach (still supported):
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_fn():
return {"data": [1, 2, 3]}
def process_fn(**context):
ti = context["ti"]
data = ti.xcom_pull(task_ids="extract")
print(f"Processing: {data}")
with DAG(
dag_id="traditional_dag",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_fn,
)
process = PythonOperator(
task_id="process",
python_callable=process_fn,
)
extract >> process
TaskFlow approach (recommended):
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="taskflow_dag",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
)
def taskflow_dag():
@task
def extract():
return {"data": [1, 2, 3]}
@task
def process(data: dict):
print(f"Processing: {data}")
process(extract())
taskflow_dag()
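TaskFlow infers dependencies from function arguments, so no explicit extract >> process line is needed above. When two tasks exchange no data, you can still order them with the >> operator on the task calls. A minimal sketch with illustrative DAG and task names:
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="explicit_ordering",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def explicit_ordering():
    @task
    def create_table():
        print("Creating the staging table...")

    @task
    def load_data():
        print("Loading data into the staging table...")

    # No return value flows between these tasks, so declare the order explicitly
    create_table() >> load_data()

explicit_ordering()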
3.3 DAG File Structure Best Practices
dags/
├── __init__.py
├── etl/
│ ├── __init__.py
│ ├── sales_etl.py
│ └── user_etl.py
├── ml/
│ ├── __init__.py
│ └── model_training.py
├── common/
│ ├── __init__.py
│ └── utils.py
└── config/
└── pipeline_config.yaml
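Shared helpers live under common/ so that several DAG files can reuse them; Airflow adds the DAGs folder to the Python path, so imports are written relative to dags/. A minimal sketch with a hypothetical helper that mirrors the temperature conversion used in the next section:
# File: dags/common/utils.py (hypothetical shared helper)
def fahrenheit_to_celsius(temp_f: float) -> float:
    """Convert a Fahrenheit temperature to Celsius, rounded to one decimal."""
    return round((temp_f - 32) * 5 / 9, 1)
A DAG file such as dags/etl/sales_etl.py can then import it with from common.utils import fahrenheit_to_celsius.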
3.4 Comprehensive DAG Example
"""
File: dags/weather_pipeline.py
Purpose: A practical ETL pipeline that processes weather data
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import json
# Default arguments applied to all tasks
default_args = {
"owner": "data_team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"email_on_failure": False,
}
@dag(
dag_id="weather_pipeline",
start_date=datetime(2026, 1, 1),
schedule="0 6 * * *", # Every day at 6 AM
catchup=False,
default_args=default_args,
tags=["etl", "weather"],
doc_md="""
## Weather Data Pipeline
This DAG extracts weather data, transforms it, and loads it into storage.
- **Schedule**: Daily at 6 AM
- **Owner**: data_team
""",
)
def weather_pipeline():
@task
def extract_weather_data():
"""Simulate extracting weather data from an API."""
raw_data = [
{"city": "New York", "temp_f": 72, "humidity": 65, "wind_mph": 12},
{"city": "London", "temp_f": 59, "humidity": 80, "wind_mph": 8},
{"city": "Tokyo", "temp_f": 85, "humidity": 70, "wind_mph": 5},
{"city": "Sydney", "temp_f": 68, "humidity": 55, "wind_mph": 15},
]
print(f"Extracted {len(raw_data)} weather records")
return raw_data
@task
def convert_to_celsius(weather_data: list):
"""Convert temperatures from Fahrenheit to Celsius."""
for record in weather_data:
record["temp_c"] = round((record["temp_f"] - 32) * 5 / 9, 1)
record["wind_kph"] = round(record["wind_mph"] * 1.60934, 1)
print(f"Converted {len(weather_data)} records to metric units")
return weather_data
@task
def classify_weather(weather_data: list):
"""Add weather classification based on conditions."""
for record in weather_data:
if record["temp_c"] > 30:
record["classification"] = "hot"
elif record["temp_c"] > 15:
record["classification"] = "mild"
else:
record["classification"] = "cold"
if record["humidity"] > 70:
record["classification"] += "_humid"
print(f"Classified {len(weather_data)} records")
return weather_data
@task
def generate_report(weather_data: list):
"""Generate a summary report."""
avg_temp = sum(r["temp_c"] for r in weather_data) / len(weather_data)
hottest = max(weather_data, key=lambda x: x["temp_c"])
coldest = min(weather_data, key=lambda x: x["temp_c"])
report = {
"date": datetime.now().isoformat(),
"total_cities": len(weather_data),
"avg_temperature_c": round(avg_temp, 1),
"hottest_city": hottest["city"],
"coldest_city": coldest["city"],
}
print(f"Report: {json.dumps(report, indent=2)}")
return report
@task
def save_results(weather_data: list, report: dict):
"""Simulate saving results to a data store."""
print(f"Saving {len(weather_data)} records to database...")
print(f"Saving report: avg temp = {report['avg_temperature_c']}°C")
print("All data saved successfully!")
# Define the pipeline flow
raw_data = extract_weather_data()
metric_data = convert_to_celsius(raw_data)
classified_data = classify_weather(metric_data)
report = generate_report(classified_data)
save_results(classified_data, report)
weather_pipeline()
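The default_args above apply to every task in the DAG, but any individual task can override them by passing the same keywords to the @task decorator. A standalone sketch (the DAG id and tasks are illustrative, not part of weather_pipeline):
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id="retry_override_demo",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
)
def retry_override_demo():
    @task  # inherits retries=3 from default_args
    def flaky_extract():
        print("Extracting...")

    @task(retries=0)  # this task alone is never retried
    def notify():
        print("Pipeline finished.")

    flaky_extract() >> notify()

retry_override_demo()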
3.5 Running and Monitoring Your DAG
# Validate DAG syntax (no import errors)
airflow dags list-import-errors
# Test a DAG run (does not store results in DB)
airflow dags test weather_pipeline 2026-01-15
# Trigger a DAG run (stores results)
airflow dags trigger weather_pipeline
# Monitor task status
airflow tasks states-for-dag-run weather_pipeline <run_id>
Monitoring in the Web UI:
- Navigate to the DAGs page
- Click on weather_pipeline
- Switch to Graph view to see task dependencies
- Switch to Grid view to see historical run status
- Click on a task instance to view logs
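DAG import checks can also run in CI before code ever reaches the scheduler. A minimal sketch using Airflow's DagBag with pytest; the test file path is illustrative, and it assumes the test environment can see your DAGs folder:
# File: tests/test_dag_integrity.py (hypothetical test module)
from airflow.models import DagBag

def test_no_import_errors():
    # Parse the configured DAGs folder, skipping Airflow's bundled example DAGs
    dag_bag = DagBag(include_examples=False)
    assert dag_bag.import_errors == {}, f"Import errors: {dag_bag.import_errors}"

def test_weather_pipeline_is_loaded():
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.get_dag("weather_pipeline")
    assert dag is not None
    assert len(dag.tasks) == 5  # the five @task functions defined in section 3.4
Run it with pytest tests/ in the same environment (and AIRFLOW_HOME) as your Airflow installation.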
Practice Exercises
Exercise 1: Build a Simple Notification Pipeline
"""
File: dags/notification_pipeline.py
Create a pipeline that:
1. Checks system health
2. Formats a status message
3. Sends a notification (simulated)
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="notification_pipeline",
start_date=datetime(2026, 1, 1),
schedule="*/30 * * * *", # Every 30 minutes
catchup=False,
tags=["exercise", "notifications"],
)
def notification_pipeline():
@task
def check_system_health():
"""Check various system metrics."""
health = {
"cpu_usage": 45.2,
"memory_usage": 62.8,
"disk_usage": 71.5,
"active_connections": 128,
"status": "healthy",
}
if health["cpu_usage"] > 90 or health["memory_usage"] > 90:
health["status"] = "warning"
return health
@task
def format_message(health: dict):
"""Format the health data into a readable message."""
status_emoji = "OK" if health["status"] == "healthy" else "WARNING"
message = (
f"System Status: {status_emoji}\n"
f"CPU: {health['cpu_usage']}%\n"
f"Memory: {health['memory_usage']}%\n"
f"Disk: {health['disk_usage']}%\n"
f"Connections: {health['active_connections']}"
)
return {"message": message, "status": health["status"]}
@task
def send_notification(notification: dict):
"""Simulate sending a notification."""
print(f"Sending notification (status: {notification['status']}):")
print(notification["message"])
print("Notification sent successfully!")
health_data = check_system_health()
formatted = format_message(health_data)
send_notification(formatted)
notification_pipeline()
Exercise 2: Data Validation Pipeline
"""
File: dags/data_validation.py
Create a pipeline that validates incoming data quality.
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="data_validation",
start_date=datetime(2026, 1, 1),
schedule="@hourly",
catchup=False,
tags=["exercise", "validation"],
)
def data_validation():
@task
def fetch_records():
"""Simulate fetching records from a source."""
return [
{"id": 1, "name": "Alice", "email": "alice@example.com", "age": 30},
{"id": 2, "name": "", "email": "bob@example.com", "age": 25},
{"id": 3, "name": "Charlie", "email": "invalid-email", "age": -5},
{"id": 4, "name": "Diana", "email": "diana@example.com", "age": 28},
]
@task
def validate_records(records: list):
"""Validate each record and categorize as valid/invalid."""
valid = []
invalid = []
for record in records:
errors = []
if not record.get("name"):
errors.append("missing name")
if "@" not in record.get("email", ""):
errors.append("invalid email")
if record.get("age", 0) < 0:
errors.append("negative age")
if errors:
record["errors"] = errors
invalid.append(record)
else:
valid.append(record)
return {"valid": valid, "invalid": invalid}
@task
def report_results(validation: dict):
"""Report validation results."""
total = len(validation["valid"]) + len(validation["invalid"])
print(f"Total records: {total}")
print(f"Valid records: {len(validation['valid'])}")
print(f"Invalid records: {len(validation['invalid'])}")
for record in validation["invalid"]:
print(f" ID {record['id']}: {', '.join(record['errors'])}")
records = fetch_records()
results = validate_records(records)
report_results(results)
data_validation()
Summary
In this chapter, you learned:
- The TaskFlow API is the recommended approach for writing DAGs in Airflow 3.x
- How to use the @dag and @task decorators to define workflows
- Best practices for DAG file organization
- How to build a complete ETL pipeline with multiple tasks
- How to run, trigger, and monitor DAGs using CLI and Web UI