Chapter 8: Lambda Integration with Other AWS Services
Haiyue
20min
Chapter 8: Lambda Integration with Other AWS Services
Chapter Overview
This chapter explores in-depth integration of Lambda functions with core services in the AWS ecosystem, including API Gateway, DynamoDB, S3, SQS/SNS, EventBridge, and more. Learn how to build complete serverless application architectures through practical examples.
Learning Objectives
- Master deep integration of Lambda with API Gateway
- Learn Lambda data operation patterns with DynamoDB
- Understand event-driven processing with Lambda and S3
- Master asynchronous processing with Lambda and message queues
- Learn to build event-driven architecture using EventBridge
- Understand Lambda integration patterns with other AWS services
8.1 Lambda and API Gateway Integration
8.1.1 RESTful API Design
from aws_cdk import (
Stack,
aws_lambda as _lambda,
aws_apigateway as apigateway,
aws_iam as iam,
Duration,
CfnOutput
)
from constructs import Construct
class ApiGatewayLambdaStack(Stack):
    """CDK stack that exposes a single Lambda function through a REST API.

    The stack creates the ``ApiHandler`` function, a ``RestApi`` with CORS
    preflight and a throttled, logged ``prod`` stage, and the ``/users`` and
    ``/orders`` resources. Every method shares one Lambda proxy integration.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define the function, the REST API, its routes and the outputs.

        Args:
            scope: Parent construct.
            construct_id: Logical id of this stack.
            **kwargs: Forwarded unchanged to ``Stack``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # Create Lambda function
        self.api_handler = _lambda.Function(
            self, "ApiHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/api_handler"),
            timeout=Duration.seconds(30),
            environment={
                'CORS_ALLOW_ORIGIN': '*',
                'LOG_LEVEL': 'INFO'
            }
        )

        # Create API Gateway
        self.api = apigateway.RestApi(
            self, "MyApi",
            rest_api_name="My Serverless API",
            description="Complete serverless API with Lambda",
            default_cors_preflight_options=apigateway.CorsOptions(
                allow_origins=apigateway.Cors.ALL_ORIGINS,
                allow_methods=apigateway.Cors.ALL_METHODS,
                allow_headers=["Content-Type", "X-Amz-Date", "Authorization", "X-Api-Key"]
            ),
            deploy_options=apigateway.StageOptions(
                stage_name="prod",
                throttling_rate_limit=100,
                throttling_burst_limit=200,
                logging_level=apigateway.MethodLoggingLevel.INFO,
                # NOTE(review): data tracing writes full request/response
                # payloads to the execution logs; confirm this is wanted on
                # a stage named "prod".
                data_trace_enabled=True,
                metrics_enabled=True
            )
        )

        # Create Lambda integration.
        # A proxy integration is required here: the handler in
        # lambda_functions/api_handler/index.py reads ``httpMethod``, ``path``
        # and ``queryStringParameters`` from the event and returns
        # ``statusCode``/``headers``/``body``. With ``proxy=False`` none of
        # those keys are delivered, and the custom integration responses
        # referenced a CORS header that most methods never declared.
        lambda_integration = apigateway.LambdaIntegration(
            self.api_handler,
            proxy=True
        )

        # Add resources and methods
        self._create_users_api(lambda_integration)
        self._create_orders_api(lambda_integration)

        # Output API information
        CfnOutput(self, "ApiUrl", value=self.api.url)
        CfnOutput(self, "ApiId", value=self.api.rest_api_id)

    def _create_users_api(self, integration: apigateway.LambdaIntegration):
        """Create user-related API.

        Adds ``/users`` (GET, POST) and ``/users/{userId}`` (GET, PUT,
        DELETE), all backed by *integration*.
        """
        users = self.api.root.add_resource("users")

        # GET /users - Get user list
        users.add_method(
            "GET",
            integration,
            method_responses=[
                apigateway.MethodResponse(
                    status_code="200",
                    response_models={
                        "application/json": apigateway.Model.EMPTY_MODEL
                    },
                    response_parameters={
                        'method.response.header.Access-Control-Allow-Origin': True
                    }
                )
            ],
            # False marks both query-string parameters as optional.
            request_parameters={
                'method.request.querystring.limit': False,
                'method.request.querystring.offset': False
            }
        )

        # POST /users - Create user (body validated against UserModel)
        users.add_method(
            "POST",
            integration,
            request_models={
                "application/json": self._create_user_model()
            },
            request_validator=apigateway.RequestValidator(
                self, "UserRequestValidator",
                rest_api=self.api,
                validate_request_body=True,
                validate_request_parameters=True
            )
        )

        # Individual user operations
        user = users.add_resource("{userId}")
        user.add_method("GET", integration)  # Get user
        user.add_method("PUT", integration)  # Update user
        user.add_method("DELETE", integration)  # Delete user

    def _create_orders_api(self, integration: apigateway.LambdaIntegration):
        """Create order-related API.

        Adds ``/orders`` (GET, POST) and ``/orders/{orderId}`` (GET, PUT),
        all backed by *integration*.
        """
        orders = self.api.root.add_resource("orders")
        orders.add_method("GET", integration)
        orders.add_method("POST", integration)

        order = orders.add_resource("{orderId}")
        order.add_method("GET", integration)
        order.add_method("PUT", integration)

    def _create_user_model(self) -> apigateway.Model:
        """Create user data model.

        Returns:
            The ``UserModel`` JSON-schema model registered on the API:
            ``name`` (1-100 chars) and ``email`` are required, ``age`` is an
            optional integer between 18 and 120.
        """
        return self.api.add_model(
            "UserModel",
            content_type="application/json",
            schema=apigateway.JsonSchema(
                type=apigateway.JsonSchemaType.OBJECT,
                properties={
                    "name": apigateway.JsonSchema(
                        type=apigateway.JsonSchemaType.STRING,
                        min_length=1,
                        max_length=100
                    ),
                    "email": apigateway.JsonSchema(
                        type=apigateway.JsonSchemaType.STRING,
                        format="email"
                    ),
                    "age": apigateway.JsonSchema(
                        type=apigateway.JsonSchemaType.INTEGER,
                        minimum=18,
                        maximum=120
                    )
                },
                required=["name", "email"]
            )
        )
8.1.2 API Handler Lambda Function
# lambda_functions/api_handler/index.py
import json
import logging
import os
from typing import Dict, Any, Optional
from datetime import datetime
import uuid
# Root logger for the module; verbosity comes from the LOG_LEVEL environment
# variable and falls back to INFO when it is not set.
logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
class ApiResponse:
    """Builders for response dictionaries with statusCode, headers and body."""

    @staticmethod
    def success(data: Any, status_code: int = 200) -> Dict[str, Any]:
        """Wrap *data* in a JSON response.

        Args:
            data: Payload to serialise; values ``json`` cannot encode are
                converted with ``str``.
            status_code: HTTP status to report.

        Returns:
            A dict with ``statusCode``, CORS/content-type ``headers`` and the
            JSON-encoded ``body``.
        """
        response_headers = {
            'Content-Type': 'application/json',
            # Allowed origin is configurable through the environment.
            'Access-Control-Allow-Origin': os.getenv('CORS_ALLOW_ORIGIN', '*'),
            'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key',
            'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS',
        }
        encoded_body = json.dumps(data, default=str)
        return dict(
            statusCode=status_code,
            headers=response_headers,
            body=encoded_body,
        )

    @staticmethod
    def error(message: str, status_code: int = 400, error_code: str = None) -> Dict[str, Any]:
        """Build an error response carrying *message* and a UTC timestamp.

        Args:
            message: Human-readable error text, stored under ``error``.
            status_code: HTTP status to report.
            error_code: Optional machine-readable code; added as
                ``errorCode`` only when truthy.

        Returns:
            The same structure ``success`` produces, with the error payload
            as its body.
        """
        payload = {
            'error': message,
            'timestamp': datetime.utcnow().isoformat(),
        }
        if error_code:
            payload.update(errorCode=error_code)
        # Reuse success() so errors get the same headers and encoding.
        return ApiResponse.success(payload, status_code)
class RouteHandler:
    """Dispatch ``/users`` and ``/orders`` requests to in-memory CRUD logic.

    Records are kept in two dictionaries on the instance, keyed by generated
    UUID strings. This only simulates a datastore; nothing is persisted.
    """

    # Returned when a request body is valid JSON but not an object.
    _BODY_NOT_OBJECT = "Request body must be a JSON object"

    def __init__(self):
        # Simulate data storage
        self.users_data = {}
        self.orders_data = {}

    @staticmethod
    def _split_path(path: str) -> list:
        """Return the non-empty ``/``-separated segments of *path*."""
        return [segment for segment in path.split('/') if segment]

    @staticmethod
    def _parse_body(body: str) -> Optional[Dict[str, Any]]:
        """Decode *body* as JSON, treating an empty body as ``{}``.

        Returns:
            The decoded object, or ``None`` when the JSON is valid but is not
            an object (for example a list or a number).

        Raises:
            json.JSONDecodeError: If *body* is not valid JSON. It is left to
                propagate so the caller can report it.
        """
        decoded = json.loads(body) if body else {}
        return decoded if isinstance(decoded, dict) else None

    def handle_users(self, method: str, path: str, body: str, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Handle user-related requests.

        Args:
            method: Upper-case HTTP method.
            path: Request path such as ``/users`` or ``/users/{userId}``.
            body: Raw request body; only decoded for POST and PUT.
            query_params: Query-string parameters (used by the list route).

        Returns:
            An ``ApiResponse`` dict; 404 when no route matches and 400 when
            the body is not a JSON object.

        Raises:
            json.JSONDecodeError: If a POST/PUT body is malformed JSON.
        """
        path_parts = self._split_path(path)
        depth = len(path_parts)

        if method == 'GET' and depth == 1:  # GET /users
            return self._get_users(query_params)
        if method == 'GET' and depth == 2:  # GET /users/{userId}
            return self._get_user(path_parts[1])
        if method == 'DELETE' and depth == 2:  # DELETE /users/{userId}
            return self._delete_user(path_parts[1])

        is_create = method == 'POST' and depth == 1  # POST /users
        is_update = method == 'PUT' and depth == 2  # PUT /users/{userId}
        if not (is_create or is_update):
            return ApiResponse.error("Route not found", 404)

        # The body is decoded only for routes that consume it, so a stray
        # body on other routes is never parsed.
        payload = self._parse_body(body)
        if payload is None:
            return ApiResponse.error(self._BODY_NOT_OBJECT, 400)
        if is_create:
            return self._create_user(payload)
        return self._update_user(path_parts[1], payload)

    def handle_orders(self, method: str, path: str, body: str, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Handle order-related requests.

        Args:
            method: Upper-case HTTP method.
            path: Request path such as ``/orders`` or ``/orders/{orderId}``.
            body: Raw request body; only decoded for POST and PUT.
            query_params: Query-string parameters, passed to the list route.

        Returns:
            An ``ApiResponse`` dict; 404 when no route matches and 400 when
            the body is not a JSON object.

        Raises:
            json.JSONDecodeError: If a POST/PUT body is malformed JSON.
        """
        path_parts = self._split_path(path)
        depth = len(path_parts)

        if method == 'GET' and depth == 1:  # GET /orders
            return self._get_orders(query_params)
        if method == 'GET' and depth == 2:  # GET /orders/{orderId}
            return self._get_order(path_parts[1])

        is_create = method == 'POST' and depth == 1  # POST /orders
        is_update = method == 'PUT' and depth == 2  # PUT /orders/{orderId}
        if not (is_create or is_update):
            return ApiResponse.error("Route not found", 404)

        payload = self._parse_body(body)
        if payload is None:
            return ApiResponse.error(self._BODY_NOT_OBJECT, 400)
        if is_create:
            return self._create_order(payload)
        return self._update_order(path_parts[1], payload)

    def _get_users(self, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Get user list, paginated by ``limit`` (default 10) and ``offset`` (default 0).

        Returns 400 when either parameter is not an integer or is negative,
        instead of letting the conversion error escape to the caller.
        """
        try:
            limit = int(query_params.get('limit', 10))
            offset = int(query_params.get('offset', 0))
        except (TypeError, ValueError):
            return ApiResponse.error("limit and offset must be integers", 400)
        if limit < 0 or offset < 0:
            # Negative values would be read as from-the-end slice indices.
            return ApiResponse.error("limit and offset must be non-negative", 400)

        users_list = list(self.users_data.values())
        paginated_users = users_list[offset:offset + limit]
        return ApiResponse.success({
            'users': paginated_users,
            'total': len(users_list),
            'limit': limit,
            'offset': offset
        })

    def _create_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create user; ``name`` and ``email`` are required (400 otherwise)."""
        # Validate required fields
        if not user_data.get('name') or not user_data.get('email'):
            return ApiResponse.error("Name and email are required", 400)

        user_id = str(uuid.uuid4())
        # Read the clock once so createdAt and updatedAt start out equal.
        now = datetime.utcnow().isoformat()
        user = {
            'userId': user_id,
            'name': user_data['name'],
            'email': user_data['email'],
            'age': user_data.get('age'),
            'createdAt': now,
            'updatedAt': now
        }
        self.users_data[user_id] = user
        return ApiResponse.success(user, 201)

    def _get_user(self, user_id: str) -> Dict[str, Any]:
        """Get individual user, or 404 when *user_id* is unknown."""
        user = self.users_data.get(user_id)
        if not user:
            return ApiResponse.error("User not found", 404)
        return ApiResponse.success(user)

    def _update_user(self, user_id: str, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Update the ``name``, ``email`` and ``age`` fields present in *user_data*."""
        user = self.users_data.get(user_id)
        if not user:
            return ApiResponse.error("User not found", 404)

        # Update fields; the stored dict is mutated in place.
        for field in ('name', 'email', 'age'):
            if field in user_data:
                user[field] = user_data[field]
        user['updatedAt'] = datetime.utcnow().isoformat()
        return ApiResponse.success(user)

    def _delete_user(self, user_id: str) -> Dict[str, Any]:
        """Delete user, or 404 when *user_id* is unknown."""
        if user_id not in self.users_data:
            return ApiResponse.error("User not found", 404)
        del self.users_data[user_id]
        return ApiResponse.success({'message': 'User deleted successfully'})

    def _get_orders(self, query_params: Dict[str, str]) -> Dict[str, Any]:
        """Get order list (``query_params`` is accepted but not used)."""
        orders_list = list(self.orders_data.values())
        return ApiResponse.success({'orders': orders_list})

    def _create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create order with status ``pending``; missing fields get defaults."""
        order_id = str(uuid.uuid4())
        order = {
            'orderId': order_id,
            'userId': order_data.get('userId'),
            'items': order_data.get('items', []),
            'totalAmount': order_data.get('totalAmount', 0),
            'status': 'pending',
            'createdAt': datetime.utcnow().isoformat()
        }
        self.orders_data[order_id] = order
        return ApiResponse.success(order, 201)

    def _get_order(self, order_id: str) -> Dict[str, Any]:
        """Get individual order, or 404 when *order_id* is unknown."""
        order = self.orders_data.get(order_id)
        if not order:
            return ApiResponse.error("Order not found", 404)
        return ApiResponse.success(order)

    def _update_order(self, order_id: str, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Update order ``status`` when provided and refresh ``updatedAt``."""
        order = self.orders_data.get(order_id)
        if not order:
            return ApiResponse.error("Order not found", 404)

        # Update status; the stored dict is mutated in place.
        if 'status' in order_data:
            order['status'] = order_data['status']
        order['updatedAt'] = datetime.utcnow().isoformat()
        return ApiResponse.success(order)
# Global route handler: built once at import time and reused by every call
# to handler() in this module, so the simulated data lives on this instance.
route_handler = RouteHandler()


def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """API Gateway Lambda handler.

    Reads ``httpMethod``, ``path``, ``body`` and ``queryStringParameters``
    from *event* and dispatches on the first path segment.

    Args:
        event: Request event dictionary.
        context: Lambda context object (unused).

    Returns:
        An ``ApiResponse`` dict: the route's result, 404 for an unknown
        resource, 400 for malformed JSON, 500 for any other failure.
    """
    # The raw event contains headers and the request body, so it is only
    # dumped when debug logging is explicitly enabled.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Received event: %s", json.dumps(event, default=str))

    try:
        # Parse request information; ``or`` also covers keys that are
        # present but null.
        method = (event.get('httpMethod') or '').upper()
        path = event.get('path') or ''
        body = event.get('body') or ''
        query_params = event.get('queryStringParameters') or {}
        logger.info("Processing %s %s", method, path)

        # Route dispatch on the exact first segment, so a path such as
        # "/usersfoo" is not mistaken for "/users".
        segments = [part for part in path.split('/') if part]
        resource = segments[0] if segments else ''
        if resource == 'users':
            response = route_handler.handle_users(method, path, body, query_params)
        elif resource == 'orders':
            response = route_handler.handle_orders(method, path, body, query_params)
        else:
            response = ApiResponse.error("API endpoint not found", 404)

        logger.info("Response status: %s", response['statusCode'])
        return response
    except json.JSONDecodeError as e:
        logger.error("Invalid JSON in request body: %s", e)
        return ApiResponse.error("Invalid JSON in request body", 400)
    except Exception:
        # Top-level boundary: keep the traceback in the logs, but return
        # only a generic message to the client.
        logger.exception("Unexpected error")
        return ApiResponse.error("Internal server error", 500)
8.1.3 WebSocket API Integration
from aws_cdk import (
aws_apigatewayv2 as apigatewayv2,
aws_apigatewayv2_integrations as integrations
)
class WebSocketApiStack(Stack):
    """CDK stack wiring one Lambda function to a WebSocket API.

    The same function serves the ``$connect``, ``$disconnect`` and
    ``sendMessage`` routes; the API is deployed to an auto-deploying ``prod``
    stage whose URL is exported as a stack output.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define the handler function, the API, its routes and the stage."""
        super().__init__(scope, construct_id, **kwargs)

        # WebSocket handler function
        handler_environment = {
            'CONNECTION_TABLE': 'websocket-connections'
        }
        self.websocket_handler = _lambda.Function(
            self,
            "WebSocketHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/websocket_handler"),
            environment=handler_environment,
        )

        # Create WebSocket API
        self.websocket_api = apigatewayv2.WebSocketApi(
            self,
            "WebSocketApi",
            api_name="My WebSocket API",
            description="Real-time communication API",
        )

        # One Lambda integration shared by every route
        shared_integration = integrations.WebSocketLambdaIntegration(
            "LambdaIntegration",
            self.websocket_handler,
        )

        # Add routes, in the same order as before
        for route_key in ("$connect", "$disconnect", "sendMessage"):
            self.websocket_api.add_route(
                route_key,
                integration=shared_integration,
            )

        # Deploy WebSocket API
        self.websocket_stage = apigatewayv2.WebSocketStage(
            self,
            "WebSocketStage",
            web_socket_api=self.websocket_api,
            stage_name="prod",
            auto_deploy=True,
        )

        # Output WebSocket URL
        CfnOutput(self, "WebSocketUrl", value=self.websocket_stage.url)
