title: "Chapter 3: Using Basic Constructs"
date: 2025-09-01
icon: circle-dot
author: Haiyue
category:
- aws
star: false
## Learning Objectives
- Master the differences and use cases for L1, L2, and L3 Constructs
- Learn to create and configure basic AWS resources (S3, Lambda, IAM)
- Understand the dependencies between resources
- Master parameter passing and configuration management in CDK
## In-depth Understanding of Construct Levels

### Three-Tier Architecture Comparison

L1 constructs (the `Cfn*` classes) map one-to-one to CloudFormation resources and expose every property; L2 constructs wrap an L1 resource with sensible defaults, convenience methods, and permission grant helpers; L3 constructs (patterns) compose multiple L1/L2 resources into a reusable, higher-level unit.

### Choosing the Right Construct Level
| Scenario | Recommended Level | Reason |
| --- | --- | --- |
| Rapid prototyping | L3 | Reduce code volume, quick validation |
| Production environment standard configuration | L2 | Balance ease of use and control |
| Special configuration requirements | L1 | Full control over all properties |
| Legacy system migration | L1 | Precisely map existing configurations |
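
When only a few settings fall outside what an L2 construct exposes, you usually do not need to drop all the way down to L1: every L2 construct wraps an L1 resource that can be reached through the `node.default_child` escape hatch. A minimal sketch, with the overridden property chosen purely for illustration:

```python
from aws_cdk import Stack, aws_s3 as s3
from constructs import Construct


class EscapeHatchStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Start from an L2 bucket...
        bucket = s3.Bucket(self, "Bucket")

        # ...and reach its underlying L1 resource (CfnBucket) via the escape hatch
        cfn_bucket = bucket.node.default_child
        assert isinstance(cfn_bucket, s3.CfnBucket)

        # Override a raw CloudFormation property the L2 API does not expose
        # (the property and values here are purely illustrative)
        cfn_bucket.add_property_override(
            "IntelligentTieringConfigurations",
            [{"Id": "archive", "Status": "Enabled",
              "Tierings": [{"AccessTier": "ARCHIVE_ACCESS", "Days": 90}]}]
        )
```
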
## Complete S3 Bucket Example

### L1 Level: Full Control
```python
from aws_cdk import (
    Stack,
    aws_s3 as s3,
    CfnOutput
)
from constructs import Construct


class S3L1Stack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # L1 Construct: direct use of the CloudFormation resource
        bucket_l1 = s3.CfnBucket(
            self,
            "MyBucketL1",
            bucket_name="my-app-data-bucket-l1",
            # Versioning configuration
            versioning_configuration=s3.CfnBucket.VersioningConfigurationProperty(
                status="Enabled"
            ),
            # Encryption configuration
            bucket_encryption=s3.CfnBucket.BucketEncryptionProperty(
                server_side_encryption_configuration=[
                    s3.CfnBucket.ServerSideEncryptionRuleProperty(
                        server_side_encryption_by_default=s3.CfnBucket.ServerSideEncryptionByDefaultProperty(
                            sse_algorithm="AES256"
                        )
                    )
                ]
            ),
            # Public access block
            public_access_block_configuration=s3.CfnBucket.PublicAccessBlockConfigurationProperty(
                block_public_acls=True,
                block_public_policy=True,
                ignore_public_acls=True,
                restrict_public_buckets=True
            ),
            # Lifecycle rules
            lifecycle_configuration=s3.CfnBucket.LifecycleConfigurationProperty(
                rules=[
                    s3.CfnBucket.RuleProperty(
                        id="DeleteOldVersions",
                        status="Enabled",
                        noncurrent_version_expiration=s3.CfnBucket.NoncurrentVersionExpirationProperty(
                            noncurrent_days=30
                        )
                    )
                ]
            ),
            # Notification configuration (Lambda, SQS, and SNS notifications can be configured here)
            notification_configuration=s3.CfnBucket.NotificationConfigurationProperty()
        )

        CfnOutput(self, "BucketNameL1", value=bucket_l1.ref)
```
### L2 Level: Best Practices
```python
from aws_cdk import (
    Stack,
    aws_s3 as s3,
    aws_s3_notifications as s3n,
    aws_lambda as lambda_,
    RemovalPolicy,
    Duration,
    CfnOutput
)
from constructs import Construct


class S3L2Stack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # L2 Construct: simplified API with built-in best practices
        bucket_l2 = s3.Bucket(
            self,
            "MyBucketL2",
            bucket_name="my-app-data-bucket-l2",
            # Versioning (simplified)
            versioned=True,
            # Encryption (simplified)
            encryption=s3.BucketEncryption.S3_MANAGED,
            # Public access (simplified)
            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
            # Auto-delete objects (for development environments)
            auto_delete_objects=True,
            removal_policy=RemovalPolicy.DESTROY,
            # Lifecycle rules (simplified)
            lifecycle_rules=[
                s3.LifecycleRule(
                    id="delete-old-versions",
                    enabled=True,
                    noncurrent_version_expiration=Duration.days(30),
                    abort_incomplete_multipart_upload_after=Duration.days(1)
                )
            ],
            # CORS configuration
            cors=[
                s3.CorsRule(
                    allowed_methods=[s3.HttpMethods.GET, s3.HttpMethods.POST],
                    allowed_origins=["https://example.com"],
                    allowed_headers=["*"],
                    max_age=3000
                )
            ]
        )

        # Create a handler function
        processor_function = lambda_.Function(
            self,
            "S3Processor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline('''
def handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        print(f"File {key} uploaded to bucket {bucket}")
    return {'statusCode': 200}
''')
        )

        # Add an S3 event notification (an advantage of L2: simple API)
        bucket_l2.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(processor_function),
            s3.NotificationKeyFilter(prefix="uploads/", suffix=".jpg")
        )

        # Grant Lambda read permission
        bucket_l2.grant_read(processor_function)

        CfnOutput(self, "BucketNameL2", value=bucket_l2.bucket_name)
        CfnOutput(self, "BucketArnL2", value=bucket_l2.bucket_arn)
```
### L3 Level: Pattern Combination
```python
from constructs import Construct
from aws_cdk import (
    Stack,
    aws_s3 as s3,
    aws_s3_deployment as s3deploy,
    aws_cloudfront as cloudfront,
    aws_cloudfront_origins as origins,
    RemovalPolicy,
    CfnOutput
)


class StaticWebsiteConstruct(Construct):
    """L3 Construct: static website hosting pattern"""

    def __init__(self, scope: Construct, construct_id: str,
                 domain_name: str = None, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Bucket for website content
        self.website_bucket = s3.Bucket(
            self,
            "WebsiteBucket",
            # Enable static website hosting
            website_index_document="index.html",
            website_error_document="error.html",
            # Public read access
            public_read_access=True,
            block_public_access=s3.BlockPublicAccess(
                block_public_acls=False,
                block_public_policy=False,
                ignore_public_acls=False,
                restrict_public_buckets=False
            ),
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True
        )

        # Bucket for logs
        self.logs_bucket = s3.Bucket(
            self,
            "LogsBucket",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True
        )

        # CloudFront distribution
        self.distribution = cloudfront.Distribution(
            self,
            "WebsiteDistribution",
            default_behavior=cloudfront.BehaviorOptions(
                origin=origins.S3Origin(self.website_bucket),
                viewer_protocol_policy=cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
                cache_policy=cloudfront.CachePolicy.CACHING_OPTIMIZED
            ),
            # Default root object
            default_root_object="index.html",
            # Error pages
            error_responses=[
                cloudfront.ErrorResponse(
                    http_status=404,
                    response_http_status=404,
                    response_page_path="/error.html"
                )
            ],
            # Access logs
            enable_logging=True,
            log_bucket=self.logs_bucket,
            log_file_prefix="access-logs/"
        )

    @property
    def bucket_name(self) -> str:
        return self.website_bucket.bucket_name

    @property
    def distribution_domain_name(self) -> str:
        return self.distribution.distribution_domain_name

    @property
    def website_url(self) -> str:
        return f"https://{self.distribution.distribution_domain_name}"


class S3L3Stack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Use the L3 construct
        website = StaticWebsiteConstruct(
            self,
            "MyWebsite",
            domain_name="example.com"
        )

        # Deploy website content (if the website-dist directory exists)
        # s3deploy.BucketDeployment(
        #     self,
        #     "DeployWebsite",
        #     sources=[s3deploy.Source.asset("./website-dist")],
        #     destination_bucket=website.website_bucket,
        #     distribution=website.distribution,
        #     distribution_paths=["/*"]
        # )

        CfnOutput(self, "WebsiteURL", value=website.website_url)
        CfnOutput(self, "BucketName", value=website.bucket_name)
```
## Complete Lambda Function Example

### Basic Lambda Function
```python
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_iam as iam,
    aws_logs as logs,
    Duration,
    CfnOutput
)
from constructs import Construct


class LambdaStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Basic Lambda function
        basic_function = lambda_.Function(
            self,
            "BasicFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline('''
import json

def handler(event, context):
    print(f"Event: {json.dumps(event)}")
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({
            'message': 'Hello from Lambda!',
            'requestId': context.aws_request_id
        })
    }
'''),
            timeout=Duration.seconds(30),
            memory_size=256,
            # Environment variables
            environment={
                "ENV": "production",
                "LOG_LEVEL": "INFO"
            },
            # Log retention period
            log_retention=logs.RetentionDays.ONE_WEEK,
            # Description
            description="Basic Lambda function example"
        )

        # Lambda function deployed from a file
        file_function = lambda_.Function(
            self,
            "FileFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="app.lambda_handler",
            code=lambda_.Code.from_asset("lambda"),  # From the local lambda directory
            timeout=Duration.minutes(1),
            memory_size=512,
            environment={
                "BUCKET_NAME": "my-data-bucket"
            }
        )

        # Layered function (using a Lambda Layer)
        # Create a Layer
        shared_layer = lambda_.LayerVersion(
            self,
            "SharedLayer",
            code=lambda_.Code.from_asset("layers/shared"),  # Contains shared libraries
            compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
            description="Shared utilities layer"
        )

        layered_function = lambda_.Function(
            self,
            "LayeredFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_asset("lambda/layered"),
            layers=[shared_layer],
            timeout=Duration.minutes(2)
        )

        CfnOutput(self, "BasicFunctionArn", value=basic_function.function_arn)
        CfnOutput(self, "FileFunctionName", value=file_function.function_name)
```
### Lambda Permissions and Integration
```python
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_s3 as s3,
    aws_s3_notifications as s3n,
    aws_dynamodb as dynamodb,
    aws_iam as iam,
    aws_apigateway as apigw,
    aws_events as events,
    aws_events_targets as targets,
    RemovalPolicy,
    Duration
)
from constructs import Construct


class LambdaIntegrationStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create a DynamoDB table
        table = dynamodb.Table(
            self,
            "DataTable",
            table_name="user-data",
            partition_key=dynamodb.Attribute(
                name="id",
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.DESTROY
        )

        # Create an S3 bucket
        bucket = s3.Bucket(
            self,
            "DataBucket",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True
        )

        # Lambda function
        processor_function = lambda_.Function(
            self,
            "DataProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor.handler",
            code=lambda_.Code.from_inline('''
import json
import boto3
import os

dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')

def handler(event, context):
    table_name = os.environ['TABLE_NAME']
    bucket_name = os.environ['BUCKET_NAME']
    table = dynamodb.Table(table_name)

    # Handle an API Gateway request
    if 'httpMethod' in event:
        if event['httpMethod'] == 'POST':
            body = json.loads(event['body'])
            # Write to DynamoDB
            table.put_item(Item=body)
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'Data saved'})
            }

    # Handle an S3 event
    if 'Records' in event:
        for record in event['Records']:
            if 's3' in record:
                bucket = record['s3']['bucket']['name']
                key = record['s3']['object']['key']
                print(f"Processing file {key} from bucket {bucket}")

    return {'statusCode': 200}
'''),
            environment={
                "TABLE_NAME": table.table_name,
                "BUCKET_NAME": bucket.bucket_name
            },
            timeout=Duration.minutes(1)
        )

        # Grant permissions
        table.grant_read_write_data(processor_function)
        bucket.grant_read_write(processor_function)

        # API Gateway integration
        api = apigw.RestApi(
            self,
            "DataAPI",
            rest_api_name="Data Processing API",
            description="API for data processing"
        )

        # Add a resource and method
        data_resource = api.root.add_resource("data")
        data_resource.add_method(
            "POST",
            apigw.LambdaIntegration(processor_function),
            authorization_type=apigw.AuthorizationType.NONE
        )

        # S3 event trigger
        bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(processor_function),
            s3.NotificationKeyFilter(prefix="incoming/")
        )

        # Scheduled trigger (EventBridge): every day at 2 AM UTC
        schedule_rule = events.Rule(
            self,
            "ScheduleRule",
            schedule=events.Schedule.cron(hour="2", minute="0"),
            description="Daily processing trigger"
        )
        schedule_rule.add_target(targets.LambdaFunction(processor_function))
```
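
The inline handler above branches on the shape of the incoming event. The fixtures below are hypothetical test payloads (not part of the stack) that show which fields each branch reads:

```python
# Hypothetical test fixtures for the handler above - field values are illustrative
api_gateway_event = {
    "httpMethod": "POST",
    "body": '{"id": "user123", "name": "Alice"}'
}

s3_event = {
    "Records": [
        {"s3": {"bucket": {"name": "data-bucket"},
                "object": {"key": "incoming/report.csv"}}}
    ]
}
```
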
## IAM Roles and Permissions

### Fine-grained Permission Control
```python
from aws_cdk import (
    Stack,
    aws_iam as iam,
    aws_s3 as s3,
    aws_lambda as lambda_,
    aws_dynamodb as dynamodb
)
from constructs import Construct


class IAMExamplesStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create a custom IAM role
        lambda_role = iam.Role(
            self,
            "LambdaExecutionRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            role_name="MyLambdaRole",
            description="Custom role for Lambda function",
            # Basic execution permissions
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AWSLambdaBasicExecutionRole"
                )
            ]
        )

        # Create a custom policy
        s3_policy = iam.Policy(
            self,
            "S3AccessPolicy",
            policy_name="S3BucketAccess",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "s3:GetObject",
                        "s3:PutObject",
                        "s3:DeleteObject"
                    ],
                    resources=["arn:aws:s3:::my-bucket/*"]
                ),
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=["s3:ListBucket"],
                    resources=["arn:aws:s3:::my-bucket"]
                )
            ]
        )

        # Attach the policy to the role
        lambda_role.attach_inline_policy(s3_policy)

        # Create a user and group
        developer_group = iam.Group(
            self,
            "DeveloperGroup",
            group_name="Developers"
        )

        # Add permissions to the group
        developer_group.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("ReadOnlyAccess")
        )

        # Create a user
        dev_user = iam.User(
            self,
            "DevUser",
            user_name="john-developer",
            groups=[developer_group]
        )

        # Create an access key (use with caution)
        access_key = iam.AccessKey(
            self,
            "DevUserAccessKey",
            user=dev_user
        )

        # Create a cross-account role
        cross_account_role = iam.Role(
            self,
            "CrossAccountRole",
            assumed_by=iam.AccountPrincipal("123456789012"),  # Trusted account ID
            role_name="CrossAccountDataAccess",
            external_ids=["unique-external-id"]  # External ID for added security
        )
```
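
The cross-account role above is consumed from the trusted account by calling STS `AssumeRole` with the matching external ID. A minimal boto3 sketch, with the target account ID left as a placeholder:

```python
import boto3

# Called from the trusted account 123456789012; values must match the deployed role
sts = boto3.client("sts")
response = sts.assume_role(
    RoleArn="arn:aws:iam::<data-account-id>:role/CrossAccountDataAccess",
    RoleSessionName="cross-account-session",
    ExternalId="unique-external-id"
)
credentials = response["Credentials"]

# Use the temporary credentials to access resources in the data account
s3 = boto3.client(
    "s3",
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"]
)
```
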
### Resource-level Permissions
```python
from aws_cdk import (
    Stack,
    aws_iam as iam,
    aws_s3 as s3,
    aws_lambda as lambda_,
    aws_dynamodb as dynamodb
)
from constructs import Construct


class ResourceLevelPermissions(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create resources
        bucket = s3.Bucket(self, "MyBucket")
        table = dynamodb.Table(
            self,
            "MyTable",
            partition_key=dynamodb.Attribute(
                name="id",
                type=dynamodb.AttributeType.STRING
            )
        )

        # Create a Lambda function
        function = lambda_.Function(
            self,
            "MyFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline("def handler(event, context): pass")
        )

        # Authorize using the convenience methods of L2 constructs.
        # These methods automatically create the necessary IAM policies.

        # Grant S3 permissions
        bucket.grant_read(function)        # Read-only permission
        bucket.grant_write(function)       # Write permission
        bucket.grant_read_write(function)  # Read-write permission
        bucket.grant_delete(function)      # Delete permission

        # Grant DynamoDB permissions
        table.grant_read_data(function)        # Read data
        table.grant_write_data(function)       # Write data
        table.grant_read_write_data(function)  # Read-write data
        table.grant_full_access(function)      # Full access

        # More fine-grained permission control
        custom_policy = iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=[
                "dynamodb:GetItem",
                "dynamodb:PutItem"
            ],
            resources=[table.table_arn],
            conditions={
                "ForAllValues:StringEquals": {
                    "dynamodb:LeadingKeys": ["user123"]  # Can only access a specific partition key
                }
            }
        )
        function.add_to_role_policy(custom_policy)
```
## Resource Dependency Management

### Explicit Dependencies
```python
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_rds as rds,
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    Duration,
    RemovalPolicy
)
from constructs import Construct


class DependencyStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # 1. Create a VPC (infrastructure)
        vpc = ec2.Vpc(
            self,
            "MyVPC",
            max_azs=2,
            cidr="10.0.0.0/16",
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    name="Public",
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="Private",
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="Database",
                    subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
                    cidr_mask=24
                )
            ]
        )

        # 2. Create a database (depends on the VPC)
        database = rds.DatabaseInstance(
            self,
            "MyDatabase",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_13_7
            ),
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.BURSTABLE3,
                ec2.InstanceSize.MICRO
            ),
            vpc=vpc,  # Explicit reference creates the dependency
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
            ),
            database_name="myapp",
            credentials=rds.Credentials.from_generated_secret(
                "dbadmin",
                exclude_characters='/@"'
            ),
            backup_retention=Duration.days(7),
            delete_automated_backups=True,
            deletion_protection=False,
            removal_policy=RemovalPolicy.DESTROY
        )

        # 3. Create a Lambda function (depends on the database)
        db_function = lambda_.Function(
            self,
            "DatabaseFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="db.handler",
            code=lambda_.Code.from_asset("lambda/database"),
            vpc=vpc,  # In the same VPC as the database
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            environment={
                "DB_HOST": database.instance_endpoint.hostname,
                # db_instance_endpoint_port is a string token, as required for environment variables
                "DB_PORT": database.db_instance_endpoint_port,
                "DB_NAME": "myapp",
                "SECRET_ARN": database.secret.secret_arn
            },
            timeout=Duration.seconds(30)
        )

        # Grant permission to read the database secret
        database.secret.grant_read(db_function)

        # 4. Create an API Gateway (depends on the Lambda function)
        api = apigw.RestApi(
            self,
            "DatabaseAPI",
            rest_api_name="Database API"
        )

        users_resource = api.root.add_resource("users")
        users_resource.add_method(
            "GET",
            apigw.LambdaIntegration(db_function)
        )
        users_resource.add_method(
            "POST",
            apigw.LambdaIntegration(db_function)
        )
```
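
The dependencies above are implicit: CDK derives them from references such as `vpc=vpc` or `database.secret`. When two resources must be ordered even though neither references the other, the dependency can be declared explicitly, as in this short sketch:

```python
# Continuing inside DependencyStack.__init__:
# force CloudFormation to create the database before the function,
# even if the function referenced no attribute of the database.
db_function.node.add_dependency(database)
```
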
### Cross-Stack Dependencies
```python
# base_stack.py
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    CfnOutput
)
from constructs import Construct


class BaseInfrastructureStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create a VPC
        self.vpc = ec2.Vpc(
            self,
            "SharedVPC",
            max_azs=3,
            cidr="10.0.0.0/16"
        )

        # Create a security group
        self.database_security_group = ec2.SecurityGroup(
            self,
            "DatabaseSG",
            vpc=self.vpc,
            description="Security group for database access"
        )

        # Export the IDs so other Stacks can use them
        CfnOutput(
            self,
            "VpcId",
            value=self.vpc.vpc_id,
            export_name="SharedVpcId"
        )
        CfnOutput(
            self,
            "DatabaseSGId",
            value=self.database_security_group.security_group_id,
            export_name="DatabaseSGId"
        )


# application_stack.py
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_lambda as lambda_
)
from constructs import Construct

from base_stack import BaseInfrastructureStack


class ApplicationStack(Stack):
    def __init__(self, scope: Construct, construct_id: str,
                 base_stack: BaseInfrastructureStack,  # Accept the dependent Stack
                 **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Use resources from the base stack
        vpc = base_stack.vpc
        database_sg = base_stack.database_security_group

        # Or import existing resources by lookup
        # vpc = ec2.Vpc.from_lookup(self, "ImportedVpc", vpc_id="vpc-xxx")

        # Create application resources
        app_function = lambda_.Function(
            self,
            "AppFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="app.handler",
            code=lambda_.Code.from_asset("lambda/app"),
            vpc=vpc  # Use the shared VPC
        )

        # Allow the Lambda function's security group to reach the database
        database_sg.add_ingress_rule(
            peer=ec2.Peer.security_group_id(
                app_function.connections.security_groups[0].security_group_id
            ),
            connection=ec2.Port.tcp(5432),
            description="Allow Lambda access to database"
        )


# app.py
import aws_cdk as cdk

from base_stack import BaseInfrastructureStack
from application_stack import ApplicationStack

app = cdk.App()

# Create the infrastructure Stack
base_stack = BaseInfrastructureStack(app, "BaseInfrastructure")

# Create the application Stack, passing in the dependency
app_stack = ApplicationStack(
    app,
    "ApplicationStack",
    base_stack=base_stack  # Pass the dependency
)

# Ensure the deployment order
app_stack.add_dependency(base_stack)

app.synth()
```
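
Passing the stack object, as in `app.py` above, lets CDK wire the cross-stack references for you. The `CfnOutput` exports can also be consumed explicitly in a stack that has no Python-level reference to `BaseInfrastructureStack`; a minimal sketch:

```python
from aws_cdk import Fn, aws_ec2 as ec2

# Inside another stack's __init__ (self is that stack):
# consume the values exported by BaseInfrastructureStack by export name
vpc_id = Fn.import_value("SharedVpcId")
database_sg = ec2.SecurityGroup.from_security_group_id(
    self, "ImportedDatabaseSG", Fn.import_value("DatabaseSGId")
)
```
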
## Configuration Management Best Practices

### Environment Configuration
```python
# config.py
from dataclasses import dataclass


@dataclass
class EnvironmentConfig:
    environment: str
    vpc_cidr: str
    database_instance_type: str
    lambda_memory_size: int
    api_throttle_limit: int

    @classmethod
    def get_config(cls, environment: str) -> 'EnvironmentConfig':
        configs = {
            "dev": cls(
                environment="dev",
                vpc_cidr="10.0.0.0/16",
                database_instance_type="db.t3.micro",
                lambda_memory_size=256,
                api_throttle_limit=100
            ),
            "staging": cls(
                environment="staging",
                vpc_cidr="10.1.0.0/16",
                database_instance_type="db.t3.small",
                lambda_memory_size=512,
                api_throttle_limit=500
            ),
            "prod": cls(
                environment="prod",
                vpc_cidr="10.2.0.0/16",
                database_instance_type="db.r5.large",
                lambda_memory_size=1024,
                api_throttle_limit=2000
            )
        }
        if environment not in configs:
            raise ValueError(f"Unknown environment: {environment}")
        return configs[environment]


# Stack using the configuration
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_lambda as lambda_
)
from constructs import Construct


class ConfigurableStack(Stack):
    def __init__(self, scope: Construct, construct_id: str,
                 config: EnvironmentConfig, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        self.config = config

        # Create resources using the configuration
        vpc = ec2.Vpc(
            self,
            "VPC",
            cidr=config.vpc_cidr,
            max_azs=2 if config.environment == "dev" else 3
        )

        function = lambda_.Function(
            self,
            "Function",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=lambda_.Code.from_inline("def handler(e, c): pass"),
            memory_size=config.lambda_memory_size,
            environment={
                "ENV": config.environment,
                "VPC_ID": vpc.vpc_id
            }
        )
```
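
A typical entry point then selects the environment from the CDK context and hands the matching configuration to the stack. A minimal sketch, with the context key, module names, and stack name chosen for illustration:

```python
# app.py - select the configuration via `cdk deploy -c env=staging` (names are illustrative)
import aws_cdk as cdk

from config import EnvironmentConfig
from configurable_stack import ConfigurableStack  # module name is illustrative

app = cdk.App()
env_name = app.node.try_get_context("env") or "dev"
config = EnvironmentConfig.get_config(env_name)

ConfigurableStack(app, f"MyApp-{env_name}", config=config)
app.synth()
```
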
This chapter provides a comprehensive introduction to using basic Constructs in CDK, including selecting different levels, configuring resources, managing permissions, and handling dependencies. Mastering these fundamentals will lay a solid foundation for building complex cloud infrastructure.