Chapter 12: Production Deployment and Operations
Learning Objectives
- Deploy Airflow on Kubernetes with the official Helm chart
- Configure CeleryExecutor and KubernetesExecutor for scalability
- Set up monitoring with Prometheus, Grafana, and StatsD
- Implement CI/CD pipelines for DAG deployment
Knowledge Points
12.1 Executor Comparison
Choosing the right executor is critical for production deployments:
| Executor | Best For | Scaling | Complexity |
|---|---|---|---|
| LocalExecutor | Single-node, small workloads | Vertical | Low |
| CeleryExecutor | Multi-node, steady workloads | Horizontal (workers) | Medium |
| KubernetesExecutor | Dynamic, variable workloads | Pod-per-task | High |
| CeleryKubernetesExecutor | Mixed workloads | Both | High |
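The hybrid CeleryKubernetesExecutor routes work by queue: tasks sent to the Kubernetes queue (named kubernetes by default, configurable via [celery_kubernetes_executor] kubernetes_queue) each run in their own pod, while all other tasks go to the Celery workers. A minimal sketch of this routing, assuming the default queue name (DAG and task names are illustrative):
# Hypothetical example: routing tasks under CeleryKubernetesExecutor
from datetime import datetime
from airflow.decorators import dag, task
@dag(
    dag_id="mixed_executor_demo",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["example"],
)
def mixed_executor_demo():
    @task  # default queue -> picked up by Celery workers
    def steady_task():
        print("Runs on a Celery worker")
    @task(queue="kubernetes")  # matches [celery_kubernetes_executor] kubernetes_queue -> runs in its own pod
    def bursty_task():
        print("Runs in a dedicated Kubernetes pod")
    steady_task() >> bursty_task()
mixed_executor_demo()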
12.2 CeleryExecutor Setup
# airflow.cfg for CeleryExecutor
"""
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
# If worker_autoscale is set (format: max,min), worker_concurrency is ignored
worker_autoscale = 16,4
[celery_broker_transport_options]
# Should exceed your longest-running task so Redis does not redeliver it to another worker
visibility_timeout = 21600
"""
# docker-compose.yaml for CeleryExecutor
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data
  redis:
    image: redis:7
    ports:
      - "6379:6379"
  airflow-webserver:
    image: apache/airflow:3.0.0
    # Note: Airflow 3.0 replaces "airflow webserver" with "airflow api-server";
    # keep "webserver" only if you pin a 2.x image.
    command: webserver
    ports:
      - "8080:8080"
    environment: &airflow-env
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: 'your_fernet_key_here'
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis
  airflow-scheduler:
    image: apache/airflow:3.0.0
    command: scheduler
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis
  airflow-worker:
    image: apache/airflow:3.0.0
    command: celery worker
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - redis
    deploy:
      replicas: 3  # Scale workers as needed, e.g. docker compose up -d --scale airflow-worker=3
  flower:
    image: apache/airflow:3.0.0
    command: celery flower
    ports:
      - "5555:5555"
    environment: *airflow-env
    depends_on:
      - redis
volumes:
  postgres-data:
12.3 KubernetesExecutor Setup
# values.yaml for Airflow Helm Chart
executor: KubernetesExecutor
# Webserver configuration
webserver:
  replicas: 2
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"
# Scheduler configuration
scheduler:
  replicas: 2  # HA scheduler in Airflow 3.x
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
# Database configuration (chart-managed PostgreSQL; prefer an external database in production)
postgresql:
  enabled: true
  auth:
    username: airflow
    password: airflow
    database: airflow
# DAG sync configuration
dags:
  persistence:
    enabled: true
    size: 5Gi
  gitSync:
    enabled: true
    repo: git@github.com:company/airflow-dags.git  # SSH repos also need dags.gitSync.sshKeySecret
    branch: main
    wait: 60
    subPath: dags
# KubernetesExecutor specific: default pod template for task pods
# (the chart exposes this as the top-level podTemplate value)
podTemplate: |
  apiVersion: v1
  kind: Pod
  metadata:
    name: airflow-task
  spec:
    containers:
      - name: base
        image: apache/airflow:3.0.0
        resources:
          requests:
            cpu: "250m"
            memory: "512Mi"
          limits:
            cpu: "1000m"
            memory: "2Gi"
    restartPolicy: Never
# Deploy with Helm
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--values values.yaml \
--timeout 10m
# Upgrade deployment
helm upgrade airflow apache-airflow/airflow \
--namespace airflow \
--values values.yaml
# Check status
kubectl get pods -n airflow
kubectl get svc -n airflow
12.4 Per-Task Kubernetes Configuration
"""
File: dags/k8s_executor_dag.py
Configure Kubernetes resources per task.
"""
from airflow.decorators import dag, task
from datetime import datetime
from kubernetes.client import models as k8s
@dag(
dag_id="k8s_executor_dag",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["kubernetes", "production"],
)
def k8s_executor_dag():
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
requests={"cpu": "500m", "memory": "1Gi"},
limits={"cpu": "2", "memory": "4Gi"},
),
)
],
)
)
}
)
def heavy_processing():
"""This task gets more CPU and memory."""
print("Running heavy processing with extra resources")
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
requests={"cpu": "100m", "memory": "256Mi"},
limits={"cpu": "500m", "memory": "512Mi"},
),
)
],
node_selector={"workload-type": "general"},
)
)
}
)
def light_processing():
"""This task uses minimal resources."""
print("Running lightweight task")
@task(
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"},
),
)
],
tolerations=[
k8s.V1Toleration(
key="nvidia.com/gpu",
operator="Exists",
effect="NoSchedule",
)
],
node_selector={"workload-type": "gpu"},
)
)
}
)
def gpu_ml_task():
"""This task runs on a GPU node."""
print("Running ML training on GPU")
light_processing() >> heavy_processing() >> gpu_ml_task()
k8s_executor_dag()
12.5 Monitoring with Prometheus and Grafana
# airflow.cfg for StatsD metrics
"""
[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 9125
statsd_prefix = airflow
"""
# statsd-exporter-config.yaml
# Mapping StatsD metrics to Prometheus format
mappings:
  - match: "airflow.dag.*.*.duration"
    name: "airflow_dag_task_duration"
    labels:
      dag_id: "$1"
      task_id: "$2"
  - match: "airflow.dagrun.duration.success.*"
    name: "airflow_dagrun_duration_success"
    labels:
      dag_id: "$1"
  - match: "airflow.dagrun.duration.failed.*"
    name: "airflow_dagrun_duration_failed"
    labels:
      dag_id: "$1"
  - match: "airflow.scheduler.tasks.running"
    name: "airflow_scheduler_tasks_running"
  - match: "airflow.scheduler.tasks.starving"
    name: "airflow_scheduler_tasks_starving"
  - match: "airflow.pool.open_slots.*"
    name: "airflow_pool_open_slots"
    labels:
      pool: "$1"
  - match: "airflow.pool.used_slots.*"
    name: "airflow_pool_used_slots"
    labels:
      pool: "$1"
# Prometheus scrape config
scrape_configs:
  - job_name: 'airflow'
    static_configs:
      - targets: ['statsd-exporter:9102']
    scrape_interval: 15s
Key Metrics to Monitor:
| Metric | Description | Alert Threshold |
|---|---|---|
| dag_processing_total_parse_time | Time to parse all DAG files | > 30s |
| scheduler_tasks_running | Currently running tasks | Depends on capacity |
| scheduler_tasks_starving | Tasks waiting for free pool/concurrency slots | > 0 sustained |
| dagrun_duration_success | Duration of successful DAG runs | Varies per DAG |
| dagrun_duration_failed | Duration of failed DAG runs | Any occurrence |
| pool_open_slots | Available pool slots | < 10% of total slots |
| task_fail_count | Number of task failures | > 0 |
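These thresholds can be codified as Prometheus alerting rules against the exporter metrics defined above. A minimal sketch, assuming the mapping shown earlier and the statsd-exporter default of exposing timers as summaries (so duration metrics gain a _count series); rule names and durations are illustrative:
# prometheus-alert-rules.yaml (illustrative)
groups:
  - name: airflow-alerts
    rules:
      - alert: AirflowTasksStarving
        expr: airflow_scheduler_tasks_starving > 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Tasks have been waiting for free slots for over 10 minutes"
      - alert: AirflowDagRunFailed
        expr: increase(airflow_dagrun_duration_failed_count[15m]) > 0
        labels:
          severity: critical
        annotations:
          summary: "DAG {{ $labels.dag_id }} had a failed run in the last 15 minutes"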
12.6 CI/CD Pipeline for DAG Deployment
# .github/workflows/airflow-ci.yml
name: Airflow CI/CD
on:
  push:
    branches: [main]
    paths: ['dags/**']
  pull_request:
    branches: [main]
    paths: ['dags/**']
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install "apache-airflow==3.0.0" pytest pytest-cov
          pip install -r requirements.txt
      - name: Validate DAG syntax
        run: |
          python -c "
          from airflow.models import DagBag
          bag = DagBag(dag_folder='dags/', include_examples=False)
          assert len(bag.import_errors) == 0, f'Errors: {bag.import_errors}'
          print(f'Successfully loaded {len(bag.dags)} DAGs')
          "
      - name: Run tests
        run: pytest tests/ -v --cov=dags --cov-report=xml
      - name: Lint DAGs
        run: |
          pip install ruff
          ruff check dags/
  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Sync DAGs to S3
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          aws s3 sync dags/ s3://airflow-dags-bucket/dags/ \
            --delete \
            --exclude "__pycache__/*" \
            --exclude "*.pyc"
# Alternative: Deploy via git-sync (Kubernetes)
# The Helm chart's git-sync sidecar pulls changes from the tracked branch automatically
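The pytest step above assumes a test suite lives next to the DAGs; nothing in this chapter defines it, so here is a minimal sketch of a DAG integrity test (the file name and policy checks are illustrative, not a fixed convention):
# tests/test_dag_integrity.py (illustrative)
from airflow.models import DagBag
def load_dag_bag() -> DagBag:
    """Load project DAGs without the bundled examples."""
    return DagBag(dag_folder="dags/", include_examples=False)
def test_no_import_errors():
    bag = load_dag_bag()
    assert not bag.import_errors, f"DAG import errors: {bag.import_errors}"
def test_dags_follow_team_policies():
    bag = load_dag_bag()
    for dag_id, dag in bag.dags.items():
        assert dag.catchup is False, f"{dag_id} should disable catchup"
        assert dag.tags, f"{dag_id} should declare tags"
        for task in dag.tasks:
            assert task.retries >= 1, f"{dag_id}.{task.task_id} should set retries"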
12.7 Production Best Practices
# airflow.cfg - Production-optimized configuration
"""
[core]
executor = KubernetesExecutor
parallelism = 64
max_active_tasks_per_dag = 32
max_active_runs_per_dag = 4
dag_file_processor_timeout = 120
[scheduler]
parsing_processes = 4
min_file_process_interval = 60
dag_dir_list_interval = 300
schedule_after_task_execution = True
[webserver]
web_server_worker_timeout = 120
worker_refresh_interval = 60
expose_config = False
[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://airflow-logs/logs
encrypt_s3_logs = True
[celery]
# Only relevant when running CeleryExecutor or CeleryKubernetesExecutor;
# if worker_autoscale is set, worker_concurrency is ignored
worker_autoscale = 16,4
worker_concurrency = 16
[kubernetes_executor]
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 16
"""
"""
File: dags/production_dag_template.py
Production-ready DAG template with best practices.
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data_platform",
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
"execution_timeout": timedelta(hours=2),
"email_on_failure": True,
"email_on_retry": False,
"email": ["alerts@example.com"],
"sla": timedelta(hours=4),
}
@dag(
dag_id="production_pipeline",
start_date=datetime(2026, 1, 1),
schedule="0 6 * * *",
catchup=False,
default_args=default_args,
max_active_runs=1,
tags=["production", "critical"],
doc_md="""
## Production Data Pipeline
- **Owner**: Data Platform Team
- **Schedule**: Daily at 6 AM UTC
- **SLA**: Must complete within 4 hours
- **On-call**: #data-platform-oncall Slack channel
""",
)
def production_pipeline():
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
@task
def extract(**context):
"""Extract data with retry and timeout handling."""
ds = context["ds"]
print(f"Extracting data for {ds}")
return {"date": ds, "records": 50000}
@task
def transform(data: dict):
"""Transform with data validation."""
assert data["records"] > 0, "No records extracted!"
print(f"Transforming {data['records']} records")
return {"records": data["records"], "transformed": True}
@task
def load(data: dict):
"""Load with idempotency."""
print(f"Loading {data['records']} records (idempotent)")
return "success"
@task
def validate(status: str):
"""Post-load validation."""
assert status == "success", "Load failed!"
print("Validation passed!")
@task(trigger_rule="all_done")
def notify(status: str):
"""Always send notification regardless of outcome."""
print(f"Pipeline completed with status: {status}")
raw = extract()
processed = transform(raw)
status = load(processed)
start >> raw
validate(status) >> end
status >> notify(status)
production_pipeline()
12.8 Health Checks and Alerting
# Airflow health check endpoint
curl http://localhost:8080/health
# Expected response:
# {
# "metadatabase": {"status": "healthy"},
# "scheduler": {"status": "healthy", "latest_scheduler_heartbeat": "2026-01-15T10:00:00"}
# }
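In Kubernetes, the Helm chart already wires probes to the health endpoint; elsewhere, any external monitor can poll it. A minimal sketch using requests (the URL and the alerting hook are placeholders to adapt to your environment):
# health_probe.py (illustrative external monitor)
import sys
import requests
AIRFLOW_HEALTH_URL = "http://localhost:8080/health"  # placeholder URL
def main() -> int:
    try:
        payload = requests.get(AIRFLOW_HEALTH_URL, timeout=10).json()
    except requests.RequestException as exc:
        print(f"Airflow health endpoint unreachable: {exc}")
        return 1
    # Collect any component whose status is not "healthy"
    unhealthy = [
        component
        for component, details in payload.items()
        if details.get("status") != "healthy"
    ]
    if unhealthy:
        print(f"Unhealthy components: {', '.join(unhealthy)}")  # hook up paging/Slack here
        return 1
    print("All components healthy")
    return 0
if __name__ == "__main__":
    sys.exit(main())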
"""
File: dags/airflow_health_monitor.py
Monitor Airflow's own health.
"""
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
dag_id="airflow_health_monitor",
start_date=datetime(2026, 1, 1),
schedule="*/5 * * * *",
catchup=False,
max_active_runs=1,
tags=["monitoring", "system"],
)
def airflow_health_monitor():
@task
def check_failed_dags():
"""Check for recently failed DAG runs."""
from airflow.models import DagRun
from airflow.utils.state import State
from airflow.utils.session import create_session
with create_session() as session:
recent_failures = session.query(DagRun).filter(
DagRun.state == State.FAILED,
DagRun.end_date >= datetime.now() - timedelta(hours=1),
).all()
if recent_failures:
for run in recent_failures:
print(f"FAILED: {run.dag_id} - {run.run_id}")
return {"status": "alert", "failures": len(recent_failures)}
print("No recent failures")
return {"status": "ok", "failures": 0}
@task
def check_long_running_tasks():
"""Alert on tasks running longer than expected."""
from airflow.models import TaskInstance
from airflow.utils.state import State
from airflow.utils.session import create_session
threshold = datetime.now() - timedelta(hours=2)
with create_session() as session:
long_running = session.query(TaskInstance).filter(
TaskInstance.state == State.RUNNING,
TaskInstance.start_date < threshold,
).all()
if long_running:
for ti in long_running:
print(f"LONG RUNNING: {ti.dag_id}.{ti.task_id} since {ti.start_date}")
return {"status": "warning", "count": len(long_running)}
return {"status": "ok", "count": 0}
check_failed_dags()
check_long_running_tasks()
airflow_health_monitor()
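Scheduled checks like the DAG above complement, rather than replace, per-task alerting, which is usually wired through failure callbacks. A minimal sketch that posts a summary to a webhook on task failure (the webhook URL is a placeholder; substitute your Slack or PagerDuty integration):
# Illustrative failure callback; attach it via default_args or the @dag decorator
import requests
ALERT_WEBHOOK_URL = "https://hooks.example.com/airflow-alerts"  # placeholder URL
def alert_on_failure(context):
    """Send a short failure summary to a webhook when a task fails."""
    ti = context["task_instance"]
    message = (
        f"Task failed: {ti.dag_id}.{ti.task_id} "
        f"(run_id={context['run_id']}, try={ti.try_number})"
    )
    requests.post(ALERT_WEBHOOK_URL, json={"text": message}, timeout=10)
# Usage: default_args = {"on_failure_callback": alert_on_failure, ...}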
Practice Exercise
Exercise: Deploy Airflow with Docker Compose
#!/bin/bash
# deploy_airflow.sh
# Complete deployment script for local production-like environment
set -e
# Create directories
mkdir -p dags logs plugins config
# Generate Fernet key
FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
echo "Generated Fernet Key: $FERNET_KEY"
# Create .env file
cat > .env << EOF
AIRFLOW_UID=$(id -u)
AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
EOF
# Initialize the database (assumes a compose file that defines an airflow-init service,
# such as the official docker-compose.yaml from the Airflow documentation)
docker compose up airflow-init
# Start all services
docker compose up -d
# Wait for the webserver to report healthy
echo "Waiting for Airflow webserver..."
until curl -s http://localhost:8080/health | grep -q "healthy"; do
  sleep 5
done
echo "Airflow is ready at http://localhost:8080"
echo "Username: admin, Password: admin"
echo "Flower (Celery monitor): http://localhost:5555"
Summary
In this chapter, you learned:
- CeleryExecutor scales with dedicated workers; KubernetesExecutor creates a pod per task
- The official Helm chart simplifies Kubernetes deployment with git-sync for DAGs
- StatsD + Prometheus + Grafana provides comprehensive monitoring
- CI/CD pipelines validate DAGs before deployment with syntax checks and tests
- Production configurations include retries, SLAs, timeouts, remote logging, and health monitoring