Chapter 8: Connections, Hooks, and Providers

Learning Objectives
  • Configure connections via UI, CLI, and environment variables
  • Use hooks to interact with external systems (databases, cloud services)
  • Install and use provider packages (AWS, GCP, Azure, etc.)
  • Build a data pipeline with real external integrations

Knowledge Points

8.1 Understanding Connections

Connections store credentials and configuration for external systems. They are managed centrally and referenced by conn_id in your DAGs.

Creating Connections via CLI:

# Add a PostgreSQL connection
airflow connections add 'my_postgres' \
    --conn-type 'postgres' \
    --conn-host 'db.example.com' \
    --conn-port 5432 \
    --conn-login 'airflow_user' \
    --conn-password 'secret123' \
    --conn-schema 'analytics'

# Add an HTTP connection
airflow connections add 'my_api' \
    --conn-type 'http' \
    --conn-host 'https://api.example.com' \
    --conn-extra '{"headers": {"Authorization": "Bearer token123"}}'

# Add an AWS connection
airflow connections add 'aws_default' \
    --conn-type 'aws' \
    --conn-extra '{
        "aws_access_key_id": "AKIAIOSFODNN7EXAMPLE",
        "aws_secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
        "region_name": "us-east-1"
    }'

# List all connections
airflow connections list

# Delete a connection
airflow connections delete 'my_postgres'

Creating Connections via Environment Variables:

# Format: AIRFLOW_CONN_{CONN_ID}='{conn_type}://{login}:{password}@{host}:{port}/{schema}'
export AIRFLOW_CONN_MY_POSTGRES='postgres://airflow_user:secret123@db.example.com:5432/analytics'

# With extras as URI query parameters
export AIRFLOW_CONN_MY_API='http://api.example.com:443/?headers=%7B%22Authorization%22%3A+%22Bearer+token123%22%7D'

# JSON format (recommended for complex connections)
export AIRFLOW_CONN_MY_POSTGRES='{
    "conn_type": "postgres",
    "host": "db.example.com",
    "port": 5432,
    "login": "airflow_user",
    "password": "secret123",
    "schema": "analytics"
}'
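
Hand-encoding extras into a URI is error-prone, so it can be easier to let Airflow build the URI string for you. A minimal sketch using the Connection model (the values mirror the CLI example above):

# Sketch: build a Connection in Python and print its URI form
from airflow.models.connection import Connection

conn = Connection(
    conn_id="my_postgres",
    conn_type="postgres",
    host="db.example.com",
    login="airflow_user",
    password="secret123",
    schema="analytics",
    port=5432,
)
print(conn.get_uri())  # paste the output into AIRFLOW_CONN_MY_POSTGRES

At runtime, tasks can also look up any connection by ID with BaseHook.get_connection("my_postgres"), which returns a Connection object exposing .host, .login, .password, .port, and .extra_dejson.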

8.2 Using Hooks

Hooks provide a Python interface for interacting with external systems. Each hook looks up its connection by conn_id, manages the connection lifecycle, and exposes helper methods for common operations.

"""
File: dags/hooks_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="hooks_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["hooks"],
)
def hooks_demo():

    @task
    def query_postgres():
        """Use PostgresHook to query a database."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="my_postgres")

        # Execute a query and get results
        records = hook.get_records("SELECT * FROM users LIMIT 5")
        print(f"Found {len(records)} records")

        # Get results as a pandas DataFrame
        df = hook.get_pandas_df("SELECT * FROM users LIMIT 5")
        print(f"DataFrame shape: {df.shape}")

        # Execute a statement (INSERT, UPDATE, DELETE)
        hook.run("INSERT INTO logs (message) VALUES ('DAG executed')")

        return {"record_count": len(records)}

    @task
    def call_http_api():
        """Use HttpHook to make API calls."""
        from airflow.providers.http.hooks.http import HttpHook

        hook = HttpHook(method="GET", http_conn_id="my_api")

        # Make a GET request
        response = hook.run(endpoint="/api/v1/status")
        data = response.json()
        print(f"API Status: {data}")

        # POST request with data (use a separate hook instance, because the
        # HTTP method is fixed when the hook is created)
        post_hook = HttpHook(method="POST", http_conn_id="my_api")
        response = post_hook.run(
            endpoint="/api/v1/events",
            data='{"event": "dag_completed"}',
            headers={"Content-Type": "application/json"},
        )

        return {"status_code": response.status_code}

    @task
    def interact_with_s3():
        """Use S3Hook to interact with AWS S3."""
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id="aws_default")

        # Upload a file
        hook.load_string(
            string_data='{"status": "complete"}',
            key="reports/daily/status.json",
            bucket_name="my-data-bucket",
            replace=True,
        )

        # Check if a file exists
        exists = hook.check_for_key(
            key="reports/daily/status.json",
            bucket_name="my-data-bucket",
        )
        print(f"File exists: {exists}")

        # List files
        keys = hook.list_keys(
            bucket_name="my-data-bucket",
            prefix="reports/daily/",
        )
        print(f"Files found: {keys}")

    query_postgres()
    call_http_api()
    interact_with_s3()

hooks_demo()
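
When a hook's helper methods are not enough, most hooks expose the underlying client. A minimal sketch that drops down to the raw DB-API connection managed by PostgresHook (it still reuses the my_postgres connection, so no credentials appear in code):

# Sketch: use the raw database connection behind the hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres")
conn = hook.get_conn()  # psycopg2 connection built from the Airflow connection
try:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users")
        print(f"User count: {cur.fetchone()[0]}")
    conn.commit()
finally:
    conn.close()

The same pattern applies to other hooks: S3Hook.get_conn() returns a boto3 client and HttpHook.get_conn() returns a requests session.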

8.3 Provider Packages

Airflow 3.x uses a modular provider system. Install only the providers you need:

# Install common providers
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-microsoft-azure
pip install apache-airflow-providers-http
pip install apache-airflow-providers-slack
pip install apache-airflow-providers-ssh
pip install apache-airflow-providers-docker

# Install multiple providers at once
pip install "apache-airflow[postgres,amazon,google,slack]"

# List installed providers
airflow providers list
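
The same information is available from Python through the ProvidersManager that ships with core Airflow; a minimal sketch (attribute names are from recent releases, so verify against your version):

# Sketch: list installed provider packages and their versions
from airflow.providers_manager import ProvidersManager

for package_name, provider in sorted(ProvidersManager().providers.items()):
    print(f"{package_name}=={provider.version}")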

Popular Provider Packages:

Provider       Package                      Key Operators/Hooks
PostgreSQL     providers-postgres           PostgresOperator, PostgresHook
MySQL          providers-mysql              MySqlOperator, MySqlHook
Amazon AWS     providers-amazon             S3, Redshift, EMR, Lambda operators
Google Cloud   providers-google             BigQuery, GCS, Dataflow operators
Azure          providers-microsoft-azure    AzureBlob, AzureSQL operators
Slack          providers-slack              SlackWebhookOperator
Docker         providers-docker             DockerOperator
SSH            providers-ssh                SSHOperator, SSHHook

8.4 Database Operators

Provider packages also ship operators that run SQL directly as Airflow tasks. In Airflow 3.x, the generic SQLExecuteQueryOperator (from the common SQL provider, a dependency of the database providers above) replaces the deprecated PostgresOperator; point it at a connection with conn_id:

"""
File: dags/database_pipeline.py
"""
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

@dag(
    dag_id="database_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["database", "providers"],
)
def database_pipeline():

    # Create table if not exists
    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        conn_id="my_postgres",
        sql="""
        CREATE TABLE IF NOT EXISTS daily_metrics (
            id SERIAL PRIMARY KEY,
            metric_date DATE NOT NULL,
            page_views INTEGER,
            unique_users INTEGER,
            revenue DECIMAL(10,2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """,
    )

    # Insert data using templated SQL
    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        conn_id="my_postgres",
        sql="""
        INSERT INTO daily_metrics (metric_date, page_views, unique_users, revenue)
        VALUES ('{{ ds }}', 15000, 3200, 4500.00)
        ON CONFLICT DO NOTHING;
        """,
    )

    # Run a SQL file
    run_report = SQLExecuteQueryOperator(
        task_id="run_report",
        conn_id="my_postgres",
        sql="sql/daily_report.sql",  # SQL file in dags/sql/ directory
    )

    @task
    def verify_data(**context):
        """Verify data was inserted correctly."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="my_postgres")
        result = hook.get_first(
            "SELECT COUNT(*) FROM daily_metrics WHERE metric_date = %s",
            parameters=(context["ds"],),
        )
        count = result[0]
        print(f"Records for {context['ds']}: {count}")
        if count == 0:
            raise ValueError(f"No data found for {context['ds']}")

    create_table >> insert_data >> run_report >> verify_data()

database_pipeline()
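
Rendering values into the SQL string with Jinja is fine for trusted values like {{ ds }}, but for anything else it is safer to pass query parameters, which SQLExecuteQueryOperator forwards to the database driver (parameters is a templated field, so {{ ds }} still renders). A minimal sketch, assuming it is added inside the database_pipeline DAG above:

    # Sketch: parameterized insert instead of string interpolation
    insert_safe = SQLExecuteQueryOperator(
        task_id="insert_safe",
        conn_id="my_postgres",
        sql="""
        INSERT INTO daily_metrics (metric_date, page_views, unique_users, revenue)
        VALUES (%(metric_date)s, %(page_views)s, %(unique_users)s, %(revenue)s);
        """,
        parameters={
            "metric_date": "{{ ds }}",
            "page_views": 15000,
            "unique_users": 3200,
            "revenue": 4500.00,
        },
    )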

8.5 Airflow Variables

Variables store global configuration accessible from any DAG:

"""
File: dags/variables_demo.py
"""
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime

@dag(
    dag_id="variables_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["variables"],
)
def variables_demo():

    @task
    def use_variables():
        # Get a simple variable
        env = Variable.get("environment", default_var="development")

        # Get a JSON variable
        config = Variable.get("pipeline_config", deserialize_json=True, default_var={
            "batch_size": 100,
            "target_table": "default_table",
            "notify": True,
        })

        print(f"Environment: {env}")
        print(f"Batch size: {config['batch_size']}")
        print(f"Target table: {config['target_table']}")

        # Set a variable
        Variable.set("last_run_status", "success")
        Variable.set("last_run_config", {"processed": 500}, serialize_json=True)

    use_variables()

variables_demo()

Managing Variables via CLI:

airflow variables set environment production
airflow variables set pipeline_config '{"batch_size": 500, "target_table": "prod_table"}' --json
airflow variables get environment
airflow variables list
airflow variables delete environment
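
Variables are also available in Jinja-templated fields via var.value and var.json, so templated operators can read them without a Variable.get() call. A minimal sketch reusing the SQLExecuteQueryOperator from section 8.4 and the variables set above (the run_log table is hypothetical, and the task goes inside a DAG function):

    # Sketch: reading Variables inside a templated SQL field
    log_run = SQLExecuteQueryOperator(
        task_id="log_run",
        conn_id="my_postgres",
        sql="""
        INSERT INTO run_log (environment, batch_size)
        VALUES ('{{ var.value.environment }}', {{ var.json.pipeline_config.batch_size }});
        """,
    )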

8.6 Secrets Backends

In production, store sensitive data in an external secrets manager (AWS Secrets Manager, GCP Secret Manager, HashiCorp Vault, etc.) instead of the metadata database:

# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

# Store a connection in AWS Secrets Manager
aws secretsmanager create-secret \
    --name airflow/connections/my_postgres \
    --secret-string '{
        "conn_type": "postgres",
        "host": "db.example.com",
        "port": 5432,
        "login": "user",
        "password": "secret",
        "schema": "analytics"
    }'

# Store variable in AWS Secrets Manager
aws secretsmanager create-secret \
    --name airflow/variables/api_key \
    --secret-string "my-secret-api-key"
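
Once the backend is configured, DAG code does not change: lookups check the secrets backend first, then environment variables, then the metadata database. A minimal sketch, assuming the two secrets created above exist:

# Sketch: the same APIs now resolve from AWS Secrets Manager
from airflow.hooks.base import BaseHook
from airflow.models import Variable

conn = BaseHook.get_connection("my_postgres")  # read from airflow/connections/my_postgres
api_key = Variable.get("api_key")              # read from airflow/variables/api_key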

Practice Exercise

Exercise: End-to-End Integration Pipeline

"""
File: dags/integration_pipeline.py
Build a pipeline that:
1. Fetches data from an HTTP API
2. Stores raw data in a file
3. Processes and transforms the data
4. Loads results into a database
5. Sends a Slack notification
"""
from airflow.decorators import dag, task
from datetime import datetime
import json

@dag(
    dag_id="integration_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["exercise", "integration"],
)
def integration_pipeline():

    @task
    def fetch_api_data():
        """Fetch data from an external API."""
        # In production, use HttpHook:
        # from airflow.providers.http.hooks.http import HttpHook
        # hook = HttpHook(http_conn_id="data_api")
        # response = hook.run("/api/v1/daily-stats")

        # Simulated API response
        data = {
            "date": "2026-01-15",
            "metrics": [
                {"region": "US", "revenue": 50000, "orders": 120},
                {"region": "EU", "revenue": 35000, "orders": 85},
                {"region": "APAC", "revenue": 28000, "orders": 95},
            ],
        }
        return data

    @task
    def save_raw_data(data: dict):
        """Save raw data to local storage."""
        import os

        os.makedirs("/tmp/airflow_data", exist_ok=True)
        output_path = f"/tmp/airflow_data/raw_{data['date']}.json"
        with open(output_path, "w") as f:
            json.dump(data, f, indent=2)
        print(f"Saved raw data to {output_path}")
        return output_path

    @task
    def transform_data(data: dict):
        """Transform and enrich the data."""
        total_revenue = sum(m["revenue"] for m in data["metrics"])
        total_orders = sum(m["orders"] for m in data["metrics"])

        enriched = {
            "date": data["date"],
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            "avg_order_value": round(total_revenue / total_orders, 2),
            "top_region": max(data["metrics"], key=lambda x: x["revenue"])["region"],
            "regions": len(data["metrics"]),
        }
        return enriched

    @task
    def load_to_database(enriched_data: dict):
        """Load transformed data into database."""
        # In production: use PostgresHook
        # from airflow.providers.postgres.hooks.postgres import PostgresHook
        # hook = PostgresHook(postgres_conn_id="analytics_db")
        # hook.run("INSERT INTO ...")

        print(f"Loading data for {enriched_data['date']}:")
        print(f"  Total Revenue: ${enriched_data['total_revenue']:,}")
        print(f"  Total Orders: {enriched_data['total_orders']}")
        print(f"  Avg Order: ${enriched_data['avg_order_value']}")
        return "load_complete"

    @task
    def send_notification(enriched_data: dict, load_status: str):
        """Send a notification about pipeline completion."""
        # In production: use SlackWebhookHook
        # from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
        # hook = SlackWebhookHook(slack_webhook_conn_id="slack_alerts")

        message = (
            f"Daily Pipeline Complete\n"
            f"Date: {enriched_data['date']}\n"
            f"Revenue: ${enriched_data['total_revenue']:,}\n"
            f"Top Region: {enriched_data['top_region']}\n"
            f"Status: {load_status}"
        )
        print(f"Notification sent:\n{message}")

    raw_data = fetch_api_data()
    save_raw_data(raw_data)
    enriched = transform_data(raw_data)
    status = load_to_database(enriched)
    send_notification(enriched, status)

integration_pipeline()

Summary

In this chapter, you learned:

  • Connections store external system credentials and are referenced by conn_id
  • Hooks provide Python interfaces to interact with databases, APIs, and cloud services
  • Provider packages add support for specific technologies (AWS, GCP, databases, etc.)
  • Variables store global configuration accessible across DAGs
  • Secrets backends enable secure credential management in production environments