Chapter 5: Common AWS Service Integration

Haiyue
44min
Learning Objectives
  • Master CDK implementation of VPC network architecture
  • Learn to configure EC2, RDS, ElastiCache and other compute and storage services
  • Understand serverless architecture implementation with API Gateway and Lambda
  • Master configuration of monitoring and messaging services like CloudWatch, SNS, and SQS

VPC Network Architecture

Complete VPC Setup

from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_logs as logs,
    CfnOutput
)
from constructs import Construct

class NetworkingStack(Stack):
    """Three-tier network foundation.

    Creates a /16 VPC with public, private (egress via NAT) and isolated
    database subnets across up to three AZs, ships flow logs to CloudWatch,
    defines per-tier security groups, and adds gateway/interface VPC
    endpoints so private workloads reach AWS services without NAT traffic.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create VPC: one subnet of each tier per availability zone.
        self.vpc = ec2.Vpc(
            self,
            "MainVPC",
            vpc_name="main-vpc",
            max_azs=3,  # Use 3 availability zones
            # NOTE(review): `cidr` is deprecated in newer aws-cdk-lib in
            # favour of ip_addresses=ec2.IpAddresses.cidr(...); kept for
            # compatibility with the CDK version this material targets.
            cidr="10.0.0.0/16",

            # Define subnet configuration
            subnet_configuration=[
                # Public subnet - for load balancers, NAT gateways
                ec2.SubnetConfiguration(
                    name="Public",
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24  # 10.0.0.0/24, 10.0.1.0/24, 10.0.2.0/24
                ),
                # Private subnet - for application servers
                ec2.SubnetConfiguration(
                    name="Private",
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
                    cidr_mask=24  # 10.0.10.0/24, 10.0.11.0/24, 10.0.12.0/24
                ),
                # Database subnet - completely isolated (no internet route)
                ec2.SubnetConfiguration(
                    name="Database",
                    subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
                    cidr_mask=24  # 10.0.20.0/24, 10.0.21.0/24, 10.0.22.0/24
                )
            ],

            # Enable DNS so endpoint/RDS hostnames resolve inside the VPC.
            enable_dns_hostnames=True,
            enable_dns_support=True,

            # NAT gateway configuration
            nat_gateways=1,  # Production environments should use one per AZ

            # Capture ALL traffic (accepted + rejected) for auditing.
            flow_logs={
                "FlowLogsCloudWatch": ec2.FlowLogOptions(
                    destination=ec2.FlowLogDestination.to_cloud_watch_logs(
                        log_group=logs.LogGroup(
                            self,
                            "VpcFlowLogsGroup",
                            retention=logs.RetentionDays.ONE_MONTH
                        )
                    ),
                    traffic_type=ec2.FlowLogTrafficType.ALL
                )
            }
        )

        # Create security groups
        self._create_security_groups()

        # Create VPC endpoints (reduce NAT gateway costs)
        self._create_vpc_endpoints()

        # Output network information
        self._create_outputs()

    def _create_security_groups(self):
        """Create one security group per tier; each tier only accepts
        traffic from the tier directly in front of it."""

        # Web tier security group
        self.web_sg = ec2.SecurityGroup(
            self,
            "WebSecurityGroup",
            vpc=self.vpc,
            description="Security group for web servers",
            allow_all_outbound=True
        )

        # Web tier is internet-facing: allow HTTP/HTTPS from anywhere.
        self.web_sg.add_ingress_rule(
            peer=ec2.Peer.any_ipv4(),
            connection=ec2.Port.tcp(80),
            description="Allow HTTP"
        )
        self.web_sg.add_ingress_rule(
            peer=ec2.Peer.any_ipv4(),
            connection=ec2.Port.tcp(443),
            description="Allow HTTPS"
        )

        # Application tier security group
        self.app_sg = ec2.SecurityGroup(
            self,
            "AppSecurityGroup",
            vpc=self.vpc,
            description="Security group for application servers",
            allow_all_outbound=True
        )

        # Only allow traffic from web tier (SG reference, not CIDR).
        self.app_sg.add_ingress_rule(
            peer=self.web_sg,
            connection=ec2.Port.tcp(8080),
            description="Allow traffic from web tier"
        )

        # Database tier security group (default: no outbound opened here).
        self.db_sg = ec2.SecurityGroup(
            self,
            "DatabaseSecurityGroup",
            vpc=self.vpc,
            description="Security group for database servers"
        )

        # Only allow database connections from application tier.
        self.db_sg.add_ingress_rule(
            peer=self.app_sg,
            connection=ec2.Port.tcp(5432),
            description="Allow PostgreSQL from app tier"
        )
        self.db_sg.add_ingress_rule(
            peer=self.app_sg,
            connection=ec2.Port.tcp(3306),
            description="Allow MySQL from app tier"
        )

        # Cache tier security group
        self.cache_sg = ec2.SecurityGroup(
            self,
            "CacheSecurityGroup",
            vpc=self.vpc,
            description="Security group for cache servers"
        )

        self.cache_sg.add_ingress_rule(
            peer=self.app_sg,
            connection=ec2.Port.tcp(6379),
            description="Allow Redis from app tier"
        )

        # Management access security group
        self.bastion_sg = ec2.SecurityGroup(
            self,
            "BastionSecurityGroup",
            vpc=self.vpc,
            description="Security group for bastion host"
        )

        # Only allow SSH access from company IP range.
        self.bastion_sg.add_ingress_rule(
            peer=ec2.Peer.ipv4("203.0.113.0/24"),  # Replace with actual company IP range
            connection=ec2.Port.tcp(22),
            description="Allow SSH from company network"
        )

    def _create_vpc_endpoints(self):
        """Create VPC endpoints to reduce NAT gateway usage.

        Gateway endpoints (S3/DynamoDB) are free; interface endpoints are
        created for services commonly reached from private subnets.
        """

        # S3 gateway endpoint
        self.vpc.add_gateway_endpoint(
            "S3Endpoint",
            service=ec2.GatewayVpcEndpointAwsService.S3,
            subnets=[
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
            ]
        )

        # DynamoDB gateway endpoint
        self.vpc.add_gateway_endpoint(
            "DynamoDbEndpoint",
            service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
            subnets=[
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
            ]
        )

        # Interface endpoint security group: HTTPS from inside the VPC only.
        endpoint_sg = ec2.SecurityGroup(
            self,
            "VpcEndpointSG",
            vpc=self.vpc,
            description="Security group for VPC endpoints"
        )
        endpoint_sg.add_ingress_rule(
            peer=ec2.Peer.ipv4(self.vpc.vpc_cidr_block),
            connection=ec2.Port.tcp(443),
            description="Allow HTTPS from VPC"
        )

        # Interface endpoints for common services.
        # Bug fix: the original derived construct IDs from `service.name`,
        # which is the full endpoint name (e.g. "com.amazonaws.<region>.ecr.api")
        # and contains dots plus an unresolved region token - not a valid,
        # stable construct ID. Use explicit IDs instead.
        interface_services = {
            "Ecr": ec2.InterfaceVpcEndpointAwsService.ECR,
            "EcrDocker": ec2.InterfaceVpcEndpointAwsService.ECR_DOCKER,
            "Lambda": ec2.InterfaceVpcEndpointAwsService.LAMBDA,
            "SecretsManager": ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
            "Ssm": ec2.InterfaceVpcEndpointAwsService.SSM,
            "SsmMessages": ec2.InterfaceVpcEndpointAwsService.SSM_MESSAGES,
            "Ec2Messages": ec2.InterfaceVpcEndpointAwsService.EC2_MESSAGES,
        }

        for name, service in interface_services.items():
            self.vpc.add_interface_endpoint(
                f"{name}Endpoint",
                service=service,
                subnets=ec2.SubnetSelection(
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
                ),
                security_groups=[endpoint_sg]
            )

    def _create_outputs(self):
        """Export VPC ID, CIDR, and per-tier subnet IDs as stack outputs."""
        CfnOutput(self, "VpcId", value=self.vpc.vpc_id)
        CfnOutput(self, "VpcCidr", value=self.vpc.vpc_cidr_block)

        CfnOutput(
            self, "PublicSubnetIds",
            value=",".join([s.subnet_id for s in self.vpc.public_subnets])
        )
        CfnOutput(
            self, "PrivateSubnetIds",
            value=",".join([s.subnet_id for s in self.vpc.private_subnets])
        )
        CfnOutput(
            self, "DatabaseSubnetIds",
            value=",".join([s.subnet_id for s in self.vpc.isolated_subnets])
        )

VPC Architecture Diagram

🔄 Rendering Mermaid diagram… (VPC architecture diagram placeholder)

RDS Database Configuration

Production-Grade RDS Setup

from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_elasticache as elasticache,
    aws_kms as kms,
    aws_logs as logs,
    aws_rds as rds,
    aws_secretsmanager as secretsmanager,
    CfnOutput,
    Duration,
    RemovalPolicy
)
from constructs import Construct

class DatabaseStack(Stack):
    """Data tier: encrypted, multi-AZ PostgreSQL primary, a read replica,
    and a Redis replication group for caching.

    Expects the VPC and database security group created by the networking
    stack; databases go in isolated subnets, the cache in private subnets.
    """

    def __init__(self, scope: Construct, construct_id: str,
                 vpc: ec2.Vpc,
                 database_sg: ec2.SecurityGroup,
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        self.vpc = vpc
        self.database_sg = database_sg

        # Create primary database
        self.primary_db = self._create_primary_database()

        # Create read replica
        self.read_replica = self._create_read_replica()

        # Create ElastiCache cluster
        self.cache_cluster = self._create_cache_cluster()

        # Output database information
        self._create_outputs()

    def _create_primary_database(self):
        """Create the primary PostgreSQL instance (encrypted, multi-AZ,
        with Performance Insights and log export enabled)."""

        # KMS key for storage + Performance Insights encryption.
        db_key = kms.Key(
            self,
            "DatabaseKey",
            description="KMS key for database encryption",
            enable_key_rotation=True
        )

        # Generated admin credentials stored in Secrets Manager.
        credentials = rds.Credentials.from_generated_secret(
            username="dbadmin",
            secret_name="rds-credentials",
            exclude_characters='/@"\'\\;'
        )

        # Subnet group pinning the instance to the isolated tier.
        subnet_group = rds.SubnetGroup(
            self,
            "DatabaseSubnetGroup",
            description="Subnet group for RDS database",
            vpc=self.vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
            ),
            removal_policy=RemovalPolicy.DESTROY
        )

        # Custom parameter group enabling statement/lock logging.
        parameter_group = rds.ParameterGroup(
            self,
            "DatabaseParameterGroup",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_13_7
            ),
            description="Custom parameter group for PostgreSQL",
            parameters={
                "shared_preload_libraries": "pg_stat_statements",
                "log_statement": "all",
                "log_min_duration_statement": "1000",
                "log_checkpoints": "on",
                "log_lock_waits": "on"
            }
        )

        # Create RDS instance
        database = rds.DatabaseInstance(
            self,
            "PrimaryDatabase",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_13_7
            ),
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.BURSTABLE3,
                ec2.InstanceSize.MEDIUM
            ),
            credentials=credentials,
            vpc=self.vpc,
            subnet_group=subnet_group,
            security_groups=[self.database_sg],

            # Database configuration
            database_name="appdb",
            port=5432,
            parameter_group=parameter_group,

            # Storage configuration
            allocated_storage=100,  # GB
            max_allocated_storage=1000,  # Enable auto scaling
            storage_type=rds.StorageType.GP2,
            storage_encrypted=True,
            storage_encryption_key=db_key,

            # Backup configuration.
            # Bug fix: the original also passed `backup_window=...`, which is
            # not a DatabaseInstance keyword (the valid one is
            # `preferred_backup_window`) and raised a TypeError at synth.
            backup_retention=Duration.days(7),
            preferred_backup_window="03:00-04:00",  # UTC time

            # Maintenance configuration.
            # Bug fix: keyword is `preferred_maintenance_window`, not
            # `maintenance_window`.
            preferred_maintenance_window="Sun:04:00-Sun:05:00",
            auto_minor_version_upgrade=True,

            # High availability
            multi_az=True,

            # Monitoring
            monitoring_interval=Duration.seconds(60),
            enable_performance_insights=True,
            performance_insight_retention=rds.PerformanceInsightRetention.DEFAULT,
            performance_insight_encryption_key=db_key,

            # Log exports
            cloudwatch_logs_exports=["postgresql"],
            cloudwatch_logs_retention=logs.RetentionDays.ONE_MONTH,

            # Deletion protection
            deletion_protection=False,  # Dev environment, set to True for production
            delete_automated_backups=True,
            removal_policy=RemovalPolicy.DESTROY  # Dev environment
        )

        return database

    def _create_read_replica(self):
        """Create a smaller read replica of the primary instance."""
        read_replica = rds.DatabaseInstanceReadReplica(
            self,
            "ReadReplica",
            source_database_instance=self.primary_db,
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.BURSTABLE3,
                ec2.InstanceSize.SMALL
            ),
            vpc=self.vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
            ),
            security_groups=[self.database_sg],

            # Read replica configuration
            auto_minor_version_upgrade=True,
            backup_retention=Duration.days(0),  # Read replicas don't need backups
            monitoring_interval=Duration.seconds(60),
            enable_performance_insights=True,

            deletion_protection=False,
            removal_policy=RemovalPolicy.DESTROY
        )

        return read_replica

    def _create_cache_cluster(self):
        """Create an ElastiCache Redis replication group (primary + 1 replica,
        encrypted in transit and at rest) via L1 Cfn constructs."""

        # Create cache subnet group in the private (egress) subnets.
        cache_subnet_group = elasticache.CfnSubnetGroup(
            self,
            "CacheSubnetGroup",
            description="Subnet group for ElastiCache",
            subnet_ids=[subnet.subnet_id for subnet in self.vpc.private_subnets]
        )

        # Create cache parameter group
        cache_parameter_group = elasticache.CfnParameterGroup(
            self,
            "CacheParameterGroup",
            cache_parameter_group_family="redis6.x",
            description="Custom parameter group for Redis",
            properties={
                "maxmemory-policy": "allkeys-lru",
                "timeout": "300"
            }
        )

        # Create Redis replication group
        redis_cluster = elasticache.CfnReplicationGroup(
            self,
            "RedisCluster",
            description="Redis cluster for application caching",

            # Cluster configuration
            replication_group_id="app-redis-cluster",
            num_cache_clusters=2,  # Primary node + 1 read replica

            # Instance configuration
            cache_node_type="cache.t3.micro",
            engine="redis",
            engine_version="6.2",
            port=6379,

            # Network configuration.
            # NOTE(review): this reuses the database SG; a dedicated cache SG
            # (as created by the networking stack) would be cleaner.
            cache_subnet_group_name=cache_subnet_group.ref,
            security_group_ids=[self.database_sg.security_group_id],

            # Parameter group
            cache_parameter_group_name=cache_parameter_group.ref,

            # Security configuration
            at_rest_encryption_enabled=True,
            transit_encryption_enabled=True,
            # SECURITY: hardcoded secret in source - production must pull this
            # from Secrets Manager instead of committing it to the template.
            auth_token="your-secure-auth-token-here",  # Production should use Secrets Manager

            # Backup configuration
            automatic_failover_enabled=True,
            snapshot_retention_limit=5,
            snapshot_window="03:00-05:00",
            preferred_maintenance_window="sun:05:00-sun:06:00",

            # Bug fix: the original passed notification_topic_arn="" - an
            # empty string is not a valid SNS topic ARN and fails deployment.
            # Omit the property until a real topic ARN is available.

            # Logs
            log_delivery_configurations=[
                elasticache.CfnReplicationGroup.LogDeliveryConfigurationRequestProperty(
                    destination_type="cloudwatch-logs",
                    destination_details=elasticache.CfnReplicationGroup.DestinationDetailsProperty(
                        cloud_watch_logs_details=elasticache.CfnReplicationGroup.CloudWatchLogsDestinationDetailsProperty(
                            log_group="/aws/elasticache/redis"
                        )
                    ),
                    log_format="json",
                    log_type="slow-log"
                )
            ]
        )

        # Explicit ordering: the replication group needs both groups to exist.
        redis_cluster.add_dependency(cache_subnet_group)
        redis_cluster.add_dependency(cache_parameter_group)

        return redis_cluster

    def _create_outputs(self):
        """Export endpoints and the credentials secret ARN as stack outputs."""
        CfnOutput(
            self,
            "DatabaseEndpoint",
            value=self.primary_db.instance_endpoint.hostname,
            description="Primary database endpoint"
        )

        CfnOutput(
            self,
            "DatabaseSecretArn",
            value=self.primary_db.secret.secret_arn,
            description="Database credentials secret ARN"
        )

        CfnOutput(
            self,
            "ReadReplicaEndpoint",
            value=self.read_replica.instance_endpoint.hostname,
            description="Read replica endpoint"
        )

        # Bug fix: the original referenced self.redis_cluster (never set;
        # __init__ stores self.cache_cluster) and attr_redis_endpoint_address,
        # which exists on CfnCacheCluster, not CfnReplicationGroup. The
        # replication group exposes attr_primary_end_point_address.
        CfnOutput(
            self,
            "RedisEndpoint",
            value=self.cache_cluster.attr_primary_end_point_address,
            description="Redis cluster endpoint"
        )

Lambda and API Gateway Serverless Architecture

Complete Serverless API

from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    aws_dynamodb as dynamodb,
    aws_cognito as cognito,
    aws_iam as iam,
    Duration,
    RemovalPolicy
)

class ServerlessApiStack(Stack):
    """Serverless API: Cognito user pool, single-table DynamoDB store,
    Lambda handlers behind a REST API Gateway with a custom authorizer."""

    def __init__(self, scope: "Construct", construct_id: str, **kwargs) -> None:
        # NOTE(review): this snippet's import block is missing
        # `from constructs import Construct`; the annotation is quoted so the
        # class still defines cleanly until that import is added.
        super().__init__(scope, construct_id, **kwargs)

        # Create user pool for authentication
        self.user_pool = self._create_user_pool()

        # Create DynamoDB table
        self.table = self._create_dynamodb_table()

        # Create Lambda layer
        self.common_layer = self._create_lambda_layer()

        # Create Lambda functions
        self.functions = self._create_lambda_functions()

        # Create API Gateway (also sets self.authorizer, used by the routes)
        self.api = self._create_api_gateway()

        # Configure API routes
        self._configure_api_routes()

        # Output API information
        self._create_outputs()

    def _create_user_pool(self):
        """Create the Cognito user pool and its app client.

        The client is kept on ``self.user_pool_client`` (the original
        created it and discarded the reference).
        """
        user_pool = cognito.UserPool(
            self,
            "ApiUserPool",
            user_pool_name="api-users",

            # Registration configuration
            self_sign_up_enabled=True,
            sign_in_aliases=cognito.SignInAliases(email=True),

            # Password policy
            password_policy=cognito.PasswordPolicy(
                min_length=8,
                require_lowercase=True,
                require_uppercase=True,
                require_digits=True,
                require_symbols=False
            ),

            # Account recovery
            account_recovery=cognito.AccountRecovery.EMAIL_ONLY,

            # Attribute verification
            auto_verify=cognito.AutoVerifiedAttrs(email=True),

            # User attributes
            standard_attributes=cognito.StandardAttributes(
                email=cognito.StandardAttribute(required=True, mutable=True),
                given_name=cognito.StandardAttribute(required=True, mutable=True),
                family_name=cognito.StandardAttribute(required=True, mutable=True)
            ),

            # Removal policy
            removal_policy=RemovalPolicy.DESTROY
        )

        # Create user pool client
        self.user_pool_client = user_pool.add_client(
            "ApiClient",
            user_pool_client_name="api-client",

            # Authentication flows
            auth_flows=cognito.AuthFlow(
                user_srp=True,
                user_password=True,
                admin_user_password=True
            ),

            # Token validity
            access_token_validity=Duration.minutes(60),
            id_token_validity=Duration.minutes(60),
            refresh_token_validity=Duration.days(30),

            # Read/write permissions
            read_attributes=cognito.ClientAttributes(
                email=True,
                given_name=True,
                family_name=True
            ),
            write_attributes=cognito.ClientAttributes(
                email=True,
                given_name=True,
                family_name=True
            )
        )

        return user_pool

    def _create_dynamodb_table(self):
        """Create the single-table DynamoDB store (PK/SK + GSI1, on-demand
        billing, streams and point-in-time recovery enabled)."""
        table = dynamodb.Table(
            self,
            "ApiDataTable",
            table_name="api-data",

            # Generic PK/SK primary key for single-table design.
            partition_key=dynamodb.Attribute(
                name="PK",
                type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK",
                type=dynamodb.AttributeType.STRING
            ),

            # Billing mode
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,

            # Global secondary indexes
            global_secondary_indexes=[
                dynamodb.GlobalSecondaryIndex(
                    index_name="GSI1",
                    partition_key=dynamodb.Attribute(
                        name="GSI1PK",
                        type=dynamodb.AttributeType.STRING
                    ),
                    sort_key=dynamodb.Attribute(
                        name="GSI1SK",
                        type=dynamodb.AttributeType.STRING
                    ),
                    projection_type=dynamodb.ProjectionType.ALL
                )
            ],

            # Stream configuration
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,

            # Point-in-time recovery
            point_in_time_recovery=True,

            # Removal policy
            removal_policy=RemovalPolicy.DESTROY
        )

        return table

    def _create_lambda_layer(self):
        """Create the shared Lambda layer with common utilities."""
        layer = lambda_.LayerVersion(
            self,
            "CommonLayer",
            code=lambda_.Code.from_asset("layers/common"),
            compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
            description="Common utilities and dependencies",
            layer_version_name="common-utils"
        )

        return layer

    def _create_lambda_functions(self):
        """Create the business-logic and authorizer Lambda functions.

        Returns a dict keyed by 'users' / 'products' / 'orders' /
        'authorizer'. All but the authorizer get read/write access to the
        DynamoDB table.
        """

        # Common environment variables
        common_env = {
            "TABLE_NAME": self.table.table_name,
            "USER_POOL_ID": self.user_pool.user_pool_id,
            "POWERTOOLS_SERVICE_NAME": "api-service",
            "LOG_LEVEL": "INFO"
        }

        # Common configuration shared by all handlers.
        function_props = {
            "runtime": lambda_.Runtime.PYTHON_3_9,
            "timeout": Duration.seconds(30),
            "memory_size": 256,
            "layers": [self.common_layer],
            "environment": common_env,
            "tracing": lambda_.Tracing.ACTIVE  # Enable X-Ray tracing
        }

        functions = {}

        # User management function
        functions['users'] = lambda_.Function(
            self,
            "UsersFunction",
            handler="users.handler",
            code=lambda_.Code.from_asset("lambda/users"),
            description="Handle user operations",
            **function_props
        )

        # Product management function
        functions['products'] = lambda_.Function(
            self,
            "ProductsFunction",
            handler="products.handler",
            code=lambda_.Code.from_asset("lambda/products"),
            description="Handle product operations",
            **function_props
        )

        # Order management function
        functions['orders'] = lambda_.Function(
            self,
            "OrdersFunction",
            handler="orders.handler",
            code=lambda_.Code.from_asset("lambda/orders"),
            description="Handle order operations",
            **function_props
        )

        # Authorizer function: shorter timeout, so drop the shared one
        # before splatting to avoid a duplicate-keyword error.
        functions['authorizer'] = lambda_.Function(
            self,
            "AuthorizerFunction",
            handler="authorizer.handler",
            code=lambda_.Code.from_asset("lambda/authorizer"),
            description="Custom API Gateway authorizer",
            timeout=Duration.seconds(10),
            **{k: v for k, v in function_props.items() if k != 'timeout'}
        )

        # Grant DynamoDB permissions to all business functions (the
        # authorizer only validates tokens and needs no table access).
        for func in functions.values():
            if func != functions['authorizer']:
                self.table.grant_read_write_data(func)

        return functions

    def _create_api_gateway(self):
        """Create the REST API and the custom request authorizer.

        Bug fix: the original bound the authorizer to a local variable only,
        while _configure_api_routes reads ``self.authorizer`` - an
        AttributeError at synth time. It is now stored on the stack.
        """
        # This snippet's import block lacks aws_logs; import locally so the
        # access-log group below resolves.
        from aws_cdk import aws_logs as logs

        self.authorizer = apigw.RequestAuthorizer(
            self,
            "CustomAuthorizer",
            handler=self.functions['authorizer'],
            identity_sources=[
                apigw.IdentitySource.header('Authorization'),
                apigw.IdentitySource.header('x-api-key')
            ],
            results_cache_ttl=Duration.minutes(5)
        )

        # Create API
        api = apigw.RestApi(
            self,
            "ServerlessApi",
            rest_api_name="serverless-api",
            description="Serverless API with Lambda and DynamoDB",

            # CORS configuration
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=apigw.Cors.ALL_ORIGINS,
                allow_methods=apigw.Cors.ALL_METHODS,
                allow_headers=[
                    "Content-Type",
                    "X-Amz-Date",
                    "Authorization",
                    "X-Api-Key",
                    "X-Amz-Security-Token"
                ]
            ),

            # Deployment configuration
            deploy=True,
            deploy_options=apigw.StageOptions(
                stage_name="prod",
                throttling_rate_limit=1000,
                throttling_burst_limit=2000,
                logging_level=apigw.MethodLoggingLevel.INFO,
                data_trace_enabled=True,
                metrics_enabled=True,

                # Cache configuration
                caching_enabled=True,
                cache_cluster_enabled=True,
                cache_cluster_size=apigw.CacheClusterSize.SMALL,

                # Access logs
                access_log_destination=apigw.LogGroupLogDestination(
                    logs.LogGroup(
                        self,
                        "ApiAccessLogs",
                        retention=logs.RetentionDays.ONE_MONTH
                    )
                ),
                # Bug fix: json_with_standard_fields() requires its keyword
                # arguments; calling it bare raises a TypeError.
                access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
                    caller=True,
                    http_method=True,
                    ip=True,
                    protocol=True,
                    request_time=True,
                    resource_path=True,
                    response_length=True,
                    status=True,
                    user=True
                )
            ),

            # Endpoint configuration
            endpoint_configuration=apigw.EndpointConfiguration(
                types=[apigw.EndpointType.REGIONAL]
            )
        )

        return api

    def _configure_api_routes(self):
        """Wire the v1 resource tree: authorized /users CRUD, proxied
        /products and /orders, and an unauthenticated /health mock."""

        # v1 API root path
        v1 = self.api.root.add_resource("v1")

        # User routes (custom authorizer on every method)
        users = v1.add_resource("users")
        users.add_method(
            "GET",
            apigw.LambdaIntegration(self.functions['users']),
            authorizer=self.authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM,
            request_validator=apigw.RequestValidator(
                self,
                "UsersValidator",
                rest_api=self.api,
                validate_request_parameters=True,
                validate_request_body=True
            )
        )
        users.add_method(
            "POST",
            apigw.LambdaIntegration(self.functions['users']),
            authorizer=self.authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM
        )

        # User detail route
        user_detail = users.add_resource("{userId}")
        user_detail.add_method(
            "GET",
            apigw.LambdaIntegration(self.functions['users']),
            authorizer=self.authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM
        )
        user_detail.add_method(
            "PUT",
            apigw.LambdaIntegration(self.functions['users']),
            authorizer=self.authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM
        )
        user_detail.add_method(
            "DELETE",
            apigw.LambdaIntegration(self.functions['users']),
            authorizer=self.authorizer,
            authorization_type=apigw.AuthorizationType.CUSTOM
        )

        # Product routes
        products = v1.add_resource("products")
        products.add_method("ANY", apigw.LambdaIntegration(self.functions['products']))

        # Product detail route (proxy integration)
        product_proxy = products.add_resource("{proxy+}")
        product_proxy.add_method("ANY", apigw.LambdaIntegration(self.functions['products']))

        # Order routes
        orders = v1.add_resource("orders")
        orders.add_method("ANY", apigw.LambdaIntegration(self.functions['orders']))
        order_proxy = orders.add_resource("{proxy+}")
        order_proxy.add_method("ANY", apigw.LambdaIntegration(self.functions['orders']))

        # Health check route (no authentication required; mock integration
        # answers 200 without invoking any Lambda)
        health = self.api.root.add_resource("health")
        health.add_method(
            "GET",
            apigw.MockIntegration(
                integration_responses=[
                    apigw.IntegrationResponse(
                        status_code="200",
                        response_templates={
                            "application/json": '{"status": "healthy", "timestamp": "$context.requestTime"}'
                        }
                    )
                ],
                request_templates={
                    "application/json": '{"statusCode": 200}'
                }
            ),
            method_responses=[
                apigw.MethodResponse(
                    status_code="200",
                    response_models={
                        "application/json": apigw.Model.EMPTY_MODEL
                    }
                )
            ]
        )

    def _create_outputs(self):
        """Export API URL, user pool ID, and table name as stack outputs."""
        # This snippet's import block lacks CfnOutput; import locally.
        from aws_cdk import CfnOutput

        CfnOutput(
            self,
            "ApiUrl",
            value=self.api.url,
            description="API Gateway URL"
        )

        CfnOutput(
            self,
            "UserPoolId",
            value=self.user_pool.user_pool_id,
            description="Cognito User Pool ID"
        )

        CfnOutput(
            self,
            "TableName",
            value=self.table.table_name,
            description="DynamoDB table name"
        )

Monitoring and Messaging Services

CloudWatch Monitoring Configuration

from aws_cdk import (
    Stack,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cw_actions,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    aws_lambda as lambda_,
    aws_logs as logs,
    Duration
)

class MonitoringStack(Stack):
    """Observability stack: CloudWatch dashboard, alarms and notifications.

    Charts API Gateway, Lambda and RDS metrics on a single dashboard,
    raises alarms on high API latency, Lambda error counts and database
    CPU, and fans alarm notifications out to email and Slack through one
    SNS topic.
    """

    def __init__(self, scope: Construct, construct_id: str,
                 lambda_functions: dict,
                 api_gateway: apigw.RestApi,
                 database: rds.DatabaseInstance,
                 **kwargs) -> None:
        """
        :param lambda_functions: mapping of logical name -> Lambda function
            to chart and alarm on.
        :param api_gateway: REST API whose Count/Latency/4XX/5XX metrics
            are monitored.
        :param database: RDS instance whose CPU/connection metrics are
            monitored.
        """
        super().__init__(scope, construct_id, **kwargs)

        self.functions = lambda_functions
        self.api = api_gateway
        self.database = database

        # SNS topic that every alarm action publishes to.
        self.alarm_topic = self._create_alarm_topic()

        # Single "system-overview" dashboard holding all widgets.
        self.dashboard = self._create_dashboard()

        # Alarms for API latency, Lambda errors and DB CPU.
        self._create_alarms()

        # Fix: keep a handle on the custom-metrics publisher function; the
        # return value of _create_custom_metrics() was previously discarded.
        self.metrics_publisher = self._create_custom_metrics()

    def _api_metric(self, metric_name: str, statistic: str) -> cloudwatch.Metric:
        """Return an AWS/ApiGateway metric scoped to this REST API."""
        return cloudwatch.Metric(
            namespace="AWS/ApiGateway",
            metric_name=metric_name,
            dimensions_map={"ApiName": self.api.rest_api_name},
            statistic=statistic
        )

    def _db_metric(self, metric_name: str, statistic: str = "Average") -> cloudwatch.Metric:
        """Return an AWS/RDS metric scoped to this database instance."""
        return cloudwatch.Metric(
            namespace="AWS/RDS",
            metric_name=metric_name,
            dimensions_map={"DBInstanceIdentifier": self.database.instance_identifier},
            statistic=statistic
        )

    def _create_alarm_topic(self) -> sns.Topic:
        """Create the alarm SNS topic with email and Slack subscribers."""
        topic = sns.Topic(
            self,
            "AlarmTopic",
            topic_name="system-alarms",
            display_name="System Alarms"
        )

        # Email subscription; the recipient must confirm via the email
        # that SNS sends after deployment.
        topic.add_subscription(
            subscriptions.EmailSubscription("admin@example.com")
        )

        # Slack notification via a small inline Lambda that reformats the
        # SNS alarm payload into a Slack webhook message.
        # NOTE(review): Slack's "ts" field expects a Unix epoch, while
        # record['Sns']['Timestamp'] is an ISO-8601 string — confirm Slack
        # tolerates this before relying on message timestamps.
        slack_notifier = lambda_.Function(
            self,
            "SlackNotifier",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="slack_notifier.handler",
            code=lambda_.Code.from_inline("""
