Chapter 7: Advanced Lambda Configuration in CDK
9/1/25 · About 9 min read
Chapter 7: Advanced Lambda Configuration in CDK
Chapter Overview
This chapter will delve into advanced configuration options for Lambda functions, including VPC integration, Layer usage, error handling, concurrency control, reserved concurrency, and other enterprise-level features. These configurations are crucial for building production-grade serverless applications.
Learning Objectives
- Master VPC configuration and network security settings for Lambda functions
- Learn to create and use Lambda Layers
- Understand Lambda's error handling and retry mechanisms
- Master concurrency control and reserved concurrency configuration
- Learn to configure dead letter queues and asynchronous invocation
- Understand Lambda's environment variables and secrets management
7.1 VPC Configuration and Network Security
7.1.1 Basic VPC Configuration
from aws_cdk import (
Stack,
aws_lambda as _lambda,
aws_ec2 as ec2,
aws_rds as rds,
Duration
)
from constructs import Construct
class VpcLambdaStack(Stack):
    """Stack that places a Lambda function inside a dedicated VPC.

    Builds the VPC, a security group for the function and one for a
    database, an ingress rule letting the function reach PostgreSQL,
    and the VPC-attached Lambda function itself.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # One private (egress-capable) and one public /24 subnet per AZ.
        private_subnets = ec2.SubnetConfiguration(
            name="Private",
            subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
            cidr_mask=24,
        )
        public_subnets = ec2.SubnetConfiguration(
            name="Public",
            subnet_type=ec2.SubnetType.PUBLIC,
            cidr_mask=24,
        )
        self.vpc = ec2.Vpc(
            self, "LambdaVpc",
            max_azs=2,
            cidr="10.0.0.0/16",
            subnet_configuration=[private_subnets, public_subnets],
        )

        # Security group attached to the Lambda function.
        self.lambda_sg = ec2.SecurityGroup(
            self, "LambdaSecurityGroup",
            vpc=self.vpc,
            description="Security group for Lambda functions",
            allow_all_outbound=True,
        )

        # Security group protecting the RDS database.
        self.db_sg = ec2.SecurityGroup(
            self, "DatabaseSecurityGroup",
            vpc=self.vpc,
            description="Security group for RDS database",
        )

        # Only traffic from the Lambda SG may reach the database port.
        self.db_sg.add_ingress_rule(
            peer=self.lambda_sg,
            connection=ec2.Port.tcp(5432),  # PostgreSQL port
            description="Allow Lambda access to database",
        )

        # Lambda function deployed into the private subnets.
        self.vpc_lambda = _lambda.Function(
            self, "VpcLambdaFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/vpc_lambda"),
            vpc=self.vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            security_groups=[self.lambda_sg],
            timeout=Duration.minutes(5),
            function_name="vpc-lambda-function",
        )
7.1.2 RDS Integration Configuration
class DatabaseLambdaStack(Stack):
    """Stack connecting a Lambda function to an RDS PostgreSQL instance.

    Creates the database with generated credentials stored in Secrets
    Manager, then grants the function read access to the secret and
    network access to the database on port 5432.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # Use existing VPC or create new VPC
        # NOTE(review): from_lookup with is_default=True resolves the
        # account's default VPC, which typically has only public subnets —
        # the PRIVATE_WITH_EGRESS selection below would then match nothing.
        # Confirm the target VPC layout before deploying.
        self.vpc = ec2.Vpc.from_lookup(self, "ExistingVpc", is_default=True)
        # Create database subnet group
        db_subnet_group = rds.SubnetGroup(
            self, "DatabaseSubnetGroup",
            description="Subnet group for RDS database",
            vpc=self.vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            )
        )
        # Create RDS instance (PostgreSQL 14.9, t3.micro, single-AZ,
        # deletion protection off — dev-grade settings).
        self.database = rds.DatabaseInstance(
            self, "PostgresDatabase",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_14_9
            ),
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.T3,
                ec2.InstanceSize.MICRO
            ),
            vpc=self.vpc,
            subnet_group=db_subnet_group,
            database_name="appdb",
            # Username "dbadmin" with a generated password kept in Secrets Manager.
            credentials=rds.Credentials.from_generated_secret("dbadmin"),
            allocated_storage=20,
            storage_encrypted=True,
            multi_az=False,
            deletion_protection=False
        )
        # Lambda function configuration; secret ARN and endpoint details are
        # passed via environment variables so the handler can connect.
        self.db_lambda = _lambda.Function(
            self, "DatabaseLambda",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/database_lambda"),
            vpc=self.vpc,
            environment={
                'DB_SECRET_ARN': self.database.secret.secret_arn,
                'DB_ENDPOINT': self.database.instance_endpoint.hostname,
                'DB_PORT': str(self.database.instance_endpoint.port),
                'DB_NAME': 'appdb'
            },
            timeout=Duration.minutes(2)
        )
        # Grant access to database secret
        self.database.secret.grant_read(self.db_lambda)
        # Allow Lambda to connect to database (opens 5432 in the DB's SG)
        self.database.connections.allow_from(
            self.db_lambda,
            ec2.Port.tcp(5432),
            "Lambda database access"
        )
