Chapter 9: Dynamic DAGs and Advanced Patterns

Learning Objectives
  • Generate DAGs dynamically from configuration files
  • Use dynamic task mapping (expand/map) for parallel processing
  • Compare TaskGroups with deprecated SubDAGs and organize tasks with nested groups
  • Apply dataset-driven scheduling in Airflow 3.x

Knowledge Points

9.1 Dynamic DAG Generation

Generate multiple DAGs from a single Python file using configuration:

"""
File: dags/dynamic_dag_generator.py
Generate DAGs dynamically from a YAML configuration.
"""
from airflow.decorators import dag, task
from datetime import datetime
import yaml

# Configuration could come from a YAML file, database, or API
DAG_CONFIGS = [
    {
        "dag_id": "etl_sales",
        "schedule": "@daily",
        "source_table": "raw_sales",
        "target_table": "analytics.sales",
        "owner": "sales_team",
    },
    {
        "dag_id": "etl_inventory",
        "schedule": "@hourly",
        "source_table": "raw_inventory",
        "target_table": "analytics.inventory",
        "owner": "ops_team",
    },
    {
        "dag_id": "etl_customers",
        "schedule": "0 6 * * *",
        "source_table": "raw_customers",
        "target_table": "analytics.customers",
        "owner": "marketing_team",
    },
]


def create_etl_dag(config: dict):
    """Factory function to create an ETL DAG from config."""

    @dag(
        dag_id=config["dag_id"],
        start_date=datetime(2026, 1, 1),
        schedule=config["schedule"],
        catchup=False,
        default_args={"owner": config["owner"]},
        tags=["dynamic", "etl"],
    )
    def etl_pipeline():

        @task
        def extract():
            print(f"Extracting from {config['source_table']}")
            return {"source": config["source_table"], "rows": 1000}

        @task
        def transform(data: dict):
            print(f"Transforming {data['rows']} rows from {data['source']}")
            return {"rows": data["rows"], "transformed": True}

        @task
        def load(data: dict):
            print(f"Loading {data['rows']} rows into {config['target_table']}")

        raw = extract()
        processed = transform(raw)
        load(processed)

    return etl_pipeline()


# Generate the DAGs: register each one in the module's global namespace so
# Airflow's DAG file processor can discover it.
for config in DAG_CONFIGS:
    globals()[config["dag_id"]] = create_etl_dag(config)

The same factory works when the configuration is loaded from a YAML file:

# config/etl_pipelines.yaml
pipelines:
  - dag_id: etl_orders
    schedule: "@daily"
    source: orders_db
    target: warehouse.orders
    owner: data_team

  - dag_id: etl_events
    schedule: "*/15 * * * *"
    source: event_stream
    target: warehouse.events
    owner: analytics_team
"""
File: dags/yaml_dag_generator.py
"""
from airflow.decorators import dag, task
from datetime import datetime
import yaml
from pathlib import Path

config_path = Path(__file__).parent / "config" / "etl_pipelines.yaml"

if config_path.exists():
    with open(config_path) as f:
        config = yaml.safe_load(f)

    for pipeline in config.get("pipelines", []):
        # Use the factory pattern as shown above
        pass

9.2 Dynamic Task Mapping

Dynamic task mapping creates task instances at runtime based on the data a previous task returns, rather than at DAG-authoring time. Introduced in Airflow 2.3, it remains one of the most powerful features in Airflow 3.x.

"""
File: dags/dynamic_task_mapping.py
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="dynamic_task_mapping",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["dynamic", "mapping"],
)
def dynamic_task_mapping():

    @task
    def get_file_list():
        """Return a list of files to process."""
        return [
            {"filename": "users.csv", "size_mb": 10},
            {"filename": "orders.csv", "size_mb": 50},
            {"filename": "products.csv", "size_mb": 5},
            {"filename": "events.csv", "size_mb": 200},
        ]

    @task
    def process_file(file_info: dict):
        """Process a single file. This runs as multiple parallel tasks."""
        print(f"Processing {file_info['filename']} ({file_info['size_mb']} MB)")
        return {
            "filename": file_info["filename"],
            "records_processed": file_info["size_mb"] * 1000,
            "status": "complete",
        }

    @task
    def summarize(results: list):
        """Aggregate results from all mapped tasks."""
        total_records = sum(r["records_processed"] for r in results)
        print(f"Total files processed: {len(results)}")
        print(f"Total records: {total_records:,}")
        for r in results:
            print(f"  {r['filename']}: {r['records_processed']:,} records")

    files = get_file_list()
    # .expand() creates one task instance per item in the list
    results = process_file.expand(file_info=files)
    summarize(results)

dynamic_task_mapping()
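
Mapped tasks fan out in parallel, limited only by pool and concurrency settings. When each mapped instance is heavy, you can also cap parallelism on the task itself. A minimal sketch, assuming Airflow 2.6 or newer, where max_active_tis_per_dagrun limits how many mapped instances of a task run concurrently within one DAG run:

from airflow.decorators import task

# Assumption: Airflow 2.6+. At most two mapped instances of process_file run
# at a time within a single DAG run; the rest wait in the scheduled state.
@task(max_active_tis_per_dagrun=2)
def process_file(file_info: dict):
    print(f"Processing {file_info['filename']} ({file_info['size_mb']} MB)")
    return {"filename": file_info["filename"], "status": "complete"}

# Expanded exactly as before: process_file.expand(file_info=files)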

9.3 Advanced Task Mapping Patterns

Beyond a single expand() call, a few patterns come up repeatedly: combining fixed and mapped arguments, mapping over multiple arguments, and chaining mapped tasks:

"""
File: dags/advanced_mapping.py
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="advanced_mapping",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["dynamic", "advanced"],
)
def advanced_mapping():

    # Pattern 1: Map with partial() for fixed + dynamic arguments
    @task
    def process_with_config(item: str, batch_size: int, mode: str):
        print(f"Processing {item} with batch_size={batch_size}, mode={mode}")
        return f"{item}_done"

    items = ["file_a", "file_b", "file_c"]
    # expand() for dynamic args, partial() for fixed args
    process_with_config.partial(batch_size=100, mode="fast").expand(item=items)

    # Pattern 2: Map over multiple arguments with zip
    @task
    def send_email(recipient: str, subject: str):
        print(f"Sending '{subject}' to {recipient}")

    recipients = ["alice@example.com", "bob@example.com"]
    subjects = ["Daily Report", "Weekly Summary"]

    # expand_kwargs creates mapped tasks from a list of dicts
    @task
    def build_email_args():
        return [
            {"recipient": r, "subject": s}
            for r, s in zip(recipients, subjects)
        ]

    email_args = build_email_args()
    send_email.expand_kwargs(email_args)

    # Pattern 3: Chaining mapped tasks
    @task
    def generate_ids():
        return list(range(1, 6))

    @task
    def fetch_data(record_id: int):
        return {"id": record_id, "value": record_id * 10}

    @task
    def enrich_data(record: dict):
        record["enriched"] = True
        return record

    @task
    def collect_results(records: list):
        print(f"Collected {len(records)} enriched records")

    ids = generate_ids()
    fetched = fetch_data.expand(record_id=ids)
    enriched = enrich_data.expand(record=fetched)
    collect_results(enriched)

advanced_mapping()
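
The objectives at the top of this chapter mention both expand and map. XComArg.map() applies a plain Python callable to each element of an upstream result lazily, before expansion, which is useful when the producing task returns more than the downstream task needs. A minimal sketch; the DAG id, the to_path helper, and the /data/incoming/ path are illustrative:

"""
File: dags/map_transform_example.py
"""
from airflow.decorators import dag, task
from datetime import datetime


def to_path(file_info: dict) -> str:
    """Module-level helper so the .map() callable serializes cleanly."""
    return f"/data/incoming/{file_info['filename']}"


@dag(
    dag_id="map_transform_example",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["dynamic", "mapping"],
)
def map_transform_example():

    @task
    def list_files():
        return [
            {"filename": "users.csv", "size_mb": 10},
            {"filename": "events.csv", "size_mb": 200},
        ]

    @task
    def load_file(path: str):
        print(f"Loading {path}")

    # .map() transforms each element before expansion, so load_file only
    # receives the path string it needs.
    load_file.expand(path=list_files().map(to_path))

map_transform_example()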

9.4 Dataset-Driven Scheduling (Airflow 3.x)

Datasets allow DAGs to be triggered when upstream DAGs update specific datasets. Airflow 3.x is renaming Datasets to Assets (from airflow.sdk import Asset), so on newer versions you may need that import in place of the airflow.datasets.Dataset import used below:

"""
File: dags/dataset_producer.py
Producer DAG that updates a dataset.
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
sales_dataset = Dataset("s3://my-bucket/sales/daily/")
inventory_dataset = Dataset("s3://my-bucket/inventory/")

@dag(
    dag_id="dataset_producer_sales",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["datasets", "producer"],
)
def dataset_producer_sales():

    @task(outlets=[sales_dataset])
    def update_sales_data():
        """This task declares it updates the sales dataset."""
        print("Updating sales data in S3...")
        # In practice: write data to S3
        return "sales_data_updated"

    update_sales_data()

dataset_producer_sales()


@dag(
    dag_id="dataset_producer_inventory",
    start_date=datetime(2026, 1, 1),
    schedule="@hourly",
    catchup=False,
    tags=["datasets", "producer"],
)
def dataset_producer_inventory():

    @task(outlets=[inventory_dataset])
    def update_inventory_data():
        """This task declares it updates the inventory dataset."""
        print("Updating inventory data in S3...")
        return "inventory_data_updated"

    update_inventory_data()

dataset_producer_inventory()


"""
File: dags/dataset_consumer.py
Consumer DAG triggered when datasets are updated.
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime

sales_dataset = Dataset("s3://my-bucket/sales/daily/")
inventory_dataset = Dataset("s3://my-bucket/inventory/")

# Triggered when BOTH datasets are updated
@dag(
    dag_id="dataset_consumer_report",
    start_date=datetime(2026, 1, 1),
    schedule=[sales_dataset, inventory_dataset],  # AND logic
    catchup=False,
    tags=["datasets", "consumer"],
)
def dataset_consumer_report():

    @task
    def generate_combined_report():
        """Runs after both sales and inventory data are updated."""
        print("Both datasets updated - generating combined report")
        print("Reading latest sales data from S3...")
        print("Reading latest inventory data from S3...")
        print("Combined report generated!")

    generate_combined_report()

dataset_consumer_report()


# Triggered when EITHER dataset is updated
@dag(
    dag_id="dataset_consumer_monitor",
    start_date=datetime(2026, 1, 1),
    schedule=(sales_dataset | inventory_dataset),  # OR logic
    catchup=False,
    tags=["datasets", "consumer"],
)
def dataset_consumer_monitor():

    @task
    def log_data_change():
        """Runs whenever any monitored dataset changes."""
        print("Data change detected - logging event")

    log_data_change()

dataset_consumer_monitor()
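
Consumers often need to know which dataset update triggered them. Assuming Airflow 2.8 or newer with the Dataset naming used in this chapter (under the Asset naming the context key becomes triggering_asset_events), the task context exposes triggering_dataset_events, a mapping from dataset URI to the events that caused the run. A minimal sketch; the dataset_consumer_inspect DAG id is illustrative:

"""
File: dags/dataset_consumer_inspect.py
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime

sales_dataset = Dataset("s3://my-bucket/sales/daily/")

@dag(
    dag_id="dataset_consumer_inspect",
    start_date=datetime(2026, 1, 1),
    schedule=[sales_dataset],
    catchup=False,
    tags=["datasets", "consumer"],
)
def dataset_consumer_inspect():

    @task
    def show_triggering_events(triggering_dataset_events=None):
        """Context variables are injected when declared as parameters."""
        # Maps dataset URI -> list of dataset events for this run
        for uri, events in (triggering_dataset_events or {}).items():
            print(f"{uri}: {len(events)} event(s)")
            for event in events:
                print(f"  from DAG {event.source_dag_id}, run {event.source_run_id}")

    show_triggering_events()

dataset_consumer_inspect()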

9.5 SubDAGs vs TaskGroups

TaskGroups give you the visual grouping and reuse that SubDAGs were used for, without scheduling a separate child DAG and tying up executor slots:

"""
File: dags/taskgroup_vs_subdag.py
TaskGroups are the recommended approach in Airflow 3.x.
SubDAGs were deprecated in Airflow 2 and removed in Airflow 3.
"""
from airflow.decorators import dag, task, task_group
from datetime import datetime

@dag(
    dag_id="taskgroup_recommended",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["task-groups"],
)
def taskgroup_recommended():

    # Nested task groups for complex organization
    @task_group(group_id="ingestion")
    def ingestion():

        @task_group(group_id="source_a")
        def source_a_group():
            @task
            def extract_a():
                return {"source": "A", "records": 100}

            @task
            def validate_a(data: dict):
                print(f"Validating {data['records']} records from {data['source']}")
                return data

            return validate_a(extract_a())

        @task_group(group_id="source_b")
        def source_b_group():
            @task
            def extract_b():
                return {"source": "B", "records": 200}

            @task
            def validate_b(data: dict):
                print(f"Validating {data['records']} records from {data['source']}")
                return data

            return validate_b(extract_b())

        return [source_a_group(), source_b_group()]

    @task_group(group_id="processing")
    def processing(data_sources: list):
        @task
        def merge_data(sources: list):
            total = sum(s["records"] for s in sources)
            print(f"Merged {total} records from {len(sources)} sources")
            return {"total_records": total}

        @task
        def apply_rules(data: dict):
            print(f"Applying business rules to {data['total_records']} records")
            return data

        merged = merge_data(data_sources)
        return apply_rules(merged)

    sources = ingestion()
    processing(sources)

taskgroup_recommended()
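
One job SubDAGs used to do was packaging a reusable sub-pipeline. TaskGroups cover that as well: a task_group-decorated factory can be defined once, for example in a shared module, and called from any DAG. A minimal sketch; the quality_checks group and reporting_dag names are illustrative:

"""
File: dags/reusable_taskgroup_example.py
"""
from airflow.decorators import dag, task, task_group
from datetime import datetime


@task_group(group_id="quality_checks")
def quality_checks(table: str):
    """Reusable group: define once, call from any DAG that needs these checks."""

    @task
    def check_row_count(table: str):
        print(f"Checking row count for {table}")
        return True

    @task
    def check_nulls(table: str, row_count_ok: bool):
        print(f"Checking nulls for {table} (row count ok: {row_count_ok})")

    check_nulls(table, check_row_count(table))


@dag(
    dag_id="reporting_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["task-groups"],
)
def reporting_dag():
    quality_checks(table="analytics.sales")

reporting_dag()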

Practice Exercise

Exercise: Dynamic Multi-Region Pipeline

"""
File: dags/multi_region_pipeline.py
Build a pipeline that:
1. Discovers active regions dynamically
2. Processes data for each region in parallel (mapped tasks)
3. Aggregates results
4. Updates a dataset to trigger downstream consumers
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime

regional_dataset = Dataset("warehouse://regional_metrics")

@dag(
    dag_id="multi_region_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["exercise", "dynamic"],
)
def multi_region_pipeline():

    @task
    def discover_regions():
        """Dynamically discover active regions."""
        return ["us-east-1", "eu-west-1", "ap-southeast-1", "us-west-2"]

    @task
    def fetch_region_data(region: str):
        """Fetch data for a single region."""
        import random
        data = {
            "region": region,
            "users": random.randint(1000, 50000),
            "revenue": round(random.uniform(10000, 500000), 2),
            "latency_ms": round(random.uniform(10, 200), 1),
        }
        print(f"Region {region}: {data['users']} users, ${data['revenue']:,.2f}")
        return data

    @task(outlets=[regional_dataset])
    def aggregate_metrics(region_data: list):
        """Aggregate metrics from all regions."""
        total_users = sum(r["users"] for r in region_data)
        total_revenue = sum(r["revenue"] for r in region_data)
        avg_latency = sum(r["latency_ms"] for r in region_data) / len(region_data)

        print(f"\n=== Global Metrics ===")
        print(f"Regions: {len(region_data)}")
        print(f"Total Users: {total_users:,}")
        print(f"Total Revenue: ${total_revenue:,.2f}")
        print(f"Avg Latency: {avg_latency:.1f} ms")

        best_region = min(region_data, key=lambda r: r["latency_ms"])
        print(f"Best Latency: {best_region['region']} ({best_region['latency_ms']} ms)")

    regions = discover_regions()
    region_data = fetch_region_data.expand(region=regions)
    aggregate_metrics(region_data)

multi_region_pipeline()

Summary

In this chapter, you learned:

  • Dynamic DAG generation creates multiple DAGs from configuration files
  • Dynamic task mapping (expand()) runs tasks in parallel based on runtime data
  • Datasets enable event-driven scheduling between DAGs
  • TaskGroups are the recommended replacement for SubDAGs, which were removed in Airflow 3
  • These patterns enable scalable, maintainable workflow architectures