Chapter 12: Production Deployment and Operations

Learning Objectives
  • Deploy Airflow on Kubernetes with the official Helm chart
  • Configure CeleryExecutor and KubernetesExecutor for scalability
  • Set up monitoring with Prometheus, Grafana, and StatsD
  • Implement CI/CD pipelines for DAG deployment

Knowledge Points

12.1 Executor Comparison

Choosing the right executor is critical for production deployments:

| Executor | Best For | Scaling | Complexity |
| --- | --- | --- | --- |
| LocalExecutor | Single-node, small workloads | Vertical | Low |
| CeleryExecutor | Multi-node, steady workloads | Horizontal (workers) | Medium |
| KubernetesExecutor | Dynamic, variable workloads | Pod-per-task | High |
| CeleryKubernetesExecutor | Mixed workloads | Both | High |
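
Whichever executor you choose, it is worth asserting that choice in a deployment smoke test so a misconfigured environment fails fast. A minimal sketch, assuming it runs where the Airflow package is installed (for example inside the Airflow image):

# Smoke test: confirm the environment is running the executor we expect.
from airflow.configuration import conf

EXPECTED_EXECUTOR = "CeleryExecutor"  # adjust per environment

def test_executor_is_configured():
    executor = conf.get("core", "executor")
    assert executor == EXPECTED_EXECUTOR, f"Unexpected executor: {executor}"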

12.2 CeleryExecutor Setup

# airflow.cfg for CeleryExecutor
"""
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
# When worker_autoscale is set it takes precedence over worker_concurrency
# (maximum 16 processes, minimum 4)
worker_autoscale = 16,4

[celery_broker_transport_options]
visibility_timeout = 21600
"""
# docker-compose.yaml for CeleryExecutor
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

  airflow-webserver:
    image: apache/airflow:3.0.0
    # Airflow 3.x replaces the `webserver` command with `api-server`
    command: api-server
    ports:
      - "8080:8080"
    environment: &airflow-env
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: 'your_fernet_key_here'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis

  airflow-scheduler:
    image: apache/airflow:3.0.0
    command: scheduler
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis

  # Airflow 3.x requires a standalone DAG processor; the scheduler no longer parses DAG files
  airflow-dag-processor:
    image: apache/airflow:3.0.0
    command: dag-processor
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis

  airflow-worker:
    image: apache/airflow:3.0.0
    command: celery worker
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis
    deploy:
      replicas: 3  # Scale workers as needed

  flower:
    image: apache/airflow:3.0.0
    command: celery flower
    ports:
      - "5555:5555"
    environment: *airflow-env
    depends_on:
      - redis

volumes:
  postgres-data:
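
Before pointing real workloads at this stack, script a readiness check rather than eyeballing the UI. A small sketch using the requests library against the health endpoint exposed by the port mapping above (the 300-second timeout is an arbitrary choice):

# wait_for_airflow.py - poll the health endpoint until core components report healthy
import time
import requests

HEALTH_URL = "http://localhost:8080/health"  # matches the port mapping above

def wait_until_healthy(timeout_s: int = 300, interval_s: int = 5) -> None:
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            payload = requests.get(HEALTH_URL, timeout=5).json()
            meta = payload.get("metadatabase", {}).get("status")
            sched = payload.get("scheduler", {}).get("status")
            if meta == "healthy" and sched == "healthy":
                print("Airflow is healthy")
                return
        except (requests.RequestException, ValueError):
            pass  # the webserver may not be accepting requests yet
        time.sleep(interval_s)
    raise TimeoutError("Airflow did not become healthy in time")

if __name__ == "__main__":
    wait_until_healthy()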

12.3 KubernetesExecutor Setup

# values.yaml for Airflow Helm Chart
executor: KubernetesExecutor

# Webserver configuration
webserver:
  replicas: 2
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"

# Scheduler configuration
scheduler:
  replicas: 2  # Running two schedulers gives HA (supported since Airflow 2.0)
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"

# Database configuration
postgresql:
  enabled: true
  auth:
    username: airflow
    password: airflow
    database: airflow

# DAG sync configuration
dags:
  persistence:
    enabled: true
    size: 5Gi
  gitSync:
    enabled: true
    repo: git@github.com:company/airflow-dags.git
    branch: main
    wait: 60
    subPath: dags

# KubernetesExecutor specific: custom pod template for task pods
# (the chart exposes this as a top-level `podTemplate` value)
podTemplate: |
    apiVersion: v1
    kind: Pod
    metadata:
      name: airflow-task
    spec:
      containers:
        - name: base
          image: apache/airflow:3.0.0
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "2Gi"
      restartPolicy: Never
# Deploy with Helm
helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm install airflow apache-airflow/airflow \
    --namespace airflow \
    --create-namespace \
    --values values.yaml \
    --timeout 10m

# Upgrade deployment
helm upgrade airflow apache-airflow/airflow \
    --namespace airflow \
    --values values.yaml

# Check status
kubectl get pods -n airflow
kubectl get svc -n airflow
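
To watch the executor at work, you can list the pods it creates with the Kubernetes Python client instead of shelling out to kubectl. A sketch, assuming the kubernetes package is installed and your kubeconfig points at the cluster:

# list_airflow_pods.py - roughly equivalent to `kubectl get pods -n airflow`
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() when running inside the cluster
v1 = client.CoreV1Api()

for pod in v1.list_namespaced_pod(namespace="airflow").items:
    print(f"{pod.metadata.name:60s} {pod.status.phase}")

Task pods created by the KubernetesExecutor appear here while their tasks run and, with delete_worker_pods enabled, disappear once they finish.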

12.4 Per-Task Kubernetes Configuration

"""
File: dags/k8s_executor_dag.py
Configure Kubernetes resources per task.
"""
from airflow.decorators import dag, task
from datetime import datetime
from kubernetes.client import models as k8s

@dag(
    dag_id="k8s_executor_dag",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["kubernetes", "production"],
)
def k8s_executor_dag():

    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "500m", "memory": "1Gi"},
                                limits={"cpu": "2", "memory": "4Gi"},
                            ),
                        )
                    ],
                )
            )
        }
    )
    def heavy_processing():
        """This task gets more CPU and memory."""
        print("Running heavy processing with extra resources")

    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "100m", "memory": "256Mi"},
                                limits={"cpu": "500m", "memory": "512Mi"},
                            ),
                        )
                    ],
                    node_selector={"workload-type": "general"},
                )
            )
        }
    )
    def light_processing():
        """This task uses minimal resources."""
        print("Running lightweight task")

    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"nvidia.com/gpu": "1"},
                                limits={"nvidia.com/gpu": "1"},
                            ),
                        )
                    ],
                    tolerations=[
                        k8s.V1Toleration(
                            key="nvidia.com/gpu",
                            operator="Exists",
                            effect="NoSchedule",
                        )
                    ],
                    node_selector={"workload-type": "gpu"},
                )
            )
        }
    )
    def gpu_ml_task():
        """This task runs on a GPU node."""
        print("Running ML training on GPU")

    light_processing() >> heavy_processing() >> gpu_ml_task()

k8s_executor_dag()
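
Repeating the pod_override boilerplate in every task gets noisy as DAGs grow. One option is a small project-level helper that builds the executor_config from a few parameters; make_pod_override below is a hypothetical convenience function, not part of Airflow:

# Hypothetical helper for building the executor_config dictionaries used above.
from kubernetes.client import models as k8s

def make_pod_override(cpu_request: str, mem_request: str,
                      cpu_limit: str, mem_limit: str,
                      node_selector: dict | None = None) -> dict:
    """Return an executor_config with a pod_override for the given resources."""
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": cpu_request, "memory": mem_request},
                            limits={"cpu": cpu_limit, "memory": mem_limit},
                        ),
                    )
                ],
                node_selector=node_selector,
            )
        )
    }

# Usage: @task(executor_config=make_pod_override("500m", "1Gi", "2", "4Gi"))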

12.5 Monitoring with Prometheus and Grafana

# airflow.cfg for StatsD metrics
"""
[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 9125
statsd_prefix = airflow
"""
# statsd-exporter-config.yaml
# Mapping StatsD metrics to Prometheus format
mappings:
  - match: "airflow.dag.*.*.duration"
    name: "airflow_dag_task_duration"
    labels:
      dag_id: "$1"
      task_id: "$2"

  - match: "airflow.dagrun.duration.success.*"
    name: "airflow_dagrun_duration_success"
    labels:
      dag_id: "$1"

  - match: "airflow.dagrun.duration.failed.*"
    name: "airflow_dagrun_duration_failed"
    labels:
      dag_id: "$1"

  - match: "airflow.scheduler.tasks.running"
    name: "airflow_scheduler_tasks_running"

  - match: "airflow.scheduler.tasks.starving"
    name: "airflow_scheduler_tasks_starving"

  - match: "airflow.pool.open_slots.*"
    name: "airflow_pool_open_slots"
    labels:
      pool: "$1"

  - match: "airflow.pool.used_slots.*"
    name: "airflow_pool_used_slots"
    labels:
      pool: "$1"
# Prometheus scrape config
scrape_configs:
  - job_name: 'airflow'
    static_configs:
      - targets: ['statsd-exporter:9102']
    scrape_interval: 15s

Key Metrics to Monitor:

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| dag_processing_total_parse_time | Time to parse all DAG files | > 30s |
| scheduler_tasks_running | Currently running tasks | Depends on capacity |
| scheduler_tasks_starving | Tasks waiting for slots | > 0 sustained |
| dagrun_duration_success | Successful DAG run time | Varies per DAG |
| dagrun_duration_failed | Failed DAG run time | Any occurrence |
| pool_open_slots | Available pool slots | < 10% of total |
| task_fail_count | Task failure count | > 0 |
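
Beyond dashboards and alerts, these metrics can be queried programmatically, for example from a capacity-planning script. A sketch against the standard Prometheus HTTP API; the prometheus:9090 address and the metric name (produced by the StatsD mapping above) are assumptions about your setup:

# query_airflow_metrics.py - read a current metric value from Prometheus
import requests

PROMETHEUS_URL = "http://prometheus:9090"  # adjust to your deployment

def query_metric(promql: str) -> list:
    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": promql}, timeout=10)
    resp.raise_for_status()
    return resp.json()["data"]["result"]

for sample in query_metric("airflow_scheduler_tasks_starving"):
    print(sample["metric"], sample["value"])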

12.6 CI/CD Pipeline for DAG Deployment

# .github/workflows/airflow-ci.yml
name: Airflow CI/CD

on:
  push:
    branches: [main]
    paths: ['dags/**']
  pull_request:
    branches: [main]
    paths: ['dags/**']

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          # Install Airflow with the official constraints file to get a tested dependency set
          pip install "apache-airflow==3.0.0" pytest pytest-cov \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.11.txt"
          pip install -r requirements.txt

      - name: Validate DAG syntax
        run: |
          python -c "
          from airflow.models import DagBag
          bag = DagBag(dag_folder='dags/', include_examples=False)
          assert len(bag.import_errors) == 0, f'Errors: {bag.import_errors}'
          print(f'Successfully loaded {len(bag.dags)} DAGs')
          "

      - name: Run tests
        run: pytest tests/ -v --cov=dags --cov-report=xml

      - name: Lint DAGs
        run: |
          pip install ruff
          ruff check dags/

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Sync DAGs to S3
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          aws s3 sync dags/ s3://airflow-dags-bucket/dags/ \
            --delete \
            --exclude "__pycache__/*" \
            --exclude "*.pyc"

      # Alternative: Deploy via git-sync (Kubernetes)
      # The Helm chart's git-sync will automatically pull changes
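
The "Run tests" step above assumes a tests/ directory. A common starting point is an integrity test that every DAG imports cleanly and carries basic metadata; the owner/tag policy below is an example convention, not an Airflow requirement:

# tests/test_dag_integrity.py - executed by the CI pipeline above
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dag_bag):
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"

def test_dags_have_owner_and_tags(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"{dag_id} has no tags"
        assert dag.owner != "airflow", f"{dag_id} still uses the default owner"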

12.7 Production Best Practices

# airflow.cfg - Production-optimized configuration
"""
[core]
executor = KubernetesExecutor
parallelism = 64
max_active_tasks_per_dag = 32
max_active_runs_per_dag = 4
dag_file_processor_timeout = 120

[scheduler]
parsing_processes = 4
min_file_process_interval = 60
dag_dir_list_interval = 300
schedule_after_task_execution = True

[webserver]
web_server_worker_timeout = 120
worker_refresh_interval = 60
expose_config = False

[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://airflow-logs/logs
encrypt_s3_logs = True

[celery]
worker_autoscale = 16,4
worker_concurrency = 16

[kubernetes_executor]
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 16
"""
"""
File: dags/production_dag_template.py
Production-ready DAG template with best practices.
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data_platform",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=2),
    "email_on_failure": True,
    "email_on_retry": False,
    "email": ["alerts@example.com"],
    "sla": timedelta(hours=4),
}

@dag(
    dag_id="production_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="0 6 * * *",
    catchup=False,
    default_args=default_args,
    max_active_runs=1,
    tags=["production", "critical"],
    doc_md="""
    ## Production Data Pipeline
    - **Owner**: Data Platform Team
    - **Schedule**: Daily at 6 AM UTC
    - **SLA**: Must complete within 4 hours
    - **On-call**: #data-platform-oncall Slack channel
    """,
)
def production_pipeline():

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    @task
    def extract(**context):
        """Extract data with retry and timeout handling."""
        ds = context["ds"]
        print(f"Extracting data for {ds}")
        return {"date": ds, "records": 50000}

    @task
    def transform(data: dict):
        """Transform with data validation."""
        assert data["records"] > 0, "No records extracted!"
        print(f"Transforming {data['records']} records")
        return {"records": data["records"], "transformed": True}

    @task
    def load(data: dict):
        """Load with idempotency."""
        print(f"Loading {data['records']} records (idempotent)")
        return "success"

    @task
    def validate(status: str):
        """Post-load validation."""
        assert status == "success", "Load failed!"
        print("Validation passed!")

    @task(trigger_rule="all_done")
    def notify(status: str):
        """Always send notification regardless of outcome."""
        print(f"Pipeline completed with status: {status}")

    raw = extract()
    processed = transform(raw)
    status = load(processed)
    start >> raw
    validate(status) >> end
    notify(status)  # depends on load via its argument; trigger_rule="all_done" runs it on any outcome

production_pipeline()
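
The doc_md above points readers at an on-call Slack channel; in practice that usually means an on_failure_callback on the DAG or in default_args. A minimal sketch that posts to an incoming-webhook URL taken from an environment variable (ALERT_WEBHOOK_URL is a placeholder; a provider-based notifier such as the Slack provider is another option):

# Hypothetical failure callback: post a short message to a chat webhook.
import os
import requests

def notify_failure(context):
    """Called by Airflow with the task context when a task fails."""
    ti = context["task_instance"]
    message = (
        f"{ti.dag_id}.{ti.task_id} failed in run {context['run_id']}; "
        f"logs: {ti.log_url}"
    )
    webhook = os.environ.get("ALERT_WEBHOOK_URL")
    if webhook:
        requests.post(webhook, json={"text": message}, timeout=10)

# Enable it via default_args, e.g. default_args = {..., "on_failure_callback": notify_failure}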

12.8 Health Checks and Alerting

# Airflow health check endpoint
curl http://localhost:8080/health

# Expected response:
# {
#   "metadatabase": {"status": "healthy"},
#   "scheduler": {"status": "healthy", "latest_scheduler_heartbeat": "2026-01-15T10:00:00"}
# }
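
The same JSON can be checked from outside Airflow, for example by an uptime probe that alerts when the scheduler heartbeat goes stale. A sketch with an assumed five-minute freshness threshold:

# scheduler_heartbeat_probe.py - fail if the scheduler heartbeat is stale
from datetime import datetime, timedelta, timezone
import requests

HEALTH_URL = "http://localhost:8080/health"
MAX_HEARTBEAT_AGE = timedelta(minutes=5)

health = requests.get(HEALTH_URL, timeout=5).json()
heartbeat = datetime.fromisoformat(health["scheduler"]["latest_scheduler_heartbeat"])
if heartbeat.tzinfo is None:
    heartbeat = heartbeat.replace(tzinfo=timezone.utc)  # Airflow reports UTC timestamps

age = datetime.now(timezone.utc) - heartbeat
assert health["scheduler"]["status"] == "healthy", "Scheduler reports unhealthy"
assert age < MAX_HEARTBEAT_AGE, f"Scheduler heartbeat is stale ({age})"
print(f"Scheduler healthy, last heartbeat {age} ago")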
"""
File: dags/airflow_health_monitor.py
Monitor Airflow's own health.
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id="airflow_health_monitor",
    start_date=datetime(2026, 1, 1),
    schedule="*/5 * * * *",
    catchup=False,
    max_active_runs=1,
    tags=["monitoring", "system"],
)
def airflow_health_monitor():

    @task
    def check_failed_dags():
        """Check for recently failed DAG runs."""
        from airflow.models import DagRun
        from airflow.utils.state import State
        from airflow.utils.session import create_session
        from airflow.utils import timezone

        with create_session() as session:
            recent_failures = session.query(DagRun).filter(
                DagRun.state == State.FAILED,
                # Use a timezone-aware timestamp; Airflow stores datetimes in UTC
                DagRun.end_date >= timezone.utcnow() - timedelta(hours=1),
            ).all()

            if recent_failures:
                for run in recent_failures:
                    print(f"FAILED: {run.dag_id} - {run.run_id}")
                return {"status": "alert", "failures": len(recent_failures)}

            print("No recent failures")
            return {"status": "ok", "failures": 0}

    @task
    def check_long_running_tasks():
        """Alert on tasks running longer than expected."""
        from airflow.models import TaskInstance
        from airflow.utils.state import State
        from airflow.utils.session import create_session
        from airflow.utils import timezone

        # Timezone-aware threshold to match Airflow's UTC timestamps
        threshold = timezone.utcnow() - timedelta(hours=2)

        with create_session() as session:
            long_running = session.query(TaskInstance).filter(
                TaskInstance.state == State.RUNNING,
                TaskInstance.start_date < threshold,
            ).all()

            if long_running:
                for ti in long_running:
                    print(f"LONG RUNNING: {ti.dag_id}.{ti.task_id} since {ti.start_date}")
                return {"status": "warning", "count": len(long_running)}

            return {"status": "ok", "count": 0}

    check_failed_dags()
    check_long_running_tasks()

airflow_health_monitor()

Practice Exercise

Exercise: Deploy Airflow with Docker Compose

#!/bin/bash
# deploy_airflow.sh
# Complete deployment script for a local production-like environment.
# Assumes a docker-compose.yaml defining the services from 12.2 plus an
# airflow-init service (as in the official example compose file) sits in the
# current directory.

set -e

# Create directories
mkdir -p dags logs plugins config

# Generate Fernet key
FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
echo "Generated Fernet Key: $FERNET_KEY"

# Create .env file
cat > .env << EOF
AIRFLOW_UID=$(id -u)
AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
EOF

# Initialize the database
docker compose up airflow-init

# Start all services
docker compose up -d

# Wait for webserver
echo "Waiting for Airflow webserver..."
until curl -s http://localhost:8080/health | grep -q "healthy"; do
    sleep 5
done

echo "Airflow is ready at http://localhost:8080"
echo "Username: admin, Password: admin"
echo "Flower (Celery monitor): http://localhost:5555"

Summary

In this chapter, you learned:

  • CeleryExecutor scales with dedicated workers; KubernetesExecutor creates a pod per task
  • The official Helm chart simplifies Kubernetes deployment with git-sync for DAGs
  • StatsD + Prometheus + Grafana provides comprehensive monitoring
  • CI/CD pipelines validate DAGs before deployment with syntax checks and tests
  • Production configurations include retries, SLAs, timeouts, remote logging, and health monitoring