7.1.3 VPC Endpoint Configuration
class VpcEndpointStack(Stack):
    """Stack that adds gateway and interface VPC endpoints to a VPC."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # VPC spanning two AZs with the default subnet layout.
        self.vpc = ec2.Vpc(self, "VpcWithEndpoints", max_azs=2)

        # All endpoints attach to the private (egress-capable) subnets.
        private_selection = ec2.SubnetSelection(
            subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
        )

        # Gateway endpoint for S3.
        s3_endpoint = self.vpc.add_gateway_endpoint(
            "S3Endpoint",
            service=ec2.GatewayVpcEndpointAwsService.S3,
            subnets=[private_selection],
        )

        # Gateway endpoint for DynamoDB.
        dynamodb_endpoint = self.vpc.add_gateway_endpoint(
            "DynamoDbEndpoint",
            service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
            subnets=[private_selection],
        )

        # Interface endpoint so code in the VPC can invoke other Lambda functions.
        lambda_endpoint = self.vpc.add_interface_endpoint(
            "LambdaEndpoint",
            service=ec2.InterfaceVpcEndpointAwsService.LAMBDA_,
            subnets=private_selection,
        )

        # Function that reaches AWS services through the endpoints above.
        self.endpoint_lambda = _lambda.Function(
            self, "EndpointLambda",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/endpoint_lambda"),
            vpc=self.vpc,
            environment={
                'USE_VPC_ENDPOINTS': 'true'
            },
        )
7.2 Lambda Layers
7.2.1 Creating Shared Library Layer
# layers/python_utils/python/utils.py
import json
import logging
from datetime import datetime
from typing import Dict, Any
def setup_logging(level: str = "INFO") -> logging.Logger:
    """Set the root logger to the named level and return it."""
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, level))
    return root_logger
def format_response(status_code: int, body: Dict[str, Any],
                    headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Build an API-Gateway-style Lambda proxy response.

    Args:
        status_code: HTTP status code for the response.
        body: JSON-serializable payload; values json cannot encode are
            stringified via ``default=str``.
        headers: Optional extra headers; they override the defaults on
            key collision. (Annotation fixed: the old ``Dict[str, str] = None``
            was an implicit Optional.)

    Returns:
        Dict with ``statusCode``, merged ``headers``, and a JSON ``body``.
    """
    default_headers = {
        'Content-Type': 'application/json',
        'Access-Control-Allow-Origin': '*'
    }
    if headers:
        default_headers.update(headers)
    return {
        'statusCode': status_code,
        'headers': default_headers,
        'body': json.dumps(body, default=str)
    }
def get_current_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Uses timezone-aware ``datetime.now(timezone.utc)`` because
    ``datetime.utcnow()`` is deprecated since Python 3.12; the tzinfo is
    stripped so the output format matches the previous
    ``utcnow().isoformat()`` behavior exactly (no ``+00:00`` suffix).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
