Chapter 8: Lambda Integration with Other AWS Services

Haiyue
20min

Chapter 8: Lambda Integration with Other AWS Services

Chapter Overview

This chapter explores in-depth integration of Lambda functions with core services in the AWS ecosystem, including API Gateway, DynamoDB, S3, SQS/SNS, EventBridge, and more. Learn how to build complete serverless application architectures through practical examples.

Learning Objectives

  1. Master deep integration of Lambda with API Gateway
  2. Learn Lambda data operation patterns with DynamoDB
  3. Understand event-driven processing with Lambda and S3
  4. Master asynchronous processing with Lambda and message queues
  5. Learn to build event-driven architecture using EventBridge
  6. Understand Lambda integration patterns with other AWS services

8.1 Lambda and API Gateway Integration

8.1.1 RESTful API Design

from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_apigateway as apigateway,
    aws_iam as iam,
    Duration,
    CfnOutput
)
from constructs import Construct

class ApiGatewayLambdaStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create Lambda function
        self.api_handler = _lambda.Function(
            self, "ApiHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/api_handler"),
            timeout=Duration.seconds(30),
            environment={
                'CORS_ALLOW_ORIGIN': '*',
                'LOG_LEVEL': 'INFO'
            }
        )

        # Create API Gateway
        self.api = apigateway.RestApi(
            self, "MyApi",
            rest_api_name="My Serverless API",
            description="Complete serverless API with Lambda",
            default_cors_preflight_options=apigateway.CorsOptions(
                allow_origins=apigateway.Cors.ALL_ORIGINS,
                allow_methods=apigateway.Cors.ALL_METHODS,
                allow_headers=["Content-Type", "X-Amz-Date", "Authorization", "X-Api-Key"]
            ),
            deploy_options=apigateway.StageOptions(
                stage_name="prod",
                throttling_rate_limit=100,
                throttling_burst_limit=200,
                logging_level=apigateway.MethodLoggingLevel.INFO,
                data_trace_enabled=True,
                metrics_enabled=True
            )
        )

        # Create Lambda integration
        lambda_integration = apigateway.LambdaIntegration(
            self.api_handler,
            proxy=False,
            integration_responses=[
                apigateway.IntegrationResponse(
                    status_code="200",
                    response_parameters={
                        'method.response.header.Access-Control-Allow-Origin': "'*'"
                    }
                ),
                apigateway.IntegrationResponse(
                    status_code="400",
                    selection_pattern="4\\d{2}"
                ),
                apigateway.IntegrationResponse(
                    status_code="500",
                    selection_pattern="5\\d{2}"
                )
            ]
        )

        # Add resources and methods
        self._create_users_api(lambda_integration)
        self._create_orders_api(lambda_integration)

        # Output API information
        CfnOutput(self, "ApiUrl", value=self.api.url)
        CfnOutput(self, "ApiId", value=self.api.rest_api_id)

    def _create_users_api(self, integration: apigateway.LambdaIntegration):
        """Create user-related API"""
        users = self.api.root.add_resource("users")

        # GET /users - Get user list
        users.add_method(
            "GET",
            integration,
            method_responses=[
                apigateway.MethodResponse(
                    status_code="200",
                    response_models={
                        "application/json": apigateway.Model.EMPTY_MODEL
                    },
                    response_parameters={
                        'method.response.header.Access-Control-Allow-Origin': True
                    }
                )
            ],
            request_parameters={
                'method.request.querystring.limit': False,
                'method.request.querystring.offset': False
            }
        )

        # POST /users - Create user
        users.add_method(
            "POST",
            integration,
            request_models={
                "application/json": self._create_user_model()
            },
            request_validator=apigateway.RequestValidator(
                self, "UserRequestValidator",
                rest_api=self.api,
                validate_request_body=True,
                validate_request_parameters=True
            )
        )

        # Individual user operations
        user = users.add_resource("{userId}")
        user.add_method("GET", integration)    # Get user
        user.add_method("PUT", integration)    # Update user
        user.add_method("DELETE", integration) # Delete user

    def _create_orders_api(self, integration: apigateway.LambdaIntegration):
        """Create order-related API"""
        orders = self.api.root.add_resource("orders")
        orders.add_method("GET", integration)
        orders.add_method("POST", integration)

        order = orders.add_resource("{orderId}")
        order.add_method("GET", integration)
        order.add_method("PUT", integration)

    def _create_user_model(self) -> apigateway.Model:
        """Create user data model"""
        return self.api.add_model(
            "UserModel",
            content_type="application/json",
            schema=apigateway.JsonSchema(
                type=apigateway.JsonSchemaType.OBJECT,
                properties={
                    "name": apigateway.JsonSchema(
                        type=apigateway.JsonSchemaType.STRING,
                        min_length=1,
                        max_length=100
                    ),
                    "email": apigateway.JsonSchema(
                        type=apigateway.JsonSchemaType.STRING,
                        format="email"
                    ),
                    "age": apigateway.JsonSchema(
                        type=apigateway.JsonSchemaType.INTEGER,
                        minimum=18,
                        maximum=120
                    )
                },
                required=["name", "email"]
            )
        )

