Chapter 7: Scheduling and Timetables

Haiyue
12min

Chapter 7: Scheduling and Timetables

Learning Objectives
  • Master cron expressions and preset schedules
  • Use Timetable API for custom scheduling logic
  • Understand data intervals and logical dates
  • Configure catchup, backfill, and max_active_runs

Knowledge Points

7.1 Schedule Presets

Airflow provides convenient preset schedule values:

from airflow.decorators import dag
from datetime import datetime

# Using preset schedules (each preset is shorthand for a cron expression)
@dag(schedule="@daily")      # Runs daily at midnight (0 0 * * *)
def daily_dag(): ...

@dag(schedule="@hourly")     # Runs at the start of every hour (0 * * * *)
def hourly_dag(): ...

@dag(schedule="@weekly")     # Runs every Sunday at midnight (0 0 * * 0)
def weekly_dag(): ...

@dag(schedule="@monthly")    # Runs on the 1st of every month (0 0 1 * *)
def monthly_dag(): ...

@dag(schedule="@yearly")     # Runs on January 1st at midnight (0 0 1 1 *)
def yearly_dag(): ...

@dag(schedule=None)          # Manual trigger only (no automatic runs)
def manual_dag(): ...

7.2 Cron Expressions

"""
File: dags/cron_schedules.py
"""
from airflow.decorators import dag, task
from datetime import datetime

# Cron format: minute hour day_of_month month day_of_week
# ┌───────────── minute (0-59)
# │ ┌───────────── hour (0-23)
# │ │ ┌───────────── day of month (1-31)
# │ │ │ ┌───────────── month (1-12)
# │ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
# * * * * *

@dag(
    dag_id="business_hours_report",
    start_date=datetime(2026, 1, 1),
    schedule="0 9 * * 1-5",  # 9 AM, Monday to Friday
    catchup=False,
    tags=["scheduling"],
)
def business_hours_report():
    """Run a (placeholder) report task every weekday at 09:00."""
    @task
    def generate_report():
        # Demo task: a real implementation would build/send the report here.
        print("Generating business hours report...")
    generate_report()

business_hours_report()


@dag(
    dag_id="quarterly_report",
    start_date=datetime(2026, 1, 1),
    schedule="0 0 1 */3 *",  # 1st day of every 3rd month (Jan, Apr, Jul, Oct)
    catchup=False,
    tags=["scheduling"],
)
def quarterly_report():
    """Run a (placeholder) quarterly report at midnight on quarter starts."""
    @task
    def generate_quarterly():
        # Demo task: stands in for real quarterly aggregation work.
        print("Generating quarterly report...")
    generate_quarterly()

quarterly_report()

Common Cron Patterns:

Expression          Description
0 * * * *           Every hour, at minute 0
*/15 * * * *        Every 15 minutes
0 6 * * *           Every day at 6 AM
0 9 * * 1-5         Weekdays at 9 AM
0 0 1 * *           First of every month at midnight
0 0 * * 0           Every Sunday at midnight
30 14 1,15 * *      1st and 15th of the month at 2:30 PM

7.3 Data Intervals and Logical Dates

In Airflow 3.x, scheduling is based on data intervals rather than single execution dates: each run covers a span of time. (Data intervals replaced the older `execution_date` model starting in Airflow 2.2.)

"""
File: dags/data_intervals_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="data_intervals_demo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["scheduling", "intervals"],
)
def data_intervals_demo():
    """Demonstrate data-interval context variables for a @daily schedule."""

    @task
    def show_intervals(**context):
        """Display the data interval for this run."""
        # The time period this run covers
        data_interval_start = context["data_interval_start"]
        data_interval_end = context["data_interval_end"]

        # Logical date (= data_interval_start for regular schedules)
        logical_date = context["logical_date"]

        # Template variables
        ds = context["ds"]           # YYYY-MM-DD
        ds_nodash = context["ds_nodash"]  # YYYYMMDD

        print(f"Logical date:         {logical_date}")
        print(f"Data interval start:  {data_interval_start}")
        print(f"Data interval end:    {data_interval_end}")
        print(f"ds:                   {ds}")
        print(f"ds_nodash:            {ds_nodash}")

        # For a @daily DAG running on 2026-01-15:
        # logical_date:         2026-01-15T00:00:00+00:00
        # data_interval_start:  2026-01-15T00:00:00+00:00
        # data_interval_end:    2026-01-16T00:00:00+00:00

    @task
    def query_with_interval(**context):
        """Use data interval to query the correct data range.

        NOTE: interpolating values into SQL with an f-string is fine for this
        print-only demo, but real code should use parameterized queries.
        """
        start = context["data_interval_start"].isoformat()
        end = context["data_interval_end"].isoformat()

        # Half-open range [start, end) matches the data-interval convention.
        query = f"""
        SELECT *
        FROM events
        WHERE event_time >= '{start}'
          AND event_time < '{end}'
        """
        print(f"Running query:\n{query}")

    show_intervals()
    query_with_interval()

data_intervals_demo()

7.4 Catchup and Backfill

"""
File: dags/catchup_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime

