Chapter 4: Built-in Operators and Sensors
Learning Objectives
- Master common operators (BashOperator, PythonOperator, EmailOperator)
- Understand Sensors and their poke/reschedule modes
- Use FileSensor, HttpSensor, and ExternalTaskSensor
- Choose between operators and TaskFlow-decorated functions
Knowledge Points
4.1 BashOperator
The BashOperator executes bash commands or scripts.
"""
File: dags/bash_operator_demo.py
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="bash_operator_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["operators"],
) as dag:
# Simple command
print_date = BashOperator(
task_id="print_date",
bash_command="date",
)
# Command with templated variables
print_context = BashOperator(
task_id="print_context",
bash_command='echo "Execution date: {{ ds }}, DAG: {{ dag.dag_id }}"',
)
# Run a script file
run_script = BashOperator(
task_id="run_script",
bash_command="/opt/airflow/scripts/process_data.sh ",
# Note: trailing space is required for script files!
)
# Command with environment variables
with_env = BashOperator(
task_id="with_env",
bash_command="echo $MY_VAR",
env={"MY_VAR": "Hello from Airflow!"},
)
# Capture output as XCom
capture_output = BashOperator(
task_id="capture_output",
bash_command="echo 'result_value'",
do_xcom_push=True, # Last line of stdout pushed to XCom
)
print_date >> print_context >> run_script >> with_env >> capture_output
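The value pushed by do_xcom_push can be consumed by later tasks. As a minimal sketch (the use_output task below is illustrative and not part of the demo above), a downstream BashOperator added inside the same with DAG block could pull it through Jinja:

    # Hypothetical downstream task: reads capture_output's XCom via templating
    use_output = BashOperator(
        task_id="use_output",
        bash_command="echo \"Upstream said: {{ ti.xcom_pull(task_ids='capture_output') }}\"",
    )

    capture_output >> use_output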
4.2 PythonOperator
The PythonOperator runs Python callables with full access to Airflow context.
"""
File: dags/python_operator_demo.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data(**context):
"""Extract data with access to Airflow context."""
execution_date = context["ds"]
print(f"Extracting data for {execution_date}")
data = {"records": 42, "source": "api"}
# Push to XCom
context["ti"].xcom_push(key="record_count", value=data["records"])
return data
def transform_data(ti, **context):
"""Transform data pulled from XCom."""
# Pull from XCom
raw_data = ti.xcom_pull(task_ids="extract")
record_count = ti.xcom_pull(task_ids="extract", key="record_count")
print(f"Transforming {record_count} records from {raw_data['source']}")
return {"transformed": True, "count": record_count}
def load_data(target_table, **context):
"""Load data with custom parameters."""
ti = context["ti"]
data = ti.xcom_pull(task_ids="transform")
print(f"Loading {data['count']} records into {target_table}")
with DAG(
dag_id="python_operator_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["operators"],
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_data,
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_data,
)
load = PythonOperator(
task_id="load",
python_callable=load_data,
op_kwargs={"target_table": "analytics.weather_data"},
)
extract >> transform >> load
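PythonOperator also accepts positional arguments via op_args, and both op_args and op_kwargs are templated fields, so Jinja values can be passed straight into the callable. A minimal sketch, added inside the same with DAG block (the load_partition callable and task are illustrative only):

    def load_partition(target_table, partition_date):
        print(f"Loading partition {partition_date} into {target_table}")

    load_partition_task = PythonOperator(
        task_id="load_partition",
        python_callable=load_partition,
        op_args=["analytics.weather_data"],        # positional arguments
        op_kwargs={"partition_date": "{{ ds }}"},  # rendered by Jinja at runtime
    )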
4.3 Other Common Operators
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.email import EmailOperator

# EmptyOperator - Useful as a placeholder or join point
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

# EmailOperator - Send email notifications
send_report = EmailOperator(
    task_id="send_report",
    to="team@example.com",
    subject="Daily Report - {{ ds }}",
    html_content="""
    <h3>Daily Pipeline Report</h3>
    <p>Date: {{ ds }}</p>
    <p>Status: Complete</p>
    """,
)


# BranchPythonOperator - Conditional branching
def choose_branch(**context):
    day = context["ds_nodash"]
    if int(day[-2:]) % 2 == 0:
        return "even_day_task"
    return "odd_day_task"


branch = BranchPythonOperator(
    task_id="branch",
    python_callable=choose_branch,
)
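The task ids returned by choose_branch must match real downstream tasks; only the chosen branch runs and the other is skipped, so a join task after the branch needs a trigger rule that tolerates skipped upstreams. A minimal sketch of the wiring (even_day_task, odd_day_task, and join are illustrative placeholders):

even_day_task = EmptyOperator(task_id="even_day_task")
odd_day_task = EmptyOperator(task_id="odd_day_task")
join = EmptyOperator(
    task_id="join",
    trigger_rule="none_failed_min_one_success",  # run even though one branch was skipped
)

branch >> [even_day_task, odd_day_task] >> join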
4.4 Sensors
Sensors are special operators that wait for a specific condition to be met before proceeding.
"""
File: dags/sensor_demo.py
"""
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
with DAG(
dag_id="sensor_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["sensors"],
) as dag:
# FileSensor - Wait for a file to appear
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/opt/airflow/data/input/daily_data_{{ ds_nodash }}.csv",
poke_interval=30, # Check every 30 seconds
timeout=60 * 60, # Timeout after 1 hour
mode="poke", # Keep the worker slot while waiting
)
# FileSensor with reschedule mode (frees up worker slot)
wait_for_file_reschedule = FileSensor(
task_id="wait_for_file_reschedule",
filepath="/opt/airflow/data/input/report.csv",
poke_interval=300, # Check every 5 minutes
timeout=60 * 60 * 6, # Timeout after 6 hours
mode="reschedule", # Free worker slot between checks
)
# HttpSensor - Wait for an API endpoint to be available
wait_for_api = HttpSensor(
task_id="wait_for_api",
http_conn_id="my_api",
endpoint="/health",
response_check=lambda response: response.status_code == 200,
poke_interval=60,
timeout=300,
)
# ExternalTaskSensor - Wait for another DAG's task to complete
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_upstream",
external_dag_id="upstream_dag",
external_task_id="final_task",
timeout=3600,
mode="reschedule",
)
# TimeDeltaSensor - Wait for a time duration
wait_5_minutes = TimeDeltaSensor(
task_id="wait_5_minutes",
delta=timedelta(minutes=5),
)
def process_data():
print("Processing data after all sensors are satisfied!")
process = PythonOperator(
task_id="process",
python_callable=process_data,
)
[wait_for_file, wait_for_api] >> process
Sensor Modes:
| Mode | Behavior | Use When |
|---|---|---|
| poke | Keeps the worker slot; checks at intervals | Short wait times, frequent checks |
| reschedule | Frees the worker slot between checks | Long wait times, expensive resources |
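Beyond the built-in sensors, any Python condition can be polled with PythonSensor, which takes the same BaseSensorOperator parameters described in the table above. A minimal sketch, assuming a hypothetical partition_ready() check you would implement yourself:

from airflow.sensors.python import PythonSensor


def partition_ready(**context):
    # Hypothetical check - replace with a real query against your data store
    import os
    return os.path.exists(f"/opt/airflow/data/input/_SUCCESS_{context['ds_nodash']}")


wait_for_partition = PythonSensor(
    task_id="wait_for_partition",
    python_callable=partition_ready,
    poke_interval=120,
    timeout=60 * 60,
    mode="reschedule",         # long wait, so free the worker slot between checks
    exponential_backoff=True,  # progressively lengthen the interval between pokes
)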
4.5 Operators vs TaskFlow: When to Use Which
"""
File: dags/operators_vs_taskflow.py
Demonstrates when to use operators vs TaskFlow
"""
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
@dag(
dag_id="operators_vs_taskflow",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["best-practices"],
)
def operators_vs_taskflow():
# Use OPERATORS for:
# 1. Bash commands
setup_env = BashOperator(
task_id="setup_env",
bash_command="mkdir -p /tmp/airflow_data && echo 'ready'",
)
# 2. Sensors (waiting for conditions)
wait_for_data = FileSensor(
task_id="wait_for_data",
filepath="/tmp/airflow_data/input.csv",
poke_interval=30,
timeout=300,
mode="poke",
soft_fail=True, # Skip instead of fail on timeout
)
# Use TASKFLOW for:
# 1. Python data processing logic
@task
def process_data():
"""Complex Python logic is best as TaskFlow tasks."""
import csv
data = [
{"name": "Alice", "score": 95},
{"name": "Bob", "score": 87},
]
avg_score = sum(d["score"] for d in data) / len(data)
return {"average": avg_score, "count": len(data)}
# 2. Tasks that pass data between each other
@task
def generate_summary(stats: dict):
"""Receiving data from upstream is clean with TaskFlow."""
print(f"Processed {stats['count']} records")
print(f"Average score: {stats['average']}")
return f"Summary: {stats['count']} records, avg={stats['average']}"
@task
def notify(summary: str):
"""Final notification."""
print(f"Sending notification: {summary}")
# Mix operators and TaskFlow tasks
setup_env >> wait_for_data
stats = process_data()
summary = generate_summary(stats)
wait_for_data >> stats
notify(summary)
operators_vs_taskflow()
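A related bridging pattern: a classic operator's XCom can be passed into a TaskFlow task through its .output attribute (an XComArg), which also creates the dependency. A hedged sketch (the echo_ready task is illustrative only) that would sit inside the operators_vs_taskflow() function above:

    @task
    def echo_ready(message: str):
        """Receives setup_env's last stdout line as a plain argument."""
        print(f"setup_env said: {message}")

    echo_ready(setup_env.output)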
Practice Exercise
Exercise: Build a File Processing Pipeline
"""
File: dags/file_processing.py
Build a pipeline that:
1. Waits for an input file (sensor)
2. Reads and validates the file (TaskFlow)
3. Processes the data (TaskFlow)
4. Archives the file (BashOperator)
5. Sends a notification (TaskFlow)
"""
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
@dag(
dag_id="file_processing",
start_date=datetime(2026, 1, 1),
schedule=None, # Manually triggered
catchup=False,
tags=["exercise"],
)
def file_processing():
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/tmp/airflow_data/input.csv",
poke_interval=10,
timeout=120,
soft_fail=True,
)
@task
def validate_file():
"""Validate the input file format."""
# Simulate validation
file_info = {
"filename": "input.csv",
"rows": 1000,
"columns": ["id", "name", "value"],
"is_valid": True,
}
if not file_info["is_valid"]:
raise ValueError("File validation failed!")
return file_info
@task
def process_records(file_info: dict):
"""Process the validated records."""
print(f"Processing {file_info['rows']} rows from {file_info['filename']}")
results = {
"processed": file_info["rows"],
"errors": 3,
"success_rate": 99.7,
}
return results
archive = BashOperator(
task_id="archive_file",
bash_command=(
"mkdir -p /tmp/airflow_data/archive && "
"mv /tmp/airflow_data/input.csv "
"/tmp/airflow_data/archive/input_{{ ds_nodash }}.csv 2>/dev/null; "
"echo 'archived'"
),
)
@task
def send_notification(results: dict):
"""Send processing results notification."""
print(f"Processing complete!")
print(f" Records: {results['processed']}")
print(f" Errors: {results['errors']}")
print(f" Success rate: {results['success_rate']}%")
# Define flow
file_info = validate_file()
results = process_records(file_info)
wait_for_file >> file_info
results >> archive
send_notification(results)
file_processing()
Summary
In this chapter, you learned:
- BashOperator executes shell commands with Jinja templating support
- PythonOperator runs Python callables with full Airflow context access
- Sensors wait for external conditions using poke or reschedule modes
- TaskFlow is preferred for Python logic; operators are best for shell commands and sensors
- You can mix operators and TaskFlow tasks in the same DAG