class DatabaseHelper:
    """Database operation helper class."""

    # Key attributes that must never appear in an update expression.
    _KEY_FIELDS = frozenset({'id', 'pk', 'sk'})

    @staticmethod
    def build_update_expression(data: Dict[str, Any]) -> tuple:
        """Build a DynamoDB ``UpdateExpression`` and its value map.

        Key fields (``id``, ``pk``, ``sk``) are excluded.

        Returns:
            ``(update_expression, expression_attribute_values)``.

        Raises:
            ValueError: if no updatable fields remain — the previous
                implementation silently produced the invalid expression
                ``"SET"`` in that case, which DynamoDB rejects.
        """
        updatable = {k: v for k, v in data.items()
                     if k not in DatabaseHelper._KEY_FIELDS}
        if not updatable:
            raise ValueError("No updatable fields in data")
        update_expression = "SET " + ", ".join(
            f"{key} = :{key}" for key in updatable
        )
        expression_values = {f":{key}": value for key, value in updatable.items()}
        return update_expression, expression_values
# stacks/layers_stack.py
from aws_cdk import (
Stack,
aws_lambda as _lambda,
Duration
)
from constructs import Construct
import os
class LayersStack(Stack):
    """Stack defining shared Lambda layers and a function that uses them."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Layer with in-house Python utilities, usable from several runtimes.
        supported_runtimes = [
            _lambda.Runtime.PYTHON_3_8,
            _lambda.Runtime.PYTHON_3_9,
            _lambda.Runtime.PYTHON_3_10,
            _lambda.Runtime.PYTHON_3_11,
        ]
        self.utils_layer = _lambda.LayerVersion(
            self, "PythonUtilsLayer",
            code=_lambda.Code.from_asset("layers/python_utils"),
            compatible_runtimes=supported_runtimes,
            description="Common Python utilities for Lambda functions",
            layer_version_name="python-utils-layer",
        )

        # Layer with pip dependencies, built inside the runtime's bundling
        # image so installed packages match the Lambda environment.
        pip_bundling = _lambda.BundlingOptions(
            image=_lambda.Runtime.PYTHON_3_9.bundling_image,
            command=[
                "bash", "-c",
                "pip install -r requirements.txt -t /asset-output/python/"
            ],
        )
        self.deps_layer = _lambda.LayerVersion(
            self, "DependenciesLayer",
            code=_lambda.Code.from_asset("layers/dependencies", bundling=pip_bundling),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
            description="Third-party dependencies layer",
        )

        # Function that attaches both layers.
        self.layered_function = _lambda.Function(
            self, "LayeredFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/layered"),
            layers=[self.utils_layer, self.deps_layer],
            timeout=Duration.seconds(30),
        )
7.2.2 Lambda Function Using Layer
# lambda_functions/layered/index.py
import json
from utils import setup_logging, format_response, get_current_timestamp, DatabaseHelper
# Logger configured via the shared layer's helper.
logger = setup_logging("INFO")


def handler(event, context):
    """Exercise the shared-layer utilities end to end."""
    logger.info(f"Processing event: {json.dumps(event)}")
    try:
        timestamp = get_current_timestamp()

        # Sample record to run through the helpers.
        data = {
            'name': 'John Doe',
            'email': 'john@example.com',
            'updated_at': timestamp
        }

        # Build a DynamoDB update expression from the record.
        update_expr, expr_values = DatabaseHelper.build_update_expression(data)

        payload = {
            'message': 'Data processed successfully',
            'timestamp': timestamp,
            'update_expression': update_expr,
            'expression_values': expr_values
        }
        return format_response(200, payload)
    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        return format_response(500, {'error': str(e)})
7.2.3 Cross-Account Layer Sharing
from aws_cdk import aws_iam as iam
class SharedLayersStack(Stack):
    """Stack that publishes a Lambda layer and shares it across accounts.

    FIX: ``LayerVersion.add_permission`` takes ``account_id`` (and
    optionally ``organization_id``) keyword arguments — it has no
    ``principal``/``action`` parameters, and the granted action is
    always ``lambda:GetLayerVersion``. The original call signature
    would fail at synth time.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create shareable layer
        self.shared_layer = _lambda.LayerVersion(
            self, "SharedUtilsLayer",
            code=_lambda.Code.from_asset("layers/shared_utils"),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
            description="Shared utilities across accounts",
        )

        # Allow a single target account to use the layer.
        self.shared_layer.add_permission(
            "CrossAccountAccess",
            account_id="123456789012",  # Target account ID
        )

        # Or allow every account in an AWS Organization.
        self.shared_layer.add_permission(
            "OrganizationAccess",
            account_id="*",
            organization_id="o-example12345",
        )