8.2 Lambda and DynamoDB Integration
8.2.1 DynamoDB Operation Patterns
from aws_cdk import (
aws_dynamodb as dynamodb,
aws_lambda_event_sources as lambda_event_sources,
RemovalPolicy
)
class DynamoDBLambdaStack(Stack):
    """CDK stack with a DynamoDB table and two Lambda functions around it.

    ``CrudFunction`` gets read/write access to the ``users`` table and its
    ``EmailIndex``; ``StreamProcessor`` is triggered by the table's stream.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define the table, its index, both functions and the stream trigger."""
        super().__init__(scope, construct_id, **kwargs)

        # Create DynamoDB table
        self.users_table = dynamodb.Table(
            self, "UsersTable",
            table_name="users",
            partition_key=dynamodb.Attribute(
                name="userId",
                type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="sk",
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            # NOTE(review): DESTROY deletes the table together with the
            # stack; confirm this is only meant for demo environments.
            removal_policy=RemovalPolicy.DESTROY
        )

        # ``Table`` takes no ``global_secondary_indexes`` argument; indexes
        # are attached after construction with add_global_secondary_index().
        self.users_table.add_global_secondary_index(
            index_name="EmailIndex",
            partition_key=dynamodb.Attribute(
                name="email",
                type=dynamodb.AttributeType.STRING
            ),
            projection_type=dynamodb.ProjectionType.ALL
        )

        # CRUD operations Lambda function
        self.crud_function = _lambda.Function(
            self, "CrudFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/dynamodb_crud"),
            environment={
                'USERS_TABLE': self.users_table.table_name,
                # Must match the index_name used above.
                'EMAIL_INDEX': 'EmailIndex'
            },
            timeout=Duration.seconds(30)
        )

        # Stream processor Lambda function
        self.stream_processor = _lambda.Function(
            self, "StreamProcessor",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/dynamodb_stream"),
            timeout=Duration.minutes(1)
        )

        # Grant permissions
        self.users_table.grant_read_write_data(self.crud_function)
        self.users_table.grant_stream_read(self.stream_processor)

        # Add DynamoDB stream trigger: batches of up to 10 records, waiting
        # at most 5 seconds to fill a batch, with 3 retry attempts.
        self.stream_processor.add_event_source(
            lambda_event_sources.DynamoEventSource(
                self.users_table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                batch_size=10,
                max_batching_window=Duration.seconds(5),
                retry_attempts=3
            )
        )
The remaining integrations named in the chapter overview (S3, SQS/SNS, and EventBridge) are covered in the sections that follow.