Chapter 5: Task Dependencies and Control Flow
Learning Objectives
- Define task dependencies using `>>`, `<<`, and `chain()`
- Use `BranchPythonOperator` for conditional workflows
- Implement trigger rules (`all_success`, `one_failed`, `none_skipped`, etc.)
- Use Task Groups to organize complex DAGs
Knowledge Points
5.1 Defining Task Dependencies
```python
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.models.baseoperator import chain, cross_downstream
from datetime import datetime


@dag(
    dag_id="dependency_patterns",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["dependencies"],
)
def dependency_patterns():
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    @task
    def task_a():
        return "A"

    @task
    def task_b():
        return "B"

    @task
    def task_c():
        return "C"

    @task
    def task_d():
        return "D"

    a = task_a()
    b = task_b()
    c = task_c()
    d = task_d()

    # Method 1: Bitshift operators
    # start >> a >> b >> end (linear)

    # Method 2: Set upstream/downstream explicitly
    # a.set_downstream(b)
    # b.set_upstream(a)

    # Method 3: Fan-out / Fan-in
    # start >> [a, b] >> c >> end

    # Method 4: chain() for complex patterns
    chain(start, [a, b], [c, d], end)
    # This creates: start >> a >> c >> end
    #               start >> b >> d >> end


dependency_patterns()
```
Dependency Patterns:

- Linear: `A >> B >> C`
- Fan-out: `A >> [B, C, D]`
- Fan-in: `[A, B, C] >> D`
- Diamond: `A >> [B, C] >> D`
- Chain: `chain(A, [B, C], [D, E], F)` creates `A >> B >> D >> F` and `A >> C >> E >> F`
- Cross: `cross_downstream([A, B], [C, D])` creates `A >> C`, `A >> D`, `B >> C`, and `B >> D`
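`chain()` pairs the lists element by element, while `cross_downstream()` connects every task in the first list to every task in the second. A minimal sketch of the cross pattern (DAG id and task names are illustrative):

```python
from datetime import datetime

from airflow.decorators import dag
from airflow.models.baseoperator import cross_downstream
from airflow.operators.empty import EmptyOperator


@dag(
    dag_id="cross_downstream_sketch",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def cross_downstream_sketch():
    extract_a = EmptyOperator(task_id="extract_a")
    extract_b = EmptyOperator(task_id="extract_b")
    load_x = EmptyOperator(task_id="load_x")
    load_y = EmptyOperator(task_id="load_y")

    # Connects every task on the left to every task on the right:
    # extract_a >> load_x, extract_a >> load_y,
    # extract_b >> load_x, extract_b >> load_y
    cross_downstream([extract_a, extract_b], [load_x, load_y])


cross_downstream_sketch()
```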
5.2 Conditional Branching
"""
File: dags/branching_demo.py
"""
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(
dag_id="branching_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["branching"],
)
def branching_demo():
@task
def fetch_data():
"""Simulate fetching data with varying record counts."""
import random
count = random.randint(0, 200)
print(f"Fetched {count} records")
return count
def decide_path(**context):
"""Branch based on data volume."""
ti = context["ti"]
count = ti.xcom_pull(task_ids="fetch_data")
if count == 0:
return "no_data_handler"
elif count < 100:
return "small_batch_process"
else:
return "large_batch_process"
branch = BranchPythonOperator(
task_id="branch_decision",
python_callable=decide_path,
)
@task
def no_data_handler():
print("No data received. Sending alert...")
@task
def small_batch_process():
print("Processing small batch in-memory...")
@task
def large_batch_process():
print("Processing large batch with Spark...")
# Join point after branching (must use trigger_rule)
join = EmptyOperator(
task_id="join",
trigger_rule="none_failed_min_one_success",
)
@task(trigger_rule="none_failed_min_one_success")
def cleanup():
print("Cleaning up temporary files...")
data = fetch_data()
data >> branch
no_data = no_data_handler()
small = small_batch_process()
large = large_batch_process()
branch >> [no_data, small, large] >> join >> cleanup()
branching_demo()
5.3 Using the @task.branch Decorator (Airflow 2.3+)
"""
File: dags/taskflow_branching.py
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(
dag_id="taskflow_branching",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["branching", "taskflow"],
)
def taskflow_branching():
@task.branch
def check_day_of_week():
"""Branch based on the day of the week."""
day = datetime.now().weekday()
if day < 5: # Monday to Friday
return "weekday_report"
else:
return "weekend_summary"
@task
def weekday_report():
print("Generating detailed weekday report...")
return "weekday_report_complete"
@task
def weekend_summary():
print("Generating weekend summary...")
return "weekend_summary_complete"
end = EmptyOperator(
task_id="end",
trigger_rule="none_failed_min_one_success",
)
check_day_of_week() >> [weekday_report(), weekend_summary()] >> end
taskflow_branching()
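A branch callable does not have to pick exactly one path: it can return a list of task IDs, in which case every listed task is followed and the rest are skipped. A minimal sketch, with hypothetical task names:

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="multi_path_branch_sketch",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def multi_path_branch_sketch():
    @task.branch
    def pick_reports():
        # Returning a list follows several downstream tasks at once;
        # anything not listed is skipped.
        if datetime.now().weekday() == 0:  # Mondays get both reports
            return ["daily_report", "weekly_report"]
        return "daily_report"

    @task
    def daily_report():
        print("Building daily report...")

    @task
    def weekly_report():
        print("Building weekly report...")

    pick_reports() >> [daily_report(), weekly_report()]


multi_path_branch_sketch()
```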
5.4 Trigger Rules
Trigger rules determine when a task runs based on the status of its upstream tasks.
"""
File: dags/trigger_rules_demo.py
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
@dag(
dag_id="trigger_rules_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["trigger-rules"],
)
def trigger_rules_demo():
@task
def might_fail():
import random
if random.random() < 0.3:
raise Exception("Random failure!")
return "success"
@task
def always_succeeds():
return "ok"
# Runs only if ALL upstream tasks succeed (default)
@task(trigger_rule=TriggerRule.ALL_SUCCESS)
def all_success_task():
print("All upstream tasks succeeded!")
# Runs if ANY upstream task fails
@task(trigger_rule=TriggerRule.ONE_FAILED)
def alert_on_failure():
print("ALERT: At least one upstream task failed!")
# Always runs regardless of upstream status
@task(trigger_rule=TriggerRule.ALL_DONE)
def always_run_cleanup():
print("Cleanup runs no matter what happened upstream")
# Runs if no upstream task has failed (includes skipped)
@task(trigger_rule=TriggerRule.NONE_FAILED)
def none_failed_task():
print("No failures detected upstream")
# Runs if none are skipped and all have finished
@task(trigger_rule=TriggerRule.NONE_SKIPPED)
def none_skipped_task():
print("No tasks were skipped")
a = might_fail()
b = always_succeeds()
[a, b] >> all_success_task()
[a, b] >> alert_on_failure()
[a, b] >> always_run_cleanup()
[a, b] >> none_failed_task()
trigger_rules_demo()
Available Trigger Rules:
| Rule | Description |
|---|---|
| `all_success` | All parents succeeded (default) |
| `all_failed` | All parents failed |
| `all_done` | All parents completed (any status) |
| `one_success` | At least one parent succeeded |
| `one_failed` | At least one parent failed |
| `one_done` | At least one parent completed |
| `none_failed` | No parent failed (succeeded or skipped) |
| `none_skipped` | No parent was skipped |
| `none_failed_min_one_success` | No parent failed and at least one succeeded |
| `always` | Run regardless of upstream status |
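Trigger rules can be passed either as plain strings (as in the branching examples) or as `TriggerRule` enum members (as in the demo DAG); the enum form gives you editor completion and guards against typos. A minimal sketch of a "notify only on failure" pattern, with hypothetical task names:

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id="failure_alert_sketch",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def failure_alert_sketch():
    step_1 = EmptyOperator(task_id="step_1")
    step_2 = EmptyOperator(task_id="step_2")

    # Fires as soon as any upstream task fails; marked skipped
    # if every upstream task succeeds.
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def notify_oncall():
        print("Paging the on-call engineer...")

    [step_1, step_2] >> notify_oncall()


failure_alert_sketch()
```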
5.5 Task Groups
Task Groups organize related tasks visually in the UI without the complexity of SubDAGs.
"""
File: dags/task_groups_demo.py
"""
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(
dag_id="task_groups_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["task-groups"],
)
def task_groups_demo():
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
@task_group(group_id="extract")
def extract_group():
@task
def extract_users():
return {"users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}
@task
def extract_orders():
return {"orders": [{"id": 101, "user_id": 1}, {"id": 102, "user_id": 2}]}
@task
def extract_products():
return {"products": [{"id": "P1", "name": "Widget"}]}
return {
"users": extract_users(),
"orders": extract_orders(),
"products": extract_products(),
}
@task_group(group_id="transform")
def transform_group(raw_data):
@task
def join_data(data: dict):
print(f"Joining users, orders, and products...")
return {"joined_records": 150}
@task
def validate(joined: dict):
print(f"Validating {joined['joined_records']} records")
return {"valid": 148, "invalid": 2}
joined = join_data(raw_data)
return validate(joined)
@task_group(group_id="load")
def load_group(validated_data):
@task
def load_to_warehouse(data: dict):
print(f"Loading {data['valid']} valid records to warehouse")
@task
def load_to_report(data: dict):
print(f"Loading {data['valid']} records for reporting")
load_to_warehouse(validated_data)
load_to_report(validated_data)
# Connect the groups
raw = extract_group()
validated = transform_group(raw)
start >> raw
load_group(validated) >> end
task_groups_demo()
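Task Groups can also be nested, and in recent Airflow versions they accept `default_args` that apply to every task inside the group. A minimal sketch, assuming hypothetical group and task names:

```python
from datetime import datetime

from airflow.decorators import dag, task, task_group


@dag(
    dag_id="nested_groups_sketch",  # illustrative DAG id
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def nested_groups_sketch():
    @task_group(
        group_id="ingest",
        tooltip="Everything related to ingestion",
        default_args={"retries": 2},  # applied to all tasks in the group
    )
    def ingest():
        @task
        def pull_raw():
            return {"rows": 42}

        @task_group(group_id="quality")  # groups can be nested
        def quality(payload: dict):
            @task
            def check_rows(data: dict):
                assert data["rows"] > 0

            check_rows(payload)

        quality(pull_raw())

    ingest()


nested_groups_sketch()
```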
Practice Exercise
Exercise: Build a Multi-Branch Data Pipeline
"""
File: dags/multi_branch_pipeline.py
Build a pipeline that:
1. Fetches data from multiple sources in parallel (task group)
2. Validates the data quality
3. Branches based on quality score
4. Either processes normally or triggers an alert
5. Always runs a cleanup step
"""
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(
dag_id="multi_branch_pipeline",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["exercise", "intermediate"],
)
def multi_branch_pipeline():
@task_group(group_id="sources")
def fetch_sources():
@task
def source_database():
return {"source": "db", "records": 500, "errors": 2}
@task
def source_api():
return {"source": "api", "records": 300, "errors": 15}
@task
def source_files():
return {"source": "files", "records": 200, "errors": 1}
return [source_database(), source_api(), source_files()]
@task
def aggregate_quality(sources: list):
total_records = sum(s["records"] for s in sources)
total_errors = sum(s["errors"] for s in sources)
quality_score = ((total_records - total_errors) / total_records) * 100
return {
"total_records": total_records,
"total_errors": total_errors,
"quality_score": round(quality_score, 2),
}
@task.branch
def quality_check(quality: dict):
if quality["quality_score"] >= 99:
return "high_quality_process"
elif quality["quality_score"] >= 95:
return "standard_process"
else:
return "alert_and_review"
@task
def high_quality_process():
print("Data quality excellent - fast-track processing")
@task
def standard_process():
print("Data quality acceptable - standard processing")
@task
def alert_and_review():
print("ALERT: Data quality below threshold!")
@task(trigger_rule="all_done")
def cleanup():
print("Cleanup: removing temp files regardless of outcome")
sources = fetch_sources()
quality = aggregate_quality(sources)
check = quality_check(quality)
check >> [high_quality_process(), standard_process(), alert_and_review()]
check >> cleanup()
multi_branch_pipeline()
Summary
In this chapter, you learned:
- Multiple ways to define task dependencies (`>>`, `chain()`, `cross_downstream()`)
- How to implement conditional branching with `BranchPythonOperator` and `@task.branch`
- How trigger rules control when tasks run based on upstream status
- How Task Groups organize complex DAGs visually without SubDAG overhead