# With catchup=True: Airflow will create runs for all missed intervals
@dag(
    dag_id="catchup_enabled",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=True,    # Will backfill from start_date to now
    max_active_runs=3,  # Limit concurrent runs
    tags=["scheduling", "catchup"],
)
def catchup_enabled():
    """Daily DAG that backfills every missed interval since start_date."""
    @task
    def process(**context):
        # ds is this run's logical date as YYYY-MM-DD.
        ds = context["ds"]
        print(f"Processing data for {ds}")
    process()

catchup_enabled()

# With catchup=False: Only the latest interval is triggered
@dag(
    dag_id="catchup_disabled",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,   # Only run the most recent interval
    tags=["scheduling", "catchup"],
)
def catchup_disabled():
    """Daily DAG that skips missed intervals (no automatic backfill)."""
    @task
    def process(**context):
        # ds is this run's logical date as YYYY-MM-DD.
        ds = context["ds"]
        print(f"Processing data for {ds}")
    process()

catchup_disabled()
# Manual backfill via CLI
# NOTE: this is a shell command, not Python — run it in a terminal. It creates
# runs for the given historical date range even when the DAG has catchup=False.
airflow dags backfill \
    --start-date 2026-01-01 \
    --end-date 2026-01-31 \
    catchup_disabled

7.5 Timetable API (Custom Schedules)

For schedules that cron cannot express, use the Timetable API:

"""
File: plugins/business_day_timetable.py
A custom timetable that only schedules on business days.
"""
from datetime import timedelta
from pendulum import DateTime, Time, timezone
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class BusinessDayTimetable(Timetable):
    """Custom timetable: one run per business day (Mon-Fri) at a fixed time.

    Each run's data interval starts at ``run_time`` on a business day and ends
    at ``run_time`` on the next business day, so Friday's interval spans the
    weekend and ends on Monday.
    """

    description = "Runs on business days at 9:00 AM UTC"

    def __init__(self, run_time: Time = Time(9, 0)):
        # Time of day at which each data interval starts/ends.
        self.run_time = run_time

    def _at_run_time(self, moment: "DateTime") -> "DateTime":
        """Pin *moment* to ``run_time`` on the same calendar date."""
        return moment.set(
            hour=self.run_time.hour,
            minute=self.run_time.minute,
            second=0,
        )

    def _next_business_day(self, moment: "DateTime") -> "DateTime":
        """Advance *moment* day-by-day until it falls on Mon-Fri."""
        while moment.weekday() >= 5:  # Saturday=5, Sunday=6
            moment = moment + timedelta(days=1)
        return moment

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Return the next run's data interval, or None when no run is due.

        NOTE(review): this timetable ignores ``restriction.catchup``; with
        catchup disabled the scheduler may still be offered historical
        intervals — confirm against the Airflow Timetable docs if needed.
        """
        if last_automated_data_interval is None:
            # First run: anchor on the earliest allowed start (the DAG's
            # start_date); without one we never schedule automatically.
            if restriction.earliest is None:
                return None
            next_start = self._at_run_time(restriction.earliest)
        else:
            # Data intervals must be contiguous: the next interval starts
            # exactly where the previous one ended.  (The original added a
            # day here, which silently skipped every other business day.)
            next_start = last_automated_data_interval.end

        # Push the start onto a business day and pin it to run_time.
        next_start = self._at_run_time(self._next_business_day(next_start))

        # Respect the DAG's end_date, if any.
        if restriction.latest is not None and next_start > restriction.latest:
            return None

        # The interval ends at run_time on the following business day.
        next_end = self._next_business_day(next_start + timedelta(days=1))

        return DagRunInfo.interval(start=next_start, end=next_end)

    def serialize(self):
        """Persist the configurable run time for DAG serialization."""
        return {"run_time": self.run_time.isoformat()}

    @classmethod
    def deserialize(cls, data):
        """Rebuild the timetable from its serialized form."""
        return cls(run_time=Time.fromisoformat(data["run_time"]))
"""
File: dags/business_day_dag.py
Use the custom timetable.
"""
from airflow.decorators import dag, task
from datetime import datetime

# Import the custom timetable from plugins
# from business_day_timetable import BusinessDayTimetable

@dag(
    dag_id="business_day_dag",
    start_date=datetime(2026, 1, 1),
    # schedule=BusinessDayTimetable(),  # Use custom timetable
    schedule="0 9 * * 1-5",  # Fallback: cron equivalent (simpler)
    catchup=False,
    tags=["scheduling", "timetable"],
)
def business_day_dag():
    """Weekday-morning DAG; swap in BusinessDayTimetable for custom logic."""
    @task
    def morning_report(**context):
        # context['ds'] is this run's logical date as YYYY-MM-DD.
        print(f"Business day report for {context['ds']}")
    morning_report()

business_day_dag()

7.6 Timezone Configuration

"""
File: dags/timezone_demo.py
"""
from airflow.decorators import dag, task
from datetime import datetime
import pendulum

# Use timezone-aware start_date
local_tz = pendulum.timezone("America/New_York")

@dag(
    dag_id="timezone_demo",
    start_date=datetime(2026, 1, 1, tzinfo=local_tz),  # timezone-aware start
    schedule="0 9 * * *",  # 9 AM New York time
    catchup=False,
    tags=["scheduling", "timezone"],
)
def timezone_demo():
    """Show that logical_date carries the DAG's configured timezone."""
    @task
    def check_timezone(**context):
        logical_date = context["logical_date"]
        print(f"Logical date: {logical_date}")
        # timezone_name is a pendulum attribute — assumes logical_date is a
        # pendulum DateTime (stdlib datetime has no such attribute); confirm
        # against the Airflow version in use.
        print(f"Timezone: {logical_date.timezone_name}")
    check_timezone()

