Chapter 2: Core Concepts and Architecture

Learning Objectives
  • Understand DAGs, Tasks, and Operators
  • Learn the Airflow 3.x architecture (Scheduler, Executor, Webserver, DAG Processor)
  • Understand the new Task Execution Interface in Airflow 3.x
  • Learn how metadata database and message broker work together

Knowledge Points

2.1 DAGs (Directed Acyclic Graphs)

A DAG is a collection of tasks organized with dependencies. “Directed” means tasks have a defined order; “Acyclic” means there are no circular dependencies.

from airflow.decorators import dag
from datetime import datetime

@dag(
    dag_id="my_first_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["example"],
)
def my_first_dag():
    """A simple DAG to demonstrate core concepts."""
    pass

my_first_dag()

Key DAG Parameters:

  Parameter         Description                  Example
  dag_id            Unique identifier            "etl_pipeline"
  start_date        When scheduling begins       datetime(2026, 1, 1)
  schedule          How often to run             "@daily", "0 6 * * *"
  catchup           Run missed intervals         True / False
  max_active_runs   Concurrent DAG runs limit    1
  default_args      Default task parameters      {"retries": 3}
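
Not all of these appeared in the first example. The sketch below combines the remaining ones in a single DAG definition; the dag_id and the concrete values are illustrative only.

from airflow.decorators import dag
from datetime import datetime, timedelta

@dag(
    dag_id="etl_pipeline",            # unique identifier for this DAG
    start_date=datetime(2026, 1, 1),  # first date eligible for scheduling
    schedule="0 6 * * *",             # cron expression: every day at 06:00
    catchup=False,                    # do not backfill missed intervals
    max_active_runs=1,                # at most one active DAG run at a time
    default_args={
        "retries": 3,                 # applied to every task unless overridden
        "retry_delay": timedelta(minutes=5),
    },
)
def etl_pipeline():
    """Sketch combining the parameters listed above."""

etl_pipeline()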

2.2 Tasks and Operators

Tasks are the individual units of work in a DAG. They are created by instantiating Operators.

# In Airflow 3.x these operators live in the standard provider package
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator

# A task using BashOperator
print_date = BashOperator(
    task_id="print_date",
    bash_command="date",
)

# A task using PythonOperator
def greet(name):
    print(f"Hello, {name}!")

greet_task = PythonOperator(
    task_id="greet",
    python_callable=greet,
    op_kwargs={"name": "Airflow"},
)
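
Dependencies between operator-based tasks like these are declared explicitly. Continuing the example above, the bit-shift syntax sets the execution order:

# Run print_date first, then greet_task
# (equivalent to greet_task.set_upstream(print_date))
print_date >> greet_task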

Three ways to define tasks:

  1. Operators - Pre-built task templates (BashOperator, PythonOperator, etc.)
  2. Sensors - Special operators that wait for a condition to be met
  3. TaskFlow - Python functions decorated with @task (recommended in 3.x)
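
The sketch below illustrates options 2 and 3 from the list above: a Sensor that waits for a file and a TaskFlow task that processes it. It assumes the standard provider is installed; the DAG id and file path are made up.

from airflow.decorators import dag, task
from airflow.providers.standard.sensors.filesystem import FileSensor
from datetime import datetime

@dag(
    dag_id="sensor_taskflow_demo",
    start_date=datetime(2026, 1, 1),
    schedule=None,        # run only when triggered manually
    catchup=False,
)
def sensor_taskflow_demo():
    # Sensor: repeatedly checks ("pokes") until the file exists, then succeeds
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/tmp/input.csv",   # hypothetical path
        poke_interval=30,            # seconds between checks
    )

    # TaskFlow: an ordinary Python function becomes a task via @task
    @task
    def process_file():
        print("File is present, processing it now")

    wait_for_file >> process_file()

sensor_taskflow_demo()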

2.3 Airflow 3.x Architecture

                    ┌──────────────┐
                    │   Web UI /   │
                    │   REST API   │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │  Webserver   │
                    └──────┬───────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
      ┌───────▼──────┐ ┌───▼──────┐ ┌───▼───────────┐
      │ DAG Processor│ │ Scheduler│ │   Executor    │
      │  (Parsing)   │ │          │ │ (Local/Celery/│
      └───────┬──────┘ └───┬──────┘ │  Kubernetes)  │
              │            │        └───┬───────────┘
              │     ┌──────▼────────┐   │
              └─────►  Metadata DB  ◄───┘
                    │ (PostgreSQL)  │
                    └───────────────┘

Components:

  • Webserver - Serves the UI and REST API
  • Scheduler - Determines when tasks should run and sends them to the executor
  • DAG Processor - Parses DAG files and stores metadata (separated from scheduler in 3.x)
  • Executor - Runs tasks (LocalExecutor, CeleryExecutor, KubernetesExecutor)
  • Metadata Database - Stores DAG definitions, task states, variables, connections
  • Message Broker - Queue for task messages (required for CeleryExecutor, e.g., Redis/RabbitMQ)
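
One quick way to see how these components are wired together on a particular installation is to read the live configuration. The snippet below is a small sketch using the standard configuration options (run it where Airflow is installed):

# Inspect which executor and metadata database this installation uses
from airflow.configuration import conf

print("Executor:   ", conf.get("core", "executor"))
print("Metadata DB:", conf.get("database", "sql_alchemy_conn"))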

2.4 Task Execution Interface (Airflow 3.x)

Airflow 3.x introduces the Task Execution Interface (TEI), which decouples task execution from the Airflow core:

┌─────────────────────┐     ┌──────────────────────┐
│   Airflow Core      │     │   Task Execution     │
│  ┌───────────────┐  │     │   Environment        │
│  │  Scheduler    │  │     │  ┌────────────────┐  │
│  │               ├──┼─────┼──► Task Runner    │  │
│  │  DAG Processor│  │ TEI │  │                │  │
│  └───────────────┘  │     │  │  Minimal deps  │  │
│  ┌───────────────┐  │     │  │  Isolated env  │  │
│  │  Metadata DB  │  │     │  └────────────────┘  │
│  └───────────────┘  │     └──────────────────────┘
└─────────────────────┘

Benefits of TEI:

  • Tasks run in isolated environments with minimal dependencies
  • Easier to scale and manage resources
  • Better support for different runtime environments
  • Improved security through isolation

2.5 Task Instance Lifecycle

A task instance goes through several states during execution:

                    ┌──────────┐
            ┌───────┤  none    │
            │       └────┬─────┘
            │            │ scheduled
            │       ┌────▼─────┐
            │       │  queued  │
            │       └────┬─────┘
            │            │ running
            │       ┌────▼─────┐
 upstream_  │   ┌───┤ running  ├───┐
 failed     │   │   └──────────┘   │
            │   │                  │
        ┌───▼───▼───┐        ┌────▼─────┐
        │  failed   │        │ success  │
        └───────────┘        └──────────┘

Common States:

  • none - Task has not been queued yet
  • scheduled - Scheduler determined the task should run
  • queued - Task is assigned to an executor
  • running - Task is actively executing
  • success - Task completed successfully
  • failed - Task encountered an error
  • skipped - Task was skipped (e.g., by branching)
  • up_for_retry - Task failed but will be retried
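
The following sketch produces several of these states on purpose (the dag_id and task names are illustrative): the branch task leaves the unchosen path in skipped, and the deliberately failing task moves through up_for_retry before ending in failed.

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="lifecycle_demo",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
)
def lifecycle_demo():

    @task.branch
    def choose_path():
        # Only the returned task_id runs; the other branch is set to "skipped"
        return "happy_path"

    @task
    def happy_path():
        print("chosen branch -> ends in success")

    @task
    def other_path():
        print("never printed -> this task is skipped")

    @task(retries=2)
    def flaky():
        # Each failure puts the task into "up_for_retry" until the retries
        # are exhausted, after which it becomes "failed"
        raise RuntimeError("simulated failure")

    choose_path() >> [happy_path(), other_path()]
    flaky()

lifecycle_demo()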

Practice Exercise

Exercise 1: Explore DAG Structure

"""
File: dags/architecture_demo.py
Purpose: Demonstrate core architecture concepts
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id="architecture_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["chapter2", "demo"],
    default_args={
        "retries": 2,
        "retry_delay": 60,  # seconds
    },
)
def architecture_demo():

    @task
    def extract():
        """Simulate data extraction."""
        data = {"users": 100, "orders": 250}
        print(f"Extracted data: {data}")
        return data

    @task
    def transform(raw_data: dict):
        """Simulate data transformation."""
        transformed = {
            "total_records": raw_data["users"] + raw_data["orders"],
            "source": "demo_db",
        }
        print(f"Transformed data: {transformed}")
        return transformed

    @task
    def load(processed_data: dict):
        """Simulate data loading."""
        print(f"Loading {processed_data['total_records']} records from {processed_data['source']}")
        print("Data loaded successfully!")

    # Define task dependencies through data flow
    raw = extract()
    processed = transform(raw)
    load(processed)

architecture_demo()

Exercise 2: Inspect Task States

# Trigger the DAG
airflow dags trigger architecture_demo

# Watch task states
airflow tasks states-for-dag-run architecture_demo 2026-01-01T00:00:00+00:00

# Check the state of a single task instance
# (task logs are available in the UI under the task instance's log tab)
airflow tasks state architecture_demo extract 2026-01-01T00:00:00+00:00
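
As an alternative to the CLI, a DAG can be run end-to-end in a single local process, which is convenient for debugging. The lines below are a sketch assuming they are appended to dags/architecture_demo.py; dag.test() executes the tasks sequentially without needing a scheduler or executor.

# Append to dags/architecture_demo.py for local debugging
if __name__ == "__main__":
    dag = architecture_demo()
    dag.test()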

Summary

In this chapter, you learned:

  • DAGs define workflows as directed acyclic graphs of tasks
  • Tasks are created using Operators, Sensors, or the TaskFlow API
  • Airflow 3.x architecture separates DAG processing from scheduling
  • The Task Execution Interface decouples task execution from Airflow core
  • Task instances progress through a defined lifecycle of states