Chapter 4: Lambda Event Sources and Triggers
Learning Objectives
- Understand various event source types
- Configure API Gateway triggers
- Set up S3 event triggers
- Configure scheduled tasks (CloudWatch Events/EventBridge)
- Understand event data structures
Knowledge Points
Lambda Event Sources Overview
Lambda functions can be triggered by various AWS services, forming an event-driven architecture:
Event Source Categories
| Type | Event Source | Invocation Model | Features |
|---|---|---|---|
| Synchronous | API Gateway, ALB | Request-response | Real-time response; subject to a timeout limit |
| Asynchronous | S3, SNS, EventBridge | Event-driven | No immediate response needed; supports retries |
| Stream Processing | Kinesis, DynamoDB Streams | Polling | Batch processing; ordered processing |
| Pull Model | SQS | Polling | Lambda actively polls for messages |
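The invocation model is visible in how a function is called. As a quick illustration (a minimal boto3 sketch; the function name `my-function` is a placeholder), the same function can be invoked synchronously or asynchronously:

```python
import json
import boto3

lambda_client = boto3.client('lambda')

# Synchronous invocation (RequestResponse): the caller blocks until the
# function returns and receives the function's result in the response.
sync_response = lambda_client.invoke(
    FunctionName='my-function',          # placeholder function name
    InvocationType='RequestResponse',
    Payload=json.dumps({'hello': 'world'})
)
print(json.load(sync_response['Payload']))

# Asynchronous invocation (Event): Lambda queues the event and returns
# immediately with HTTP 202; retries are handled by the service.
async_response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='Event',
    Payload=json.dumps({'hello': 'world'})
)
print(async_response['StatusCode'])  # 202
```

Push-based sources such as API Gateway and S3 effectively make these calls on your behalf; stream and queue sources (Kinesis, DynamoDB Streams, SQS) use neither directly, since the Lambda service polls the source and invokes the function with batches of records.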
Example Code
1. API Gateway Event Handling
```python
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """
    Handle API Gateway events.

    API Gateway (REST API, Lambda proxy integration) event structure:
    - httpMethod: HTTP method
    - path: request path
    - headers: request headers
    - queryStringParameters: query parameters
    - body: request body
    - requestContext: request context
    """
    logger.info(f"API Gateway event: {json.dumps(event)}")

    # Extract request information
    http_method = event.get('httpMethod')
    path = event.get('path')
    headers = event.get('headers', {})
    query_params = event.get('queryStringParameters') or {}
    body = event.get('body')

    # Dispatch on the HTTP method
    if http_method == 'GET':
        return handle_get_request(query_params, headers, context)
    elif http_method == 'POST':
        return handle_post_request(body, headers, context)
    elif http_method == 'PUT':
        return handle_put_request(body, headers, context)
    elif http_method == 'DELETE':
        return handle_delete_request(query_params, headers, context)
    else:
        return {
            'statusCode': 405,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({'error': f'Method {http_method} not allowed'})
        }


def handle_get_request(query_params, headers, context):
    """Handle a GET request."""
    # Read query parameters
    user_id = query_params.get('userId')
    limit = int(query_params.get('limit', 10))

    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({
            'message': 'GET request processed',
            'userId': user_id,
            'limit': limit,
            'requestId': context.aws_request_id
        })
    }


def handle_post_request(body, headers, context):
    """Handle a POST request."""
    try:
        # Parse the JSON request body
        data = json.loads(body) if body else {}

        # Process business logic
        result = {
            'id': context.aws_request_id,
            'data': data,
            'created_at': datetime.now(timezone.utc).isoformat()
        }

        return {
            'statusCode': 201,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(result)
        }
    except json.JSONDecodeError:
        return {
            'statusCode': 400,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({'error': 'Invalid JSON in request body'})
        }


def handle_put_request(body, headers, context):
    """Handle a PUT request."""
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({
            'message': 'PUT request processed',
            'requestId': context.aws_request_id
        })
    }


def handle_delete_request(query_params, headers, context):
    """Handle a DELETE request."""
    resource_id = query_params.get('id')
    if not resource_id:
        return {
            'statusCode': 400,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({'error': 'Missing required parameter: id'})
        }

    return {
        'statusCode': 204,
        'headers': {'Access-Control-Allow-Origin': '*'},
        'body': ''
    }
```
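Before wiring up API Gateway, the handler can be smoke-tested locally by calling it with a hand-built event. The sketch below (run in the same module as the handler) uses a minimal stand-in for the Lambda context, since this handler only reads `aws_request_id`:

```python
# Local smoke test for the handler above (not part of the deployed code).
class FakeContext:
    """Minimal stand-in for the Lambda context object."""
    aws_request_id = 'local-test-request-id'

test_event = {
    'httpMethod': 'GET',
    'path': '/users',
    'headers': {'Accept': 'application/json'},
    'queryStringParameters': {'userId': '123', 'limit': '5'},
    'body': None
}

response = lambda_handler(test_event, FakeContext())
print(response['statusCode'])  # 200
print(response['body'])
```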
2. S3 Event Handling
```python
import json
import logging
import boto3
from urllib.parse import unquote_plus

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients once, outside the handler, so they are reused
# across warm invocations
s3_client = boto3.client('s3')
rekognition_client = boto3.client('rekognition')


def lambda_handler(event, context):
    """
    Handle S3 events such as object creation and deletion.

    S3 event structure:
    - Records: an array of event records
      - eventSource: aws:s3
      - eventName: the event type
      - s3.bucket: bucket information
      - s3.object: object information
    """
    logger.info(f"S3 event: {json.dumps(event)}")
    results = []

    # Process each S3 event record
    for record in event['Records']:
        if record['eventSource'] == 'aws:s3':
            results.append(process_s3_record(record, context))

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed_records': len(results),
            'results': results
        })
    }


def process_s3_record(record, context):
    """Process a single S3 event record."""
    # Extract S3 information; object keys arrive URL-encoded, and delete
    # events carry no size, hence the .get() with a default
    bucket_name = record['s3']['bucket']['name']
    object_key = unquote_plus(record['s3']['object']['key'])
    event_name = record['eventName']
    object_size = record['s3']['object'].get('size', 0)

    logger.info(f"Processing {event_name} for {object_key} in bucket {bucket_name}")

    # Dispatch on the event type
    if event_name.startswith('ObjectCreated'):
        return handle_object_created(bucket_name, object_key, object_size, context)
    elif event_name.startswith('ObjectRemoved'):
        return handle_object_removed(bucket_name, object_key, context)
    else:
        return {
            'status': 'ignored',
            'event_name': event_name,
            'object_key': object_key
        }


def handle_object_created(bucket_name, object_key, object_size, context):
    """Handle an object creation event."""
    try:
        # Route by file extension
        if object_key.lower().endswith(('.jpg', '.jpeg', '.png')):
            # Image file
            return process_image_file(bucket_name, object_key, context)
        elif object_key.lower().endswith(('.mp4', '.avi', '.mov')):
            # Video file
            return process_video_file(bucket_name, object_key, context)
        else:
            # Any other file type
            return {
                'status': 'processed',
                'file_type': 'other',
                'object_key': object_key,
                'size': object_size
            }
    except Exception as e:
        logger.error(f"Error processing {object_key}: {str(e)}")
        return {
            'status': 'error',
            'error': str(e),
            'object_key': object_key
        }


def process_image_file(bucket_name, object_key, context):
    """Process an image file by analyzing it with Rekognition."""
    try:
        # Detect labels directly from the object in S3
        response = rekognition_client.detect_labels(
            Image={
                'S3Object': {
                    'Bucket': bucket_name,
                    'Name': object_key
                }
            },
            MaxLabels=10,
            MinConfidence=80
        )

        # Extract labels
        labels = [{
            'name': label['Name'],
            'confidence': label['Confidence']
        } for label in response['Labels']]

        return {
            'status': 'analyzed',
            'file_type': 'image',
            'object_key': object_key,
            'labels': labels
        }
    except Exception as e:
        logger.error(f"Error analyzing image {object_key}: {str(e)}")
        return {
            'status': 'error',
            'error': str(e),
            'object_key': object_key
        }


def process_video_file(bucket_name, object_key, context):
    """Process a video file by handing it off to another service."""
    # This is where Elastic Transcoder or MediaConvert could be triggered
    return {
        'status': 'queued_for_processing',
        'file_type': 'video',
        'object_key': object_key,
        'note': 'Video processing job created'
    }


def handle_object_removed(bucket_name, object_key, context):
    """Handle an object deletion event."""
    logger.info(f"Object {object_key} was deleted from bucket {bucket_name}")
    # Clean up related resources or records here
    return {
        'status': 'cleaned_up',
        'object_key': object_key,
        'action': 'metadata_removed'
    }
```
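As with the API Gateway handler, this handler can be smoke-tested locally with a hand-built event. The record below is a trimmed version of the real notification format (bucket and key are placeholders); a non-image key is used so no Rekognition call is made:

```python
# Local smoke test for the S3 handler above (not part of the deployed code).
test_event = {
    'Records': [{
        'eventSource': 'aws:s3',
        'eventName': 'ObjectCreated:Put',
        's3': {
            'bucket': {'name': 'my-upload-bucket'},              # placeholder
            'object': {'key': 'reports/daily.txt', 'size': 1024}  # hits the 'other' branch
        }
    }]
}

response = lambda_handler(test_event, None)  # context is not used by this path
print(response['body'])
```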
3. CloudWatch Events/EventBridge Scheduled Tasks
```python
import json
import logging
from datetime import datetime, timezone

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS service clients
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')
dynamodb = boto3.resource('dynamodb')


def lambda_handler(event, context):
    """
    Handle CloudWatch Events/EventBridge events.

    Scheduled event structure:
    - source: aws.events
    - detail-type: Scheduled Event
    - detail: event details
    - time: event time
    """
    logger.info(f"CloudWatch event: {json.dumps(event)}")

    # Check the event source
    if event.get('source') == 'aws.events':
        return handle_scheduled_event(event, context)
    else:
        return handle_custom_event(event, context)


def handle_scheduled_event(event, context):
    """Handle a scheduled task event."""
    event_time = datetime.fromisoformat(event['time'].replace('Z', '+00:00'))
    detail_type = event.get('detail-type', '')
    logger.info(f"Processing scheduled event: {detail_type} at {event_time}")

    # Execute the scheduled tasks in order
    results = []
    try:
        # 1. Data backup task
        backup_result = perform_data_backup()
        results.append(backup_result)

        # 2. System health check
        health_result = perform_health_check(context)
        results.append(health_result)

        # 3. Clean up expired data
        cleanup_result = cleanup_expired_data()
        results.append(cleanup_result)

        # 4. Send a report summarizing the other tasks
        report_result = send_daily_report(results)
        results.append(report_result)

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Scheduled tasks completed successfully',
                'event_time': event_time.isoformat(),
                'results': results
            })
        }
    except Exception as e:
        logger.error(f"Scheduled task failed: {str(e)}")
        # Send a failure notification
        send_failure_notification(str(e), event_time)
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'event_time': event_time.isoformat()
            })
        }


def perform_data_backup():
    """Perform a data backup."""
    logger.info("Starting data backup...")

    # Simulate a backup operation
    backup_tables = ['users', 'orders', 'products']
    backed_up = []

    for table_name in backup_tables:
        try:
            # This is where the DynamoDB backup API would be called
            logger.info(f"Backing up table: {table_name}")
            backed_up.append(table_name)
        except Exception as e:
            logger.error(f"Failed to backup {table_name}: {str(e)}")

    return {
        'task': 'data_backup',
        'status': 'completed',
        'backed_up_tables': backed_up
    }


def perform_health_check(context):
    """Perform a system health check."""
    logger.info("Performing health check...")
    health_metrics = []

    try:
        # Check the function's own Duration metric for today
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/Lambda',
            MetricName='Duration',
            Dimensions=[
                {
                    'Name': 'FunctionName',
                    'Value': context.function_name
                }
            ],
            StartTime=datetime.now(timezone.utc).replace(hour=0, minute=0, second=0),
            EndTime=datetime.now(timezone.utc),
            Period=3600,
            Statistics=['Average']
        )
        health_metrics.append({
            'metric': 'lambda_duration',
            'status': 'healthy',
            'datapoints': len(response['Datapoints'])
        })
    except Exception as e:
        logger.error(f"Health check failed: {str(e)}")
        health_metrics.append({
            'metric': 'lambda_duration',
            'status': 'unhealthy',
            'error': str(e)
        })

    return {
        'task': 'health_check',
        'status': 'completed',
        'metrics': health_metrics
    }


def cleanup_expired_data():
    """Clean up expired data."""
    logger.info("Cleaning up expired data...")

    # Simulate a cleanup operation
    cleaned_items = 0
    try:
        # This is where expired records would be queried and deleted, e.g.:
        # table = dynamodb.Table('expired_data')
        # response = table.scan(...)
        cleaned_items = 42  # simulated number of cleaned items
    except Exception as e:
        logger.error(f"Cleanup failed: {str(e)}")
        return {
            'task': 'cleanup_expired_data',
            'status': 'failed',
            'error': str(e)
        }

    return {
        'task': 'cleanup_expired_data',
        'status': 'completed',
        'cleaned_items': cleaned_items
    }


def send_daily_report(results):
    """Send a daily report."""
    logger.info("Sending daily report...")
    try:
        # Construct the report content
        report = {
            'date': datetime.now(timezone.utc).date().isoformat(),
            'summary': {
                'total_tasks': len(results),
                'successful_tasks': len([r for r in results if r.get('status') == 'completed']),
                'failed_tasks': len([r for r in results if r.get('status') == 'failed'])
            },
            'details': results
        }

        # Publish the report to an SNS topic (placeholder ARN)
        sns.publish(
            TopicArn='arn:aws:sns:region:account:daily-report',
            Subject='Daily System Report',
            Message=json.dumps(report, indent=2)
        )

        return {
            'task': 'send_daily_report',
            'status': 'completed',
            'report_sent': True
        }
    except Exception as e:
        logger.error(f"Failed to send report: {str(e)}")
        return {
            'task': 'send_daily_report',
            'status': 'failed',
            'error': str(e)
        }


def send_failure_notification(error_message, event_time):
    """Send a failure notification."""
    try:
        sns.publish(
            TopicArn='arn:aws:sns:region:account:system-alerts',  # placeholder ARN
            Subject='Scheduled Task Failure',
            Message=f"Scheduled task failed at {event_time}\nError: {error_message}"
        )
    except Exception as e:
        logger.error(f"Failed to send failure notification: {str(e)}")


def handle_custom_event(event, context):
    """Handle a custom (non-scheduled) event."""
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Custom event processed',
            'event': event
        })
    }
```
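The schedule itself is created outside the function. A minimal boto3 sketch of setting up the trigger (the rule name, function ARN, and statement ID are placeholders): define a rule with a schedule expression, grant EventBridge permission to invoke the function, then attach the function as a target.

```python
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

FUNCTION_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:daily-tasks'  # placeholder

# 1. Create (or update) a rule that fires at 02:00 UTC every day.
rule = events.put_rule(
    Name='daily-maintenance',                # placeholder rule name
    ScheduleExpression='cron(0 2 * * ? *)',  # rate(1 day) also works
    State='ENABLED'
)

# 2. Allow EventBridge to invoke the function.
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId='allow-eventbridge-daily-maintenance',  # placeholder
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)

# 3. Attach the function as the rule's target.
events.put_targets(
    Rule='daily-maintenance',
    Targets=[{'Id': 'daily-tasks-lambda', 'Arn': FUNCTION_ARN}]
)
```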
Event Configuration Process
API Gateway Configuration Steps
- Create a REST API and a resource path
- Create a method (such as GET or POST) and choose Lambda proxy integration
- Point the integration at the target Lambda function
- Deploy the API to a stage
- Test the endpoint via the stage invoke URL
S3 Event Configuration Steps
- Create an S3 bucket
- Configure event notifications
- Select event types: ObjectCreated, ObjectRemoved
- Set prefix/suffix filters
- Specify the target Lambda function
- Set up Lambda permissions
- Test the event trigger (the notification and permission steps are scripted in the sketch below)
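A minimal boto3 sketch of the notification and permission steps above (the bucket name, function ARN, statement ID, and prefix are placeholders). S3 validates the configuration when it is registered, so the invoke permission must be in place first:

```python
import boto3

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

BUCKET = 'my-upload-bucket'  # placeholder
FUNCTION_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:process-uploads'  # placeholder

# 1. Allow S3 to invoke the function (resource-based policy on the function).
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId='allow-s3-invoke',  # placeholder
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn=f'arn:aws:s3:::{BUCKET}'
)

# 2. Register the notification: event type plus an optional prefix filter.
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': FUNCTION_ARN,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {
                'Key': {'FilterRules': [{'Name': 'prefix', 'Value': 'uploads/'}]}
            }
        }]
    }
)
```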
Comparison of Event Source Features
- API Gateway: Synchronous invocation, requires an immediate response, suitable for web APIs
- S3 Events: Asynchronous invocation, event-driven, suitable for file processing
- Scheduled Tasks: Triggered on a schedule, suitable for maintenance and batch processing tasks
- Stream Processing: Continuous processing, suitable for real-time data analysis
Configuration Notes
- Permission Settings: Push sources (API Gateway, S3, EventBridge) need a resource-based policy permitting them to invoke the function; pull sources (SQS, Kinesis, DynamoDB Streams) require the function's execution role to read from the queue or stream
- Concurrency Control: Consider the impact of concurrent executions
- Error Handling: Configure a dead-letter queue and retry policy for failed events (see the sketch after this list)
- Cost Control: Monitor invocation frequency and costs
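For asynchronous event sources, retries and failure handling can be tuned per function. A minimal sketch (the function name and queue ARN are placeholders; the execution role also needs sqs:SendMessage on the queue):

```python
import boto3

lambda_client = boto3.client('lambda')

FUNCTION_NAME = 'process-uploads'                                    # placeholder
DLQ_ARN = 'arn:aws:sqs:us-east-1:123456789012:failed-events'         # placeholder

# Attach a dead-letter queue: asynchronous events that exhaust their
# retries are sent here instead of being dropped.
lambda_client.update_function_configuration(
    FunctionName=FUNCTION_NAME,
    DeadLetterConfig={'TargetArn': DLQ_ARN}
)

# Tune the asynchronous retry behavior itself.
lambda_client.put_function_event_invoke_config(
    FunctionName=FUNCTION_NAME,
    MaximumRetryAttempts=1,          # default is 2
    MaximumEventAgeInSeconds=3600    # discard events older than one hour
)
```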
Best Practices
- Use environment variables to configure event source parameters
- Implement idempotency to avoid processing duplicate deliveries (sketched below)
- Add detailed logging
- Use batch processing to improve efficiency
- Set appropriate timeouts and retry policies
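Because asynchronous and stream-based sources can deliver the same event more than once, handlers should be idempotent. One common pattern, sketched below (the processed-events table and its event_id key are assumptions), is a DynamoDB conditional write keyed on a unique event ID:

```python
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processed-events')  # assumed table, partition key 'event_id'

def process_once(event_id, handler):
    """Run handler only if this event ID has not been seen before."""
    try:
        # The conditional write fails if the ID was already recorded,
        # making this the deduplication check and the marker in one step.
        table.put_item(
            Item={'event_id': event_id},
            ConditionExpression='attribute_not_exists(event_id)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return {'status': 'duplicate', 'event_id': event_id}
        raise
    return handler()
```

A natural choice of event ID is the S3 object key plus version, the SQS message ID, or the EventBridge event `id` field, depending on the source.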