Chapter 4 - Advanced Construct Features

Haiyue
34min
Learning Objectives
  • Master the creation of custom Constructs
  • Learn to use CDK Patterns and best practices
  • Understand the use of Cross-Stack References
  • Master the application of CDK Context and Feature Flags

Custom Construct Development

Construct Design Principles

A well-designed construct encapsulates a group of related resources behind a small, typed interface: it takes configuration through constructor parameters, applies sensible defaults, and exposes the underlying resources as properties so callers can still reach them when needed. The ServerlessApi construct below follows this shape.

Creating Custom Constructs

from constructs import Construct
from aws_cdk import (
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    aws_certificatemanager as acm,
    aws_dynamodb as dynamodb,
    Duration,
    RemovalPolicy
)
from typing import Optional, List

class ServerlessApi(Construct):
    """Custom Construct: Serverless API"""

    def __init__(self, scope: Construct, construct_id: str,
                 table_name: str,
                 lambda_timeout: Optional[Duration] = None,
                 cors_origins: Optional[List[str]] = None,
                 api_key_required: bool = False,
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Set default values
        lambda_timeout = lambda_timeout or Duration.seconds(30)
        cors_origins = cors_origins or ["*"]

        # Create DynamoDB table
        self.table = dynamodb.Table(
            self,
            "Table",
            table_name=table_name,
            partition_key=dynamodb.Attribute(
                name="id",
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.DESTROY
        )

        # Create Lambda function
        self.function = lambda_.Function(
            self,
            "ApiFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline(self._get_lambda_code()),
            timeout=lambda_timeout,
            environment={
                "TABLE_NAME": self.table.table_name
            }
        )

        # Grant Lambda access to DynamoDB
        self.table.grant_read_write_data(self.function)

        # Create API Gateway
        self.api = apigw.RestApi(
            self,
            "Api",
            rest_api_name=f"{table_name}-api",
            description=f"API for {table_name} operations",
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=cors_origins,
                allow_methods=apigw.Cors.ALL_METHODS,
                allow_headers=["Content-Type", "X-Amz-Date", "Authorization", "X-Api-Key"]
            )
        )

        # Create API resources and methods
        items_resource = self.api.root.add_resource("items")
        item_resource = items_resource.add_resource("{id}")

        # Lambda integration
        integration = apigw.LambdaIntegration(self.function)

        # Add methods
        items_resource.add_method("GET", integration, api_key_required=api_key_required)
        items_resource.add_method("POST", integration, api_key_required=api_key_required)
        item_resource.add_method("GET", integration, api_key_required=api_key_required)
        item_resource.add_method("PUT", integration, api_key_required=api_key_required)
        item_resource.add_method("DELETE", integration, api_key_required=api_key_required)

        # If API Key is required, create usage plan
        if api_key_required:
            self._setup_api_key()

    def _get_lambda_code(self) -> str:
        """Lambda function code"""
        return """
import json
import os
import boto3
import uuid
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
table_name = os.environ['TABLE_NAME']
table = dynamodb.Table(table_name)

def decimal_default(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError

def handler(event, context):
    try:
        http_method = event['httpMethod']
        resource_path = event['resource']

        if resource_path == '/items' and http_method == 'GET':
            return list_items()
        elif resource_path == '/items' and http_method == 'POST':
            return create_item(json.loads(event['body']))
        elif resource_path == '/items/{id}' and http_method == 'GET':
            return get_item(event['pathParameters']['id'])
        elif resource_path == '/items/{id}' and http_method == 'PUT':
            return update_item(event['pathParameters']['id'], json.loads(event['body']))
        elif resource_path == '/items/{id}' and http_method == 'DELETE':
            return delete_item(event['pathParameters']['id'])
        else:
            return {
                'statusCode': 405,
                'body': json.dumps({'error': 'Method not allowed'})
            }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def list_items():
    response = table.scan()
    return {
        'statusCode': 200,
        'body': json.dumps(response['Items'], default=decimal_default)
    }

def create_item(item_data):
    item_data['id'] = str(uuid.uuid4())
    table.put_item(Item=item_data)
    return {
        'statusCode': 201,
        'body': json.dumps(item_data, default=decimal_default)
    }

def get_item(item_id):
    response = table.get_item(Key={'id': item_id})
    if 'Item' not in response:
        return {
            'statusCode': 404,
            'body': json.dumps({'error': 'Item not found'})
        }
    return {
        'statusCode': 200,
        'body': json.dumps(response['Item'], default=decimal_default)
    }

def update_item(item_id, item_data):
    item_data['id'] = item_id
    table.put_item(Item=item_data)
    return {
        'statusCode': 200,
        'body': json.dumps(item_data, default=decimal_default)
    }

def delete_item(item_id):
    table.delete_item(Key={'id': item_id})
    return {
        'statusCode': 204,
        'body': ''
    }
        """

    def _setup_api_key(self):
        """Set up API Key and usage plan"""
        api_key = apigw.ApiKey(
            self,
            "ApiKey",
            api_key_name=f"{self.api.rest_api_name}-key"
        )

        usage_plan = apigw.UsagePlan(
            self,
            "UsagePlan",
            name=f"{self.api.rest_api_name}-usage-plan",
            throttle=apigw.ThrottleSettings(
                rate_limit=100,
                burst_limit=200
            ),
            quota=apigw.QuotaSettings(
                limit=10000,
                period=apigw.Period.MONTH
            ),
            api_stages=[
                apigw.UsagePlanPerApiStage(
                    api=self.api,
                    stage=self.api.deployment_stage
                )
            ]
        )

        usage_plan.add_api_key(api_key)

        self.api_key = api_key
        self.usage_plan = usage_plan

    @property
    def api_url(self) -> str:
        """API Gateway URL"""
        return self.api.url

    @property
    def table_name(self) -> str:
        """DynamoDB table name"""
        return self.table.table_name

    def add_custom_domain(self, domain_name: str, certificate_arn: str):
        """Add custom domain"""
        domain = apigw.DomainName(
            self,
            "CustomDomain",
            domain_name=domain_name,
            certificate=acm.Certificate.from_certificate_arn(
                self, "Certificate", certificate_arn
            )
        )

        apigw.BasePathMapping(
            self,
            "BasePathMapping",
            domain_name=domain,
            rest_api=self.api
        )

        return domain

Using Custom Constructs

from aws_cdk import Stack
from constructs import Construct
from .serverless_api import ServerlessApi

class MyApplicationStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Use custom Construct
        users_api = ServerlessApi(
            self,
            "UsersApi",
            table_name="users",
            lambda_timeout=Duration.minutes(1),
            cors_origins=["https://myapp.com", "https://admin.myapp.com"],
            api_key_required=True
        )

        products_api = ServerlessApi(
            self,
            "ProductsApi",
            table_name="products",
            cors_origins=["*"]
        )

        # Output API URLs
        CfnOutput(
            self,
            "UsersApiUrl",
            value=users_api.api_url,
            description="Users API Gateway URL"
        )

        CfnOutput(
            self,
            "ProductsApiUrl",
            value=products_api.api_url,
            description="Products API Gateway URL"
        )
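
Because ServerlessApi exposes add_custom_domain, the stack can also attach a vanity domain after construction. A minimal sketch inside the same __init__ (the domain name and certificate ARN are hypothetical placeholders):

        # Hypothetical domain and ACM certificate ARN
        users_api.add_custom_domain(
            domain_name="api.myapp.com",
            certificate_arn="arn:aws:acm:us-east-1:123456789012:certificate/example-id"
        )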

CDK Patterns and Best Practices

Common Architectural Patterns

from constructs import Construct
from aws_cdk import (
    Stack,
    Duration,
    aws_apigateway as apigw,
    aws_dynamodb as dynamodb,
    aws_lambda as lambda_,
    aws_lambda_event_sources as lambda_events,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    aws_sqs as sqs
)

class EventDrivenPattern(Construct):
    """Event-driven architecture pattern"""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create SNS topic
        self.topic = sns.Topic(
            self,
            "EventTopic",
            topic_name="application-events"
        )

        # Create dead letter queue
        dlq = sqs.Queue(
            self,
            "DeadLetterQueue",
            queue_name="failed-events-dlq",
            retention_period=Duration.days(14)
        )

        # Create processing queue
        self.processing_queue = sqs.Queue(
            self,
            "ProcessingQueue",
            queue_name="event-processing",
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=3,
                queue=dlq
            ),
            visibility_timeout=Duration.minutes(5)
        )

        # Subscribe SNS to SQS
        self.topic.add_subscription(
            subscriptions.SqsSubscription(self.processing_queue)
        )

        # Create processor function
        self.processor = lambda_.Function(
            self,
            "EventProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor.handler",
            code=lambda_.Code.from_inline("""
import json

def handler(event, context):
    for record in event['Records']:
        # Extract SNS message from SQS record
        sns_message = json.loads(record['body'])
        message = json.loads(sns_message['Message'])

        print(f"Processing event: {message}")

        # Add actual processing logic here
        process_event(message)

    return {'statusCode': 200}

def process_event(event_data):
    event_type = event_data.get('type')

    if event_type == 'user_created':
        # Handle user creation event
        pass
    elif event_type == 'order_placed':
        # Handle order placement event
        pass
    else:
        print(f"Unknown event type: {event_type}")
            """),
            timeout=Duration.minutes(1),
            environment={
                "TOPIC_ARN": self.topic.topic_arn
            }
        )

        # Configure Lambda to read events from SQS
        self.processor.add_event_source(
            lambda_events.SqsEventSource(
                self.processing_queue,
                batch_size=10,
                max_batching_window=Duration.seconds(5)
            )
        )

        # Grant publish permissions
        self.topic.grant_publish(self.processor)

    def create_subscriber(self, handler_code: str, handler_name: str) -> lambda_.Function:
        """Create event subscriber"""
        subscriber = lambda_.Function(
            self,
            f"{handler_name}Subscriber",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline(handler_code),
            timeout=Duration.seconds(30)
        )

        # Create dedicated queue
        subscriber_queue = sqs.Queue(
            self,
            f"{handler_name}Queue",
            visibility_timeout=Duration.seconds(60)
        )

        # Subscribe to topic
        self.topic.add_subscription(
            subscriptions.SqsSubscription(subscriber_queue)
        )

        # Configure Lambda event source
        subscriber.add_event_source(
            lambda_events.SqsEventSource(subscriber_queue)
        )

        return subscriber
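
Producers publish to the pattern's SNS topic, and the SQS queues fan each message out to the subscribers. A minimal publisher sketch using boto3 (the TOPIC_ARN environment variable is the one set on the processor function above; publish_event is an illustrative helper):

import json
import os

import boto3

sns = boto3.client("sns")

def publish_event(event_type: str, payload: dict) -> None:
    """Publish an application event to the shared SNS topic."""
    sns.publish(
        TopicArn=os.environ["TOPIC_ARN"],
        Message=json.dumps({"type": event_type, **payload})
    )

# Example: publish_event("user_created", {"user_id": "123"})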

Microservice Pattern

class MicroservicePattern(Construct):
    """Microservice architecture pattern"""

    def __init__(self, scope: Construct, construct_id: str,
                 service_name: str,
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        self.service_name = service_name

        # Service-specific VPC subnet group
        # (Assuming VPC already exists)

        # 1. Data layer
        self.database = self._create_database()

        # 2. Business logic layer
        self.business_logic = self._create_business_logic()

        # 3. API layer
        self.api = self._create_api()

        # 4. Monitoring and logging
        self.monitoring = self._create_monitoring()

    def _create_database(self):
        """Create service-specific database"""
        return dynamodb.Table(
            self,
            "ServiceDatabase",
            table_name=f"{self.service_name}-data",
            partition_key=dynamodb.Attribute(
                name="pk",
                type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="sk",
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES
        )

    def _create_business_logic(self):
        """Create business logic processor"""
        return lambda_.Function(
            self,
            "BusinessLogic",
            function_name=f"{self.service_name}-processor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="business.handler",
            code=lambda_.Code.from_asset(f"lambda/{self.service_name}"),
            environment={
                "SERVICE_NAME": self.service_name,
                "TABLE_NAME": self.database.table_name
            },
            timeout=Duration.minutes(5)
        )

    def _create_api(self):
        """Create API Gateway"""
        api = apigw.RestApi(
            self,
            "ServiceApi",
            rest_api_name=f"{self.service_name}-api",
            description=f"API for {self.service_name} microservice"
        )

        # Health check endpoint
        health = api.root.add_resource("health")
        health.add_method(
            "GET",
            apigw.MockIntegration(
                integration_responses=[
                    apigw.IntegrationResponse(
                        status_code="200",
                        response_templates={
                            "application/json": '{"status": "healthy", "service": "' + self.service_name + '"}'
                        }
                    )
                ],
                request_templates={
                    "application/json": '{"statusCode": 200}'
                }
            ),
            method_responses=[
                apigw.MethodResponse(status_code="200")
            ]
        )

        # Business endpoint
        service_resource = api.root.add_resource(self.service_name)
        service_resource.add_method(
            "ANY",
            apigw.LambdaIntegration(self.business_logic),
            authorization_type=apigw.AuthorizationType.IAM
        )

        # Proxy resource to handle all subpaths
        proxy = service_resource.add_resource("{proxy+}")
        proxy.add_method(
            "ANY",
            apigw.LambdaIntegration(self.business_logic)
        )

        return api

    def _create_monitoring(self):
        """Create monitoring and alarms"""
        # This would create CloudWatch alarms, dashboards, etc.
        # Simplified for example
        pass

# Using microservice pattern
class MicroservicesStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create multiple microservices
        user_service = MicroservicePattern(
            self, "UserService", service_name="users"
        )

        product_service = MicroservicePattern(
            self, "ProductService", service_name="products"
        )

        order_service = MicroservicePattern(
            self, "OrderService", service_name="orders"
        )

        # Create API Gateway aggregation layer
        self._create_api_gateway(user_service, product_service, order_service)

    def _create_api_gateway(self, *services):
        """Create unified API Gateway"""
        # Implement API Gateway aggregation logic
        pass
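
Both stubs are left for the reader to flesh out. As one possibility, _create_monitoring in MicroservicePattern could raise a CloudWatch alarm when the business-logic function reports errors; a minimal sketch with an assumed threshold and period:

from aws_cdk import Duration
from aws_cdk import aws_cloudwatch as cloudwatch

def _create_monitoring(self):
    """Alarm when the business-logic function reports errors (illustrative)."""
    return cloudwatch.Alarm(
        self,
        "ErrorAlarm",
        metric=self.business_logic.metric_errors(period=Duration.minutes(5)),
        threshold=5,  # assumed threshold; tune per service
        evaluation_periods=1,
        alarm_description=f"Errors in {self.service_name} business logic"
    )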

Cross-Stack References

Export and Import Mechanism

A stack publishes a value as a CloudFormation export by giving a CfnOutput an export_name; any other stack in the same account and region can then read it with Fn.import_value. CloudFormation tracks the dependency and blocks deleting or changing an export while another stack still imports it.

Practical Examples

# shared_infrastructure.py
from aws_cdk import Stack, CfnOutput, aws_ec2 as ec2
from constructs import Construct

class SharedInfrastructureStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create shared VPC
        self.vpc = ec2.Vpc(
            self,
            "SharedVpc",
            max_azs=3,
            cidr="10.0.0.0/16",
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    name="Public",
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="Private",
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="Database",
                    subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
                    cidr_mask=24
                )
            ]
        )

        # Create shared security groups
        self.app_security_group = ec2.SecurityGroup(
            self,
            "AppSecurityGroup",
            vpc=self.vpc,
            description="Security group for applications",
            allow_all_outbound=True
        )

        self.database_security_group = ec2.SecurityGroup(
            self,
            "DatabaseSecurityGroup",
            vpc=self.vpc,
            description="Security group for databases"
        )

        # Allow app to access database
        self.database_security_group.add_ingress_rule(
            peer=self.app_security_group,
            connection=ec2.Port.tcp(5432),
            description="Allow app access to database"
        )

        # Export resources for other Stacks to use
        CfnOutput(
            self,
            "VpcId",
            value=self.vpc.vpc_id,
            export_name="SharedVpcId",
            description="Shared VPC ID"
        )

        CfnOutput(
            self,
            "PrivateSubnetIds",
            value=",".join([subnet.subnet_id for subnet in self.vpc.private_subnets]),
            export_name="SharedPrivateSubnetIds",
            description="Private subnet IDs"
        )

        CfnOutput(
            self,
            "PublicSubnetIds",
            value=",".join([subnet.subnet_id for subnet in self.vpc.public_subnets]),
            export_name="SharedPublicSubnetIds",
            description="Public subnet IDs"
        )

        CfnOutput(
            self,
            "DatabaseSubnetIds",
            value=",".join([subnet.subnet_id for subnet in self.vpc.isolated_subnets]),
            export_name="SharedDatabaseSubnetIds",
            description="Database subnet IDs"
        )

        CfnOutput(
            self,
            "AppSecurityGroupId",
            value=self.app_security_group.security_group_id,
            export_name="SharedAppSecurityGroupId",
            description="Application security group ID"
        )

        CfnOutput(
            self,
            "DatabaseSecurityGroupId",
            value=self.database_security_group.security_group_id,
            export_name="SharedDatabaseSecurityGroupId",
            description="Database security group ID"
        )

# application_stack.py
from aws_cdk import (
    Fn,
    Stack,
    aws_ec2 as ec2,
    aws_elasticloadbalancingv2 as elbv2,
    aws_elasticloadbalancingv2_targets as targets,
    aws_lambda as lambda_
)
from constructs import Construct

class ApplicationStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Import shared resources exported by SharedInfrastructureStack.
        # Fn.import_value returns a deploy-time token, so the lists must be
        # split with Fn.split, not Python's str.split.
        vpc_id = Fn.import_value("SharedVpcId")
        private_subnet_ids = Fn.split(",", Fn.import_value("SharedPrivateSubnetIds"))
        public_subnet_ids = Fn.split(",", Fn.import_value("SharedPublicSubnetIds"))
        app_sg_id = Fn.import_value("SharedAppSecurityGroupId")

        # Vpc.from_lookup needs concrete values at synth time and cannot
        # consume tokens; build a VPC reference from attributes instead
        # (the AZ list is assumed to match the exported subnets)
        vpc = ec2.Vpc.from_vpc_attributes(
            self,
            "ImportedVpc",
            vpc_id=vpc_id,
            availability_zones=self.availability_zones,
            private_subnet_ids=private_subnet_ids,
            public_subnet_ids=public_subnet_ids
        )

        app_security_group = ec2.SecurityGroup.from_security_group_id(
            self,
            "ImportedAppSG",
            security_group_id=app_sg_id
        )

        # Create application resources
        app_function = lambda_.Function(
            self,
            "AppFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="app.handler",
            code=lambda_.Code.from_asset("lambda"),
            vpc=vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            security_groups=[app_security_group]
        )

        # Create ALB
        alb = elbv2.ApplicationLoadBalancer(
            self,
            "AppLoadBalancer",
            vpc=vpc,
            internet_facing=True,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PUBLIC
            )
        )

        # Configure target group (Lambda targets take no port, protocol, or VPC)
        target_group = elbv2.ApplicationTargetGroup(
            self,
            "AppTargets",
            targets=[targets.LambdaTarget(app_function)]
        )

        # Add listener
        alb.add_listener(
            "AppListener",
            port=80,
            default_target_groups=[target_group]
        )

# database_stack.py
from aws_cdk import (
    Duration,
    Fn,
    RemovalPolicy,
    Stack,
    CfnOutput,
    aws_ec2 as ec2,
    aws_rds as rds
)
from constructs import Construct

class DatabaseStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Import shared resources (Fn.split handles the token returned by
        # Fn.import_value; Python's str.split would not)
        vpc_id = Fn.import_value("SharedVpcId")
        db_subnet_ids = Fn.split(",", Fn.import_value("SharedDatabaseSubnetIds"))
        db_sg_id = Fn.import_value("SharedDatabaseSecurityGroupId")

        vpc = ec2.Vpc.from_vpc_attributes(
            self,
            "ImportedVpc",
            vpc_id=vpc_id,
            availability_zones=self.availability_zones,
            isolated_subnet_ids=db_subnet_ids
        )

        # Create subnet group
        db_subnet_group = rds.SubnetGroup(
            self,
            "DatabaseSubnetGroup",
            description="Subnet group for database",
            vpc=vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
            ),
            removal_policy=RemovalPolicy.DESTROY
        )

        # Create database
        database = rds.DatabaseInstance(
            self,
            "ApplicationDatabase",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_13_7
            ),
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.BURSTABLE3,
                ec2.InstanceSize.MICRO
            ),
            vpc=vpc,
            subnet_group=db_subnet_group,
            security_groups=[
                ec2.SecurityGroup.from_security_group_id(
                    self, "ImportedDbSG", db_sg_id
                )
            ],
            credentials=rds.Credentials.from_generated_secret(
                "dbadmin",
                exclude_characters="/@\"'"
            ),
            database_name="appdb",
            backup_retention=Duration.days(7),
            deletion_protection=False,
            removal_policy=RemovalPolicy.DESTROY
        )

        # Export database connection information
        CfnOutput(
            self,
            "DatabaseEndpoint",
            value=database.instance_endpoint.hostname,
            export_name="DatabaseEndpoint",
            description="Database endpoint"
        )

        CfnOutput(
            self,
            "DatabaseSecretArn",
            value=database.secret.secret_arn,
            export_name="DatabaseSecretArn",
            description="Database secret ARN"
        )
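
Downstream stacks consume these exports the same way. A minimal sketch that imports the secret ARN and grants read access (app_function stands in for any Lambda in the consuming stack):

# consumer_stack.py (sketch)
from aws_cdk import Fn, Stack
from aws_cdk import aws_secretsmanager as secretsmanager
from constructs import Construct

class ConsumerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Import the ARN exported by DatabaseStack
        secret_arn = Fn.import_value("DatabaseSecretArn")
        db_secret = secretsmanager.Secret.from_secret_complete_arn(
            self, "DbSecret", secret_arn
        )
        # db_secret.grant_read(app_function) would let a consumer read it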

CDK Context and Feature Flags

Context Configuration Management

CDK reads context from cdk.json (and from --context flags on the command line, which take precedence); feature flags are simply well-known context keys set in the same "context" block. Note that JSON does not support comments, so related keys are grouped with blank lines only.

# cdk.json context configuration
{
  "app": "python app.py",
  "context": {
    "@aws-cdk/aws-lambda:recognizeLayerVersion": true,
    "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
    "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,

    "environment": "dev",
    "vpc-cidr": "10.0.0.0/16",
    "database-instance-type": "db.t3.micro",
    "enable-logging": true,
    "cors-origins": ["https://localhost:3000", "https://dev.example.com"],

    "dev": {
      "instance-count": 1,
      "database-backup-retention": 1,
      "log-retention-days": 7
    },
    "prod": {
      "instance-count": 3,
      "database-backup-retention": 30,
      "log-retention-days": 365
    }
  }
}

Using Context in Code

from aws_cdk import (
    Duration,
    RemovalPolicy,
    Stack,
    aws_ec2 as ec2,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_rds as rds
)
from constructs import Construct

class ContextAwareStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Get context values
        environment = self.node.try_get_context("environment") or "dev"
        vpc_cidr = self.node.try_get_context("vpc-cidr") or "10.0.0.0/16"
        enable_logging = self.node.try_get_context("enable-logging") or False
        cors_origins = self.node.try_get_context("cors-origins") or ["*"]

        # Get environment-specific configuration
        env_config = self.node.try_get_context(environment) or {}
        instance_count = env_config.get("instance-count", 1)
        backup_retention = env_config.get("database-backup-retention", 7)
        log_retention_days = env_config.get("log-retention-days", 7)

        # Create resources using configuration
        vpc = ec2.Vpc(
            self,
            "Vpc",
            cidr=vpc_cidr,
            max_azs=2 if environment == "dev" else 3
        )

        # Conditionally create resources
        log_group = None
        if enable_logging:
            log_group = logs.LogGroup(
                self,
                "ApplicationLogs",
                log_group_name=f"/aws/lambda/{environment}-app",
                retention=logs.RetentionDays(log_retention_days),
                removal_policy=RemovalPolicy.DESTROY
            )

        # Create Lambda function with environment-specific configuration
        function = lambda_.Function(
            self,
            "AppFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline("def handler(e, c): return {}"),
            environment={
                "ENVIRONMENT": environment,
                "CORS_ORIGINS": ",".join(cors_origins),
                "LOG_LEVEL": "DEBUG" if environment == "dev" else "INFO"
            },
            log_group=log_group  # None when logging is disabled
        )

        # Create database with environment-specific instance type
        db_instance_type = self.node.try_get_context("database-instance-type")
        if db_instance_type:
            database = rds.DatabaseInstance(
                self,
                "Database",
                engine=rds.DatabaseInstanceEngine.postgres(
                    version=rds.PostgresEngineVersion.VER_13_7
                ),
                # ec2.InstanceType expects "t3.micro" (no RDS "db." prefix)
                instance_type=ec2.InstanceType(db_instance_type.removeprefix("db.")),
                vpc=vpc,
                backup_retention=Duration.days(backup_retention),
                removal_policy=RemovalPolicy.DESTROY
            )

Advanced Context Usage

class AdvancedContextStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Get context from command line or environment variables
        # cdk deploy --context env=prod --context region=us-east-1

        env_name = self.node.try_get_context("env")
        region = self.node.try_get_context("region")

        if not env_name:
            raise ValueError("Environment context 'env' is required")

        # Load environment configuration file
        config = self._load_environment_config(env_name)

        # Validate required context values
        self._validate_context()

        # Create resources using configuration
        self._create_resources(config)

    def _load_environment_config(self, env_name: str) -> dict:
        """Load environment configuration"""
        configs = {
            "dev": {
                "vpc_cidr": "10.0.0.0/16",
                "database": {
                    "instance_type": "db.t3.micro",
                    "multi_az": False,
                    "backup_retention": 1
                },
                "lambda": {
                    "memory_size": 256,
                    "timeout": 30
                },
                "monitoring": {
                    "detailed_monitoring": False,
                    "log_retention": 7
                }
            },
            "staging": {
                "vpc_cidr": "10.1.0.0/16",
                "database": {
                    "instance_type": "db.t3.small",
                    "multi_az": False,
                    "backup_retention": 7
                },
                "lambda": {
                    "memory_size": 512,
                    "timeout": 60
                },
                "monitoring": {
                    "detailed_monitoring": True,
                    "log_retention": 30
                }
            },
            "prod": {
                "vpc_cidr": "10.2.0.0/16",
                "database": {
                    "instance_type": "db.r5.large",
                    "multi_az": True,
                    "backup_retention": 30
                },
                "lambda": {
                    "memory_size": 1024,
                    "timeout": 300
                },
                "monitoring": {
                    "detailed_monitoring": True,
                    "log_retention": 365
                }
            }
        }

        if env_name not in configs:
            raise ValueError(f"Unknown environment: {env_name}")

        return configs[env_name]

    def _validate_context(self):
        """Validate required context values"""
        required_contexts = ["env"]

        for context_key in required_contexts:
            if not self.node.try_get_context(context_key):
                raise ValueError(f"Required context '{context_key}' is missing")

    def _create_resources(self, config: dict):
        """Create resources based on configuration"""
        # Create VPC based on configuration
        vpc = ec2.Vpc(
            self,
            "Vpc",
            cidr=config["vpc_cidr"]
        )

        # Create database based on configuration
        database = rds.DatabaseInstance(
            self,
            "Database",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_13_7
            ),
            # strip the RDS "db." prefix; ec2.InstanceType expects "t3.micro"
            instance_type=ec2.InstanceType(
                config["database"]["instance_type"].removeprefix("db.")
            ),
            vpc=vpc,
            multi_az=config["database"]["multi_az"],
            backup_retention=Duration.days(config["database"]["backup_retention"]),
            removal_policy=RemovalPolicy.DESTROY
        )

        # Create Lambda based on configuration
        function = lambda_.Function(
            self,
            "Function",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline("def handler(e, c): return {}"),
            memory_size=config["lambda"]["memory_size"],
            timeout=Duration.seconds(config["lambda"]["timeout"]),
            log_retention=logs.RetentionDays(config["monitoring"]["log_retention"])
        )
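
To tie this together, the stack is instantiated in the app entry point and the environment is chosen per deployment. A minimal app.py sketch (the stack name is illustrative):

# app.py
import aws_cdk as cdk

app = cdk.App()
AdvancedContextStack(app, "AdvancedContextStack")
app.synth()

# Deploy with: cdk deploy --context env=prod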

This chapter took an in-depth look at CDK's advanced features: custom Construct development, common architectural patterns, cross-Stack references, and context and feature-flag management. These skills are essential for building enterprise-grade, maintainable infrastructure code.