Chapter 7: Advanced Lambda Configuration with CDK
Haiyue
35min
Chapter 7: Advanced Lambda Configuration with CDK
Chapter Overview
This chapter explores advanced Lambda function configuration options, including VPC integration, Layer usage, error handling, concurrency control, reserved concurrency, and other enterprise-grade features. These configurations are crucial for building production-grade serverless applications.
Learning Objectives
- Master Lambda function VPC configuration and network security settings
- Learn to create and use Lambda Layers
- Understand Lambda error handling and retry mechanisms
- Master concurrency control and reserved concurrency configuration
- Learn to configure dead letter queues and asynchronous invocation
- Understand Lambda environment variables and secrets management
7.1 VPC Configuration and Network Security
7.1.1 Basic VPC Configuration
from aws_cdk import (
Stack,
aws_lambda as _lambda,
aws_ec2 as ec2,
aws_rds as rds,
Duration
)
from constructs import Construct
class VpcLambdaStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create VPC
self.vpc = ec2.Vpc(
self, "LambdaVpc",
max_azs=2,
cidr="10.0.0.0/16",
subnet_configuration=[
ec2.SubnetConfiguration(
name="Private",
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
cidr_mask=24
),
ec2.SubnetConfiguration(
name="Public",
subnet_type=ec2.SubnetType.PUBLIC,
cidr_mask=24
)
]
)
# Create security group
self.lambda_sg = ec2.SecurityGroup(
self, "LambdaSecurityGroup",
vpc=self.vpc,
description="Security group for Lambda functions",
allow_all_outbound=True
)
# Create database security group
self.db_sg = ec2.SecurityGroup(
self, "DatabaseSecurityGroup",
vpc=self.vpc,
description="Security group for RDS database"
)
# Allow Lambda to access database
self.db_sg.add_ingress_rule(
peer=self.lambda_sg,
connection=ec2.Port.tcp(5432), # PostgreSQL port
description="Allow Lambda access to database"
)
# Create Lambda function (in VPC)
self.vpc_lambda = _lambda.Function(
self, "VpcLambdaFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/vpc_lambda"),
vpc=self.vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
),
security_groups=[self.lambda_sg],
timeout=Duration.minutes(5),
function_name="vpc-lambda-function"
)
7.1.2 RDS Integration Configuration
class DatabaseLambdaStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Use existing VPC or create new VPC
self.vpc = ec2.Vpc.from_lookup(self, "ExistingVpc", is_default=True)
# Create database subnet group
db_subnet_group = rds.SubnetGroup(
self, "DatabaseSubnetGroup",
description="Subnet group for RDS database",
vpc=self.vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
)
)
# Create RDS instance
self.database = rds.DatabaseInstance(
self, "PostgresDatabase",
engine=rds.DatabaseInstanceEngine.postgres(
version=rds.PostgresEngineVersion.VER_14_9
),
instance_type=ec2.InstanceType.of(
ec2.InstanceClass.T3,
ec2.InstanceSize.MICRO
),
vpc=self.vpc,
subnet_group=db_subnet_group,
database_name="appdb",
credentials=rds.Credentials.from_generated_secret("dbadmin"),
allocated_storage=20,
storage_encrypted=True,
multi_az=False,
deletion_protection=False
)
# Lambda function configuration
self.db_lambda = _lambda.Function(
self, "DatabaseLambda",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/database_lambda"),
vpc=self.vpc,
environment={
'DB_SECRET_ARN': self.database.secret.secret_arn,
'DB_ENDPOINT': self.database.instance_endpoint.hostname,
'DB_PORT': str(self.database.instance_endpoint.port),
'DB_NAME': 'appdb'
},
timeout=Duration.minutes(2)
)
# Grant access to database secret
self.database.secret.grant_read(self.db_lambda)
# Allow Lambda to connect to database
self.database.connections.allow_from(
self.db_lambda,
ec2.Port.tcp(5432),
"Lambda database access"
)
7.1.3 VPC Endpoint Configuration
class VpcEndpointStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create VPC
self.vpc = ec2.Vpc(self, "VpcWithEndpoints", max_azs=2)
# Create S3 VPC endpoint
s3_endpoint = self.vpc.add_gateway_endpoint(
"S3Endpoint",
service=ec2.GatewayVpcEndpointAwsService.S3,
subnets=[ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
)]
)
# Create DynamoDB VPC endpoint
dynamodb_endpoint = self.vpc.add_gateway_endpoint(
"DynamoDbEndpoint",
service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
subnets=[ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
)]
)
# Create Lambda VPC endpoint (for invoking other Lambda functions)
lambda_endpoint = self.vpc.add_interface_endpoint(
"LambdaEndpoint",
service=ec2.InterfaceVpcEndpointAwsService.LAMBDA_,
subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
)
)
# Lambda function using VPC endpoints
self.endpoint_lambda = _lambda.Function(
self, "EndpointLambda",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/endpoint_lambda"),
vpc=self.vpc,
environment={
'USE_VPC_ENDPOINTS': 'true'
}
)
7.2 Lambda Layers
7.2.1 Creating Shared Library Layer
# layers/python_utils/python/utils.py
import json
import logging
from datetime import datetime
from typing import Dict, Any
def setup_logging(level: str = "INFO") -> logging.Logger:
"""Setup logging configuration"""
logger = logging.getLogger()
logger.setLevel(getattr(logging, level))
return logger
def format_response(status_code: int, body: Dict[str, Any],
headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Format Lambda response"""
default_headers = {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
}
if headers:
default_headers.update(headers)
return {
'statusCode': status_code,
'headers': default_headers,
'body': json.dumps(body, default=str)
}
def get_current_timestamp() -> str:
"""Get current timestamp"""
return datetime.utcnow().isoformat()
class DatabaseHelper:
"""Database operation helper class"""
@staticmethod
def build_update_expression(data: Dict[str, Any]) -> tuple:
"""Build DynamoDB update expression"""
update_expression = "SET "
expression_values = {}
for key, value in data.items():
if key not in ['id', 'pk', 'sk']: # Exclude key fields
update_expression += f"{key} = :{key}, "
expression_values[f":{key}"] = value
update_expression = update_expression.rstrip(', ')
return update_expression, expression_values
# stacks/layers_stack.py
from aws_cdk import (
Stack,
aws_lambda as _lambda,
Duration
)
from constructs import Construct
import os
class LayersStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create Python utilities layer
self.utils_layer = _lambda.LayerVersion(
self, "PythonUtilsLayer",
code=_lambda.Code.from_asset("layers/python_utils"),
compatible_runtimes=[
_lambda.Runtime.PYTHON_3_8,
_lambda.Runtime.PYTHON_3_9,
_lambda.Runtime.PYTHON_3_10,
_lambda.Runtime.PYTHON_3_11
],
description="Common Python utilities for Lambda functions",
layer_version_name="python-utils-layer"
)
# Create third-party dependencies layer
self.deps_layer = _lambda.LayerVersion(
self, "DependenciesLayer",
code=_lambda.Code.from_asset(
"layers/dependencies",
bundling=_lambda.BundlingOptions(
image=_lambda.Runtime.PYTHON_3_9.bundling_image,
command=[
"bash", "-c",
"pip install -r requirements.txt -t /asset-output/python/"
]
)
),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
description="Third-party dependencies layer"
)
# Lambda function using layers
self.layered_function = _lambda.Function(
self, "LayeredFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/layered"),
layers=[self.utils_layer, self.deps_layer],
timeout=Duration.seconds(30)
)
7.2.2 Using Layer in Lambda Function
# lambda_functions/layered/index.py
import json
from utils import setup_logging, format_response, get_current_timestamp, DatabaseHelper
# Use tools from layer
logger = setup_logging("INFO")
def handler(event, context):
"""Lambda function using layer tools"""
logger.info(f"Processing event: {json.dumps(event)}")
try:
# Use utility functions from layer
timestamp = get_current_timestamp()
# Simulate data processing
data = {
'name': 'John Doe',
'email': 'john@example.com',
'updated_at': timestamp
}
# Use DatabaseHelper
update_expr, expr_values = DatabaseHelper.build_update_expression(data)
result = {
'message': 'Data processed successfully',
'timestamp': timestamp,
'update_expression': update_expr,
'expression_values': expr_values
}
# Use format response function
return format_response(200, result)
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return format_response(500, {'error': str(e)})
7.2.3 Cross-Account Layer Sharing
from aws_cdk import aws_iam as iam
class SharedLayersStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create shareable layer
self.shared_layer = _lambda.LayerVersion(
self, "SharedUtilsLayer",
code=_lambda.Code.from_asset("layers/shared_utils"),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
description="Shared utilities across accounts"
)
# Add resource policy to allow other accounts
self.shared_layer.add_permission(
"CrossAccountAccess",
principal=iam.AccountPrincipal("123456789012"), # Target account ID
action="lambda:GetLayerVersion"
)
# Or allow all accounts in organization
self.shared_layer.add_permission(
"OrganizationAccess",
principal=iam.OrganizationPrincipal("o-example12345"),
action="lambda:GetLayerVersion"
)
7.3 Error Handling and Retry Mechanisms
7.3.1 Synchronous Invocation Error Handling
# lambda_functions/error_handling/index.py
import json
import logging
import traceback
from typing import Dict, Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class LambdaError(Exception):
"""Custom Lambda exception"""
def __init__(self, message: str, error_code: str = "GENERAL_ERROR"):
self.message = message
self.error_code = error_code
super().__init__(self.message)
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Lambda function with complete error handling"""
# Log request information
logger.info(f"Request ID: {context.aws_request_id}")
logger.info(f"Function Name: {context.function_name}")
logger.info(f"Remaining Time: {context.get_remaining_time_in_millis()}ms")
try:
# Validate input
if not event.get('action'):
raise LambdaError("Missing required field: action", "VALIDATION_ERROR")
action = event['action']
# Execute different operations based on action
if action == 'process_data':
result = process_data(event.get('data', {}))
elif action == 'validate_input':
result = validate_input(event.get('input', {}))
else:
raise LambdaError(f"Unknown action: {action}", "INVALID_ACTION")
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'result': result,
'request_id': context.aws_request_id
})
}
except LambdaError as e:
logger.error(f"Business logic error: {e.message}")
return {
'statusCode': 400,
'body': json.dumps({
'success': False,
'error': e.message,
'error_code': e.error_code,
'request_id': context.aws_request_id
})
}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
logger.error(traceback.format_exc())
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': 'Internal server error',
'error_code': 'INTERNAL_ERROR',
'request_id': context.aws_request_id
})
}
def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""Data processing function"""
if not data:
raise LambdaError("No data provided", "NO_DATA")
# Simulate data processing
processed = {
'original': data,
'processed_at': '2023-01-01T00:00:00Z',
'status': 'processed'
}
return processed
def validate_input(input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Input validation function"""
required_fields = ['name', 'email']
missing_fields = [field for field in required_fields if field not in input_data]
if missing_fields:
raise LambdaError(
f"Missing required fields: {', '.join(missing_fields)}",
"MISSING_FIELDS"
)
return {'valid': True, 'message': 'Input validation passed'}
7.3.2 Asynchronous Invocation and Retry Configuration
from aws_cdk import (
aws_lambda as _lambda,
aws_sqs as sqs,
aws_lambda_event_sources as lambda_event_sources
)
class AsyncLambdaStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create dead letter queue
self.dlq = sqs.Queue(
self, "AsyncLambdaDLQ",
queue_name="async-lambda-dlq",
retention_period=Duration.days(14)
)
# Create retry queue
self.retry_queue = sqs.Queue(
self, "AsyncLambdaRetryQueue",
queue_name="async-lambda-retry",
visibility_timeout=Duration.minutes(5),
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=3,
queue=self.dlq
)
)
# Create asynchronous Lambda function
self.async_function = _lambda.Function(
self, "AsyncFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/async_handler"),
timeout=Duration.minutes(5),
retry_attempts=2, # Set retry attempts
dead_letter_queue=self.dlq, # Set dead letter queue
environment={
'RETRY_QUEUE_URL': self.retry_queue.queue_url
}
)
# Add Lambda trigger for SQS queue
self.async_function.add_event_source(
lambda_event_sources.SqsEventSource(
self.retry_queue,
batch_size=10,
max_batching_window=Duration.seconds(5)
)
)
# Grant queue access permissions
self.retry_queue.grant_send_messages(self.async_function)
self.dlq.grant_send_messages(self.async_function)
7.3.3 Circuit Breaker Pattern Implementation
# lambda_functions/circuit_breaker/index.py
import json
import time
from typing import Dict, Any, Optional
import boto3
class CircuitBreaker:
"""Circuit breaker implementation"""
def __init__(self, failure_threshold: int = 5, timeout: int = 60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time: Optional[float] = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
"""Call protected function"""
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
def on_success(self):
"""Handle successful call"""
self.failure_count = 0
self.state = "CLOSED"
def on_failure(self):
"""Handle failed call"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
# Global circuit breaker instance
external_api_breaker = CircuitBreaker(failure_threshold=3, timeout=30)
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Lambda function using circuit breaker"""
try:
# Use circuit breaker to protect external API call
result = external_api_breaker.call(call_external_api, event.get('data'))
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'result': result,
'circuit_breaker_state': external_api_breaker.state
})
}
except Exception as e:
return {
'statusCode': 503,
'body': json.dumps({
'success': False,
'error': str(e),
'circuit_breaker_state': external_api_breaker.state
})
}
def call_external_api(data: Dict[str, Any]) -> Dict[str, Any]:
"""Simulate external API call"""
import requests
# Simulate potentially failing external API call
response = requests.get('https://httpbin.org/json', timeout=5)
response.raise_for_status()
return response.json()
7.4 Concurrency Control and Reserved Concurrency
7.4.1 Concurrency Limit Configuration
class ConcurrencyControlStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# High priority function - reserved concurrency
self.high_priority_function = _lambda.Function(
self, "HighPriorityFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/high_priority"),
reserved_concurrent_executions=50, # Reserve 50 concurrent executions
timeout=Duration.seconds(30)
)
# Standard function - limited concurrency
self.standard_function = _lambda.Function(
self, "StandardFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/standard"),
reserved_concurrent_executions=10, # Limit to 10 concurrent executions
timeout=Duration.seconds(60)
)
# Batch processing function - no concurrency limit
self.batch_function = _lambda.Function(
self, "BatchFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/batch"),
# No concurrency limit, uses account-level remaining concurrency
timeout=Duration.minutes(15)
)
7.4.2 Provisioned Concurrency Configuration
class ProvisionedConcurrencyStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create Lambda function
self.warm_function = _lambda.Function(
self, "WarmFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/warm_function"),
timeout=Duration.seconds(30)
)
# Create version
version = self.warm_function.current_version
# Create alias
self.prod_alias = _lambda.Alias(
self, "ProdAlias",
alias_name="PROD",
version=version,
provisioned_concurrency_config=_lambda.ProvisionedConcurrencyConfig(
provisioned_concurrent_executions=10
)
)
# Create application autoscaling target
from aws_cdk import aws_applicationautoscaling as appscaling
scalable_target = appscaling.ScalableTarget(
self, "ScalableTarget",
service_namespace=appscaling.ServiceNamespace.LAMBDA,
scalable_dimension="lambda:function:ProvisionedConcurrency",
resource_id=f"function:{self.warm_function.function_name}:PROD",
min_capacity=5,
max_capacity=50
)
# Configure autoscaling policy
scalable_target.scale_to_track_metric(
"TargetTracking",
target_value=0.7,
metric=appscaling.predefined_metric_specification(
appscaling.PredefinedMetric.LAMBDA_PROVISIONED_CONCURRENCY_UTILIZATION
)
)
7.5 Environment Variables and Secrets Management
7.5.1 Secure Environment Variable Configuration
from aws_cdk import (
aws_secretsmanager as secretsmanager,
aws_ssm as ssm,
aws_kms as kms
)
class SecureConfigStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create KMS key
self.lambda_key = kms.Key(
self, "LambdaKmsKey",
description="KMS key for Lambda environment variables",
enable_key_rotation=True
)
# Create Secrets Manager secret
self.db_credentials = secretsmanager.Secret(
self, "DatabaseCredentials",
description="Database credentials",
generate_secret_string=secretsmanager.SecretStringGenerator(
secret_string_template='{"username": "dbadmin"}',
generate_string_key="password",
exclude_characters=" %+~`#$&*()|[]{}:;<>?!'/\\\"",
password_length=32
)
)
# Create SSM parameter
self.api_config = ssm.StringParameter(
self, "ApiConfig",
parameter_name="/myapp/api/config",
string_value=json.dumps({
"endpoint": "https://api.example.com",
"timeout": 30,
"retry_count": 3
})
)
# Create encrypted SSM parameter
self.api_key = ssm.StringParameter(
self, "ApiKey",
parameter_name="/myapp/api/key",
string_value="your-api-key-here",
type=ssm.ParameterType.SECURE_STRING
)
# Create Lambda function
self.secure_function = _lambda.Function(
self, "SecureFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/secure_config"),
environment={
# Basic configuration
'ENVIRONMENT': 'production',
'LOG_LEVEL': 'INFO',
# Service configuration
'DB_SECRET_ARN': self.db_credentials.secret_arn,
'API_CONFIG_PARAM': self.api_config.parameter_name,
'API_KEY_PARAM': self.api_key.parameter_name,
# KMS key
'KMS_KEY_ID': self.lambda_key.key_id
},
environment_encryption=self.lambda_key # Encrypt environment variables
)
# Grant permissions
self.db_credentials.grant_read(self.secure_function)
self.api_config.grant_read(self.secure_function)
self.api_key.grant_read(self.secure_function)
self.lambda_key.grant_decrypt(self.secure_function)
7.5.2 Configuration Management Lambda Function
# lambda_functions/secure_config/index.py
import json
import boto3
import os
from typing import Dict, Any
import logging
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))
# Initialize AWS clients
secrets_client = boto3.client('secretsmanager')
ssm_client = boto3.client('ssm')
class ConfigManager:
"""Configuration manager"""
def __init__(self):
self._cache = {}
self.ttl = 300 # 5 minute cache
def get_secret(self, secret_arn: str) -> Dict[str, Any]:
"""Get Secrets Manager secret"""
if secret_arn in self._cache:
return self._cache[secret_arn]
try:
response = secrets_client.get_secret_value(SecretId=secret_arn)
secret_value = json.loads(response['SecretString'])
self._cache[secret_arn] = secret_value
return secret_value
except Exception as e:
logger.error(f"Failed to get secret {secret_arn}: {str(e)}")
raise
def get_parameter(self, parameter_name: str, decrypt: bool = True) -> str:
"""Get SSM parameter"""
cache_key = f"{parameter_name}_{decrypt}"
if cache_key in self._cache:
return self._cache[cache_key]
try:
response = ssm_client.get_parameter(
Name=parameter_name,
WithDecryption=decrypt
)
value = response['Parameter']['Value']
self._cache[cache_key] = value
return value
except Exception as e:
logger.error(f"Failed to get parameter {parameter_name}: {str(e)}")
raise
# Global configuration manager
config_manager = ConfigManager()
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Lambda function using secure configuration"""
try:
# Get database credentials
db_secret_arn = os.environ['DB_SECRET_ARN']
db_credentials = config_manager.get_secret(db_secret_arn)
# Get API configuration
api_config_param = os.environ['API_CONFIG_PARAM']
api_config = json.loads(config_manager.get_parameter(api_config_param, False))
# Get API key
api_key_param = os.environ['API_KEY_PARAM']
api_key = config_manager.get_parameter(api_key_param, True)
# Process business logic
result = process_request(event, db_credentials, api_config, api_key)
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'result': result
})
}
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': 'Internal server error'
})
}
def process_request(event: Dict[str, Any], db_creds: Dict[str, Any],
api_config: Dict[str, Any], api_key: str) -> Dict[str, Any]:
"""Business logic for processing requests"""
# Use configuration for database connection, API calls, etc.
logger.info("Processing request with secure configuration")
return {
'message': 'Request processed successfully',
'database_connected': bool(db_creds.get('username')),
'api_configured': bool(api_config.get('endpoint')),
'api_key_available': bool(api_key)
}
7.6 Lambda Extensions
7.6.1 Custom Extension
# extensions/config_extension/index.py
import json
import os
import requests
import time
from threading import Thread
class ConfigExtension:
"""Configuration extension, periodically updates configuration"""
def __init__(self):
self.lambda_api_url = f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}"
self.extension_name = "config-extension"
self.config_cache = {}
def register(self):
"""Register extension"""
url = f"{self.lambda_api_url}/2020-01-01/extension/register"
headers = {'Lambda-Extension-Name': self.extension_name}
data = {'events': ['INVOKE', 'SHUTDOWN']}
response = requests.post(url, headers=headers, json=data)
return response.headers.get('Lambda-Extension-Identifier')
def next_event(self, extension_id: str):
"""Get next event"""
url = f"{self.lambda_api_url}/2020-01-01/extension/event/next"
headers = {'Lambda-Extension-Identifier': extension_id}
response = requests.get(url, headers=headers)
return response.json()
def update_config(self):
"""Update configuration cache"""
# Can fetch configuration from external configuration service
self.config_cache = {
'updated_at': time.time(),
'feature_flags': {
'new_feature': True,
'beta_feature': False
},
'api_endpoints': {
'primary': 'https://api.example.com',
'fallback': 'https://api-backup.example.com'
}
}
# Write configuration to shared file
with open('/tmp/config.json', 'w') as f:
json.dump(self.config_cache, f)
def run(self):
"""Run extension"""
extension_id = self.register()
# Start configuration update thread
config_thread = Thread(target=self.config_updater, daemon=True)
config_thread.start()
# Main event loop
while True:
event = self.next_event(extension_id)
if event['eventType'] == 'SHUTDOWN':
break
def config_updater(self):
"""Periodically update configuration"""
while True:
self.update_config()
time.sleep(60) # Update every 60 seconds
if __name__ == '__main__':
extension = ConfigExtension()
extension.run()
7.6.2 Extension Configuration
class ExtensionStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create Extension Layer
self.extension_layer = _lambda.LayerVersion(
self, "ConfigExtensionLayer",
code=_lambda.Code.from_asset("extensions/config_extension"),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
description="Configuration extension layer"
)
# Lambda function using Extension
self.extended_function = _lambda.Function(
self, "ExtendedFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/extended"),
layers=[self.extension_layer],
environment={
'USE_EXTENSION_CONFIG': 'true'
}
)
7.7 Chapter Summary
Key Takeaways
- VPC configuration enables Lambda to securely access private resources
- Layers provide an effective way for code reuse and dependency management
- Comprehensive error handling and retry mechanisms improve system reliability
- Concurrency control and provisioned concurrency optimize performance and cost
- Secure configuration management protects sensitive information
- Extensions expand Lambda’s functional boundaries
In the next chapter, we will learn about Lambda integration with other AWS services, including deep integration with API Gateway, DynamoDB, S3, and other services.