Chapter 9: Lambda Best Practices and Performance Optimization

Chapter Overview

This chapter delves into performance optimization techniques and production best practices for AWS Lambda: how to optimize cold start times, memory usage, and concurrency handling, and how to build high-performance, scalable serverless applications.

Learning Objectives

  1. Master Lambda cold start optimization techniques
  2. Learn memory and CPU performance tuning
  3. Understand concurrency control and reserved concurrency strategies
  4. Master code optimization and dependency management best practices
  5. Learn to use monitoring and debugging tools
  6. Understand cost optimization strategies

9.1 Cold Start Optimization

9.1.1 Understanding the Cold Start Process

When Lambda receives a request and no warm execution environment is available, it must: (1) download the function code or container image, (2) create a new execution environment, (3) initialize the language runtime, and (4) run the function's initialization code (imports and module-level statements) before finally invoking the handler. Steps 1-4 are the cold start; subsequent requests that reuse the environment skip them entirely, which is why work done at module scope is effectively cached across warm invocations.
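
A minimal sketch for observing cold starts in practice: a module-level flag is set once per execution environment, so the first invocation in each environment reports cold_start: true. The INIT_START timestamp is an illustrative way to approximate init latency; CloudWatch reports the authoritative Init Duration in the REPORT log line.

# A minimal cold start detector (illustrative sketch)
import time

INIT_START = time.time()   # runs once, during the init phase
_is_cold = True            # module scope survives across warm invocations

def handler(event, context):
    global _is_cold
    cold, _is_cold = _is_cold, False
    return {
        'cold_start': cold,
        # init-to-invoke latency; only meaningful on the cold invocation
        'seconds_since_init': round(time.time() - INIT_START, 3)
    }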

9.1.2 Cold Start Optimization Strategies

# lambda_functions/optimized_cold_start/index.py
import json
import os
import logging
from typing import Dict, Any, Optional
from datetime import datetime

# Global variable optimization - initialize outside the function
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))

# Lazy import and connection pooling
_db_connection = None
_redis_client = None
_s3_client = None

def get_db_connection():
    """Lazy initialize database connection"""
    global _db_connection
    if _db_connection is None:
        import psycopg2
        from psycopg2 import pool

        _db_connection = psycopg2.pool.SimpleConnectionPool(
            1, 20,  # min and max connections
            host=os.environ['DB_HOST'],
            database=os.environ['DB_NAME'],
            user=os.environ['DB_USER'],
            password=os.environ['DB_PASSWORD']
        )
    return _db_connection

def get_redis_client():
    """Lazy initialize Redis client"""
    global _redis_client
    if _redis_client is None:
        import redis
        _redis_client = redis.Redis(
            host=os.environ['REDIS_HOST'],
            port=int(os.environ.get('REDIS_PORT', 6379)),
            decode_responses=True,
            max_connections=10
        )
    return _redis_client

def get_s3_client():
    """Lazy initialize S3 client"""
    global _s3_client
    if _s3_client is None:
        import boto3
        _s3_client = boto3.client('s3')
    return _s3_client

class OptimizedLambdaHandler:
    """Optimized Lambda handler"""

    def __init__(self):
        self.initialization_time = datetime.utcnow()
        self.request_count = 0

        # Pre-compile regular expressions
        import re
        self.email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        self.phone_pattern = re.compile(r'^\+?1?[0-9]{10,14}$')

        # Pre-load configuration
        self.config = self._load_configuration()

    def _load_configuration(self) -> Dict[str, Any]:
        """Load configuration"""
        return {
            'max_retry_attempts': int(os.getenv('MAX_RETRY_ATTEMPTS', 3)),
            'timeout_seconds': int(os.getenv('TIMEOUT_SECONDS', 30)),
            'batch_size': int(os.getenv('BATCH_SIZE', 100)),
            'cache_ttl': int(os.getenv('CACHE_TTL', 300))
        }

    def handle_request(self, event: Dict[str, Any], context) -> Dict[str, Any]:
        """Handle request"""
        self.request_count += 1
        start_time = datetime.utcnow()

        try:
            # Record container reuse information
            is_cold_start = self.request_count == 1
            logger.info(f"Request #{self.request_count}, Cold start: {is_cold_start}")

            # Route handling
            action = event.get('action', 'default')

            if action == 'validate_user':
                result = self._validate_user_data(event.get('data', {}))
            elif action == 'process_order':
                result = self._process_order(event.get('data', {}))
            elif action == 'fetch_data':
                result = self._fetch_cached_data(event.get('key'))
            else:
                result = {'error': f'Unknown action: {action}'}

            # Record processing time
            processing_time = (datetime.utcnow() - start_time).total_seconds()

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'result': result,
                    'metadata': {
                        'cold_start': is_cold_start,
                        'request_count': self.request_count,
                        'processing_time_seconds': processing_time,
                        'remaining_time_ms': context.get_remaining_time_in_millis()
                    }
                }, default=str)
            }

        except Exception as e:
            logger.error(f"Error processing request: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'success': False,
                    'error': str(e)
                })
            }

    def _validate_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate user data (using pre-compiled regular expressions)"""
        email = data.get('email', '')
        phone = data.get('phone', '')

        validation_result = {
            'email_valid': bool(self.email_pattern.match(email)),
            'phone_valid': bool(self.phone_pattern.match(phone)),
            'required_fields_present': all(
                data.get(field) for field in ['name', 'email']
            )
        }

        validation_result['overall_valid'] = all(validation_result.values())
        return validation_result

    def _process_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process order (using connection pool)"""
        db_pool = get_db_connection()
        connection = db_pool.getconn()

        try:
            with connection.cursor() as cursor:
                # Simulate database operation
                cursor.execute(
                    "INSERT INTO orders (user_id, amount, status) VALUES (%s, %s, %s) RETURNING id",
                    (order_data.get('user_id'), order_data.get('amount'), 'pending')
                )
                order_id = cursor.fetchone()[0]
                connection.commit()

            return {
                'order_id': order_id,
                'status': 'created',
                'amount': order_data.get('amount')
            }

        except Exception as e:
            connection.rollback()
            logger.error(f"Database error: {str(e)}")
            raise
        finally:
            # Always return the connection to the pool, even on failure
            db_pool.putconn(connection)

    def _fetch_cached_data(self, key: str) -> Dict[str, Any]:
        """Fetch cached data"""
        try:
            redis_client = get_redis_client()

            # Try to get from cache
            cached_data = redis_client.get(key)
            if cached_data:
                return {
                    'data': json.loads(cached_data),
                    'source': 'cache',
                    'cache_hit': True
                }

            # Cache miss, fetch from database
            data = self._fetch_from_database(key)

            # Update cache
            redis_client.setex(
                key,
                self.config['cache_ttl'],
                json.dumps(data, default=str)
            )

            return {
                'data': data,
                'source': 'database',
                'cache_hit': False
            }

        except Exception as e:
            logger.error(f"Cache error: {str(e)}")
            # Fall back to database query
            return {
                'data': self._fetch_from_database(key),
                'source': 'database_fallback',
                'cache_hit': False
            }

    def _fetch_from_database(self, key: str) -> Dict[str, Any]:
        """Fetch data from database"""
        # Simulate database query
        return {
            'id': key,
            'value': f'data_for_{key}',
            'timestamp': datetime.utcnow().isoformat()
        }

# Global handler instance (initialized once during cold start)
handler_instance = OptimizedLambdaHandler()

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Lambda entry point"""
    return handler_instance.handle_request(event, context)
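
A sample test event for this handler (field values are illustrative):

event = {
    "action": "validate_user",
    "data": {
        "name": "Ada Lovelace",
        "email": "ada@example.com",
        "phone": "+15551234567"
    }
}

On a warm container the response metadata reports cold_start: false and an increasing request_count, which makes container reuse easy to verify from the caller's side.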

9.1.3 Provisioned Concurrency Configuration

# stacks/performance_optimized_stack.py
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_applicationautoscaling as appscaling,
    Duration
)
from constructs import Construct

class PerformanceOptimizedStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # High-performance Lambda function
        self.optimized_function = _lambda.Function(
            self, "OptimizedFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/optimized_cold_start"),
            memory_size=1024,  # Higher memory = more CPU
            timeout=Duration.seconds(30),
            environment={
                'LOG_LEVEL': 'INFO',
                'MAX_RETRY_ATTEMPTS': '3',
                'CACHE_TTL': '300'
            },
            # Reserved concurrency caps this function's concurrency and
            # guarantees capacity; it does not by itself prevent cold starts
            reserved_concurrent_executions=20
        )

        # Create version and alias
        version = self.optimized_function.current_version

        # Production alias with provisioned concurrency
        # (provisioned concurrency keeps initialized environments warm,
        # eliminating cold starts up to the configured capacity)
        self.prod_alias = _lambda.Alias(
            self, "ProdAlias",
            alias_name="PROD",
            version=version,
            provisioned_concurrent_executions=5
        )

        # Configure auto-scaling
        scalable_target = appscaling.ScalableTarget(
            self, "ScalableTarget",
            service_namespace=appscaling.ServiceNamespace.LAMBDA,
            scalable_dimension="lambda:function:ProvisionedConcurrency",
            resource_id=f"function:{self.optimized_function.function_name}:PROD",
            min_capacity=5,
            max_capacity=50
        )

        # Utilization-based target tracking
        scalable_target.scale_to_track_metric(
            "ProvisionedConcurrencyTracking",
            target_value=0.7,  # 70% utilization
            predefined_metric=appscaling.PredefinedMetric.LAMBDA_PROVISIONED_CONCURRENCY_UTILIZATION,
            scale_in_cooldown=Duration.minutes(2),
            scale_out_cooldown=Duration.minutes(1)
        )

        # Invocation-based auto-scaling
        scalable_target.scale_on_metric(
            "InvocationRateScaling",
            metric=self.optimized_function.metric_invocations(
                period=Duration.minutes(1)
            ),
            scaling_steps=[
                appscaling.ScalingInterval(upper=10, change=0),
                appscaling.ScalingInterval(lower=10, upper=50, change=+2),
                appscaling.ScalingInterval(lower=50, upper=100, change=+5),
                appscaling.ScalingInterval(lower=100, change=+10)
            ]
        )
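
Provisioned concurrency can also be configured or inspected outside CDK; a short boto3 sketch (the function name is a placeholder):

import boto3

lambda_client = boto3.client('lambda')

# Configure provisioned concurrency on the PROD alias
lambda_client.put_provisioned_concurrency_config(
    FunctionName='OptimizedFunction',   # placeholder name
    Qualifier='PROD',
    ProvisionedConcurrentExecutions=5
)

# Status becomes READY once the environments are initialized
status = lambda_client.get_provisioned_concurrency_config(
    FunctionName='OptimizedFunction',
    Qualifier='PROD'
)
print(status['Status'])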

9.2 Memory and CPU Optimization

9.2.1 Memory Configuration Best Practices

# tools/memory_profiler.py
# Note: psutil is not part of the Lambda runtime; package it with the
# function or ship it in a Lambda layer.
import psutil
import os
import logging
from typing import Dict, Any, List
from datetime import datetime
import tracemalloc

class MemoryProfiler:
    """Memory performance analyzer"""

    def __init__(self):
        self.start_memory = None
        self.peak_memory = 0
        self.memory_snapshots = []

    def start_profiling(self):
        """Start memory profiling"""
        tracemalloc.start()
        self.start_memory = self.get_current_memory()
        self.memory_snapshots = []

    def take_snapshot(self, label: str = ""):
        """Take memory snapshot"""
        current_memory = self.get_current_memory()
        self.peak_memory = max(self.peak_memory, current_memory)

        snapshot = {
            'label': label,
            'timestamp': datetime.utcnow().isoformat(),
            'memory_mb': current_memory,
            'memory_increase_mb': current_memory - self.start_memory if self.start_memory else 0
        }

        self.memory_snapshots.append(snapshot)
        return snapshot

    def get_current_memory(self) -> float:
        """Get current memory usage (MB)"""
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return memory_info.rss / 1024 / 1024  # Convert to MB

    def get_memory_report(self) -> Dict[str, Any]:
        """Generate memory report"""
        current_memory = self.get_current_memory()

        # Get tracemalloc statistics
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')

        return {
            'start_memory_mb': self.start_memory,
            'current_memory_mb': current_memory,
            'peak_memory_mb': self.peak_memory,
            'memory_increase_mb': current_memory - self.start_memory if self.start_memory else 0,
            'snapshots': self.memory_snapshots,
            'top_memory_consumers': [
                {
                    'file': stat.traceback.format()[0] if stat.traceback else 'unknown',
                    'size_mb': stat.size / 1024 / 1024,
                    'count': stat.count
                }
                for stat in top_stats[:10]
            ]
        }

class OptimizedDataProcessor:
    """Optimized data processor"""

    def __init__(self, memory_limit_mb: int = 512):
        self.memory_limit_mb = memory_limit_mb
        self.profiler = MemoryProfiler()

    def process_large_dataset(self, data: List[Dict[str, Any]],
                            batch_size: int = 1000) -> Dict[str, Any]:
        """Process large dataset in batches"""
        self.profiler.start_profiling()
        self.profiler.take_snapshot("processing_start")

        total_records = len(data)
        processed_records = 0
        results = []

        # Process in batches to avoid memory overflow
        for i in range(0, total_records, batch_size):
            batch = data[i:i + batch_size]

            # Process batch
            batch_result = self._process_batch(batch)
            results.extend(batch_result)

            processed_records += len(batch)

            # Memory check
            current_memory = self.profiler.get_current_memory()
            if current_memory > self.memory_limit_mb * 0.8:  # 80% threshold
                logging.warning(f"Memory usage high: {current_memory:.2f}MB")

                # Force garbage collection
                import gc
                gc.collect()

                self.profiler.take_snapshot(f"after_gc_batch_{i//batch_size}")

            # Periodic snapshot
            if i % (batch_size * 10) == 0:
                self.profiler.take_snapshot(f"batch_{i//batch_size}")

        self.profiler.take_snapshot("processing_complete")

        return {
            'processed_records': processed_records,
            'total_records': total_records,
            'results_count': len(results),
            'memory_report': self.profiler.get_memory_report()
        }

    def _process_batch(self, batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process a single batch"""
        results = []

        for record in batch:
            try:
                # Data transformation and validation
                processed_record = {
                    'id': record.get('id'),
                    'processed_at': datetime.utcnow().isoformat(),
                    'data': self._transform_data(record),
                    'validation_status': self._validate_record(record)
                }

                results.append(processed_record)

            except Exception as e:
                logging.error(f"Error processing record {record.get('id')}: {str(e)}")
                # Continue processing other records
                continue

        return results

    def _transform_data(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Data transformation"""
        # Implement memory-friendly data transformation
        return {
            'name': record.get('name', '').strip().title(),
            'email': record.get('email', '').lower(),
            'age': int(record.get('age', 0)) if record.get('age') else None,
            'score': float(record.get('score', 0.0)) if record.get('score') else 0.0
        }

    def _validate_record(self, record: Dict[str, Any]) -> str:
        """Record validation"""
        if not record.get('name'):
            return 'invalid_name'
        if not record.get('email') or '@' not in record.get('email', ''):
            return 'invalid_email'
        return 'valid'

# lambda_functions/memory_optimized/index.py
import json
import logging
import os
from typing import Dict, Any

# Assumes tools/memory_profiler.py is packaged alongside this handler
from memory_profiler import MemoryProfiler, OptimizedDataProcessor

logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))

# Global optimized processor (AWS_LAMBDA_FUNCTION_MEMORY_SIZE is set by the runtime)
processor = OptimizedDataProcessor(
    memory_limit_mb=int(os.getenv('AWS_LAMBDA_FUNCTION_MEMORY_SIZE', 512))
)

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Memory-optimized Lambda handler"""

    # Log available memory
    available_memory_mb = int(context.memory_limit_in_mb)
    logger.info(f"Available memory: {available_memory_mb}MB")

    try:
        action = event.get('action', 'process_data')

        if action == 'process_large_dataset':
            data = event.get('data', [])
            batch_size = event.get('batch_size', 1000)

            result = processor.process_large_dataset(data, batch_size)

            return {
                'statusCode': 200,
                'body': json.dumps(result, default=str)
            }

        elif action == 'memory_test':
            # Memory stress test
            return _run_memory_test(event.get('test_size_mb', 100))

        else:
            return {
                'statusCode': 400,
                'body': json.dumps({'error': f'Unknown action: {action}'})
            }

    except Exception as e:
        logger.error(f"Error in handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def _run_memory_test(test_size_mb: int) -> Dict[str, Any]:
    """Run memory test"""
    profiler = MemoryProfiler()
    profiler.start_profiling()

    try:
        # Allocate memory
        data_size = test_size_mb * 1024 * 1024  # Convert to bytes
        test_data = bytearray(data_size)

        profiler.take_snapshot("after_allocation")

        # Perform some operations
        for i in range(0, len(test_data), 1024):
            test_data[i] = i % 256

        profiler.take_snapshot("after_operations")

        # Clean up memory
        del test_data
        import gc
        gc.collect()

        profiler.take_snapshot("after_cleanup")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'test_size_mb': test_size_mb,
                'memory_report': profiler.get_memory_report()
            }, default=str)
        }

    except MemoryError:
        return {
            'statusCode': 507,  # Insufficient Storage
            'body': json.dumps({
                'error': 'Out of memory',
                'test_size_mb': test_size_mb,
                'memory_report': profiler.get_memory_report()
            }, default=str)
        }
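
Choosing a memory size is ultimately a cost/latency tradeoff: CPU scales linearly with memory, so a larger configuration often finishes fast enough to cost the same or less. A rough sketch of the arithmetic (the prices below are illustrative; check current AWS pricing for your region):

# tools/cost_estimator.py -- rough Lambda cost comparison (illustrative prices)
PRICE_PER_GB_SECOND = 0.0000166667  # example x86 price; verify for your region
PRICE_PER_REQUEST = 0.0000002       # example per-request price

def cost_per_invocation(memory_mb: int, duration_ms: float) -> float:
    """Estimate the cost of a single invocation."""
    gb_seconds = (memory_mb / 1024) * (duration_ms / 1000)
    return gb_seconds * PRICE_PER_GB_SECOND + PRICE_PER_REQUEST

# For a CPU-bound workload, doubling memory roughly halves duration:
# compute cost is unchanged while latency is cut in half.
print(cost_per_invocation(512, 800))   # 0.4 GB-seconds
print(cost_per_invocation(1024, 400))  # also 0.4 GB-seconds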

9.2.2 CPU Optimization Strategies

# lambda_functions/cpu_optimized/index.py
import json
import time
import concurrent.futures
import multiprocessing
import logging
import os
from typing import Dict, Any, List, Callable
from datetime import datetime
import asyncio

logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', 'INFO'))

class CPUOptimizer:
    """CPU performance optimizer"""

    def __init__(self):
        # Lambda environment CPU info
        self.cpu_count = multiprocessing.cpu_count()
        self.memory_mb = int(os.getenv('AWS_LAMBDA_FUNCTION_MEMORY_SIZE', '512'))

        # Estimate vCPU capacity: Lambda allocates CPU proportionally to
        # memory, reaching one full vCPU at 1,769 MB
        self.estimated_vcpus = self.memory_mb / 1769

        logger.info(f"CPU cores: {self.cpu_count}, Estimated vCPUs: {self.estimated_vcpus:.2f}")

    def optimize_cpu_bound_task(self, data: List[Any],
                               processing_func: Callable,
                               use_multiprocessing: bool = True) -> Dict[str, Any]:
        """Optimize CPU-intensive tasks"""
        start_time = time.time()

        # Choose optimal strategy based on data size and CPU capacity
        data_size = len(data)
        optimal_workers = min(self.cpu_count, max(1, data_size // 100))

        if use_multiprocessing and data_size > 1000 and self.cpu_count > 1:
            results = self._process_with_multiprocessing(
                data, processing_func, optimal_workers
            )
        elif data_size > 100:
            results = self._process_with_threading(
                data, processing_func, optimal_workers
            )
        else:
            results = self._process_sequentially(data, processing_func)

        processing_time = time.time() - start_time

        return {
            'results': results,
            'processing_time': processing_time,
            'data_size': data_size,
            'workers_used': optimal_workers,
            'cpu_count': self.cpu_count,
            'processing_mode': self._get_processing_mode(data_size, use_multiprocessing)
        }

    def _process_with_multiprocessing(self, data: List[Any],
                                      processing_func: Callable,
                                      workers: int) -> List[Any]:
        """Process with multiprocessing.

        Note: multiprocessing.Pool and Queue rely on /dev/shm, which is not
        available in the Lambda execution environment, so this uses
        Process + Pipe. The closure target works because Lambda runs Linux,
        where the fork start method is the default.
        """
        # Split data into one chunk per worker
        chunk_size = max(1, len(data) // workers)
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

        def run_chunk(chunk, conn):
            conn.send([processing_func(item) for item in chunk])
            conn.close()

        processes = []
        parent_conns = []
        for chunk in chunks:
            parent_conn, child_conn = multiprocessing.Pipe()
            process = multiprocessing.Process(target=run_chunk, args=(chunk, child_conn))
            process.start()
            processes.append(process)
            parent_conns.append(parent_conn)

        # Receive before join to avoid deadlocks on large result payloads
        results = []
        for parent_conn, process in zip(parent_conns, processes):
            results.extend(parent_conn.recv())
            process.join()

        return results

    def _process_with_threading(self, data: List[Any],
                              processing_func: Callable,
                              workers: int) -> List[Any]:
        """Process with multi-threading"""
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            # Submit all tasks
            future_to_item = {
                executor.submit(processing_func, item): item
                for item in data
            }

            results = []
            for future in concurrent.futures.as_completed(future_to_item):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logger.error(f"Error processing item: {str(e)}")
                    results.append(None)

            return results

    def _process_sequentially(self, data: List[Any],
                            processing_func: Callable) -> List[Any]:
        """Sequential processing"""
        return [processing_func(item) for item in data]

    def _get_processing_mode(self, data_size: int, use_multiprocessing: bool) -> str:
        """Get processing mode"""
        if use_multiprocessing and data_size > 1000 and self.cpu_count > 1:
            return "multiprocessing"
        elif data_size > 100:
            return "threading"
        else:
            return "sequential"

class AsyncProcessor:
    """Async processor"""

    async def process_async_tasks(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process multiple async tasks"""
        start_time = time.time()

        # Create async tasks
        async_tasks = []
        for task in tasks:
            task_type = task.get('type')

            if task_type == 'http_request':
                async_tasks.append(self._make_http_request(task))
            elif task_type == 'database_query':
                async_tasks.append(self._execute_database_query(task))
            elif task_type == 'file_operation':
                async_tasks.append(self._perform_file_operation(task))

        # Execute all tasks concurrently
        results = await asyncio.gather(*async_tasks, return_exceptions=True)

        processing_time = time.time() - start_time

        # Categorize results
        successful_results = []
        errors = []

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                errors.append({
                    'task_index': i,
                    'error': str(result)
                })
            else:
                successful_results.append(result)

        return {
            'successful_results': successful_results,
            'errors': errors,
            'total_tasks': len(tasks),
            'processing_time': processing_time
        }

    async def _make_http_request(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Async HTTP request"""
        # aiohttp is not in the Lambda runtime; bundle it as a dependency
        import aiohttp

        url = task.get('url')
        timeout = aiohttp.ClientTimeout(total=task.get('timeout', 10))

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                return {
                    'task_type': 'http_request',
                    'url': url,
                    'status_code': response.status,
                    'response_size': len(await response.text())
                }

    async def _execute_database_query(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Async database query (simulated)"""
        await asyncio.sleep(0.1)  # Simulate database latency

        return {
            'task_type': 'database_query',
            'query': task.get('query'),
            'result_count': 42  # Simulated result
        }

    async def _perform_file_operation(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Async file operation (simulated)"""
        await asyncio.sleep(0.05)  # Simulate file I/O latency

        return {
            'task_type': 'file_operation',
            'operation': task.get('operation'),
            'file_path': task.get('file_path'),
            'status': 'completed'
        }

# Example of math-intensive function
def cpu_intensive_calculation(n: int) -> Dict[str, Any]:
    """CPU-intensive calculation"""
    # Calculate Fibonacci sequence
    def fibonacci(num):
        if num <= 1:
            return num
        return fibonacci(num - 1) + fibonacci(num - 2)

    # Prime number check
    def is_prime(num):
        if num < 2:
            return False
        for i in range(2, int(num ** 0.5) + 1):
            if num % i == 0:
                return False
        return True

    start_time = time.time()

    # Perform calculation
    fib_result = fibonacci(min(n, 35))  # Cap the input: naive recursion is exponential in n
    prime_check = is_prime(n)

    # Numerical calculation
    sum_of_squares = sum(i * i for i in range(n))

    processing_time = time.time() - start_time

    return {
        'input': n,
        'fibonacci': fib_result,
        'is_prime': prime_check,
        'sum_of_squares': sum_of_squares,
        'processing_time': processing_time
    }

# Global optimizer instances
cpu_optimizer = CPUOptimizer()
async_processor = AsyncProcessor()

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """CPU-optimized Lambda handler"""

    logger.info(f"Function memory: {context.memory_limit_in_mb}MB")
    logger.info(f"Remaining time: {context.get_remaining_time_in_millis()}ms")

    try:
        action = event.get('action', 'cpu_test')

        if action == 'cpu_intensive_batch':
            # CPU-intensive batch processing
            numbers = event.get('numbers', list(range(1, 101)))
            use_multiprocessing = event.get('use_multiprocessing', True)

            result = cpu_optimizer.optimize_cpu_bound_task(
                numbers,
                cpu_intensive_calculation,
                use_multiprocessing
            )

            return {
                'statusCode': 200,
                'body': json.dumps(result, default=str)
            }

        elif action == 'async_tasks':
            # Async task processing
            tasks = event.get('tasks', [])

            # asyncio.run creates an event loop, runs the coroutine to
            # completion, and closes the loop -- safe in a sync handler
            result = asyncio.run(
                async_processor.process_async_tasks(tasks)
            )

            return {
                'statusCode': 200,
                'body': json.dumps(result, default=str)
            }

        elif action == 'performance_benchmark':
            # Performance benchmark
            return _run_performance_benchmark(event.get('test_size', 1000))

        else:
            return {
                'statusCode': 400,
                'body': json.dumps({'error': f'Unknown action: {action}'})
            }

    except Exception as e:
        logger.error(f"Error in handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def _run_performance_benchmark(test_size: int) -> Dict[str, Any]:
    """Run performance benchmark"""
    results = {}

    # Test data (capped so the benchmark fits in a single invocation)
    sample = list(range(1, min(50, test_size) + 1))

    # Compare with multiprocessing enabled vs. disabled; for small inputs
    # the optimizer may still fall back to threading or sequential mode
    for use_mp in [False, True]:
        mode_name = "multiprocessing_enabled" if use_mp else "multiprocessing_disabled"

        start_time = time.time()
        result = cpu_optimizer.optimize_cpu_bound_task(
            sample,
            lambda x: x * x,  # Simple calculation to measure dispatch overhead
            use_mp
        )
        total_time = time.time() - start_time

        results[mode_name] = {
            'total_time': total_time,
            'items_per_second': len(sample) / total_time if total_time > 0 else 0,
            'processing_mode': result['processing_mode']  # mode actually used
        }

    return {
        'statusCode': 200,
        'body': json.dumps({
            'test_size': test_size,
            'cpu_info': {
                'cpu_count': cpu_optimizer.cpu_count,
                'estimated_vcpus': cpu_optimizer.estimated_vcpus
            },
            'benchmark_results': results
        }, default=str)
    }
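
A sample event for exercising the async path (task values are illustrative):

event = {
    "action": "async_tasks",
    "tasks": [
        {"type": "http_request", "url": "https://example.com", "timeout": 5},
        {"type": "database_query", "query": "SELECT 1"},
        {"type": "file_operation", "operation": "read", "file_path": "/tmp/data.json"}
    ]
}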

9.3 Concurrency Control Strategies

9.3.1 Intelligent Concurrency Management

# stacks/concurrency_optimized_stack.py
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cw_actions,
    aws_sns as sns,
    Duration
)
from constructs import Construct

class ConcurrencyOptimizedStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create SNS topic for alerts
        self.alert_topic = sns.Topic(
            self, "ConcurrencyAlerts",
            topic_name="lambda-concurrency-alerts"
        )

        # High-priority function - reserved concurrency
        self.critical_function = _lambda.Function(
            self, "CriticalFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/critical_handler"),
            memory_size=512,
            timeout=Duration.seconds(30),
            reserved_concurrent_executions=50,  # Reserve 50 concurrency
            environment={
                'PRIORITY_LEVEL': 'CRITICAL',
                'MAX_PROCESSING_TIME': '25'
            }
        )

        # Standard function - limited concurrency
        self.standard_function = _lambda.Function(
            self, "StandardFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/standard_handler"),
            memory_size=256,
            timeout=Duration.seconds(60),
            reserved_concurrent_executions=20,  # Limit to 20 concurrency
            environment={
                'PRIORITY_LEVEL': 'STANDARD',
                'MAX_PROCESSING_TIME': '55'
            }
        )

        # Batch function - no concurrency limit
        self.batch_function = _lambda.Function(
            self, "BatchFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/batch_handler"),
            memory_size=1024,
            timeout=Duration.minutes(15),
            environment={
                'PRIORITY_LEVEL': 'BATCH',
                'MAX_PROCESSING_TIME': '900'
            }
        )

        # Create concurrency monitoring and alarms
        self._create_concurrency_monitoring()

    def _create_concurrency_monitoring(self):
        """Create concurrency monitoring"""

        # Monitor each function. The reserved concurrency limit is carried
        # alongside each function because the Function construct does not
        # expose it as a property.
        for func_name, func, reserved_concurrency in [
            ("Critical", self.critical_function, 50),
            ("Standard", self.standard_function, 20),
            ("Batch", self.batch_function, None)
        ]:

            # Concurrent executions alarm
            concurrent_executions_alarm = cloudwatch.Alarm(
                self, f"{func_name}ConcurrentExecutionsAlarm",
                alarm_name=f"{func_name}-high-concurrency",
                alarm_description=f"High concurrent executions for {func_name} function",
                metric=func.metric(
                    "ConcurrentExecutions",
                    period=Duration.minutes(1),
                    statistic="Maximum"
                ),
                threshold=reserved_concurrency * 0.8 if reserved_concurrency else 100,
                evaluation_periods=2,
                comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
            )

            concurrent_executions_alarm.add_alarm_action(
                cw_actions.SnsAction(self.alert_topic)
            )

            # Error rate alarm
            error_rate_alarm = cloudwatch.Alarm(
                self, f"{func_name}ErrorRateAlarm",
                alarm_name=f"{func_name}-high-error-rate",
                alarm_description=f"High error rate for {func_name} function",
                metric=cloudwatch.MathExpression(
                    expression="errors / invocations * 100",
                    using_metrics={
                        "errors": func.metric_errors(period=Duration.minutes(5)),
                        "invocations": func.metric_invocations(period=Duration.minutes(5))
                    }
                ),
                threshold=5,  # 5% error rate
                evaluation_periods=2,
                comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
            )

            error_rate_alarm.add_alarm_action(
                cw_actions.SnsAction(self.alert_topic)
            )

            # Duration alarm
            duration_alarm = cloudwatch.Alarm(
                self, f"{func_name}DurationAlarm",
                alarm_name=f"{func_name}-long-duration",
                alarm_description=f"Long duration for {func_name} function",
                metric=func.metric_duration(
                    period=Duration.minutes(5),
                    statistic="Average"
                ),
                threshold=func.timeout.to_milliseconds() * 0.8,  # 80% of timeout (Duration metric is in ms)
                evaluation_periods=3,
                comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
            )

            duration_alarm.add_alarm_action(
                cw_actions.SnsAction(self.alert_topic)
            )
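
Reserved concurrency can likewise be adjusted at runtime with boto3 (the function name is a placeholder):

import boto3

lambda_client = boto3.client('lambda')

# Cap (and guarantee) concurrency for the critical function
lambda_client.put_function_concurrency(
    FunctionName='CriticalFunction',    # placeholder name
    ReservedConcurrentExecutions=50
)

# Remove the limit so the function draws from the unreserved account pool
lambda_client.delete_function_concurrency(FunctionName='CriticalFunction')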

9.4 Code Optimization Best Practices

9.4.1 Import Optimization

The single biggest import-time win is keeping heavy dependencies off the cold start path. Import only what the handler actually needs at module scope; defer expensive libraries into the code paths that use them; and cache anything expensive to compute (parsed configuration, compiled regexes, warm clients) in module-level variables so warm invocations reuse it, as the lazy get_*_client() helpers in section 9.1.2 demonstrate. A condensed sketch of the pattern is shown below.
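
A condensed sketch of the import and caching pattern (module and helper names are illustrative):

# Cheap imports at module scope only; heavy ones are deferred below
import os
import json
import functools

@functools.lru_cache(maxsize=1)
def get_config() -> dict:
    """Parse configuration once per execution environment."""
    return json.loads(os.getenv('APP_CONFIG', '{}'))

def summarize_dataframe(csv_path: str) -> dict:
    # Deferred import: pandas is only paid for on the code path that needs it
    import pandas as pd
    df = pd.read_csv(csv_path)
    return {'rows': len(df), 'columns': list(df.columns)}

def handler(event, context):
    config = get_config()  # cached after the first call in this environment
    if event.get('action') == 'summarize':
        return summarize_dataframe(event['csv_path'])
    return {'config_keys': list(config.keys())}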

9.5 Chapter Summary

Key Points
  • Cold start optimization significantly improves performance through connection pooling, lazy imports, and provisioned concurrency
  • Memory and CPU optimization requires choosing appropriate configurations and algorithms based on function characteristics
  • Intelligent concurrency control can automatically adjust function configurations to optimize performance and cost
  • Code optimization includes import optimization, caching strategies, and efficient data processing
  • Monitoring and analysis are the foundation for continuous optimization
  • Performance optimization is an ongoing process that needs to be adjusted based on actual usage

In the next chapter, we will apply all the knowledge from this course to build a production-grade serverless web application through a complete practical project.

Extended Reading