Chapter 9: Dynamic DAGs and Advanced Patterns
Haiyue
15min
Learning Objectives
- Generate DAGs dynamically from configuration files
- Use dynamic task mapping (expand/map) for parallel processing
- Understand why TaskGroups replaced SubDAGs and use them to organize tasks
- Apply dataset-driven scheduling in Airflow 3.x
Knowledge Points
9.1 Dynamic DAG Generation
Generate multiple DAGs from a single Python file using configuration:
"""
File: dags/dynamic_dag_generator.py
Generate DAGs dynamically from a YAML configuration.
"""
from airflow.decorators import dag, task
from datetime import datetime
import yaml
# Configuration could come from a YAML file, database, or API
DAG_CONFIGS = [
{
"dag_id": "etl_sales",
"schedule": "@daily",
"source_table": "raw_sales",
"target_table": "analytics.sales",
"owner": "sales_team",
},
{
"dag_id": "etl_inventory",
"schedule": "@hourly",
"source_table": "raw_inventory",
"target_table": "analytics.inventory",
"owner": "ops_team",
},
{
"dag_id": "etl_customers",
"schedule": "0 6 * * *",
"source_table": "raw_customers",
"target_table": "analytics.customers",
"owner": "marketing_team",
},
]
def create_etl_dag(config: dict):
"""Factory function to create an ETL DAG from config."""
@dag(
dag_id=config["dag_id"],
start_date=datetime(2026, 1, 1),
schedule=config["schedule"],
catchup=False,
default_args={"owner": config["owner"]},
tags=["dynamic", "etl"],
)
def etl_pipeline():
@task
def extract():
print(f"Extracting from {config['source_table']}")
return {"source": config["source_table"], "rows": 1000}
@task
def transform(data: dict):
print(f"Transforming {data['rows']} rows from {data['source']}")
return {"rows": data["rows"], "transformed": True}
@task
def load(data: dict):
print(f"Loading {data['rows']} rows into {config['target_table']}")
raw = extract()
processed = transform(raw)
load(processed)
return etl_pipeline()
# Generate DAGs dynamically
for config in DAG_CONFIGS:
globals()[config["dag_id"]] = create_etl_dag(config)
Loading from YAML file:
# config/etl_pipelines.yaml
pipelines:
- dag_id: etl_orders
schedule: "@daily"
source: orders_db
target: warehouse.orders
owner: data_team
- dag_id: etl_events
schedule: "*/15 * * * *"
source: event_stream
target: warehouse.events
owner: analytics_team
"""
File: dags/yaml_dag_generator.py
"""
import yaml
from pathlib import Path
# Assumes the create_etl_dag() factory shown above has been moved into a shared,
# importable module (here a hypothetical dag_factory.py in the dags folder).
from dag_factory import create_etl_dag
config_path = Path(__file__).parent / "config" / "etl_pipelines.yaml"
if config_path.exists():
    with open(config_path) as f:
        config = yaml.safe_load(f)
    for pipeline in config.get("pipelines", []):
        # Map the YAML keys onto the factory's config schema and register each
        # generated DAG in globals() so the scheduler discovers it
        globals()[pipeline["dag_id"]] = create_etl_dag({
            "dag_id": pipeline["dag_id"],
            "schedule": pipeline["schedule"],
            "source_table": pipeline["source"],
            "target_table": pipeline["target"],
            "owner": pipeline["owner"],
        })
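Keep in mind that generator files like these are re-executed every time the scheduler parses the DAG folder, so keep configuration loading cheap (small local files or cached values) and avoid network or database calls at parse time.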
9.2 Dynamic Task Mapping
Dynamic task mapping creates task instances at runtime based on data produced by upstream tasks, so the degree of parallelism can change from run to run. Introduced in Airflow 2.3, it remains one of the most useful features in Airflow 3.x.
"""
File: dags/dynamic_task_mapping.py
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="dynamic_task_mapping",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["dynamic", "mapping"],
)
def dynamic_task_mapping():
@task
def get_file_list():
"""Return a list of files to process."""
return [
{"filename": "users.csv", "size_mb": 10},
{"filename": "orders.csv", "size_mb": 50},
{"filename": "products.csv", "size_mb": 5},
{"filename": "events.csv", "size_mb": 200},
]
@task
def process_file(file_info: dict):
"""Process a single file. This runs as multiple parallel tasks."""
print(f"Processing {file_info['filename']} ({file_info['size_mb']} MB)")
return {
"filename": file_info["filename"],
"records_processed": file_info["size_mb"] * 1000,
"status": "complete",
}
@task
def summarize(results: list):
"""Aggregate results from all mapped tasks."""
total_records = sum(r["records_processed"] for r in results)
print(f"Total files processed: {len(results)}")
print(f"Total records: {total_records:,}")
for r in results:
print(f" {r['filename']}: {r['records_processed']:,} records")
files = get_file_list()
# .expand() creates one task instance per item in the list
results = process_file.expand(file_info=files)
summarize(results)
dynamic_task_mapping()
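When a fan-out could produce many task instances at once, you may want to cap how many run concurrently. A minimal sketch, assuming Airflow 2.6+ where the max_active_tis_per_dagrun argument is available (process_file_throttled is an illustrative name):
from airflow.decorators import task
# Assumption: max_active_tis_per_dagrun (Airflow 2.6+) limits how many mapped
# copies of this task run at the same time within a single DAG run
@task(max_active_tis_per_dagrun=2)
def process_file_throttled(file_info: dict):
    print(f"Processing {file_info['filename']} ({file_info['size_mb']} MB)")
    return {"filename": file_info["filename"], "status": "complete"}
# Used exactly like process_file above:
# results = process_file_throttled.expand(file_info=files)
Each mapped instance also gets its own map index in the Airflow UI, so a single failed file can be retried without re-running the whole batch.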
9.3 Advanced Task Mapping Patterns
"""
File: dags/advanced_mapping.py
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="advanced_mapping",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["dynamic", "advanced"],
)
def advanced_mapping():
# Pattern 1: Map with partial() for fixed + dynamic arguments
@task
def process_with_config(item: str, batch_size: int, mode: str):
print(f"Processing {item} with batch_size={batch_size}, mode={mode}")
return f"{item}_done"
items = ["file_a", "file_b", "file_c"]
# expand() for dynamic args, partial() for fixed args
process_with_config.partial(batch_size=100, mode="fast").expand(item=items)
# Pattern 2: Map over multiple arguments with expand_kwargs (zipped into dicts)
@task
def send_email(recipient: str, subject: str):
print(f"Sending '{subject}' to {recipient}")
recipients = ["alice@example.com", "bob@example.com"]
subjects = ["Daily Report", "Weekly Summary"]
# expand_kwargs creates mapped tasks from a list of dicts
@task
def build_email_args():
return [
{"recipient": r, "subject": s}
for r, s in zip(recipients, subjects)
]
email_args = build_email_args()
send_email.expand_kwargs(email_args)
# Pattern 3: Chaining mapped tasks
@task
def generate_ids():
return list(range(1, 6))
@task
def fetch_data(record_id: int):
return {"id": record_id, "value": record_id * 10}
@task
def enrich_data(record: dict):
record["enriched"] = True
return record
@task
def collect_results(records: list):
print(f"Collected {len(records)} enriched records")
ids = generate_ids()
fetched = fetch_data.expand(record_id=ids)
enriched = enrich_data.expand(record=fetched)
collect_results(enriched)
advanced_mapping()
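A companion to expand() is map(), which applies a plain Python function to each element of an upstream result before the downstream task expands over it. A minimal sketch, assuming Airflow 2.4+ where XComArg.map() is available (the DAG id, file names, and normalize helper are illustrative):
from airflow.decorators import dag, task
from datetime import datetime
@dag(
    dag_id="map_before_expand",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["dynamic", "mapping"],
)
def map_before_expand():
    @task
    def list_files():
        return ["Users.CSV", "orders.csv", "Events.csv"]
    def normalize(name: str) -> str:
        # Applied lazily to each element when the downstream task expands
        return name.lower()
    @task
    def load_file(filename: str):
        print(f"Loading {filename}")
    # map() transforms each element; expand() then creates one task per result
    load_file.expand(filename=list_files().map(normalize))
map_before_expand()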
9.4 Dataset-Driven Scheduling (Airflow 3.x)
Datasets let a DAG run when upstream DAGs update the data it depends on, instead of on a fixed timetable (in Airflow 3.x this concept is renamed from datasets to assets):
"""
File: dags/dataset_producer.py
Producer DAG that updates a dataset.
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime
# Define datasets
sales_dataset = Dataset("s3://my-bucket/sales/daily/")
inventory_dataset = Dataset("s3://my-bucket/inventory/")
@dag(
dag_id="dataset_producer_sales",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["datasets", "producer"],
)
def dataset_producer_sales():
@task(outlets=[sales_dataset])
def update_sales_data():
"""This task declares it updates the sales dataset."""
print("Updating sales data in S3...")
# In practice: write data to S3
return "sales_data_updated"
update_sales_data()
dataset_producer_sales()
@dag(
dag_id="dataset_producer_inventory",
start_date=datetime(2026, 1, 1),
schedule="@hourly",
catchup=False,
tags=["datasets", "producer"],
)
def dataset_producer_inventory():
@task(outlets=[inventory_dataset])
def update_inventory_data():
"""This task declares it updates the inventory dataset."""
print("Updating inventory data in S3...")
return "inventory_data_updated"
update_inventory_data()
dataset_producer_inventory()
"""
File: dags/dataset_consumer.py
Consumer DAG triggered when datasets are updated.
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime
sales_dataset = Dataset("s3://my-bucket/sales/daily/")
inventory_dataset = Dataset("s3://my-bucket/inventory/")
# Triggered when BOTH datasets are updated
@dag(
dag_id="dataset_consumer_report",
start_date=datetime(2026, 1, 1),
schedule=[sales_dataset, inventory_dataset], # AND logic
catchup=False,
tags=["datasets", "consumer"],
)
def dataset_consumer_report():
@task
def generate_combined_report():
"""Runs after both sales and inventory data are updated."""
print("Both datasets updated - generating combined report")
print("Reading latest sales data from S3...")
print("Reading latest inventory data from S3...")
print("Combined report generated!")
generate_combined_report()
dataset_consumer_report()
# Triggered when EITHER dataset is updated
@dag(
dag_id="dataset_consumer_monitor",
start_date=datetime(2026, 1, 1),
schedule=(sales_dataset | inventory_dataset), # OR logic
catchup=False,
tags=["datasets", "consumer"],
)
def dataset_consumer_monitor():
@task
def log_data_change():
"""Runs whenever any monitored dataset changes."""
print("Data change detected - logging event")
log_data_change()
dataset_consumer_monitor()
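A consumer task can also inspect which dataset events triggered the run. A minimal sketch, assuming the triggering_dataset_events context variable from Airflow 2.x-style dataset scheduling (renamed to triggering_asset_events alongside the asset rename in 3.x); the task name is illustrative:
from airflow.decorators import task
@task
def inspect_trigger(triggering_dataset_events=None):
    # Assumption: Airflow injects a mapping of each triggering dataset to the
    # list of events that queued this DAG run
    for dataset, events in (triggering_dataset_events or {}).items():
        print(f"{dataset}: {len(events)} event(s)")
        if events:
            print(f"  last produced by DAG: {events[0].source_dag_run.dag_id}")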
9.5 SubDAGs vs TaskGroups
"""
File: dags/taskgroup_vs_subdag.py
TaskGroups are the recommended way to group related tasks.
SubDAGs were deprecated in Airflow 2.x and removed in Airflow 3.0.
"""
from airflow.decorators import dag, task, task_group
from datetime import datetime
@dag(
dag_id="taskgroup_recommended",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["task-groups"],
)
def taskgroup_recommended():
# Nested task groups for complex organization
@task_group(group_id="ingestion")
def ingestion():
@task_group(group_id="source_a")
def source_a_group():
@task
def extract_a():
return {"source": "A", "records": 100}
@task
def validate_a(data: dict):
print(f"Validating {data['records']} records from {data['source']}")
return data
return validate_a(extract_a())
@task_group(group_id="source_b")
def source_b_group():
@task
def extract_b():
return {"source": "B", "records": 200}
@task
def validate_b(data: dict):
print(f"Validating {data['records']} records from {data['source']}")
return data
return validate_b(extract_b())
return [source_a_group(), source_b_group()]
@task_group(group_id="processing")
def processing(data_sources: list):
@task
def merge_data(sources: list):
total = sum(s["records"] for s in sources)
print(f"Merged {total} records from {len(sources)} sources")
return {"total_records": total}
@task
def apply_rules(data: dict):
print(f"Applying business rules to {data['total_records']} records")
return data
merged = merge_data(data_sources)
return apply_rules(merged)
sources = ingestion()
processing(sources)
taskgroup_recommended()
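The practical difference: a SubDAG ran as a separate DAG with its own DAG runs, which consumed extra executor slots and could deadlock or stall the parent, while a TaskGroup is purely a namespacing and UI construct inside the parent DAG, so its tasks are scheduled like any others. With SubDAGs removed in Airflow 3.0, TaskGroups are the only option going forward.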
Practice Exercise
Exercise: Dynamic Multi-Region Pipeline
"""
File: dags/multi_region_pipeline.py
Build a pipeline that:
1. Discovers active regions dynamically
2. Processes data for each region in parallel (mapped tasks)
3. Aggregates results
4. Updates a dataset to trigger downstream consumers
"""
from airflow.decorators import dag, task
from airflow.datasets import Dataset
from datetime import datetime
regional_dataset = Dataset("warehouse://regional_metrics")
@dag(
dag_id="multi_region_pipeline",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["exercise", "dynamic"],
)
def multi_region_pipeline():
@task
def discover_regions():
"""Dynamically discover active regions."""
return ["us-east-1", "eu-west-1", "ap-southeast-1", "us-west-2"]
@task
def fetch_region_data(region: str):
"""Fetch data for a single region."""
import random
data = {
"region": region,
"users": random.randint(1000, 50000),
"revenue": round(random.uniform(10000, 500000), 2),
"latency_ms": round(random.uniform(10, 200), 1),
}
print(f"Region {region}: {data['users']} users, ${data['revenue']:,.2f}")
return data
@task(outlets=[regional_dataset])
def aggregate_metrics(region_data: list):
"""Aggregate metrics from all regions."""
total_users = sum(r["users"] for r in region_data)
total_revenue = sum(r["revenue"] for r in region_data)
avg_latency = sum(r["latency_ms"] for r in region_data) / len(region_data)
print(f"\n=== Global Metrics ===")
print(f"Regions: {len(region_data)}")
print(f"Total Users: {total_users:,}")
print(f"Total Revenue: ${total_revenue:,.2f}")
print(f"Avg Latency: {avg_latency:.1f} ms")
best_region = min(region_data, key=lambda r: r["latency_ms"])
print(f"Best Latency: {best_region['region']} ({best_region['latency_ms']} ms)")
regions = discover_regions()
region_data = fetch_region_data.expand(region=regions)
aggregate_metrics(region_data)
multi_region_pipeline()
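To run the exercise locally without waiting for the scheduler, trigger a one-off in-process run with the CLI command airflow dags test multi_region_pipeline and watch one fetch_region_data instance appear per discovered region.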
Summary
In this chapter, you learned:
- Dynamic DAG generation creates multiple DAGs from configuration files
- Dynamic task mapping (expand()) runs tasks in parallel based on runtime data
- Datasets enable event-driven scheduling between DAGs
- TaskGroups are the recommended replacement for deprecated SubDAGs
- These patterns enable scalable, maintainable workflow architectures