7.3 Error Handling and Retry Mechanisms
7.3.1 Synchronous Invocation Error Handling
# lambda_functions/error_handling/index.py
import json
import logging
import traceback
from typing import Dict, Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class LambdaError(Exception):
    """Business-logic exception carrying a machine-readable error code."""

    def __init__(self, message: str, error_code: str = "GENERAL_ERROR"):
        super().__init__(message)
        # Keep both fields accessible for structured error responses.
        self.message = message
        self.error_code = error_code
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Lambda entry point with layered error handling.

    Business failures (LambdaError) map to HTTP 400 with an error code;
    anything unexpected maps to a generic 500.
    """
    # Log invocation metadata for traceability.
    logger.info(f"Request ID: {context.aws_request_id}")
    logger.info(f"Function Name: {context.function_name}")
    logger.info(f"Remaining Time: {context.get_remaining_time_in_millis()}ms")

    try:
        action = event.get('action')
        if not action:
            raise LambdaError("Missing required field: action", "VALIDATION_ERROR")

        # Dispatch on the requested action.
        if action == 'process_data':
            result = process_data(event.get('data', {}))
        elif action == 'validate_input':
            result = validate_input(event.get('input', {}))
        else:
            raise LambdaError(f"Unknown action: {action}", "INVALID_ACTION")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'success': True,
                'result': result,
                'request_id': context.aws_request_id
            })
        }
    except LambdaError as e:
        logger.error(f"Business logic error: {e.message}")
        return {
            'statusCode': 400,
            'body': json.dumps({
                'success': False,
                'error': e.message,
                'error_code': e.error_code,
                'request_id': context.aws_request_id
            })
        }
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        logger.error(traceback.format_exc())
        return {
            'statusCode': 500,
            'body': json.dumps({
                'success': False,
                'error': 'Internal server error',
                'error_code': 'INTERNAL_ERROR',
                'request_id': context.aws_request_id
            })
        }
def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Process a data payload; raise LambdaError when it is empty."""
    if not data:
        raise LambdaError("No data provided", "NO_DATA")
    # Simulated processing: echo the input with a fixed status/timestamp.
    return {
        'original': data,
        'processed_at': '2023-01-01T00:00:00Z',
        'status': 'processed'
    }
def validate_input(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Check that all required fields are present in the input."""
    required_fields = ['name', 'email']
    missing_fields = [name for name in required_fields if name not in input_data]
    if missing_fields:
        raise LambdaError(
            f"Missing required fields: {', '.join(missing_fields)}",
            "MISSING_FIELDS"
        )
    return {'valid': True, 'message': 'Input validation passed'}
7.3.2 Asynchronous Invocation and Retry Configuration
from aws_cdk import (
aws_lambda as _lambda,
aws_sqs as sqs,
aws_lambda_event_sources as lambda_event_sources
)
class AsyncLambdaStack(Stack):
    """Stack with an async-invoked Lambda, retry queue, and dead letter queue."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Dead letter queue keeps failed events for 14 days.
        self.dlq = sqs.Queue(
            self, "AsyncLambdaDLQ",
            queue_name="async-lambda-dlq",
            retention_period=Duration.days(14),
        )

        # Retry queue: after 3 failed receives a message moves to the DLQ.
        self.retry_queue = sqs.Queue(
            self, "AsyncLambdaRetryQueue",
            queue_name="async-lambda-retry",
            visibility_timeout=Duration.minutes(5),
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=3,
                queue=self.dlq,
            ),
        )

        # Async function: two retries, then the event lands in the DLQ.
        self.async_function = _lambda.Function(
            self, "AsyncFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/async_handler"),
            timeout=Duration.minutes(5),
            retry_attempts=2,
            dead_letter_queue=self.dlq,
            environment={
                'RETRY_QUEUE_URL': self.retry_queue.queue_url
            },
        )

        # Drain the retry queue in batches of up to 10 messages.
        self.async_function.add_event_source(
            lambda_event_sources.SqsEventSource(
                self.retry_queue,
                batch_size=10,
                max_batching_window=Duration.seconds(5),
            )
        )

        # Let the function push messages onto both queues.
        self.retry_queue.grant_send_messages(self.async_function)
        self.dlq.grant_send_messages(self.async_function)
