Chapter 5: Common AWS Service Integration
Haiyue
Learning Objectives
- Master CDK implementation of VPC network architecture
- Learn to configure EC2, RDS, ElastiCache and other compute and storage services
- Understand serverless architecture implementation with API Gateway and Lambda
- Master configuration of monitoring and messaging services like CloudWatch, SNS, and SQS
VPC Network Architecture
Complete VPC Setup
from aws_cdk import (
Stack,
aws_ec2 as ec2,
aws_logs as logs,
CfnOutput
)
from constructs import Construct
class NetworkingStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create VPC
self.vpc = ec2.Vpc(
self,
"MainVPC",
vpc_name="main-vpc",
max_azs=3, # Use 3 availability zones
cidr="10.0.0.0/16",
# Define subnet configuration
subnet_configuration=[
# Public subnet - for load balancers, NAT gateways
ec2.SubnetConfiguration(
name="Public",
subnet_type=ec2.SubnetType.PUBLIC,
cidr_mask=24 # 10.0.0.0/24, 10.0.1.0/24, 10.0.2.0/24
),
# Private subnet - for application servers
ec2.SubnetConfiguration(
name="Private",
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
cidr_mask=24 # 10.0.10.0/24, 10.0.11.0/24, 10.0.12.0/24
),
# Database subnet - completely isolated
ec2.SubnetConfiguration(
name="Database",
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
cidr_mask=24 # 10.0.20.0/24, 10.0.21.0/24, 10.0.22.0/24
)
],
# Enable DNS
enable_dns_hostnames=True,
enable_dns_support=True,
# NAT gateway configuration
nat_gateways=1, # Production environments should use one per AZ
# Flow logs
flow_logs={
"FlowLogsCloudWatch": ec2.FlowLogOptions(
destination=ec2.FlowLogDestination.to_cloud_watch_logs(
log_group=logs.LogGroup(
self,
"VpcFlowLogsGroup",
retention=logs.RetentionDays.ONE_MONTH
)
),
traffic_type=ec2.FlowLogTrafficType.ALL
)
}
)
# Create security groups
self._create_security_groups()
# Create VPC endpoints (reduce NAT gateway costs)
self._create_vpc_endpoints()
# Output network information
self._create_outputs()
def _create_security_groups(self):
"""Create security groups"""
# Web tier security group
self.web_sg = ec2.SecurityGroup(
self,
"WebSecurityGroup",
vpc=self.vpc,
description="Security group for web servers",
allow_all_outbound=True
)
# Allow HTTP/HTTPS traffic
self.web_sg.add_ingress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(80),
description="Allow HTTP"
)
self.web_sg.add_ingress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(443),
description="Allow HTTPS"
)
# Application tier security group
self.app_sg = ec2.SecurityGroup(
self,
"AppSecurityGroup",
vpc=self.vpc,
description="Security group for application servers",
allow_all_outbound=True
)
# Only allow traffic from web tier
self.app_sg.add_ingress_rule(
peer=self.web_sg,
connection=ec2.Port.tcp(8080),
description="Allow traffic from web tier"
)
# Database tier security group
self.db_sg = ec2.SecurityGroup(
self,
"DatabaseSecurityGroup",
vpc=self.vpc,
description="Security group for database servers"
)
# Only allow database connections from application tier
self.db_sg.add_ingress_rule(
peer=self.app_sg,
connection=ec2.Port.tcp(5432),
description="Allow PostgreSQL from app tier"
)
self.db_sg.add_ingress_rule(
peer=self.app_sg,
connection=ec2.Port.tcp(3306),
description="Allow MySQL from app tier"
)
# Cache tier security group
self.cache_sg = ec2.SecurityGroup(
self,
"CacheSecurityGroup",
vpc=self.vpc,
description="Security group for cache servers"
)
self.cache_sg.add_ingress_rule(
peer=self.app_sg,
connection=ec2.Port.tcp(6379),
description="Allow Redis from app tier"
)
# Management access security group
self.bastion_sg = ec2.SecurityGroup(
self,
"BastionSecurityGroup",
vpc=self.vpc,
description="Security group for bastion host"
)
# Only allow SSH access from company IP
self.bastion_sg.add_ingress_rule(
peer=ec2.Peer.ipv4("203.0.113.0/24"), # Replace with actual company IP range
connection=ec2.Port.tcp(22),
description="Allow SSH from company network"
)
def _create_vpc_endpoints(self):
"""Create VPC endpoints to reduce NAT gateway usage"""
# S3 gateway endpoint
self.vpc.add_gateway_endpoint(
"S3Endpoint",
service=ec2.GatewayVpcEndpointAwsService.S3,
subnets=[
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
]
)
# DynamoDB gateway endpoint
self.vpc.add_gateway_endpoint(
"DynamoDbEndpoint",
service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
subnets=[
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
]
)
# Interface endpoint security group
endpoint_sg = ec2.SecurityGroup(
self,
"VpcEndpointSG",
vpc=self.vpc,
description="Security group for VPC endpoints"
)
endpoint_sg.add_ingress_rule(
peer=ec2.Peer.ipv4(self.vpc.vpc_cidr_block),
connection=ec2.Port.tcp(443),
description="Allow HTTPS from VPC"
)
        # Interface endpoints for common services.
        # Use explicit IDs: service.name contains an unresolved region token,
        # and tokens are not allowed in construct IDs.
        services = {
            "Ecr": ec2.InterfaceVpcEndpointAwsService.ECR,
            "EcrDocker": ec2.InterfaceVpcEndpointAwsService.ECR_DOCKER,
            "Lambda": ec2.InterfaceVpcEndpointAwsService.LAMBDA,
            "SecretsManager": ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
            "Ssm": ec2.InterfaceVpcEndpointAwsService.SSM,
            "SsmMessages": ec2.InterfaceVpcEndpointAwsService.SSM_MESSAGES,
            "Ec2Messages": ec2.InterfaceVpcEndpointAwsService.EC2_MESSAGES
        }
        for endpoint_id, service in services.items():
            self.vpc.add_interface_endpoint(
                f"{endpoint_id}Endpoint",
                service=service,
                subnets=ec2.SubnetSelection(
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
                ),
                security_groups=[endpoint_sg]
            )
def _create_outputs(self):
"""Create outputs"""
CfnOutput(self, "VpcId", value=self.vpc.vpc_id)
CfnOutput(self, "VpcCidr", value=self.vpc.vpc_cidr_block)
CfnOutput(
self, "PublicSubnetIds",
value=",".join([s.subnet_id for s in self.vpc.public_subnets])
)
CfnOutput(
self, "PrivateSubnetIds",
value=",".join([s.subnet_id for s in self.vpc.private_subnets])
)
CfnOutput(
self, "DatabaseSubnetIds",
value=",".join([s.subnet_id for s in self.vpc.isolated_subnets])
)
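A quick way to validate this stack before deploying is an assertions test against the synthesized template. The sketch below assumes the stack above lives in a networking_stack.py module; the module and test names are illustrative, not part of the original project:

# test_networking_stack.py - a minimal sketch, assuming the stack above
# is importable from networking_stack.py
import aws_cdk as cdk
from aws_cdk.assertions import Template

from networking_stack import NetworkingStack


def test_vpc_layout():
    app = cdk.App()
    stack = NetworkingStack(app, "TestNetworking")
    template = Template.from_stack(stack)

    # Exactly one NAT gateway, as configured above
    template.resource_count_is("AWS::EC2::NatGateway", 1)
    # Two gateway endpoints (S3, DynamoDB) + seven interface endpoints
    template.resource_count_is("AWS::EC2::VPCEndpoint", 9)
    # VPC CIDR and DNS settings
    template.has_resource_properties("AWS::EC2::VPC", {
        "CidrBlock": "10.0.0.0/16",
        "EnableDnsHostnames": True,
        "EnableDnsSupport": True
    })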
VPC Architecture Diagram
(Mermaid diagram: three-tier VPC with public, private-with-egress, and isolated database subnets spread across availability zones, a NAT gateway, and gateway/interface VPC endpoints.)
RDS Database Configuration
Production-Grade RDS Setup
from aws_cdk import (
    Stack,
    aws_rds as rds,
    aws_ec2 as ec2,
    aws_elasticache as elasticache,
    aws_kms as kms,
    aws_logs as logs,
    aws_secretsmanager as secretsmanager,
    Duration,
    RemovalPolicy,
    CfnOutput
)
from constructs import Construct
class DatabaseStack(Stack):
def __init__(self, scope: Construct, construct_id: str,
vpc: ec2.Vpc,
database_sg: ec2.SecurityGroup,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
self.vpc = vpc
self.database_sg = database_sg
# Create primary database
self.primary_db = self._create_primary_database()
# Create read replica
self.read_replica = self._create_read_replica()
# Create ElastiCache cluster
self.cache_cluster = self._create_cache_cluster()
# Output database information
self._create_outputs()
def _create_primary_database(self):
"""Create primary database"""
# Create KMS key for encryption
db_key = kms.Key(
self,
"DatabaseKey",
description="KMS key for database encryption",
enable_key_rotation=True
)
# Create database credentials
credentials = rds.Credentials.from_generated_secret(
username="dbadmin",
secret_name="rds-credentials",
exclude_characters='/@"\'\\;'
)
# Create subnet group
subnet_group = rds.SubnetGroup(
self,
"DatabaseSubnetGroup",
description="Subnet group for RDS database",
vpc=self.vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
),
removal_policy=RemovalPolicy.DESTROY
)
# Create parameter group
parameter_group = rds.ParameterGroup(
self,
"DatabaseParameterGroup",
engine=rds.DatabaseInstanceEngine.postgres(
version=rds.PostgresEngineVersion.VER_13_7
),
description="Custom parameter group for PostgreSQL",
parameters={
"shared_preload_libraries": "pg_stat_statements",
"log_statement": "all",
"log_min_duration_statement": "1000",
"log_checkpoints": "on",
"log_lock_waits": "on"
}
)
# Create RDS instance
database = rds.DatabaseInstance(
self,
"PrimaryDatabase",
engine=rds.DatabaseInstanceEngine.postgres(
version=rds.PostgresEngineVersion.VER_13_7
),
instance_type=ec2.InstanceType.of(
ec2.InstanceClass.BURSTABLE3,
ec2.InstanceSize.MEDIUM
),
credentials=credentials,
vpc=self.vpc,
subnet_group=subnet_group,
security_groups=[self.database_sg],
# Database configuration
database_name="appdb",
port=5432,
parameter_group=parameter_group,
# Storage configuration
allocated_storage=100, # GB
max_allocated_storage=1000, # Enable auto scaling
storage_type=rds.StorageType.GP2,
storage_encrypted=True,
storage_encryption_key=db_key,
            # Backup configuration
            backup_retention=Duration.days(7),
            preferred_backup_window="03:00-04:00",  # UTC time
            # Maintenance configuration
            preferred_maintenance_window="Sun:04:00-Sun:05:00",
            auto_minor_version_upgrade=True,
# High availability
multi_az=True,
# Monitoring
monitoring_interval=Duration.seconds(60),
enable_performance_insights=True,
performance_insight_retention=rds.PerformanceInsightRetention.DEFAULT,
performance_insight_encryption_key=db_key,
# Log exports
cloudwatch_logs_exports=["postgresql"],
cloudwatch_logs_retention=logs.RetentionDays.ONE_MONTH,
# Deletion protection
deletion_protection=False, # Dev environment, set to True for production
delete_automated_backups=True,
removal_policy=RemovalPolicy.DESTROY # Dev environment
)
return database
def _create_read_replica(self):
"""Create read replica"""
read_replica = rds.DatabaseInstanceReadReplica(
self,
"ReadReplica",
source_database_instance=self.primary_db,
instance_type=ec2.InstanceType.of(
ec2.InstanceClass.BURSTABLE3,
ec2.InstanceSize.SMALL
),
vpc=self.vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
),
security_groups=[self.database_sg],
# Read replica configuration
auto_minor_version_upgrade=True,
backup_retention=Duration.days(0), # Read replicas don't need backups
monitoring_interval=Duration.seconds(60),
enable_performance_insights=True,
deletion_protection=False,
removal_policy=RemovalPolicy.DESTROY
)
return read_replica
def _create_cache_cluster(self):
"""Create ElastiCache Redis cluster"""
# Create cache subnet group
cache_subnet_group = elasticache.CfnSubnetGroup(
self,
"CacheSubnetGroup",
description="Subnet group for ElastiCache",
subnet_ids=[subnet.subnet_id for subnet in self.vpc.private_subnets]
)
# Create cache parameter group
cache_parameter_group = elasticache.CfnParameterGroup(
self,
"CacheParameterGroup",
cache_parameter_group_family="redis6.x",
description="Custom parameter group for Redis",
properties={
"maxmemory-policy": "allkeys-lru",
"timeout": "300"
}
)
# Create Redis replication group
redis_cluster = elasticache.CfnReplicationGroup(
self,
"RedisCluster",
description="Redis cluster for application caching",
# Cluster configuration
replication_group_id="app-redis-cluster",
num_cache_clusters=2, # Primary node + 1 read replica
# Instance configuration
cache_node_type="cache.t3.micro",
engine="redis",
engine_version="6.2",
port=6379,
# Network configuration
cache_subnet_group_name=cache_subnet_group.ref,
security_group_ids=[self.database_sg.security_group_id],
# Parameter group
cache_parameter_group_name=cache_parameter_group.ref,
# Security configuration
at_rest_encryption_enabled=True,
transit_encryption_enabled=True,
auth_token="your-secure-auth-token-here", # Production should use Secrets Manager
# Backup configuration
automatic_failover_enabled=True,
snapshot_retention_limit=5,
snapshot_window="03:00-05:00",
preferred_maintenance_window="sun:05:00-sun:06:00",
            # Notifications: set notification_topic_arn to an SNS topic ARN
            # if you want to receive cluster event notifications
# Logs
log_delivery_configurations=[
elasticache.CfnReplicationGroup.LogDeliveryConfigurationRequestProperty(
destination_type="cloudwatch-logs",
destination_details=elasticache.CfnReplicationGroup.DestinationDetailsProperty(
cloud_watch_logs_details=elasticache.CfnReplicationGroup.CloudWatchLogsDestinationDetailsProperty(
log_group="/aws/elasticache/redis"
)
),
log_format="json",
log_type="slow-log"
)
]
)
# Add dependencies
redis_cluster.add_dependency(cache_subnet_group)
redis_cluster.add_dependency(cache_parameter_group)
return redis_cluster
def _create_outputs(self):
"""Create outputs"""
CfnOutput(
self,
"DatabaseEndpoint",
value=self.primary_db.instance_endpoint.hostname,
description="Primary database endpoint"
)
CfnOutput(
self,
"DatabaseSecretArn",
value=self.primary_db.secret.secret_arn,
description="Database credentials secret ARN"
)
CfnOutput(
self,
"ReadReplicaEndpoint",
value=self.read_replica.instance_endpoint.hostname,
description="Read replica endpoint"
)
        CfnOutput(
            self,
            "RedisEndpoint",
            value=self.cache_cluster.attr_primary_end_point_address,
            description="Redis cluster primary endpoint"
        )
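At runtime the application should read the generated credentials from Secrets Manager (exported above as DatabaseSecretArn) rather than hard-coding them. A minimal sketch of that lookup follows; the secret name rds-credentials comes from the stack, while the psycopg2 driver and helper name are assumptions:

# db_connect.py - a minimal sketch of reading the generated RDS secret at runtime.
# Assumes the "rds-credentials" secret from the stack above and the
# psycopg2-binary package; swap in your preferred PostgreSQL driver.
import json

import boto3
import psycopg2


def get_connection(secret_name: str = "rds-credentials"):
    client = boto3.client("secretsmanager")
    secret = json.loads(
        client.get_secret_value(SecretId=secret_name)["SecretString"]
    )
    # RDS-generated secrets gain host/port once attached to the instance
    return psycopg2.connect(
        host=secret["host"],
        port=secret.get("port", 5432),
        user=secret["username"],
        password=secret["password"],
        dbname=secret.get("dbname", "appdb")
    )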
Lambda and API Gateway Serverless Architecture
Complete Serverless API
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    aws_dynamodb as dynamodb,
    aws_cognito as cognito,
    aws_iam as iam,
    aws_logs as logs,
    Duration,
    RemovalPolicy,
    CfnOutput
)
from constructs import Construct
class ServerlessApiStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create user pool for authentication
self.user_pool = self._create_user_pool()
# Create DynamoDB table
self.table = self._create_dynamodb_table()
# Create Lambda layer
self.common_layer = self._create_lambda_layer()
# Create Lambda functions
self.functions = self._create_lambda_functions()
# Create API Gateway
self.api = self._create_api_gateway()
# Configure API routes
self._configure_api_routes()
# Output API information
self._create_outputs()
def _create_user_pool(self):
"""Create Cognito user pool"""
user_pool = cognito.UserPool(
self,
"ApiUserPool",
user_pool_name="api-users",
# Registration configuration
self_sign_up_enabled=True,
sign_in_aliases=cognito.SignInAliases(email=True),
# Password policy
password_policy=cognito.PasswordPolicy(
min_length=8,
require_lowercase=True,
require_uppercase=True,
require_digits=True,
require_symbols=False
),
# Account recovery
account_recovery=cognito.AccountRecovery.EMAIL_ONLY,
# Attribute verification
auto_verify=cognito.AutoVerifiedAttrs(email=True),
# User attributes
standard_attributes=cognito.StandardAttributes(
email=cognito.StandardAttribute(required=True, mutable=True),
given_name=cognito.StandardAttribute(required=True, mutable=True),
family_name=cognito.StandardAttribute(required=True, mutable=True)
),
# Removal policy
removal_policy=RemovalPolicy.DESTROY
)
# Create user pool client
user_pool_client = user_pool.add_client(
"ApiClient",
user_pool_client_name="api-client",
# Authentication flows
auth_flows=cognito.AuthFlow(
user_srp=True,
user_password=True,
admin_user_password=True
),
# Token validity
access_token_validity=Duration.minutes(60),
id_token_validity=Duration.minutes(60),
refresh_token_validity=Duration.days(30),
# Read/write permissions
            read_attributes=cognito.ClientAttributes().with_standard_attributes(
                email=True,
                given_name=True,
                family_name=True
            ),
            write_attributes=cognito.ClientAttributes().with_standard_attributes(
                email=True,
                given_name=True,
                family_name=True
            )
)
return user_pool
def _create_dynamodb_table(self):
"""Create DynamoDB table"""
table = dynamodb.Table(
self,
"ApiDataTable",
table_name="api-data",
# Primary key design
partition_key=dynamodb.Attribute(
name="PK",
type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="SK",
type=dynamodb.AttributeType.STRING
),
# Billing mode
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            # Stream configuration
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            # Point-in-time recovery
            point_in_time_recovery=True,
            # Removal policy
            removal_policy=RemovalPolicy.DESTROY
        )
        # Global secondary index (Table has no constructor prop for GSIs;
        # they are added after creation)
        table.add_global_secondary_index(
            index_name="GSI1",
            partition_key=dynamodb.Attribute(
                name="GSI1PK",
                type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="GSI1SK",
                type=dynamodb.AttributeType.STRING
            ),
            projection_type=dynamodb.ProjectionType.ALL
        )
        return table
def _create_lambda_layer(self):
"""Create Lambda layer"""
layer = lambda_.LayerVersion(
self,
"CommonLayer",
code=lambda_.Code.from_asset("layers/common"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Common utilities and dependencies",
layer_version_name="common-utils"
)
return layer
def _create_lambda_functions(self):
"""Create Lambda functions"""
# Common environment variables
common_env = {
"TABLE_NAME": self.table.table_name,
"USER_POOL_ID": self.user_pool.user_pool_id,
"POWERTOOLS_SERVICE_NAME": "api-service",
"LOG_LEVEL": "INFO"
}
# Common configuration
function_props = {
"runtime": lambda_.Runtime.PYTHON_3_9,
"timeout": Duration.seconds(30),
"memory_size": 256,
"layers": [self.common_layer],
"environment": common_env,
"tracing": lambda_.Tracing.ACTIVE # Enable X-Ray tracing
}
functions = {}
# User management function
functions['users'] = lambda_.Function(
self,
"UsersFunction",
handler="users.handler",
code=lambda_.Code.from_asset("lambda/users"),
description="Handle user operations",
**function_props
)
# Product management function
functions['products'] = lambda_.Function(
self,
"ProductsFunction",
handler="products.handler",
code=lambda_.Code.from_asset("lambda/products"),
description="Handle product operations",
**function_props
)
# Order management function
functions['orders'] = lambda_.Function(
self,
"OrdersFunction",
handler="orders.handler",
code=lambda_.Code.from_asset("lambda/orders"),
description="Handle order operations",
**function_props
)
# Authorizer function
functions['authorizer'] = lambda_.Function(
self,
"AuthorizerFunction",
handler="authorizer.handler",
code=lambda_.Code.from_asset("lambda/authorizer"),
description="Custom API Gateway authorizer",
timeout=Duration.seconds(10),
**{k: v for k, v in function_props.items() if k != 'timeout'}
)
# Grant DynamoDB permissions to all functions
for func in functions.values():
if func != functions['authorizer']:
self.table.grant_read_write_data(func)
return functions
def _create_api_gateway(self):
"""Create API Gateway"""
        # Create custom authorizer (stored on self so the route methods can reference it)
        self.authorizer = apigw.RequestAuthorizer(
self,
"CustomAuthorizer",
handler=self.functions['authorizer'],
identity_sources=[
apigw.IdentitySource.header('Authorization'),
apigw.IdentitySource.header('x-api-key')
],
results_cache_ttl=Duration.minutes(5)
)
# Create API
api = apigw.RestApi(
self,
"ServerlessApi",
rest_api_name="serverless-api",
description="Serverless API with Lambda and DynamoDB",
# CORS configuration
default_cors_preflight_options=apigw.CorsOptions(
allow_origins=apigw.Cors.ALL_ORIGINS,
allow_methods=apigw.Cors.ALL_METHODS,
allow_headers=[
"Content-Type",
"X-Amz-Date",
"Authorization",
"X-Api-Key",
"X-Amz-Security-Token"
]
),
# Deployment configuration
deploy=True,
deploy_options=apigw.StageOptions(
stage_name="prod",
throttling_rate_limit=1000,
throttling_burst_limit=2000,
logging_level=apigw.MethodLoggingLevel.INFO,
data_trace_enabled=True,
metrics_enabled=True,
# Cache configuration
caching_enabled=True,
cache_cluster_enabled=True,
                cache_cluster_size="0.5",  # cache size in GB
# Access logs
access_log_destination=apigw.LogGroupLogDestination(
logs.LogGroup(
self,
"ApiAccessLogs",
retention=logs.RetentionDays.ONE_MONTH
)
),
                access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
                    caller=True,
                    http_method=True,
                    ip=True,
                    protocol=True,
                    request_time=True,
                    resource_path=True,
                    response_length=True,
                    status=True,
                    user=True
                )
),
# Endpoint configuration
endpoint_configuration=apigw.EndpointConfiguration(
types=[apigw.EndpointType.REGIONAL]
)
)
return api
def _configure_api_routes(self):
"""Configure API routes"""
# v1 API root path
v1 = self.api.root.add_resource("v1")
# User routes
users = v1.add_resource("users")
users.add_method(
"GET",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM,
request_validator=apigw.RequestValidator(
self,
"UsersValidator",
rest_api=self.api,
validate_request_parameters=True,
validate_request_body=True
)
)
users.add_method(
"POST",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
# User detail route
user_detail = users.add_resource("{userId}")
user_detail.add_method(
"GET",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
user_detail.add_method(
"PUT",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
user_detail.add_method(
"DELETE",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
# Product routes
products = v1.add_resource("products")
products.add_method("ANY", apigw.LambdaIntegration(self.functions['products']))
# Product detail route (proxy integration)
product_proxy = products.add_resource("{proxy+}")
product_proxy.add_method("ANY", apigw.LambdaIntegration(self.functions['products']))
# Order routes
orders = v1.add_resource("orders")
orders.add_method("ANY", apigw.LambdaIntegration(self.functions['orders']))
order_proxy = orders.add_resource("{proxy+}")
order_proxy.add_method("ANY", apigw.LambdaIntegration(self.functions['orders']))
# Health check route (no authentication required)
health = self.api.root.add_resource("health")
health.add_method(
"GET",
apigw.MockIntegration(
integration_responses=[
apigw.IntegrationResponse(
status_code="200",
response_templates={
"application/json": '{"status": "healthy", "timestamp": "$context.requestTime"}'
}
)
],
request_templates={
"application/json": '{"statusCode": 200}'
}
),
method_responses=[
apigw.MethodResponse(
status_code="200",
response_models={
"application/json": apigw.Model.EMPTY_MODEL
}
)
]
)
def _create_outputs(self):
"""Create outputs"""
CfnOutput(
self,
"ApiUrl",
value=self.api.url,
description="API Gateway URL"
)
CfnOutput(
self,
"UserPoolId",
value=self.user_pool.user_pool_id,
description="Cognito User Pool ID"
)
CfnOutput(
self,
"TableName",
value=self.table.table_name,
description="DynamoDB table name"
)
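The stack only wires the functions; the handler code lives under lambda/users, lambda/products, and lambda/orders. A minimal sketch of what lambda/users/users.py might look like follows — the USER#/PROFILE key layout is an illustrative assumption consistent with the table's PK/SK design, not something the stack prescribes:

# lambda/users/users.py - a minimal sketch of the handler the stack wires up.
import json
import os
import uuid

import boto3

# Table name is injected by the stack via the TABLE_NAME environment variable
table = boto3.resource("dynamodb").Table(os.environ["TABLE_NAME"])


def handler(event, context):
    method = event["httpMethod"]
    if method == "POST":
        body = json.loads(event.get("body") or "{}")
        user_id = str(uuid.uuid4())
        table.put_item(Item={
            "PK": f"USER#{user_id}",
            "SK": "PROFILE",
            **body
        })
        return {"statusCode": 201, "body": json.dumps({"userId": user_id})}
    if method == "GET" and event.get("pathParameters"):
        user_id = event["pathParameters"]["userId"]
        item = table.get_item(
            Key={"PK": f"USER#{user_id}", "SK": "PROFILE"}
        ).get("Item")
        if not item:
            return {"statusCode": 404, "body": json.dumps({"message": "Not found"})}
        return {"statusCode": 200, "body": json.dumps(item, default=str)}
    return {"statusCode": 400, "body": json.dumps({"message": "Unsupported route"})}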
Monitoring and Messaging Services
CloudWatch Monitoring Configuration
from aws_cdk import (
    Stack,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cw_actions,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_iam as iam,
    aws_apigateway as apigw,
    aws_rds as rds,
    Duration
)
from constructs import Construct
class MonitoringStack(Stack):
def __init__(self, scope: Construct, construct_id: str,
lambda_functions: dict,
api_gateway: apigw.RestApi,
database: rds.DatabaseInstance,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
self.functions = lambda_functions
self.api = api_gateway
self.database = database
# Create SNS topic for alarms
self.alarm_topic = self._create_alarm_topic()
# Create CloudWatch dashboard
self.dashboard = self._create_dashboard()
# Create alarms
self._create_alarms()
# Create custom metrics
self._create_custom_metrics()
def _create_alarm_topic(self):
"""Create alarm topic"""
topic = sns.Topic(
self,
"AlarmTopic",
topic_name="system-alarms",
display_name="System Alarms"
)
# Add email subscription
topic.add_subscription(
subscriptions.EmailSubscription("admin@example.com")
)
# Add Slack notification (via Lambda)
slack_notifier = lambda_.Function(
self,
"SlackNotifier",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="slack_notifier.handler",
code=lambda_.Code.from_inline("""
import json
import urllib3
import os
def handler(event, context):
webhook_url = os.environ['SLACK_WEBHOOK_URL']
for record in event['Records']:
sns_message = json.loads(record['Sns']['Message'])
alarm_name = sns_message['AlarmName']
new_state = sns_message['NewStateValue']
reason = sns_message['NewStateReason']
color = "danger" if new_state == "ALARM" else "good"
slack_message = {
"attachments": [{
"color": color,
"title": f"CloudWatch Alarm: {alarm_name}",
"text": f"State: {new_state}\\nReason: {reason}",
"ts": record['Sns']['Timestamp']
}]
}
http = urllib3.PoolManager()
response = http.request(
'POST',
webhook_url,
body=json.dumps(slack_message).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
print(f"Slack notification sent: {response.status}")
return {'statusCode': 200}
"""),
environment={
"SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
},
timeout=Duration.seconds(30)
)
topic.add_subscription(
subscriptions.LambdaSubscription(slack_notifier)
)
return topic
def _create_dashboard(self):
"""Create CloudWatch dashboard"""
dashboard = cloudwatch.Dashboard(
self,
"SystemDashboard",
dashboard_name="system-overview"
)
# API Gateway metrics
api_widgets = [
cloudwatch.GraphWidget(
title="API Gateway Requests",
left=[
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="Count",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Sum"
)
],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title="API Gateway Latency",
left=[
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="Latency",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Average"
)
],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title="API Gateway Errors",
left=[
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="4XXError",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Sum"
),
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="5XXError",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Sum"
)
],
period=Duration.minutes(5)
)
]
# Lambda metrics
lambda_widgets = []
for func_name, func in self.functions.items():
lambda_widgets.extend([
cloudwatch.GraphWidget(
title=f"Lambda {func_name} - Invocations",
left=[func.metric_invocations()],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title=f"Lambda {func_name} - Duration",
left=[func.metric_duration()],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title=f"Lambda {func_name} - Errors",
left=[func.metric_errors()],
period=Duration.minutes(5)
)
])
# RDS metrics
rds_widgets = [
cloudwatch.GraphWidget(
title="RDS CPU Utilization",
left=[
cloudwatch.Metric(
namespace="AWS/RDS",
metric_name="CPUUtilization",
dimensions_map={
"DBInstanceIdentifier": self.database.instance_identifier
},
statistic="Average"
)
],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title="RDS Connections",
left=[
cloudwatch.Metric(
namespace="AWS/RDS",
metric_name="DatabaseConnections",
dimensions_map={
"DBInstanceIdentifier": self.database.instance_identifier
},
statistic="Average"
)
],
period=Duration.minutes(5)
)
]
# Add all components to dashboard
dashboard.add_widgets(*api_widgets)
dashboard.add_widgets(*lambda_widgets)
dashboard.add_widgets(*rds_widgets)
return dashboard
def _create_alarms(self):
"""Create alarms"""
# API Gateway alarms
high_latency_alarm = cloudwatch.Alarm(
self,
"ApiHighLatency",
alarm_name="API-High-Latency",
metric=cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="Latency",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Average"
),
threshold=5000, # 5 seconds
evaluation_periods=2,
datapoints_to_alarm=2,
comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
alarm_description="API Gateway latency is high",
treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
)
high_latency_alarm.add_alarm_action(
cw_actions.SnsAction(self.alarm_topic)
)
# Lambda error alarms
for func_name, func in self.functions.items():
error_alarm = cloudwatch.Alarm(
self,
f"{func_name}Errors",
alarm_name=f"Lambda-{func_name}-Errors",
metric=func.metric_errors(period=Duration.minutes(5)),
threshold=5,
evaluation_periods=2,
alarm_description=f"High error rate for {func_name} function"
)
error_alarm.add_alarm_action(
cw_actions.SnsAction(self.alarm_topic)
)
# RDS alarms
db_cpu_alarm = cloudwatch.Alarm(
self,
"DatabaseHighCPU",
alarm_name="RDS-High-CPU",
metric=cloudwatch.Metric(
namespace="AWS/RDS",
metric_name="CPUUtilization",
dimensions_map={
"DBInstanceIdentifier": self.database.instance_identifier
},
statistic="Average"
),
threshold=80,
evaluation_periods=2,
alarm_description="Database CPU utilization is high"
)
db_cpu_alarm.add_alarm_action(
cw_actions.SnsAction(self.alarm_topic)
)
def _create_custom_metrics(self):
"""Create custom metrics"""
# Create Lambda function to publish custom metrics
metrics_publisher = lambda_.Function(
self,
"MetricsPublisher",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="metrics.handler",
code=lambda_.Code.from_inline("""
import boto3
import json
import time
cloudwatch = boto3.client('cloudwatch')
def handler(event, context):
# Publish custom business metrics
# Example: Number of user registrations
cloudwatch.put_metric_data(
Namespace='MyApp/Users',
MetricData=[
{
'MetricName': 'NewRegistrations',
'Value': event.get('new_users', 0),
'Unit': 'Count',
'Timestamp': time.time()
}
]
)
# Example: Order amount
if 'order_amount' in event:
cloudwatch.put_metric_data(
Namespace='MyApp/Orders',
MetricData=[
{
'MetricName': 'OrderValue',
'Value': event['order_amount'],
'Unit': 'None',
'Timestamp': time.time()
}
]
)
return {'statusCode': 200}
"""),
timeout=Duration.seconds(30)
)
# Grant permission to publish metrics
metrics_publisher.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["cloudwatch:PutMetricData"],
resources=["*"]
)
)
return metrics_publisher
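Finally, a minimal sketch of an app.py that composes these stacks and passes the cross-stack references their constructors expect; the module file names and the account/region values are assumptions:

# app.py - a minimal wiring sketch; module names and env values are illustrative.
import aws_cdk as cdk

from networking_stack import NetworkingStack
from database_stack import DatabaseStack
from serverless_api_stack import ServerlessApiStack
from monitoring_stack import MonitoringStack

app = cdk.App()
env = cdk.Environment(account="123456789012", region="us-east-1")

network = NetworkingStack(app, "NetworkingStack", env=env)

database = DatabaseStack(
    app, "DatabaseStack",
    vpc=network.vpc,
    database_sg=network.db_sg,
    env=env
)

api = ServerlessApiStack(app, "ServerlessApiStack", env=env)

MonitoringStack(
    app, "MonitoringStack",
    lambda_functions=api.functions,
    api_gateway=api.api,
    database=database.primary_db,
    env=env
)

app.synth()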
This chapter comprehensively covers the integration of major AWS services in CDK, including network architecture, database configuration, serverless architecture, and monitoring setup. These concepts form the foundation for building production-grade applications.