Chapter 4: Built-in Operators and Sensors


Learning Objectives
  • Master common operators (BashOperator, PythonOperator, EmailOperator)
  • Understand Sensors and their poke/reschedule modes
  • Use FileSensor, HttpSensor, and ExternalTaskSensor
  • Choose between operators and TaskFlow-decorated functions

Knowledge Points

4.1 BashOperator

The BashOperator executes bash commands or scripts.

"""
File: dags/bash_operator_demo.py
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="bash_operator_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["operators"],
) as dag:

    # Simple command
    print_date = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    # Command with templated variables
    print_context = BashOperator(
        task_id="print_context",
        bash_command='echo "Execution date: {{ ds }}, DAG: {{ dag.dag_id }}"',
    )

    # Run a script file
    run_script = BashOperator(
        task_id="run_script",
        bash_command="/opt/airflow/scripts/process_data.sh ",
        # Note: the trailing space is required; without it, Jinja treats the
        # .sh path as a template file to render and the task fails to parse.
    )

    # Command with environment variables
    with_env = BashOperator(
        task_id="with_env",
        bash_command="echo $MY_VAR",
        env={"MY_VAR": "Hello from Airflow!"},
    )

    # Capture output as XCom
    capture_output = BashOperator(
        task_id="capture_output",
        bash_command="echo 'result_value'",
        do_xcom_push=True,  # Default behaviour: the last line of stdout is pushed to XCom
    )

    print_date >> print_context >> run_script >> with_env >> capture_output
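
The value pushed by capture_output can be consumed downstream through Jinja templating, since bash_command is a templated field. A minimal sketch, assuming it is added inside the same with DAG block (consume_output is an illustrative task, not part of the file above):

    # Hypothetical downstream task: pull the last stdout line that
    # capture_output pushed to XCom and echo it.
    consume_output = BashOperator(
        task_id="consume_output",
        bash_command="echo Upstream said: {{ ti.xcom_pull(task_ids='capture_output') }}",
    )

    capture_output >> consume_output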

4.2 PythonOperator

The PythonOperator runs Python callables with full access to Airflow context.

"""
File: dags/python_operator_demo.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data(**context):
    """Extract data with access to Airflow context."""
    execution_date = context["ds"]
    print(f"Extracting data for {execution_date}")
    data = {"records": 42, "source": "api"}
    # Push to XCom
    context["ti"].xcom_push(key="record_count", value=data["records"])
    return data

def transform_data(ti, **context):
    """Transform data pulled from XCom."""
    # Pull from XCom
    raw_data = ti.xcom_pull(task_ids="extract")
    record_count = ti.xcom_pull(task_ids="extract", key="record_count")
    print(f"Transforming {record_count} records from {raw_data['source']}")
    return {"transformed": True, "count": record_count}

def load_data(target_table, **context):
    """Load data with custom parameters."""
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="transform")
    print(f"Loading {data['count']} records into {target_table}")

with DAG(
    dag_id="python_operator_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["operators"],
) as dag:

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )

    load = PythonOperator(
        task_id="load",
        python_callable=load_data,
        op_kwargs={"target_table": "analytics.weather_data"},
    )

    extract >> transform >> load
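
Both op_kwargs and op_args are Jinja-templated, so run-specific values can be rendered straight into the callable's parameters. A hypothetical variant of the load task, assuming it is placed inside the same with DAG block:

    # Hypothetical: the table name is rendered per run before load_data is called.
    load_partitioned = PythonOperator(
        task_id="load_partitioned",
        python_callable=load_data,
        op_kwargs={"target_table": "analytics.weather_data_{{ ds_nodash }}"},
    )

    transform >> load_partitioned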

4.3 Other Common Operators

Beyond Bash and Python, a handful of other built-in operators cover the most common needs: placeholders, notifications, and branching.

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.email import EmailOperator

# EmptyOperator - Useful as a placeholder or join point
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

# EmailOperator - Send email notifications
send_report = EmailOperator(
    task_id="send_report",
    to="team@example.com",
    subject="Daily Report - {{ ds }}",
    html_content="""
    <h3>Daily Pipeline Report</h3>
    <p>Date: {{ ds }}</p>
    <p>Status: Complete</p>
    """,
)

# BranchPythonOperator - Conditional branching
def choose_branch(**context):
    day = context["ds_nodash"]
    if int(day[-2:]) % 2 == 0:
        return "even_day_task"
    return "odd_day_task"

branch = BranchPythonOperator(
    task_id="branch",
    python_callable=choose_branch,
)
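
The callable returns the task_id (or list of task_ids) to follow; every other direct downstream of the branch is skipped. A sketch of the downstream wiring, where even_day_task, odd_day_task, and join are illustrative names not defined above:

# Tasks for each branch (placeholders here; real work in practice).
even_day_task = EmptyOperator(task_id="even_day_task")
odd_day_task = EmptyOperator(task_id="odd_day_task")

# The join must tolerate skipped upstream tasks, or it gets skipped too.
join = EmptyOperator(
    task_id="join",
    trigger_rule="none_failed_min_one_success",
)

branch >> [even_day_task, odd_day_task] >> join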

4.4 Sensors

Sensors are special operators that wait for a specific condition to be met before proceeding.

"""
File: dags/sensor_demo.py
"""
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="sensor_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["sensors"],
) as dag:

    # FileSensor - Wait for a file to appear
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/opt/airflow/data/input/daily_data_{{ ds_nodash }}.csv",
        poke_interval=30,       # Check every 30 seconds
        timeout=60 * 60,        # Timeout after 1 hour
        mode="poke",            # Keep the worker slot while waiting
    )

    # FileSensor with reschedule mode (frees up worker slot)
    wait_for_file_reschedule = FileSensor(
        task_id="wait_for_file_reschedule",
        filepath="/opt/airflow/data/input/report.csv",
        poke_interval=300,      # Check every 5 minutes
        timeout=60 * 60 * 6,   # Timeout after 6 hours
        mode="reschedule",      # Free worker slot between checks
    )

    # HttpSensor - Wait for an API endpoint to be available
    wait_for_api = HttpSensor(
        task_id="wait_for_api",
        http_conn_id="my_api",
        endpoint="/health",
        response_check=lambda response: response.status_code == 200,
        poke_interval=60,
        timeout=300,
    )

    # ExternalTaskSensor - Wait for another DAG's task to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",
        external_task_id="final_task",
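        # By default the sensor waits for the run of upstream_dag whose logical
        # date matches this DAG run; use execution_delta or execution_date_fn
        # when the two schedules differ.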
        timeout=3600,
        mode="reschedule",
    )

    # TimeDeltaSensor - Wait for a time duration
    wait_5_minutes = TimeDeltaSensor(
        task_id="wait_5_minutes",
        delta=timedelta(minutes=5),
    )

    def process_data():
        print("Processing data after all sensors are satisfied!")

    process = PythonOperator(
        task_id="process",
        python_callable=process_data,
    )

    [wait_for_file, wait_for_api] >> process

Sensor Modes:

Mode         Behavior                                     Use When
poke         Keeps the worker slot; checks at intervals   Short wait times, frequent checks
reschedule   Frees the worker slot between checks         Long wait times, expensive resources
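
Both modes rely on the same contract: a sensor's poke() method returns True when the condition is met and False to keep waiting. A minimal custom-sensor sketch (RecordCountSensor and its hard-coded check are illustrative, not a built-in Airflow class):

from airflow.sensors.base import BaseSensorOperator

class RecordCountSensor(BaseSensorOperator):
    """Illustrative sensor: succeed once a record count reaches a threshold."""

    def __init__(self, threshold: int, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold

    def poke(self, context) -> bool:
        # Replace with a real check (query a database, call an API, stat a file, ...).
        current_count = 42
        self.log.info("Current count: %s (threshold: %s)", current_count, self.threshold)
        return current_count >= self.threshold

# Works with either mode, e.g.:
# wait_for_records = RecordCountSensor(
#     task_id="wait_for_records",
#     threshold=100,
#     poke_interval=60,
#     mode="reschedule",
# )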

4.5 Operators vs TaskFlow: When to Use Which

Operators and TaskFlow-decorated tasks coexist happily in the same DAG; the example below shows a practical split between the two styles.

"""
File: dags/operators_vs_taskflow.py
Demonstrates when to use operators vs TaskFlow
"""
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

@dag(
    dag_id="operators_vs_taskflow",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["best-practices"],
)
def operators_vs_taskflow():

    # Use OPERATORS for:
    # 1. Bash commands
    setup_env = BashOperator(
        task_id="setup_env",
        bash_command="mkdir -p /tmp/airflow_data && echo 'ready'",
    )

    # 2. Sensors (waiting for conditions)
    wait_for_data = FileSensor(
        task_id="wait_for_data",
        filepath="/tmp/airflow_data/input.csv",
        poke_interval=30,
        timeout=300,
        mode="poke",
        soft_fail=True,  # Skip instead of fail on timeout
    )

    # Use TASKFLOW for:
    # 1. Python data processing logic
    @task
    def process_data():
        """Complex Python logic is best as TaskFlow tasks."""
        import csv
        data = [
            {"name": "Alice", "score": 95},
            {"name": "Bob", "score": 87},
        ]
        avg_score = sum(d["score"] for d in data) / len(data)
        return {"average": avg_score, "count": len(data)}

    # 2. Tasks that pass data between each other
    @task
    def generate_summary(stats: dict):
        """Receiving data from upstream is clean with TaskFlow."""
        print(f"Processed {stats['count']} records")
        print(f"Average score: {stats['average']}")
        return f"Summary: {stats['count']} records, avg={stats['average']}"

    @task
    def notify(summary: str):
        """Final notification."""
        print(f"Sending notification: {summary}")

    # Mix operators and TaskFlow tasks
    setup_env >> wait_for_data
    stats = process_data()
    summary = generate_summary(stats)
    wait_for_data >> stats
    notify(summary)

operators_vs_taskflow()

Practice Exercise

Exercise: Build a File Processing Pipeline

"""
File: dags/file_processing.py
Build a pipeline that:
1. Waits for an input file (sensor)
2. Reads and validates the file (TaskFlow)
3. Processes the data (TaskFlow)
4. Archives the file (BashOperator)
5. Sends a notification (TaskFlow)
"""
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

@dag(
    dag_id="file_processing",
    start_date=datetime(2026, 1, 1),
    schedule=None,  # Manually triggered
    catchup=False,
    tags=["exercise"],
)
def file_processing():

    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/tmp/airflow_data/input.csv",
        poke_interval=10,
        timeout=120,
        soft_fail=True,
    )

    @task
    def validate_file():
        """Validate the input file format."""
        # Simulate validation
        file_info = {
            "filename": "input.csv",
            "rows": 1000,
            "columns": ["id", "name", "value"],
            "is_valid": True,
        }
        if not file_info["is_valid"]:
            raise ValueError("File validation failed!")
        return file_info

    @task
    def process_records(file_info: dict):
        """Process the validated records."""
        print(f"Processing {file_info['rows']} rows from {file_info['filename']}")
        results = {
            "processed": file_info["rows"],
            "errors": 3,
            "success_rate": 99.7,
        }
        return results

    archive = BashOperator(
        task_id="archive_file",
        bash_command=(
            "mkdir -p /tmp/airflow_data/archive && "
            "mv /tmp/airflow_data/input.csv "
            "/tmp/airflow_data/archive/input_{{ ds_nodash }}.csv 2>/dev/null; "
            "echo 'archived'"
        ),
    )

    @task
    def send_notification(results: dict):
        """Send processing results notification."""
        print(f"Processing complete!")
        print(f"  Records: {results['processed']}")
        print(f"  Errors: {results['errors']}")
        print(f"  Success rate: {results['success_rate']}%")

    # Define flow
    file_info = validate_file()
    results = process_records(file_info)
    wait_for_file >> file_info
    results >> archive
    send_notification(results)

file_processing()

Summary

In this chapter, you learned:

  • BashOperator executes shell commands with Jinja templating support
  • PythonOperator runs Python callables with full Airflow context access
  • Sensors wait for external conditions using poke or reschedule modes
  • TaskFlow is preferred for Python logic; operators are best for shell commands and sensors
  • You can mix operators and TaskFlow tasks in the same DAG