7.3.3 Circuit Breaker Pattern Implementation
# lambda_functions/circuit_breaker/index.py
import json
import time
from typing import Dict, Any, Optional
import boto3
class CircuitBreaker:
    """Minimal circuit breaker (states: CLOSED -> OPEN -> HALF_OPEN).

    After ``failure_threshold`` consecutive failures the circuit opens;
    after ``timeout`` seconds one probe call is allowed (HALF_OPEN), and
    a success closes the circuit again.

    NOTE(review): not thread-safe; a single Lambda container handles one
    invocation at a time, so that is fine there — confirm before reuse
    in multi-threaded code.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold  # failures before opening
        self.timeout = timeout                      # seconds to stay OPEN
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Invoke ``func`` through the breaker; raise if the circuit is OPEN."""
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"  # allow one probe call
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.on_failure()
            # FIX: bare `raise` preserves the original traceback;
            # `raise e` rewrote it from this frame.
            raise
        self.on_success()
        return result

    def on_success(self):
        """Reset the breaker after a successful call."""
        self.failure_count = 0
        self.state = "CLOSED"

    def on_failure(self):
        """Record a failure and open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
# Global circuit breaker instance (survives across warm invocations)
external_api_breaker = CircuitBreaker(failure_threshold=3, timeout=30)


def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Invoke the external API behind the shared circuit breaker."""
    try:
        # The breaker wraps the call and tracks failures across invocations.
        api_result = external_api_breaker.call(call_external_api, event.get('data'))
        return {
            'statusCode': 200,
            'body': json.dumps({
                'success': True,
                'result': api_result,
                'circuit_breaker_state': external_api_breaker.state
            })
        }
    except Exception as e:
        # Covers both upstream failures and an OPEN circuit.
        return {
            'statusCode': 503,
            'body': json.dumps({
                'success': False,
                'error': str(e),
                'circuit_breaker_state': external_api_breaker.state
            })
        }
def call_external_api(data: Dict[str, Any]) -> Dict[str, Any]:
    """Simulate external API call"""
    import requests

    # raise_for_status surfaces HTTP errors so the breaker can count them.
    resp = requests.get('https://httpbin.org/json', timeout=5)
    resp.raise_for_status()
    return resp.json()
7.4 Concurrency Control and Reserved Concurrency
7.4.1 Concurrency Limit Configuration
class ConcurrencyControlStack(Stack):
    """Stack illustrating three reserved-concurrency tiers."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # High-priority tier: 50 executions carved out of the account pool.
        self.high_priority_function = _lambda.Function(
            self, "HighPriorityFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/high_priority"),
            reserved_concurrent_executions=50,
            timeout=Duration.seconds(30),
        )

        # Standard tier: capped at 10 concurrent executions.
        self.standard_function = _lambda.Function(
            self, "StandardFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/standard"),
            reserved_concurrent_executions=10,
            timeout=Duration.seconds(60),
        )

        # Batch tier: no reservation, shares the remaining account concurrency.
        self.batch_function = _lambda.Function(
            self, "BatchFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/batch"),
            timeout=Duration.minutes(15),
        )
7.4.2 Provisioned Concurrency Configuration
class ProvisionedConcurrencyStack(Stack):
    """Stack demonstrating provisioned concurrency with auto scaling.

    FIXES vs. the original:
    - ``aws_lambda`` has no ``ProvisionedConcurrencyConfig`` class;
      ``Alias`` takes ``provisioned_concurrent_executions`` directly.
    - ``appscaling.predefined_metric_specification(...)`` does not exist;
      ``scale_to_track_metric`` takes a ``predefined_metric`` keyword.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Lambda function to keep warm.
        self.warm_function = _lambda.Function(
            self, "WarmFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/warm_function"),
            timeout=Duration.seconds(30),
        )

        # Publish a version and point a PROD alias at it; provisioned
        # concurrency is configured directly on the alias.
        version = self.warm_function.current_version
        self.prod_alias = _lambda.Alias(
            self, "ProdAlias",
            alias_name="PROD",
            version=version,
            provisioned_concurrent_executions=10,
        )

        # Application Auto Scaling target for the alias's provisioned concurrency.
        from aws_cdk import aws_applicationautoscaling as appscaling
        scalable_target = appscaling.ScalableTarget(
            self, "ScalableTarget",
            service_namespace=appscaling.ServiceNamespace.LAMBDA,
            scalable_dimension="lambda:function:ProvisionedConcurrency",
            resource_id=f"function:{self.warm_function.function_name}:PROD",
            min_capacity=5,
            max_capacity=50,
        )

        # Target-tracking policy: keep utilization around 70%.
        scalable_target.scale_to_track_metric(
            "TargetTracking",
            target_value=0.7,
            predefined_metric=appscaling.PredefinedMetric.LAMBDA_PROVISIONED_CONCURRENCY_UTILIZATION,
        )
7.5 Environment Variables and Secrets Management
7.5.1 Secure Environment Variable Configuration
import json

from aws_cdk import (
    aws_kms as kms,
    aws_secretsmanager as secretsmanager,
    aws_ssm as ssm,
)
class SecureConfigStack(Stack):
    """Stack wiring a Lambda function to encrypted configuration sources.

    Creates a KMS key (also used to encrypt the function's environment
    variables), database credentials in Secrets Manager, and API
    configuration in SSM Parameter Store, then grants the function
    read/decrypt access to each.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Customer-managed KMS key with automatic key rotation enabled.
        self.lambda_key = kms.Key(
            self, "LambdaKmsKey",
            description="KMS key for Lambda environment variables",
            enable_key_rotation=True,
        )

        # Secrets Manager secret with a generated 32-character password.
        self.db_credentials = secretsmanager.Secret(
            self, "DatabaseCredentials",
            description="Database credentials",
            generate_secret_string=secretsmanager.SecretStringGenerator(
                secret_string_template='{"username": "dbadmin"}',
                generate_string_key="password",
                exclude_characters=" %+~`#$&*()|[]{}:;<>?!'/\\\"",
                password_length=32,
            ),
        )

        # Plain SSM parameter holding non-sensitive API configuration.
        self.api_config = ssm.StringParameter(
            self, "ApiConfig",
            parameter_name="/myapp/api/config",
            string_value=json.dumps({
                "endpoint": "https://api.example.com",
                "timeout": 30,
                "retry_count": 3
            }),
        )

        # FIX: CloudFormation cannot create SecureString parameters, so
        # `type=ssm.ParameterType.SECURE_STRING` fails at deploy time.
        # Real secrets belong in Secrets Manager (as above) or in a
        # SecureString parameter created outside CloudFormation and
        # referenced via from_secure_string_parameter_attributes().
        self.api_key = ssm.StringParameter(
            self, "ApiKey",
            parameter_name="/myapp/api/key",
            string_value="your-api-key-here",  # placeholder, not a real secret
        )

        # Function receives only references (ARNs / parameter names), never
        # the secret values themselves.
        self.secure_function = _lambda.Function(
            self, "SecureFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/secure_config"),
            environment={
                # Basic configuration
                'ENVIRONMENT': 'production',
                'LOG_LEVEL': 'INFO',
                # Service configuration
                'DB_SECRET_ARN': self.db_credentials.secret_arn,
                'API_CONFIG_PARAM': self.api_config.parameter_name,
                'API_KEY_PARAM': self.api_key.parameter_name,
                # KMS key
                'KMS_KEY_ID': self.lambda_key.key_id
            },
            environment_encryption=self.lambda_key,  # encrypt env vars at rest
        )

        # Grant least-privilege read/decrypt permissions.
        self.db_credentials.grant_read(self.secure_function)
        self.api_config.grant_read(self.secure_function)
        self.api_key.grant_read(self.secure_function)
        self.lambda_key.grant_decrypt(self.secure_function)
7.5.2 Configuration Management Lambda Function
# lambda_functions/secure_config/index.py
import json
import boto3
import os
from typing import Dict, Any
import logging
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))
# Initialize AWS clients
secrets_client = boto3.client('secretsmanager')
ssm_client = boto3.client('ssm')
class ConfigManager:
    """Caching wrapper around Secrets Manager and SSM Parameter Store.

    FIX: entries are now cached together with a fetch timestamp and
    re-fetched once older than ``ttl`` seconds — the original stored a
    ``ttl`` attribute but never consulted it, so stale values were
    served for the container's whole lifetime.
    """

    def __init__(self):
        # Maps cache key -> (value, fetched_at_epoch_seconds).
        self._cache = {}
        self.ttl = 300  # cache lifetime in seconds (5 minutes)

    @staticmethod
    def _now() -> float:
        # Local import keeps this snippet's module imports unchanged.
        import time
        return time.time()

    def _cache_get(self, key: str):
        """Return the cached value if present and still fresh, else None."""
        entry = self._cache.get(key)
        if entry is not None:
            value, fetched_at = entry
            if self._now() - fetched_at < self.ttl:
                return value
        return None

    def get_secret(self, secret_arn: str) -> Dict[str, Any]:
        """Get a Secrets Manager secret as a dict, using the TTL cache."""
        cached = self._cache_get(secret_arn)
        if cached is not None:
            return cached
        try:
            response = secrets_client.get_secret_value(SecretId=secret_arn)
            secret_value = json.loads(response['SecretString'])
            self._cache[secret_arn] = (secret_value, self._now())
            return secret_value
        except Exception as e:
            logger.error(f"Failed to get secret {secret_arn}: {str(e)}")
            raise

    def get_parameter(self, parameter_name: str, decrypt: bool = True) -> str:
        """Get an SSM parameter value (decrypted by default), using the TTL cache."""
        cache_key = f"{parameter_name}_{decrypt}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached
        try:
            response = ssm_client.get_parameter(
                Name=parameter_name,
                WithDecryption=decrypt
            )
            value = response['Parameter']['Value']
            self._cache[cache_key] = (value, self._now())
            return value
        except Exception as e:
            logger.error(f"Failed to get parameter {parameter_name}: {str(e)}")
            raise
# Global configuration manager (cache survives across warm invocations)
config_manager = ConfigManager()


def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Load secure configuration and delegate to the business logic."""
    try:
        # Resolve all three configuration sources up front.
        db_credentials = config_manager.get_secret(os.environ['DB_SECRET_ARN'])
        api_config = json.loads(
            config_manager.get_parameter(os.environ['API_CONFIG_PARAM'], False)
        )
        api_key = config_manager.get_parameter(os.environ['API_KEY_PARAM'], True)

        result = process_request(event, db_credentials, api_config, api_key)
        return {
            'statusCode': 200,
            'body': json.dumps({
                'success': True,
                'result': result
            })
        }
    except Exception as e:
        # Never leak configuration details to the caller.
        logger.error(f"Error processing request: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'success': False,
                'error': 'Internal server error'
            })
        }