timezone_demo()

Practice Exercise

Exercise: Multi-Schedule Pipeline

"""
File: dags/multi_schedule.py
Create a set of DAGs with different schedules:
1. Real-time monitoring (every 5 minutes)
2. Hourly aggregation
3. Daily summary report
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="monitor_5min",
    start_date=datetime(2026, 1, 1),
    schedule="*/5 * * * *",
    catchup=False,
    max_active_runs=1,
    tags=["exercise", "monitoring"],
)
def monitor_5min():
    """Every-5-minutes monitoring DAG sampling (simulated) system metrics."""

    @task
    def check_metrics():
        """Sample fake metrics and print an alert when thresholds trip."""
        import random

        # Sampling bounds per metric; each reading is rounded to 1 decimal.
        bounds = {
            "cpu": (10, 95),
            "memory": (30, 85),
            "latency_ms": (5, 200),
        }
        metrics = {
            name: round(random.uniform(lo, hi), 1)
            for name, (lo, hi) in bounds.items()
        }
        print(f"Metrics: {metrics}")

        cpu_hot = metrics["cpu"] > 90
        too_slow = metrics["latency_ms"] > 150
        if cpu_hot or too_slow:
            print("ALERT: Threshold exceeded!")
        return metrics

    check_metrics()

monitor_5min()


@dag(
    dag_id="hourly_aggregation",
    start_date=datetime(2026, 1, 1),
    schedule="@hourly",
    catchup=False,
    tags=["exercise", "aggregation"],
)
def hourly_aggregation():
    """Hourly DAG that (pretend-)aggregates the past hour's metric samples."""

    @task
    def aggregate(**context):
        """Summarize the metrics collected during this run's data interval."""
        start = context["data_interval_start"]
        end = context["data_interval_end"]
        print(f"Aggregating metrics from {start} to {end}")

        # Hard-coded stand-in for the result of a real aggregation query.
        aggregated = dict(
            avg_cpu=45.3,
            max_cpu=92.1,
            avg_latency=35.6,
            p99_latency=180.2,
            data_points=12,
        )
        print(f"Aggregated: {aggregated}")
        return aggregated

    aggregate()

hourly_aggregation()


@dag(
    dag_id="daily_summary",
    start_date=datetime(2026, 1, 1),
    schedule="0 23 * * *",  # 11 PM daily
    catchup=False,
    tags=["exercise", "reporting"],
)
def daily_summary():
    """End-of-day DAG printing a (hard-coded) summary of the day's metrics."""
    @task
    def generate_summary(**context):
        # ds is this run's logical date as YYYY-MM-DD.
        ds = context["ds"]
        # Hard-coded stand-in values for a real daily rollup.
        summary = {
            "date": ds,
            "total_alerts": 3,
            "avg_cpu": 42.1,
            "peak_cpu_hour": "14:00",
            "avg_latency": 28.5,
            "uptime_percent": 99.97,
        }
        print(f"Daily Summary for {ds}:")
        for key, value in summary.items():
            print(f"  {key}: {value}")
    generate_summary()

daily_summary()

Summary

In this chapter, you learned:

  • Airflow supports cron expressions and convenient preset schedules (@daily, @hourly, etc.)
  • Data intervals define the time period each DAG run covers
  • Catchup controls whether missed intervals are backfilled automatically
  • The Timetable API enables custom scheduling logic beyond cron capabilities
  • Timezone-aware scheduling ensures DAGs run at the correct local time