Chapter 5: Integration of Common AWS Services
Learning Objectives
- Master the CDK implementation of VPC network architecture
- Learn to configure compute and storage services like EC2, RDS, and ElastiCache
- Understand the implementation of serverless architectures with API Gateway and Lambda
- Master the configuration of monitoring and messaging services like CloudWatch, SNS, and SQS
VPC Network Architecture
Complete VPC Setup
from aws_cdk import (
Stack,
aws_ec2 as ec2,
aws_logs as logs,
CfnOutput
)
from constructs import Construct
class NetworkingStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create a VPC
self.vpc = ec2.Vpc(
self,
"MainVPC",
vpc_name="main-vpc",
max_azs=3, # Use 3 Availability Zones
cidr="10.0.0.0/16",
# Define subnet configuration
subnet_configuration=[
# Public subnet - for load balancers, NAT gateways
ec2.SubnetConfiguration(
name="Public",
subnet_type=ec2.SubnetType.PUBLIC,
cidr_mask=24 # 10.0.0.0/24, 10.0.1.0/24, 10.0.2.0/24
),
# Private subnet - for application servers
ec2.SubnetConfiguration(
name="Private",
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
cidr_mask=24 # 10.0.10.0/24, 10.0.11.0/24, 10.0.12.0/24
),
# Database subnet - fully isolated
ec2.SubnetConfiguration(
name="Database",
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
cidr_mask=24 # 10.0.20.0/24, 10.0.21.0/24, 10.0.22.0/24
)
],
# Enable DNS
enable_dns_hostnames=True,
enable_dns_support=True,
# NAT gateway configuration
            nat_gateways=1,  # One shared NAT gateway saves cost; use one per AZ in production
# Flow logs
flow_logs={
"FlowLogsCloudWatch": ec2.FlowLogOptions(
destination=ec2.FlowLogDestination.to_cloud_watch_logs(
log_group=logs.LogGroup(
self,
"VpcFlowLogsGroup",
retention=logs.RetentionDays.ONE_MONTH
)
),
traffic_type=ec2.FlowLogTrafficType.ALL
)
}
)
# Create security groups
self._create_security_groups()
# Create VPC endpoints (to reduce NAT gateway costs)
self._create_vpc_endpoints()
# Output network information
self._create_outputs()
def _create_security_groups(self):
"""Create security groups"""
# Web tier security group
self.web_sg = ec2.SecurityGroup(
self,
"WebSecurityGroup",
vpc=self.vpc,
description="Security group for web servers",
allow_all_outbound=True
)
# Allow HTTP/HTTPS traffic
self.web_sg.add_ingress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(80),
description="Allow HTTP"
)
self.web_sg.add_ingress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(443),
description="Allow HTTPS"
)
# App tier security group
self.app_sg = ec2.SecurityGroup(
self,
"AppSecurityGroup",
vpc=self.vpc,
description="Security group for application servers",
allow_all_outbound=True
)
# Only allow traffic from the web tier
self.app_sg.add_ingress_rule(
peer=self.web_sg,
connection=ec2.Port.tcp(8080),
description="Allow traffic from web tier"
)
# Database tier security group
self.db_sg = ec2.SecurityGroup(
self,
"DatabaseSecurityGroup",
vpc=self.vpc,
description="Security group for database servers"
)
# Only allow database connections from the app tier
self.db_sg.add_ingress_rule(
peer=self.app_sg,
connection=ec2.Port.tcp(5432),
description="Allow PostgreSQL from app tier"
)
self.db_sg.add_ingress_rule(
peer=self.app_sg,
connection=ec2.Port.tcp(3306),
description="Allow MySQL from app tier"
)
# Cache tier security group
self.cache_sg = ec2.SecurityGroup(
self,
"CacheSecurityGroup",
vpc=self.vpc,
description="Security group for cache servers"
)
self.cache_sg.add_ingress_rule(
peer=self.app_sg,
connection=ec2.Port.tcp(6379),
description="Allow Redis from app tier"
)
# Management access security group
self.bastion_sg = ec2.SecurityGroup(
self,
"BastionSecurityGroup",
vpc=self.vpc,
description="Security group for bastion host"
)
# Only allow SSH access from the company IP
self.bastion_sg.add_ingress_rule(
peer=ec2.Peer.ipv4("203.0.113.0/24"), # Replace with your actual company IP range
connection=ec2.Port.tcp(22),
description="Allow SSH from company network"
)
def _create_vpc_endpoints(self):
"""Create VPC endpoints to reduce NAT gateway usage"""
# S3 gateway endpoint
self.vpc.add_gateway_endpoint(
"S3Endpoint",
service=ec2.GatewayVpcEndpointAwsService.S3,
subnets=[
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
]
)
# DynamoDB gateway endpoint
self.vpc.add_gateway_endpoint(
"DynamoDbEndpoint",
service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
subnets=[
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
]
)
# Interface endpoint security group
endpoint_sg = ec2.SecurityGroup(
self,
"VpcEndpointSG",
vpc=self.vpc,
description="Security group for VPC endpoints"
)
endpoint_sg.add_ingress_rule(
peer=ec2.Peer.ipv4(self.vpc.vpc_cidr_block),
connection=ec2.Port.tcp(443),
description="Allow HTTPS from VPC"
)
# Interface endpoints for common services
        # Interface endpoints for common services, keyed by an explicit construct ID.
        # (InterfaceVpcEndpointAwsService.name contains a region token, so it cannot be used in construct IDs.)
        services = {
            "Ecr": ec2.InterfaceVpcEndpointAwsService.ECR,
            "EcrDocker": ec2.InterfaceVpcEndpointAwsService.ECR_DOCKER,
            "Lambda": ec2.InterfaceVpcEndpointAwsService.LAMBDA,
            "SecretsManager": ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
            "Ssm": ec2.InterfaceVpcEndpointAwsService.SSM,
            "SsmMessages": ec2.InterfaceVpcEndpointAwsService.SSM_MESSAGES,
            "Ec2Messages": ec2.InterfaceVpcEndpointAwsService.EC2_MESSAGES
        }
        for name, service in services.items():
            self.vpc.add_interface_endpoint(
                f"{name}Endpoint",
                service=service,
                subnets=ec2.SubnetSelection(
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
                ),
                security_groups=[endpoint_sg]
            )
def _create_outputs(self):
"""Create outputs"""
CfnOutput(self, "VpcId", value=self.vpc.vpc_id)
CfnOutput(self, "VpcCidr", value=self.vpc.vpc_cidr_block)
CfnOutput(
self, "PublicSubnetIds",
value=",".join([s.subnet_id for s in self.vpc.public_subnets])
)
CfnOutput(
self, "PrivateSubnetIds",
value=",".join([s.subnet_id for s in self.vpc.private_subnets])
)
CfnOutput(
self, "DatabaseSubnetIds",
value=",".join([s.subnet_id for s in self.vpc.isolated_subnets])
)
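The networking stack is typically instantiated once per environment, with its VPC and security groups handed to downstream stacks. A minimal sketch of an app.py entry point follows; the module names, stack IDs, and account/region values are illustrative assumptions, and the security-group parameters match the DatabaseStack constructor defined later in this chapter.
# app.py - illustrative wiring; module names and env values are placeholders
import aws_cdk as cdk
from networking_stack import NetworkingStack
from database_stack import DatabaseStack

app = cdk.App()
env = cdk.Environment(account="123456789012", region="us-east-1")

network = NetworkingStack(app, "NetworkingStack", env=env)
database = DatabaseStack(
    app,
    "DatabaseStack",
    vpc=network.vpc,
    database_sg=network.db_sg,      # created in _create_security_groups
    cache_sg=network.cache_sg,      # allows Redis (6379) from the app tier
    env=env
)

app.synth()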
VPC Architecture Diagram
The topology spans up to three Availability Zones, each containing a public subnet (load balancers, NAT gateway), a private subnet with egress (application servers), and an isolated subnet (databases and caches); gateway and interface endpoints keep S3, DynamoDB, and AWS management traffic off the NAT path.
RDS Database Configuration
Production-grade RDS Setup
from aws_cdk import (
    Stack,
    aws_rds as rds,
    aws_ec2 as ec2,
    aws_elasticache as elasticache,
    aws_kms as kms,
    aws_logs as logs,
    aws_secretsmanager as secretsmanager,
    CfnOutput,
    Duration,
    RemovalPolicy
)
from constructs import Construct
class DatabaseStack(Stack):
    def __init__(self, scope: Construct, construct_id: str,
                 vpc: ec2.Vpc,
                 database_sg: ec2.SecurityGroup,
                 cache_sg: ec2.SecurityGroup,
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        self.vpc = vpc
        self.database_sg = database_sg
        self.cache_sg = cache_sg  # Redis needs the cache tier SG (port 6379), not the database SG
# Create the primary database
self.primary_db = self._create_primary_database()
# Create a read replica
self.read_replica = self._create_read_replica()
# Create an ElastiCache cluster
self.cache_cluster = self._create_cache_cluster()
# Output database information
self._create_outputs()
def _create_primary_database(self):
"""Create the primary database"""
# Create a KMS key for encryption
db_key = kms.Key(
self,
"DatabaseKey",
description="KMS key for database encryption",
enable_key_rotation=True
)
# Create database credentials
credentials = rds.Credentials.from_generated_secret(
username="dbadmin",
secret_name="rds-credentials",
            exclude_characters='/@";'
)
# Create a subnet group
subnet_group = rds.SubnetGroup(
self,
"DatabaseSubnetGroup",
description="Subnet group for RDS database",
vpc=self.vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
),
removal_policy=RemovalPolicy.DESTROY
)
# Create a parameter group
parameter_group = rds.ParameterGroup(
self,
"DatabaseParameterGroup",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_16_3  # PostgreSQL 13.7 has reached end of support on RDS
            ),
description="Custom parameter group for PostgreSQL",
parameters={
"shared_preload_libraries": "pg_stat_statements",
"log_statement": "all",
"log_min_duration_statement": "1000",
"log_checkpoints": "on",
"log_lock_waits": "on"
}
)
# Create an RDS instance
database = rds.DatabaseInstance(
self,
"PrimaryDatabase",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_16_3  # must match the parameter group engine
            ),
instance_type=ec2.InstanceType.of(
ec2.InstanceClass.BURSTABLE3,
ec2.InstanceSize.MEDIUM
),
credentials=credentials,
vpc=self.vpc,
subnet_group=subnet_group,
security_groups=[self.database_sg],
# Database configuration
database_name="appdb",
port=5432,
parameter_group=parameter_group,
# Storage configuration
            allocated_storage=100,  # GB
            max_allocated_storage=1000,  # Enable storage auto-scaling up to 1 TB
            storage_type=rds.StorageType.GP3,  # GP3 is cheaper than GP2 at equal baseline performance
            storage_encrypted=True,
            storage_encryption_key=db_key,
            # Backup configuration
            backup_retention=Duration.days(7),
            preferred_backup_window="03:00-04:00",  # UTC
            # Maintenance configuration
            preferred_maintenance_window="Sun:04:00-Sun:05:00",
auto_minor_version_upgrade=True,
# High availability
multi_az=True,
# Monitoring
monitoring_interval=Duration.seconds(60),
enable_performance_insights=True,
performance_insight_retention=rds.PerformanceInsightRetention.DEFAULT,
performance_insight_encryption_key=db_key,
# Log exports
cloudwatch_logs_exports=["postgresql"],
cloudwatch_logs_retention=logs.RetentionDays.ONE_MONTH,
# Deletion protection
deletion_protection=False, # Set to True for production
delete_automated_backups=True,
removal_policy=RemovalPolicy.DESTROY # For development
)
return database
def _create_read_replica(self):
"""Create a read replica"""
read_replica = rds.DatabaseInstanceReadReplica(
self,
"ReadReplica",
source_database_instance=self.primary_db,
instance_type=ec2.InstanceType.of(
ec2.InstanceClass.BURSTABLE3,
ec2.InstanceSize.SMALL
),
vpc=self.vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
),
security_groups=[self.database_sg],
# Read replica configuration
auto_minor_version_upgrade=True,
backup_retention=Duration.days(0), # No backups needed for read replicas
monitoring_interval=Duration.seconds(60),
enable_performance_insights=True,
deletion_protection=False,
removal_policy=RemovalPolicy.DESTROY
)
return read_replica
def _create_cache_cluster(self):
"""Create an ElastiCache Redis cluster"""
# Create a cache subnet group
cache_subnet_group = elasticache.CfnSubnetGroup(
self,
"CacheSubnetGroup",
description="Subnet group for ElastiCache",
subnet_ids=[subnet.subnet_id for subnet in self.vpc.private_subnets]
)
# Create a cache parameter group
cache_parameter_group = elasticache.CfnParameterGroup(
self,
"CacheParameterGroup",
cache_parameter_group_family="redis6.x",
description="Custom parameter group for Redis",
properties={
"maxmemory-policy": "allkeys-lru",
"timeout": "300"
}
)
# Create a Redis replication group
        redis_cluster = elasticache.CfnReplicationGroup(
            self,
            "RedisCluster",
            replication_group_description="Redis cluster for application caching",
            # Cluster configuration
            replication_group_id="app-redis-cluster",
            num_cache_clusters=2,  # Primary node + 1 read replica
# Instance configuration
cache_node_type="cache.t3.micro",
engine="redis",
engine_version="6.2",
port=6379,
            # Network configuration
            cache_subnet_group_name=cache_subnet_group.ref,
            security_group_ids=[self.cache_sg.security_group_id],  # cache SG allows 6379 from the app tier
            # Parameter group
            cache_parameter_group_name=cache_parameter_group.ref,
            # Security configuration
            at_rest_encryption_enabled=True,
            transit_encryption_enabled=True,
            auth_token="your-secure-auth-token-here",  # use Secrets Manager in production (see sketch below)
            # High availability and backups
            automatic_failover_enabled=True,
snapshot_retention_limit=5,
snapshot_window="03:00-05:00",
preferred_maintenance_window="sun:05:00-sun:06:00",
            # Notifications: set notification_topic_arn to an SNS topic ARN if needed
            # (an empty string fails CloudFormation validation, so the property is omitted here)
# Logs
log_delivery_configurations=[
elasticache.CfnReplicationGroup.LogDeliveryConfigurationRequestProperty(
destination_type="cloudwatch-logs",
destination_details=elasticache.CfnReplicationGroup.DestinationDetailsProperty(
cloud_watch_logs_details=elasticache.CfnReplicationGroup.CloudWatchLogsDestinationDetailsProperty(
log_group="/aws/elasticache/redis"
)
),
log_format="json",
log_type="slow-log"
)
]
)
# Add dependencies
redis_cluster.add_dependency(cache_subnet_group)
redis_cluster.add_dependency(cache_parameter_group)
return redis_cluster
def _create_outputs(self):
"""Create outputs"""
CfnOutput(
self,
"DatabaseEndpoint",
value=self.primary_db.instance_endpoint.hostname,
description="Primary database endpoint"
)
CfnOutput(
self,
"DatabaseSecretArn",
value=self.primary_db.secret.secret_arn,
description="Database credentials secret ARN"
)
CfnOutput(
self,
"ReadReplicaEndpoint",
value=self.read_replica.instance_endpoint.hostname,
description="Read replica endpoint"
)
        CfnOutput(
            self,
            "RedisEndpoint",
            value=self.cache_cluster.attr_primary_end_point_address,  # CfnReplicationGroup exposes PrimaryEndPoint.Address
            description="Redis primary endpoint"
        )
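The hardcoded auth_token above is only a placeholder. A safer pattern is to generate the token in Secrets Manager (already imported in this stack) and pass it as a CloudFormation dynamic reference. A minimal sketch, with the secret name and construct IDs as illustrative assumptions:
# Sketch: generate the Redis AUTH token in Secrets Manager instead of hardcoding it.
redis_auth_secret = secretsmanager.Secret(
    self,
    "RedisAuthToken",
    secret_name="redis-auth-token",
    generate_secret_string=secretsmanager.SecretStringGenerator(
        password_length=32,
        exclude_punctuation=True  # ElastiCache AUTH tokens forbid '/', '"', and '@'
    )
)
redis_cluster = elasticache.CfnReplicationGroup(
    self,
    "RedisCluster",
    replication_group_description="Redis cluster with managed AUTH token",
    cache_node_type="cache.t3.micro",
    engine="redis",
    num_cache_clusters=2,
    automatic_failover_enabled=True,
    transit_encryption_enabled=True,  # AUTH requires in-transit encryption
    # unsafe_unwrap() emits a CloudFormation dynamic reference, not the literal secret value
    auth_token=redis_auth_secret.secret_value.unsafe_unwrap()
)
Because the token never appears in the template as plaintext, rotating it only requires updating the secret and redeploying.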
Lambda and API Gateway Serverless Architecture
Complete Serverless API
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    aws_dynamodb as dynamodb,
    aws_cognito as cognito,
    aws_iam as iam,
    aws_logs as logs,
    CfnOutput,
    Duration,
    RemovalPolicy
)
from constructs import Construct
class ServerlessApiStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create a user pool for authentication
self.user_pool = self._create_user_pool()
# Create a DynamoDB table
self.table = self._create_dynamodb_table()
# Create a Lambda layer
self.common_layer = self._create_lambda_layer()
# Create Lambda functions
self.functions = self._create_lambda_functions()
# Create an API Gateway
self.api = self._create_api_gateway()
# Configure API routes
self._configure_api_routes()
# Output API information
self._create_outputs()
def _create_user_pool(self):
"""Create a Cognito User Pool"""
user_pool = cognito.UserPool(
self,
"ApiUserPool",
user_pool_name="api-users",
# Sign-up configuration
self_sign_up_enabled=True,
sign_in_aliases=cognito.SignInAliases(email=True),
# Password policy
password_policy=cognito.PasswordPolicy(
min_length=8,
require_lowercase=True,
require_uppercase=True,
require_digits=True,
require_symbols=False
),
# Account recovery
account_recovery=cognito.AccountRecovery.EMAIL_ONLY,
# Attribute verification
auto_verify=cognito.AutoVerifiedAttrs(email=True),
# User attributes
standard_attributes=cognito.StandardAttributes(
email=cognito.StandardAttribute(required=True, mutable=True),
given_name=cognito.StandardAttribute(required=True, mutable=True),
family_name=cognito.StandardAttribute(required=True, mutable=True)
),
# Removal policy
removal_policy=RemovalPolicy.DESTROY
)
# Create a user pool client
user_pool_client = user_pool.add_client(
"ApiClient",
user_pool_client_name="api-client",
# Authentication flows
auth_flows=cognito.AuthFlow(
user_srp=True,
user_password=True,
admin_user_password=True
),
# Token validity
access_token_validity=Duration.minutes(60),
id_token_validity=Duration.minutes(60),
refresh_token_validity=Duration.days(30),
            # Read/write permissions (ClientAttributes is built via with_standard_attributes)
            read_attributes=cognito.ClientAttributes().with_standard_attributes(
                email=True,
                given_name=True,
                family_name=True
            ),
            write_attributes=cognito.ClientAttributes().with_standard_attributes(
                email=True,
                given_name=True,
                family_name=True
            )
)
return user_pool
def _create_dynamodb_table(self):
"""Create a DynamoDB table"""
table = dynamodb.Table(
self,
"ApiDataTable",
table_name="api-data",
# Primary key design
partition_key=dynamodb.Attribute(
name="PK",
type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="SK",
type=dynamodb.AttributeType.STRING
),
            # Billing mode
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            # Stream configuration
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            # Point-in-time recovery
            point_in_time_recovery=True,
            # Removal policy
            removal_policy=RemovalPolicy.DESTROY
        )
        # Global secondary indexes are added via a method on Table, not a constructor property
        table.add_global_secondary_index(
            index_name="GSI1",
            partition_key=dynamodb.Attribute(
                name="GSI1PK",
                type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="GSI1SK",
                type=dynamodb.AttributeType.STRING
            ),
            projection_type=dynamodb.ProjectionType.ALL
        )
return table
def _create_lambda_layer(self):
"""Create a Lambda layer"""
layer = lambda_.LayerVersion(
self,
"CommonLayer",
code=lambda_.Code.from_asset("layers/common"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Common utilities and dependencies",
layer_version_name="common-utils"
)
return layer
def _create_lambda_functions(self):
"""Create Lambda functions"""
# Common environment variables
common_env = {
"TABLE_NAME": self.table.table_name,
"USER_POOL_ID": self.user_pool.user_pool_id,
"POWERTOOLS_SERVICE_NAME": "api-service",
"LOG_LEVEL": "INFO"
}
# Common configuration
function_props = {
"runtime": lambda_.Runtime.PYTHON_3_9,
"timeout": Duration.seconds(30),
"memory_size": 256,
"layers": [self.common_layer],
"environment": common_env,
"tracing": lambda_.Tracing.ACTIVE # Enable X-Ray tracing
}
functions = {}
# User management function
functions['users'] = lambda_.Function(
self,
"UsersFunction",
handler="users.handler",
code=lambda_.Code.from_asset("lambda/users"),
description="Handle user operations",
**function_props
)
# Product management function
functions['products'] = lambda_.Function(
self,
"ProductsFunction",
handler="products.handler",
code=lambda_.Code.from_asset("lambda/products"),
description="Handle product operations",
**function_props
)
# Order management function
functions['orders'] = lambda_.Function(
self,
"OrdersFunction",
handler="orders.handler",
code=lambda_.Code.from_asset("lambda/orders"),
description="Handle order operations",
**function_props
)
# Authorizer function
functions['authorizer'] = lambda_.Function(
self,
"AuthorizerFunction",
handler="authorizer.handler",
code=lambda_.Code.from_asset("lambda/authorizer"),
description="Custom API Gateway authorizer",
timeout=Duration.seconds(10),
**{k: v for k, v in function_props.items() if k != 'timeout'}
)
# Grant DynamoDB permissions to all functions
for func in functions.values():
if func != functions['authorizer']:
self.table.grant_read_write_data(func)
return functions
def _create_api_gateway(self):
"""Create an API Gateway"""
        # Create a custom authorizer (stored on self; _configure_api_routes references it)
        self.authorizer = apigw.RequestAuthorizer(
self,
"CustomAuthorizer",
handler=self.functions['authorizer'],
identity_sources=[
apigw.IdentitySource.header('Authorization'),
apigw.IdentitySource.header('x-api-key')
],
results_cache_ttl=Duration.minutes(5)
)
# Create the API
api = apigw.RestApi(
self,
"ServerlessApi",
rest_api_name="serverless-api",
description="Serverless API with Lambda and DynamoDB",
# CORS configuration
default_cors_preflight_options=apigw.CorsOptions(
allow_origins=apigw.Cors.ALL_ORIGINS,
allow_methods=apigw.Cors.ALL_METHODS,
allow_headers=[
"Content-Type",
"X-Amz-Date",
"Authorization",
"X-Api-Key",
"X-Amz-Security-Token"
]
),
# Deployment configuration
deploy=True,
deploy_options=apigw.StageOptions(
stage_name="prod",
throttling_rate_limit=1000,
throttling_burst_limit=2000,
logging_level=apigw.MethodLoggingLevel.INFO,
data_trace_enabled=True,
metrics_enabled=True,
                # Caching configuration
                caching_enabled=True,
                cache_cluster_enabled=True,
                cache_cluster_size="0.5",  # cache size in GB, passed as a string (there is no enum)
# Access logs
access_log_destination=apigw.LogGroupLogDestination(
logs.LogGroup(
self,
"ApiAccessLogs",
retention=logs.RetentionDays.ONE_MONTH
)
),
                access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
                    caller=True,
                    http_method=True,
                    ip=True,
                    protocol=True,
                    request_time=True,
                    resource_path=True,
                    response_length=True,
                    status=True,
                    user=True
                )
),
# Endpoint configuration
endpoint_configuration=apigw.EndpointConfiguration(
types=[apigw.EndpointType.REGIONAL]
)
)
return api
def _configure_api_routes(self):
"""Configure API routes"""
# v1 API root path
v1 = self.api.root.add_resource("v1")
# User routes
users = v1.add_resource("users")
users.add_method(
"GET",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM,
request_validator=apigw.RequestValidator(
self,
"UsersValidator",
rest_api=self.api,
validate_request_parameters=True,
validate_request_body=True
)
)
users.add_method(
"POST",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
# User detail routes
user_detail = users.add_resource("{userId}")
user_detail.add_method(
"GET",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
user_detail.add_method(
"PUT",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
user_detail.add_method(
"DELETE",
apigw.LambdaIntegration(self.functions['users']),
authorizer=self.authorizer,
authorization_type=apigw.AuthorizationType.CUSTOM
)
# Product routes
products = v1.add_resource("products")
products.add_method("ANY", apigw.LambdaIntegration(self.functions['products']))
# Product detail routes (proxy integration)
product_proxy = products.add_resource("{proxy+}")
product_proxy.add_method("ANY", apigw.LambdaIntegration(self.functions['products']))
# Order routes
orders = v1.add_resource("orders")
orders.add_method("ANY", apigw.LambdaIntegration(self.functions['orders']))
order_proxy = orders.add_resource("{proxy+}")
order_proxy.add_method("ANY", apigw.LambdaIntegration(self.functions['orders']))
# Health check route (no authentication)
health = self.api.root.add_resource("health")
health.add_method(
"GET",
apigw.MockIntegration(
integration_responses=[
apigw.IntegrationResponse(
status_code="200",
response_templates={
"application/json": '{"status": "healthy", "timestamp": "$context.requestTime"}'
}
)
],
request_templates={
"application/json": '{"statusCode": 200}'
}
),
method_responses=[
apigw.MethodResponse(
status_code="200",
response_models={
"application/json": apigw.Model.EMPTY_MODEL
}
)
]
)
def _create_outputs(self):
"""Create outputs"""
CfnOutput(
self,
"ApiUrl",
value=self.api.url,
description="API Gateway URL"
)
CfnOutput(
self,
"UserPoolId",
value=self.user_pool.user_pool_id,
description="Cognito User Pool ID"
)
CfnOutput(
self,
"TableName",
value=self.table.table_name,
description="DynamoDB table name"
)
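Each function above expects a handler module in its asset directory. The sketch below shows what lambda/users/users.py might look like against the single-table PK/SK layout defined earlier; the USER#/PROFILE key convention and the payload shapes are illustrative assumptions, not part of the stack above.
# lambda/users/users.py -- illustrative handler; key schema and payloads are assumptions
import json
import os

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])  # injected via common_env in the stack


def handler(event, context):
    method = event['httpMethod']
    params = event.get('pathParameters') or {}

    if method == 'GET' and 'userId' in params:
        # Single-table design: users stored under PK=USER#<id>, SK=PROFILE (assumed convention)
        resp = table.get_item(Key={'PK': f"USER#{params['userId']}", 'SK': 'PROFILE'})
        item = resp.get('Item')
        if not item:
            return {'statusCode': 404, 'body': json.dumps({'error': 'not found'})}
        return {'statusCode': 200, 'body': json.dumps(item, default=str)}

    if method == 'POST':
        body = json.loads(event.get('body') or '{}')
        table.put_item(Item={'PK': f"USER#{body['id']}", 'SK': 'PROFILE', **body})
        return {'statusCode': 201, 'body': json.dumps(body)}

    return {'statusCode': 405, 'body': json.dumps({'error': 'method not allowed'})}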
Monitoring and Messaging Services
CloudWatch Monitoring Configuration
from aws_cdk import (
    Stack,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cw_actions,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    aws_lambda as lambda_,
    aws_logs as logs,
    aws_iam as iam,
    aws_apigateway as apigw,
    aws_rds as rds,
    Duration
)
from constructs import Construct
class MonitoringStack(Stack):
def __init__(self, scope: Construct, construct_id: str,
lambda_functions: dict,
api_gateway: apigw.RestApi,
database: rds.DatabaseInstance,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
self.functions = lambda_functions
self.api = api_gateway
self.database = database
# Create an SNS topic for alarms
self.alarm_topic = self._create_alarm_topic()
# Create a CloudWatch dashboard
self.dashboard = self._create_dashboard()
# Create alarms
self._create_alarms()
        # Create a custom-metrics publisher function
        self.metrics_publisher = self._create_custom_metrics()
def _create_alarm_topic(self):
"""Create an alarm topic"""
topic = sns.Topic(
self,
"AlarmTopic",
topic_name="system-alarms",
display_name="System Alarms"
)
# Add an email subscription
topic.add_subscription(
subscriptions.EmailSubscription("admin@example.com")
)
# Add a Slack notification (via Lambda)
slack_notifier = lambda_.Function(
self,
"SlackNotifier",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="slack_notifier.handler",
code=lambda_.Code.from_inline("""
import json
import urllib3
import os
def handler(event, context):
webhook_url = os.environ['SLACK_WEBHOOK_URL']
for record in event['Records']:
sns_message = json.loads(record['Sns']['Message'])
alarm_name = sns_message['AlarmName']
new_state = sns_message['NewStateValue']
reason = sns_message['NewStateReason']
color = "danger" if new_state == "ALARM" else "good"
        slack_message = {
            "attachments": [{
                "color": color,
                "title": f"CloudWatch Alarm: {alarm_name}",
                # Slack's "ts" field expects epoch seconds; the SNS ISO-8601 timestamp goes in the text instead
                "text": f"State: {new_state}\\nReason: {reason}\\nAt: {record['Sns']['Timestamp']}"
            }]
        }
http = urllib3.PoolManager()
response = http.request(
'POST',
webhook_url,
body=json.dumps(slack_message).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
print(f"Slack notification sent: {response.status}")
return {'statusCode': 200}
"""),
environment={
"SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
},
timeout=Duration.seconds(30)
)
topic.add_subscription(
subscriptions.LambdaSubscription(slack_notifier)
)
return topic
def _create_dashboard(self):
"""Create a CloudWatch dashboard"""
dashboard = cloudwatch.Dashboard(
self,
"SystemDashboard",
dashboard_name="system-overview"
)
# API Gateway metrics
api_widgets = [
cloudwatch.GraphWidget(
title="API Gateway Requests",
left=[
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="Count",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Sum"
)
],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title="API Gateway Latency",
left=[
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="Latency",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Average"
)
],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title="API Gateway Errors",
left=[
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="4XXError",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Sum"
),
cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="5XXError",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Sum"
)
],
period=Duration.minutes(5)
)
]
# Lambda metrics
lambda_widgets = []
for func_name, func in self.functions.items():
lambda_widgets.extend([
cloudwatch.GraphWidget(
title=f"Lambda {func_name} - Invocations",
left=[func.metric_invocations()],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title=f"Lambda {func_name} - Duration",
left=[func.metric_duration()],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title=f"Lambda {func_name} - Errors",
left=[func.metric_errors()],
period=Duration.minutes(5)
)
])
# RDS metrics
rds_widgets = [
cloudwatch.GraphWidget(
title="RDS CPU Utilization",
left=[
cloudwatch.Metric(
namespace="AWS/RDS",
metric_name="CPUUtilization",
dimensions_map={
"DBInstanceIdentifier": self.database.instance_identifier
},
statistic="Average"
)
],
period=Duration.minutes(5)
),
cloudwatch.GraphWidget(
title="RDS Connections",
left=[
cloudwatch.Metric(
namespace="AWS/RDS",
metric_name="DatabaseConnections",
dimensions_map={
"DBInstanceIdentifier": self.database.instance_identifier
},
statistic="Average"
)
],
period=Duration.minutes(5)
)
]
# Add all widgets to the dashboard
dashboard.add_widgets(*api_widgets)
dashboard.add_widgets(*lambda_widgets)
dashboard.add_widgets(*rds_widgets)
return dashboard
def _create_alarms(self):
"""Create alarms"""
# API Gateway alarms
high_latency_alarm = cloudwatch.Alarm(
self,
"ApiHighLatency",
alarm_name="API-High-Latency",
metric=cloudwatch.Metric(
namespace="AWS/ApiGateway",
metric_name="Latency",
dimensions_map={
"ApiName": self.api.rest_api_name
},
statistic="Average"
),
threshold=5000, # 5 seconds
evaluation_periods=2,
datapoints_to_alarm=2,
comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
alarm_description="API Gateway latency is high",
treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
)
high_latency_alarm.add_alarm_action(
cw_actions.SnsAction(self.alarm_topic)
)
# Lambda error alarms
for func_name, func in self.functions.items():
error_alarm = cloudwatch.Alarm(
self,
f"{func_name}Errors",
alarm_name=f"Lambda-{func_name}-Errors",
metric=func.metric_errors(period=Duration.minutes(5)),
threshold=5,
evaluation_periods=2,
alarm_description=f"High error rate for {func_name} function"
)
error_alarm.add_alarm_action(
cw_actions.SnsAction(self.alarm_topic)
)
# RDS alarms
db_cpu_alarm = cloudwatch.Alarm(
self,
"DatabaseHighCPU",
alarm_name="RDS-High-CPU",
metric=cloudwatch.Metric(
namespace="AWS/RDS",
metric_name="CPUUtilization",
dimensions_map={
"DBInstanceIdentifier": self.database.instance_identifier
},
statistic="Average"
),
threshold=80,
evaluation_periods=2,
alarm_description="Database CPU utilization is high"
)
db_cpu_alarm.add_alarm_action(
cw_actions.SnsAction(self.alarm_topic)
)
def _create_custom_metrics(self):
"""Create custom metrics"""
# Create a Lambda function to send custom metrics
metrics_publisher = lambda_.Function(
self,
"MetricsPublisher",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="metrics.handler",
code=lambda_.Code.from_inline("""import boto3
import json
import time
cloudwatch = boto3.client('cloudwatch')
def handler(event, context):
# Send custom business metrics
# Example: Number of user registrations
cloudwatch.put_metric_data(
Namespace='MyApp/Users',
MetricData=[
{
'MetricName': 'NewRegistrations',
'Value': event.get('new_users', 0),
'Unit': 'Count',
'Timestamp': time.time()
}
]
)
# Example: Order amount
if 'order_amount' in event:
cloudwatch.put_metric_data(
Namespace='MyApp/Orders',
MetricData=[
{
'MetricName': 'OrderValue',
'Value': event['order_amount'],
'Unit': 'None',
'Timestamp': time.time()
}
]
)
return {'statusCode': 200}
"""),
timeout=Duration.seconds(30)
)
# Grant permission to send metrics
metrics_publisher.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["cloudwatch:PutMetricData"],
resources=["*"]
)
)
return metrics_publisher
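The learning objectives also list SQS. A common pattern is to fan alarm notifications out to a queue for programmatic consumers alongside the email and Slack subscribers. A minimal sketch that would slot into _create_alarm_topic, assuming aws_sqs is imported as sqs (the queue names are illustrative):
# Sketch: fan alarm notifications out to SQS for programmatic processing.
alarm_dlq = sqs.Queue(
    self,
    "AlarmDeadLetterQueue",
    queue_name="system-alarms-dlq",
    retention_period=Duration.days(14)
)
alarm_queue = sqs.Queue(
    self,
    "AlarmQueue",
    queue_name="system-alarms-queue",
    visibility_timeout=Duration.seconds(60),
    dead_letter_queue=sqs.DeadLetterQueue(
        max_receive_count=3,  # move a message to the DLQ after 3 failed receives
        queue=alarm_dlq
    )
)
# SqsSubscription grants the topic permission to send messages to the queue
topic.add_subscription(subscriptions.SqsSubscription(alarm_queue))
The dead-letter queue ensures that alarm events a consumer repeatedly fails to process are retained for inspection rather than silently dropped.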
This chapter provides a comprehensive introduction to integrating major AWS services in CDK, including network architecture, database configuration, serverless architecture, and monitoring setup. These concepts form the foundation for building production-grade applications.