Chapter 1: Introduction to Lambda and Serverless Computing

Haiyue

Chapter Overview

This chapter builds your understanding of AWS Lambda and serverless computing from the ground up. We’ll explore the evolution of serverless architecture, its core concepts, advantages and limitations, and the AWS Lambda service model. By the end of the chapter, you’ll have a comprehensive picture of serverless computing.

Learning Objectives

  1. Understand the evolution and core concepts of serverless architecture
  2. Master the basic principles and service model of AWS Lambda
  3. Learn when to use and when not to use serverless architecture
  4. Understand Lambda’s pricing model and cost optimization strategies
  5. Familiarize yourself with Lambda’s relationship with other AWS services

1.1 What is Serverless Computing?

1.1.1 The Evolution of Cloud Computing

Cloud computing has undergone several major evolutionary stages:

  1. Physical Server Era: Direct hardware management, high costs, low flexibility
  2. Virtual Machine (IaaS) Era: Resource virtualization, improved utilization, but still need OS management
  3. Container (PaaS) Era: Application-level isolation, improved deployment efficiency
  4. Serverless (FaaS/BaaS) Era: Focus on business logic, infrastructure fully managed

1.1.2 Definition of Serverless

Serverless doesn’t mean there are no servers, but rather developers no longer need to manage servers. The cloud provider is responsible for:

  • Server provisioning and management
  • Operating system updates and patching
  • Capacity planning and auto-scaling
  • High availability and fault tolerance
  • Monitoring and logging

Developers only need to focus on writing business code.
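
To make this concrete: “only business code” can be as little as a single handler function. A minimal, hypothetical sketch (the event shape and greeting logic are illustrative):

```python
def lambda_handler(event, context):
    # The platform handles provisioning, scaling, and availability;
    # the developer writes only the business rule below.
    name = event.get('name', 'world')
    return {'greeting': f'hello, {name}'}

print(lambda_handler({'name': 'dev'}, None))  # {'greeting': 'hello, dev'}
```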

1.1.3 Core Characteristics of Serverless

// Serverless core characteristics
interface ServerlessCharacteristics {
  // 1. No server management
  noServerManagement: true;

  // 2. Auto-scaling
  autoScaling: {
    min: 0,  // Can scale to zero
    max: number,  // Theoretically unlimited scaling
    scalingSpeed: "seconds"  // Second-level scaling
  };

  // 3. Pay-per-use
  pricing: {
    model: "pay-as-you-go",
    granularity: "milliseconds",  // Billed by millisecond
    noIdleCost: true  // No cost when not running
  };

  // 4. Event-driven
  trigger: "event-driven";

  // 5. Stateless execution
  stateless: true;

  // 6. High availability built-in
  highAvailability: "managed-by-provider";
}

1.2 Introduction to AWS Lambda

1.2.1 What is AWS Lambda?

AWS Lambda is Amazon Web Services’ serverless computing service that allows you to run code without provisioning or managing servers.

Core Capabilities:

  • Code Execution: Upload your code, Lambda handles all the rest
  • Auto-Scaling: Automatically scales based on the number of requests
  • Flexible Integration: Integrates with over 200 AWS services
  • Multiple Languages: Supports Python, Node.js, Java, Go, .NET, Ruby, etc.
  • Fine-grained Billing: Billed by request count and execution time

1.2.2 Lambda Execution Model

# Lambda execution model example
import json
def lambda_handler(event, context):
    """
    Lambda function main entry point

    Args:
        event: Event data (dict) - Contains information that triggered the function
        context: Context object - Contains runtime information

    Returns:
        Response data (can be dict, string, number, etc.)
    """

    # 1. Parse event data
    request_data = parse_event(event)

    # 2. Execute business logic
    result = process_business_logic(request_data)

    # 3. Return result
    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

def parse_event(event):
    """Parse event data"""
    # Different event sources have different event structure
    if 'body' in event:  # API Gateway event
        return json.loads(event['body'])
    elif 'Records' in event:  # S3 or DynamoDB Stream event
        return event['Records']
    else:
        return event

def process_business_logic(data):
    """Business logic processing"""
    # Your business code
    return {"message": "Processing completed", "data": data}
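
Nothing about this model requires AWS to try out: the handler is a plain function that can be invoked locally with a hand-built event. A condensed sketch using a hypothetical API Gateway-style payload:

```python
import json

def lambda_handler(event, context):
    # Same shape as the handler above, condensed for local testing
    data = json.loads(event['body']) if 'body' in event else event
    return {'statusCode': 200, 'body': json.dumps({'echo': data})}

# Simulated API Gateway proxy event; context is unused here, so None works
fake_event = {'body': json.dumps({'user': 'alice'})}
response = lambda_handler(fake_event, None)
print(response['statusCode'])  # 200
```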

1.2.3 Lambda Lifecycle

(Diagram: the Lambda execution environment lifecycle — an Init phase that downloads code and runs initialization, repeated Invoke phases that run the handler, and a final Shutdown phase.)

1.2.4 Cold Start vs Warm Start

Cold Start occurs when:

  • First invocation of the function
  • Function has been idle for a while (the exact window is undocumented; typically minutes to tens of minutes)
  • Concurrency increases requiring new instances
  • Function code or configuration updated

# Example showing cold start vs warm start
import json
import time
import logging

# Global scope: everything here runs once, during a cold start
logger = logging.getLogger()
logger.setLevel(logging.INFO)

INIT_TIME = time.time()
logger.info(f"Cold Start: Initializing at {INIT_TIME}")

class MockDatabaseConnection:
    """Stand-in for a real database client (illustration only)"""
    def query(self, sql):
        return []

def create_db_connection():
    """Create database connection (executed once, during cold start)"""
    logger.info("Creating database connection...")
    # Actual database connection logic goes here
    return MockDatabaseConnection()

# Example: database connection, created during cold start and
# reused by every subsequent (warm) invocation
db_connection = create_db_connection()

def lambda_handler(event, context):
    """
    This function executes on every invocation (cold and warm starts)
    """
    current_time = time.time()
    time_since_init = current_time - INIT_TIME

    logger.info(f"Handler invoked {time_since_init:.2f} seconds after init")

    # Reuse the global database connection (avoid reconnecting on each call)
    result = db_connection.query("SELECT * FROM users LIMIT 10")

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Query successful',
            'init_time': INIT_TIME,
            'time_since_init': time_since_init,
            'is_likely_warm_start': time_since_init < 60
        })
    }

1.3 Serverless Use Cases and Limitations

1.3.1 Ideal Use Cases

# Use Case Analysis Tool
class ServerlessUseCaseEvaluator:
    """Serverless use case evaluator"""

    @staticmethod
    def evaluate_use_case(characteristics: dict) -> dict:
        """
        Evaluate if a use case is suitable for serverless

        Args:
            characteristics: Use case characteristics
                - request_pattern: Request pattern (event-driven, periodic, continuous)
                - execution_time: Execution time (seconds)
                - memory_requirement: Memory requirement (MB)
                - state_requirement: State requirement (stateless, stateful)
                - scalability_need: Scalability needs (low, medium, high)

        Returns:
            Evaluation result
        """
        score = 0
        reasons = []

        # 1. Request pattern evaluation
        if characteristics.get('request_pattern') == 'event-driven':
            score += 30
            reasons.append("✓ Event-driven pattern perfect for Lambda")
        elif characteristics.get('request_pattern') == 'periodic':
            score += 25
            reasons.append("✓ Periodic tasks can use CloudWatch Events")
        elif characteristics.get('request_pattern') == 'continuous':
            score -= 20
            reasons.append("✗ Continuous processing not suitable for Lambda")

        # 2. Execution time evaluation
        exec_time = characteristics.get('execution_time', 0)
        if exec_time <= 300:  # ≤5 minutes
            score += 25
            reasons.append(f"✓ Execution time {exec_time}s suitable for Lambda")
        elif exec_time <= 900:  # ≤15 minutes
            score += 15
            reasons.append(f"△ Execution time {exec_time}s acceptable but close to limit")
        else:
            score -= 30
            reasons.append(f"✗ Execution time {exec_time}s exceeds Lambda limit (15min)")

        # 3. Memory requirement evaluation
        memory = characteristics.get('memory_requirement', 0)
        if memory <= 10240:  # Lambda max memory (10 GB)
            score += 20
            reasons.append(f"✓ Memory requirement {memory}MB meets Lambda limits")
        else:
            score -= 25
            reasons.append(f"✗ Memory requirement {memory}MB exceeds Lambda limit (10GB)")

        # 4. State requirement evaluation
        if characteristics.get('state_requirement') == 'stateless':
            score += 25
            reasons.append("✓ Stateless application perfect for Lambda")
        else:
            score -= 15
            reasons.append("✗ Stateful applications need additional state management")

        # 5. Scalability needs evaluation
        scalability = characteristics.get('scalability_need')
        if scalability in ['medium', 'high']:
            score += 20
            reasons.append(f"✓ {scalability.title()} scalability needs perfect for Lambda")

        # Calculate recommendation
        if score >= 70:
            recommendation = "Highly Recommended"
        elif score >= 40:
            recommendation = "Recommended"
        elif score >= 0:
            recommendation = "Consider with Caution"
        else:
            recommendation = "Not Recommended"

        return {
            'score': score,
            'recommendation': recommendation,
            'reasons': reasons
        }

# Example usage
use_case_1 = {
    'request_pattern': 'event-driven',
    'execution_time': 30,
    'memory_requirement': 512,
    'state_requirement': 'stateless',
    'scalability_need': 'high'
}

evaluator = ServerlessUseCaseEvaluator()
result = evaluator.evaluate_use_case(use_case_1)

print(f"Score: {result['score']}")
print(f"Recommendation: {result['recommendation']}")
print("\nReasons:")
for reason in result['reasons']:
    print(f"  {reason}")

1.3.2 Typical Application Scenarios

1. Real-time File Processing

def lambda_handler(event, context):
    """
    Triggered when new image uploaded to S3
    Automatically generates thumbnail
    """
    import boto3
    import urllib.parse
    from PIL import Image
    import io

    s3 = boto3.client('s3')

    # Get upload event information (S3 object keys arrive URL-encoded)
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])

    # Download original image
    response = s3.get_object(Bucket=bucket, Key=key)
    image_data = response['Body'].read()

    # Generate thumbnail
    image = Image.open(io.BytesIO(image_data))
    image.thumbnail((200, 200))

    # Save thumbnail
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG')
    buffer.seek(0)

    # Upload to S3
    thumbnail_key = f"thumbnails/{key}"
    s3.put_object(
        Bucket=bucket,
        Key=thumbnail_key,
        Body=buffer,
        ContentType='image/jpeg'
    )

    return {
        'statusCode': 200,
        'body': f'Thumbnail created: {thumbnail_key}'
    }

2. API Backend

def lambda_handler(event, context):
    """
    RESTful API backend
    Handles user CRUD operations
    """
    import json
    import boto3

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Users')

    # Parse HTTP request
    http_method = event['httpMethod']
    path = event['path']
    body = json.loads(event.get('body') or '{}')  # body is None for GET requests

    # Route handling
    if http_method == 'GET' and path == '/users':
        # Get user list
        response = table.scan(Limit=50)
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(response['Items'])
        }

    elif http_method == 'POST' and path == '/users':
        # Create user
        table.put_item(Item=body)
        return {
            'statusCode': 201,
            'body': json.dumps({'message': 'User created successfully'})
        }

    elif http_method == 'GET' and '/users/' in path:
        # Get single user
        user_id = path.split('/')[-1]
        response = table.get_item(Key={'user_id': user_id})

        if 'Item' in response:
            return {
                'statusCode': 200,
                'body': json.dumps(response['Item'])
            }
        else:
            return {
                'statusCode': 404,
                'body': json.dumps({'error': 'User not found'})
            }

    else:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid request'})
        }
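
As the number of endpoints grows, the if/elif chain above becomes hard to maintain; a common refactor is a small routing table keyed by method and path. A minimal sketch (the handler names are illustrative):

```python
import json

def list_users(event):
    return {'statusCode': 200, 'body': json.dumps([])}

def create_user(event):
    return {'statusCode': 201, 'body': json.dumps({'message': 'User created'})}

# (method, path) -> handler
ROUTES = {
    ('GET', '/users'): list_users,
    ('POST', '/users'): create_user,
}

def lambda_handler(event, context):
    handler = ROUTES.get((event['httpMethod'], event['path']))
    if handler is None:
        return {'statusCode': 404, 'body': json.dumps({'error': 'Not found'})}
    return handler(event)

print(lambda_handler({'httpMethod': 'POST', 'path': '/users'}, None)['statusCode'])  # 201
```

Adding an endpoint then means adding one function and one dictionary entry, rather than another branch.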

3. Scheduled Tasks

def lambda_handler(event, context):
    """
    Runs daily at 2 AM
    Cleans up old database records
    """
    import boto3
    from datetime import datetime, timedelta

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Logs')

    # Calculate cutoff time (records older than 30 days)
    cutoff_date = datetime.now() - timedelta(days=30)
    cutoff_timestamp = int(cutoff_date.timestamp())

    # Scan old records (note: scan returns at most 1 MB per call;
    # production code should paginate with LastEvaluatedKey)
    response = table.scan(
        FilterExpression='#ts < :cutoff',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={':cutoff': cutoff_timestamp}
    )

    # Delete old records
    deleted_count = 0
    with table.batch_writer() as batch:
        for item in response['Items']:
            batch.delete_item(Key={'log_id': item['log_id']})
            deleted_count += 1

    return {
        'statusCode': 200,
        'body': f'Cleaned up {deleted_count} old log entries'
    }
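
The “daily at 2 AM” schedule itself lives outside the function, as an EventBridge (CloudWatch Events) rule attached to the Lambda. With the AWS CLI, the schedule expression looks roughly like this (the rule name is illustrative; Lambda cron expressions are evaluated in UTC):

```shell
aws events put-rule \
  --name daily-log-cleanup \
  --schedule-expression "cron(0 2 * * ? *)"
```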

4. Data Stream Processing

def lambda_handler(event, context):
    """
    Process DynamoDB Stream events
    Real-time data synchronization and analytics
    """
    import boto3
    import json

    # Process each record
    for record in event['Records']:
        event_name = record['eventName']  # INSERT, MODIFY, REMOVE

        if event_name == 'INSERT':
            new_image = record['dynamodb']['NewImage']
            process_new_record(new_image)

        elif event_name == 'MODIFY':
            old_image = record['dynamodb']['OldImage']
            new_image = record['dynamodb']['NewImage']
            process_modified_record(old_image, new_image)

        elif event_name == 'REMOVE':
            old_image = record['dynamodb']['OldImage']
            process_deleted_record(old_image)

    return {
        'statusCode': 200,
        'body': f'Processed {len(event["Records"])} records'
    }

def process_new_record(record):
    """Process new records"""
    # For example: send to analytics system
    print(f"New record: {record}")

def process_modified_record(old_record, new_record):
    """Process modified records"""
    # For example: update cache
    print(f"Updated from {old_record} to {new_record}")

def process_deleted_record(record):
    """Process deleted records"""
    # For example: archive data
    print(f"Deleted record: {record}")

1.3.3 Limitations and Unsuitable Scenarios

# Lambda limitations configuration
LAMBDA_LIMITS = {
    'execution': {
        'timeout': {
            'max': 900,  # seconds (15 minutes)
            'default': 3
        },
        'memory': {
            'min': 128,  # MB
            'max': 10240,  # MB (10 GB)
            'increment': 1  # MB
        },
        'ephemeral_storage': {
            'min': 512,  # MB
            'max': 10240,  # MB (10 GB)
            'default': 512
        },
        'concurrent_executions': {
            'default_limit': 1000,  # per region
            'burst_limit': 3000  # initial burst (varies by region, 500-3000)
        }
    },
    'deployment': {
        'package_size': {
            'compressed': 50,  # MB
            'uncompressed': 250  # MB
        },
        'layer_size': {
            'all_layers': 250  # MB (uncompressed)
        },
        'environment_variables': {
            'total_size': 4  # KB
        }
    },
    'invocation': {
        'payload': {
            'synchronous': 6,  # MB
            'asynchronous': 256  # KB
        },
        'response': {
            'synchronous': 6  # MB
        }
    }
}

class UseCaseValidator:
    """Use case validator"""

    @staticmethod
    def validate_requirements(requirements: dict) -> dict:
        """
        Validate if requirements meet Lambda limits

        Args:
            requirements: Use case requirements

        Returns:
            Validation results
        """
        validation_results = {
            'is_valid': True,
            'violations': [],
            'warnings': []
        }

        # Check execution time
        if requirements.get('max_execution_time', 0) > LAMBDA_LIMITS['execution']['timeout']['max']:
            validation_results['is_valid'] = False
            validation_results['violations'].append(
                f"Execution time {requirements['max_execution_time']}s exceeds limit "
                f"({LAMBDA_LIMITS['execution']['timeout']['max']}s)"
            )

        # Check memory requirements
        if requirements.get('memory_mb', 0) > LAMBDA_LIMITS['execution']['memory']['max']:
            validation_results['is_valid'] = False
            validation_results['violations'].append(
                f"Memory requirement {requirements['memory_mb']}MB exceeds limit "
                f"({LAMBDA_LIMITS['execution']['memory']['max']}MB)"
            )

        # Check deployment package size
        if requirements.get('package_size_mb', 0) > LAMBDA_LIMITS['deployment']['package_size']['uncompressed']:
            validation_results['is_valid'] = False
            validation_results['violations'].append(
                f"Package size {requirements['package_size_mb']}MB exceeds limit "
                f"({LAMBDA_LIMITS['deployment']['package_size']['uncompressed']}MB)"
            )

        # Check payload size
        if requirements.get('payload_size_mb', 0) > LAMBDA_LIMITS['invocation']['payload']['synchronous']:
            validation_results['warnings'].append(
                f"Payload size {requirements['payload_size_mb']}MB may require asynchronous invocation"
            )

        return validation_results

Unsuitable Scenarios:

  1. Long-running Tasks (>15 minutes)

    • Video transcoding
    • Large-scale data processing
    • Machine learning model training
  2. Stateful Applications

    • WebSocket long connections
    • Real-time gaming servers
    • Session-dependent applications
  3. Highly Specialized Hardware Requirements

    • GPU-intensive computing
    • Custom hardware acceleration
    • Specific network card requirements
  4. Cost-sensitive Continuous Processing

    • 24/7 running services
    • High-throughput message processing
    • Continuous data stream analysis
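
Each of the scenarios above breaks at least one hard limit, and the first category can be screened mechanically. A standalone sketch of just the timeout and memory checks (the constants are the published limits; the workload numbers are hypothetical):

```python
LAMBDA_MAX_TIMEOUT_S = 900     # 15-minute hard execution limit
LAMBDA_MAX_MEMORY_MB = 10240   # 10 GB memory ceiling

def limit_violations(requirements: dict) -> list:
    """Return the names of the hard limits a proposed workload would break."""
    out = []
    if requirements.get('max_execution_time', 0) > LAMBDA_MAX_TIMEOUT_S:
        out.append('timeout')
    if requirements.get('memory_mb', 0) > LAMBDA_MAX_MEMORY_MB:
        out.append('memory')
    return out

# Hypothetical video-transcoding job: ~30 minutes of work, 4 GB of memory
print(limit_violations({'max_execution_time': 1800, 'memory_mb': 4096}))  # ['timeout']
```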

1.4 Lambda Pricing Model

1.4.1 Pricing Components

class LambdaCostCalculator:
    """Lambda cost calculator"""

    # Pricing (as of 2024, US East region, x86 architecture)
    PRICING = {
        'request': {
            'cost_per_million': 0.20  # $0.20 per 1 million requests
        },
        'duration': {
            'cost_per_gb_second': 0.0000166667  # $0.0000166667 per GB-second
        },
        'free_tier': {
            'requests_per_month': 1_000_000,  # 1 million requests
            'gb_seconds_per_month': 400_000  # 400,000 GB-seconds
        }
    }

    @classmethod
    def calculate_monthly_cost(cls, monthly_invocations: int,
                              avg_duration_ms: int,
                              memory_mb: int) -> dict:
        """
        Calculate monthly Lambda cost

        Args:
            monthly_invocations: Monthly invocation count
            avg_duration_ms: Average execution time (milliseconds)
            memory_mb: Configured memory (MB)

        Returns:
            Cost breakdown
        """
        # Convert to GB-seconds
        gb_seconds = (monthly_invocations * avg_duration_ms / 1000 *
                     memory_mb / 1024)

        # Request cost (after free tier)
        billable_requests = max(0, monthly_invocations -
                               cls.PRICING['free_tier']['requests_per_month'])
        request_cost = (billable_requests / 1_000_000 *
                       cls.PRICING['request']['cost_per_million'])

        # Duration cost (after free tier)
        billable_gb_seconds = max(0, gb_seconds -
                                 cls.PRICING['free_tier']['gb_seconds_per_month'])
        duration_cost = (billable_gb_seconds *
                        cls.PRICING['duration']['cost_per_gb_second'])

        total_cost = request_cost + duration_cost

        return {
            'monthly_invocations': monthly_invocations,
            'avg_duration_ms': avg_duration_ms,
            'memory_mb': memory_mb,
            'gb_seconds': gb_seconds,
            'costs': {
                'request_cost': round(request_cost, 4),
                'duration_cost': round(duration_cost, 4),
                'total': round(total_cost, 4)
            },
            'free_tier_savings': {
                'request_savings': round(min(monthly_invocations,
                    cls.PRICING['free_tier']['requests_per_month']) / 1_000_000 *
                    cls.PRICING['request']['cost_per_million'], 4),
                'duration_savings': round(min(gb_seconds,
                    cls.PRICING['free_tier']['gb_seconds_per_month']) *
                    cls.PRICING['duration']['cost_per_gb_second'], 4)
            }
        }

    @classmethod
    def compare_memory_configs(cls, monthly_invocations: int,
                              avg_duration_ms: int) -> list:
        """
        Compare costs for different memory configurations

        Returns:
            Cost comparison list
        """
        memory_options = [128, 256, 512, 1024, 2048, 3008]
        comparisons = []

        for memory in memory_options:
            cost = cls.calculate_monthly_cost(
                monthly_invocations,
                avg_duration_ms,
                memory
            )
            comparisons.append({
                'memory_mb': memory,
                'total_cost': cost['costs']['total'],
                'cost_per_invocation': cost['costs']['total'] / monthly_invocations
            })

        return sorted(comparisons, key=lambda x: x['total_cost'])

# Example usage
calculator = LambdaCostCalculator()

# Scenario: API backend
# 1 million invocations/month, average 100ms execution, 512MB memory
api_cost = calculator.calculate_monthly_cost(
    monthly_invocations=1_000_000,
    avg_duration_ms=100,
    memory_mb=512
)

print("API Backend Cost Analysis:")
print(f"Monthly invocations: {api_cost['monthly_invocations']:,}")
print(f"Average duration: {api_cost['avg_duration_ms']}ms")
print(f"Memory: {api_cost['memory_mb']}MB")
print(f"Total GB-seconds: {api_cost['gb_seconds']:,.2f}")
print(f"\nCost breakdown:")
print(f"  Request cost: ${api_cost['costs']['request_cost']}")
print(f"  Duration cost: ${api_cost['costs']['duration_cost']}")
print(f"  Total cost: ${api_cost['costs']['total']}")
print(f"\nFree tier savings:")
print(f"  Request savings: ${api_cost['free_tier_savings']['request_savings']}")
print(f"  Duration savings: ${api_cost['free_tier_savings']['duration_savings']}")

# Memory configuration comparison
print("\n\nMemory Configuration Cost Comparison:")
comparisons = calculator.compare_memory_configs(
    monthly_invocations=1_000_000,
    avg_duration_ms=100
)

for comp in comparisons:
    print(f"Memory: {comp['memory_mb']:4d}MB | "
          f"Total: ${comp['total_cost']:7.4f} | "
          f"Per invocation: ${comp['cost_per_invocation']:.8f}")
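
As a hand-check on the calculator: 1,000,000 invocations at 100 ms and 512 MB come to 50,000 GB-seconds, which sits entirely inside the free tier, as does the request count, so the billable total for this scenario is $0:

```python
invocations = 1_000_000
duration_ms = 100
memory_mb = 512

# Same formula the calculator uses
gb_seconds = invocations * duration_ms / 1000 * memory_mb / 1024
assert gb_seconds == 50_000

# Free tier: 1M requests and 400,000 GB-seconds per month
billable_requests = max(0, invocations - 1_000_000)
billable_gb_seconds = max(0, gb_seconds - 400_000)

total = (billable_requests / 1_000_000 * 0.20
         + billable_gb_seconds * 0.0000166667)
print(f"${total:.4f}")  # $0.0000
```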

1.4.2 Cost Optimization Strategies

class CostOptimizationAdvisor:
    """Cost optimization advisor"""

    @staticmethod
    def analyze_cost_optimization(metrics: dict) -> list:
        """
        Analyze and provide cost optimization suggestions

        Args:
            metrics: Function metrics
                - avg_duration_ms: Average execution time
                - avg_memory_used_mb: Average memory used
                - allocated_memory_mb: Allocated memory
                - cold_start_rate: Cold start rate
                - error_rate: Error rate

        Returns:
            Optimization suggestions list
        """
        recommendations = []

        # 1. Memory over-allocation check
        memory_utilization = (metrics['avg_memory_used_mb'] /
                            metrics['allocated_memory_mb'])

        if memory_utilization < 0.6:
            recommended_memory = int(metrics['avg_memory_used_mb'] * 1.2)
            recommended_memory = max(128, min(recommended_memory, 10240))

            recommendations.append({
                'priority': 'HIGH',
                'category': 'Memory Optimization',
                'current': f"{metrics['allocated_memory_mb']}MB",
                'recommended': f"{recommended_memory}MB",
                'reason': f"Memory utilization only {memory_utilization:.1%}, "
                         f"suggesting over-allocation",
                'potential_savings': '20-40%'
            })

        # 2. Execution time optimization check
        if metrics['avg_duration_ms'] > 3000:
            recommendations.append({
                'priority': 'MEDIUM',
                'category': 'Performance Optimization',
                'issue': f"Average execution time {metrics['avg_duration_ms']}ms",
                'suggestions': [
                    "Check for optimization opportunities in code logic",
                    "Consider using parallelization",
                    "Review database query efficiency",
                    "Check if network calls can be optimized"
                ]
            })

        # 3. Cold start rate check
        if metrics.get('cold_start_rate', 0) > 0.1:  # >10%
            recommendations.append({
                'priority': 'MEDIUM',
                'category': 'Cold Start Optimization',
                'current_rate': f"{metrics['cold_start_rate']:.1%}",
                'suggestions': [
                    "Consider using Provisioned Concurrency",
                    "Optimize initialization code",
                    "Use Lambda SnapStart (Java)",
                    "Evaluate if warming strategy needed"
                ]
            })

        # 4. Error rate check
        if metrics.get('error_rate', 0) > 0.01:  # >1%
            recommendations.append({
                'priority': 'HIGH',
                'category': 'Reliability Improvement',
                'current_rate': f"{metrics['error_rate']:.2%}",
                'impact': "Errors result in wasted costs and retries",
                'suggestions': [
                    "Improve error handling",
                    "Add input validation",
                    "Implement circuit breakers",
                    "Check third-party service dependencies"
                ]
            })

        # 5. Batch processing recommendation
        if metrics.get('avg_batch_size', 1) == 1:
            recommendations.append({
                'priority': 'LOW',
                'category': 'Batch Processing',
                'suggestion': "Consider batch processing to reduce invocation count",
                'potential_benefit': "Can reduce request costs by 50-90%"
            })

        return recommendations

# Example: Cost optimization analysis
metrics = {
    'avg_duration_ms': 250,
    'avg_memory_used_mb': 180,
    'allocated_memory_mb': 512,
    'cold_start_rate': 0.15,
    'error_rate': 0.02
}

advisor = CostOptimizationAdvisor()
recommendations = advisor.analyze_cost_optimization(metrics)

print("Cost Optimization Recommendations:\n")
for i, rec in enumerate(recommendations, 1):
    print(f"{i}. [{rec['priority']}] {rec['category']}")
    for key, value in rec.items():
        if key not in ['priority', 'category']:
            if isinstance(value, list):
                print(f"   {key}:")
                for item in value:
                    print(f"     - {item}")
            else:
                print(f"   {key}: {value}")
    print()

1.5 Lambda and AWS Service Integration

1.5.1 Common Integration Services

(Diagram: Lambda at the center of an event-driven architecture, connected to common event sources and targets such as API Gateway, S3, DynamoDB and DynamoDB Streams, SNS, SQS, and EventBridge.)

1.5.2 Integration Example: Event-Driven Architecture

"""
Complete event-driven architecture example
Demonstrates how Lambda integrates with multiple AWS services
"""
import os
import json
import boto3
from datetime import datetime

# 1. API Gateway + Lambda: Handle HTTP requests
def api_handler(event, context):
    """Handle API Gateway requests"""
    import json
    import boto3

    # Parse request
    http_method = event['httpMethod']
    body = json.loads(event.get('body') or '{}')  # body is None when absent

    if http_method == 'POST':
        # Publish message to SNS
        sns = boto3.client('sns')
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC_ARN'],
            Message=json.dumps(body),
            Subject='New Order'
        )

        return {
            'statusCode': 202,
            'body': json.dumps({'message': 'Order received'})
        }

# 2. SNS + Lambda: Async message processing
def sns_processor(event, context):
    """Process SNS messages"""
    import json
    import boto3

    # Parse SNS message
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])

        # Save to DynamoDB
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(os.environ['ORDERS_TABLE'])

        table.put_item(Item={
            'order_id': message['order_id'],
            'customer_id': message['customer_id'],
            'items': message['items'],
            'status': 'pending',
            'created_at': record['Sns']['Timestamp']
        })

        # Send to SQS for further processing
        sqs = boto3.client('sqs')
        sqs.send_message(
            QueueUrl=os.environ['PROCESSING_QUEUE_URL'],
            MessageBody=json.dumps(message)
        )

# 3. SQS + Lambda: Queue message processing
def sqs_processor(event, context):
    """Process SQS queue messages"""
    import json
    import boto3

    for record in event['Records']:
        message = json.loads(record['body'])

        # Execute business logic
        result = process_order(message)

        # Update DynamoDB
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(os.environ['ORDERS_TABLE'])

        table.update_item(
            Key={'order_id': message['order_id']},
            UpdateExpression='SET #status = :status, processed_at = :time',
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={
                ':status': 'processed',
                ':time': datetime.utcnow().isoformat()
            }
        )

        # Trigger completion notification
        eventbridge = boto3.client('events')
        eventbridge.put_events(
            Entries=[{
                'Source': 'order.system',
                'DetailType': 'Order Processed',
                'Detail': json.dumps(result)
            }]
        )

# 4. EventBridge + Lambda: Event routing
def event_router(event, context):
    """Route events based on rules"""
    import json

    detail = event['detail']
    detail_type = event['detail-type']

    if detail_type == 'Order Processed':
        # Send email notification
        send_notification(detail)

        # Update analytics
        update_analytics(detail)

        # Trigger downstream system
        trigger_shipping_system(detail)

# 5. S3 + Lambda: File processing
def s3_processor(event, context):
    """Process S3 uploaded files"""
    import boto3

    s3 = boto3.client('s3')

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Download file
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read()

        # Process file
        processed_content = process_file_content(content)

        # Save results
        output_key = f"processed/{key}"
        s3.put_object(
            Bucket=bucket,
            Key=output_key,
            Body=processed_content
        )

        # Record to DynamoDB
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(os.environ['FILES_TABLE'])

        table.put_item(Item={
            'file_id': key,
            'status': 'processed',
            'output_location': output_key,
            'processed_at': datetime.utcnow().isoformat()
        })

# 6. DynamoDB Streams + Lambda: Data change capture
def dynamodb_stream_processor(event, context):
    """Process DynamoDB stream events"""
    import json

    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            # New record inserted
            new_image = record['dynamodb']['NewImage']
            handle_new_record(new_image)

        elif record['eventName'] == 'MODIFY':
            # Record modified
            old_image = record['dynamodb']['OldImage']
            new_image = record['dynamodb']['NewImage']
            handle_record_change(old_image, new_image)

        elif record['eventName'] == 'REMOVE':
            # Record deleted
            old_image = record['dynamodb']['OldImage']
            handle_record_deletion(old_image)

# Helper functions
def process_order(order_data):
    """Process order business logic"""
    # Actual business logic implementation
    return {'order_id': order_data['order_id'], 'status': 'completed'}

def send_notification(data):
    """Send notification"""
    pass

def update_analytics(data):
    """Update analytics data"""
    pass

def trigger_shipping_system(data):
    """Trigger shipping system"""
    pass

def process_file_content(content):
    """Process file content"""
    return content

def handle_new_record(record):
    """Handle new record"""
    pass

def handle_record_change(old, new):
    """Handle record changes"""
    pass

def handle_record_deletion(record):
    """Handle record deletion"""
    pass

1.6 Chapter Summary

Key Takeaways

In this chapter, we systematically learned about:

Core Concepts:

  • Serverless doesn’t mean no servers, but rather developers don’t manage servers
  • Lambda is a Function-as-a-Service (FaaS) platform
  • Event-driven, auto-scaling, pay-per-use are core characteristics

Lambda Advantages:

  • No server management; focus on business logic
  • Automatic scaling to match demand
  • Pay-per-use pricing with no idle cost
  • Rich service integrations for rapid development

Suitable Scenarios:

  • Event-driven applications
  • Short-duration tasks (<15 minutes)
  • Stateless services
  • Applications requiring elastic scaling

Cost Model:

  • Billing by request count plus execution time (GB-seconds)
  • Generous free tier (1 million requests and 400,000 GB-seconds per month)
  • Multiple optimization strategies available

Service Integration:

  • Integrates with 200+ AWS services
  • Enables complex event-driven architectures
  • Supports fully serverless solutions

In the next chapter, we’ll get hands-on and create our first Lambda function to experience the complete serverless development process.

Further Reading