Chapter 4: Lambda Event Sources and Triggers

Learning Objectives
  • Understand various event source types
  • Configure API Gateway triggers
  • Set up S3 event triggers
  • Configure scheduled tasks (CloudWatch Events/EventBridge)
  • Understand event data structures

Key Concepts

Lambda Event Sources Overview

Lambda functions can be triggered by multiple AWS services, forming an event-driven architecture:

(Diagram: AWS event sources such as API Gateway, S3, SNS, SQS, EventBridge, and Kinesis triggering Lambda functions)

Event Source Categories

| Type | Event Source | Invocation Mode | Characteristics |
|------|--------------|-----------------|-----------------|
| Synchronous | API Gateway, ALB | Request-response | Real-time response; subject to timeout limits |
| Asynchronous | S3, SNS, EventBridge | Event-driven | No immediate response needed; supports retries |
| Stream processing | Kinesis, DynamoDB Streams | Polling | Batch processing; ordered processing |
| Pull mode | SQS | Polling | Lambda actively polls for messages |
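
In practice the invocation mode determines who waits and who retries: a synchronous caller blocks until the function returns, while an asynchronous caller gets only an acknowledgement and the Lambda service retries failures on its own. A minimal sketch of the difference using boto3 (the function name my-function is a placeholder):

import json
import boto3

lambda_client = boto3.client('lambda')

# Synchronous: the caller blocks until the function returns its result
sync_response = lambda_client.invoke(
    FunctionName='my-function',           # placeholder function name
    InvocationType='RequestResponse',     # synchronous
    Payload=json.dumps({'ping': True})
)
print(json.load(sync_response['Payload']))

# Asynchronous: the service queues the event and returns 202 immediately;
# retries on failure are handled by Lambda, not by the caller
async_response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='Event',               # asynchronous
    Payload=json.dumps({'ping': True})
)
print(async_response['StatusCode'])       # 202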

Example Code

1. API Gateway Event Processing

import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Handle API Gateway events

    API Gateway event structure:
    - httpMethod: HTTP method
    - path: Request path
    - headers: Request headers
    - queryStringParameters: Query parameters
    - body: Request body
    - requestContext: Request context
    """

    logger.info(f"API Gateway event: {json.dumps(event)}")

    # Extract request information
    http_method = event.get('httpMethod')
    path = event.get('path')
    headers = event.get('headers', {})
    query_params = event.get('queryStringParameters') or {}
    body = event.get('body')

    # Handle different HTTP methods
    if http_method == 'GET':
        return handle_get_request(query_params, headers, context)
    elif http_method == 'POST':
        return handle_post_request(body, headers, context)
    elif http_method == 'PUT':
        return handle_put_request(body, headers, context)
    elif http_method == 'DELETE':
        return handle_delete_request(query_params, headers, context)
    else:
        return {
            'statusCode': 405,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({
                'error': f'Method {http_method} not allowed'
            })
        }

def handle_get_request(query_params, headers, context):
    """Handle GET request"""

    # Get query parameters (limit falls back to 10 if missing or non-numeric)
    user_id = query_params.get('userId')
    try:
        limit = int(query_params.get('limit', 10))
    except (TypeError, ValueError):
        limit = 10

    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({
            'message': 'GET request processed',
            'userId': user_id,
            'limit': limit,
            'requestId': context.aws_request_id
        })
    }

def handle_post_request(body, headers, context):
    """Handle POST request"""

    try:
        # Parse JSON request body
        if body:
            data = json.loads(body)
        else:
            data = {}

        # Process business logic
        result = {
            'id': context.aws_request_id,
            'data': data,
            'created_at': datetime.now(timezone.utc).isoformat()
        }

        return {
            'statusCode': 201,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(result)
        }

    except json.JSONDecodeError:
        return {
            'statusCode': 400,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({
                'error': 'Invalid JSON in request body'
            })
        }

def handle_put_request(body, headers, context):
    """Handle PUT request"""
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({
            'message': 'PUT request processed',
            'requestId': context.aws_request_id
        })
    }

def handle_delete_request(query_params, headers, context):
    """Handle DELETE request"""
    resource_id = query_params.get('id')

    if not resource_id:
        return {
            'statusCode': 400,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({
                'error': 'Missing required parameter: id'
            })
        }

    return {
        'statusCode': 204,
        'headers': {
            'Access-Control-Allow-Origin': '*'
        },
        'body': ''
    }
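
The handler above can be exercised locally with a trimmed-down test event. The event below carries only the fields the handler reads (payload format 1.0, as used by REST APIs); real events include many more fields, and the context stand-in is purely illustrative:

# Minimal illustrative test event (REST API, payload format 1.0)
test_event = {
    'httpMethod': 'GET',
    'path': '/users',
    'headers': {'Accept': 'application/json'},
    'queryStringParameters': {'userId': '42', 'limit': '5'},
    'body': None
}

class FakeContext:
    """Stand-in for the Lambda context object in local tests."""
    aws_request_id = 'local-test-request-id'
    function_name = 'local-test'

print(lambda_handler(test_event, FakeContext()))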

2. S3 Event Processing

import json
import logging
import boto3
from urllib.parse import unquote_plus

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients once, outside the handler, so they are reused across invocations
s3_client = boto3.client('s3')
rekognition_client = boto3.client('rekognition')

def lambda_handler(event, context):
    """
    Handle S3 events, such as file uploads, deletions, etc.

    S3 event structure:
    - Records: Array of event records
    - eventSource: aws:s3
    - eventName: Event type
    - s3.bucket: Bucket information
    - s3.object: Object information
    """

    logger.info(f"S3 event: {json.dumps(event)}")

    results = []

    # Process each S3 event record
    for record in event['Records']:
        # Extract event information
        event_source = record['eventSource']
        event_name = record['eventName']

        if event_source == 'aws:s3':
            result = process_s3_record(record, context)
            results.append(result)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed_records': len(results),
            'results': results
        })
    }

def process_s3_record(record, context):
    """
    Process single S3 record
    """

    # Extract S3 information
    bucket_name = record['s3']['bucket']['name']
    object_key = unquote_plus(record['s3']['object']['key'])
    event_name = record['eventName']
    object_size = record['s3']['object']['size']

    logger.info(f"Processing {event_name} for {object_key} in bucket {bucket_name}")

    # Process based on event type
    if event_name.startswith('ObjectCreated'):
        return handle_object_created(bucket_name, object_key, object_size, context)
    elif event_name.startswith('ObjectRemoved'):
        return handle_object_removed(bucket_name, object_key, context)
    else:
        return {
            'status': 'ignored',
            'event_name': event_name,
            'object_key': object_key
        }

def handle_object_created(bucket_name, object_key, object_size, context):
    """
    Handle object creation event
    """

    try:
        # Check file type
        if object_key.lower().endswith(('.jpg', '.jpeg', '.png')):
            # Process image file
            return process_image_file(bucket_name, object_key, context)
        elif object_key.lower().endswith(('.mp4', '.avi', '.mov')):
            # Process video file
            return process_video_file(bucket_name, object_key, context)
        else:
            # Process other files
            return {
                'status': 'processed',
                'file_type': 'other',
                'object_key': object_key,
                'size': object_size
            }

    except Exception as e:
        logger.error(f"Error processing {object_key}: {str(e)}")
        return {
            'status': 'error',
            'error': str(e),
            'object_key': object_key
        }

def process_image_file(bucket_name, object_key, context):
    """
    Process image file - Use Rekognition for analysis
    """

    try:
        # Use Rekognition to detect labels
        response = rekognition_client.detect_labels(
            Image={
                'S3Object': {
                    'Bucket': bucket_name,
                    'Name': object_key
                }
            },
            MaxLabels=10,
            MinConfidence=80
        )

        # Extract labels
        labels = [{
            'name': label['Name'],
            'confidence': label['Confidence']
        } for label in response['Labels']]

        return {
            'status': 'analyzed',
            'file_type': 'image',
            'object_key': object_key,
            'labels': labels
        }

    except Exception as e:
        logger.error(f"Error analyzing image {object_key}: {str(e)}")
        return {
            'status': 'error',
            'error': str(e),
            'object_key': object_key
        }

def process_video_file(bucket_name, object_key, context):
    """
    Process video file - Can trigger other services for processing
    """

    # Can trigger Elastic Transcoder or MediaConvert here
    return {
        'status': 'queued_for_processing',
        'file_type': 'video',
        'object_key': object_key,
        'note': 'Video processing job created'
    }

def handle_object_removed(bucket_name, object_key, context):
    """
    Handle object deletion event
    """

    logger.info(f"Object {object_key} was deleted from bucket {bucket_name}")

    # Clean up related resources or records
    return {
        'status': 'cleaned_up',
        'object_key': object_key,
        'action': 'metadata_removed'
    }
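
As with the API Gateway handler, a trimmed-down event is enough for a local dry run. The record below contains only the fields the handler reads; note that an image key would route to Rekognition, which needs AWS credentials and a real object in the bucket:

# Minimal illustrative S3 notification event
test_event = {
    'Records': [{
        'eventSource': 'aws:s3',
        'eventName': 'ObjectCreated:Put',
        's3': {
            'bucket': {'name': 'my-upload-bucket'},                  # placeholder bucket
            'object': {'key': 'reports/summary.txt', 'size': 2048}   # non-image key avoids the Rekognition call
        }
    }]
}

print(lambda_handler(test_event, None))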

3. CloudWatch Events/EventBridge Scheduled Tasks

import json
import boto3
from datetime import datetime, timezone
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS service clients
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')
dynamodb = boto3.resource('dynamodb')

def lambda_handler(event, context):
    """
    Handle CloudWatch Events/EventBridge events

    Scheduled task event structure:
    - source: aws.events
    - detail-type: Scheduled Event
    - detail: Event details
    - time: Event time
    """

    logger.info(f"CloudWatch event: {json.dumps(event)}")

    # Check event source
    if event.get('source') == 'aws.events':
        return handle_scheduled_event(event, context)
    else:
        return handle_custom_event(event, context)

def handle_scheduled_event(event, context):
    """
    Handle scheduled task event
    """

    event_time = datetime.fromisoformat(event['time'].replace('Z', '+00:00'))
    detail_type = event.get('detail-type', '')

    logger.info(f"Processing scheduled event: {detail_type} at {event_time}")

    # Execute scheduled tasks
    results = []

    try:
        # 1. Data backup task
        backup_result = perform_data_backup()
        results.append(backup_result)

        # 2. System health check
        health_result = perform_health_check(context)
        results.append(health_result)

        # 3. Clean up expired data
        cleanup_result = cleanup_expired_data()
        results.append(cleanup_result)

        # 4. Send report
        report_result = send_daily_report(results)
        results.append(report_result)

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Scheduled tasks completed successfully',
                'event_time': event_time.isoformat(),
                'results': results
            })
        }

    except Exception as e:
        logger.error(f"Scheduled task failed: {str(e)}")

        # Send failure notification
        send_failure_notification(str(e), event_time)

        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'event_time': event_time.isoformat()
            })
        }

def perform_data_backup():
    """
    Perform data backup
    """

    logger.info("Starting data backup...")

    # Simulate backup operation
    backup_tables = ['users', 'orders', 'products']
    backed_up = []

    for table_name in backup_tables:
        try:
            # Can call DynamoDB backup API here
            logger.info(f"Backing up table: {table_name}")
            backed_up.append(table_name)
        except Exception as e:
            logger.error(f"Failed to backup {table_name}: {str(e)}")

    return {
        'task': 'data_backup',
        'status': 'completed',
        'backed_up_tables': backed_up
    }

def perform_health_check(context):
    """
    Perform system health check
    """

    logger.info("Performing health check...")

    health_metrics = []

    try:
        # Check key metrics
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/Lambda',
            MetricName='Duration',
            Dimensions=[
                {
                    'Name': 'FunctionName',
                    'Value': context.function_name
                }
            ],
            StartTime=datetime.now(timezone.utc).replace(hour=0, minute=0, second=0),
            EndTime=datetime.now(timezone.utc),
            Period=3600,
            Statistics=['Average']
        )

        health_metrics.append({
            'metric': 'lambda_duration',
            'status': 'healthy',
            'datapoints': len(response['Datapoints'])
        })

    except Exception as e:
        logger.error(f"Health check failed: {str(e)}")
        health_metrics.append({
            'metric': 'lambda_duration',
            'status': 'unhealthy',
            'error': str(e)
        })

    return {
        'task': 'health_check',
        'status': 'completed',
        'metrics': health_metrics
    }

def cleanup_expired_data():
    """
    Clean up expired data
    """

    logger.info("Cleaning up expired data...")

    # Simulate cleanup operation
    cleaned_items = 0

    try:
        # Can query and delete expired records here
        # table = dynamodb.Table('expired_data')
        # response = table.scan(...)

        cleaned_items = 42  # Simulated number of cleaned items

    except Exception as e:
        logger.error(f"Cleanup failed: {str(e)}")
        return {
            'task': 'cleanup_expired_data',
            'status': 'failed',
            'error': str(e)
        }

    return {
        'task': 'cleanup_expired_data',
        'status': 'completed',
        'cleaned_items': cleaned_items
    }

def send_daily_report(results):
    """
    Send daily report
    """

    logger.info("Sending daily report...")

    try:
        # Construct report content
        report = {
            'date': datetime.now(timezone.utc).date().isoformat(),
            'summary': {
                'total_tasks': len(results),
                'successful_tasks': len([r for r in results if r.get('status') == 'completed']),
                'failed_tasks': len([r for r in results if r.get('status') == 'failed'])
            },
            'details': results
        }

        # Send SNS notification
        sns.publish(
            TopicArn='arn:aws:sns:region:account:daily-report',
            Subject='Daily System Report',
            Message=json.dumps(report, indent=2)
        )

        return {
            'task': 'send_daily_report',
            'status': 'completed',
            'report_sent': True
        }

    except Exception as e:
        logger.error(f"Failed to send report: {str(e)}")
        return {
            'task': 'send_daily_report',
            'status': 'failed',
            'error': str(e)
        }

def send_failure_notification(error_message, event_time):
    """
    Send failure notification
    """

    try:
        sns.publish(
            TopicArn='arn:aws:sns:region:account:system-alerts',
            Subject='Scheduled Task Failure',
            Message=f"Scheduled task failed at {event_time}\nError: {error_message}"
        )
    except Exception as e:
        logger.error(f"Failed to send failure notification: {str(e)}")

def handle_custom_event(event, context):
    """
    Handle custom event
    """

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Custom event processed',
            'event': event
        })
    }
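
Wiring this handler to a schedule is a one-time setup. A minimal sketch using boto3 (rule name, function ARN, and statement ID are placeholders; the same can be done in the console or via infrastructure-as-code):

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

FUNCTION_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:daily-tasks'  # placeholder

# 1. Create (or update) a rule that fires once a day
rule = events.put_rule(
    Name='daily-tasks-schedule',           # placeholder rule name
    ScheduleExpression='rate(1 day)',      # alternatively a cron expression, e.g. cron(0 6 * * ? *)
    State='ENABLED'
)

# 2. Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId='allow-eventbridge-daily-tasks',   # placeholder statement ID
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)

# 3. Point the rule at the function
events.put_targets(
    Rule='daily-tasks-schedule',
    Targets=[{'Id': 'daily-tasks-lambda', 'Arn': FUNCTION_ARN}]
)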

Event Configuration Flow

API Gateway Configuration Flow

(Diagram: API Gateway to Lambda configuration flow)

S3 Event Configuration Steps

  1. Create an S3 bucket
  2. Configure the event notification
    • Select event types: ObjectCreated, ObjectRemoved
    • Set prefix/suffix filters
    • Specify the target Lambda function
  3. Grant S3 permission to invoke the Lambda function (steps 2 and 3 are sketched in code after this list)
  4. Test the event trigger
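
A minimal boto3 sketch of steps 2 and 3 (bucket name, function ARN, and statement ID are placeholders). The permission must be in place first, because S3 validates it when the notification configuration is saved:

import boto3

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

BUCKET = 'my-upload-bucket'                                                   # placeholder
FUNCTION_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:s3-handler'    # placeholder

# Step 3 first: S3 checks this permission when the notification is saved
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId='allow-s3-invoke',         # placeholder statement ID
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn=f'arn:aws:s3:::{BUCKET}'
)

# Step 2: notify on object creation, restricted by prefix and suffix filters
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': FUNCTION_ARN,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': 'uploads/'},
                {'Name': 'suffix', 'Value': '.jpg'}
            ]}}
        }]
    }
)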

Event Source Comparison
  • API Gateway: Synchronous invocation, requires an immediate response; suitable for web APIs
  • S3 Events: Asynchronous invocation, event-driven; suitable for file processing
  • Scheduled Tasks: Triggered on a schedule; suitable for maintenance and batch processing
  • Stream Processing: Continuous processing; suitable for real-time data analysis

Configuration Notes
  • Permission Settings: Ensure Lambda has permission to access the event source, and the event source has permission to invoke Lambda
  • Concurrency Control: Consider the impact of concurrent executions
  • Error Handling: Configure dead-letter queues and retry limits for failed asynchronous events (see the sketch after this list)
  • Cost Control: Monitor invocation frequency and costs
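
For asynchronous sources (S3, SNS, EventBridge) the retry count and failure destination can be set per function. A minimal sketch, assuming an existing SQS queue to receive exhausted events (function name and queue ARN are placeholders):

import boto3

lambda_client = boto3.client('lambda')

# Limit async retries and route exhausted events to an SQS queue
lambda_client.put_function_event_invoke_config(
    FunctionName='s3-handler',             # placeholder function name
    MaximumRetryAttempts=1,                # async invocations allow 0-2 retries
    MaximumEventAgeInSeconds=3600,         # discard events older than one hour
    DestinationConfig={
        'OnFailure': {
            'Destination': 'arn:aws:sqs:us-east-1:123456789012:failed-events'  # placeholder queue ARN
        }
    }
)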

Best Practices
  • Use environment variables to configure event source parameters
  • Implement idempotent processing to avoid duplicate execution (see the sketch below)
  • Add detailed logging
  • Use batch processing to improve efficiency
  • Set appropriate timeout and retry policies
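
Asynchronous and stream-based sources deliver events at least once, so the same event can arrive twice. A common idempotency pattern is a conditional write keyed on a unique event ID; this minimal sketch assumes a hypothetical DynamoDB table named processed_events with partition key event_id:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processed_events')   # hypothetical table, partition key: event_id

def process_once(event_id, handler):
    """Invoke handler only if event_id has not been seen before."""
    try:
        # The conditional write fails atomically if the ID already exists
        table.put_item(
            Item={'event_id': event_id},
            ConditionExpression='attribute_not_exists(event_id)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return {'status': 'skipped', 'reason': 'duplicate event'}
        raise
    return handler()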