Chapter 6: XCom and Data Passing


Learning Objectives
  • Understand XCom for inter-task communication
  • Push and pull XCom values manually and automatically
  • Use TaskFlow API for implicit XCom passing
  • Handle large data with custom XCom backends

Knowledge Points

6.1 What is XCom?

XCom (short for "cross-communication") is Airflow's mechanism for tasks to exchange small amounts of data through the metadata database. When a task returns a value, it is automatically pushed to XCom under the key return_value and can be pulled by downstream tasks.

"""
File: dags/xcom_basics.py
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="xcom_basics",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["xcom"],
)
def xcom_basics():

    @task
    def produce_data():
        """Return value is automatically pushed to XCom."""
        result = {"message": "Hello from XCom!", "count": 42}
        return result

    @task
    def consume_data(data: dict):
        """TaskFlow automatically pulls XCom from upstream."""
        print(f"Received: {data['message']}")
        print(f"Count: {data['count']}")

    # Data flows implicitly through function arguments
    data = produce_data()
    consume_data(data)

xcom_basics()
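
When this DAG runs, the pushed values can be inspected in the Airflow UI under Admin → XComs, or on the XCom tab of each task instance, which is the quickest way to check what a task actually passed downstream.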

6.2 Manual XCom Push and Pull
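
Outside the TaskFlow API, you push and pull XCom values explicitly through the task instance (ti) available in the task context: xcom_push stores a value under a key, and xcom_pull retrieves it by task_id and key (the key defaults to return_value).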

"""
File: dags/xcom_manual.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_multiple_values(**context):
    """Push multiple XCom values with different keys."""
    ti = context["ti"]

    # Push values with explicit keys via the task instance
    ti.xcom_push(key="user_count", value=150)
    ti.xcom_push(key="order_count", value=320)
    ti.xcom_push(key="status", value="healthy")

    # The return value is also pushed automatically (key="return_value")
    return {"summary": "all systems operational"}

def pull_values(**context):
    """Pull XCom values from upstream tasks."""
    ti = context["ti"]

    # Pull specific key from a specific task
    user_count = ti.xcom_pull(task_ids="push_task", key="user_count")
    order_count = ti.xcom_pull(task_ids="push_task", key="order_count")
    status = ti.xcom_pull(task_ids="push_task", key="status")

    # Pull the return value (default key)
    summary = ti.xcom_pull(task_ids="push_task")

    print(f"Users: {user_count}, Orders: {order_count}")
    print(f"Status: {status}")
    print(f"Summary: {summary}")

def pull_from_multiple(**context):
    """Pull the same XCom key from several upstream tasks at once."""
    ti = context["ti"]

    # Passing a list of task_ids returns a list of values (one per task).
    # This DAG has only one producer, so the list holds a single element.
    all_counts = ti.xcom_pull(
        task_ids=["push_task"],
        key="user_count",
    )
    print(f"All user counts: {all_counts}")

with DAG(
    dag_id="xcom_manual",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["xcom"],
) as dag:

    push = PythonOperator(
        task_id="push_task",
        python_callable=push_multiple_values,
    )

    pull = PythonOperator(
        task_id="pull_task",
        python_callable=pull_values,
    )

    pull_multi = PythonOperator(
        task_id="pull_multi_task",
        python_callable=pull_from_multiple,
    )

    push >> [pull, pull_multi]

6.3 XCom with Jinja Templates
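
Because templated operator fields are rendered with Jinja at runtime, you can also pull XCom values directly inside them; ti.xcom_pull() is available in the template context, which is handy for operators such as BashOperator whose bash_command is templated.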

"""
File: dags/xcom_templates.py
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

def generate_filename(**context):
    return f"report_{context['ds_nodash']}.csv"

with DAG(
    dag_id="xcom_templates",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["xcom", "templates"],
) as dag:

    create_filename = PythonOperator(
        task_id="create_filename",
        python_callable=generate_filename,
    )

    # Pull XCom using Jinja template
    use_filename = BashOperator(
        task_id="use_filename",
        bash_command=(
            'echo "Processing file: '
            '{{ ti.xcom_pull(task_ids=\'create_filename\') }}"'
        ),
    )

    # Access in another operator
    archive_file = BashOperator(
        task_id="archive_file",
        bash_command=(
            "echo 'Archiving: "
            "{{ ti.xcom_pull(task_ids=\"create_filename\") }}'"
        ),
    )

    create_filename >> use_filename >> archive_file
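
You can verify the rendered bash_command, with the XCom value substituted in, on the task's Rendered Template view in the Airflow UI; this is the easiest way to debug Jinja-based XCom pulls.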

6.4 Returning Multiple Outputs with TaskFlow
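
By default a TaskFlow task pushes its entire return value as a single XCom. Setting multiple_outputs=True on a task that returns a dict pushes each key as its own XCom entry, so downstream tasks can consume just the fields they need.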

"""
File: dags/xcom_multiple_outputs.py
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="xcom_multiple_outputs",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["xcom", "taskflow"],
)
def xcom_multiple_outputs():

    @task(multiple_outputs=True)
    def get_user_info():
        """Each dict key becomes a separate XCom entry."""
        return {
            "name": "Alice",
            "email": "alice@example.com",
            "role": "admin",
            "active": True,
        }

    @task
    def send_welcome_email(name: str, email: str):
        """Access individual dict values as separate arguments."""
        print(f"Sending welcome email to {name} at {email}")

    @task
    def assign_permissions(name: str, role: str):
        """Use different keys from the same upstream task."""
        print(f"Assigning {role} permissions to {name}")

    @task
    def log_activity(name: str, active: bool):
        """Log user activity status."""
        status = "active" if active else "inactive"
        print(f"User {name} is {status}")

    user_info = get_user_info()
    send_welcome_email(user_info["name"], user_info["email"])
    assign_permissions(user_info["name"], user_info["role"])
    log_activity(user_info["name"], user_info["active"])

xcom_multiple_outputs()

6.5 XCom Limitations and Best Practices
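
XCom values are serialized and stored in Airflow's metadata database, so they are meant for small pieces of metadata rather than datasets; the practical size limit depends on the database backend. For anything large, write the data to a file or object store and pass only a reference through XCom, as the example below shows.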

"""
File: dags/xcom_best_practices.py
Demonstrates XCom best practices and handling large data.
"""
from airflow.decorators import dag, task
from datetime import datetime
import json
import os

@dag(
    dag_id="xcom_best_practices",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["xcom", "best-practices"],
)
def xcom_best_practices():

    # GOOD: Pass references, not large data
    @task
    def process_large_dataset():
        """Write large data to file, return file path via XCom."""
        data = [{"id": i, "value": i * 10} for i in range(10000)]
        output_path = "/tmp/airflow_data/processed_data.json"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        with open(output_path, "w") as f:
            json.dump(data, f)

        # Return the file path and metadata, not the data itself
        return {
            "file_path": output_path,
            "record_count": len(data),
            "file_size_mb": round(os.path.getsize(output_path) / 1_000_000, 2),
        }

    @task
    def load_from_file(metadata: dict):
        """Read data using the file path from XCom."""
        print(f"Loading {metadata['record_count']} records from {metadata['file_path']}")
        # In practice: read from the file path
        print(f"File size: {metadata['file_size_mb']} MB")

    # GOOD: Pass small, serializable data
    @task
    def get_config():
        """Small configuration data is fine for XCom."""
        return {
            "batch_size": 100,
            "target_table": "analytics.events",
            "mode": "append",
        }

    @task
    def use_config(config: dict):
        print(f"Using batch_size={config['batch_size']}")
        print(f"Target: {config['target_table']}")
        print(f"Mode: {config['mode']}")

    metadata = process_large_dataset()
    load_from_file(metadata)

    config = get_config()
    use_config(config)

xcom_best_practices()

XCom Best Practices:

Do:
  • Pass small metadata (paths, counts, IDs)
  • Use file paths or object storage URIs
  • Keep XCom values JSON-serializable
  • Use multiple_outputs=True for dict values
  • Clean up old XCom entries periodically (see the cleanup command after this list)

Don't:
  • Pass large datasets directly
  • Store binary data in XCom
  • Pass non-serializable objects
  • Overload a single XCom with too much data
  • Let the XCom table grow unbounded
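
For the cleanup item above, recent Airflow releases (2.3+) ship an airflow db clean command that can purge old metadata rows, including XCom. A minimal sketch; flag names can differ slightly between versions, so check airflow db clean --help before running it for real:

# Preview which xcom rows older than 2026-01-01 would be deleted
airflow db clean --tables xcom --clean-before-timestamp "2026-01-01" --dry-run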

6.6 Custom XCom Backends

For larger data, Airflow supports custom XCom backends. A backend subclasses BaseXCom and overrides how values are serialized and deserialized, so big payloads can live in object storage while only a reference is kept in the metadata database:

# In airflow.cfg or environment variable:
# AIRFLOW__CORE__XCOM_BACKEND=my_package.custom_xcom.S3XComBackend

# Example: Custom S3 XCom Backend
from airflow.models.xcom import BaseXCom
import json

class S3XComBackend(BaseXCom):
    """Store XCom values in S3 for large data."""

    @staticmethod
    def serialize_value(value, key=None, task_id=None, dag_id=None, run_id=None, map_index=None):
        """Serialize and store in S3 if data is large."""
        serialized = json.dumps(value)
        if len(serialized) > 48000:  # If larger than 48KB
            # Upload to S3 and return reference
            s3_key = f"xcom/{dag_id}/{run_id}/{task_id}/{key}.json"
            # boto3.client('s3').put_object(...)
            return BaseXCom.serialize_value(f"s3://my-bucket/{s3_key}")
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        """Deserialize, fetching from S3 if needed."""
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith("s3://"):
            # Download from S3 and deserialize
            pass
        return value
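
The elided S3 calls above might be filled in roughly as follows. This is a minimal sketch, not a production backend: it assumes boto3 is installed, credentials are configured, and the hypothetical bucket my-bucket exists.

import json
import boto3

S3_BUCKET = "my-bucket"  # hypothetical bucket name used in the sketch above

def upload_xcom_to_s3(s3_key: str, value) -> str:
    """Serialize a value to JSON, upload it to S3, and return its s3:// reference."""
    body = json.dumps(value).encode("utf-8")
    boto3.client("s3").put_object(Bucket=S3_BUCKET, Key=s3_key, Body=body)
    return f"s3://{S3_BUCKET}/{s3_key}"

def download_xcom_from_s3(uri: str):
    """Fetch an object previously uploaded by upload_xcom_to_s3 and deserialize it."""
    key = uri.removeprefix(f"s3://{S3_BUCKET}/")
    obj = boto3.client("s3").get_object(Bucket=S3_BUCKET, Key=key)
    return json.loads(obj["Body"].read())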

Practice Exercise

Exercise: Build a Data Pipeline with XCom

"""
File: dags/xcom_pipeline_exercise.py
Build a pipeline where:
1. Multiple sources push data independently
2. An aggregator pulls data from all sources
3. A reporter generates a summary
4. Results are saved with metadata passed via XCom
"""
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="xcom_pipeline_exercise",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["exercise", "xcom"],
)
def xcom_pipeline_exercise():

    @task(multiple_outputs=True)
    def fetch_sales_data():
        return {
            "source": "sales_db",
            "revenue": 125000,
            "transactions": 450,
            "avg_order": 277.78,
        }

    @task(multiple_outputs=True)
    def fetch_web_analytics():
        return {
            "source": "google_analytics",
            "page_views": 50000,
            "unique_visitors": 12000,
            "bounce_rate": 35.2,
        }

    @task(multiple_outputs=True)
    def fetch_support_metrics():
        return {
            "source": "zendesk",
            "tickets_opened": 45,
            "tickets_resolved": 52,
            "avg_response_hours": 2.3,
        }

    @task
    def build_dashboard(
        revenue: float,
        transactions: int,
        page_views: int,
        unique_visitors: int,
        tickets_opened: int,
        tickets_resolved: int,
    ):
        conversion_rate = (transactions / unique_visitors) * 100
        dashboard = {
            "revenue": f"${revenue:,.2f}",
            "transactions": transactions,
            "page_views": f"{page_views:,}",
            "conversion_rate": f"{conversion_rate:.2f}%",
            "support_backlog": tickets_opened - tickets_resolved,
        }
        for key, value in dashboard.items():
            print(f"  {key}: {value}")
        return dashboard

    @task
    def save_dashboard(dashboard: dict):
        # Local imports keep top-level DAG parsing light
        import json
        import os

        output_path = "/tmp/airflow_data/dashboard.json"
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, "w") as f:
            json.dump(dashboard, f, indent=2)
        print(f"Dashboard saved to {output_path}")
        print(f"Metrics: {len(dashboard)} KPIs generated")

    sales = fetch_sales_data()
    web = fetch_web_analytics()
    support = fetch_support_metrics()

    dashboard = build_dashboard(
        revenue=sales["revenue"],
        transactions=sales["transactions"],
        page_views=web["page_views"],
        unique_visitors=web["unique_visitors"],
        tickets_opened=support["tickets_opened"],
        tickets_resolved=support["tickets_resolved"],
    )
    save_dashboard(dashboard)

xcom_pipeline_exercise()

Summary

In this chapter, you learned:

  • XCom enables inter-task communication by pushing and pulling small values
  • TaskFlow API automatically handles XCom with function arguments and return values
  • Use multiple_outputs=True to access individual dictionary keys as separate XComs
  • XCom values should be small and JSON-serializable; use file paths for large data
  • Custom XCom backends (S3, GCS) can handle larger payloads transparently