Chapter 6: XCom and Data Passing
Learning Objectives
- Understand XCom for inter-task communication
- Push and pull XCom values manually and automatically
- Use TaskFlow API for implicit XCom passing
- Handle large data with custom XCom backends
Knowledge Points
6.1 What is XCom?
XCom (short for "cross-communication") is Airflow's mechanism for exchanging small amounts of data between tasks. XCom values are stored in the Airflow metadata database. When a TaskFlow task returns a value, it is automatically pushed to XCom and can be pulled by downstream tasks.
"""
File: dags/xcom_basics.py
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="xcom_basics",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["xcom"],
)
def xcom_basics():
@task
def produce_data():
"""Return value is automatically pushed to XCom."""
result = {"message": "Hello from XCom!", "count": 42}
return result
@task
def consume_data(data: dict):
"""TaskFlow automatically pulls XCom from upstream."""
print(f"Received: {data['message']}")
print(f"Count: {data['count']}")
# Data flows implicitly through function arguments
data = produce_data()
consume_data(data)
xcom_basics()
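Under the hood, TaskFlow pushes each return value with the key return_value, the same key a plain PythonOperator uses for its callable's return value. You can inspect every stored value in the Airflow UI under Admin > XComs, which is a convenient way to verify what is actually flowing between tasks.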
6.2 Manual XCom Push and Pull
"""
File: dags/xcom_manual.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_multiple_values(**context):
"""Push multiple XCom values with different keys."""
ti = context["ti"]
# Default return value (key="return_value")
ti.xcom_push(key="user_count", value=150)
ti.xcom_push(key="order_count", value=320)
ti.xcom_push(key="status", value="healthy")
# Return value is also pushed automatically
return {"summary": "all systems operational"}
def pull_values(**context):
"""Pull XCom values from upstream tasks."""
ti = context["ti"]
# Pull specific key from a specific task
user_count = ti.xcom_pull(task_ids="push_task", key="user_count")
order_count = ti.xcom_pull(task_ids="push_task", key="order_count")
status = ti.xcom_pull(task_ids="push_task", key="status")
# Pull the return value (default key)
summary = ti.xcom_pull(task_ids="push_task")
print(f"Users: {user_count}, Orders: {order_count}")
print(f"Status: {status}")
print(f"Summary: {summary}")
def pull_from_multiple(**context):
"""Pull XCom from multiple upstream tasks."""
ti = context["ti"]
# Pull from multiple tasks at once
all_counts = ti.xcom_pull(
task_ids=["push_task"],
key="user_count",
)
print(f"All user counts: {all_counts}")
with DAG(
dag_id="xcom_manual",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["xcom"],
) as dag:
push = PythonOperator(
task_id="push_task",
python_callable=push_multiple_values,
)
pull = PythonOperator(
task_id="pull_task",
python_callable=pull_values,
)
pull_multi = PythonOperator(
task_id="pull_multi_task",
python_callable=pull_from_multiple,
)
push >> [pull, pull_multi]
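When task_ids is passed as a list, xcom_pull returns a list of values rather than a single value; in recent Airflow releases the results are ordered to match the order of the task IDs. This is the usual fan-in pattern when several upstream tasks push results under the same key.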
6.3 XCom with Jinja Templates
"""
File: dags/xcom_templates.py
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
def generate_filename(**context):
return f"report_{context['ds_nodash']}.csv"
with DAG(
dag_id="xcom_templates",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["xcom", "templates"],
) as dag:
create_filename = PythonOperator(
task_id="create_filename",
python_callable=generate_filename,
)
# Pull XCom using Jinja template
use_filename = BashOperator(
task_id="use_filename",
bash_command=(
'echo "Processing file: '
'{{ ti.xcom_pull(task_ids=\'create_filename\') }}"'
),
)
# Access in another operator
archive_file = BashOperator(
task_id="archive_file",
bash_command=(
"echo 'Archiving: "
"{{ ti.xcom_pull(task_ids=\"create_filename\") }}'"
),
)
create_filename >> use_filename >> archive_file
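Keep in mind that Jinja expressions such as {{ ti.xcom_pull(...) }} are only rendered inside an operator's templated fields; for BashOperator those are bash_command and env. The same string passed to a non-templated argument is treated as a literal and never rendered.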
6.4 Returning Multiple Outputs with TaskFlow
"""
File: dags/xcom_multiple_outputs.py
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="xcom_multiple_outputs",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["xcom", "taskflow"],
)
def xcom_multiple_outputs():
@task(multiple_outputs=True)
def get_user_info():
"""Each dict key becomes a separate XCom entry."""
return {
"name": "Alice",
"email": "alice@example.com",
"role": "admin",
"active": True,
}
@task
def send_welcome_email(name: str, email: str):
"""Access individual dict values as separate arguments."""
print(f"Sending welcome email to {name} at {email}")
@task
def assign_permissions(name: str, role: str):
"""Use different keys from the same upstream task."""
print(f"Assigning {role} permissions to {name}")
@task
def log_activity(name: str, active: bool):
"""Log user activity status."""
status = "active" if active else "inactive"
print(f"User {name} is {status}")
user_info = get_user_info()
send_welcome_email(user_info["name"], user_info["email"])
assign_permissions(user_info["name"], user_info["role"])
log_activity(user_info["name"], user_info["active"])
xcom_multiple_outputs()
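With multiple_outputs=True, each top-level key is pushed as its own XCom, so downstream tasks can depend on just the fields they need, while the full dictionary remains available under return_value. Annotating the task function with a dict return type (for example typing.Dict[str, str]) makes TaskFlow enable multiple_outputs automatically.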
6.5 XCom Limitations and Best Practices
"""
File: dags/xcom_best_practices.py
Demonstrates XCom best practices and handling large data.
"""
from airflow.decorators import dag, task
from datetime import datetime
import json
@dag(
dag_id="xcom_best_practices",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["xcom", "best-practices"],
)
def xcom_best_practices():
# GOOD: Pass references, not large data
@task
def process_large_dataset():
"""Write large data to file, return file path via XCom."""
data = [{"id": i, "value": i * 10} for i in range(10000)]
output_path = "/tmp/airflow_data/processed_data.json"
with open(output_path, "w") as f:
json.dump(data, f)
# Return the file path, not the data itself
return {
"file_path": output_path,
"record_count": len(data),
"file_size_mb": 0.5,
}
@task
def load_from_file(metadata: dict):
"""Read data using the file path from XCom."""
print(f"Loading {metadata['record_count']} records from {metadata['file_path']}")
# In practice: read from the file path
print(f"File size: {metadata['file_size_mb']} MB")
# GOOD: Pass small, serializable data
@task
def get_config():
"""Small configuration data is fine for XCom."""
return {
"batch_size": 100,
"target_table": "analytics.events",
"mode": "append",
}
@task
def use_config(config: dict):
print(f"Using batch_size={config['batch_size']}")
print(f"Target: {config['target_table']}")
print(f"Mode: {config['mode']}")
metadata = process_large_dataset()
load_from_file(metadata)
config = get_config()
use_config(config)
xcom_best_practices()
XCom Best Practices:
| Do | Don’t |
|---|---|
| Pass small metadata (paths, counts, IDs) | Pass large datasets directly |
| Use file paths or object storage URIs | Store binary data in XCom |
| Keep XCom values JSON-serializable | Pass non-serializable objects |
| Use multiple_outputs=True for dict values | Overload a single XCom with too much data |
| Clean up old XCom entries periodically | Let the XCom table grow unbounded |
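On the cleanup point: Airflow 2.3+ ships a maintenance command that can archive and prune old metadata rows, including the xcom table. As an illustration (flags can vary by version, so check the CLI reference before running it against production):
# airflow db clean --tables xcom --clean-before-timestamp "2026-01-01" --dry-run
Also note that the file-path pattern in the example above assumes every task can see the same filesystem; in a distributed deployment, prefer object storage URIs (s3://, gs://) over local paths.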
6.6 Custom XCom Backends
For handling larger data, Airflow supports custom XCom backends:
# In airflow.cfg or via environment variable:
# AIRFLOW__CORE__XCOM_BACKEND=my_package.custom_xcom.S3XComBackend

# Example: Custom S3 XCom Backend
from airflow.models.xcom import BaseXCom
import json


class S3XComBackend(BaseXCom):
    """Store XCom values in S3 for large data."""

    @staticmethod
    def serialize_value(value, key=None, task_id=None, dag_id=None, run_id=None, map_index=None):
        """Serialize and store in S3 if the data is large."""
        serialized = json.dumps(value)
        if len(serialized) > 48000:  # If larger than ~48 KB
            # Upload to S3 and store only a reference in the metadata DB
            s3_key = f"xcom/{dag_id}/{run_id}/{task_id}/{key}.json"
            # boto3.client('s3').put_object(...)
            return BaseXCom.serialize_value(f"s3://my-bucket/{s3_key}")
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        """Deserialize, fetching from S3 if needed."""
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith("s3://"):
            # Download from S3 and deserialize
            pass
        return value
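The key benefit of a custom backend is that it is transparent to DAG authors: tasks still return values and receive arguments exactly as in the TaskFlow examples above, and the backend alone decides whether a payload lives in the metadata database or in object storage. Note that the class above is a sketch; the commented-out boto3 upload and the S3 download in deserialize_value still need to be implemented, and the bucket name my-bucket is a placeholder.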
Practice Exercise
Exercise: Build a Data Pipeline with XCom
"""
File: dags/xcom_pipeline_exercise.py
Build a pipeline where:
1. Multiple sources push data independently
2. An aggregator pulls data from all sources
3. A reporter generates a summary
4. Results are saved with metadata passed via XCom
"""
from airflow.decorators import dag, task
from datetime import datetime
@dag(
dag_id="xcom_pipeline_exercise",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
tags=["exercise", "xcom"],
)
def xcom_pipeline_exercise():
@task(multiple_outputs=True)
def fetch_sales_data():
return {
"source": "sales_db",
"revenue": 125000,
"transactions": 450,
"avg_order": 277.78,
}
@task(multiple_outputs=True)
def fetch_web_analytics():
return {
"source": "google_analytics",
"page_views": 50000,
"unique_visitors": 12000,
"bounce_rate": 35.2,
}
@task(multiple_outputs=True)
def fetch_support_metrics():
return {
"source": "zendesk",
"tickets_opened": 45,
"tickets_resolved": 52,
"avg_response_hours": 2.3,
}
@task
def build_dashboard(
revenue: float,
transactions: int,
page_views: int,
unique_visitors: int,
tickets_opened: int,
tickets_resolved: int,
):
conversion_rate = (transactions / unique_visitors) * 100
dashboard = {
"revenue": f"${revenue:,.2f}",
"transactions": transactions,
"page_views": f"{page_views:,}",
"conversion_rate": f"{conversion_rate:.2f}%",
"support_backlog": tickets_opened - tickets_resolved,
}
for key, value in dashboard.items():
print(f" {key}: {value}")
return dashboard
@task
def save_dashboard(dashboard: dict):
output_path = "/tmp/airflow_data/dashboard.json"
print(f"Dashboard saved to {output_path}")
print(f"Metrics: {len(dashboard)} KPIs generated")
sales = fetch_sales_data()
web = fetch_web_analytics()
support = fetch_support_metrics()
dashboard = build_dashboard(
revenue=sales["revenue"],
transactions=sales["transactions"],
page_views=web["page_views"],
unique_visitors=web["unique_visitors"],
tickets_opened=support["tickets_opened"],
tickets_resolved=support["tickets_resolved"],
)
save_dashboard(dashboard)
xcom_pipeline_exercise()
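To try the exercise without waiting for the scheduler, you can run the DAG once from the command line, for example with airflow dags test xcom_pipeline_exercise 2026-01-01, and confirm in the task logs that the dashboard is assembled from all three sources.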
Summary
In this chapter, you learned:
- XCom enables inter-task communication by pushing and pulling small values
- TaskFlow API automatically handles XCom with function arguments and return values
- Use multiple_outputs=True to access individual dictionary keys as separate XComs
- XCom values should be small and JSON-serializable; use file paths for large data
- Custom XCom backends (S3, GCS) can handle larger payloads transparently