Chapter 5: Task Dependencies and Control Flow


Learning Objectives
  • Define task dependencies using >>, <<, and chain()
  • Use BranchPythonOperator for conditional workflows
  • Implement trigger rules (all_success, one_failed, none_skipped, etc.)
  • Use task groups to organize complex DAGs

Knowledge Points

5.1 Defining Task Dependencies

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.models.baseoperator import chain, cross_downstream
from datetime import datetime

@dag(
    dag_id="dependency_patterns",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["dependencies"],
)
def dependency_patterns():

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    @task
    def task_a():
        return "A"

    @task
    def task_b():
        return "B"

    @task
    def task_c():
        return "C"

    @task
    def task_d():
        return "D"

    a = task_a()
    b = task_b()
    c = task_c()
    d = task_d()

    # Method 1: Bitshift operators
    # start >> a >> b >> end  (linear)

    # Method 2: Explicit methods (each line below is equivalent to a >> b)
    # a.set_downstream(b)
    # b.set_upstream(a)

    # Method 3: Fan-out / Fan-in
    # start >> [a, b] >> c >> end

    # Method 4: chain() for complex patterns
    chain(start, [a, b], [c, d], end)
    # This creates: start >> a >> c >> end
    #               start >> b >> d >> end

dependency_patterns()

Dependency Patterns:

Linear:       A >> B >> C

Fan-out:      A >> [B, C, D]

Fan-in:       [A, B, C] >> D

Diamond:      A >> [B, C] >> D

Chain:        chain(A, [B, C], [D, E], F)
              A -> B -> D -> F
              A -> C -> E -> F

Cross:        cross_downstream([A, B], [C, D])
              A -> C, A -> D
              B -> C, B -> D
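
The chain() pattern is exercised in the DAG above, but cross_downstream() is only imported there. Below is a minimal, self-contained sketch of the cross pattern; the dag_id and task ids are illustrative.

"""
File: dags/cross_downstream_sketch.py (illustrative)
"""
from airflow.decorators import dag
from airflow.models.baseoperator import cross_downstream
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id="cross_downstream_sketch",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
)
def cross_downstream_sketch():
    extract_a = EmptyOperator(task_id="extract_a")
    extract_b = EmptyOperator(task_id="extract_b")
    load_x = EmptyOperator(task_id="load_x")
    load_y = EmptyOperator(task_id="load_y")

    # Every task in the first list becomes upstream of every task in the second:
    # extract_a >> load_x, extract_a >> load_y, extract_b >> load_x, extract_b >> load_y
    cross_downstream([extract_a, extract_b], [load_x, load_y])

cross_downstream_sketch()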

5.2 Conditional Branching

"""
File: dags/branching_demo.py
"""
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id="branching_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["branching"],
)
def branching_demo():

    @task
    def fetch_data():
        """Simulate fetching data with varying record counts."""
        import random
        count = random.randint(0, 200)
        print(f"Fetched {count} records")
        return count

    def decide_path(**context):
        """Branch based on data volume."""
        ti = context["ti"]
        count = ti.xcom_pull(task_ids="fetch_data")
        if count == 0:
            return "no_data_handler"
        elif count < 100:
            return "small_batch_process"
        else:
            return "large_batch_process"

    branch = BranchPythonOperator(
        task_id="branch_decision",
        python_callable=decide_path,
    )

    @task
    def no_data_handler():
        print("No data received. Sending alert...")

    @task
    def small_batch_process():
        print("Processing small batch in-memory...")

    @task
    def large_batch_process():
        print("Processing large batch with Spark...")

    # Join point after branching. An explicit trigger_rule is required here:
    # with the default all_success, the skipped (unchosen) branches would cause
    # the join itself to be skipped.
    join = EmptyOperator(
        task_id="join",
        trigger_rule="none_failed_min_one_success",
    )

    @task(trigger_rule="none_failed_min_one_success")
    def cleanup():
        print("Cleaning up temporary files...")

    data = fetch_data()
    data >> branch

    no_data = no_data_handler()
    small = small_batch_process()
    large = large_batch_process()

    branch >> [no_data, small, large] >> join >> cleanup()

branching_demo()

5.3 Using the @task.branch Decorator (TaskFlow API)

"""
File: dags/taskflow_branching.py
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id="taskflow_branching",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["branching", "taskflow"],
)
def taskflow_branching():

    @task.branch
    def check_day_of_week():
        """Branch based on the day of the week."""
        day = datetime.now().weekday()
        if day < 5:  # Monday to Friday
            return "weekday_report"
        else:
            return "weekend_summary"

    @task
    def weekday_report():
        print("Generating detailed weekday report...")
        return "weekday_report_complete"

    @task
    def weekend_summary():
        print("Generating weekend summary...")
        return "weekend_summary_complete"

    end = EmptyOperator(
        task_id="end",
        trigger_rule="none_failed_min_one_success",
    )

    check_day_of_week() >> [weekday_report(), weekend_summary()] >> end

taskflow_branching()
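
One caveat about the branch above: datetime.now() reflects the moment the task actually executes, not the data interval being processed, so backfilled runs will branch on the wrong day. The sketch below is a drop-in replacement for the branch task that keys off the run's logical date instead; it assumes the TaskFlow behavior of injecting context variables that are declared in the task's signature.

from airflow.decorators import task

@task.branch
def check_day_of_week(logical_date=None):
    """Branch on the DAG run's logical date rather than wall-clock time."""
    # Airflow injects context variables (here, logical_date) when they are
    # declared as parameters of a decorated task.
    if logical_date.weekday() < 5:  # Monday to Friday
        return "weekday_report"
    return "weekend_summary"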

5.4 Trigger Rules

Trigger rules determine when a task runs based on the status of its upstream tasks.

"""
File: dags/trigger_rules_demo.py
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

@dag(
    dag_id="trigger_rules_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["trigger-rules"],
)
def trigger_rules_demo():

    @task
    def might_fail():
        import random
        if random.random() < 0.3:
            raise Exception("Random failure!")
        return "success"

    @task
    def always_succeeds():
        return "ok"

    # Runs only if ALL upstream tasks succeed (default)
    @task(trigger_rule=TriggerRule.ALL_SUCCESS)
    def all_success_task():
        print("All upstream tasks succeeded!")

    # Runs if ANY upstream task fails
    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def alert_on_failure():
        print("ALERT: At least one upstream task failed!")

    # Always runs regardless of upstream status
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def always_run_cleanup():
        print("Cleanup runs no matter what happened upstream")

    # Runs if no upstream task failed (skipped upstream tasks are allowed)
    @task(trigger_rule=TriggerRule.NONE_FAILED)
    def none_failed_task():
        print("No failures detected upstream")

    # Runs only if no upstream task was skipped (success or failure are both fine)
    @task(trigger_rule=TriggerRule.NONE_SKIPPED)
    def none_skipped_task():
        print("No tasks were skipped")

    a = might_fail()
    b = always_succeeds()

    [a, b] >> all_success_task()
    [a, b] >> alert_on_failure()
    [a, b] >> always_run_cleanup()
    [a, b] >> none_failed_task()
    [a, b] >> none_skipped_task()

trigger_rules_demo()

Available Trigger Rules:

Rule                            Description
all_success                     All parents succeeded (default)
all_failed                      All parents failed
all_done                        All parents completed (any status)
one_success                     At least one parent succeeded
one_failed                      At least one parent failed
one_done                        At least one parent completed
none_failed                     No parent has failed (success or skipped)
none_skipped                    No parent was skipped
none_failed_min_one_success     No failure and at least one success
always                          Run regardless of upstream status
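
The difference between none_failed and none_failed_min_one_success matters mostly at join points after a branch: none_failed still fires when every upstream task was skipped, while none_failed_min_one_success also demands at least one success. Below is a minimal sketch contrasting the two; the dag_id and task ids are illustrative.

"""
File: dags/join_rules_sketch.py (illustrative)
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

@dag(
    dag_id="join_rules_sketch",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
)
def join_rules_sketch():

    @task.branch
    def pick_one():
        return "path_a"  # path_b will be skipped by the branch

    path_a = EmptyOperator(task_id="path_a")
    path_b = EmptyOperator(task_id="path_b")

    # Fires as long as nothing failed, even if every upstream task was skipped.
    join_loose = EmptyOperator(
        task_id="join_loose",
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    # Additionally requires at least one upstream success, the usual choice
    # for a join directly after a branch.
    join_strict = EmptyOperator(
        task_id="join_strict",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    pick_one() >> [path_a, path_b]

    # Both joins depend only on path_b, which the branch skips:
    # join_loose still runs (nothing failed), while join_strict is skipped
    # because it never sees an upstream success.
    path_b >> [join_loose, join_strict]

join_rules_sketch()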

5.5 Task Groups

Task Groups organize related tasks visually in the UI without the complexity of SubDAGs.

"""
File: dags/task_groups_demo.py
"""
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id="task_groups_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["task-groups"],
)
def task_groups_demo():

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    @task_group(group_id="extract")
    def extract_group():
        @task
        def extract_users():
            return {"users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}

        @task
        def extract_orders():
            return {"orders": [{"id": 101, "user_id": 1}, {"id": 102, "user_id": 2}]}

        @task
        def extract_products():
            return {"products": [{"id": "P1", "name": "Widget"}]}

        return {
            "users": extract_users(),
            "orders": extract_orders(),
            "products": extract_products(),
        }

    @task_group(group_id="transform")
    def transform_group(raw_data):
        @task
        def join_data(data: dict):
            print(f"Joining users, orders, and products...")
            return {"joined_records": 150}

        @task
        def validate(joined: dict):
            print(f"Validating {joined['joined_records']} records")
            return {"valid": 148, "invalid": 2}

        joined = join_data(raw_data)
        return validate(joined)

    @task_group(group_id="load")
    def load_group(validated_data):
        @task
        def load_to_warehouse(data: dict):
            print(f"Loading {data['valid']} valid records to warehouse")

        @task
        def load_to_report(data: dict):
            print(f"Loading {data['valid']} records for reporting")

        # Return the load tasks so the group's output can be used in
        # dependency expressions (e.g. load_group(...) >> end below).
        return [load_to_warehouse(validated_data), load_to_report(validated_data)]

    # Connect the groups. extract_group() returns a plain dict, which cannot be
    # used directly with >>, so anchor the start dependency on its XComArg values.
    raw = extract_group()
    validated = transform_group(raw)
    start >> list(raw.values())
    load_group(validated) >> end

task_groups_demo()
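
The @task_group decorator used above is one of two equivalent ways to build groups; the context-manager form from airflow.utils.task_group is still common in operator-heavy DAGs. A minimal sketch with illustrative ids:

"""
File: dags/task_group_context_manager.py (illustrative)
"""
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

@dag(
    dag_id="task_group_context_manager",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
)
def task_group_context_manager():
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Tasks created inside the `with` block join the group automatically and
    # receive prefixed task ids such as "extract.users".
    with TaskGroup(group_id="extract") as extract:
        EmptyOperator(task_id="users")
        EmptyOperator(task_id="orders")

    # The TaskGroup object itself can be used in dependency expressions.
    start >> extract >> end

task_group_context_manager()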

Practice Exercise

Exercise: Build a Multi-Branch Data Pipeline

"""
File: dags/multi_branch_pipeline.py
Build a pipeline that:
1. Fetches data from multiple sources in parallel (task group)
2. Validates the data quality
3. Branches based on quality score
4. Either processes normally or triggers an alert
5. Always runs a cleanup step
"""
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id="multi_branch_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["exercise", "intermediate"],
)
def multi_branch_pipeline():

    @task_group(group_id="sources")
    def fetch_sources():
        @task
        def source_database():
            return {"source": "db", "records": 500, "errors": 2}

        @task
        def source_api():
            return {"source": "api", "records": 300, "errors": 15}

        @task
        def source_files():
            return {"source": "files", "records": 200, "errors": 1}

        return [source_database(), source_api(), source_files()]

    @task
    def aggregate_quality(sources: list):
        total_records = sum(s["records"] for s in sources)
        total_errors = sum(s["errors"] for s in sources)
        quality_score = ((total_records - total_errors) / total_records) * 100
        return {
            "total_records": total_records,
            "total_errors": total_errors,
            "quality_score": round(quality_score, 2),
        }

    @task.branch
    def quality_check(quality: dict):
        if quality["quality_score"] >= 99:
            return "high_quality_process"
        elif quality["quality_score"] >= 95:
            return "standard_process"
        else:
            return "alert_and_review"

    @task
    def high_quality_process():
        print("Data quality excellent - fast-track processing")

    @task
    def standard_process():
        print("Data quality acceptable - standard processing")

    @task
    def alert_and_review():
        print("ALERT: Data quality below threshold!")

    @task(trigger_rule="all_done")
    def cleanup():
        print("Cleanup: removing temp files regardless of outcome")

    sources = fetch_sources()
    quality = aggregate_quality(sources)
    check = quality_check(quality)
    branches = [high_quality_process(), standard_process(), alert_and_review()]
    check >> branches
    # cleanup must sit downstream of the branch targets, not directly downstream
    # of the branch task: a branch skips its unchosen direct downstream tasks
    # outright, so trigger_rule="all_done" alone would not save it there.
    branches >> cleanup()

multi_branch_pipeline()

Summary

In this chapter, you learned:

  • Multiple ways to define task dependencies (>>, chain(), cross_downstream())
  • How to implement conditional branching with BranchPythonOperator and @task.branch
  • How trigger rules control when tasks run based on upstream task status
  • How Task Groups organize complex DAGs visually without SubDAG overhead