Chapter 9: Lambda Best Practices and Performance Optimization
Chapter Overview
This chapter will delve into performance optimization techniques and production environment best practices for AWS Lambda. We will learn how to optimize cold start times, memory usage, concurrency handling, and how to build high-performance, scalable serverless applications.
Learning Objectives
- Master Lambda cold start optimization techniques
- Learn memory and CPU performance tuning
- Understand concurrency control and reserved concurrency strategies
- Master code optimization and dependency management best practices
- Learn to use monitoring and debugging tools
- Understand cost optimization strategies
9.1 Cold Start Optimization
9.1.1 Understanding the Cold Start Process
When a request arrives and no warm execution environment is available, Lambda must download the deployment package, create a new execution environment, start the language runtime, and run all initialization code outside the handler before it can invoke the handler itself. Warm invocations reuse an existing environment and skip straight to the handler, which is why the techniques below move expensive setup into the initialization phase (so it runs once per environment) and defer anything that is not needed on every request.
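A minimal sketch of cold start detection, assuming nothing beyond the standard library: module-level code runs once per execution environment, so a global flag cleanly separates the first (cold) invocation from the warm ones that follow.

# cold_start_probe.py - minimal cold start detection sketch
import time

_INIT_STARTED = time.time()  # runs once, during environment initialization
_IS_COLD = True

def handler(event, context):
    global _IS_COLD
    was_cold, _IS_COLD = _IS_COLD, False
    return {
        'cold_start': was_cold,
        'seconds_since_init': round(time.time() - _INIT_STARTED, 3)
    }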
9.1.2 Cold Start Optimization Strategies
# lambda_functions/optimized_cold_start/index.py
import json
import os
import logging
from typing import Dict, Any, Optional
from datetime import datetime
# Global variable optimization - initialize outside the function
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))
# Lazy import and connection pooling
_db_connection = None
_redis_client = None
_s3_client = None
def get_db_connection():
"""Lazy initialize database connection"""
global _db_connection
if _db_connection is None:
import psycopg2
from psycopg2 import pool
_db_connection = psycopg2.pool.SimpleConnectionPool(
1, 20, # min and max connections
host=os.environ['DB_HOST'],
database=os.environ['DB_NAME'],
user=os.environ['DB_USER'],
password=os.environ['DB_PASSWORD']
)
return _db_connection
def get_redis_client():
"""Lazy initialize Redis client"""
global _redis_client
if _redis_client is None:
import redis
        _redis_client = redis.Redis(
            host=os.environ['REDIS_HOST'],
            port=int(os.environ.get('REDIS_PORT', 6379)),
            decode_responses=True,
            max_connections=10  # cap the size of the implicit connection pool
        )
return _redis_client
def get_s3_client():
"""Lazy initialize S3 client"""
global _s3_client
if _s3_client is None:
import boto3
_s3_client = boto3.client('s3')
return _s3_client
class OptimizedLambdaHandler:
"""Optimized Lambda handler"""
def __init__(self):
self.initialization_time = datetime.utcnow()
self.request_count = 0
# Pre-compile regular expressions
import re
self.email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
self.phone_pattern = re.compile(r'^\+?1?[0-9]{10,14}$')
# Pre-load configuration
self.config = self._load_configuration()
def _load_configuration(self) -> Dict[str, Any]:
"""Load configuration"""
return {
'max_retry_attempts': int(os.getenv('MAX_RETRY_ATTEMPTS', 3)),
'timeout_seconds': int(os.getenv('TIMEOUT_SECONDS', 30)),
'batch_size': int(os.getenv('BATCH_SIZE', 100)),
'cache_ttl': int(os.getenv('CACHE_TTL', 300))
}
def handle_request(self, event: Dict[str, Any], context) -> Dict[str, Any]:
"""Handle request"""
self.request_count += 1
start_time = datetime.utcnow()
try:
# Record container reuse information
is_cold_start = self.request_count == 1
logger.info(f"Request #{self.request_count}, Cold start: {is_cold_start}")
# Route handling
action = event.get('action', 'default')
if action == 'validate_user':
result = self._validate_user_data(event.get('data', {}))
elif action == 'process_order':
result = self._process_order(event.get('data', {}))
elif action == 'fetch_data':
result = self._fetch_cached_data(event.get('key'))
else:
result = {'error': f'Unknown action: {action}'}
# Record processing time
processing_time = (datetime.utcnow() - start_time).total_seconds()
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'result': result,
'metadata': {
'cold_start': is_cold_start,
'request_count': self.request_count,
'processing_time_seconds': processing_time,
'remaining_time_ms': context.get_remaining_time_in_millis()
}
}, default=str)
}
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': str(e)
})
}
def _validate_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate user data (using pre-compiled regular expressions)"""
email = data.get('email', '')
phone = data.get('phone', '')
validation_result = {
'email_valid': bool(self.email_pattern.match(email)),
'phone_valid': bool(self.phone_pattern.match(phone)),
'required_fields_present': all(
data.get(field) for field in ['name', 'email']
)
}
validation_result['overall_valid'] = all(validation_result.values())
return validation_result
    def _process_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process order (using connection pool)"""
        try:
            db_pool = get_db_connection()
            connection = db_pool.getconn()
            try:
                with connection.cursor() as cursor:
                    cursor.execute(
                        "INSERT INTO orders (user_id, amount, status) VALUES (%s, %s, %s) RETURNING id",
                        (order_data.get('user_id'), order_data.get('amount'), 'pending')
                    )
                    order_id = cursor.fetchone()[0]
                connection.commit()
            finally:
                # Always return the connection to the pool, even when the query fails
                db_pool.putconn(connection)
            return {
                'order_id': order_id,
                'status': 'created',
                'amount': order_data.get('amount')
            }
        except Exception as e:
            logger.error(f"Database error: {str(e)}")
            raise
def _fetch_cached_data(self, key: str) -> Dict[str, Any]:
"""Fetch cached data"""
try:
redis_client = get_redis_client()
# Try to get from cache
cached_data = redis_client.get(key)
if cached_data:
return {
'data': json.loads(cached_data),
'source': 'cache',
'cache_hit': True
}
# Cache miss, fetch from database
data = self._fetch_from_database(key)
# Update cache
redis_client.setex(
key,
self.config['cache_ttl'],
json.dumps(data, default=str)
)
return {
'data': data,
'source': 'database',
'cache_hit': False
}
except Exception as e:
logger.error(f"Cache error: {str(e)}")
# Fall back to database query
return {
'data': self._fetch_from_database(key),
'source': 'database_fallback',
'cache_hit': False
}
def _fetch_from_database(self, key: str) -> Dict[str, Any]:
"""Fetch data from database"""
# Simulate database query
return {
'id': key,
'value': f'data_for_{key}',
'timestamp': datetime.utcnow().isoformat()
}
# Global handler instance (initialized once during cold start)
handler_instance = OptimizedLambdaHandler()
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Lambda entry point"""
return handler_instance.handle_request(event, context)
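For a quick local smoke test, a stub context is enough to exercise the validation path. `FakeContext` below is a hypothetical stand-in for the real Lambda context, exposing only the attributes this handler reads:

# local_test.py - hypothetical local smoke test for the handler above
from index import handler  # the module defined above

class FakeContext:
    memory_limit_in_mb = 512

    def get_remaining_time_in_millis(self):
        return 30000

event = {
    'action': 'validate_user',
    'data': {'name': 'Ada', 'email': 'ada@example.com', 'phone': '+15551234567'}
}
print(handler(event, FakeContext()))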
9.1.3 Provisioned Concurrency Configuration
# stacks/performance_optimized_stack.py
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_applicationautoscaling as appscaling,
    Duration
)
from constructs import Construct
class PerformanceOptimizedStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# High-performance Lambda function
self.optimized_function = _lambda.Function(
self, "OptimizedFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/optimized_cold_start"),
memory_size=1024, # Higher memory = more CPU
timeout=Duration.seconds(30),
environment={
'LOG_LEVEL': 'INFO',
'MAX_RETRY_ATTEMPTS': '3',
'CACHE_TTL': '300'
},
            # Reserved concurrency guarantees (and caps) capacity for this function;
            # it does not keep environments warm -- that is provisioned concurrency's job
            reserved_concurrent_executions=20
)
# Create version and alias
version = self.optimized_function.current_version
# Production alias
        self.prod_alias = _lambda.Alias(
            self, "ProdAlias",
            alias_name="PROD",
            version=version,
            # Provisioned concurrency keeps this many environments initialized
            provisioned_concurrent_executions=5
        )
# Configure auto-scaling
scalable_target = appscaling.ScalableTarget(
self, "ScalableTarget",
service_namespace=appscaling.ServiceNamespace.LAMBDA,
scalable_dimension="lambda:function:ProvisionedConcurrency",
resource_id=f"function:{self.optimized_function.function_name}:PROD",
min_capacity=5,
max_capacity=50
)
        # Utilization-based target tracking
        scalable_target.scale_to_track_metric(
            "ProvisionedConcurrencyTracking",
            target_value=0.7,  # 70% utilization
            predefined_metric=appscaling.PredefinedMetric.LAMBDA_PROVISIONED_CONCURRENCY_UTILIZATION,
            scale_in_cooldown=Duration.minutes(2),
            scale_out_cooldown=Duration.minutes(1)
        )
# Invocation-based auto-scaling
scalable_target.scale_on_metric(
"InvocationRateScaling",
metric=self.optimized_function.metric_invocations(
period=Duration.minutes(1)
),
scaling_steps=[
appscaling.ScalingInterval(upper=10, change=0),
appscaling.ScalingInterval(lower=10, upper=50, change=+2),
appscaling.ScalingInterval(lower=50, upper=100, change=+5),
appscaling.ScalingInterval(lower=100, change=+10)
]
)
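Outside of CDK, the same setting can be applied imperatively. A minimal sketch using boto3, where the function name and alias are placeholders for the resources created above:

# scripts/set_provisioned_concurrency.py - hedged boto3 sketch
import boto3

lambda_client = boto3.client('lambda')
# Qualifier must be a version or alias; provisioned concurrency
# cannot be attached to $LATEST
lambda_client.put_provisioned_concurrency_config(
    FunctionName='OptimizedFunction',   # placeholder name
    Qualifier='PROD',                   # the alias defined in the stack
    ProvisionedConcurrentExecutions=5
)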
9.2 Memory and CPU Optimization
9.2.1 Memory Configuration Best Practices
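Memory is the one performance knob Lambda exposes directly: CPU power scales in proportion to the configured memory, reaching roughly one full vCPU at 1,769 MB. Profiling real workloads, as the tooling below does, is the most reliable way to find the configuration where added memory stops paying for itself.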
# tools/memory_profiler.py
import json
import psutil
import os
import logging
from typing import Dict, Any, List
from datetime import datetime
import tracemalloc
class MemoryProfiler:
"""Memory performance analyzer"""
def __init__(self):
self.start_memory = None
self.peak_memory = 0
self.memory_snapshots = []
def start_profiling(self):
"""Start memory profiling"""
tracemalloc.start()
self.start_memory = self.get_current_memory()
self.memory_snapshots = []
def take_snapshot(self, label: str = ""):
"""Take memory snapshot"""
current_memory = self.get_current_memory()
self.peak_memory = max(self.peak_memory, current_memory)
snapshot = {
'label': label,
'timestamp': datetime.utcnow().isoformat(),
'memory_mb': current_memory,
'memory_increase_mb': current_memory - self.start_memory if self.start_memory else 0
}
self.memory_snapshots.append(snapshot)
return snapshot
def get_current_memory(self) -> float:
"""Get current memory usage (MB)"""
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return memory_info.rss / 1024 / 1024 # Convert to MB
def get_memory_report(self) -> Dict[str, Any]:
"""Generate memory report"""
current_memory = self.get_current_memory()
# Get tracemalloc statistics
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
return {
'start_memory_mb': self.start_memory,
'current_memory_mb': current_memory,
'peak_memory_mb': self.peak_memory,
'memory_increase_mb': current_memory - self.start_memory if self.start_memory else 0,
'snapshots': self.memory_snapshots,
'top_memory_consumers': [
{
'file': stat.traceback.format()[0] if stat.traceback else 'unknown',
'size_mb': stat.size / 1024 / 1024,
'count': stat.count
}
for stat in top_stats[:10]
]
}
class OptimizedDataProcessor:
"""Optimized data processor"""
def __init__(self, memory_limit_mb: int = 512):
self.memory_limit_mb = memory_limit_mb
self.profiler = MemoryProfiler()
def process_large_dataset(self, data: List[Dict[str, Any]],
batch_size: int = 1000) -> Dict[str, Any]:
"""Process large dataset in batches"""
self.profiler.start_profiling()
self.profiler.take_snapshot("processing_start")
total_records = len(data)
processed_records = 0
results = []
# Process in batches to avoid memory overflow
for i in range(0, total_records, batch_size):
batch = data[i:i + batch_size]
# Process batch
batch_result = self._process_batch(batch)
results.extend(batch_result)
processed_records += len(batch)
# Memory check
current_memory = self.profiler.get_current_memory()
if current_memory > self.memory_limit_mb * 0.8: # 80% threshold
logging.warning(f"Memory usage high: {current_memory:.2f}MB")
# Force garbage collection
import gc
gc.collect()
self.profiler.take_snapshot(f"after_gc_batch_{i//batch_size}")
# Periodic snapshot
if i % (batch_size * 10) == 0:
self.profiler.take_snapshot(f"batch_{i//batch_size}")
self.profiler.take_snapshot("processing_complete")
return {
'processed_records': processed_records,
'total_records': total_records,
'results_count': len(results),
'memory_report': self.profiler.get_memory_report()
}
def _process_batch(self, batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process a single batch"""
results = []
for record in batch:
try:
# Data transformation and validation
processed_record = {
'id': record.get('id'),
'processed_at': datetime.utcnow().isoformat(),
'data': self._transform_data(record),
'validation_status': self._validate_record(record)
}
results.append(processed_record)
except Exception as e:
logging.error(f"Error processing record {record.get('id')}: {str(e)}")
# Continue processing other records
continue
return results
def _transform_data(self, record: Dict[str, Any]) -> Dict[str, Any]:
"""Data transformation"""
# Implement memory-friendly data transformation
return {
'name': record.get('name', '').strip().title(),
'email': record.get('email', '').lower(),
'age': int(record.get('age', 0)) if record.get('age') else None,
'score': float(record.get('score', 0.0)) if record.get('score') else 0.0
}
def _validate_record(self, record: Dict[str, Any]) -> str:
"""Record validation"""
if not record.get('name'):
return 'invalid_name'
if not record.get('email') or '@' not in record.get('email', ''):
return 'invalid_email'
return 'valid'
# lambda_functions/memory_optimized/index.py
import json
import logging
import os
from typing import Dict, Any

# Assumes memory_profiler.py (above) is packaged alongside this handler
from memory_profiler import MemoryProfiler, OptimizedDataProcessor

logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))

# Global optimized processor; AWS_LAMBDA_FUNCTION_MEMORY_SIZE is set by the Lambda runtime
processor = OptimizedDataProcessor(
    memory_limit_mb=int(os.getenv('AWS_LAMBDA_FUNCTION_MEMORY_SIZE', '512'))
)
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Memory-optimized Lambda handler"""
# Log available memory
available_memory_mb = int(context.memory_limit_in_mb)
logger.info(f"Available memory: {available_memory_mb}MB")
try:
action = event.get('action', 'process_data')
if action == 'process_large_dataset':
data = event.get('data', [])
batch_size = event.get('batch_size', 1000)
result = processor.process_large_dataset(data, batch_size)
return {
'statusCode': 200,
'body': json.dumps(result, default=str)
}
elif action == 'memory_test':
# Memory stress test
return _run_memory_test(event.get('test_size_mb', 100))
else:
return {
'statusCode': 400,
'body': json.dumps({'error': f'Unknown action: {action}'})
}
except Exception as e:
logger.error(f"Error in handler: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def _run_memory_test(test_size_mb: int) -> Dict[str, Any]:
"""Run memory test"""
profiler = MemoryProfiler()
profiler.start_profiling()
try:
# Allocate memory
data_size = test_size_mb * 1024 * 1024 # Convert to bytes
test_data = bytearray(data_size)
profiler.take_snapshot("after_allocation")
        # Perform some operations (touch pages so the allocation is actually committed)
        for i in range(0, len(test_data), 1024):
            test_data[i] = (i // 1024) % 256
profiler.take_snapshot("after_operations")
# Clean up memory
del test_data
import gc
gc.collect()
profiler.take_snapshot("after_cleanup")
return {
'statusCode': 200,
'body': json.dumps({
'test_size_mb': test_size_mb,
'memory_report': profiler.get_memory_report()
}, default=str)
}
except MemoryError as e:
return {
'statusCode': 507, # Insufficient Storage
'body': json.dumps({
'error': 'Out of memory',
'test_size_mb': test_size_mb,
'memory_report': profiler.get_memory_report()
}, default=str)
}
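Because Lambda bills duration multiplied by memory, more memory is not automatically more expensive: if doubling memory halves duration, the GB-second cost is unchanged while latency improves. A back-of-envelope sketch, assuming the commonly published x86 on-demand price of roughly $0.0000166667 per GB-second (verify against current AWS pricing):

# tools/cost_estimate.py - back-of-envelope invocation cost sketch
PRICE_PER_GB_SECOND = 0.0000166667  # assumption: check current AWS pricing

def invocation_cost(memory_mb: int, duration_ms: float) -> float:
    """Approximate compute cost of one invocation (excludes the request fee)."""
    return (memory_mb / 1024) * (duration_ms / 1000) * PRICE_PER_GB_SECOND

# Same cost, half the latency, if the extra CPU halves the duration:
print(invocation_cost(512, 800))    # 512 MB for 800 ms
print(invocation_cost(1024, 400))   # 1024 MB for 400 ms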
9.2.2 CPU Optimization Strategies
# lambda_functions/cpu_optimized/index.py
import json
import time
import concurrent.futures
import multiprocessing
import logging
import os
from typing import Dict, Any, List, Callable
from datetime import datetime
import asyncio
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))
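# Module-level helpers: multiprocessing pickles the callables it dispatches,
# so work functions must live at module scope (lambdas and closures fail to pickle)
def _process_chunk(processing_func: Callable, chunk: List[Any]) -> List[Any]:
    """Apply processing_func to each item of a chunk inside a worker process"""
    return [processing_func(item) for item in chunk]

def _square(x: int) -> int:
    """Trivial picklable work function used by the benchmark below"""
    return x * x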
class CPUOptimizer:
"""CPU performance optimizer"""
def __init__(self):
# Lambda environment CPU info
self.cpu_count = multiprocessing.cpu_count()
self.memory_mb = int(os.getenv('AWS_LAMBDA_FUNCTION_MEMORY_SIZE', '512'))
        # AWS allocates CPU in proportion to memory: ~1 full vCPU at 1,769 MB
        self.estimated_vcpus = self.memory_mb / 1769
logger.info(f"CPU cores: {self.cpu_count}, Estimated vCPUs: {self.estimated_vcpus:.2f}")
def optimize_cpu_bound_task(self, data: List[Any],
processing_func: Callable,
use_multiprocessing: bool = True) -> Dict[str, Any]:
"""Optimize CPU-intensive tasks"""
start_time = time.time()
# Choose optimal strategy based on data size and CPU capacity
data_size = len(data)
optimal_workers = min(self.cpu_count, max(1, data_size // 100))
if use_multiprocessing and data_size > 1000 and self.cpu_count > 1:
results = self._process_with_multiprocessing(
data, processing_func, optimal_workers
)
elif data_size > 100:
results = self._process_with_threading(
data, processing_func, optimal_workers
)
else:
results = self._process_sequentially(data, processing_func)
processing_time = time.time() - start_time
return {
'results': results,
'processing_time': processing_time,
'data_size': data_size,
'workers_used': optimal_workers,
'cpu_count': self.cpu_count,
'processing_mode': self._get_processing_mode(data_size, use_multiprocessing)
}
    def _process_with_multiprocessing(self, data: List[Any],
                                      processing_func: Callable,
                                      workers: int) -> List[Any]:
        """Process with multiprocessing.

        Caveat: multiprocessing.Pool needs /dev/shm, which the Lambda
        execution environment does not provide, so this path works in local
        testing but raises OSError on Lambda itself (use threading or
        Process/Pipe there). processing_func must be picklable, i.e. a
        module-level function rather than a lambda.
        """
        chunk_size = max(1, len(data) // workers)
        # Split data into chunks
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
        with multiprocessing.Pool(processes=workers) as pool:
            # Process chunks in parallel via the picklable module-level helper
            chunk_results = pool.starmap(
                _process_chunk,
                [(processing_func, chunk) for chunk in chunks]
            )
        # Merge results
        results = []
        for chunk_result in chunk_results:
            results.extend(chunk_result)
        return results
def _process_with_threading(self, data: List[Any],
processing_func: Callable,
workers: int) -> List[Any]:
"""Process with multi-threading"""
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# Submit all tasks
future_to_item = {
executor.submit(processing_func, item): item
for item in data
}
results = []
for future in concurrent.futures.as_completed(future_to_item):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Error processing item: {str(e)}")
results.append(None)
return results
def _process_sequentially(self, data: List[Any],
processing_func: Callable) -> List[Any]:
"""Sequential processing"""
return [processing_func(item) for item in data]
def _get_processing_mode(self, data_size: int, use_multiprocessing: bool) -> str:
"""Get processing mode"""
if use_multiprocessing and data_size > 1000 and self.cpu_count > 1:
return "multiprocessing"
elif data_size > 100:
return "threading"
else:
return "sequential"
class AsyncProcessor:
"""Async processor"""
async def process_async_tasks(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Process multiple async tasks"""
start_time = time.time()
# Create async tasks
async_tasks = []
for task in tasks:
task_type = task.get('type')
if task_type == 'http_request':
async_tasks.append(self._make_http_request(task))
elif task_type == 'database_query':
async_tasks.append(self._execute_database_query(task))
elif task_type == 'file_operation':
async_tasks.append(self._perform_file_operation(task))
# Execute all tasks concurrently
results = await asyncio.gather(*async_tasks, return_exceptions=True)
processing_time = time.time() - start_time
# Categorize results
successful_results = []
errors = []
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({
'task_index': i,
'error': str(result)
})
else:
successful_results.append(result)
return {
'successful_results': successful_results,
'errors': errors,
'total_tasks': len(tasks),
'processing_time': processing_time
}
    async def _make_http_request(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Async HTTP request"""
        import aiohttp  # lazy import: only loaded when HTTP tasks are present

        url = task.get('url')
        # aiohttp expects a ClientTimeout object, not a bare number
        timeout = aiohttp.ClientTimeout(total=task.get('timeout', 10))
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                return {
                    'task_type': 'http_request',
                    'url': url,
                    'status_code': response.status,
                    'response_size': len(await response.text())
                }
async def _execute_database_query(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Async database query (simulated)"""
await asyncio.sleep(0.1) # Simulate database latency
return {
'task_type': 'database_query',
'query': task.get('query'),
'result_count': 42 # Simulated result
}
async def _perform_file_operation(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Async file operation (simulated)"""
await asyncio.sleep(0.05) # Simulate file I/O latency
return {
'task_type': 'file_operation',
'operation': task.get('operation'),
'file_path': task.get('file_path'),
'status': 'completed'
}
# Example of math-intensive function
def cpu_intensive_calculation(n: int) -> Dict[str, Any]:
"""CPU-intensive calculation"""
# Calculate Fibonacci sequence
def fibonacci(num):
if num <= 1:
return num
return fibonacci(num - 1) + fibonacci(num - 2)
# Prime number check
def is_prime(num):
if num < 2:
return False
for i in range(2, int(num ** 0.5) + 1):
if num % i == 0:
return False
return True
start_time = time.time()
# Perform calculation
fib_result = fibonacci(min(n, 35)) # Limit recursion depth
prime_check = is_prime(n)
# Numerical calculation
sum_of_squares = sum(i * i for i in range(n))
processing_time = time.time() - start_time
return {
'input': n,
'fibonacci': fib_result,
'is_prime': prime_check,
'sum_of_squares': sum_of_squares,
'processing_time': processing_time
}
# Global optimizer instances
cpu_optimizer = CPUOptimizer()
async_processor = AsyncProcessor()
def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""CPU-optimized Lambda handler"""
logger.info(f"Function memory: {context.memory_limit_in_mb}MB")
logger.info(f"Remaining time: {context.get_remaining_time_in_millis()}ms")
try:
action = event.get('action', 'cpu_test')
if action == 'cpu_intensive_batch':
# CPU-intensive batch processing
numbers = event.get('numbers', list(range(1, 101)))
use_multiprocessing = event.get('use_multiprocessing', True)
result = cpu_optimizer.optimize_cpu_bound_task(
numbers,
cpu_intensive_calculation,
use_multiprocessing
)
return {
'statusCode': 200,
'body': json.dumps(result, default=str)
}
elif action == 'async_tasks':
# Async task processing
tasks = event.get('tasks', [])
            # Run async code in the sync handler; asyncio.run creates and
            # closes a fresh event loop for each invocation
            result = asyncio.run(
                async_processor.process_async_tasks(tasks)
            )
return {
'statusCode': 200,
'body': json.dumps(result, default=str)
}
elif action == 'performance_benchmark':
# Performance benchmark
return _run_performance_benchmark(event.get('test_size', 1000))
else:
return {
'statusCode': 400,
'body': json.dumps({'error': f'Unknown action: {action}'})
}
except Exception as e:
logger.error(f"Error in handler: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def _run_performance_benchmark(test_size: int) -> Dict[str, Any]:
"""Run performance benchmark"""
results = {}
# Test data
test_data = list(range(1, test_size + 1))
    # Test both strategies; the mode actually used is reported by the optimizer
    for use_mp in [False, True]:
        start_time = time.time()
        try:
            result = cpu_optimizer.optimize_cpu_bound_task(
                test_data[:min(1500, test_size)],  # enough items to trigger the parallel paths
                _square,  # module-level function, so multiprocessing can pickle it
                use_mp
            )
        except OSError as e:
            # multiprocessing.Pool needs /dev/shm, which Lambda does not provide
            results['multiprocessing'] = {'error': str(e)}
            continue
        total_time = time.time() - start_time
        results[result['processing_mode']] = {
            'total_time': total_time,
            'items_per_second': result['data_size'] / total_time if total_time > 0 else 0,
            'processing_mode': result['processing_mode']
        }
return {
'statusCode': 200,
'body': json.dumps({
'test_size': test_size,
'cpu_info': {
'cpu_count': cpu_optimizer.cpu_count,
'estimated_vcpus': cpu_optimizer.estimated_vcpus
},
'benchmark_results': results
}, default=str)
}
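Illustrative test events for the three actions above (field names match what the handler reads; the values are arbitrary):

# sample_events.py - illustrative invocation payloads
cpu_batch_event = {
    'action': 'cpu_intensive_batch',
    'numbers': list(range(1, 201)),
    'use_multiprocessing': False  # safer default on Lambda (no /dev/shm for Pool)
}

async_tasks_event = {
    'action': 'async_tasks',
    'tasks': [
        {'type': 'http_request', 'url': 'https://example.com', 'timeout': 5},
        {'type': 'database_query', 'query': 'SELECT 1'},
        {'type': 'file_operation', 'operation': 'read', 'file_path': '/tmp/data.json'}
    ]
}

benchmark_event = {'action': 'performance_benchmark', 'test_size': 1000}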
9.3 Concurrency Control Strategies
9.3.1 Intelligent Concurrency Management
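The stack below sketches a three-tier strategy: a critical function with reserved capacity that can never be starved, a standard function whose concurrency is capped so it cannot crowd out others, and a batch function left to draw from the account's unreserved pool, with CloudWatch alarms watching all three.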
# stacks/concurrency_optimized_stack.py
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cw_actions,
    aws_sns as sns,
    Duration
)
from constructs import Construct
class ConcurrencyOptimizedStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Create SNS topic for alerts
self.alert_topic = sns.Topic(
self, "ConcurrencyAlerts",
topic_name="lambda-concurrency-alerts"
)
# High-priority function - reserved concurrency
self.critical_function = _lambda.Function(
self, "CriticalFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/critical_handler"),
memory_size=512,
timeout=Duration.seconds(30),
reserved_concurrent_executions=50, # Reserve 50 concurrency
environment={
'PRIORITY_LEVEL': 'CRITICAL',
'MAX_PROCESSING_TIME': '25'
}
)
# Standard function - limited concurrency
self.standard_function = _lambda.Function(
self, "StandardFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/standard_handler"),
memory_size=256,
timeout=Duration.seconds(60),
reserved_concurrent_executions=20, # Limit to 20 concurrency
environment={
'PRIORITY_LEVEL': 'STANDARD',
'MAX_PROCESSING_TIME': '55'
}
)
# Batch function - no concurrency limit
self.batch_function = _lambda.Function(
self, "BatchFunction",
runtime=_lambda.Runtime.PYTHON_3_9,
handler="index.handler",
code=_lambda.Code.from_asset("lambda_functions/batch_handler"),
memory_size=1024,
timeout=Duration.minutes(15),
environment={
'PRIORITY_LEVEL': 'BATCH',
'MAX_PROCESSING_TIME': '900'
}
)
# Create concurrency monitoring and alarms
self._create_concurrency_monitoring()
def _create_concurrency_monitoring(self):
"""Create concurrency monitoring"""
        # Monitor concurrent execution count; carry each function's configured
        # limits explicitly, since the CDK constructs do not expose them back
        for func_name, func, concurrency_limit, timeout_seconds in [
            ("Critical", self.critical_function, 50, 30),
            ("Standard", self.standard_function, 20, 60),
            ("Batch", self.batch_function, None, 900)
        ]:
            # Concurrent executions alarm
            concurrent_executions_alarm = cloudwatch.Alarm(
                self, f"{func_name}ConcurrentExecutionsAlarm",
                alarm_name=f"{func_name}-high-concurrency",
                alarm_description=f"High concurrent executions for {func_name} function",
                metric=func.metric(
                    "ConcurrentExecutions",
                    period=Duration.minutes(1),
                    statistic="Maximum"
                ),
                threshold=concurrency_limit * 0.8 if concurrency_limit else 100,
                evaluation_periods=2,
                comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
            )
concurrent_executions_alarm.add_alarm_action(
cw_actions.SnsAction(self.alert_topic)
)
# Error rate alarm
error_rate_alarm = cloudwatch.Alarm(
self, f"{func_name}ErrorRateAlarm",
alarm_name=f"{func_name}-high-error-rate",
alarm_description=f"High error rate for {func_name} function",
metric=cloudwatch.MathExpression(
expression="errors / invocations * 100",
using_metrics={
"errors": func.metric_errors(period=Duration.minutes(5)),
"invocations": func.metric_invocations(period=Duration.minutes(5))
}
),
threshold=5, # 5% error rate
evaluation_periods=2,
comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
)
error_rate_alarm.add_alarm_action(
cw_actions.SnsAction(self.alert_topic)
)
            # Duration alarm (the Duration metric is reported in milliseconds)
            duration_alarm = cloudwatch.Alarm(
                self, f"{func_name}DurationAlarm",
                alarm_name=f"{func_name}-long-duration",
                alarm_description=f"Long duration for {func_name} function",
                metric=func.metric_duration(
                    period=Duration.minutes(5),
                    statistic="Average"
                ),
                threshold=timeout_seconds * 1000 * 0.8,  # 80% of timeout, in ms
                evaluation_periods=3,
                comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
            )
duration_alarm.add_alarm_action(
cw_actions.SnsAction(self.alert_topic)
)
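Reserved concurrency can also be adjusted at runtime without a redeploy. A sketch with boto3, where the function name is a placeholder:

# scripts/adjust_concurrency.py - hedged boto3 sketch
import boto3

lambda_client = boto3.client('lambda')

# Cap (and reserve) concurrency for one function
lambda_client.put_function_concurrency(
    FunctionName='StandardFunction',   # placeholder name
    ReservedConcurrentExecutions=20
)

# Remove the cap so the function draws from the unreserved account pool again
lambda_client.delete_function_concurrency(FunctionName='StandardFunction')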
9.4 Code Optimization Best Practices
9.4.1 Import Optimization
Where your code imports from determines when you pay for it: everything at module level runs during the cold start, while imports inside a function body run on first use. Three habits keep initialization fast: keep modules needed on every invocation (and shared SDK clients) at the top level so warm invocations reuse them; defer heavy, rarely-used libraries into the code paths that actually need them; and import the narrowest module that works, for example a single submodule instead of a large package.
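A minimal sketch of these patterns, where pandas stands in for any heavy, rarely-needed dependency and is assumed to be bundled with the function:

# lambda_functions/import_optimized/index.py - import pattern sketch
import json    # cheap and used on every invocation: module-level
import boto3   # client created once during init, reused while warm

s3 = boto3.client('s3')

def handler(event, context):
    if event.get('action') == 'render_report':
        # Heavy and rarely used: defer so most invocations never pay for it
        import pandas as pd  # assumption: packaged as a layer/dependency
        frame = pd.DataFrame(event.get('rows', []))
        return {'statusCode': 200, 'body': frame.to_json()}
    return {'statusCode': 200, 'body': json.dumps({'ok': True})}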
9.5 Chapter Summary
Key Points
- Cold start optimization significantly improves performance through connection pooling, lazy imports, and provisioned concurrency
- Memory and CPU optimization requires choosing appropriate configurations and algorithms based on function characteristics
- Intelligent concurrency control can automatically adjust function configurations to optimize performance and cost
- Code optimization includes import optimization, caching strategies, and efficient data processing
- Monitoring and analysis are the foundation for continuous optimization
- Performance optimization is an ongoing process that needs to be adjusted based on actual usage
In the next chapter, we will apply all the knowledge from this course to build a production-grade serverless web application through a complete practical project.