Chapter 1: Introduction to Lambda and Serverless Computing
This chapter takes you from the basics of serverless computing to a working understanding of AWS Lambda. We’ll explore the evolution of serverless architecture, its core concepts, its advantages and limitations, and the AWS Lambda service model.
Learning Objectives
- Understand the evolution and core concepts of serverless architecture
- Master the basic principles and service model of AWS Lambda
- Learn when to use and when not to use serverless architecture
- Understand Lambda’s pricing model and cost optimization strategies
- Familiarize yourself with Lambda’s relationship with other AWS services
1.1 What is Serverless Computing?
1.1.1 The Evolution of Cloud Computing
Cloud computing has undergone several major evolutionary stages:
- Physical Server Era: Direct hardware management, high costs, low flexibility
- Virtual Machine (IaaS) Era: Resource virtualization, improved utilization, but still need OS management
- Container (PaaS) Era: Application-level isolation, improved deployment efficiency
- Serverless (FaaS/BaaS) Era: Focus on business logic, infrastructure fully managed
1.1.2 Definition of Serverless
Serverless doesn’t mean there are no servers; it means developers no longer need to manage them. The cloud provider is responsible for:
- Server provisioning and management
- Operating system updates and patching
- Capacity planning and auto-scaling
- High availability and fault tolerance
- Monitoring and logging
Developers only need to focus on writing business code.
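To make that concrete, here is a complete, deployable Lambda function in Python — this is the entire artifact the developer ships, and everything in the list above is the provider’s job. (The event shape and handler call below are illustrative; in AWS, the platform invokes the handler for you.)

```python
def lambda_handler(event, context):
    # The entire deployable unit: business logic only.
    name = event.get("name", "world")
    return {"message": f"hello, {name}"}

# Local sanity check — in AWS, Lambda calls the handler on each event:
print(lambda_handler({"name": "serverless"}, None))
```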
1.1.3 Core Characteristics of Serverless
// Serverless core characteristics (a TypeScript interface can't carry
// values like Infinity, so this is expressed as a typed constant)
const serverlessCharacteristics = {
  // 1. No server management
  noServerManagement: true,
  // 2. Auto-scaling
  autoScaling: {
    min: 0,                      // Can scale to zero
    max: Infinity,               // Theoretically unlimited scaling
    scalingSpeed: "seconds"      // Second-level scaling
  },
  // 3. Pay-per-use
  pricing: {
    model: "pay-as-you-go",
    granularity: "milliseconds", // Billed by the millisecond
    noIdleCost: true             // No cost when not running
  },
  // 4. Event-driven
  trigger: "event-driven",
  // 5. Stateless execution
  stateless: true,
  // 6. High availability built-in
  highAvailability: "managed-by-provider"
} as const;
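The pay-per-use characteristic reduces to simple arithmetic: cost is billed requests plus GB-seconds of compute. A quick sketch using the published us-east-1 rates (section 1.4 revisits these numbers; treat them as illustrative):

```python
# One invocation: a 512 MB function running for a billed 120 ms.
memory_gb = 512 / 1024
billed_seconds = 120 / 1000              # duration billed in 1 ms increments
gb_seconds = memory_gb * billed_seconds  # 0.06 GB-seconds

request_cost = 0.20 / 1_000_000          # $0.20 per million requests
duration_cost = gb_seconds * 0.0000166667
print(gb_seconds)                        # 0.06
print(request_cost + duration_cost)      # about a millionth of a dollar
```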
1.2 Introduction to AWS Lambda
1.2.1 What is AWS Lambda?
AWS Lambda is Amazon Web Services’ serverless computing service that allows you to run code without provisioning or managing servers.
Core Capabilities:
- Code Execution: Upload your code, Lambda handles all the rest
- Auto-Scaling: Automatically scales based on the number of requests
- Flexible Integration: Integrates with over 200 AWS services
- Multiple Languages: Supports Python, Node.js, Java, Go, .NET, Ruby, etc.
- Fine-grained Billing: Billed by request count and execution time
1.2.2 Lambda Execution Model
# Lambda execution model example
def lambda_handler(event, context):
"""
Lambda function main entry point
Args:
event: Event data (dict) - Contains information that triggered the function
context: Context object - Contains runtime information
Returns:
Response data (can be dict, string, number, etc.)
"""
# 1. Parse event data
request_data = parse_event(event)
# 2. Execute business logic
result = process_business_logic(request_data)
# 3. Return result
return {
'statusCode': 200,
'body': json.dumps(result)
}
def parse_event(event):
"""Parse event data"""
# Different event sources have different event structure
if 'body' in event: # API Gateway event
return json.loads(event['body'])
elif 'Records' in event: # S3 or DynamoDB Stream event
return event['Records']
else:
return event
def process_business_logic(data):
"""Business logic processing"""
# Your business code
return {"message": "Processing completed", "data": data}
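You can exercise the dispatch logic in parse_event locally by feeding it the three event shapes it distinguishes. (The sample events below are hand-written approximations of the real payloads, not full AWS event documents.)

```python
import json

def parse_event(event):
    # Same dispatch as above: API Gateway bodies, stream Records, raw events.
    if 'body' in event:          # API Gateway proxy event
        return json.loads(event['body'])
    elif 'Records' in event:     # S3 / DynamoDB Stream event
        return event['Records']
    return event                 # direct invocation

print(parse_event({'body': '{"a": 1}'}))        # → {'a': 1}
print(parse_event({'Records': [{'s3': {}}]}))   # → [{'s3': {}}]
print(parse_event({'direct': True}))            # → {'direct': True}
```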
1.2.3 Lambda Lifecycle
An execution environment moves through three phases: Init (Lambda creates the environment, downloads your code, starts the runtime, and runs any initialization code outside the handler), Invoke (the handler runs once per request, reusing the environment where possible), and Shutdown (the runtime is stopped when the environment is retired). This lifecycle is the key to the cold start behavior described next.
1.2.4 Cold Start vs Warm Start
Cold Start occurs when:
- First invocation of the function
- Function not invoked for a while (AWS doesn’t publish the exact idle timeout; environments are typically reclaimed after several minutes to tens of minutes)
- Concurrency increases requiring new instances
- Function code or configuration updated
# Example showing cold start vs warm start
import json
import logging
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class MockDatabaseConnection:
    """Stand-in for a real database client."""
    def query(self, sql):
        return [{"id": 1, "name": "example"}]

def create_db_connection():
    """Create database connection (runs once, during cold start)"""
    logger.info("Creating database connection...")
    # Actual database connection logic would go here
    return MockDatabaseConnection()

# Global initialization: runs once per execution environment (cold start)
INIT_TIME = time.time()
logger.info(f"Cold Start: Initializing at {INIT_TIME}")

# Example: database connection, created once and reused across warm
# invocations of the same environment
db_connection = create_db_connection()

def lambda_handler(event, context):
    """
    This function executes on every invocation (cold and warm starts)
    """
    current_time = time.time()
    time_since_init = current_time - INIT_TIME
    logger.info(f"Handler invoked {time_since_init:.2f} seconds after init")
    # Reuse the global database connection (avoid reconnecting each time)
    result = db_connection.query("SELECT * FROM users LIMIT 10")
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Query successful',
            'result_count': len(result),
            'init_time': INIT_TIME,
            'time_since_init': time_since_init,
            # On the cold-start invocation the handler runs almost
            # immediately after init; a noticeable gap implies reuse
            'is_likely_warm_start': time_since_init > 1
        })
    }
1.3 Serverless Use Cases and Limitations
1.3.1 Ideal Use Cases
# Use Case Analysis Tool
class ServerlessUseCaseEvaluator:
"""Serverless use case evaluator"""
@staticmethod
def evaluate_use_case(characteristics: dict) -> dict:
"""
Evaluate if a use case is suitable for serverless
Args:
characteristics: Use case characteristics
- request_pattern: Request pattern (event-driven, periodic, continuous)
- execution_time: Execution time (seconds)
- memory_requirement: Memory requirement (MB)
- state_requirement: State requirement (stateless, stateful)
- scalability_need: Scalability needs (low, medium, high)
Returns:
Evaluation result
"""
score = 0
reasons = []
# 1. Request pattern evaluation
if characteristics.get('request_pattern') == 'event-driven':
score += 30
reasons.append("✓ Event-driven pattern perfect for Lambda")
elif characteristics.get('request_pattern') == 'periodic':
score += 25
reasons.append("✓ Periodic tasks can use CloudWatch Events")
elif characteristics.get('request_pattern') == 'continuous':
score -= 20
reasons.append("✗ Continuous processing not suitable for Lambda")
# 2. Execution time evaluation
exec_time = characteristics.get('execution_time', 0)
if exec_time <= 300: # ≤5 minutes
score += 25
reasons.append(f"✓ Execution time {exec_time}s suitable for Lambda")
elif exec_time <= 900: # ≤15 minutes
score += 15
reasons.append(f"△ Execution time {exec_time}s acceptable but close to limit")
else:
score -= 30
reasons.append(f"✗ Execution time {exec_time}s exceeds Lambda limit (15min)")
        # 3. Memory requirement evaluation
        memory = characteristics.get('memory_requirement', 0)
        if memory <= 10240:  # Lambda max memory (10 GB)
            score += 20
            reasons.append(f"✓ Memory requirement {memory}MB meets Lambda limits")
        else:
            score -= 25
            reasons.append(f"✗ Memory requirement {memory}MB exceeds Lambda limit (10GB)")
# 4. State requirement evaluation
if characteristics.get('state_requirement') == 'stateless':
score += 25
reasons.append("✓ Stateless application perfect for Lambda")
else:
score -= 15
reasons.append("✗ Stateful applications need additional state management")
# 5. Scalability needs evaluation
scalability = characteristics.get('scalability_need')
if scalability in ['medium', 'high']:
score += 20
reasons.append(f"✓ {scalability.title()} scalability needs perfect for Lambda")
# Calculate recommendation
if score >= 70:
recommendation = "Highly Recommended"
elif score >= 40:
recommendation = "Recommended"
elif score >= 0:
recommendation = "Consider with Caution"
else:
recommendation = "Not Recommended"
return {
'score': score,
'recommendation': recommendation,
'reasons': reasons
}
# Example usage
use_case_1 = {
'request_pattern': 'event-driven',
'execution_time': 30,
'memory_requirement': 512,
'state_requirement': 'stateless',
'scalability_need': 'high'
}
evaluator = ServerlessUseCaseEvaluator()
result = evaluator.evaluate_use_case(use_case_1)
print(f"Score: {result['score']}")
print(f"Recommendation: {result['recommendation']}")
print("\nReasons:")
for reason in result['reasons']:
print(f" {reason}")
1.3.2 Typical Application Scenarios
1. Real-time File Processing
def lambda_handler(event, context):
"""
Triggered when new image uploaded to S3
Automatically generates thumbnail
"""
    import io
    from urllib.parse import unquote_plus

    import boto3
    from PIL import Image

    s3 = boto3.client('s3')
    # Get upload event information (S3 delivers object keys URL-encoded)
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = unquote_plus(event['Records'][0]['s3']['object']['key'])
# Download original image
response = s3.get_object(Bucket=bucket, Key=key)
image_data = response['Body'].read()
# Generate thumbnail
image = Image.open(io.BytesIO(image_data))
image.thumbnail((200, 200))
# Save thumbnail
buffer = io.BytesIO()
image.save(buffer, format='JPEG')
buffer.seek(0)
    # Upload to S3. Note: configure the S3 trigger with a prefix filter so
    # writes under thumbnails/ don't re-invoke this function recursively.
    thumbnail_key = f"thumbnails/{key}"
s3.put_object(
Bucket=bucket,
Key=thumbnail_key,
Body=buffer,
ContentType='image/jpeg'
)
return {
'statusCode': 200,
'body': f'Thumbnail created: {thumbnail_key}'
}
2. API Backend
def lambda_handler(event, context):
"""
RESTful API backend
Handles user CRUD operations
"""
import json
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
# Parse HTTP request
http_method = event['httpMethod']
path = event['path']
    body = json.loads(event.get('body') or '{}')  # body is None on GET requests
# Route handling
if http_method == 'GET' and path == '/users':
# Get user list
response = table.scan(Limit=50)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(response['Items'], default=str)  # default=str handles DynamoDB Decimal values
}
elif http_method == 'POST' and path == '/users':
# Create user
table.put_item(Item=body)
return {
'statusCode': 201,
'body': json.dumps({'message': 'User created successfully'})
}
elif http_method == 'GET' and '/users/' in path:
# Get single user
user_id = path.split('/')[-1]
response = table.get_item(Key={'user_id': user_id})
if 'Item' in response:
return {
'statusCode': 200,
                'body': json.dumps(response['Item'], default=str)
}
else:
return {
'statusCode': 404,
'body': json.dumps({'error': 'User not found'})
}
else:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid request'})
}
3. Scheduled Tasks
def lambda_handler(event, context):
"""
Runs daily at 2 AM
Cleans up old database records
"""
import boto3
from datetime import datetime, timedelta
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Logs')
# Calculate cutoff time (records older than 30 days)
cutoff_date = datetime.now() - timedelta(days=30)
cutoff_timestamp = int(cutoff_date.timestamp())
    # Scan old records, following pagination (a single scan returns at
    # most 1 MB of data, so one call may miss most of the table)
    deleted_count = 0
    scan_kwargs = {
        'FilterExpression': '#ts < :cutoff',
        'ExpressionAttributeNames': {'#ts': 'timestamp'},
        'ExpressionAttributeValues': {':cutoff': cutoff_timestamp}
    }
    with table.batch_writer() as batch:
        while True:
            response = table.scan(**scan_kwargs)
            # Delete old records in batches
            for item in response['Items']:
                batch.delete_item(Key={'log_id': item['log_id']})
                deleted_count += 1
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
return {
'statusCode': 200,
'body': f'Cleaned up {deleted_count} old log entries'
}
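The “runs daily at 2 AM” part lives outside the function: an EventBridge rule with a cron schedule expression invokes it. EventBridge cron expressions have six fields (minutes, hours, day-of-month, month, day-of-week, year); a sketch of the expression for 02:00 UTC every day:

```python
# EventBridge schedule expression for "every day at 02:00 UTC"
SCHEDULE_EXPRESSION = "cron(0 2 * * ? *)"

# Sanity-check the shape: six space-separated fields inside cron(...)
fields = SCHEDULE_EXPRESSION[5:-1].split()
print(len(fields))  # 6
```

The `?` in the day-of-week field is required because EventBridge does not allow `*` in both day-of-month and day-of-week.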
4. Data Stream Processing
def lambda_handler(event, context):
"""
Process DynamoDB Stream events
Real-time data synchronization and analytics
"""
import boto3
import json
# Process each record
for record in event['Records']:
event_name = record['eventName'] # INSERT, MODIFY, REMOVE
if event_name == 'INSERT':
new_image = record['dynamodb']['NewImage']
process_new_record(new_image)
elif event_name == 'MODIFY':
old_image = record['dynamodb']['OldImage']
new_image = record['dynamodb']['NewImage']
process_modified_record(old_image, new_image)
elif event_name == 'REMOVE':
old_image = record['dynamodb']['OldImage']
process_deleted_record(old_image)
return {
'statusCode': 200,
'body': f'Processed {len(event["Records"])} records'
}
def process_new_record(record):
"""Process new records"""
# For example: send to analytics system
print(f"New record: {record}")
def process_modified_record(old_record, new_record):
"""Process modified records"""
# For example: update cache
print(f"Updated from {old_record} to {new_record}")
def process_deleted_record(record):
"""Process deleted records"""
# For example: archive data
print(f"Deleted record: {record}")
1.3.3 Limitations and Unsuitable Scenarios
# Lambda limitations configuration
LAMBDA_LIMITS = {
'execution': {
'timeout': {
'max': 900, # seconds (15 minutes)
'default': 3
},
'memory': {
'min': 128, # MB
'max': 10240, # MB (10 GB)
'increment': 1 # MB
},
'ephemeral_storage': {
'min': 512, # MB
'max': 10240, # MB (10 GB)
'default': 512
},
'concurrent_executions': {
'default_limit': 1000, # per region
'burst_limit': 3000 # initial burst
}
},
'deployment': {
'package_size': {
'compressed': 50, # MB
'uncompressed': 250 # MB
},
'layer_size': {
'all_layers': 250 # MB (uncompressed)
},
'environment_variables': {
'total_size': 4 # KB
}
},
'invocation': {
'payload': {
'synchronous': 6, # MB
'asynchronous': 256 # KB
},
'response': {
'synchronous': 6 # MB
}
}
}
class UseCaseValidator:
"""Use case validator"""
@staticmethod
def validate_requirements(requirements: dict) -> dict:
"""
Validate if requirements meet Lambda limits
Args:
requirements: Use case requirements
Returns:
Validation results
"""
validation_results = {
'is_valid': True,
'violations': [],
'warnings': []
}
# Check execution time
if requirements.get('max_execution_time', 0) > LAMBDA_LIMITS['execution']['timeout']['max']:
validation_results['is_valid'] = False
validation_results['violations'].append(
f"Execution time {requirements['max_execution_time']}s exceeds limit "
f"({LAMBDA_LIMITS['execution']['timeout']['max']}s)"
)
# Check memory requirements
if requirements.get('memory_mb', 0) > LAMBDA_LIMITS['execution']['memory']['max']:
validation_results['is_valid'] = False
validation_results['violations'].append(
f"Memory requirement {requirements['memory_mb']}MB exceeds limit "
f"({LAMBDA_LIMITS['execution']['memory']['max']}MB)"
)
# Check deployment package size
if requirements.get('package_size_mb', 0) > LAMBDA_LIMITS['deployment']['package_size']['uncompressed']:
validation_results['is_valid'] = False
validation_results['violations'].append(
f"Package size {requirements['package_size_mb']}MB exceeds limit "
f"({LAMBDA_LIMITS['deployment']['package_size']['uncompressed']}MB)"
)
# Check payload size
if requirements.get('payload_size_mb', 0) > LAMBDA_LIMITS['invocation']['payload']['synchronous']:
validation_results['warnings'].append(
f"Payload size {requirements['payload_size_mb']}MB may require asynchronous invocation"
)
return validation_results
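The same bounds check can be condensed into a few lines. This standalone sketch mirrors the limit values from the LAMBDA_LIMITS table; the requirement dict is hypothetical:

```python
# Limits mirror the LAMBDA_LIMITS table above (seconds / MB / MB).
limits = {'timeout_s': 900, 'memory_mb': 10240, 'package_mb': 250}
# Hypothetical workload: a 30-minute batch job.
requirements = {'max_execution_time': 1800, 'memory_mb': 2048, 'package_size_mb': 100}

checks = {
    'timeout': (requirements['max_execution_time'], limits['timeout_s']),
    'memory': (requirements['memory_mb'], limits['memory_mb']),
    'package': (requirements['package_size_mb'], limits['package_mb']),
}
violations = [name for name, (value, cap) in checks.items() if value > cap]
print(violations)  # the 30-minute job breaks the 15-minute timeout
```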
Unsuitable Scenarios:
- Long-running Tasks (>15 minutes)
  - Video transcoding
  - Large-scale data processing
  - Machine learning model training
- Stateful Applications
  - WebSocket long connections
  - Real-time gaming servers
  - Session-dependent applications
- Highly Specialized Hardware Requirements
  - GPU-intensive computing
  - Custom hardware acceleration
  - Specific network card requirements
- Cost-sensitive Continuous Processing
  - 24/7 running services
  - High-throughput message processing
  - Continuous data stream analysis
1.4 Lambda Pricing Model
1.4.1 Pricing Components
class LambdaCostCalculator:
"""Lambda cost calculator"""
# Pricing (as of 2024, US East region)
PRICING = {
'request': {
'cost_per_million': 0.20 # $0.20 per 1 million requests
},
'duration': {
'cost_per_gb_second': 0.0000166667 # $0.0000166667 per GB-second
},
'free_tier': {
'requests_per_month': 1_000_000, # 1 million requests
'gb_seconds_per_month': 400_000 # 400,000 GB-seconds
}
}
@classmethod
def calculate_monthly_cost(cls, monthly_invocations: int,
avg_duration_ms: int,
memory_mb: int) -> dict:
"""
Calculate monthly Lambda cost
Args:
monthly_invocations: Monthly invocation count
avg_duration_ms: Average execution time (milliseconds)
memory_mb: Configured memory (MB)
Returns:
Cost breakdown
"""
# Convert to GB-seconds
gb_seconds = (monthly_invocations * avg_duration_ms / 1000 *
memory_mb / 1024)
# Request cost (after free tier)
billable_requests = max(0, monthly_invocations -
cls.PRICING['free_tier']['requests_per_month'])
request_cost = (billable_requests / 1_000_000 *
cls.PRICING['request']['cost_per_million'])
# Duration cost (after free tier)
billable_gb_seconds = max(0, gb_seconds -
cls.PRICING['free_tier']['gb_seconds_per_month'])
duration_cost = (billable_gb_seconds *
cls.PRICING['duration']['cost_per_gb_second'])
total_cost = request_cost + duration_cost
return {
'monthly_invocations': monthly_invocations,
'avg_duration_ms': avg_duration_ms,
'memory_mb': memory_mb,
'gb_seconds': gb_seconds,
'costs': {
'request_cost': round(request_cost, 4),
'duration_cost': round(duration_cost, 4),
'total': round(total_cost, 4)
},
'free_tier_savings': {
'request_savings': round(min(monthly_invocations,
cls.PRICING['free_tier']['requests_per_month']) / 1_000_000 *
cls.PRICING['request']['cost_per_million'], 4),
'duration_savings': round(min(gb_seconds,
cls.PRICING['free_tier']['gb_seconds_per_month']) *
cls.PRICING['duration']['cost_per_gb_second'], 4)
}
}
@classmethod
def compare_memory_configs(cls, monthly_invocations: int,
avg_duration_ms: int) -> list:
"""
Compare costs for different memory configurations
Returns:
Cost comparison list
"""
        memory_options = [128, 256, 512, 1024, 2048, 4096, 10240]  # up to the 10 GB maximum
comparisons = []
for memory in memory_options:
cost = cls.calculate_monthly_cost(
monthly_invocations,
avg_duration_ms,
memory
)
comparisons.append({
'memory_mb': memory,
'total_cost': cost['costs']['total'],
'cost_per_invocation': cost['costs']['total'] / monthly_invocations
})
return sorted(comparisons, key=lambda x: x['total_cost'])
# Example usage
calculator = LambdaCostCalculator()
# Scenario: API backend
# 1 million invocations/month, average 100ms execution, 512MB memory
api_cost = calculator.calculate_monthly_cost(
monthly_invocations=1_000_000,
avg_duration_ms=100,
memory_mb=512
)
print("API Backend Cost Analysis:")
print(f"Monthly invocations: {api_cost['monthly_invocations']:,}")
print(f"Average duration: {api_cost['avg_duration_ms']}ms")
print(f"Memory: {api_cost['memory_mb']}MB")
print(f"Total GB-seconds: {api_cost['gb_seconds']:,.2f}")
print(f"\nCost breakdown:")
print(f" Request cost: ${api_cost['costs']['request_cost']}")
print(f" Duration cost: ${api_cost['costs']['duration_cost']}")
print(f" Total cost: ${api_cost['costs']['total']}")
print(f"\nFree tier savings:")
print(f" Request savings: ${api_cost['free_tier_savings']['request_savings']}")
print(f" Duration savings: ${api_cost['free_tier_savings']['duration_savings']}")
# Memory configuration comparison
print("\n\nMemory Configuration Cost Comparison:")
comparisons = calculator.compare_memory_configs(
monthly_invocations=1_000_000,
avg_duration_ms=100
)
for comp in comparisons:
print(f"Memory: {comp['memory_mb']:4d}MB | "
f"Total: ${comp['total_cost']:7.4f} | "
f"Per invocation: ${comp['cost_per_invocation']:.8f}")
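It’s worth hand-checking the headline scenario: 1 million invocations at 100 ms and 512 MB.

```python
invocations = 1_000_000
duration_s = 100 / 1000
memory_gb = 512 / 1024
gb_seconds = invocations * duration_s * memory_gb
print(gb_seconds)  # 50000.0 GB-seconds
# That is well under the 400,000 GB-second monthly free tier, and the
# request count sits exactly at the 1M free-tier boundary, so this
# workload costs $0/month while it stays inside the free tier.
```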
1.4.2 Cost Optimization Strategies
class CostOptimizationAdvisor:
"""Cost optimization advisor"""
@staticmethod
def analyze_cost_optimization(metrics: dict) -> list:
"""
Analyze and provide cost optimization suggestions
Args:
metrics: Function metrics
- avg_duration_ms: Average execution time
- avg_memory_used_mb: Average memory used
- allocated_memory_mb: Allocated memory
- cold_start_rate: Cold start rate
- error_rate: Error rate
Returns:
Optimization suggestions list
"""
recommendations = []
# 1. Memory over-allocation check
memory_utilization = (metrics['avg_memory_used_mb'] /
metrics['allocated_memory_mb'])
if memory_utilization < 0.6:
recommended_memory = int(metrics['avg_memory_used_mb'] * 1.2)
recommended_memory = max(128, min(recommended_memory, 10240))
recommendations.append({
'priority': 'HIGH',
'category': 'Memory Optimization',
'current': f"{metrics['allocated_memory_mb']}MB",
'recommended': f"{recommended_memory}MB",
'reason': f"Memory utilization only {memory_utilization:.1%}, "
f"suggesting over-allocation",
'potential_savings': '20-40%'
})
# 2. Execution time optimization check
if metrics['avg_duration_ms'] > 3000:
recommendations.append({
'priority': 'MEDIUM',
'category': 'Performance Optimization',
'issue': f"Average execution time {metrics['avg_duration_ms']}ms",
'suggestions': [
"Check for optimization opportunities in code logic",
"Consider using parallelization",
"Review database query efficiency",
"Check if network calls can be optimized"
]
})
# 3. Cold start rate check
if metrics.get('cold_start_rate', 0) > 0.1: # >10%
recommendations.append({
'priority': 'MEDIUM',
'category': 'Cold Start Optimization',
'current_rate': f"{metrics['cold_start_rate']:.1%}",
'suggestions': [
"Consider using Provisioned Concurrency",
"Optimize initialization code",
"Use Lambda SnapStart (Java)",
"Evaluate if warming strategy needed"
]
})
# 4. Error rate check
if metrics.get('error_rate', 0) > 0.01: # >1%
recommendations.append({
'priority': 'HIGH',
'category': 'Reliability Improvement',
'current_rate': f"{metrics['error_rate']:.2%}",
'impact': "Errors result in wasted costs and retries",
'suggestions': [
"Improve error handling",
"Add input validation",
"Implement circuit breakers",
"Check third-party service dependencies"
]
})
# 5. Batch processing recommendation
if metrics.get('avg_batch_size', 1) == 1:
recommendations.append({
'priority': 'LOW',
'category': 'Batch Processing',
'suggestion': "Consider batch processing to reduce invocation count",
'potential_benefit': "Can reduce request costs by 50-90%"
})
return recommendations
# Example: Cost optimization analysis
metrics = {
'avg_duration_ms': 250,
'avg_memory_used_mb': 180,
'allocated_memory_mb': 512,
'cold_start_rate': 0.15,
'error_rate': 0.02
}
advisor = CostOptimizationAdvisor()
recommendations = advisor.analyze_cost_optimization(metrics)
print("Cost Optimization Recommendations:\n")
for i, rec in enumerate(recommendations, 1):
print(f"{i}. [{rec['priority']}] {rec['category']}")
for key, value in rec.items():
if key not in ['priority', 'category']:
if isinstance(value, list):
print(f" {key}:")
for item in value:
print(f" - {item}")
else:
print(f" {key}: {value}")
print()
1.5 Lambda and AWS Service Integration
1.5.1 Common Integration Services
Lambda functions are most often triggered by, or call out to, the following services:
- API Gateway: HTTP/REST entry points
- S3: object-created and object-removed events
- DynamoDB Streams / Kinesis: ordered data-stream processing
- SQS / SNS: queue and pub/sub messaging
- EventBridge: scheduled jobs and event routing
- Step Functions: workflow orchestration
1.5.2 Integration Example: Event-Driven Architecture
"""
Complete event-driven architecture example
Demonstrates how Lambda integrates with multiple AWS services
"""
import json
import os
from datetime import datetime

import boto3

# 1. API Gateway + Lambda: Handle HTTP requests
def api_handler(event, context):
"""Handle API Gateway requests"""
import json
import boto3
# Parse request
http_method = event['httpMethod']
    body = json.loads(event.get('body') or '{}')
if http_method == 'POST':
# Publish message to SNS
sns = boto3.client('sns')
sns.publish(
TopicArn=os.environ['SNS_TOPIC_ARN'],
Message=json.dumps(body),
Subject='New Order'
)
return {
'statusCode': 202,
'body': json.dumps({'message': 'Order received'})
}
# 2. SNS + Lambda: Async message processing
def sns_processor(event, context):
"""Process SNS messages"""
import json
import boto3
# Parse SNS message
for record in event['Records']:
message = json.loads(record['Sns']['Message'])
# Save to DynamoDB
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['ORDERS_TABLE'])
table.put_item(Item={
'order_id': message['order_id'],
'customer_id': message['customer_id'],
'items': message['items'],
'status': 'pending',
'created_at': record['Sns']['Timestamp']
})
# Send to SQS for further processing
sqs = boto3.client('sqs')
sqs.send_message(
QueueUrl=os.environ['PROCESSING_QUEUE_URL'],
MessageBody=json.dumps(message)
)
# 3. SQS + Lambda: Queue message processing
def sqs_processor(event, context):
"""Process SQS queue messages"""
import json
import boto3
for record in event['Records']:
message = json.loads(record['body'])
# Execute business logic
result = process_order(message)
# Update DynamoDB
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['ORDERS_TABLE'])
table.update_item(
Key={'order_id': message['order_id']},
UpdateExpression='SET #status = :status, processed_at = :time',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': 'processed',
':time': datetime.utcnow().isoformat()
}
)
# Trigger completion notification
eventbridge = boto3.client('events')
eventbridge.put_events(
Entries=[{
'Source': 'order.system',
'DetailType': 'Order Processed',
'Detail': json.dumps(result)
}]
)
# 4. EventBridge + Lambda: Event routing
def event_router(event, context):
"""Route events based on rules"""
import json
detail = event['detail']
detail_type = event['detail-type']
if detail_type == 'Order Processed':
# Send email notification
send_notification(detail)
# Update analytics
update_analytics(detail)
# Trigger downstream system
trigger_shipping_system(detail)
# 5. S3 + Lambda: File processing
def s3_processor(event, context):
"""Process S3 uploaded files"""
import boto3
s3 = boto3.client('s3')
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
# Download file
response = s3.get_object(Bucket=bucket, Key=key)
content = response['Body'].read()
# Process file
processed_content = process_file_content(content)
# Save results
output_key = f"processed/{key}"
s3.put_object(
Bucket=bucket,
Key=output_key,
Body=processed_content
)
# Record to DynamoDB
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['FILES_TABLE'])
table.put_item(Item={
'file_id': key,
'status': 'processed',
'output_location': output_key,
'processed_at': datetime.utcnow().isoformat()
})
# 6. DynamoDB Streams + Lambda: Data change capture
def dynamodb_stream_processor(event, context):
"""Process DynamoDB stream events"""
import json
for record in event['Records']:
if record['eventName'] == 'INSERT':
# New record inserted
new_image = record['dynamodb']['NewImage']
handle_new_record(new_image)
elif record['eventName'] == 'MODIFY':
# Record modified
old_image = record['dynamodb']['OldImage']
new_image = record['dynamodb']['NewImage']
handle_record_change(old_image, new_image)
elif record['eventName'] == 'REMOVE':
# Record deleted
old_image = record['dynamodb']['OldImage']
handle_record_deletion(old_image)
# Helper functions
def process_order(order_data):
"""Process order business logic"""
# Actual business logic implementation
return {'order_id': order_data['order_id'], 'status': 'completed'}
def send_notification(data):
"""Send notification"""
pass
def update_analytics(data):
"""Update analytics data"""
pass
def trigger_shipping_system(data):
"""Trigger shipping system"""
pass
def process_file_content(content):
"""Process file content"""
return content
def handle_new_record(record):
"""Handle new record"""
pass
def handle_record_change(old, new):
"""Handle record changes"""
pass
def handle_record_deletion(record):
"""Handle record deletion"""
pass
1.6 Chapter Summary
In this chapter, we covered:
Core Concepts:
- Serverless doesn’t mean no servers, but rather developers don’t manage servers
- Lambda is a Function-as-a-Service (FaaS) platform
- Event-driven, auto-scaling, pay-per-use are core characteristics
Lambda Advantages:
- No server management, focus on business logic
- Auto-scaling from zero to thousands of concurrent executions
- Pay-per-use, save idle costs
- Rich service integration, rapid development
Suitable Scenarios:
- Event-driven applications
- Short-duration tasks (<15 minutes)
- Stateless services
- Applications requiring elastic scaling
Cost Model:
- Request count + execution time billing
- Generous free tier (1M requests and 400,000 GB-seconds per month)
- Multiple optimization strategies available
Service Integration:
- Integrates with 200+ AWS services
- Build complex event-driven architectures
- Realize fully serverless solutions
In the next chapter, we’ll get hands-on and create our first Lambda function to experience the complete serverless development process.