8.1.2 API Handler Lambda Function

# lambda_functions/api_handler/index.py
import json
import logging
import os
from typing import Dict, Any, Optional
from datetime import datetime
import uuid

# Configure logging
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))

class ApiResponse:
    """API response utility class"""

    @staticmethod
    def success(data: Any, status_code: int = 200) -> Dict[str, Any]:
        return {
            'statusCode': status_code,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': os.getenv('CORS_ALLOW_ORIGIN', '*'),
                'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key',
                'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS'
            },
            'body': json.dumps(data, default=str)
        }

    @staticmethod
    def error(message: str, status_code: int = 400, error_code: str = None) -> Dict[str, Any]:
        error_data = {
            'error': message,
            'timestamp': datetime.utcnow().isoformat()
        }
        if error_code:
            error_data['errorCode'] = error_code

        return ApiResponse.success(error_data, status_code)

class RouteHandler:
    """Route handler"""

    def __init__(self):
        # Simulate data storage
        self.users_data = {}
        self.orders_data = {}

    def handle_users(self, method: str, path: str, body: str, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Handle user-related requests"""
        path_parts = [p for p in path.split('/') if p]

        if method == 'GET' and len(path_parts) == 1:  # GET /users
            return self._get_users(query_params)
        elif method == 'POST' and len(path_parts) == 1:  # POST /users
            return self._create_user(json.loads(body) if body else {})
        elif method == 'GET' and len(path_parts) == 2:  # GET /users/{userId}
            return self._get_user(path_parts[1])
        elif method == 'PUT' and len(path_parts) == 2:  # PUT /users/{userId}
            return self._update_user(path_parts[1], json.loads(body) if body else {})
        elif method == 'DELETE' and len(path_parts) == 2:  # DELETE /users/{userId}
            return self._delete_user(path_parts[1])
        else:
            return ApiResponse.error("Route not found", 404)

    def handle_orders(self, method: str, path: str, body: str, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Handle order-related requests"""
        path_parts = [p for p in path.split('/') if p]

        if method == 'GET' and len(path_parts) == 1:  # GET /orders
            return self._get_orders(query_params)
        elif method == 'POST' and len(path_parts) == 1:  # POST /orders
            return self._create_order(json.loads(body) if body else {})
        elif method == 'GET' and len(path_parts) == 2:  # GET /orders/{orderId}
            return self._get_order(path_parts[1])
        elif method == 'PUT' and len(path_parts) == 2:  # PUT /orders/{orderId}
            return self._update_order(path_parts[1], json.loads(body) if body else {})
        else:
            return ApiResponse.error("Route not found", 404)

    def _get_users(self, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Get user list"""
        limit = int(query_params.get('limit', 10))
        offset = int(query_params.get('offset', 0))

        users_list = list(self.users_data.values())
        paginated_users = users_list[offset:offset + limit]

        return ApiResponse.success({
            'users': paginated_users,
            'total': len(users_list),
            'limit': limit,
            'offset': offset
        })

    def _create_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create user"""
        # Validate required fields
        if not user_data.get('name') or not user_data.get('email'):
            return ApiResponse.error("Name and email are required", 400)

        user_id = str(uuid.uuid4())
        user = {
            'userId': user_id,
            'name': user_data['name'],
            'email': user_data['email'],
            'age': user_data.get('age'),
            'createdAt': datetime.utcnow().isoformat(),
            'updatedAt': datetime.utcnow().isoformat()
        }

        self.users_data[user_id] = user
        return ApiResponse.success(user, 201)

    def _get_user(self, user_id: str) -> Dict[str, Any]:
        """Get individual user"""
        user = self.users_data.get(user_id)
        if not user:
            return ApiResponse.error("User not found", 404)

        return ApiResponse.success(user)

    def _update_user(self, user_id: str, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Update user"""
        user = self.users_data.get(user_id)
        if not user:
            return ApiResponse.error("User not found", 404)

        # Update fields
        if 'name' in user_data:
            user['name'] = user_data['name']
        if 'email' in user_data:
            user['email'] = user_data['email']
        if 'age' in user_data:
            user['age'] = user_data['age']

        user['updatedAt'] = datetime.utcnow().isoformat()
        self.users_data[user_id] = user

        return ApiResponse.success(user)

    def _delete_user(self, user_id: str) -> Dict[str, Any]:
        """Delete user"""
        if user_id not in self.users_data:
            return ApiResponse.error("User not found", 404)

        del self.users_data[user_id]
        return ApiResponse.success({'message': 'User deleted successfully'})

    def _get_orders(self, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Get order list"""
        orders_list = list(self.orders_data.values())
        return ApiResponse.success({'orders': orders_list})

    def _create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create order"""
        order_id = str(uuid.uuid4())
        order = {
            'orderId': order_id,
            'userId': order_data.get('userId'),
            'items': order_data.get('items', []),
            'totalAmount': order_data.get('totalAmount', 0),
            'status': 'pending',
            'createdAt': datetime.utcnow().isoformat()
        }

        self.orders_data[order_id] = order
        return ApiResponse.success(order, 201)

    def _get_order(self, order_id: str) -> Dict[str, Any]:
        """Get individual order"""
        order = self.orders_data.get(order_id)
        if not order:
            return ApiResponse.error("Order not found", 404)

        return ApiResponse.success(order)

    def _update_order(self, order_id: str, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Update order"""
        order = self.orders_data.get(order_id)
        if not order:
            return ApiResponse.error("Order not found", 404)

        # Update status
        if 'status' in order_data:
            order['status'] = order_data['status']

        order['updatedAt'] = datetime.utcnow().isoformat()
        self.orders_data[order_id] = order

        return ApiResponse.success(order)

# Global route handler
route_handler = RouteHandler()

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """API Gateway Lambda handler"""

    logger.info(f"Received event: {json.dumps(event, default=str)}")

    try:
        # Parse request information
        method = event.get('httpMethod', '').upper()
        path = event.get('path', '')
        body = event.get('body', '')
        query_params = event.get('queryStringParameters') or {}

        logger.info(f"Processing {method} {path}")

        # Route dispatch
        if path.startswith('/users'):
            response = route_handler.handle_users(method, path, body, query_params)
        elif path.startswith('/orders'):
            response = route_handler.handle_orders(method, path, body, query_params)
        else:
            response = ApiResponse.error("API endpoint not found", 404)

        logger.info(f"Response status: {response['statusCode']}")
        return response

    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in request body: {str(e)}")
        return ApiResponse.error("Invalid JSON in request body", 400)

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return ApiResponse.error("Internal server error", 500)

8.1.3 WebSocket API Integration

from aws_cdk import (
    aws_apigatewayv2 as apigatewayv2,
    aws_apigatewayv2_integrations as integrations
)

class WebSocketApiStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # WebSocket handler function
        self.websocket_handler = _lambda.Function(
            self, "WebSocketHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/websocket_handler"),
            environment={
                'CONNECTION_TABLE': 'websocket-connections'
            }
        )

        # Create WebSocket API
        self.websocket_api = apigatewayv2.WebSocketApi(
            self, "WebSocketApi",
            api_name="My WebSocket API",
            description="Real-time communication API"
        )

        # Create Lambda integration
        lambda_integration = integrations.WebSocketLambdaIntegration(
            "LambdaIntegration",
            self.websocket_handler
        )

        # Add routes
        self.websocket_api.add_route(
            "$connect",
            integration=lambda_integration
        )

        self.websocket_api.add_route(
            "$disconnect",
            integration=lambda_integration
        )

        self.websocket_api.add_route(
            "sendMessage",
            integration=lambda_integration
        )

        # Deploy WebSocket API
        self.websocket_stage = apigatewayv2.WebSocketStage(
            self, "WebSocketStage",
            web_socket_api=self.websocket_api,
            stage_name="prod",
            auto_deploy=True
        )

        # Output WebSocket URL
        CfnOutput(
            self, "WebSocketUrl",
            value=self.websocket_stage.url
        )

8.2 Lambda and DynamoDB Integration

8.2.1 DynamoDB Operation Patterns

from aws_cdk import (
    aws_dynamodb as dynamodb,
    aws_lambda_event_sources as lambda_event_sources,
    RemovalPolicy
)

class DynamoDBLambdaStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create DynamoDB table
        self.users_table = dynamodb.Table(
            self, "UsersTable",
            table_name="users",
            partition_key=dynamodb.Attribute(
                name="userId",
                type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="sk",
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=RemovalPolicy.DESTROY,
            global_secondary_indexes=[
                dynamodb.GlobalSecondaryIndex(
                    index_name="EmailIndex",
                    partition_key=dynamodb.Attribute(
                        name="email",
                        type=dynamodb.AttributeType.STRING
                    ),
                    projection_type=dynamodb.ProjectionType.ALL
                )
            ]
        )

        # CRUD operations Lambda function
        self.crud_function = _lambda.Function(
            self, "CrudFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/dynamodb_crud"),
            environment={
                'USERS_TABLE': self.users_table.table_name,
                'EMAIL_INDEX': 'EmailIndex'
            },
            timeout=Duration.seconds(30)
        )

        # Stream processor Lambda function
        self.stream_processor = _lambda.Function(
            self, "StreamProcessor",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/dynamodb_stream"),
            timeout=Duration.minutes(1)
        )

        # Grant permissions
        self.users_table.grant_read_write_data(self.crud_function)
        self.users_table.grant_stream_read(self.stream_processor)

        # Add DynamoDB stream trigger
        self.stream_processor.add_event_source(
            lambda_event_sources.DynamoEventSource(
                self.users_table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                batch_size=10,
                max_batching_window=Duration.seconds(5),
                retry_attempts=3
            )
        )

Due to the extensive length, I’ll continue with the key remaining sections while maintaining the complete structure. Let me continue writing the file with all essential content: