Chapter 7: Advanced Lambda Configuration with CDK

Haiyue
35min

Chapter 7: Advanced Lambda Configuration with CDK

Chapter Overview

This chapter explores advanced Lambda function configuration options, including VPC integration, Lambda Layers, error handling and retries, reserved and provisioned concurrency, secrets management, and other enterprise-grade features. These configurations are crucial for building production-grade serverless applications.

Learning Objectives

  1. Master Lambda function VPC configuration and network security settings
  2. Learn to create and use Lambda Layers
  3. Understand Lambda error handling and retry mechanisms
  4. Master reserved concurrency and provisioned concurrency configuration
  5. Learn to configure dead letter queues and asynchronous invocation
  6. Understand Lambda environment variables and secrets management

7.1 VPC Configuration and Network Security

7.1.1 Basic VPC Configuration

from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_ec2 as ec2,
    aws_rds as rds,
    Duration
)
from constructs import Construct

class VpcLambdaStack(Stack):
    """Stack with a two-AZ VPC and a Lambda function attached to it.

    Creates the VPC (one private-with-egress and one public subnet group),
    a security group for Lambda functions, a security group for a database,
    an ingress rule from the former to the latter on TCP 5432, and a Lambda
    function placed in the private subnets.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Subnet layout: /24 per subnet, private group listed first.
        private_tier = ec2.SubnetConfiguration(
            name="Private",
            subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
            cidr_mask=24,
        )
        public_tier = ec2.SubnetConfiguration(
            name="Public",
            subnet_type=ec2.SubnetType.PUBLIC,
            cidr_mask=24,
        )

        # NOTE(review): newer aws-cdk-lib releases appear to prefer
        # `ip_addresses=` over `cidr=` — confirm against the pinned version.
        self.vpc = ec2.Vpc(
            self, "LambdaVpc",
            max_azs=2,
            cidr="10.0.0.0/16",
            subnet_configuration=[private_tier, public_tier],
        )

        # Security group attached to the Lambda function below.
        self.lambda_sg = ec2.SecurityGroup(
            self, "LambdaSecurityGroup",
            vpc=self.vpc,
            description="Security group for Lambda functions",
            allow_all_outbound=True,
        )

        # Security group intended for the database tier.
        self.db_sg = ec2.SecurityGroup(
            self, "DatabaseSecurityGroup",
            vpc=self.vpc,
            description="Security group for RDS database",
        )

        # Only the Lambda security group may reach the database port.
        postgres_port = ec2.Port.tcp(5432)  # PostgreSQL port
        self.db_sg.add_ingress_rule(
            peer=self.lambda_sg,
            connection=postgres_port,
            description="Allow Lambda access to database",
        )

        # The function runs in the private subnets with the Lambda security group.
        private_subnets = ec2.SubnetSelection(
            subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
        )
        self.vpc_lambda = _lambda.Function(
            self, "VpcLambdaFunction",
            function_name="vpc-lambda-function",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/vpc_lambda"),
            timeout=Duration.minutes(5),
            vpc=self.vpc,
            vpc_subnets=private_subnets,
            security_groups=[self.lambda_sg],
        )

7.1.2 RDS Integration Configuration

class DatabaseLambdaStack(Stack):
    """Stack with a PostgreSQL RDS instance and a Lambda function that uses it.

    The function receives the secret ARN, endpoint host, port and database
    name through environment variables, is granted read access to the
    generated secret, and is allowed to connect to the instance on TCP 5432.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Look up the account's default VPC.
        # NOTE(review): the subnet group below selects PRIVATE_WITH_EGRESS
        # subnets; a default VPC normally has only public subnets, so this
        # lookup probably needs to target a VPC that has private subnets —
        # confirm for the target account.
        self.vpc = ec2.Vpc.from_lookup(self, "ExistingVpc", is_default=True)

        # Create database subnet group
        db_subnet_group = rds.SubnetGroup(
            self, "DatabaseSubnetGroup",
            description="Subnet group for RDS database",
            vpc=self.vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            )
        )

        # Create RDS instance with credentials generated into Secrets Manager
        self.database = rds.DatabaseInstance(
            self, "PostgresDatabase",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_14_9
            ),
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.T3,
                ec2.InstanceSize.MICRO
            ),
            vpc=self.vpc,
            subnet_group=db_subnet_group,
            database_name="appdb",
            credentials=rds.Credentials.from_generated_secret("dbadmin"),
            allocated_storage=20,
            storage_encrypted=True,
            multi_az=False,
            deletion_protection=False
        )

        # Lambda function configuration
        self.db_lambda = _lambda.Function(
            self, "DatabaseLambda",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/database_lambda"),
            vpc=self.vpc,
            environment={
                'DB_SECRET_ARN': self.database.secret.secret_arn,
                'DB_ENDPOINT': self.database.instance_endpoint.hostname,
                # The endpoint port is a deploy-time token. Calling str() on
                # the numeric token would embed its placeholder number in the
                # template, so use the string-typed attribute instead.
                'DB_PORT': self.database.db_instance_endpoint_port,
                'DB_NAME': 'appdb'
            },
            timeout=Duration.minutes(2)
        )

        # Grant access to database secret
        self.database.secret.grant_read(self.db_lambda)

        # Allow Lambda to connect to database
        self.database.connections.allow_from(
            self.db_lambda,
            ec2.Port.tcp(5432),
            "Lambda database access"
        )

7.1.3 VPC Endpoint Configuration

class VpcEndpointStack(Stack):
    """Stack with a VPC, S3/DynamoDB gateway endpoints, a Lambda interface
    endpoint, and a Lambda function attached to that VPC."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        self.vpc = ec2.Vpc(self, "VpcWithEndpoints", max_azs=2)

        # Every endpoint below is associated with the private subnets.
        private_subnets = ec2.SubnetSelection(
            subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
        )

        # Gateway endpoint for S3.
        self.vpc.add_gateway_endpoint(
            "S3Endpoint",
            service=ec2.GatewayVpcEndpointAwsService.S3,
            subnets=[private_subnets],
        )

        # Gateway endpoint for DynamoDB.
        self.vpc.add_gateway_endpoint(
            "DynamoDbEndpoint",
            service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
            subnets=[private_subnets],
        )

        # Interface endpoint for the Lambda API (for invoking other functions).
        self.vpc.add_interface_endpoint(
            "LambdaEndpoint",
            service=ec2.InterfaceVpcEndpointAwsService.LAMBDA_,
            subnets=private_subnets,
        )

        # Function running inside the VPC; the flag is passed to the handler
        # code as an environment variable.
        handler_env = {'USE_VPC_ENDPOINTS': 'true'}
        self.endpoint_lambda = _lambda.Function(
            self, "EndpointLambda",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/endpoint_lambda"),
            vpc=self.vpc,
            environment=handler_env,
        )

7.2 Lambda Layers

7.2.1 Creating Shared Library Layer

# layers/python_utils/python/utils.py
import json
import logging
from datetime import datetime
from typing import Dict, Any

def setup_logging(level: str = "INFO") -> logging.Logger:
    """Configure the root logger's level and return the root logger.

    Args:
        level: Name of a ``logging`` level such as "INFO" or "debug".
            Matching is case-insensitive.

    Returns:
        The root logger, with its level set to ``level``.

    Raises:
        AttributeError: If ``level`` does not name an attribute of ``logging``.
    """
    logger = logging.getLogger()
    # Upper-case first: a lower-case name such as "info" would otherwise
    # resolve to the function logging.info and make setLevel fail.
    logger.setLevel(getattr(logging, level.upper()))
    return logger

def format_response(status_code: int, body: Dict[str, Any],
                   headers: Dict[str, str] = None) -> Dict[str, Any]:
    """Build a Lambda proxy-style response dictionary.

    Args:
        status_code: Value stored under ``statusCode``.
        body: Payload serialized to JSON; values JSON cannot encode are
            converted with ``str``.
        headers: Optional headers merged over the default JSON/CORS headers.

    Returns:
        A dict with ``statusCode``, ``headers`` and a JSON string ``body``.
    """
    # Caller-supplied headers win over the defaults on key collisions.
    merged_headers = {
        'Content-Type': 'application/json',
        'Access-Control-Allow-Origin': '*',
        **(headers or {}),
    }
    serialized = json.dumps(body, default=str)
    return {
        'statusCode': status_code,
        'headers': merged_headers,
        'body': serialized,
    }

def get_current_timestamp() -> str:
    """Return the current UTC time as an ISO 8601 string without an offset.

    The output format matches ``datetime.utcnow().isoformat()`` but avoids
    ``utcnow()``, which is deprecated from Python 3.12.
    """
    from datetime import timezone

    # Take an aware UTC "now", then drop tzinfo so the string carries no
    # "+00:00" suffix and stays compatible with existing consumers.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

class DatabaseHelper:
    """Database operation helper class"""

    @staticmethod
    def build_update_expression(data: Dict[str, Any]) -> tuple:
        """Build a DynamoDB ``SET`` update expression from a dict.

        The key fields ``id``, ``pk`` and ``sk`` are skipped. Every other
        entry becomes ``<key> = :<key>`` in the expression and
        ``":<key>": value`` in the values mapping.

        Args:
            data: Attribute names mapped to their new values.

        Returns:
            ``(update_expression, expression_values)``.

        Raises:
            ValueError: If ``data`` has no updatable fields; a bare "SET"
                is not a usable update expression.
        """
        # NOTE(review): attribute names are interpolated directly. Names that
        # collide with DynamoDB reserved words would presumably need
        # ExpressionAttributeNames — confirm against callers.
        key_fields = ('id', 'pk', 'sk')
        updatable = [key for key in data if key not in key_fields]
        if not updatable:
            raise ValueError("No updatable fields provided")

        update_expression = "SET " + ", ".join(
            f"{key} = :{key}" for key in updatable
        )
        expression_values = {f":{key}": data[key] for key in updatable}
        return update_expression, expression_values
# stacks/layers_stack.py
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    Duration
)
from constructs import Construct
import os

class LayersStack(Stack):
    """Stack with a utilities layer, a bundled dependencies layer, and a
    Lambda function that uses both layers."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # BundlingOptions is exported by the aws_cdk core package, not by
        # aws_cdk.aws_lambda. It is imported locally so this class does not
        # depend on the file-level import list.
        from aws_cdk import BundlingOptions

        # Create Python utilities layer
        self.utils_layer = _lambda.LayerVersion(
            self, "PythonUtilsLayer",
            code=_lambda.Code.from_asset("layers/python_utils"),
            compatible_runtimes=[
                _lambda.Runtime.PYTHON_3_8,
                _lambda.Runtime.PYTHON_3_9,
                _lambda.Runtime.PYTHON_3_10,
                _lambda.Runtime.PYTHON_3_11
            ],
            description="Common Python utilities for Lambda functions",
            layer_version_name="python-utils-layer"
        )

        # Create third-party dependencies layer. The bundling command installs
        # requirements.txt into /asset-output/python/ at synth time.
        self.deps_layer = _lambda.LayerVersion(
            self, "DependenciesLayer",
            code=_lambda.Code.from_asset(
                "layers/dependencies",
                bundling=BundlingOptions(
                    image=_lambda.Runtime.PYTHON_3_9.bundling_image,
                    command=[
                        "bash", "-c",
                        "pip install -r requirements.txt -t /asset-output/python/"
                    ]
                )
            ),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
            description="Third-party dependencies layer"
        )

        # Lambda function using layers
        self.layered_function = _lambda.Function(
            self, "LayeredFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/layered"),
            layers=[self.utils_layer, self.deps_layer],
            timeout=Duration.seconds(30)
        )

7.2.2 Using Layer in Lambda Function

# lambda_functions/layered/index.py
import json
from utils import setup_logging, format_response, get_current_timestamp, DatabaseHelper

# Module-level logger, configured once at import time by the layer's
# setup_logging helper with the "INFO" level.
logger = setup_logging("INFO")

def handler(event, context):
    """Lambda entry point showing the helpers imported from the utils layer.

    Logs the incoming event, builds a sample record, derives a DynamoDB
    update expression for it, and returns a formatted 200 response. Any
    exception raised while processing is logged and returned as a 500.
    """
    logger.info(f"Processing event: {json.dumps(event)}")

    try:
        now = get_current_timestamp()

        # Sample record standing in for real input data.
        record = dict(
            name='John Doe',
            email='john@example.com',
            updated_at=now,
        )

        expression, values = DatabaseHelper.build_update_expression(record)

        payload = dict(
            message='Data processed successfully',
            timestamp=now,
            update_expression=expression,
            expression_values=values,
        )
        return format_response(200, payload)

    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        return format_response(500, {'error': str(e)})

7.2.3 Cross-Account Layer Sharing

from aws_cdk import aws_iam as iam

class SharedLayersStack(Stack):
    """Stack with a Lambda layer shared with another account and with an
    AWS Organization."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create shareable layer
        self.shared_layer = _lambda.LayerVersion(
            self, "SharedUtilsLayer",
            code=_lambda.Code.from_asset("layers/shared_utils"),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
            description="Shared utilities across accounts"
        )

        # LayerVersion.add_permission takes an account ID (and optionally an
        # organization ID), not an IAM principal or action; the granted
        # action is always lambda:GetLayerVersion.
        self.shared_layer.add_permission(
            "CrossAccountAccess",
            account_id="123456789012"  # Target account ID
        )

        # Or allow all accounts in organization. An organization restriction
        # requires the account ID to be the wildcard "*".
        self.shared_layer.add_permission(
            "OrganizationAccess",
            account_id="*",
            organization_id="o-example12345"
        )

7.3 Error Handling and Retry Mechanisms

7.3.1 Synchronous Invocation Error Handling

# lambda_functions/error_handling/index.py
import json
import logging
import traceback
from typing import Dict, Any

# Root logger at INFO level, shared by the handler and helpers in this module.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

class LambdaError(Exception):
    """Business-logic error carrying a human-readable message and an error code.

    Attributes are exposed as ``message`` and ``error_code``; the message is
    also passed to ``Exception`` so ``str(err)`` returns it.
    """

    def __init__(self, message: str, error_code: str = "GENERAL_ERROR"):
        self.message, self.error_code = message, error_code
        super().__init__(message)

def _json_response(status_code: int, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap ``payload`` as a response dict with a JSON-encoded body."""
    return {
        'statusCode': status_code,
        'body': json.dumps(payload)
    }


def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Route an event by its ``action`` field and map failures to responses.

    Returns 200 with the operation result on success, 400 with the error
    code when a ``LambdaError`` is raised, and 500 with a generic message
    for any other exception (the traceback is logged).
    """
    request_id = context.aws_request_id

    # Log request information
    logger.info(f"Request ID: {request_id}")
    logger.info(f"Function Name: {context.function_name}")
    logger.info(f"Remaining Time: {context.get_remaining_time_in_millis()}ms")

    try:
        action = event.get('action')
        if not action:
            raise LambdaError("Missing required field: action", "VALIDATION_ERROR")

        # Anything other than the two known actions is rejected.
        if action == 'process_data':
            result = process_data(event.get('data', {}))
        elif action == 'validate_input':
            result = validate_input(event.get('input', {}))
        else:
            raise LambdaError(f"Unknown action: {action}", "INVALID_ACTION")

        return _json_response(200, {
            'success': True,
            'result': result,
            'request_id': context.aws_request_id
        })

    except LambdaError as e:
        logger.error(f"Business logic error: {e.message}")
        return _json_response(400, {
            'success': False,
            'error': e.message,
            'error_code': e.error_code,
            'request_id': context.aws_request_id
        })

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        logger.error(traceback.format_exc())
        return _json_response(500, {
            'success': False,
            'error': 'Internal server error',
            'error_code': 'INTERNAL_ERROR',
            'request_id': context.aws_request_id
        })

def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a simulated processing result wrapping ``data``.

    Raises:
        LambdaError: With code ``NO_DATA`` when ``data`` is empty.
    """
    if data:
        # Simulated processing: echo the input with a fixed timestamp.
        return {
            'original': data,
            'processed_at': '2023-01-01T00:00:00Z',
            'status': 'processed'
        }

    raise LambdaError("No data provided", "NO_DATA")

def validate_input(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Check that ``input_data`` contains the ``name`` and ``email`` keys.

    Returns:
        A confirmation dict when both keys are present.

    Raises:
        LambdaError: With code ``MISSING_FIELDS``, listing the absent keys.
    """
    absent = []
    for required in ('name', 'email'):
        if required not in input_data:
            absent.append(required)

    if not absent:
        return {'valid': True, 'message': 'Input validation passed'}

    raise LambdaError(
        f"Missing required fields: {', '.join(absent)}",
        "MISSING_FIELDS"
    )

7.3.2 Asynchronous Invocation and Retry Configuration

from aws_cdk import (
    aws_lambda as _lambda,
    aws_sqs as sqs,
    aws_lambda_event_sources as lambda_event_sources
)

class AsyncLambdaStack(Stack):
    """Stack wiring a Lambda function to an SQS retry queue and a shared
    dead letter queue.

    The retry queue moves messages to the DLQ after three receives; the
    function uses the same DLQ for failed asynchronous invocations and is
    triggered by the retry queue.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Dead letter queue, keeping messages for 14 days.
        self.dlq = sqs.Queue(
            self, "AsyncLambdaDLQ",
            queue_name="async-lambda-dlq",
            retention_period=Duration.days(14),
        )

        # Retry queue; after 3 receives a message moves to the DLQ.
        redrive_policy = sqs.DeadLetterQueue(
            max_receive_count=3,
            queue=self.dlq,
        )
        self.retry_queue = sqs.Queue(
            self, "AsyncLambdaRetryQueue",
            queue_name="async-lambda-retry",
            visibility_timeout=Duration.minutes(5),
            dead_letter_queue=redrive_policy,
        )

        # Function with 2 retry attempts and the DLQ for failed async events.
        function_env = {'RETRY_QUEUE_URL': self.retry_queue.queue_url}
        self.async_function = _lambda.Function(
            self, "AsyncFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/async_handler"),
            timeout=Duration.minutes(5),
            retry_attempts=2,
            dead_letter_queue=self.dlq,
            environment=function_env,
        )

        # Trigger the function from the retry queue in batches of up to 10,
        # with a batching window of up to 5 seconds.
        retry_source = lambda_event_sources.SqsEventSource(
            self.retry_queue,
            batch_size=10,
            max_batching_window=Duration.seconds(5),
        )
        self.async_function.add_event_source(retry_source)

        # Let the function send messages to both queues.
        for queue in (self.retry_queue, self.dlq):
            queue.grant_send_messages(self.async_function)

7.3.3 Circuit Breaker Pattern Implementation

# lambda_functions/circuit_breaker/index.py
import json
import time
from typing import Dict, Any, Optional
import boto3

class CircuitBreaker:
    """Minimal circuit breaker with CLOSED, OPEN and HALF_OPEN states.

    The breaker opens once ``failure_threshold`` failures have accumulated
    since the last success, rejects calls while open, and lets a trial call
    through once ``timeout`` seconds have passed since the last failure.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Invoke ``func`` through the breaker and return its result.

        Raises ``Exception("Circuit breaker is OPEN")`` while the breaker is
        open and still cooling down; otherwise re-raises whatever ``func``
        raises after recording the failure.
        """
        if self.state == "OPEN":
            elapsed = time.time() - self.last_failure_time
            if not elapsed > self.timeout:
                raise Exception("Circuit breaker is OPEN")
            # Cool-down is over: allow one trial call.
            self.state = "HALF_OPEN"

        try:
            outcome = func(*args, **kwargs)
            self.on_success()
            return outcome
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        """Reset the failure counter and close the breaker."""
        self.state = "CLOSED"
        self.failure_count = 0

    def on_failure(self):
        """Record a failure and open the breaker at the threshold."""
        self.last_failure_time = time.time()
        self.failure_count += 1

        if not self.failure_count < self.failure_threshold:
            self.state = "OPEN"

# Global circuit breaker instance, created once at module import: it opens
# after 3 failures and allows a trial call 30 seconds after the last failure.
# NOTE(review): being module-level state, it is presumably per execution
# environment rather than shared across concurrent instances — confirm.
external_api_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Call the external API through the module-level circuit breaker.

    Returns 200 with the API result on success, or 503 with the error text
    when the call fails or the breaker rejects it. Both responses include
    the breaker's current state.
    """
    try:
        # The breaker wraps the external call and tracks its failures.
        api_result = external_api_breaker.call(call_external_api, event.get('data'))
        status = 200
        body = json.dumps({
            'success': True,
            'result': api_result,
            'circuit_breaker_state': external_api_breaker.state
        })
    except Exception as e:
        status = 503
        body = json.dumps({
            'success': False,
            'error': str(e),
            'circuit_breaker_state': external_api_breaker.state
        })

    return {'statusCode': status, 'body': body}

def call_external_api(data: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch JSON from httpbin to stand in for an external API call.

    ``data`` is accepted but not used by this simulation. HTTP error
    statuses are raised by ``raise_for_status``.
    """
    import requests

    # A 5-second timeout bounds how long the request may block.
    reply = requests.get('https://httpbin.org/json', timeout=5)
    reply.raise_for_status()

    payload = reply.json()
    return payload

7.4 Concurrency Control and Reserved Concurrency

7.4.1 Concurrency Limit Configuration

class ConcurrencyControlStack(Stack):
    """Stack with three Lambda functions that differ in their reserved
    concurrency settings (50, 10, and none)."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        python_runtime = _lambda.Runtime.PYTHON_3_9
        entry_point = "index.handler"

        # High priority function: 50 reserved concurrent executions.
        self.high_priority_function = _lambda.Function(
            self, "HighPriorityFunction",
            runtime=python_runtime,
            handler=entry_point,
            code=_lambda.Code.from_asset("lambda_functions/high_priority"),
            timeout=Duration.seconds(30),
            reserved_concurrent_executions=50,
        )

        # Standard function: limited to 10 concurrent executions.
        self.standard_function = _lambda.Function(
            self, "StandardFunction",
            runtime=python_runtime,
            handler=entry_point,
            code=_lambda.Code.from_asset("lambda_functions/standard"),
            timeout=Duration.seconds(60),
            reserved_concurrent_executions=10,
        )

        # Batch function: no reserved concurrency is set, so it draws on the
        # account-level remaining concurrency.
        self.batch_function = _lambda.Function(
            self, "BatchFunction",
            runtime=python_runtime,
            handler=entry_point,
            code=_lambda.Code.from_asset("lambda_functions/batch"),
            timeout=Duration.minutes(15),
        )

7.4.2 Provisioned Concurrency Configuration

class ProvisionedConcurrencyStack(Stack):
    """Stack with a Lambda alias that has provisioned concurrency and a
    target-tracking autoscaling policy on its utilization."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        from aws_cdk import aws_applicationautoscaling as appscaling

        # Create Lambda function
        self.warm_function = _lambda.Function(
            self, "WarmFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/warm_function"),
            timeout=Duration.seconds(30)
        )

        # Create version
        version = self.warm_function.current_version

        # Create alias. Provisioned concurrency is set directly through
        # `provisioned_concurrent_executions`; the Alias construct has no
        # `provisioned_concurrency_config` property.
        alias_name = "PROD"
        self.prod_alias = _lambda.Alias(
            self, "ProdAlias",
            alias_name=alias_name,
            version=version,
            provisioned_concurrent_executions=10
        )

        # Create application autoscaling target for the alias
        scalable_target = appscaling.ScalableTarget(
            self, "ScalableTarget",
            service_namespace=appscaling.ServiceNamespace.LAMBDA,
            scalable_dimension="lambda:function:ProvisionedConcurrency",
            resource_id=f"function:{self.warm_function.function_name}:{alias_name}",
            min_capacity=5,
            max_capacity=50
        )
        # The resource ID is built from plain strings, so CloudFormation sees
        # no reference to the alias. Declare the dependency explicitly so the
        # target is only registered once the alias exists.
        scalable_target.node.add_dependency(self.prod_alias)

        # Configure autoscaling policy: keep utilization near 70%. The
        # predefined metric is passed as `predefined_metric`.
        scalable_target.scale_to_track_metric(
            "TargetTracking",
            target_value=0.7,
            predefined_metric=(
                appscaling.PredefinedMetric.LAMBDA_PROVISIONED_CONCURRENCY_UTILIZATION
            )
        )

7.5 Environment Variables and Secrets Management

7.5.1 Secure Environment Variable Configuration

from aws_cdk import (
    aws_secretsmanager as secretsmanager,
    aws_ssm as ssm,
    aws_kms as kms
)

class SecureConfigStack(Stack):
    """Stack with a KMS key, a generated database secret, SSM parameters,
    and a Lambda function that is granted access to all of them.

    The function receives only references (ARN and parameter names) through
    its environment; the secret values are fetched at runtime.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Imported locally: this snippet's import list does not include json.
        import json

        # Create KMS key
        self.lambda_key = kms.Key(
            self, "LambdaKmsKey",
            description="KMS key for Lambda environment variables",
            enable_key_rotation=True
        )

        # Create Secrets Manager secret with a generated 32-character password
        self.db_credentials = secretsmanager.Secret(
            self, "DatabaseCredentials",
            description="Database credentials",
            generate_secret_string=secretsmanager.SecretStringGenerator(
                secret_string_template='{"username": "dbadmin"}',
                generate_string_key="password",
                exclude_characters=" %+~`#$&*()|[]{}:;<>?!'/\\\"",
                password_length=32
            )
        )

        # Create SSM parameter holding non-sensitive API settings as JSON
        self.api_config = ssm.StringParameter(
            self, "ApiConfig",
            parameter_name="/myapp/api/config",
            string_value=json.dumps({
                "endpoint": "https://api.example.com",
                "timeout": 30,
                "retry_count": 3
            })
        )

        # Reference an existing SecureString parameter instead of creating
        # one. SecureString parameters cannot be created through
        # CloudFormation, and putting the key value in source would leak it
        # into the synthesized template. Create /myapp/api/key out of band
        # (for example with `aws ssm put-parameter --type SecureString`).
        self.api_key = ssm.StringParameter.from_secure_string_parameter_attributes(
            self, "ApiKey",
            parameter_name="/myapp/api/key"
        )

        # Create Lambda function
        self.secure_function = _lambda.Function(
            self, "SecureFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/secure_config"),
            environment={
                # Basic configuration
                'ENVIRONMENT': 'production',
                'LOG_LEVEL': 'INFO',

                # Service configuration
                'DB_SECRET_ARN': self.db_credentials.secret_arn,
                'API_CONFIG_PARAM': self.api_config.parameter_name,
                'API_KEY_PARAM': self.api_key.parameter_name,

                # KMS key
                'KMS_KEY_ID': self.lambda_key.key_id
            },
            environment_encryption=self.lambda_key  # Encrypt environment variables
        )

        # Grant permissions
        self.db_credentials.grant_read(self.secure_function)
        self.api_config.grant_read(self.secure_function)
        self.api_key.grant_read(self.secure_function)
        self.lambda_key.grant_decrypt(self.secure_function)

7.5.2 Configuration Management Lambda Function

# lambda_functions/secure_config/index.py
import json
import boto3
import os
from typing import Dict, Any
import logging

# Root logger; its level comes from the LOG_LEVEL environment variable and
# falls back to INFO when the variable is unset.
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))

# Initialize AWS clients once at import time so every call in this module
# uses the same client objects.
secrets_client = boto3.client('secretsmanager')
ssm_client = boto3.client('ssm')

class ConfigManager:
    """Fetches secrets and SSM parameters, caching each value in memory.

    Cached entries expire ``ttl`` seconds after they are stored, so rotated
    secrets and changed parameters are picked up without a cold start.
    """

    def __init__(self):
        # Maps cache key -> (value, absolute expiry time from time.time()).
        self._cache = {}
        self.ttl = 300  # 5 minute cache

    def _read_cache(self, key: str) -> tuple:
        """Return ``(True, value)`` for a fresh entry, else ``(False, None)``.

        Expired entries are removed as a side effect.
        """
        import time

        entry = self._cache.get(key)
        if entry is None:
            return False, None

        value, expires_at = entry
        if time.time() < expires_at:
            return True, value

        del self._cache[key]
        return False, None

    def _write_cache(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` with an expiry ``ttl`` seconds away."""
        import time

        self._cache[key] = (value, time.time() + self.ttl)

    def get_secret(self, secret_arn: str) -> Dict[str, Any]:
        """Return the JSON-decoded secret for ``secret_arn``.

        A fresh cached value is returned when available; otherwise the
        secret is fetched from Secrets Manager and cached. Failures are
        logged and re-raised.
        """
        hit, cached = self._read_cache(secret_arn)
        if hit:
            return cached

        try:
            response = secrets_client.get_secret_value(SecretId=secret_arn)
            secret_value = json.loads(response['SecretString'])
            self._write_cache(secret_arn, secret_value)
            return secret_value
        except Exception as e:
            logger.error(f"Failed to get secret {secret_arn}: {str(e)}")
            raise

    def get_parameter(self, parameter_name: str, decrypt: bool = True) -> str:
        """Return the value of the SSM parameter ``parameter_name``.

        Args:
            parameter_name: Name of the parameter to read.
            decrypt: Passed to SSM as ``WithDecryption``. Decrypted and raw
                reads are cached under separate keys.

        Failures are logged and re-raised.
        """
        cache_key = f"{parameter_name}_{decrypt}"
        hit, cached = self._read_cache(cache_key)
        if hit:
            return cached

        try:
            response = ssm_client.get_parameter(
                Name=parameter_name,
                WithDecryption=decrypt
            )
            value = response['Parameter']['Value']
            self._write_cache(cache_key, value)
            return value
        except Exception as e:
            logger.error(f"Failed to get parameter {parameter_name}: {str(e)}")
            raise

# Global configuration manager, created once at module import so the handler
# reuses the same instance and its cache on every call in this module.
config_manager = ConfigManager()

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Load credentials and API settings, then process the event.

    The secret ARN and parameter names come from environment variables; the
    values are resolved through the module-level ``config_manager``. Any
    failure is logged and reported as a generic 500 response.
    """
    try:
        env = os.environ

        # Database credentials from Secrets Manager.
        db_credentials = config_manager.get_secret(env['DB_SECRET_ARN'])

        # API settings are stored as a JSON string and read without decryption.
        raw_api_config = config_manager.get_parameter(env['API_CONFIG_PARAM'], False)
        api_config = json.loads(raw_api_config)

        # API key is read with decryption enabled.
        api_key = config_manager.get_parameter(env['API_KEY_PARAM'], True)

        outcome = process_request(event, db_credentials, api_config, api_key)

        status = 200
        body = json.dumps({
            'success': True,
            'result': outcome
        })

    except Exception as e:
        logger.error(f"Error processing request: {str(e)}")
        status = 500
        body = json.dumps({
            'success': False,
            'error': 'Internal server error'
        })

    return {'statusCode': status, 'body': body}

def process_request(event: Dict[str, Any], db_creds: Dict[str, Any],
                   api_config: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Report which pieces of configuration are available for this request.

    Returns a dict with a fixed message and three booleans: whether the
    credentials contain a username, whether the API config has an endpoint,
    and whether the API key is non-empty. ``event`` is not inspected.
    """
    logger.info("Processing request with secure configuration")

    has_db_user = bool(db_creds.get('username'))
    has_endpoint = bool(api_config.get('endpoint'))
    has_api_key = bool(api_key)

    return {
        'message': 'Request processed successfully',
        'database_connected': has_db_user,
        'api_configured': has_endpoint,
        'api_key_available': has_api_key
    }

7.6 Lambda Extensions

7.6.1 Custom Extension

# extensions/config_extension/index.py
import json
import os
import requests
import time
from threading import Thread

class ConfigExtension:
    """Configuration extension, periodically updates configuration.

    Registers with the Lambda Extensions API, refreshes a configuration
    snapshot in a background thread, and publishes it to /tmp/config.json.
    """

    def __init__(self):
        # Base URL of the Extensions API, taken from the runtime environment.
        self.lambda_api_url = f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}"
        self.extension_name = "config-extension"
        self.config_cache = {}

    def register(self):
        """Register for INVOKE and SHUTDOWN events and return the extension ID.

        Raises:
            requests.HTTPError: If the Extensions API rejects the registration.
        """
        url = f"{self.lambda_api_url}/2020-01-01/extension/register"
        headers = {'Lambda-Extension-Name': self.extension_name}
        data = {'events': ['INVOKE', 'SHUTDOWN']}

        response = requests.post(url, headers=headers, json=data)
        # Fail at registration rather than continuing without an identifier.
        response.raise_for_status()
        return response.headers.get('Lambda-Extension-Identifier')

    def next_event(self, extension_id: str):
        """Request the next lifecycle event and return its JSON body.

        No timeout is set: the request is expected to wait until an event
        is available.

        Raises:
            requests.HTTPError: If the Extensions API returns an error status.
        """
        url = f"{self.lambda_api_url}/2020-01-01/extension/event/next"
        headers = {'Lambda-Extension-Identifier': extension_id}

        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()

    def update_config(self):
        """Rebuild the configuration snapshot and publish it to /tmp/config.json."""
        # Can fetch configuration from external configuration service
        self.config_cache = {
            'updated_at': time.time(),
            'feature_flags': {
                'new_feature': True,
                'beta_feature': False
            },
            'api_endpoints': {
                'primary': 'https://api.example.com',
                'fallback': 'https://api-backup.example.com'
            }
        }

        # Write to a temporary file, then rename it over the shared file, so
        # a reader never sees a half-written config.json.
        target_path = '/tmp/config.json'
        staging_path = f"{target_path}.tmp"
        with open(staging_path, 'w') as f:
            json.dump(self.config_cache, f)
        os.replace(staging_path, target_path)

    def run(self):
        """Register, start the updater thread, and loop until SHUTDOWN."""
        extension_id = self.register()

        # Daemon thread, so it does not keep the process alive after shutdown.
        config_thread = Thread(target=self.config_updater, daemon=True)
        config_thread.start()

        # Main event loop
        while True:
            event = self.next_event(extension_id)

            if event['eventType'] == 'SHUTDOWN':
                break

    def config_updater(self):
        """Periodically update configuration"""
        while True:
            self.update_config()
            time.sleep(60)  # Update every 60 seconds

# Script entry point: build the extension and run it; run() returns only
# after a SHUTDOWN event has been received.
if __name__ == '__main__':
    extension = ConfigExtension()
    extension.run()

7.6.2 Extension Configuration

class ExtensionStack(Stack):
    """Stack that packages the config extension as a layer and attaches it
    to a Lambda function."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create Extension Layer
        # NOTE(review): Lambda presumably only starts external extensions
        # found as executables under an `extensions/` directory inside the
        # layer — confirm the asset directory is laid out accordingly.
        self.extension_layer = _lambda.LayerVersion(
            self, "ConfigExtensionLayer",
            code=_lambda.Code.from_asset("extensions/config_extension"),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_9],
            description="Configuration extension layer"
        )

        # Lambda function using Extension; the flag is passed to the handler
        # code as an environment variable.
        self.extended_function = _lambda.Function(
            self, "ExtendedFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/extended"),
            layers=[self.extension_layer],
            environment={
                'USE_EXTENSION_CONFIG': 'true'
            }
        )

7.7 Chapter Summary

Key Takeaways
  • VPC configuration enables Lambda to securely access private resources
  • Layers provide an effective way to reuse code and manage dependencies
  • Comprehensive error handling and retry mechanisms improve system reliability
  • Concurrency control and provisioned concurrency optimize performance and cost
  • Secure configuration management protects sensitive information
  • Extensions expand Lambda’s functional boundaries

In the next chapter, we will learn about Lambda integration with other AWS services, including deep integration with API Gateway, DynamoDB, S3, and other services.

Further Reading