def process_request(event: Dict[str, Any], db_creds: Dict[str, Any],
                    api_config: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Report which configuration sources were successfully loaded."""
    # Placeholder business logic — a real handler would use the creds,
    # config, and key to talk to the database and external API.
    logger.info("Processing request with secure configuration")
    return {
        'message': 'Request processed successfully',
        'database_connected': bool(db_creds.get('username')),
        'api_configured': bool(api_config.get('endpoint')),
        'api_key_available': bool(api_key)
    }
7.6 Lambda Extensions
7.6.1 Custom Extension
# extensions/config_extension/index.py
import json
import os
import requests
import time
from threading import Thread
class ConfigExtension:
    """Configuration extension for periodic config updates.

    Implements the Lambda Extensions API handshake: register with the
    runtime API, then block on the event/next long-poll until SHUTDOWN,
    while a daemon thread refreshes a config cache every 60 seconds and
    mirrors it to /tmp/config.json.
    """

    def __init__(self):
        # AWS_LAMBDA_RUNTIME_API is injected by the Lambda environment
        # (host:port of the local runtime/extensions API endpoint).
        self.lambda_api_url = f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}"
        self.extension_name = "config-extension"
        self.config_cache = {}

    def register(self):
        """Register extension"""
        # Subscribe to INVOKE and SHUTDOWN lifecycle events; the returned
        # identifier header must accompany all subsequent API calls.
        url = f"{self.lambda_api_url}/2020-01-01/extension/register"
        headers = {'Lambda-Extension-Name': self.extension_name}
        data = {'events': ['INVOKE', 'SHUTDOWN']}
        response = requests.post(url, headers=headers, json=data)
        return response.headers.get('Lambda-Extension-Identifier')

    def next_event(self, extension_id: str):
        """Get next event"""
        # Blocking long-poll; returns the next lifecycle event as JSON.
        url = f"{self.lambda_api_url}/2020-01-01/extension/event/next"
        headers = {'Lambda-Extension-Identifier': extension_id}
        response = requests.get(url, headers=headers)
        return response.json()

    def update_config(self):
        """Update configuration cache"""
        # Here you can fetch configuration from external configuration service
        self.config_cache = {
            'updated_at': time.time(),
            'feature_flags': {
                'new_feature': True,
                'beta_feature': False
            },
            'api_endpoints': {
                'primary': 'https://api.example.com',
                'fallback': 'https://api-backup.example.com'
            }
        }
        # Write configuration to shared file
        # NOTE(review): assumes the function code reads /tmp/config.json;
        # the write is also not atomic — a reader could see a partially
        # written file. Consider write-to-temp + os.replace.
        with open('/tmp/config.json', 'w') as f:
            json.dump(self.config_cache, f)

    def run(self):
        """Run extension"""
        extension_id = self.register()
        # Start configuration update thread (daemon: dies with the process)
        config_thread = Thread(target=self.config_updater, daemon=True)
        config_thread.start()
        # Main event loop: block on next_event; exit cleanly on SHUTDOWN.
        while True:
            event = self.next_event(extension_id)
            if event['eventType'] == 'SHUTDOWN':
                break

    def config_updater(self):
        """Periodically update configuration"""
        # Runs forever on the daemon thread; terminated with the process.
        while True:
            self.update_config()
            time.sleep(60)  # Update every 60 seconds


if __name__ == '__main__':
    extension = ConfigExtension()
    extension.run()
7.6.2 Extension Configuration
class ExtensionStack(Stack):
    """Stack packaging the config extension as a layer and attaching it."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The extension ships as an ordinary Lambda layer.
        self.extension_layer = _lambda.LayerVersion(
            self, "ConfigExtensionLayer",
            code=_lambda.Code.from_asset("extensions/config_extension"),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
            description="Configuration extension layer",
        )

        # Function with the extension layer attached.
        self.extended_function = _lambda.Function(
            self, "ExtendedFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/extended"),
            layers=[self.extension_layer],
            environment={
                'USE_EXTENSION_CONFIG': 'true'
            },
        )
7.7 Chapter Summary
Key Points
- VPC configuration enables Lambda to securely access private resources
- Layers provide an effective way for code reuse and dependency management
- Comprehensive error handling and retry mechanisms improve system reliability
- Concurrency control and provisioned concurrency optimize performance and cost
- Secure configuration management protects sensitive information
- Extensions expand Lambda's functional boundaries
In the next chapter, we will learn about Lambda integration with other AWS services, including deep integration with API Gateway, DynamoDB, S3, and other services.