Chapter 10: Performance Optimization and Cost Control
Learning Objectives
- Master CDK application performance optimization strategies
- Understand AWS service cost structure and optimization methods
- Learn to use AWS cost management tools
- Implement automated cost monitoring and budget control
- Master resource right-sizing and elastic scaling strategies
- Understand multi-environment cost management best practices
Performance Optimization Overview
Performance optimization is a continuous process that spans architecture design, resource configuration, code-level tuning, and ongoing monitoring, and it goes hand in hand with cost control.
Lambda Performance Optimization
Optimized Lambda Construct
# constructs/optimized_lambda_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_lambda as lambda_,
aws_logs as logs,
aws_iam as iam,
aws_ec2 as ec2
)
from constructs import Construct
from typing import Optional, Dict, List
class OptimizedLambdaConstruct(Construct):
"""Optimized Lambda function construct"""
def __init__(self, scope: Construct, construct_id: str,
function_name: str,
handler: str,
code_asset_path: str,
runtime: lambda_.Runtime = lambda_.Runtime.PYTHON_3_9,
memory_size: int = 256,
timeout_seconds: int = 30,
environment_variables: Optional[Dict[str, str]] = None,
layers: Optional[List[lambda_.LayerVersion]] = None,
vpc: Optional[ec2.Vpc] = None,
enable_tracing: bool = True,
enable_provisioned_concurrency: bool = False,
provisioned_concurrency_count: int = 1,
reserved_concurrent_executions: Optional[int] = None,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
        # Optimized environment variables
        optimized_env = {
            # Connection reuse: the AWS SDK for Node.js needs this flag,
            # while boto3 already reuses HTTP connections by default
            "AWS_NODEJS_CONNECTION_REUSE_ENABLED": "1" if runtime.name.startswith("nodejs") else None,
            # Optional exec wrapper for startup tooling; only set it when a layer
            # actually provides /opt/bootstrap, otherwise the function fails to start
            "AWS_LAMBDA_EXEC_WRAPPER": "/opt/bootstrap" if layers else None,
        }
if environment_variables:
optimized_env.update(environment_variables)
# Filter None values
optimized_env = {k: v for k, v in optimized_env.items() if v is not None}
# Lambda function
self.function = lambda_.Function(
self,
"Function",
function_name=function_name,
runtime=runtime,
handler=handler,
code=lambda_.Code.from_asset(code_asset_path),
memory_size=memory_size,
timeout=cdk.Duration.seconds(timeout_seconds),
environment=optimized_env,
layers=layers or [],
vpc=vpc,
# Performance optimization configuration
reserved_concurrent_executions=reserved_concurrent_executions,
tracing=lambda_.Tracing.ACTIVE if enable_tracing else lambda_.Tracing.DISABLED,
# Log configuration
log_retention=logs.RetentionDays.ONE_MONTH,
# Architecture optimization (ARM64 is usually cheaper)
architecture=lambda_.Architecture.ARM_64,
# Dead letter queue
dead_letter_queue_enabled=True,
)
# Provisioned Concurrency
if enable_provisioned_concurrency:
version = self.function.current_version
            alias = lambda_.Alias(
                self,
                "ProdAlias",
                alias_name="prod",
                version=version,
                # Provisioned concurrency is configured directly on the alias
                provisioned_concurrent_executions=provisioned_concurrency_count
            )
            self.alias = alias
else:
self.alias = None
# Performance monitoring alarms
self._create_performance_alarms()
# Lambda Insights (optional)
if self.node.try_get_context("enable_lambda_insights"):
self.function.add_layers(
lambda_.LayerVersion.from_layer_version_arn(
self,
"LambdaInsightsLayer",
layer_version_arn=f"arn:aws:lambda:{cdk.Aws.REGION}:580247275435:layer:LambdaInsightsExtension:14"
)
)
def _create_performance_alarms(self):
"""Create performance monitoring alarms"""
from aws_cdk import aws_cloudwatch as cloudwatch
from aws_cdk import aws_sns as sns
# Error rate alarm
error_alarm = cloudwatch.Alarm(
self,
"ErrorAlarm",
alarm_name=f"{self.function.function_name}-errors",
metric=self.function.metric_errors(),
threshold=5,
evaluation_periods=2,
datapoints_to_alarm=2
)
# Duration alarm
duration_alarm = cloudwatch.Alarm(
self,
"DurationAlarm",
alarm_name=f"{self.function.function_name}-duration",
metric=self.function.metric_duration(),
threshold=10000, # 10 seconds
evaluation_periods=3,
datapoints_to_alarm=2
)
        # Cold-start proxy: Lambda has no dedicated cold-start metric, so alarm on spikes in maximum duration
cold_start_metric = cloudwatch.Metric(
namespace="AWS/Lambda",
metric_name="Duration",
dimensions_map={
"FunctionName": self.function.function_name
},
statistic="Maximum"
)
cold_start_alarm = cloudwatch.Alarm(
self,
"ColdStartAlarm",
alarm_name=f"{self.function.function_name}-cold-start",
metric=cold_start_metric,
threshold=5000, # 5 seconds
evaluation_periods=2
)
        # Concurrent executions alarm (uses the per-function ConcurrentExecutions metric, not invocations)
        concurrent_executions_metric = cloudwatch.Metric(
            namespace="AWS/Lambda",
            metric_name="ConcurrentExecutions",
            dimensions_map={
                "FunctionName": self.function.function_name
            },
            statistic="Maximum"
        )
        concurrent_executions_alarm = cloudwatch.Alarm(
            self,
            "ConcurrentExecutionsAlarm",
            alarm_name=f"{self.function.function_name}-concurrent-executions",
            metric=concurrent_executions_metric,
            threshold=100,  # Adjust based on actual needs
            evaluation_periods=2
        )
def add_performance_dashboard_widgets(self, dashboard):
"""Add performance monitoring widgets to dashboard"""
from aws_cdk import aws_cloudwatch as cloudwatch
dashboard.add_widgets(
cloudwatch.GraphWidget(
title=f"{self.function.function_name} - Invocations & Errors",
left=[self.function.metric_invocations()],
right=[self.function.metric_errors()],
width=12,
height=6
),
cloudwatch.GraphWidget(
title=f"{self.function.function_name} - Duration & Throttles",
left=[self.function.metric_duration()],
right=[self.function.metric_throttles()],
width=12,
height=6
)
)
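The construct above can be dropped into any stack. A minimal usage sketch follows; the stack name, module path, and the asset path lambda/orders are illustrative placeholders, not part of the original project.
# stacks/api_stack.py (usage sketch; names and paths are illustrative)
import aws_cdk as cdk
from constructs import Construct
from optimized_lambda_construct import OptimizedLambdaConstruct  # adjust to your project layout

class ApiStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # A latency-sensitive handler gets more memory plus provisioned concurrency
        orders_function = OptimizedLambdaConstruct(
            self,
            "OrdersFunction",
            function_name="orders-api",
            handler="orders.handler",
            code_asset_path="lambda/orders",
            memory_size=512,
            timeout_seconds=15,
            enable_provisioned_concurrency=True,
            provisioned_concurrency_count=2
        )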
Lambda Layer Optimization
# stacks/lambda_layers_stack.py
import aws_cdk as cdk
from aws_cdk import (
aws_lambda as lambda_,
aws_s3 as s3,
aws_s3_deployment as s3_deployment
)
from constructs import Construct
class LambdaLayersStack(cdk.Stack):
"""Lambda Layers Optimization Stack"""
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Python dependencies layer
self.python_dependencies_layer = lambda_.LayerVersion(
self,
"PythonDependenciesLayer",
code=lambda_.Code.from_asset("layers/python-dependencies"),
compatible_runtimes=[
lambda_.Runtime.PYTHON_3_9,
lambda_.Runtime.PYTHON_3_10,
lambda_.Runtime.PYTHON_3_11
],
compatible_architectures=[
lambda_.Architecture.X86_64,
lambda_.Architecture.ARM_64
],
description="Common Python dependencies (boto3, requests, etc.)",
layer_version_name="python-dependencies"
)
# Database connection layer
self.database_layer = lambda_.LayerVersion(
self,
"DatabaseLayer",
code=lambda_.Code.from_asset("layers/database"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Database connection utilities and drivers",
layer_version_name="database-utilities"
)
# Monitoring and logging layer
self.monitoring_layer = lambda_.LayerVersion(
self,
"MonitoringLayer",
code=lambda_.Code.from_asset("layers/monitoring"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Monitoring, logging, and tracing utilities",
layer_version_name="monitoring-utilities"
)
# Performance optimization layer
self.performance_layer = lambda_.LayerVersion(
self,
"PerformanceLayer",
code=lambda_.Code.from_asset("layers/performance"),
compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
description="Performance optimization utilities",
layer_version_name="performance-utilities"
)
        # Lambda runtime cache layer (experimental)
        # Note: layer code cannot be inlined, so the helper module below is shipped
        # as an asset, e.g. from layers/runtime-cache/python/runtime_cache.py:
        #
        #     import os
        #     from functools import lru_cache, wraps
        #
        #     # Connection cache shared across warm invocations
        #     _connection_cache = {}
        #
        #     def cached_connection(connection_func):
        #         @wraps(connection_func)
        #         def wrapper(*args, **kwargs):
        #             cache_key = f"{connection_func.__name__}:{hash(str(args) + str(kwargs))}"
        #             if cache_key not in _connection_cache:
        #                 _connection_cache[cache_key] = connection_func(*args, **kwargs)
        #             return _connection_cache[cache_key]
        #         return wrapper
        #
        #     # Configuration cache
        #     @lru_cache(maxsize=128)
        #     def get_cached_config(key):
        #         return os.environ.get(key)
        #
        #     # Warm-up helper: return early on scheduled warm-up events
        #     def handle_warmup(event):
        #         if event.get('source') == 'aws.events':
        #             return {'statusCode': 200, 'body': 'warmed'}
        #         return None
        if self.node.try_get_context("enable_runtime_cache"):
            self.runtime_cache_layer = lambda_.LayerVersion(
                self,
                "RuntimeCacheLayer",
                code=lambda_.Code.from_asset("layers/runtime-cache"),
                compatible_runtimes=[lambda_.Runtime.PYTHON_3_9],
                description="Runtime caching and optimization utilities"
            )
# Outputs
cdk.CfnOutput(self, "PythonDependenciesLayerArn", value=self.python_dependencies_layer.layer_version_arn)
cdk.CfnOutput(self, "DatabaseLayerArn", value=self.database_layer.layer_version_arn)
cdk.CfnOutput(self, "MonitoringLayerArn", value=self.monitoring_layer.layer_version_arn)
cdk.CfnOutput(self, "PerformanceLayerArn", value=self.performance_layer.layer_version_arn)
Database Performance Optimization
Optimized RDS Construct
# constructs/optimized_rds_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_rds as rds,
aws_ec2 as ec2,
aws_cloudwatch as cloudwatch,
aws_sns as sns,
aws_secretsmanager as secrets
)
from constructs import Construct
from typing import Optional
class OptimizedRDSConstruct(Construct):
"""Optimized RDS database construct"""
def __init__(self, scope: Construct, construct_id: str,
vpc: ec2.Vpc,
engine_type: str = "postgres",
instance_class: str = "db.t3.micro",
multi_az: bool = False,
enable_performance_insights: bool = True,
backup_retention_days: int = 7,
enable_monitoring: bool = True,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Database credentials
self.credentials = rds.DatabaseSecret(
self,
"DatabaseCredentials",
username="dbadmin",
secret_name=f"{construct_id}-db-credentials"
)
# Optimized parameter group
parameter_group = self._create_optimized_parameter_group(engine_type)
# Subnet group
subnet_group = rds.SubnetGroup(
self,
"DatabaseSubnetGroup",
description=f"Subnet group for {construct_id}",
vpc=vpc,
vpc_subnets=ec2.SubnetSelection(
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
)
)
# Security group
security_group = ec2.SecurityGroup(
self,
"DatabaseSecurityGroup",
vpc=vpc,
description=f"Security group for {construct_id} database",
allow_all_outbound=False
)
# Database instance
self.database = rds.DatabaseInstance(
self,
"DatabaseInstance",
engine=self._get_database_engine(engine_type),
instance_type=ec2.InstanceType(instance_class),
vpc=vpc,
subnet_group=subnet_group,
security_groups=[security_group],
credentials=rds.Credentials.from_secret(self.credentials),
parameter_group=parameter_group,
            # Performance optimization
            multi_az=multi_az,
            enable_performance_insights=enable_performance_insights,
            performance_insight_retention=rds.PerformanceInsightRetention.DEFAULT,
            monitoring_interval=cdk.Duration.seconds(60) if enable_monitoring else None,
            # Storage optimization (GP3 offers better price/performance than GP2)
            storage_type=rds.StorageType.GP3,
allocated_storage=20,
max_allocated_storage=1000, # Auto-scaling
storage_encrypted=True,
# Backup and maintenance
backup_retention=cdk.Duration.days(backup_retention_days),
preferred_backup_window="03:00-04:00",
preferred_maintenance_window="Sun:04:00-Sun:05:00",
delete_automated_backups=True,
deletion_protection=False, # Development environment
# Logs
cloudwatch_logs_exports=self._get_log_exports(engine_type),
# Auto upgrade
auto_minor_version_upgrade=True
)
# Read replica (optional)
if self.node.try_get_context("create_read_replica"):
self.read_replica = rds.DatabaseInstanceReadReplica(
self,
"ReadReplica",
source_database_instance=self.database,
instance_type=ec2.InstanceType(instance_class),
vpc=vpc,
subnet_group=subnet_group,
security_groups=[security_group],
                enable_performance_insights=enable_performance_insights,
monitoring_interval=cdk.Duration.seconds(60) if enable_monitoring else None
)
# Performance monitoring
if enable_monitoring:
self._create_performance_monitoring()
# Connection pooling configuration (RDS Proxy)
if self.node.try_get_context("enable_rds_proxy"):
self._create_rds_proxy(vpc, security_group)
def _get_database_engine(self, engine_type: str):
"""Get database engine configuration"""
engines = {
"postgres": rds.DatabaseInstanceEngine.postgres(
version=rds.PostgresEngineVersion.VER_14_9
),
"mysql": rds.DatabaseInstanceEngine.mysql(
version=rds.MysqlEngineVersion.VER_8_0_35
),
"mariadb": rds.DatabaseInstanceEngine.mariadb(
version=rds.MariaDbEngineVersion.VER_10_6_14
)
}
return engines.get(engine_type, engines["postgres"])
def _create_optimized_parameter_group(self, engine_type: str):
"""Create optimized parameter group"""
if engine_type == "postgres":
return rds.ParameterGroup(
self,
"PostgreSQLParameterGroup",
engine=self._get_database_engine(engine_type),
                parameters={
                    # Connection and memory. RDS PostgreSQL expects integer values in each
                    # parameter's native unit (no "MB" suffixes): shared_buffers and
                    # wal_buffers are in 8 kB pages, work_mem in kB.
                    "max_connections": "200",
                    "shared_buffers": "{DBInstanceClassMemory/32768}",
                    "effective_cache_size": "{DBInstanceClassMemory/16384}",
                    "work_mem": "4096",               # 4 MB
                    "maintenance_work_mem": "65536",  # 64 MB
                    # Logging and monitoring (log_statement=all is too expensive for production)
                    "log_statement": "ddl",
                    "log_min_duration_statement": "1000",  # Log queries slower than 1 s
                    "shared_preload_libraries": "pg_stat_statements",
                    # Checkpoint and WAL
                    "checkpoint_completion_target": "0.9",
                    "wal_buffers": "2048",   # 16 MB
                    "max_wal_size": "1024",  # MB
                    "min_wal_size": "80",    # MB
                    # Query planner costs tuned for SSD storage
                    "random_page_cost": "1.1",
                    "seq_page_cost": "1.0",
                    "cpu_tuple_cost": "0.01",
                    "cpu_index_tuple_cost": "0.005"
                }
)
elif engine_type == "mysql":
return rds.ParameterGroup(
self,
"MySQLParameterGroup",
engine=self._get_database_engine(engine_type),
                parameters={
                    # RDS MySQL expects numeric values (bytes) rather than "128M"-style suffixes
                    "innodb_buffer_pool_size": "{DBInstanceClassMemory*3/4}",
                    "max_connections": "200",
                    # The query cache was removed in MySQL 8.0, so it is not configured here
                    "slow_query_log": "1",
                    "long_query_time": "1",
                    "innodb_log_file_size": "134217728"  # 128 MB
                }
)
else:
return None
def _get_log_exports(self, engine_type: str):
"""Get log export configuration"""
log_configs = {
"postgres": ["postgresql"],
"mysql": ["error", "general", "slowquery"],
"mariadb": ["error", "general", "slowquery"]
}
return log_configs.get(engine_type, [])
def _create_performance_monitoring(self):
"""Create performance monitoring"""
# CPU utilization alarm
cpu_alarm = cloudwatch.Alarm(
self,
"DatabaseCPUAlarm",
alarm_name=f"{self.database.instance_identifier}-cpu-high",
metric=self.database.metric_cpu_utilization(),
threshold=80,
evaluation_periods=3,
datapoints_to_alarm=2
)
# Database connections alarm
connections_alarm = cloudwatch.Alarm(
self,
"DatabaseConnectionsAlarm",
alarm_name=f"{self.database.instance_identifier}-connections-high",
metric=self.database.metric_database_connections(),
            threshold=160,  # 80% of max_connections (200, set in the parameter group)
evaluation_periods=2
)
# Storage space alarm
free_space_alarm = cloudwatch.Alarm(
self,
"DatabaseFreeSpaceAlarm",
alarm_name=f"{self.database.instance_identifier}-free-space-low",
metric=self.database.metric_free_storage_space(),
threshold=1000000000, # 1GB
comparison_operator=cloudwatch.ComparisonOperator.LESS_THAN_THRESHOLD,
evaluation_periods=2
)
# Read latency alarm
read_latency_alarm = cloudwatch.Alarm(
self,
"DatabaseReadLatencyAlarm",
alarm_name=f"{self.database.instance_identifier}-read-latency-high",
metric=self.database.metric_read_latency(),
threshold=0.2, # 200ms
evaluation_periods=3
)
# Write latency alarm
write_latency_alarm = cloudwatch.Alarm(
self,
"DatabaseWriteLatencyAlarm",
alarm_name=f"{self.database.instance_identifier}-write-latency-high",
metric=self.database.metric_write_latency(),
threshold=0.2, # 200ms
evaluation_periods=3
)
def _create_rds_proxy(self, vpc: ec2.Vpc, security_group: ec2.SecurityGroup):
"""Create RDS Proxy connection pool"""
proxy_security_group = ec2.SecurityGroup(
self,
"ProxySecurityGroup",
vpc=vpc,
description="Security group for RDS Proxy"
)
        # Let the proxy reach the database; application security groups should be
        # granted ingress on the proxy security group separately
        security_group.add_ingress_rule(
            peer=proxy_security_group,
            connection=ec2.Port.tcp(5432)  # PostgreSQL port
        )
        self.proxy = rds.DatabaseProxy(
            self,
            "DatabaseProxy",
            proxy_target=rds.ProxyTarget.from_instance(self.database),
            secrets=[self.credentials],
            vpc=vpc,
            security_groups=[proxy_security_group],
            # Connection pool configuration
            max_connections_percent=100,
            max_idle_connections_percent=50,
            require_tls=True,
            # Session pinning filters
            session_pinning_filters=[
                rds.SessionPinningFilter.EXCLUDE_VARIABLE_SETS
            ]
        )
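A usage sketch for the RDS construct follows; the stack and module names are illustrative, and the VPC is given an isolated subnet group because the construct places the database in PRIVATE_ISOLATED subnets.
# stacks/database_stack.py (usage sketch; names are illustrative)
import aws_cdk as cdk
from aws_cdk import aws_ec2 as ec2
from constructs import Construct
from optimized_rds_construct import OptimizedRDSConstruct  # adjust to your layout

class DatabaseStack(cdk.Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # VPC with an isolated subnet group for the database
        vpc = ec2.Vpc(
            self,
            "AppVpc",
            max_azs=2,
            subnet_configuration=[
                ec2.SubnetConfiguration(name="public", subnet_type=ec2.SubnetType.PUBLIC, cidr_mask=24),
                ec2.SubnetConfiguration(name="db", subnet_type=ec2.SubnetType.PRIVATE_ISOLATED, cidr_mask=24)
            ]
        )
        database = OptimizedRDSConstruct(
            self,
            "AppDatabase",
            vpc=vpc,
            engine_type="postgres",
            instance_class="t3.medium",   # rendered as db.t3.medium by RDS
            multi_az=True,
            enable_performance_insights=True,
            backup_retention_days=14
        )
The optional read replica and RDS Proxy are toggled through CDK context, for example cdk deploy -c create_read_replica=true -c enable_rds_proxy=true.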
Cost Optimization Strategies
Cost Monitoring Construct
# constructs/cost_optimization_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_budgets as budgets,
aws_sns as sns,
aws_sns_subscriptions as subscriptions,
aws_lambda as lambda_,
aws_events as events,
aws_events_targets as targets,
aws_iam as iam
)
from constructs import Construct
from typing import List, Dict, Optional
import json  # used at synth time to embed cost allocation tags in the inline tagging Lambda
class CostOptimizationConstruct(Construct):
"""Cost optimization construct"""
def __init__(self, scope: Construct, construct_id: str,
budget_limit: float,
alert_emails: List[str],
                 cost_allocation_tags: Optional[Dict[str, str]] = None,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# SNS topic for cost alerts
self.cost_alert_topic = sns.Topic(
self,
"CostAlertTopic",
topic_name="cost-optimization-alerts"
)
# Add email subscriptions
for email in alert_emails:
self.cost_alert_topic.add_subscription(
subscriptions.EmailSubscription(email)
)
# Budget configuration
self._create_budgets(budget_limit)
# Cost anomaly detection
self._create_cost_anomaly_detection()
# Automated cost optimization
self._create_cost_optimization_lambda()
# Resource tagging enforcement
if cost_allocation_tags:
self._create_resource_tagging_lambda(cost_allocation_tags)
def _create_budgets(self, budget_limit: float):
"""Create budgets and alerts"""
# Total cost budget
total_budget = budgets.CfnBudget(
self,
"TotalCostBudget",
budget=budgets.CfnBudget.BudgetDataProperty(
budget_name="total-monthly-budget",
budget_limit=budgets.CfnBudget.SpendProperty(
amount=budget_limit,
unit="USD"
),
time_unit="MONTHLY",
budget_type="COST",
cost_filters=budgets.CfnBudget.CostFiltersProperty(
# Can add specific filters
)
),
notifications_with_subscribers=[
budgets.CfnBudget.NotificationWithSubscribersProperty(
notification=budgets.CfnBudget.NotificationProperty(
notification_type="ACTUAL",
comparison_operator="GREATER_THAN",
threshold=80, # 80% threshold
threshold_type="PERCENTAGE"
),
subscribers=[
budgets.CfnBudget.SubscriberProperty(
subscription_type="EMAIL",
address=email
) for email in ["admin@example.com"] # Replace with actual email
]
),
budgets.CfnBudget.NotificationWithSubscribersProperty(
notification=budgets.CfnBudget.NotificationProperty(
notification_type="FORECASTED",
comparison_operator="GREATER_THAN",
threshold=100, # 100% forecast threshold
threshold_type="PERCENTAGE"
),
subscribers=[
budgets.CfnBudget.SubscriberProperty(
subscription_type="EMAIL",
address=email
) for email in ["admin@example.com"]
]
)
]
)
        # Service-level budgets. The dict maps a short key used for naming to the
        # service name expected by Cost Explorer / Budgets cost filters.
        services = {
            "ec2": "Amazon Elastic Compute Cloud - Compute",
            "rds": "Amazon Relational Database Service",
            "lambda": "AWS Lambda",
            "s3": "Amazon Simple Storage Service"
        }
        for service_key, service_name in services.items():
            service_budget = budgets.CfnBudget(
                self,
                f"{service_key.capitalize()}ServiceBudget",
                budget=budgets.CfnBudget.BudgetDataProperty(
                    budget_name=f"{service_key}-monthly-budget",
                    budget_limit=budgets.CfnBudget.SpendProperty(
                        amount=budget_limit * 0.3,  # 30% of total budget per service
                        unit="USD"
                    ),
                    time_unit="MONTHLY",
                    budget_type="COST",
                    cost_filters={"Service": [service_name]}
),
notifications_with_subscribers=[
budgets.CfnBudget.NotificationWithSubscribersProperty(
notification=budgets.CfnBudget.NotificationProperty(
notification_type="ACTUAL",
comparison_operator="GREATER_THAN",
threshold=90,
threshold_type="PERCENTAGE"
),
subscribers=[
budgets.CfnBudget.SubscriberProperty(
subscription_type="SNS",
address=self.cost_alert_topic.topic_arn
)
]
)
]
)
    def _create_cost_anomaly_detection(self):
        """Create cost anomaly detection"""
        from aws_cdk import aws_ce as ce
        # Cost anomaly monitor (AWS::CE::AnomalyMonitor) watching per-service spend
        anomaly_monitor = ce.CfnAnomalyMonitor(
            self,
            "CostAnomalyMonitor",
            monitor_name="cost-anomaly-monitor",
            monitor_type="DIMENSIONAL",
            monitor_dimension="SERVICE"
        )
        # Anomaly detection subscription
        ce.CfnAnomalySubscription(
            self,
            "CostAnomalySubscription",
            subscription_name="cost-anomaly-alerts",
            frequency="DAILY",
            monitor_arn_list=[anomaly_monitor.attr_monitor_arn],
            subscribers=[
                ce.CfnAnomalySubscription.SubscriberProperty(
                    type="EMAIL",
                    address="admin@example.com"  # Replace with actual email
                ),
                ce.CfnAnomalySubscription.SubscriberProperty(
                    type="SNS",
                    address=self.cost_alert_topic.topic_arn
                )
            ],
            # Only notify when the total anomaly impact is at least $100
            threshold_expression=json.dumps({
                "Dimensions": {
                    "Key": "ANOMALY_TOTAL_IMPACT_ABSOLUTE",
                    "MatchOptions": ["GREATER_THAN_OR_EQUAL"],
                    "Values": ["100"]
                }
            })
        )
def _create_cost_optimization_lambda(self):
"""Create automated cost optimization Lambda"""
self.cost_optimization_function = lambda_.Function(
self,
"CostOptimizationFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="cost_optimizer.handler",
code=lambda_.Code.from_inline("""
import boto3
import json
import logging
import os
from datetime import datetime, timedelta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
ec2 = boto3.client('ec2')
rds = boto3.client('rds')
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')
optimization_actions = []
try:
# Check for unused EBS volumes
unused_volumes = find_unused_ebs_volumes(ec2)
optimization_actions.extend(unused_volumes)
# Check for idle RDS instances
idle_rds_instances = find_idle_rds_instances(rds, cloudwatch)
optimization_actions.extend(idle_rds_instances)
# Check for unused Elastic IPs
unused_eips = find_unused_elastic_ips(ec2)
optimization_actions.extend(unused_eips)
# Generate report
if optimization_actions:
report = generate_optimization_report(optimization_actions)
# Send notification
sns.publish(
TopicArn=os.environ['COST_ALERT_TOPIC_ARN'],
Subject='Cost Optimization Recommendations',
Message=report
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Cost optimization check completed',
'actions_found': len(optimization_actions)
})
}
except Exception as e:
logger.error(f'Cost optimization error: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def find_unused_ebs_volumes(ec2):
volumes = ec2.describe_volumes(
Filters=[
{'Name': 'state', 'Values': ['available']}
]
)
unused_volumes = []
for volume in volumes['Volumes']:
unused_volumes.append({
'type': 'unused_ebs_volume',
'resource_id': volume['VolumeId'],
'size': volume['Size'],
'cost_estimate': volume['Size'] * 0.10 # $0.10 per GB per month
})
return unused_volumes
def find_idle_rds_instances(rds, cloudwatch):
instances = rds.describe_db_instances()
idle_instances = []
for instance in instances['DBInstances']:
if instance['DBInstanceStatus'] == 'available':
# Check CPU usage for the last 7 days
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7)
cpu_metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'DBInstanceIdentifier', 'Value': instance['DBInstanceIdentifier']}
],
StartTime=start_time,
EndTime=end_time,
Period=86400, # 1 day
Statistics=['Average']
)
if cpu_metrics['Datapoints']:
avg_cpu = sum(dp['Average'] for dp in cpu_metrics['Datapoints']) / len(cpu_metrics['Datapoints'])
if avg_cpu < 5: # CPU usage less than 5%
idle_instances.append({
'type': 'idle_rds_instance',
'resource_id': instance['DBInstanceIdentifier'],
'instance_class': instance['DBInstanceClass'],
'avg_cpu': avg_cpu
})
return idle_instances
def find_unused_elastic_ips(ec2):
addresses = ec2.describe_addresses()
unused_eips = []
for address in addresses['Addresses']:
if 'InstanceId' not in address and 'NetworkInterfaceId' not in address:
unused_eips.append({
'type': 'unused_elastic_ip',
'resource_id': address['PublicIp'],
'allocation_id': address['AllocationId'],
                'cost_estimate': 3.65  # ~$0.005 per hour * 730 hours per month
})
return unused_eips
def generate_optimization_report(actions):
total_potential_savings = 0
report_lines = ["Cost Optimization Report", "=" * 30, ""]
for action in actions:
if action['type'] == 'unused_ebs_volume':
report_lines.append(f"Unused EBS Volume: {action['resource_id']}")
report_lines.append(f" - Size: {action['size']} GB")
report_lines.append(f" - Estimated monthly cost: ${action['cost_estimate']:.2f}")
total_potential_savings += action['cost_estimate']
elif action['type'] == 'idle_rds_instance':
report_lines.append(f"Idle RDS Instance: {action['resource_id']}")
report_lines.append(f" - Instance class: {action['instance_class']}")
report_lines.append(f" - Average CPU usage: {action['avg_cpu']:.2f}%")
elif action['type'] == 'unused_elastic_ip':
report_lines.append(f"Unused Elastic IP: {action['resource_id']}")
report_lines.append(f" - Estimated monthly cost: ${action['cost_estimate']:.2f}")
total_potential_savings += action['cost_estimate']
report_lines.append("")
report_lines.append(f"Total potential savings: ${total_potential_savings:.2f}/month")
return "\\n".join(report_lines)
"""),
timeout=cdk.Duration.minutes(5),
environment={
"COST_ALERT_TOPIC_ARN": self.cost_alert_topic.topic_arn
}
)
# Add necessary permissions
self.cost_optimization_function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ec2:DescribeVolumes",
"ec2:DescribeAddresses",
"rds:DescribeDBInstances",
"cloudwatch:GetMetricStatistics",
"sns:Publish"
],
resources=["*"]
)
)
# Schedule cost optimization checks
events.Rule(
self,
"CostOptimizationSchedule",
schedule=events.Schedule.rate(cdk.Duration.days(1)),
targets=[targets.LambdaFunction(self.cost_optimization_function)]
)
def _create_resource_tagging_lambda(self, cost_allocation_tags: Dict[str, str]):
"""Create resource tagging Lambda"""
self.tagging_function = lambda_.Function(
self,
"ResourceTaggingFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="resource_tagger.handler",
code=lambda_.Code.from_inline(f"""
import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
REQUIRED_TAGS = {json.dumps(cost_allocation_tags)}
def handler(event, context):
# Get resources that need tagging
ec2 = boto3.client('ec2')
rds = boto3.client('rds')
lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')
try:
# Tag EC2 instances
tag_ec2_resources(ec2)
# Tag RDS instances
tag_rds_resources(rds)
# Tag Lambda functions
tag_lambda_functions(lambda_client)
# Tag S3 buckets
tag_s3_buckets(s3)
return {{
'statusCode': 200,
'body': json.dumps('Resource tagging completed successfully')
}}
except Exception as e:
logger.error(f'Resource tagging error: {{str(e)}}')
return {{
'statusCode': 500,
'body': json.dumps({{'error': str(e)}})
}}
def tag_ec2_resources(ec2):
instances = ec2.describe_instances()
required_tags = json.loads(REQUIRED_TAGS)
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
existing_tags = {{tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}}
tags_to_add = []
for key, value in required_tags.items():
if key not in existing_tags:
tags_to_add.append({{'Key': key, 'Value': value}})
if tags_to_add:
ec2.create_tags(Resources=[instance_id], Tags=tags_to_add)
logger.info(f'Tagged EC2 instance {{instance_id}} with {{len(tags_to_add)}} tags')
def tag_rds_resources(rds):
instances = rds.describe_db_instances()
required_tags = json.loads(REQUIRED_TAGS)
for instance in instances['DBInstances']:
instance_arn = instance['DBInstanceArn']
try:
existing_tags = rds.list_tags_for_resource(ResourceName=instance_arn)
existing_tag_keys = {{tag['Key'] for tag in existing_tags['TagList']}}
tags_to_add = []
for key, value in required_tags.items():
if key not in existing_tag_keys:
tags_to_add.append({{'Key': key, 'Value': value}})
if tags_to_add:
rds.add_tags_to_resource(ResourceName=instance_arn, Tags=tags_to_add)
logger.info(f'Tagged RDS instance {{instance["DBInstanceIdentifier"]}} with {{len(tags_to_add)}} tags')
except Exception as e:
logger.error(f'Error tagging RDS instance {{instance["DBInstanceIdentifier"]}}: {{e}}')
def tag_lambda_functions(lambda_client):
functions = lambda_client.list_functions()
required_tags = json.loads(REQUIRED_TAGS)
for function in functions['Functions']:
function_arn = function['FunctionArn']
try:
existing_tags = lambda_client.list_tags(Resource=function_arn)
tags_to_add = {{}}
for key, value in required_tags.items():
if key not in existing_tags['Tags']:
tags_to_add[key] = value
if tags_to_add:
lambda_client.tag_resource(Resource=function_arn, Tags=tags_to_add)
logger.info(f'Tagged Lambda function {{function["FunctionName"]}} with {{len(tags_to_add)}} tags')
except Exception as e:
logger.error(f'Error tagging Lambda function {{function["FunctionName"]}}: {{e}}')
def tag_s3_buckets(s3):
buckets = s3.list_buckets()
required_tags = json.loads(REQUIRED_TAGS)
for bucket in buckets['Buckets']:
bucket_name = bucket['Name']
        try:
            try:
                existing_tags = s3.get_bucket_tagging(Bucket=bucket_name)
                existing_tag_list = existing_tags['TagSet']
            except s3.exceptions.ClientError:
                # Bucket has no tags yet
                existing_tag_list = []
            existing_tag_keys = {{tag['Key'] for tag in existing_tag_list}}
            tags_to_add = []
            for key, value in required_tags.items():
                if key not in existing_tag_keys:
                    tags_to_add.append({{'Key': key, 'Value': value}})
            if tags_to_add:
                # put_bucket_tagging replaces the whole tag set, so merge with existing tags
                all_tags = existing_tag_list + tags_to_add
s3.put_bucket_tagging(
Bucket=bucket_name,
Tagging={{'TagSet': all_tags}}
)
logger.info(f'Tagged S3 bucket {{bucket_name}} with {{len(tags_to_add)}} tags')
except Exception as e:
logger.error(f'Error tagging S3 bucket {{bucket_name}}: {{e}}')
"""),
timeout=cdk.Duration.minutes(10)
)
# Add necessary permissions
self.tagging_function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ec2:DescribeInstances",
"ec2:CreateTags",
"rds:DescribeDBInstances",
"rds:ListTagsForResource",
"rds:AddTagsToResource",
"lambda:ListFunctions",
"lambda:ListTags",
"lambda:TagResource",
"s3:ListAllMyBuckets",
"s3:GetBucketTagging",
"s3:PutBucketTagging"
],
resources=["*"]
)
)
# Schedule resource tagging
events.Rule(
self,
"ResourceTaggingSchedule",
schedule=events.Schedule.rate(cdk.Duration.hours(6)),
targets=[targets.LambdaFunction(self.tagging_function)]
)
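A usage sketch for the cost construct follows; the budget amount, e-mail address, and tag values are placeholders, and the addresses hard-coded inside _create_budgets would be replaced the same way.
# Usage sketch inside a stack's __init__ (amounts, addresses, and tags are placeholders)
cost_controls = CostOptimizationConstruct(
    self,
    "CostControls",
    budget_limit=1000.0,                   # USD per month
    alert_emails=["finops@example.com"],
    cost_allocation_tags={
        "Project": "sample-app",
        "Environment": "dev",
        "CostCenter": "engineering"
    }
)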
Auto Scaling and Resource Optimization
Intelligent Scaling Construct
# constructs/intelligent_scaling_construct.py
import aws_cdk as cdk
from aws_cdk import (
aws_autoscaling as autoscaling,
aws_ec2 as ec2,
aws_cloudwatch as cloudwatch,
aws_applicationautoscaling as app_autoscaling,
aws_lambda as lambda_,
aws_iam as iam
)
from constructs import Construct
from typing import List, Dict
class IntelligentScalingConstruct(Construct):
"""Intelligent scaling construct"""
def __init__(self, scope: Construct, construct_id: str,
vpc: ec2.Vpc,
**kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        self.vpc = vpc  # store the VPC for the ASG and security group helpers
# Create predictive scaling Lambda
self.predictive_scaling_function = self._create_predictive_scaling_lambda()
# Create cost-aware scaling policies
self._create_cost_aware_scaling_policies()
# Create business metric-based scaling
self._create_business_metric_scaling()
def create_optimized_auto_scaling_group(self,
instance_type: str = "t3.medium",
min_capacity: int = 1,
max_capacity: int = 10,
target_cpu: int = 60) -> autoscaling.AutoScalingGroup:
"""Create optimized Auto Scaling Group"""
# Launch template
launch_template = ec2.LaunchTemplate(
self,
"OptimizedLaunchTemplate",
instance_type=ec2.InstanceType(instance_type),
machine_image=ec2.AmazonLinuxImage(
generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2
),
user_data=ec2.UserData.for_linux(),
# Performance optimization
nitro_enclave_enabled=False,
hibernation_configured=False,
# Security configuration
security_group=self._create_optimized_security_group(),
# Storage optimization
block_devices=[
ec2.BlockDevice(
device_name="/dev/xvda",
volume=ec2.BlockDeviceVolume.ebs(
volume_size=20,
volume_type=ec2.EbsDeviceVolumeType.GP3, # GP3 is cheaper
encrypted=True,
delete_on_termination=True
)
)
]
)
        # Auto Scaling Group. The launch template is supplied through the mixed
        # instances policy below; CDK treats launch_template and
        # mixed_instances_policy as mutually exclusive.
        asg = autoscaling.AutoScalingGroup(
            self,
            "OptimizedASG",
            vpc=self.vpc,
            min_capacity=min_capacity,
            max_capacity=max_capacity,
            desired_capacity=min_capacity,
# Mixed instances policy (cost optimization)
mixed_instances_policy=autoscaling.MixedInstancesPolicy(
launch_template=launch_template,
instances_distribution=autoscaling.InstancesDistribution(
on_demand_base_capacity=1, # Keep at least 1 on-demand instance
on_demand_percentage_above_base_capacity=25, # 25% on-demand, 75% Spot
                    spot_allocation_strategy=autoscaling.SpotAllocationStrategy.LOWEST_PRICE  # favor the cheapest Spot pools across the overrides below
),
launch_template_overrides=[
# Provide multiple instance type options
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("t3.medium")),
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("t3a.medium")),
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("t2.medium")),
autoscaling.LaunchTemplateOverrides(instance_type=ec2.InstanceType("m5.large")),
]
),
            # Health check
            health_check=autoscaling.HealthCheck.elb(grace=cdk.Duration.minutes(5)),
# Update policy
update_policy=autoscaling.UpdatePolicy.rolling_update(
min_instances_in_service=1,
max_batch_size=2,
pause_time=cdk.Duration.minutes(5)
),
# Termination policy - prioritize terminating oldest instances
termination_policies=[autoscaling.TerminationPolicy.OLDEST_INSTANCE]
)
# Multi-metric scaling policy
asg.scale_on_cpu_utilization(
"CPUScaling",
target_utilization_percent=target_cpu,
scale_in_cooldown=cdk.Duration.minutes(5),
scale_out_cooldown=cdk.Duration.minutes(2)
)
# Memory utilization based scaling
memory_metric = cloudwatch.Metric(
namespace="CWAgent",
metric_name="mem_used_percent",
dimensions_map={"AutoScalingGroupName": asg.auto_scaling_group_name}
)
asg.scale_on_metric(
"MemoryScaling",
metric=memory_metric,
scaling_steps=[
{"lower": 0, "upper": 60, "change": 0},
{"lower": 60, "upper": 80, "change": +1},
{"lower": 80, "upper": 90, "change": +2},
{"lower": 90, "change": +3}
],
adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY,
cooldown=cdk.Duration.minutes(3)
)
# Predictive scaling
if self.node.try_get_context("enable_predictive_scaling"):
self._setup_predictive_scaling(asg)
return asg
def _create_optimized_security_group(self) -> ec2.SecurityGroup:
"""Create optimized security group"""
sg = ec2.SecurityGroup(
self,
"OptimizedSecurityGroup",
vpc=self.vpc,
description="Optimized security group with minimal required access",
allow_all_outbound=False
)
# Only allow necessary outbound traffic
sg.add_egress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(80),
description="HTTP outbound"
)
sg.add_egress_rule(
peer=ec2.Peer.any_ipv4(),
connection=ec2.Port.tcp(443),
description="HTTPS outbound"
)
return sg
def _create_predictive_scaling_lambda(self) -> lambda_.Function:
"""Create predictive scaling Lambda"""
function = lambda_.Function(
self,
"PredictiveScalingFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="predictive_scaling.handler",
code=lambda_.Code.from_inline("""
import boto3
import json
import logging
from datetime import datetime, timedelta
import math
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
cloudwatch = boto3.client('cloudwatch')
autoscaling = boto3.client('autoscaling')
try:
# Get historical data
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7) # Analyze past 7 days
# Get CPU utilization data
cpu_data = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'AutoScalingGroupName', 'Value': event['asg_name']}
],
StartTime=start_time,
EndTime=end_time,
Period=3600, # 1 hour periods
Statistics=['Average']
)
# Simple prediction algorithm: based on historical average and trend
if len(cpu_data['Datapoints']) >= 24: # Need at least 24 hours of data
sorted_data = sorted(cpu_data['Datapoints'], key=lambda x: x['Timestamp'])
recent_values = [dp['Average'] for dp in sorted_data[-24:]] # Last 24 hours
# Calculate trend
avg_cpu = sum(recent_values) / len(recent_values)
trend = calculate_trend(recent_values)
# Predict CPU usage for next hour
predicted_cpu = avg_cpu + trend
# Calculate recommended instance count based on prediction
current_capacity = get_current_capacity(autoscaling, event['asg_name'])
recommended_capacity = calculate_recommended_capacity(predicted_cpu, current_capacity)
# Adjust capacity if needed
if recommended_capacity != current_capacity:
logger.info(f'Recommending capacity change: {current_capacity} -> {recommended_capacity}')
# Can implement auto-adjustment logic here, or just send notification
if event.get('auto_adjust', False):
adjust_capacity(autoscaling, event['asg_name'], recommended_capacity)
return {
'statusCode': 200,
'body': json.dumps('Predictive scaling analysis completed')
}
except Exception as e:
logger.error(f'Predictive scaling error: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def calculate_trend(values):
n = len(values)
if n < 2:
return 0
# Simple linear trend calculation
x_sum = sum(range(n))
y_sum = sum(values)
xy_sum = sum(i * values[i] for i in range(n))
x_sq_sum = sum(i * i for i in range(n))
slope = (n * xy_sum - x_sum * y_sum) / (n * x_sq_sum - x_sum * x_sum)
return slope
def get_current_capacity(autoscaling, asg_name):
response = autoscaling.describe_auto_scaling_groups(
AutoScalingGroupNames=[asg_name]
)
return response['AutoScalingGroups'][0]['DesiredCapacity']
def calculate_recommended_capacity(predicted_cpu, current_capacity):
# Simple capacity calculation logic
if predicted_cpu > 80:
return min(current_capacity + 2, 10) # Scale up to max 10 instances
elif predicted_cpu > 60:
return min(current_capacity + 1, 10)
elif predicted_cpu < 30:
return max(current_capacity - 1, 1) # Keep at least 1 instance
else:
return current_capacity
def adjust_capacity(autoscaling, asg_name, new_capacity):
autoscaling.set_desired_capacity(
AutoScalingGroupName=asg_name,
DesiredCapacity=new_capacity,
HonorCooldown=True
)
"""),
timeout=cdk.Duration.minutes(5)
)
function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"cloudwatch:GetMetricStatistics",
"autoscaling:DescribeAutoScalingGroups",
"autoscaling:SetDesiredCapacity"
],
resources=["*"]
)
)
return function
def _create_cost_aware_scaling_policies(self):
"""Create cost-aware scaling policies"""
# Cost-aware scaling Lambda
cost_aware_function = lambda_.Function(
self,
"CostAwareScalingFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="cost_aware_scaling.handler",
code=lambda_.Code.from_inline("""
import boto3
import json
import logging
from datetime import datetime, timedelta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
# Get current Spot instance prices
ec2 = boto3.client('ec2')
autoscaling = boto3.client('autoscaling')
try:
# Get Spot price history
spot_prices = ec2.describe_spot_price_history(
InstanceTypes=['t3.medium', 't3a.medium', 'm5.large'],
ProductDescriptions=['Linux/UNIX'],
MaxResults=10,
StartTime=datetime.utcnow() - timedelta(hours=1)
)
# Choose the cheapest instance type
cheapest_instance = min(spot_prices['SpotPriceHistory'],
key=lambda x: float(x['SpotPrice']))
logger.info(f'Cheapest Spot instance: {cheapest_instance["InstanceType"]} at ${cheapest_instance["SpotPrice"]}')
# Adjust scaling policy based on cost
# If Spot price is very low, can scale more aggressively
spot_price = float(cheapest_instance['SpotPrice'])
# Dynamically adjust scaling threshold
if spot_price < 0.02: # Very cheap
cpu_threshold = 50 # Lower CPU threshold, scale earlier
elif spot_price < 0.05: # Medium price
cpu_threshold = 70
else: # Expensive
cpu_threshold = 85 # Higher threshold, reduce scaling
# Can dynamically update Auto Scaling policy here
# Actual implementation requires more complex logic
return {
'statusCode': 200,
'body': json.dumps({
'cheapest_instance': cheapest_instance['InstanceType'],
'spot_price': spot_price,
'recommended_cpu_threshold': cpu_threshold
})
}
except Exception as e:
logger.error(f'Cost-aware scaling error: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
"""),
timeout=cdk.Duration.minutes(3)
)
cost_aware_function.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"ec2:DescribeSpotPriceHistory",
"autoscaling:PutScalingPolicy"
],
resources=["*"]
)
)
def _create_business_metric_scaling(self):
"""Business metric-based scaling"""
# Custom business metric
business_metric = cloudwatch.Metric(
namespace="CustomApp/Business",
metric_name="ActiveUsers",
statistic="Average"
)
# This can be used by external ASG
self.business_scaling_metric = business_metric
def _setup_predictive_scaling(self, asg: autoscaling.AutoScalingGroup):
"""Setup predictive scaling"""
# Create scheduled rule to invoke predictive scaling
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
events.Rule(
self,
"PredictiveScalingSchedule",
schedule=events.Schedule.rate(cdk.Duration.hours(1)),
targets=[
targets.LambdaFunction(
self.predictive_scaling_function,
event=events.RuleTargetInput.from_object({
"asg_name": asg.auto_scaling_group_name,
"auto_adjust": True
})
)
]
)
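A usage sketch for the scaling construct follows; the vpc variable and capacity values are illustrative, and predictive scaling is switched on through CDK context.
# Usage sketch inside a stack's __init__ (vpc is an existing ec2.Vpc; values are illustrative)
scaling = IntelligentScalingConstruct(self, "IntelligentScaling", vpc=vpc)
asg = scaling.create_optimized_auto_scaling_group(
    instance_type="t3.medium",
    min_capacity=2,
    max_capacity=8,
    target_cpu=60
)
# Enable the hourly predictive-scaling analysis:
#   cdk deploy -c enable_predictive_scaling=true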
Performance and Cost Optimization Best Practices Summary
- Continuous Monitoring: Establish comprehensive performance and cost monitoring systems
- Resource Right-sizing: Choose appropriate instance types and sizes based on actual needs
- Elastic Scaling: Use auto-scaling to reduce resource waste
- Spot Instances: Reasonably use Spot instances to reduce compute costs
- Reserved Instances: Use Reserved Instances for stable workloads
- Data Lifecycle: Configure appropriate lifecycle policies for data storage
- Caching Strategy: Use CDN and caching to reduce repetitive calculations
- Budget Control: Set budgets and alerts to prevent unexpected expenses
- Resource Tagging: Use tags for cost allocation and tracking (see the tagging sketch after this list)
- Regular Optimization: Regularly review and optimize resource configurations
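As a concrete example of the tagging recommendation, here is a minimal sketch that applies cost allocation tags to every resource in a CDK app; the keys and values are illustrative, and the tags still have to be activated as cost allocation tags in the Billing console before they appear in Cost Explorer.
# app.py (sketch): apply cost allocation tags app-wide
import aws_cdk as cdk

app = cdk.App()
# ... instantiate stacks here ...
cdk.Tags.of(app).add("Project", "sample-app")
cdk.Tags.of(app).add("Environment", "dev")
cdk.Tags.of(app).add("CostCenter", "engineering")
app.synth()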
With the strategies in this chapter, you should be able to design and implement comprehensive performance optimization and cost control, keeping your cloud infrastructure both efficient and economical.