import json
import urllib3
import os

def handler(event, context):
    webhook_url = os.environ['SLACK_WEBHOOK_URL']

    for record in event['Records']:
        sns_message = json.loads(record['Sns']['Message'])
        alarm_name = sns_message['AlarmName']
        new_state = sns_message['NewStateValue']
        reason = sns_message['NewStateReason']

        color = "danger" if new_state == "ALARM" else "good"

        slack_message = {
            "attachments": [{
                "color": color,
                "title": f"CloudWatch Alarm: {alarm_name}",
                "text": f"State: {new_state}\\nReason: {reason}",
                "ts": record['Sns']['Timestamp']
            }]
        }

        http = urllib3.PoolManager()
        response = http.request(
            'POST',
            webhook_url,
            body=json.dumps(slack_message).encode('utf-8'),
            headers={'Content-Type': 'application/json'}
        )

        print(f"Slack notification sent: {response.status}")

    return {'statusCode': 200}
            """),
            environment={
                # SECURITY/config: placeholder webhook URL is hard-coded in
                # source — inject the real URL via context or SSM instead.
                "SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
            },
            timeout=Duration.seconds(30)
        )

        topic.add_subscription(
            subscriptions.LambdaSubscription(slack_notifier)
        )

        return topic

    def _create_dashboard(self) -> cloudwatch.Dashboard:
        """Build the "system-overview" dashboard with API, Lambda and RDS widgets."""
        dashboard = cloudwatch.Dashboard(
            self,
            "SystemDashboard",
            dashboard_name="system-overview"
        )

        # API Gateway: request volume, average latency, 4XX/5XX error counts.
        api_widgets = [
            cloudwatch.GraphWidget(
                title="API Gateway Requests",
                left=[self._api_metric("Count", "Sum")],
                period=Duration.minutes(5)
            ),
            cloudwatch.GraphWidget(
                title="API Gateway Latency",
                left=[self._api_metric("Latency", "Average")],
                period=Duration.minutes(5)
            ),
            cloudwatch.GraphWidget(
                title="API Gateway Errors",
                left=[
                    self._api_metric("4XXError", "Sum"),
                    self._api_metric("5XXError", "Sum")
                ],
                period=Duration.minutes(5)
            )
        ]

        # Lambda: invocations, duration and errors per monitored function.
        lambda_widgets = []
        for func_name, func in self.functions.items():
            lambda_widgets.extend([
                cloudwatch.GraphWidget(
                    title=f"Lambda {func_name} - Invocations",
                    left=[func.metric_invocations()],
                    period=Duration.minutes(5)
                ),
                cloudwatch.GraphWidget(
                    title=f"Lambda {func_name} - Duration",
                    left=[func.metric_duration()],
                    period=Duration.minutes(5)
                ),
                cloudwatch.GraphWidget(
                    title=f"Lambda {func_name} - Errors",
                    left=[func.metric_errors()],
                    period=Duration.minutes(5)
                )
            ])

        # RDS: CPU utilization and open connections.
        rds_widgets = [
            cloudwatch.GraphWidget(
                title="RDS CPU Utilization",
                left=[self._db_metric("CPUUtilization")],
                period=Duration.minutes(5)
            ),
            cloudwatch.GraphWidget(
                title="RDS Connections",
                left=[self._db_metric("DatabaseConnections")],
                period=Duration.minutes(5)
            )
        ]

        # Add all widget groups to the dashboard.
        dashboard.add_widgets(*api_widgets)
        dashboard.add_widgets(*lambda_widgets)
        dashboard.add_widgets(*rds_widgets)

        return dashboard

    def _create_alarms(self) -> None:
        """Create alarms and route each one's action to the alarm topic."""

        # API Gateway: average latency above threshold for two consecutive
        # evaluation periods.
        high_latency_alarm = cloudwatch.Alarm(
            self,
            "ApiHighLatency",
            alarm_name="API-High-Latency",
            metric=self._api_metric("Latency", "Average"),
            threshold=5000,  # 5000 ms = 5 seconds
            evaluation_periods=2,
            datapoints_to_alarm=2,
            comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
            alarm_description="API Gateway latency is high",
            # No traffic means no latency samples; don't alarm on gaps.
            treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
        )

        high_latency_alarm.add_alarm_action(
            cw_actions.SnsAction(self.alarm_topic)
        )

        # Lambda: error count over 5 in a 5-minute period, per function.
        for func_name, func in self.functions.items():
            error_alarm = cloudwatch.Alarm(
                self,
                f"{func_name}Errors",
                alarm_name=f"Lambda-{func_name}-Errors",
                metric=func.metric_errors(period=Duration.minutes(5)),
                threshold=5,
                evaluation_periods=2,
                alarm_description=f"High error rate for {func_name} function"
            )

            error_alarm.add_alarm_action(
                cw_actions.SnsAction(self.alarm_topic)
            )

        # RDS: CPU above 80% for two consecutive evaluation periods.
        db_cpu_alarm = cloudwatch.Alarm(
            self,
            "DatabaseHighCPU",
            alarm_name="RDS-High-CPU",
            metric=self._db_metric("CPUUtilization"),
            threshold=80,
            evaluation_periods=2,
            alarm_description="Database CPU utilization is high"
        )

        db_cpu_alarm.add_alarm_action(
            cw_actions.SnsAction(self.alarm_topic)
        )

    def _create_custom_metrics(self) -> lambda_.Function:
        """Create the Lambda that publishes custom business metrics.

        :return: the publisher function, so callers can invoke or schedule it.
        """
        # NOTE(review): the inline code passes 'Timestamp': time.time()
        # (epoch float); confirm botocore accepts this, or switch the
        # deployed code to a datetime object.
        metrics_publisher = lambda_.Function(
            self,
            "MetricsPublisher",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="metrics.handler",
            code=lambda_.Code.from_inline("""
import boto3
import json
import time

cloudwatch = boto3.client('cloudwatch')

def handler(event, context):
    # Publish custom business metrics

    # Example: Number of user registrations
    cloudwatch.put_metric_data(
        Namespace='MyApp/Users',
        MetricData=[
            {
                'MetricName': 'NewRegistrations',
                'Value': event.get('new_users', 0),
                'Unit': 'Count',
                'Timestamp': time.time()
            }
        ]
    )

    # Example: Order amount
    if 'order_amount' in event:
        cloudwatch.put_metric_data(
            Namespace='MyApp/Orders',
            MetricData=[
                {
                    'MetricName': 'OrderValue',
                    'Value': event['order_amount'],
                    'Unit': 'None',
                    'Timestamp': time.time()
                }
            ]
        )

    return {'statusCode': 200}
            """),
            timeout=Duration.seconds(30)
        )

        # Grant permission to publish metrics. PutMetricData does not
        # support resource-level scoping, so "*" is required here.
        metrics_publisher.add_to_role_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["cloudwatch:PutMetricData"],
                resources=["*"]
            )
        )

        return metrics_publisher

This chapter comprehensively covers the integration of major AWS services in CDK, including network architecture, database configuration, serverless architecture, and monitoring setup. These concepts form the foundation for building production-grade applications.