Chapter 10: Performance Optimization and Cost Control
Learning Objectives
- Master performance optimization strategies for CDK applications
- Understand the cost structure and optimization methods of AWS services
- Learn to use AWS cost management tools
- Implement automated cost monitoring and budget control
- Master resource right-sizing and elastic scaling strategies
- Understand best practices for multi-environment cost management
Performance Optimization Overview
Performance optimization is a continuous process that spans architectural design, resource configuration, and code-level tuning. This chapter focuses on the levers you control directly from CDK: Lambda configuration, database settings, scaling policies, and the monitoring and budgets needed to verify each change.
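Many of the constructs in this chapter take sizing parameters (memory, instance class, capacity). One way to keep those choices reviewable is to centralize them as per-environment CDK context; the sketch below assumes hypothetical `dev`/`prod` context keys in cdk.json and is only one possible layout, not a required convention.
# app.py (sketch): per-environment sizing via CDK context; keys and values are illustrative
import aws_cdk as cdk

app = cdk.App()

# cdk.json context might look like:
#   "dev":  {"lambda_memory_mb": 256,  "rds_instance_class": "t3.micro"},
#   "prod": {"lambda_memory_mb": 1024, "rds_instance_class": "r6g.large"}
env_name = app.node.try_get_context("env") or "dev"
sizing = app.node.try_get_context(env_name) or {}

lambda_memory = sizing.get("lambda_memory_mb", 256)
rds_instance_class = sizing.get("rds_instance_class", "t3.micro")

# These values can then be passed into the constructs defined later in this chapter.
app.synth()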
Lambda Performance Optimization
Optimized Lambda Function Construct
# constructs/optimized_lambda_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_lambda as lambda_,
aws_logs as logs,
aws_iam as iam,
aws_ec2 as ec2
)
from constructs import Construct
from typing import Optional, Dict, List
class OptimizedLambdaConstruct(Construct):
"""Optimized Lambda Function Construct"""
def __init__(self, scope: Construct, construct_id: str,
function_name: str,
handler: str,
code_asset_path: str,
runtime: lambda_.Runtime = lambda_.Runtime.PYTHON_3_9,
memory_size: int = 256,
timeout_seconds: int = 30,
environment_variables: Optional[Dict[str, str]] = None,
layers: Optional[List[lambda_.LayerVersion]] = None,
vpc: Optional[ec2.Vpc] = None,
enable_tracing: bool = True,
enable_provisioned_concurrency: bool = False,
provisioned_concurrency_count: int = 1,
reserved_concurrent_executions: Optional[int] = None,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
        # Optimized environment variables
        optimized_env = {
            # Connection reuse for the AWS SDK (Node.js runtimes only; ignored elsewhere)
            "AWS_NODEJS_CONNECTION_REUSE_ENABLED": "1" if runtime.name.startswith("nodejs") else None,
            # Custom runtime wrapper to reduce cold starts
            # (only takes effect if a layer actually provides /opt/bootstrap)
            "AWS_LAMBDA_EXEC_WRAPPER": "/opt/bootstrap",
        }
if environment_variables:
optimized_env.update(environment_variables)
# Filter out None values
optimized_env = {k: v for k, v in optimized_env.items() if v is not None}
# Lambda function
self.function = lambda_.Function(
self,
"Function",
function_name=function_name,
runtime=runtime,
handler=handler,
code=lambda_.Code.from_asset(code_asset_path),
memory_size=memory_size,
timeout=cdk.Duration.seconds(timeout_seconds),
environment=optimized_env,
layers=layers or [],
vpc=vpc,
# Performance optimization configuration
reserved_concurrent_executions=reserved_concurrent_executions,
tracing=lambda_.Tracing.ACTIVE if enable_tracing else lambda_.Tracing.DISABLED,
# Log configuration
log_retention=logs.RetentionDays.ONE_MONTH,
# Architecture optimization (ARM64 is usually cheaper)
architecture=lambda_.Architecture.ARM_64,
# Dead-letter queue
dead_letter_queue_enabled=True,
)
        # Provisioned Concurrency
        if enable_provisioned_concurrency:
            version = self.function.current_version
            # Provisioned concurrency is configured directly on the alias
            self.alias = lambda_.Alias(
                self,
                "ProdAlias",
                alias_name="prod",
                version=version,
                provisioned_concurrent_executions=provisioned_concurrency_count
            )
        else:
            self.alias = None
# Performance monitoring alarms
self._create_performance_alarms()
        # Lambda Insights (optional)
        if self.node.try_get_context("enable_lambda_insights"):
            # Note: the Insights layer ARN is region- and architecture-specific;
            # ARM64 functions need the "LambdaInsightsExtension-Arm64" layer variant
            self.function.add_layers(
                lambda_.LayerVersion.from_layer_version_arn(
                    self,
                    "LambdaInsightsLayer",
                    layer_version_arn=f"arn:aws:lambda:{cdk.Aws.REGION}:580247275435:layer:LambdaInsightsExtension:14"
                )
            )
def _create_performance_alarms(self):
"""Create performance monitoring alarms"""
        from aws_cdk import aws_cloudwatch as cloudwatch
# Error rate alarm
error_alarm = cloudwatch.Alarm(
self,
"ErrorAlarm",
alarm_name=f"{self.function.function_name}-errors",
metric=self.function.metric_errors(),
threshold=5,
evaluation_periods=2,
datapoints_to_alarm=2
)
# Duration alarm
duration_alarm = cloudwatch.Alarm(
self,
"DurationAlarm",
alarm_name=f"{self.function.function_name}-duration",
metric=self.function.metric_duration(),
threshold=10000, # 10 seconds
evaluation_periods=3,
datapoints_to_alarm=2
)
        # Cold start monitoring (maximum duration spikes are a rough proxy for cold starts)
cold_start_metric = cloudwatch.Metric(
namespace="AWS/Lambda",
metric_name="Duration",
dimensions_map={
"FunctionName": self.function.function_name
},
statistic="Maximum"
)
cold_start_alarm = cloudwatch.Alarm(
self,
"ColdStartAlarm",
alarm_name=f"{self.function.function_name}-cold-start",
metric=cold_start_metric,
threshold=5000, # 5 seconds
evaluation_periods=2
)
        # Concurrent executions alarm
        concurrent_executions_alarm = cloudwatch.Alarm(
            self,
            "ConcurrentExecutionsAlarm",
            alarm_name=f"{self.function.function_name}-concurrent-executions",
            metric=self.function.metric("ConcurrentExecutions", statistic="Maximum"),
            threshold=100,  # Adjust based on actual needs
            evaluation_periods=2
        )
def add_performance_dashboard_widgets(self, dashboard):
"""Add performance monitoring widgets to the dashboard"""
from aws_cdk import aws_cloudwatch as cloudwatch
dashboard.add_widgets(
cloudwatch.GraphWidget(
title=f"{self.function.function_name} - Invocations & Errors",
left=[self.function.metric_invocations()],
right=[self.function.metric_errors()],
width=12,
height=6
),
cloudwatch.GraphWidget(
title=f"{self.function.function_name} - Duration & Throttles",
left=[self.function.metric_duration()],
right=[self.function.metric_throttles()],
width=12,
height=6
)
)
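A possible way to use this construct from a stack is sketched below. The handler path, asset directory, and import path are placeholders that depend on your project layout (a top-level package literally named constructs would shadow the constructs library, so a package name such as app_constructs is assumed here).
# stacks/api_stack.py (usage sketch; names, paths, and import package are placeholders)
import aws_cdk as cdk
from constructs import Construct

from app_constructs.optimized_lambda_construct import OptimizedLambdaConstruct  # assumed package name


class ApiStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        orders_function = OptimizedLambdaConstruct(
            self,
            "OrdersFunction",
            function_name="orders-api",
            handler="orders.handler",
            code_asset_path="lambda/orders",
            memory_size=512,          # start from measurements (e.g. load tests or Lambda Power Tuning)
            timeout_seconds=15,
            enable_provisioned_concurrency=False,
        )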
Lambda Layer Optimization
# stacks/lambda_layers_stack.py
import aws_cdk as cdk
from aws_cdk import (
    aws_lambda as lambda_
)
from constructs import Construct
class LambdaLayersStack(cdk.Stack):
"""Lambda Layers Optimization Stack"""
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Python dependencies layer
self.python_dependencies_layer = lambda_.LayerVersion(
self,
"PythonDependenciesLayer",
code=lambda_.Code.from_asset("layers/python-dependencies"),
compatible_runtimes=[
lambda_.Runtime.PYTHON_3_9,
lambda_.Runtime.PYTHON_3_10,
lambda_.Runtime.PYTHON_3_11
],
compatible_architectures=[
lambda_.Architecture.X86_64,
lambda_.Architecture.ARM_64
],
description="Common Python dependencies (boto3, requests, etc.)",
layer_version_name="python-dependencies"
)
# Database connection layer
self.database_layer = lambda_.LayerVersion(
self,
"DatabaseLayer",
code=lambda_.Code.from_asset("layers/database"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Database connection utilities and drivers",
layer_version_name="database-utilities"
)
# Monitoring and logging layer
self.monitoring_layer = lambda_.LayerVersion(
self,
"MonitoringLayer",
code=lambda_.Code.from_asset("layers/monitoring"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Monitoring, logging, and tracing utilities",
layer_version_name="monitoring-utilities"
)
# Performance optimization layer
self.performance_layer = lambda_.LayerVersion(
self,
"PerformanceLayer",
code=lambda_.Code.from_asset("layers/performance"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Performance optimization utilities",
layer_version_name="performance-utilities"
)
        # Lambda runtime cache layer (experimental)
        # Note: layer content cannot be supplied inline; it must be packaged as an
        # asset. The asset directory (e.g. layers/runtime-cache/python/runtime_cache.py)
        # would contain caching helpers along these lines:
        #
        #   import os
        #   from functools import lru_cache, wraps
        #
        #   _connection_cache = {}
        #
        #   def cached_connection(connection_func):
        #       """Reuse expensive connections across warm invocations."""
        #       @wraps(connection_func)
        #       def wrapper(*args, **kwargs):
        #           cache_key = f"{connection_func.__name__}:{hash(str(args) + str(kwargs))}"
        #           if cache_key not in _connection_cache:
        #               _connection_cache[cache_key] = connection_func(*args, **kwargs)
        #           return _connection_cache[cache_key]
        #       return wrapper
        #
        #   @lru_cache(maxsize=128)
        #   def get_cached_config(key):
        #       """Cache environment-derived configuration lookups."""
        #       return os.environ.get(key)
        #
        # (A warm-up handler that short-circuits scheduled "ping" events belongs in
        # the function code itself, not in the layer.)
        if self.node.try_get_context("enable_runtime_cache"):
            self.runtime_cache_layer = lambda_.LayerVersion(
                self,
                "RuntimeCacheLayer",
                code=lambda_.Code.from_asset("layers/runtime-cache"),
                compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
                description="Runtime caching and optimization utilities"
            )
# Outputs
cdk.CfnOutput(self, "PythonDependenciesLayerArn", value=self.python_dependencies_layer.layer_version_arn)
cdk.CfnOutput(self, "DatabaseLayerArn", value=self.database_layer.layer_version_arn)
cdk.CfnOutput(self, "MonitoringLayerArn", value=self.monitoring_layer.layer_version_arn)
cdk.CfnOutput(self, "PerformanceLayerArn", value=self.performance_layer.layer_version_arn)
Database Performance Optimization
Optimized RDS Construct
# constructs/optimized_rds_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_rds as rds,
aws_ec2 as ec2,
aws_cloudwatch as cloudwatch,
aws_sns as sns,
aws_secretsmanager as secrets
)
from constructs import Construct
from typing import Optional
class OptimizedRDSConstruct(Construct):
"""Optimized RDS Database Construct"""
def __init__(self, scope: Construct, construct_id: str,
vpc: ec2.Vpc,
engine_type: str = "postgres",
                 instance_class: str = "t3.micro",  # CDK/CloudFormation adds the "db." prefix
multi_az: bool = False,
enable_performance_insights: bool = True,
backup_retention_days: int = 7,
enable_monitoring: bool = True,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Database credentials
self.credentials = rds.DatabaseSecret(
self,
"DatabaseCredentials",
username="dbadmin",
secret_name=f"{construct_id}-db-credentials"
)
# Optimized parameter group
parameter_group = self._create_optimized_parameter_group(engine_type)
# Subnet group
subnet_group = rds.SubnetGroup(
self,
"DatabaseSubnetGroup",
description=f"Subnet group for {construct_id}",
vpc=vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
)
)
# Security group
security_group = ec2.SecurityGroup(
self,
"DatabaseSecurityGroup",
vpc=vpc,
description=f"Security group for {construct_id} database",
allow_all_outbound=False
)
# Database instance
self.database = rds.DatabaseInstance(
self,
"DatabaseInstance",
engine=self._get_database_engine(engine_type),
instance_type=ec2.InstanceType(instance_class),
vpc=vpc,
subnet_group=subnet_group,
security_groups=[security_group],
credentials=rds.Credentials.from_secret(self.credentials),
parameter_group=parameter_group,
            # Performance optimization
            multi_az=multi_az,
            enable_performance_insights=enable_performance_insights,
            performance_insights_retention=rds.PerformanceInsightsRetention.DEFAULT,
            monitoring_interval=cdk.Duration.seconds(60) if enable_monitoring else None,
            # Storage optimization (gp3 is generally cheaper than gp2 for the same baseline IOPS)
            storage_type=rds.StorageType.GP3,
allocated_storage=20,
max_allocated_storage=1000, # Auto-scaling
storage_encrypted=True,
# Backups and maintenance
backup_retention=cdk.Duration.days(backup_retention_days),
preferred_backup_window="03:00-04:00",
preferred_maintenance_window="Sun:04:00-Sun:05:00",
delete_automated_backups=True,
deletion_protection=False, # For development
# Logs
cloudwatch_logs_exports=self._get_log_exports(engine_type),
# Auto-upgrade
auto_minor_version_upgrade=True
)
# Read replica (optional)
if self.node.try_get_context("create_read_replica"):
self.read_replica = rds.DatabaseInstanceReadReplica(
self,
"ReadReplica",
source_database_instance=self.database,
instance_type=ec2.InstanceType(instance_class),
vpc=vpc,
subnet_group=subnet_group,
security_groups=[security_group],
                enable_performance_insights=enable_performance_insights,
monitoring_interval=cdk.Duration.seconds(60) if enable_monitoring else None
)
# Performance monitoring
if enable_monitoring:
self._create_performance_monitoring()
# Connection pooling (RDS Proxy)
if self.node.try_get_context("enable_rds_proxy"):
self._create_rds_proxy(vpc, security_group)
def _get_database_engine(self, engine_type: str):
"""Get the database engine configuration"""
engines = {
"postgres": rds.DatabaseInstanceEngine.postgres(
version=rds.PostgresEngineVersion.VER_14_9
),
"mysql": rds.DatabaseInstanceEngine.mysql(
version=rds.MysqlEngineVersion.VER_8_0_35
),
"mariadb": rds.DatabaseInstanceEngine.mariadb(
version=rds.MariaDbEngineVersion.VER_10_6_14
)
}
return engines.get(engine_type, engines["postgres"])
def _create_optimized_parameter_group(self, engine_type: str):
"""Create an optimized parameter group"""
if engine_type == "postgres":
return rds.ParameterGroup(
self,
"PostgreSQLParameterGroup",
engine=self._get_database_engine(engine_type),
parameters={
# Connections and memory
"max_connections": "200",
"shared_buffers": "256MB",
"effective_cache_size": "1GB",
"work_mem": "4MB",
"maintenance_work_mem": "64MB",
# Logging and monitoring
"log_statement": "all",
"log_min_duration_statement": "1000", # Log slow queries
"shared_preload_libraries": "pg_stat_statements",
# Checkpoints and WAL
"checkpoint_completion_target": "0.9",
"wal_buffers": "16MB",
"max_wal_size": "1GB",
"min_wal_size": "80MB",
# Query optimization
"random_page_cost": "1.1",
"seq_page_cost": "1.0",
"cpu_tuple_cost": "0.01",
"cpu_index_tuple_cost": "0.005"
}
)
elif engine_type == "mysql":
return rds.ParameterGroup(
self,
"MySQLParameterGroup",
engine=self._get_database_engine(engine_type),
                parameters={
                    "innodb_buffer_pool_size": "{DBInstanceClassMemory*3/4}",
                    "max_connections": "200",
                    # The query cache was removed in MySQL 8.0, so the old
                    # query_cache_* parameters no longer apply here
                    "slow_query_log": "1",
                    "long_query_time": "1",
                    "innodb_log_file_size": "128M"
                }
)
else:
return None
def _get_log_exports(self, engine_type: str):
"""Get the log export configuration"""
log_configs = {
"postgres": ["postgresql"],
"mysql": ["error", "general", "slowquery"],
"mariadb": ["error", "general", "slowquery"]
}
return log_configs.get(engine_type, [])
def _create_performance_monitoring(self):
"""Create performance monitoring"""
# CPU utilization alarm
cpu_alarm = cloudwatch.Alarm(
self,
"DatabaseCPUAlarm",
alarm_name=f"{self.database.instance_identifier}-cpu-high",
metric=self.database.metric_cpu_utilization(),
threshold=80,
evaluation_periods=3,
datapoints_to_alarm=2
)
# Database connections alarm
connections_alarm = cloudwatch.Alarm(
self,
"DatabaseConnectionsAlarm",
alarm_name=f"{self.database.instance_identifier}-connections-high",
metric=self.database.metric_database_connections(),
            threshold=160,  # roughly 80% of the configured max_connections (200)
evaluation_periods=2
)
# Free storage space alarm
free_space_alarm = cloudwatch.Alarm(
self,
"DatabaseFreeSpaceAlarm",
alarm_name=f"{self.database.instance_identifier}-free-space-low",
metric=self.database.metric_free_storage_space(),
threshold=1000000000, # 1GB
comparison_operator=cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD,
evaluation_periods=2
)
# Read latency alarm
read_latency_alarm = cloudwatch.Alarm(
self,
"DatabaseReadLatencyAlarm",
alarm_name=f"{self.database.instance_identifier}-read-latency-high",
metric=self.database.metric_read_latency(),
threshold=0.2, # 200ms
evaluation_periods=3
)
# Write latency alarm
write_latency_alarm = cloudwatch.Alarm(
self,
"DatabaseWriteLatencyAlarm",
alarm_name=f"{self.database.instance_identifier}-write-latency-high",
metric=self.database.metric_write_latency(),
threshold=0.2, # 200ms
evaluation_periods=3
)
def _create_rds_proxy(self, vpc: ec2.Vpc, security_group: ec2.SecurityGroup):
"""Create an RDS Proxy connection pool"""
proxy_security_group = ec2.SecurityGroup(
self,
"ProxySecurityGroup",
vpc=vpc,
description="Security group for RDS Proxy"
)
        # Allow inbound connections to the proxy from application clients
        # (shown here with the database security group as the peer; in practice
        # this would be the Lambda/EC2 security groups of the callers)
        proxy_security_group.add_ingress_rule(
            peer=security_group,
            connection=ec2.Port.tcp(5432)  # PostgreSQL port
        )
        # The database must also accept connections from the proxy
        security_group.add_ingress_rule(
            peer=proxy_security_group,
            connection=ec2.Port.tcp(5432)
        )
        self.proxy = rds.DatabaseProxy(
            self,
            "DatabaseProxy",
            proxy_target=rds.ProxyTarget.from_database(self.database),
            # Authentication is driven by the secrets passed here
            secrets=[self.credentials],
            vpc=vpc,
            security_groups=[proxy_security_group],
            # Connection pool configuration
            max_connections_percent=100,
            max_idle_connections_percent=50,
            require_tls=True,
            # Session pinning filter
            session_pinning_filters=[
                rds.SessionPinningFilter.EXCLUDE_VARIABLE_SETS
            ]
        )
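For context, the construct might be consumed from a stack along the following lines. The VPC layout matters because the construct selects PRIVATE_ISOLATED subnets; the import path, names, and sizing values are placeholders.
# stacks/data_stack.py (usage sketch; names, sizing, and import path are placeholders)
import aws_cdk as cdk
from aws_cdk import aws_ec2 as ec2
from constructs import Construct

from app_constructs.optimized_rds_construct import OptimizedRDSConstruct  # assumed package name


class DataStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The construct places the database in PRIVATE_ISOLATED subnets,
        # so the VPC needs at least one isolated subnet group
        vpc = ec2.Vpc(
            self,
            "AppVpc",
            max_azs=2,
            subnet_configuration=[
                ec2.SubnetConfiguration(name="public", subnet_type=ec2.SubnetType.PUBLIC, cidr_mask=24),
                ec2.SubnetConfiguration(name="app", subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS, cidr_mask=24),
                ec2.SubnetConfiguration(name="db", subnet_type=ec2.SubnetType.PRIVATE_ISOLATED, cidr_mask=24),
            ],
        )

        OptimizedRDSConstruct(
            self,
            "OrdersDatabase",
            vpc=vpc,
            engine_type="postgres",
            instance_class="t3.small",   # right-size later from Performance Insights data
            multi_az=False,              # enable for production workloads
            backup_retention_days=7,
        )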
Cost Optimization Strategies
Cost Monitoring Construct
# constructs/cost_optimization_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_budgets as budgets,
aws_sns as sns,
aws_sns_subscriptions as subscriptions,
aws_lambda as lambda_,
aws_events as events,
aws_events_targets as targets,
aws_iam as iam
)
from constructs import Construct
from typing import List, Dict
class CostOptimizationConstruct(Construct):
"""Cost Optimization Construct"""
def __init__(self, scope: Construct, construct_id: str,
budget_limit: float,
alert_emails: List[str],
cost_allocation_tags: Dict[str, str] = None,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# SNS topic for cost alerts
self.cost_alert_topic = sns.Topic(
self,
"CostAlertTopic",
topic_name="cost-optimization-alerts"
)
# Add email subscriptions
for email in alert_emails:
self.cost_alert_topic.add_subscription(
subscriptions.EmailSubscription(email)
)
        # Budget configuration
        self.alert_emails = alert_emails
        self._create_budgets(budget_limit)
# Cost anomaly detection
self._create_cost_anomaly_detection()
# Automated cost optimization
self._create_cost_optimization_lambda()
# Resource tagging enforcement
if cost_allocation_tags:
self._create_resource_tagging_lambda(cost_allocation_tags)
def _create_budgets(self, budget_limit: float):
"""Create budgets and alarms"""
# Total cost budget
total_budget = budgets.CfnBudget(
self,
"TotalCostBudget",
budget=budgets.CfnBudget.BudgetDataProperty(
budget_name="total-monthly-budget",
budget_limit=budgets.CfnBudget.SpendProperty(
amount=budget_limit,
unit="USD"
),
time_unit="MONTHLY",
budget_type="COST",
cost_filters=budgets.CfnBudget.CostFiltersProperty(
# Can add specific filters
)
),
notifications_with_subscribers=[
budgets.CfnBudget.NotificationWithSubscribersProperty(
notification=budgets.CfnBudget.NotificationProperty(
notification_type="ACTUAL",
comparison_operator="GREATER_THAN",
threshold=80, # 80% threshold
threshold_type="PERCENTAGE"
),
subscribers=[
budgets.CfnBudget.SubscriberProperty(
subscription_type="EMAIL",
address=email
) for email in ["admin@example.com"] # Replace with actual email
]
),
budgets.CfnBudget.NotificationWithSubscribersProperty(
notification=budgets.CfnBudget.NotificationProperty(
notification_type="FORECASTED",
comparison_operator="GREATER_THAN",
threshold=100, # 100% forecast threshold
threshold_type="PERCENTAGE"
),
subscribers=[
budgets.CfnBudget.SubscriberProperty(
subscription_type="EMAIL",
address=email
) for email in ["admin@example.com"]
]
)
]
)
        # Service-level budgets (values are the service names used by Cost Explorer / Budgets)
        services = {
            "ec2": "Amazon Elastic Compute Cloud - Compute",
            "rds": "Amazon Relational Database Service",
            "lambda": "AWS Lambda",
            "s3": "Amazon Simple Storage Service",
        }
        for label, service_name in services.items():
            service_budget = budgets.CfnBudget(
                self,
                f"{label.capitalize()}Budget",
                budget=budgets.CfnBudget.BudgetDataProperty(
                    budget_name=f"{label}-monthly-budget",
                    budget_limit=budgets.CfnBudget.SpendProperty(
                        amount=budget_limit * 0.3,  # 30% of total budget per service
                        unit="USD"
                    ),
                    time_unit="MONTHLY",
                    budget_type="COST",
                    # CostFilters is plain JSON, so pass a dict rather than a property class
                    cost_filters={"Service": [service_name]}
),
notifications_with_subscribers=[
budgets.CfnBudget.NotificationWithSubscribersProperty(
notification=budgets.CfnBudget.NotificationProperty(
notification_type="ACTUAL",
comparison_operator="GREATER_THAN",
threshold=90,
threshold_type="PERCENTAGE"
),
subscribers=[
budgets.CfnBudget.SubscriberProperty(
subscription_type="SNS",
address=self.cost_alert_topic.topic_arn
)
]
)
]
)
    def _create_cost_anomaly_detection(self):
        """Create cost anomaly detection"""
        from aws_cdk import aws_ce as ce
        # Cost anomaly monitor (the CloudFormation resource is AWS::CE::AnomalyMonitor)
        anomaly_monitor = ce.CfnAnomalyMonitor(
            self,
            "CostAnomalyMonitor",
            monitor_name="cost-anomaly-monitor",
            monitor_type="DIMENSIONAL",
            monitor_dimension="SERVICE"
        )
        # Anomaly detection subscription
        # Note: SNS subscribers are only supported with frequency="IMMEDIATE" and
        # require a topic policy allowing costalerts.amazonaws.com to publish,
        # so this example uses e-mail delivery
        ce.CfnAnomalySubscription(
            self,
            "CostAnomalySubscription",
            subscription_name="cost-anomaly-alerts",
            frequency="DAILY",
            monitor_arn_list=[anomaly_monitor.attr_monitor_arn],
            subscribers=[
                ce.CfnAnomalySubscription.SubscriberProperty(
                    type="EMAIL",
                    address="admin@example.com"  # Replace with actual email
                )
            ],
            # Only alert on anomalies with an absolute impact above $100
            # (ThresholdExpression is a JSON string)
            threshold_expression=(
                '{"Dimensions": {"Key": "ANOMALY_TOTAL_IMPACT_ABSOLUTE", '
                '"MatchOptions": ["GREATER_THAN_OR_EQUAL"], "Values": ["100"]}}'
            )
        )
def _create_cost_optimization_lambda(self):
"""Create an automated cost optimization Lambda"""
self.cost_optimization_function = lambda_.Function(
self,
"CostOptimizationFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="cost_optimizer.handler",
code=lambda_.Code.from_inline("""import boto3
import json
import logging
from datetime import datetime, timedelta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
ec2 = boto3.client('ec2')
rds = boto3.client('rds')
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')
optimization_actions = []
try:
# Check for unused EBS volumes
unused_volumes = find_unused_ebs_volumes(ec2)
optimization_actions.extend(unused_volumes)
# Check for idle RDS instances
idle_rds_instances = find_idle_rds_instances(rds, cloudwatch)
optimization_actions.extend(idle_rds_instances)
# Check for unused Elastic IPs
unused_eips = find_unused_elastic_ips(ec2)
optimization_actions.extend(unused_eips)
# Generate a report
if optimization_actions:
report = generate_optimization_report(optimization_actions)
# Send a notification
sns.publish(
TopicArn=os.environ['COST_ALERT_TOPIC_ARN'],
Subject='Cost Optimization Recommendations',
Message=report
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Cost optimization check completed',
'actions_found': len(optimization_actions)
})
}
except Exception as e:
logger.error(f'Cost optimization error: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def find_unused_ebs_volumes(ec2):
volumes = ec2.describe_volumes(
Filters=[
{'Name': 'state', 'Values': ['available']}
]
)
unused_volumes = []
for volume in volumes['Volumes']:
unused_volumes.append({
'type': 'unused_ebs_volume',
'resource_id': volume['VolumeId'],
'size': volume['Size'],
'cost_estimate': volume['Size'] * 0.10 # $0.10 per GB per month
})
return unused_volumes
def find_idle_rds_instances(rds, cloudwatch):
instances = rds.describe_db_instances()
idle_instances = []
for instance in instances['DBInstances']:
if instance['DBInstanceStatus'] == 'available':
# Check CPU utilization for the last 7 days
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7)
cpu_metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'DBInstanceIdentifier', 'Value': instance['DBInstanceIdentifier']}
],
StartTime=start_time,
EndTime=end_time,
Period=86400, # 1 day
Statistics=['Average']
)
if cpu_metrics['Datapoints']:
avg_cpu = sum(dp['Average'] for dp in cpu_metrics['Datapoints']) / len(cpu_metrics['Datapoints'])
if avg_cpu < 5: # CPU utilization below 5%
idle_instances.append({
'type': 'idle_rds_instance',
'resource_id': instance['DBInstanceIdentifier'],
'instance_class': instance['DBInstanceClass'],
'avg_cpu': avg_cpu
})
return idle_instances
def find_unused_elastic_ips(ec2):
addresses = ec2.describe_addresses()
unused_eips = []
for address in addresses['Addresses']:
if 'InstanceId' not in address and 'NetworkInterfaceId' not in address:
unused_eips.append({
'type': 'unused_elastic_ip',
'resource_id': address['PublicIp'],
'allocation_id': address['AllocationId'],
                'cost_estimate': 3.65  # ~$0.005 per hour * 730 hours per month
})
return unused_eips
def generate_optimization_report(actions):
total_potential_savings = 0
report_lines = ["Cost Optimization Recommendations Report", "=" * 30, ""]
for action in actions:
if action['type'] == 'unused_ebs_volume':
report_lines.append(f"Unused EBS volume: {action['resource_id']}")
report_lines.append(f" - Size: {action['size']} GB")
report_lines.append(f" - Estimated monthly cost: ${action['cost_estimate']:.2f}")
total_potential_savings += action['cost_estimate']
elif action['type'] == 'idle_rds_instance':
report_lines.append(f"Idle RDS instance: {action['resource_id']}")
report_lines.append(f" - Instance type: {action['instance_class']}")
report_lines.append(f" - Average CPU utilization: {action['avg_cpu']:.2f}%")
elif action['type'] == 'unused_elastic_ip':
report_lines.append(f"Unused Elastic IP: {action['resource_id']}")
report_lines.append(f" - Estimated monthly cost: ${action['cost_estimate']:.2f}")
total_potential_savings += action['cost_estimate']
report_lines.append("")
report_lines.append(f"Total potential savings: ${total_potential_savings:.2f}/month")
return "\n".join(report_lines)
"""),
timeout=cdk.Duration.minutes(5),
environment={
"COST_ALERT_TOPIC_ARN": self.cost_alert_topic.topic_arn
}
)
# Add necessary permissions
self.cost_optimization_function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ec2:DescribeVolumes",
"ec2:DescribeAddresses",
"rds:DescribeDBInstances",
"cloudwatch:GetMetricStatistics",
"sns:Publish"
],
resources=["*"]
)
)
# Schedule cost optimization checks
events.Rule(
self,
"CostOptimizationSchedule",
schedule=events.Schedule.rate(cdk.Duration.days(1)),
targets=[targets.LambdaFunction(self.cost_optimization_function)]
)
def _create_resource_tagging_lambda(self, cost_allocation_tags: Dict[str, str]):
"""Create a resource tagging Lambda"""
self.tagging_function = lambda_.Function(
self,
"ResourceTaggingFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="resource_tagger.handler",
code=lambda_.Code.from_inline(f"""import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
REQUIRED_TAGS = '{json.dumps(cost_allocation_tags)}'  # embedded as a JSON string
def handler(event, context):
# Get resources to tag
ec2 = boto3.client('ec2')
rds = boto3.client('rds')
lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')
try:
# Tag EC2 instances
tag_ec2_resources(ec2)
# Tag RDS instances
tag_rds_resources(rds)
# Tag Lambda functions
tag_lambda_functions(lambda_client)
# Tag S3 buckets
tag_s3_buckets(s3)
return {{
'statusCode': 200,
'body': json.dumps('Resource tagging completed successfully')
}}
except Exception as e:
logger.error(f'Resource tagging error: {{str(e)}}')
return {{
'statusCode': 500,
'body': json.dumps({{'error': str(e)}})
}}
def tag_ec2_resources(ec2):
instances = ec2.describe_instances()
required_tags = json.loads(REQUIRED_TAGS)
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
existing_tags = {{tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}}
tags_to_add = []
for key, value in required_tags.items():
if key not in existing_tags:
tags_to_add.append({{'Key': key, 'Value': value}})
if tags_to_add:
ec2.create_tags(Resources=[instance_id], Tags=tags_to_add)
logger.info(f'Tagged EC2 instance {{instance_id}} with {{len(tags_to_add)}} tags')
def tag_rds_resources(rds):
instances = rds.describe_db_instances()
required_tags = json.loads(REQUIRED_TAGS)
for instance in instances['DBInstances']:
instance_arn = instance['DBInstanceArn']
try:
existing_tags = rds.list_tags_for_resource(ResourceName=instance_arn)
existing_tag_keys = {{tag['Key'] for tag in existing_tags['TagList']}}
tags_to_add = []
for key, value in required_tags.items():
if key not in existing_tag_keys:
tags_to_add.append({{'Key': key, 'Value': value}})
if tags_to_add:
rds.add_tags_to_resource(ResourceName=instance_arn, Tags=tags_to_add)
logger.info(f'Tagged RDS instance {{instance["DBInstanceIdentifier"]}} with {{len(tags_to_add)}} tags')
except Exception as e:
logger.error(f'Error tagging RDS instance {{instance["DBInstanceIdentifier"]}}: {{e}}')
def tag_lambda_functions(lambda_client):
functions = lambda_client.list_functions()
required_tags = json.loads(REQUIRED_TAGS)
for function in functions['Functions']:
function_arn = function['FunctionArn']
try:
existing_tags = lambda_client.list_tags(Resource=function_arn)
tags_to_add = {{}}
for key, value in required_tags.items():
if key not in existing_tags['Tags']:
tags_to_add[key] = value
if tags_to_add:
lambda_client.tag_resource(Resource=function_arn, Tags=tags_to_add)
logger.info(f'Tagged Lambda function {{function["FunctionName"]}} with {{len(tags_to_add)}} tags')
except Exception as e:
logger.error(f'Error tagging Lambda function {{function["FunctionName"]}}: {{e}}')
def tag_s3_buckets(s3):
buckets = s3.list_buckets()
required_tags = json.loads(REQUIRED_TAGS)
for bucket in buckets['Buckets']:
bucket_name = bucket['Name']
try:
try:
existing_tags = s3.get_bucket_tagging(Bucket=bucket_name)
existing_tag_keys = {{tag['Key'] for tag in existing_tags['TagSet']}}
            except s3.exceptions.ClientError:
                existing_tags = {{'TagSet': []}}
                existing_tag_keys = set()
tags_to_add = []
for key, value in required_tags.items():
if key not in existing_tag_keys:
tags_to_add.append({{'Key': key, 'Value': value}})
if tags_to_add:
all_tags = list(existing_tags.get('TagSet', [])) + tags_to_add
s3.put_bucket_tagging(
Bucket=bucket_name,
Tagging={{'TagSet': all_tags}}
)
logger.info(f'Tagged S3 bucket {{bucket_name}} with {{len(tags_to_add)}} tags')
except Exception as e:
logger.error(f'Error tagging S3 bucket {{bucket_name}}: {{e}}')
"""),
timeout=cdk.Duration.minutes(10)
)
# Add necessary permissions
self.tagging_function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ec2:DescribeInstances",
"ec2:CreateTags",
"rds:DescribeDBInstances",
"rds:ListTagsForResource",
"rds:AddTagsToResource",
"lambda:ListFunctions",
"lambda:ListTags",
"lambda:TagResource",
"s3:ListAllMyBuckets",
"s3:GetBucketTagging",
"s3:PutBucketTagging"
],
resources=["*"]
)
)
# Schedule resource tagging
events.Rule(
self,
"ResourceTaggingSchedule",
schedule=events.Schedule.rate(cdk.Duration.hours(6)),
targets=[targets.LambdaFunction(self.tagging_function)]
)
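A stack-level usage sketch for the cost construct might look like the following; the budget amount, e-mail addresses, tag values, and import path are placeholders.
# stacks/governance_stack.py (usage sketch; values and import path are placeholders)
import aws_cdk as cdk
from constructs import Construct

from app_constructs.cost_optimization_construct import CostOptimizationConstruct  # assumed package name


class GovernanceStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        CostOptimizationConstruct(
            self,
            "CostControls",
            budget_limit=500.0,                    # monthly budget in USD
            alert_emails=["finops@example.com"],   # replace with real recipients
            cost_allocation_tags={
                "Project": "webshop",
                "Environment": "production",
                "CostCenter": "1234",
            },
        )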
Auto Scaling and Resource Optimization
Intelligent Scaling Construct
# constructs/intelligent_scaling_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_autoscaling as autoscaling,
aws_ec2 as ec2,
aws_cloudwatch as cloudwatch,
aws_applicationautoscaling as app_autoscaling,
aws_lambda as lambda_,
aws_iam as iam
)
from constructs import Construct
from typing import List, Dict
class IntelligentScalingConstruct(Construct):
"""Intelligent Scaling Construct"""
def __init__(self, scope: Construct, construct_id: str,
vpc: ec2.Vpc,
**kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # Keep a reference to the VPC for the helper methods below
        self.vpc = vpc
        # Create a predictive scaling Lambda
self.predictive_scaling_function = self._create_predictive_scaling_lambda()
# Create cost-aware scaling policies
self._create_cost_aware_scaling_policies()
# Create scaling based on business metrics
self._create_business_metric_scaling()
def create_optimized_auto_scaling_group(self,
instance_type: str = "t3.medium",
min_capacity: int = 1,
max_capacity: int = 10,
target_cpu: int = 60) -> autoscaling.AutoScalingGroup:
"""Create an optimized Auto Scaling Group"""
# Launch template
launch_template = ec2.LaunchTemplate(
self,
"OptimizedLaunchTemplate",
instance_type=ec2.InstanceType(instance_type),
machine_image=ec2.AmazonLinuxImage(
generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2
),
user_data=ec2.UserData.for_linux(),
# Performance optimization
nitro_enclave_enabled=False,
hibernation_configured=False,
# Security configuration
security_group=self._create_optimized_security_group(),
# Storage optimization
block_devices=[
ec2.BlockDevice(
device_name="/dev/xvda",
volume=ec2.BlockDeviceVolume.ebs(
volume_size=20,
volume_type=ec2.EbsDeviceVolumeType.GP3, # GP3 is cheaper
encrypted=True,
delete_on_termination=True
)
)
]
)
# Auto Scaling Group
asg = autoscaling.AutoScalingGroup(
self,
"OptimizedASG",
vpc=self.vpc,
            min_capacity=min_capacity,
            max_capacity=max_capacity,
            desired_capacity=min_capacity,
            # Mixed instances policy (cost optimization); the launch template is
            # supplied inside the policy, so it is not also passed at the top level
mixed_instances_policy=autoscaling.MixedInstancesPolicy(
launch_template=launch_template,
instances_distribution=autoscaling.InstancesDistribution(
on_demand_base_capacity=1, # At least one on-demand instance
on_demand_percentage_above_base_capacity=25, # 25% on-demand, 75% Spot
spot_allocation_strategy=autoscaling.SpotAllocationStrategy.DIVERSIFIED
),
launch_template_overrides=[
# Provide multiple instance type options
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("t3.medium")),
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("t3a.medium")),
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("t2.medium")),
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("m5.large")),
]
),
# Health check
health_check=autoscaling.HealthCheck.elb(grace_period=cdk.Duration.minutes(5)),
# Update policy
update_policy=autoscaling.UpdatePolicy.rolling_update(
min_instances_in_service=1,
max_batch_size=2,
pause_time=cdk.Duration.minutes(5)
),
# Termination policy - terminate the oldest instance first
termination_policies=[autoscaling.TerminationPolicy.OLDEST_INSTANCE]
)
        # CPU-based target tracking policy
        asg.scale_on_cpu_utilization(
            "CPUScaling",
            target_utilization_percent=target_cpu,
            # ASG target tracking takes a single cooldown value
            cooldown=cdk.Duration.minutes(3)
        )
        # Scaling based on memory utilization (requires the CloudWatch agent to publish mem_used_percent)
memory_metric = cloudwatch.Metric(
namespace="CWAgent",
metric_name="mem_used_percent",
dimensions_map={"AutoScalingGroupName": asg.auto_scaling_group_name}
)
asg.scale_on_metric(
"MemoryScaling",
metric=memory_metric,
scaling_steps=[
{"lower": 0, "upper": 60, "change": 0},
{"lower": 60, "upper": 80, "change": +1},
{"lower": 80, "upper": 90, "change": +2},
{"lower": 90, "change": +3}
],
adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY,
cooldown=cdk.Duration.minutes(3)
)
# Predictive scaling
if self.node.try_get_context("enable_predictive_scaling"):
self._setup_predictive_scaling(asg)
return asg
def _create_optimized_security_group(self) -> ec2.SecurityGroup:
"""Create an optimized security group"""
sg = ec2.SecurityGroup(
self,
"OptimizedSecurityGroup",
vpc=self.vpc,
description="Optimized security group with minimal required access",
allow_all_outbound=False
)
# Only allow necessary outbound traffic
sg.add_egress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(80),
description="HTTP outbound"
)
sg.add_egress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(443),
description="HTTPS outbound"
)
return sg
def _create_predictive_scaling_lambda(self) -> lambda_.Function:
"""Create a predictive scaling Lambda"""
function = lambda_.Function(
self,
"PredictiveScalingFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="predictive_scaling.handler",
code=lambda_.Code.from_inline("""import boto3
import json
import logging
from datetime import datetime, timedelta
import math
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
cloudwatch = boto3.client('cloudwatch')
autoscaling = boto3.client('autoscaling')
try:
# Get historical data
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7) # Analyze the last 7 days of data
# Get CPU utilization data
cpu_data = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'AutoScalingGroupName', 'Value': event['asg_name']}
],
StartTime=start_time,
EndTime=end_time,
Period=3600, # 1 hour periods
Statistics=['Average']
)
# Simple prediction algorithm: based on historical average and trend
if len(cpu_data['Datapoints']) >= 24: # At least 24 hours of data is needed
sorted_data = sorted(cpu_data['Datapoints'], key=lambda x: x['Timestamp'])
recent_values = [dp['Average'] for dp in sorted_data[-24:]] # Last 24 hours
# Calculate the trend
avg_cpu = sum(recent_values) / len(recent_values)
trend = calculate_trend(recent_values)
# Predict CPU utilization for the next hour
predicted_cpu = avg_cpu + trend
# Calculate the recommended number of instances based on the prediction
current_capacity = get_current_capacity(autoscaling, event['asg_name'])
recommended_capacity = calculate_recommended_capacity(predicted_cpu, current_capacity)
# If capacity needs to be adjusted
if recommended_capacity != current_capacity:
logger.info(f'Recommending capacity change: {current_capacity} -> {recommended_capacity}')
# Can implement auto-adjustment logic here, or just send a notification
if event.get('auto_adjust', False):
adjust_capacity(autoscaling, event['asg_name'], recommended_capacity)
return {
'statusCode': 200,
'body': json.dumps('Predictive scaling analysis completed')
}
except Exception as e:
logger.error(f'Predictive scaling error: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def calculate_trend(values):
n = len(values)
if n < 2:
return 0
# Simple linear trend calculation
x_sum = sum(range(n))
y_sum = sum(values)
xy_sum = sum(i * values[i] for i in range(n))
x_sq_sum = sum(i * i for i in range(n))
slope = (n * xy_sum - x_sum * y_sum) / (n * x_sq_sum - x_sum * x_sum)
return slope
def get_current_capacity(autoscaling, asg_name):
response = autoscaling.describe_auto_scaling_groups(
AutoScalingGroupNames=[asg_name]
)
return response['AutoScalingGroups'][0]['DesiredCapacity']
def calculate_recommended_capacity(predicted_cpu, current_capacity):
# Simple capacity calculation logic
if predicted_cpu > 80:
return min(current_capacity + 2, 10) # Max 10 instances
elif predicted_cpu > 60:
return min(current_capacity + 1, 10)
elif predicted_cpu < 30:
return max(current_capacity - 1, 1) # Min 1 instance
else:
return current_capacity
def adjust_capacity(autoscaling, asg_name, new_capacity):
autoscaling.set_desired_capacity(
AutoScalingGroupName=asg_name,
DesiredCapacity=new_capacity,
HonorCooldown=True
)
"""),
timeout=cdk.Duration.minutes(5)
)
function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"cloudwatch:GetMetricStatistics",
"autoscaling:DescribeAutoScalingGroups",
"autoscaling:SetDesiredCapacity"
],
resources=["*"]
)
)
return function
def _create_cost_aware_scaling_policies(self):
"""Create cost-aware scaling policies"""
# Cost-aware scaling Lambda
cost_aware_function = lambda_.Function(
self,
"CostAwareScalingFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="cost_aware_scaling.handler",
code=lambda_.Code.from_inline("""import boto3
import json
import logging
from datetime import datetime, timedelta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
# Get current Spot instance prices
ec2 = boto3.client('ec2')
autoscaling = boto3.client('autoscaling')
try:
# Get Spot price history
spot_prices = ec2.describe_spot_price_history(
InstanceTypes=['t3.medium', 't3a.medium', 'm5.large'],
ProductDescriptions=['Linux/UNIX'],
MaxResults=10,
StartTime=datetime.utcnow() - timedelta(hours=1)
)
# Select the cheapest instance type
cheapest_instance = min(spot_prices['SpotPriceHistory'],
key=lambda x: float(x['SpotPrice']))
        logger.info(f'Cheapest Spot instance: {cheapest_instance["InstanceType"]} at ${cheapest_instance["SpotPrice"]}')
# Adjust scaling policy based on cost
# If Spot prices are low, scale more aggressively
spot_price = float(cheapest_instance['SpotPrice'])
# Dynamically adjust scaling threshold
if spot_price < 0.02: # Very cheap
cpu_threshold = 50 # Lower CPU threshold, scale out sooner
elif spot_price < 0.05: # Medium price
cpu_threshold = 70
else: # More expensive
cpu_threshold = 85 # Higher threshold, reduce scaling
# Can dynamically update the Auto Scaling policy here
# Actual implementation requires more complex logic
return {
'statusCode': 200,
'body': json.dumps({
'cheapest_instance': cheapest_instance['InstanceType'],
'spot_price': spot_price,
'recommended_cpu_threshold': cpu_threshold
})
}
except Exception as e:
logger.error(f'Cost-aware scaling error: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
"""),
timeout=cdk.Duration.minutes(3)
)
cost_aware_function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ec2:DescribeSpotPriceHistory",
"autoscaling:PutScalingPolicy"
],
resources=["*"]
)
)
def _create_business_metric_scaling(self):
"""Scaling based on business metrics"""
# Custom business metric
business_metric = cloudwatch.Metric(
namespace="CustomApp/Business",
metric_name="ActiveUsers",
statistic="Average"
)
# This can be used by an external ASG
self.business_scaling_metric = business_metric
def _setup_predictive_scaling(self, asg: autoscaling.AutoScalingGroup):
"""Set up predictive scaling"""
# Create a scheduled rule to invoke predictive scaling
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
events.Rule(
self,
"PredictiveScalingSchedule",
schedule=events.Schedule.rate(cdk.Duration.hours(1)),
targets=[
targets.LambdaFunction(
self.predictive_scaling_function,
event=events.RuleTargetInput.from_object({
"asg_name": asg.auto_scaling_group_name,
"auto_adjust": True
})
)
]
)
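To tie the pieces together, a compute stack might instantiate the scaling construct and request an optimized Auto Scaling Group as sketched below; the capacities and import path are placeholders.
# stacks/compute_stack.py (usage sketch; capacities and import path are placeholders)
import aws_cdk as cdk
from aws_cdk import aws_ec2 as ec2
from constructs import Construct

from app_constructs.intelligent_scaling_construct import IntelligentScalingConstruct  # assumed package name


class ComputeStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, vpc: ec2.Vpc, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        scaling = IntelligentScalingConstruct(self, "Scaling", vpc=vpc)

        # Mixed on-demand/Spot ASG with CPU- and memory-based scaling policies
        asg = scaling.create_optimized_auto_scaling_group(
            instance_type="t3.medium",
            min_capacity=2,
            max_capacity=8,
            target_cpu=60,
        )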
Summary of Performance and Cost Optimization Best Practices
- Continuous Monitoring: Establish a comprehensive performance and cost monitoring system
- Right-size Resources: Select appropriate instance types and sizes based on actual needs
- Elastic Scaling: Use auto-scaling to reduce resource waste
- Spot Instances: Judiciously use Spot Instances to reduce compute costs
- Reserved Instances: Use Reserved Instances for stable workloads
- Data Lifecycle: Properly configure lifecycle policies for data storage (see the S3 example after this list)
- Caching Strategy: Use CDNs and caching to reduce redundant computations
- Budget Control: Set budgets and alarms to prevent unexpected expenses
- Resource Tagging: Use tags for cost allocation and tracking
- Regular Optimization: Periodically review and optimize resource configurations
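As a concrete example of the data-lifecycle practice above, an S3 bucket can transition objects to cheaper storage classes and expire them over time; the bucket name and time windows below are illustrative.
# stacks/data_lifecycle_stack.py (sketch; bucket name and windows are illustrative)
import aws_cdk as cdk
from aws_cdk import aws_s3 as s3
from constructs import Construct


class DataLifecycleStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        s3.Bucket(
            self,
            "AnalyticsBucket",
            lifecycle_rules=[
                s3.LifecycleRule(
                    transitions=[
                        # Move cooling data to cheaper tiers as it ages
                        s3.Transition(
                            storage_class=s3.StorageClass.INFREQUENT_ACCESS,
                            transition_after=cdk.Duration.days(30),
                        ),
                        s3.Transition(
                            storage_class=s3.StorageClass.GLACIER,
                            transition_after=cdk.Duration.days(90),
                        ),
                    ],
                    expiration=cdk.Duration.days(365),  # delete after one year
                )
            ],
        )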
By completing this chapter, you should be able to design and implement comprehensive performance optimization and cost control strategies to achieve efficient and economical cloud infrastructure management.