Chapter 8: Connections, Hooks, and Providers
Haiyue
15min
Learning Objectives
- Configure connections via UI, CLI, and environment variables
- Use hooks to interact with external systems (databases, cloud services)
- Install and use provider packages (AWS, GCP, Azure, etc.)
- Build a data pipeline with real external integrations
Knowledge Points
8.1 Understanding Connections
Connections store credentials and configuration for external systems. They are managed centrally and referenced by conn_id in your DAGs.
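In DAG code a connection is usually consumed through a hook, but you can also inspect one directly. A minimal sketch, assuming the my_postgres connection defined below already exists:
# Sketch: look up a connection by conn_id and inspect its fields
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("my_postgres")
print(conn.conn_type, conn.host, conn.port, conn.schema)
print(conn.extra_dejson)  # the "extra" field parsed as a dict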
Creating Connections via CLI:
# Add a PostgreSQL connection
airflow connections add 'my_postgres' \
--conn-type 'postgres' \
--conn-host 'db.example.com' \
--conn-port 5432 \
--conn-login 'airflow_user' \
--conn-password 'secret123' \
--conn-schema 'analytics'
# Add an HTTP connection
airflow connections add 'my_api' \
--conn-type 'http' \
--conn-host 'https://api.example.com' \
--conn-extra '{"headers": {"Authorization": "Bearer token123"}}'
# Add an AWS connection
airflow connections add 'aws_default' \
--conn-type 'aws' \
--conn-extra '{
"aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
"aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region_name": "us-east-1"
}'
# List all connections
airflow connections list
# Delete a connection
airflow connections delete 'my_postgres'
Creating Connections via Environment Variables:
# Format: AIRFLOW_CONN_{CONN_ID}='{conn_type}://{login}:{password}@{host}:{port}/{schema}'
export AIRFLOW_CONN_MY_POSTGRES='postgresql://airflow_user:secret123@db.example.com:5432/analytics'
# With extras as URI query parameters
export AIRFLOW_CONN_MY_API='http://api.example.com:443/?headers=%7B%22Authorization%22%3A+%22Bearer+token123%22%7D'
# JSON format (recommended for complex connections)
export AIRFLOW_CONN_MY_POSTGRES='{
"conn_type": "postgres",
"host": "db.example.com",
"port": 5432,
"login": "airflow_user",
"password": "secret123",
"schema": "analytics"
}'
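URI escaping is easy to get wrong by hand. One optional way to double-check (a sketch, not a required step): build the connection with the Connection model and let Airflow generate the URI for you:
# Sketch: generate a connection URI programmatically to avoid manual URL-encoding
import json
from airflow.models.connection import Connection

conn = Connection(
    conn_id="my_api",
    conn_type="http",
    host="api.example.com",
    port=443,
    extra=json.dumps({"headers": {"Authorization": "Bearer token123"}}),
)
print(conn.get_uri())  # paste the output into AIRFLOW_CONN_MY_API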
8.2 Using Hooks
Hooks provide a unified interface for interacting with external systems. They manage the connection lifecycle and expose helper methods for common operations.
"""
File: dags/hooks_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
    dag_id="hooks_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["hooks"],
)
def hooks_demo():

    @task
    def query_postgres():
        """Use PostgresHook to query a database."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="my_postgres")

        # Execute a query and get results
        records = hook.get_records("SELECT * FROM users LIMIT 5")
        print(f"Found {len(records)} records")

        # Get results as a pandas DataFrame
        df = hook.get_pandas_df("SELECT * FROM users LIMIT 5")
        print(f"DataFrame shape: {df.shape}")

        # Execute a statement (INSERT, UPDATE, DELETE)
        hook.run("INSERT INTO logs (message) VALUES ('DAG executed')")

        return {"record_count": len(records)}

    @task
    def call_http_api():
        """Use HttpHook to make API calls."""
        from airflow.providers.http.hooks.http import HttpHook

        # Make a GET request
        get_hook = HttpHook(method="GET", http_conn_id="my_api")
        response = get_hook.run(endpoint="/api/v1/status")
        data = response.json()
        print(f"API Status: {data}")

        # POST request with data (the HTTP method is fixed when the hook is
        # created, so a separate hook is needed for POST)
        post_hook = HttpHook(method="POST", http_conn_id="my_api")
        response = post_hook.run(
            endpoint="/api/v1/events",
            data='{"event": "dag_completed"}',
            headers={"Content-Type": "application/json"},
        )
        return {"status_code": response.status_code}

    @task
    def interact_with_s3():
        """Use S3Hook to interact with AWS S3."""
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id="aws_default")

        # Upload a file
        hook.load_string(
            string_data='{"status": "complete"}',
            key="reports/daily/status.json",
            bucket_name="my-data-bucket",
            replace=True,
        )

        # Check if a file exists
        exists = hook.check_for_key(
            key="reports/daily/status.json",
            bucket_name="my-data-bucket",
        )
        print(f"File exists: {exists}")

        # List files
        keys = hook.list_keys(
            bucket_name="my-data-bucket",
            prefix="reports/daily/",
        )
        print(f"Files found: {keys}")

    query_postgres()
    call_http_api()
    interact_with_s3()
hooks_demo()
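Every database-style hook also exposes the underlying client object. A minimal sketch (outside any DAG, assuming the my_postgres connection is resolvable) of dropping to cursor level when the helper methods are not enough:
# Sketch: get_conn() returns the raw psycopg2 connection managed by the hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres")
conn = hook.get_conn()
try:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone())
finally:
    conn.close()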
8.3 Provider Packages
Airflow 3.x uses a modular provider system. Install only the providers you need:
# Install common providers
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-microsoft-azure
pip install apache-airflow-providers-http
pip install apache-airflow-providers-slack
pip install apache-airflow-providers-ssh
pip install apache-airflow-providers-docker
# Install multiple providers at once
pip install "apache-airflow[postgres,amazon,google,slack]"
# List installed providers
airflow providers list
Popular Provider Packages:
| Provider | Package | Key Operators/Hooks |
|---|---|---|
| PostgreSQL | providers-postgres | PostgresOperator, PostgresHook |
| MySQL | providers-mysql | MySqlOperator, MySqlHook |
| Amazon AWS | providers-amazon | S3, Redshift, EMR, Lambda operators |
| Google Cloud | providers-google | BigQuery, GCS, Dataflow operators |
| Azure | providers-microsoft-azure | AzureBlob, AzureSQL operators |
| Slack | providers-slack | SlackWebhookOperator |
| Docker | providers-docker | DockerOperator |
| SSH | providers-ssh | SSHOperator, SSHHook |
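If you prefer checking from Python rather than the CLI, a quick standard-library sketch (not an Airflow API) lists the provider distributions installed in the current environment:
# Sketch: list installed provider packages using only the standard library
from importlib.metadata import distributions

providers = sorted(
    dist.metadata["Name"]
    for dist in distributions()
    if (dist.metadata["Name"] or "").startswith("apache-airflow-providers-")
)
for name in providers:
    print(name)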
8.4 Database Operators
Provider packages also ship operators that run SQL directly. The pipeline below creates a table, inserts a row using templated SQL, runs a SQL file, and verifies the result:
"""
File: dags/database_pipeline.py
"""
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
@dag(
    dag_id="database_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["database", "providers"],
)
def database_pipeline():

    # Create table if not exists
    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="my_postgres",
        sql="""
            CREATE TABLE IF NOT EXISTS daily_metrics (
                id SERIAL PRIMARY KEY,
                metric_date DATE NOT NULL UNIQUE,  -- UNIQUE lets ON CONFLICT below deduplicate re-runs
                page_views INTEGER,
                unique_users INTEGER,
                revenue DECIMAL(10,2),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """,
    )

    # Insert data using templated SQL
    insert_data = PostgresOperator(
        task_id="insert_data",
        postgres_conn_id="my_postgres",
        sql="""
            INSERT INTO daily_metrics (metric_date, page_views, unique_users, revenue)
            VALUES ('{{ ds }}', 15000, 3200, 4500.00)
            ON CONFLICT (metric_date) DO NOTHING;
        """,
    )

    # Run a SQL file
    run_report = PostgresOperator(
        task_id="run_report",
        postgres_conn_id="my_postgres",
        sql="sql/daily_report.sql",  # SQL file in dags/sql/ directory
    )

    @task
    def verify_data(**context):
        """Verify data was inserted correctly."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="my_postgres")
        result = hook.get_first(
            "SELECT COUNT(*) FROM daily_metrics WHERE metric_date = %s",
            parameters=(context["ds"],),
        )
        count = result[0]
        print(f"Records for {context['ds']}: {count}")
        if count == 0:
            raise ValueError(f"No data found for {context['ds']}")

    create_table >> insert_data >> run_report >> verify_data()
database_pipeline()
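For larger loads, the hook's bulk helper avoids hand-written INSERT statements. A sketch reusing the daily_metrics table above (the row values are made up):
# Sketch: bulk insert with PostgresHook.insert_rows instead of templated SQL
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres")
hook.insert_rows(
    table="daily_metrics",
    rows=[("2026-01-15", 15000, 3200, 4500.00)],
    target_fields=["metric_date", "page_views", "unique_users", "revenue"],
)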
8.5 Airflow Variables
Variables store global configuration accessible from any DAG:
"""
File: dags/variables_demo.py
"""
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
@dag(
    dag_id="variables_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["variables"],
)
def variables_demo():

    @task
    def use_variables():
        # Get a simple variable
        env = Variable.get("environment", default_var="development")

        # Get a JSON variable
        config = Variable.get("pipeline_config", deserialize_json=True, default_var={
            "batch_size": 100,
            "target_table": "default_table",
            "notify": True,
        })

        print(f"Environment: {env}")
        print(f"Batch size: {config['batch_size']}")
        print(f"Target table: {config['target_table']}")

        # Set a variable
        Variable.set("last_run_status", "success")
        Variable.set("last_run_config", {"processed": 500}, serialize_json=True)

    use_variables()
variables_demo()
# Manage variables via CLI
airflow variables set environment production
airflow variables set pipeline_config '{"batch_size": 500, "target_table": "prod_table"}' --json
airflow variables get environment
airflow variables list
airflow variables delete environment
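Variables are also available in templated fields through Jinja: {{ var.value.<key> }} for plain values and {{ var.json.<key> }} for JSON variables. A sketch reusing the operator style from 8.4 (the run_log table is an assumption; define the operator inside a @dag function):
# Sketch: read Variables via Jinja inside a templated field
from airflow.providers.postgres.operators.postgres import PostgresOperator

log_run = PostgresOperator(
    task_id="log_run",
    postgres_conn_id="my_postgres",
    sql="""
        INSERT INTO run_log (environment, batch_size)
        VALUES ('{{ var.value.environment }}', {{ var.json.pipeline_config.batch_size }});
    """,
)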
8.6 Secrets Backends
For production, store sensitive data in external secrets managers:
# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
# Store connection in AWS Secrets Manager
aws secretsmanager create-secret \
--name airflow/connections/my_postgres \
--secret-string '{
"conn_type": "postgres",
"host": "db.example.com",
"port": 5432,
"login": "user",
"password": "secret",
"schema": "analytics"
}'
# Store variable in AWS Secrets Manager
aws secretsmanager create-secret \
--name airflow/variables/api_key \
--secret-string "my-secret-api-key"
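Once the backend is configured, DAG code does not change: Variable.get and hooks resolve names through the secrets backend first, then environment variables, then the metadata database. A minimal sketch using the secrets created above:
# Sketch: these lookups transparently hit the secrets backend when configured
from airflow.models import Variable
from airflow.hooks.base import BaseHook

api_key = Variable.get("api_key")              # resolved from airflow/variables/api_key
conn = BaseHook.get_connection("my_postgres")  # resolved from airflow/connections/my_postgres
print(conn.host, conn.schema)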
Practice Exercise
Exercise: End-to-End Integration Pipeline
"""
File: dags/integration_pipeline.py
Build a pipeline that:
1. Fetches data from an HTTP API
2. Stores raw data in a file
3. Processes and transforms the data
4. Loads results into a database
5. Sends a Slack notification
"""
from airflow.decorators import dag, task
from datetime import datetime


@dag(
    dag_id="integration_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["exercise", "integration"],
)
def integration_pipeline():

    @task
    def fetch_api_data():
        """Fetch data from an external API."""
        # In production, use HttpHook:
        # from airflow.providers.http.hooks.http import HttpHook
        # hook = HttpHook(http_conn_id="data_api")
        # response = hook.run("/api/v1/daily-stats")

        # Simulated API response
        data = {
            "date": "2026-01-15",
            "metrics": [
                {"region": "US", "revenue": 50000, "orders": 120},
                {"region": "EU", "revenue": 35000, "orders": 85},
                {"region": "APAC", "revenue": 28000, "orders": 95},
            ],
        }
        return data

    @task
    def save_raw_data(data: dict):
        """Save raw data to local storage."""
        output_path = f"/tmp/airflow_data/raw_{data['date']}.json"
        # In practice: write to file
        print(f"Saved raw data to {output_path}")
        return output_path

    @task
    def transform_data(data: dict):
        """Transform and enrich the data."""
        total_revenue = sum(m["revenue"] for m in data["metrics"])
        total_orders = sum(m["orders"] for m in data["metrics"])
        enriched = {
            "date": data["date"],
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            "avg_order_value": round(total_revenue / total_orders, 2),
            "top_region": max(data["metrics"], key=lambda x: x["revenue"])["region"],
            "regions": len(data["metrics"]),
        }
        return enriched

    @task
    def load_to_database(enriched_data: dict):
        """Load transformed data into database."""
        # In production: use PostgresHook
        # from airflow.providers.postgres.hooks.postgres import PostgresHook
        # hook = PostgresHook(postgres_conn_id="analytics_db")
        # hook.run("INSERT INTO ...")
        print(f"Loading data for {enriched_data['date']}:")
        print(f" Total Revenue: ${enriched_data['total_revenue']:,}")
        print(f" Total Orders: {enriched_data['total_orders']}")
        print(f" Avg Order: ${enriched_data['avg_order_value']}")
        return "load_complete"

    @task
    def send_notification(enriched_data: dict, load_status: str):
        """Send a notification about pipeline completion."""
        # In production: use SlackWebhookHook
        # from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
        # hook = SlackWebhookHook(slack_webhook_conn_id="slack_alerts")
        message = (
            f"Daily Pipeline Complete\n"
            f"Date: {enriched_data['date']}\n"
            f"Revenue: ${enriched_data['total_revenue']:,}\n"
            f"Top Region: {enriched_data['top_region']}\n"
            f"Status: {load_status}"
        )
        print(f"Notification sent:\n{message}")

    raw_data = fetch_api_data()
    save_raw_data(raw_data)
    enriched = transform_data(raw_data)
    status = load_to_database(enriched)
    send_notification(enriched, status)
integration_pipeline()
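One possible way to fill in the save_raw_data stub (a sketch; the /tmp path is just the placeholder used above):
# Sketch: the file write that save_raw_data() stubs out
import json
from pathlib import Path

def write_raw_file(data: dict) -> str:
    output_dir = Path("/tmp/airflow_data")
    output_dir.mkdir(parents=True, exist_ok=True)       # ensure the target directory exists
    output_path = output_dir / f"raw_{data['date']}.json"
    output_path.write_text(json.dumps(data, indent=2))  # persist the raw API payload
    return str(output_path)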
Summary
In this chapter, you learned:
- Connections store external system credentials and are referenced by conn_id
- Hooks provide Python interfaces to interact with databases, APIs, and cloud services
- Provider packages add support for specific technologies (AWS, GCP, databases, etc.)
- Variables store global configuration accessible across DAGs
- Secrets backends enable secure credential management in production environments