Chapter 7: Scheduling and Timetables
Learning Objectives
- Master cron expressions and preset schedules
- Use Timetable API for custom scheduling logic
- Understand data intervals and logical dates
- Configure catchup, backfill, and max_active_runs
Knowledge Points
7.1 Schedule Presets
Airflow provides convenient preset schedule values:
from airflow.decorators import dag
from datetime import datetime

START = datetime(2026, 1, 1)  # every scheduled DAG needs a start_date

# Using preset schedules; each decorated function must be called to register the DAG
@dag(schedule="@daily", start_date=START, catchup=False)  # Runs daily at midnight
def daily_dag(): ...
daily_dag()

@dag(schedule="@hourly", start_date=START, catchup=False)  # Runs at the start of every hour
def hourly_dag(): ...
hourly_dag()

@dag(schedule="@weekly", start_date=START, catchup=False)  # Runs every Sunday at midnight
def weekly_dag(): ...
weekly_dag()

@dag(schedule="@monthly", start_date=START, catchup=False)  # Runs on the 1st of every month at midnight
def monthly_dag(): ...
monthly_dag()

@dag(schedule="@yearly", start_date=START, catchup=False)  # Runs on January 1st at midnight
def yearly_dag(): ...
yearly_dag()

@dag(schedule=None, start_date=START)  # Manual trigger only
def manual_dag(): ...
manual_dag()
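The presets cover fixed clock boundaries; for a cadence that need not align to the clock, a plain datetime.timedelta can also be passed as the schedule. A minimal sketch (the DAG name is ours):

from airflow.decorators import dag
from datetime import datetime, timedelta

# Runs every 4 hours, measured from the start_date rather than the clock
@dag(schedule=timedelta(hours=4), start_date=datetime(2026, 1, 1), catchup=False)
def every_four_hours_dag(): ...
every_four_hours_dag()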
7.2 Cron Expressions
"""
File: dags/cron_schedules.py
"""
from airflow.decorators import dag, task
from datetime import datetime
# Cron format: minute hour day_of_month month day_of_week
# ┌───────────── minute (0-59)
# │ ┌───────────── hour (0-23)
# │ │ ┌───────────── day of month (1-31)
# │ │ │ ┌───────────── month (1-12)
# │ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
# * * * * *
@dag(
dag_id="business_hours_report",
start_date=datetime(2026, 1, 1),
schedule="0 9 * * 1-5", # 9 AM, Monday to Friday
catchup=False,
tags=["scheduling"],
)
def business_hours_report():
@task
def generate_report():
print("Generating business hours report...")
generate_report()
business_hours_report()
@dag(
dag_id="quarterly_report",
start_date=datetime(2026, 1, 1),
schedule="0 0 1 */3 *", # 1st day of every 3rd month
catchup=False,
tags=["scheduling"],
)
def quarterly_report():
@task
def generate_quarterly():
print("Generating quarterly report...")
generate_quarterly()
quarterly_report()
Common Cron Patterns:

| Expression | Description |
|---|---|
| `0 * * * *` | Every hour, at minute 0 |
| `*/15 * * * *` | Every 15 minutes |
| `0 6 * * *` | Every day at 6 AM |
| `0 9 * * 1-5` | Weekdays at 9 AM |
| `0 0 1 * *` | First of every month at midnight |
| `0 0 * * 0` | Every Sunday at midnight |
| `30 14 1,15 * *` | 1st and 15th of the month at 2:30 PM |
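A cron expression is easy to get wrong, so it helps to preview the fire times before deploying. A quick sketch using croniter, the parsing library Airflow itself relies on (run this as a plain script, not a DAG):

from datetime import datetime
from croniter import croniter

# Print the next three fire times for "weekdays at 9 AM"
it = croniter("0 9 * * 1-5", datetime(2026, 1, 1))
for _ in range(3):
    print(it.get_next(datetime))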
7.3 Data Intervals and Logical Dates
In Airflow 3.x, scheduling is based on data intervals rather than execution dates: each run covers a specific period of data, and the run is triggered once that period ends. For a @daily DAG, the run covering 2026-01-15 therefore starts on 2026-01-16, and its logical date (the successor to the removed execution_date) equals the interval start.
"""
File: dags/data_intervals_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="data_intervals_demo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["scheduling", "intervals"],
)
def data_intervals_demo():
@task
def show_intervals(**context):
"""Display the data interval for this run."""
# The time period this run covers
data_interval_start = context["data_interval_start"]
data_interval_end = context["data_interval_end"]
# Logical date (= data_interval_start for regular schedules)
logical_date = context["logical_date"]
# Template variables
ds = context["ds"] # YYYY-MM-DD
ds_nodash = context["ds_nodash"] # YYYYMMDD
print(f"Logical date: {logical_date}")
print(f"Data interval start: {data_interval_start}")
print(f"Data interval end: {data_interval_end}")
print(f"ds: {ds}")
print(f"ds_nodash: {ds_nodash}")
    # For the @daily run covering 2026-01-15 (it executes once the
    # interval ends, on 2026-01-16):
    #   logical_date:        2026-01-15T00:00:00+00:00
    #   data_interval_start: 2026-01-15T00:00:00+00:00
    #   data_interval_end:   2026-01-16T00:00:00+00:00
@task
def query_with_interval(**context):
"""Use data interval to query the correct data range."""
start = context["data_interval_start"].isoformat()
end = context["data_interval_end"].isoformat()
query = f"""
SELECT *
FROM events
WHERE event_time >= '{start}'
AND event_time < '{end}'
"""
print(f"Running query:\n{query}")
show_intervals()
query_with_interval()
data_intervals_demo()
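The interval bounds are also exposed as Jinja template variables, so classic operators can use them without touching the context dict. A sketch with BashOperator (the DAG id is ours; in Airflow 3 the operator ships in the standard provider, while Airflow 2 imports it from airflow.operators.bash):

from datetime import datetime
from airflow.decorators import dag
from airflow.providers.standard.operators.bash import BashOperator

@dag(
    dag_id="templated_interval_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
)
def templated_interval_demo():
    # The {{ ... }} placeholders are rendered per run by the Jinja engine
    BashOperator(
        task_id="echo_interval",
        bash_command="echo 'covering {{ data_interval_start }} to {{ data_interval_end }}'",
    )
templated_interval_demo()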
7.4 Catchup and Backfill
"""
File: dags/catchup_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime
# With catchup=True: Airflow will create runs for all missed intervals
@dag(
dag_id="catchup_enabled",
start_date=datetime(2026, 1, 1),
schedule="@daily",
    catchup=True,  # Scheduler creates a run for every missed interval since start_date
max_active_runs=3, # Limit concurrent runs
tags=["scheduling", "catchup"],
)
def catchup_enabled():
@task
def process(**context):
ds = context["ds"]
print(f"Processing data for {ds}")
process()
catchup_enabled()
# With catchup=False: Only the latest interval is triggered
@dag(
dag_id="catchup_disabled",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False, # Only run the most recent interval
tags=["scheduling", "catchup"],
)
def catchup_disabled():
@task
def process(**context):
ds = context["ds"]
print(f"Processing data for {ds}")
process()
catchup_disabled()
# Manual backfill via the CLI (Airflow 2.x syntax)
airflow dags backfill \
    --start-date 2026-01-01 \
    --end-date 2026-01-31 \
    catchup_disabled
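In Airflow 3.x, backfills run through the scheduler instead of a standalone CLI process and are created with the airflow backfill subcommand. A sketch of the equivalent invocation (flag names as of Airflow 3.0; confirm with airflow backfill create --help on your version):

airflow backfill create \
    --dag-id catchup_disabled \
    --from-date 2026-01-01 \
    --to-date 2026-01-31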
7.5 Timetable API (Custom Schedules)
For schedules that cron cannot express, use the Timetable API:
"""
File: plugins/business_day_timetable.py
A custom timetable that only schedules on business days.
"""
from datetime import timedelta
from pendulum import DateTime, Time, timezone
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
class BusinessDayTimetable(Timetable):
"""Schedule DAG runs only on business days (Mon-Fri)."""
description = "Runs on business days at 9:00 AM UTC"
def __init__(self, run_time: Time = Time(9, 0)):
self.run_time = run_time
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
utc = timezone("UTC")
if last_automated_data_interval is None:
# First run
next_start = restriction.earliest
if next_start is None:
return None
next_start = next_start.set(
hour=self.run_time.hour,
minute=self.run_time.minute,
second=0,
)
else:
next_start = last_automated_data_interval.end + timedelta(days=1)
# Skip weekends
while next_start.weekday() >= 5: # Saturday=5, Sunday=6
next_start = next_start + timedelta(days=1)
next_start = next_start.set(
hour=self.run_time.hour,
minute=self.run_time.minute,
second=0,
)
if restriction.latest is not None and next_start > restriction.latest:
return None
next_end = next_start + timedelta(days=1)
# Skip to next business day for the end
while next_end.weekday() >= 5:
next_end = next_end + timedelta(days=1)
return DagRunInfo.interval(
start=next_start,
end=next_end,
)
def serialize(self):
return {"run_time": self.run_time.isoformat()}
@classmethod
def deserialize(cls, data):
return cls(run_time=Time.fromisoformat(data["run_time"]))
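One step is still missing: Airflow only loads and deserializes a custom timetable that is registered through a plugin. A minimal registration, appended to the same plugins file (the plugin class and name are our choice):

from airflow.plugins_manager import AirflowPlugin

class BusinessDayTimetablePlugin(AirflowPlugin):
    name = "business_day_timetable_plugin"
    timetables = [BusinessDayTimetable]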
"""
File: dags/business_day_dag.py
Use the custom timetable.
"""
from airflow.decorators import dag, task
from datetime import datetime
# Import the custom timetable from plugins
# from business_day_timetable import BusinessDayTimetable
@dag(
dag_id="business_day_dag",
start_date=datetime(2026, 1, 1),
# schedule=BusinessDayTimetable(), # Use custom timetable
schedule="0 9 * * 1-5", # Fallback: cron equivalent (simpler)
catchup=False,
tags=["scheduling", "timetable"],
)
def business_day_dag():
@task
def morning_report(**context):
print(f"Business day report for {context['ds']}")
morning_report()
business_day_dag()
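Before reaching for a custom class, check the built-in timetables. CronTriggerTimetable, for example, triggers at the cron time itself, with the logical date equal to the trigger moment rather than lagging one data interval behind. A minimal sketch (the DAG id is ours):

from datetime import datetime
from airflow.decorators import dag
from airflow.timetables.trigger import CronTriggerTimetable

@dag(
    dag_id="cron_trigger_demo",
    start_date=datetime(2026, 1, 1),
    schedule=CronTriggerTimetable("0 9 * * 1-5", timezone="UTC"),
    catchup=False,
)
def cron_trigger_demo(): ...
cron_trigger_demo()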
7.6 Timezone Configuration
"""
File: dags/timezone_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime
import pendulum
# Use timezone-aware start_date
local_tz = pendulum.timezone("America/New_York")
@dag(
dag_id="timezone_demo",
start_date=datetime(2026, 1, 1, tzinfo=local_tz),
schedule="0 9 * * *", # 9 AM New York time
catchup=False,
tags=["scheduling", "timezone"],
)
def timezone_demo():
@task
def check_timezone(**context):
logical_date = context["logical_date"]
print(f"Logical date: {logical_date}")
print(f"Timezone: {logical_date.timezone_name}")
check_timezone()
timezone_demo()
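Cron schedules follow the wall clock of the DAG's timezone, so this DAG keeps firing at 9 AM New York time across DST transitions while the corresponding UTC time shifts. A quick pendulum check of the offsets (arbitrary example dates):

import pendulum

# New York is UTC-5 in winter (EST) and UTC-4 in summer (EDT)
print(pendulum.datetime(2026, 1, 15, 9, tz="America/New_York"))  # ...09:00:00-05:00
print(pendulum.datetime(2026, 7, 15, 9, tz="America/New_York"))  # ...09:00:00-04:00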
Practice Exercise
Exercise: Multi-Schedule Pipeline
"""
File: dags/multi_schedule.py
Create a set of DAGs with different schedules:
1. Real-time monitoring (every 5 minutes)
2. Hourly aggregation
3. Daily summary report
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="monitor_5min",
start_date=datetime(2026, 1, 1),
schedule="*/5 * * * *",
catchup=False,
max_active_runs=1,
tags=["exercise", "monitoring"],
)
def monitor_5min():
@task
def check_metrics():
import random
metrics = {
"cpu": round(random.uniform(10, 95), 1),
"memory": round(random.uniform(30, 85), 1),
"latency_ms": round(random.uniform(5, 200), 1),
}
print(f"Metrics: {metrics}")
if metrics["cpu"] > 90 or metrics["latency_ms"] > 150:
print("ALERT: Threshold exceeded!")
return metrics
check_metrics()
monitor_5min()
@dag(
dag_id="hourly_aggregation",
start_date=datetime(2026, 1, 1),
schedule="@hourly",
catchup=False,
tags=["exercise", "aggregation"],
)
def hourly_aggregation():
@task
def aggregate(**context):
start = context["data_interval_start"]
end = context["data_interval_end"]
print(f"Aggregating metrics from {start} to {end}")
aggregated = {
"avg_cpu": 45.3,
"max_cpu": 92.1,
"avg_latency": 35.6,
"p99_latency": 180.2,
"data_points": 12,
}
print(f"Aggregated: {aggregated}")
return aggregated
aggregate()
hourly_aggregation()
@dag(
dag_id="daily_summary",
start_date=datetime(2026, 1, 1),
schedule="0 23 * * *", # 11 PM daily
catchup=False,
tags=["exercise", "reporting"],
)
def daily_summary():
@task
def generate_summary(**context):
ds = context["ds"]
summary = {
"date": ds,
"total_alerts": 3,
"avg_cpu": 42.1,
"peak_cpu_hour": "14:00",
"avg_latency": 28.5,
"uptime_percent": 99.97,
}
print(f"Daily Summary for {ds}:")
for key, value in summary.items():
print(f" {key}: {value}")
generate_summary()
daily_summary()
Summary
In this chapter, you learned:
- Airflow supports cron expressions and convenient preset schedules (`@daily`, `@hourly`, etc.)
- Data intervals define the time period each DAG run covers
- Catchup controls whether missed intervals are backfilled automatically
- The Timetable API enables custom scheduling logic beyond cron capabilities
- Timezone-aware scheduling ensures DAGs run at the correct local time