Chapter 12: Lambda Security and Compliance

Haiyue

Chapter Overview

This chapter explores AWS Lambda’s security model, permission control, data encryption, security auditing, and related topics in depth. We will learn how to build secure serverless applications, follow security best practices, and meet compliance requirements.

Learning Objectives

  1. Understand Lambda’s security model and shared responsibility model
  2. Master fine-grained IAM permission control
  3. Learn to implement data encryption and key management
  4. Understand network security and VPC configuration
  5. Master security auditing and compliance checks
  6. Learn security incident response and threat detection

12.1 Lambda Security Model

12.1.1 Shared Responsibility Model

[Mermaid diagram: Shared Responsibility Model]

12.1.2 Security Checklist
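
The checker in this section gives each check a starting score of 10, deducts points per finding, and normalizes the total to a 0-100 scale. As a rough sketch of that arithmetic only, the snippet below uses a hypothetical helper `overall_score` and made-up per-check results; neither is part of the original listing.

```python
from typing import List

MAX_PER_CHECK = 10  # each check starts at 10 and loses points per finding


def overall_score(check_scores: List[int]) -> int:
    """Normalize per-check scores (0-10 each) to a 0-100 integer."""
    if not check_scores:
        return 0
    # Clamp defensively so a single check can never count below 0 or above 10
    clamped = [max(0, min(MAX_PER_CHECK, s)) for s in check_scores]
    return int(sum(clamped) / (len(clamped) * MAX_PER_CHECK) * 100)


# Hypothetical results for eight checks (IAM, env vars, VPC, ...)
scores = [5, 8, 9, 7, 10, 10, 6, 10]
print(overall_score(scores))  # 65 of 80 points -> 81
```

Because every check carries the same weight, one HIGH finding in the IAM check moves the overall score no more than a cosmetic logging issue does; weighting checks differently would be a reasonable refinement.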

# security/security_checker.py
import json
import boto3
import re
from typing import Dict, Any, List
from datetime import datetime, timedelta

class LambdaSecurityChecker:
    """Lambda Security Checker"""

    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.iam_client = boto3.client('iam')
        self.kms_client = boto3.client('kms', region_name=region)
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)

    def comprehensive_security_audit(self, function_name: str) -> Dict[str, Any]:
        """Comprehensive security audit"""
        audit_report = {
            'function_name': function_name,
            'audit_timestamp': datetime.utcnow().isoformat(),
            'security_score': 0,
            'findings': [],
            'recommendations': [],
            'checks_performed': []
        }

        try:
            # Get function configuration
            function_config = self._get_function_config(function_name)

            # Perform security checks
            checks = [
                self._check_iam_permissions,
                self._check_environment_variables,
                self._check_vpc_configuration,
                self._check_encryption_settings,
                self._check_function_configuration,
                self._check_code_security,
                self._check_logging_configuration,
                self._check_network_access
            ]

            total_score = 0
            max_score = len(checks) * 10

            for check in checks:
                check_result = check(function_name, function_config)
                audit_report['findings'].extend(check_result['findings'])
                audit_report['recommendations'].extend(check_result['recommendations'])
                audit_report['checks_performed'].append(check_result['check_name'])
                total_score += check_result['score']

            audit_report['security_score'] = int((total_score / max_score) * 100)

        except Exception as e:
            audit_report['findings'].append({
                'severity': 'HIGH',
                'category': 'AUDIT_ERROR',
                'message': f"Failed to perform security audit: {str(e)}"
            })

        return audit_report

    def _get_function_config(self, function_name: str) -> Dict[str, Any]:
        """Get function configuration"""
        response = self.lambda_client.get_function(FunctionName=function_name)
        return response['Configuration']

    def _check_iam_permissions(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check IAM permission configuration"""
        findings = []
        recommendations = []
        score = 10

        try:
            role_arn = config.get('Role')
            role_name = role_arn.split('/')[-1] if role_arn else None

            if role_name:
                # Get role policies
                policies = self._get_role_policies(role_name)

                # Check overprivileged actions
                overprivileged_actions = self._check_overprivileged_actions(policies)
                if overprivileged_actions:
                    findings.append({
                        'severity': 'HIGH',
                        'category': 'IAM_PERMISSIONS',
                        'message': f"Function has overprivileged permissions: {', '.join(overprivileged_actions)}",
                        'details': overprivileged_actions
                    })
                    recommendations.append({
                        'category': 'IAM',
                        'action': 'REDUCE_PERMISSIONS',
                        'description': 'Apply principle of least privilege to IAM role'
                    })
                    score -= 3

                # Check wildcard permissions
                wildcard_permissions = self._check_wildcard_permissions(policies)
                if wildcard_permissions:
                    findings.append({
                        'severity': 'MEDIUM',
                        'category': 'IAM_PERMISSIONS',
                        'message': f"Function uses wildcard permissions: {len(wildcard_permissions)} found",
                        'details': wildcard_permissions
                    })
                    recommendations.append({
                        'category': 'IAM',
                        'action': 'SPECIFY_RESOURCES',
                        'description': 'Replace wildcard resources with specific ARNs'
                    })
                    score -= 2

                # Check inline policies
                inline_policies = self._check_inline_policies(role_name)
                if inline_policies:
                    findings.append({
                        'severity': 'LOW',
                        'category': 'IAM_PERMISSIONS',
                        'message': f"Function role uses inline policies: {len(inline_policies)} found",
                        'details': inline_policies
                    })
                    recommendations.append({
                        'category': 'IAM',
                        'action': 'USE_MANAGED_POLICIES',
                        'description': 'Consider using managed policies instead of inline policies'
                    })
                    score -= 1

        except Exception as e:
            findings.append({
                'severity': 'MEDIUM',
                'category': 'IAM_CHECK_ERROR',
                'message': f"Failed to check IAM permissions: {str(e)}"
            })
            score -= 5

        return {
            'check_name': 'IAM Permissions Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_environment_variables(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check environment variable security"""
        findings = []
        recommendations = []
        score = 10

        env_vars = config.get('Environment', {}).get('Variables', {})

        # Check for sensitive information
        sensitive_patterns = [
            (r'(?i)(password|pwd|secret|key|token)', 'Potential password/secret'),
            (r'(?i)(api[_-]?key)', 'API key'),
            (r'(?i)(access[_-]?key)', 'Access key'),
            (r'(?i)(private[_-]?key)', 'Private key'),
            (r'AKIA[0-9A-Z]{16}', 'AWS Access Key ID'),
            (r'[0-9a-zA-Z/+]{40}', 'Potential AWS Secret Key')
        ]

        for var_name, var_value in env_vars.items():
            for pattern, description in sensitive_patterns:
                if re.search(pattern, var_name) or re.search(pattern, str(var_value)):
                    findings.append({
                        'severity': 'HIGH',
                        'category': 'ENVIRONMENT_VARIABLES',
                        'message': f"Potential sensitive data in environment variable: {var_name}",
                        'details': {'variable': var_name, 'pattern': description}
                    })
                    recommendations.append({
                        'category': 'SECRETS_MANAGEMENT',
                        'action': 'USE_SECRETS_MANAGER',
                        'description': f'Move {var_name} to AWS Secrets Manager or Parameter Store'
                    })
                    score -= 2
                    # Report each variable once, even if several patterns match
                    break

        # Check environment variable encryption
        kms_key_arn = config.get('KMSKeyArn')
        if not kms_key_arn and env_vars:
            findings.append({
                'severity': 'MEDIUM',
                'category': 'ENCRYPTION',
                'message': 'Environment variables are not encrypted with customer-managed KMS key'
            })
            recommendations.append({
                'category': 'ENCRYPTION',
                'action': 'ENABLE_ENV_ENCRYPTION',
                'description': 'Enable environment variable encryption with KMS'
            })
            score -= 2

        return {
            'check_name': 'Environment Variables Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_vpc_configuration(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check VPC configuration"""
        findings = []
        recommendations = []
        score = 10

        vpc_config = config.get('VpcConfig')

        if vpc_config:
            # Check security group configuration
            security_groups = vpc_config.get('SecurityGroupIds', [])
            if not security_groups:
                findings.append({
                    'severity': 'HIGH',
                    'category': 'VPC_SECURITY',
                    'message': 'Function in VPC but no security groups configured'
                })
                score -= 5
            else:
                # Can add more detailed security group checks here
                # Check for overly permissive rules
                pass

            # Check subnet configuration
            subnets = vpc_config.get('SubnetIds', [])
            if len(subnets) < 2:
                findings.append({
                    'severity': 'MEDIUM',
                    'category': 'VPC_AVAILABILITY',
                    'message': 'Function configured with fewer than 2 subnets, may impact availability'
                })
                recommendations.append({
                    'category': 'VPC',
                    'action': 'ADD_SUBNETS',
                    'description': 'Configure function with subnets in multiple AZs'
                })
                score -= 1
        else:
            # Function not in VPC, check if needed
            findings.append({
                'severity': 'LOW',
                'category': 'VPC_CONFIGURATION',
                'message': 'Function not configured to run in VPC'
            })
            recommendations.append({
                'category': 'VPC',
                'action': 'CONSIDER_VPC',
                'description': 'Consider running function in VPC if it accesses private resources'
            })

        return {
            'check_name': 'VPC Configuration Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_encryption_settings(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check encryption settings"""
        findings = []
        recommendations = []
        score = 10

        # Check for a customer-managed KMS key. KMSKeyArn is set only when one
        # is configured; it protects the function's environment variables, and
        # without it Lambda falls back to a default AWS-provided key.
        kms_key_arn = config.get('KMSKeyArn')
        if not kms_key_arn:
            findings.append({
                'severity': 'MEDIUM',
                'category': 'ENCRYPTION',
                'message': 'Function has no customer-managed KMS key configured'
            })
            recommendations.append({
                'category': 'ENCRYPTION',
                'action': 'USE_CUSTOMER_MANAGED_KEY',
                'description': 'Configure a customer-managed KMS key for the function'
            })
            score -= 2

        # Check environment variable encryption
        env_vars = config.get('Environment', {}).get('Variables', {})
        if env_vars and not kms_key_arn:
            findings.append({
                'severity': 'MEDIUM',
                'category': 'ENCRYPTION',
                'message': 'Environment variables not encrypted with customer-managed KMS key'
            })
            score -= 1

        return {
            'check_name': 'Encryption Settings Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_function_configuration(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check function configuration"""
        findings = []
        recommendations = []
        score = 10

        # Check runtime version
        runtime = config.get('Runtime', '')
        deprecated_runtimes = [
            'python2.7', 'python3.6', 'nodejs8.10', 'nodejs10.x',
            'dotnetcore2.1', 'go1.x', 'ruby2.5'
        ]

        if runtime in deprecated_runtimes:
            findings.append({
                'severity': 'HIGH',
                'category': 'RUNTIME_SECURITY',
                'message': f'Function uses deprecated runtime: {runtime}'
            })
            recommendations.append({
                'category': 'RUNTIME',
                'action': 'UPDATE_RUNTIME',
                'description': 'Update to latest supported runtime version'
            })
            score -= 3

        # Check function timeout
        timeout = config.get('Timeout', 3)
        if timeout > 300:  # 5 minutes
            findings.append({
                'severity': 'MEDIUM',
                'category': 'FUNCTION_CONFIG',
                'message': f'Function timeout is very high: {timeout}s'
            })
            recommendations.append({
                'category': 'CONFIGURATION',
                'action': 'OPTIMIZE_TIMEOUT',
                'description': 'Review and optimize function timeout'
            })
            score -= 1

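        # Check reserved concurrency. It is not part of the function
        # configuration returned by get_function, so query it separately.
        try:
            reserved_concurrency = self.lambda_client.get_function_concurrency(
                FunctionName=function_name
            ).get('ReservedConcurrentExecutions')
        except Exception:
            reserved_concurrency = None
        # A value of 0 is a valid setting, so test against None explicitly
        if reserved_concurrency is None: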
            findings.append({
                'severity': 'LOW',
                'category': 'FUNCTION_CONFIG',
                'message': 'Function does not have reserved concurrency configured'
            })
            recommendations.append({
                'category': 'CONFIGURATION',
                'action': 'SET_CONCURRENCY_LIMITS',
                'description': 'Configure reserved concurrency to prevent resource exhaustion'
            })

        return {
            'check_name': 'Function Configuration Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_code_security(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check code security (basic checks)"""
        findings = []
        recommendations = []
        score = 10

        # Can only do limited code security checks as we cannot directly access source code
        # In practice, should integrate code scanning tools

        # Check code package size
        code_size = config.get('CodeSize', 0)
        if code_size > 50 * 1024 * 1024:  # 50MB
            findings.append({
                'severity': 'MEDIUM',
                'category': 'CODE_SECURITY',
                'message': f'Function code package is large: {code_size / (1024*1024):.1f}MB'
            })
            recommendations.append({
                'category': 'CODE',
                'action': 'OPTIMIZE_PACKAGE_SIZE',
                'description': 'Optimize code package size and remove unnecessary dependencies'
            })
            score -= 1

        # Check Layer usage (Lambda allows at most 5 layers per function,
        # so flag functions that have reached that limit)
        layers = config.get('Layers', [])
        if len(layers) >= 5:
            findings.append({
                'severity': 'LOW',
                'category': 'CODE_SECURITY',
                'message': f'Function uses many layers: {len(layers)}'
            })
            recommendations.append({
                'category': 'CODE',
                'action': 'REVIEW_LAYERS',
                'description': 'Review and consolidate Lambda layers'
            })

        return {
            'check_name': 'Code Security Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_logging_configuration(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check logging configuration"""
        findings = []
        recommendations = []
        score = 10

        try:
            # Check CloudWatch log group
            logs_client = boto3.client('logs', region_name=self.region)
            log_group_name = f"/aws/lambda/{function_name}"

            try:
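                log_groups = logs_client.describe_log_groups(
                    logGroupNamePrefix=log_group_name
                )['logGroups']

                # The prefix also matches other functions' log groups
                # (e.g. "my-func-v2"), so keep only the exact name
                log_group = next(
                    (g for g in log_groups if g['logGroupName'] == log_group_name),
                    None
                )

                if log_group: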

                    # Check log retention
                    retention_days = log_group.get('retentionInDays')
                    if not retention_days:
                        findings.append({
                            'severity': 'MEDIUM',
                            'category': 'LOGGING',
                            'message': 'Log group has no retention policy (logs kept indefinitely)'
                        })
                        recommendations.append({
                            'category': 'LOGGING',
                            'action': 'SET_LOG_RETENTION',
                            'description': 'Set appropriate log retention period'
                        })
                        score -= 2
                    elif retention_days > 365:
                        findings.append({
                            'severity': 'LOW',
                            'category': 'LOGGING',
                            'message': f'Log retention period is very long: {retention_days} days'
                        })
                        score -= 1

                    # Check log encryption
                    kms_key_id = log_group.get('kmsKeyId')
                    if not kms_key_id:
                        findings.append({
                            'severity': 'MEDIUM',
                            'category': 'LOGGING',
                            'message': 'CloudWatch logs not encrypted with KMS'
                        })
                        recommendations.append({
                            'category': 'ENCRYPTION',
                            'action': 'ENCRYPT_LOGS',
                            'description': 'Enable KMS encryption for CloudWatch logs'
                        })
                        score -= 2

            except Exception:
                findings.append({
                    'severity': 'LOW',
                    'category': 'LOGGING',
                    'message': 'Could not verify log group configuration'
                })
                score -= 1

        except Exception as e:
            findings.append({
                'severity': 'LOW',
                'category': 'LOGGING_CHECK_ERROR',
                'message': f'Failed to check logging configuration: {str(e)}'
            })
            score -= 1

        return {
            'check_name': 'Logging Configuration Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _check_network_access(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Check network access configuration"""
        findings = []
        recommendations = []
        score = 10

        # Check function URL configuration
        try:
            url_config = self.lambda_client.get_function_url_config(
                FunctionName=function_name
            )

            auth_type = url_config.get('AuthType', '')
            if auth_type == 'NONE':
                findings.append({
                    'severity': 'HIGH',
                    'category': 'NETWORK_SECURITY',
                    'message': 'Function URL configured with no authentication'
                })
                recommendations.append({
                    'category': 'AUTHENTICATION',
                    'action': 'ENABLE_AUTH',
                    'description': 'Enable authentication for function URL'
                })
                score -= 5

            # Check CORS configuration
            cors = url_config.get('Cors', {})
            allow_origins = cors.get('AllowOrigins', [])
            if '*' in allow_origins:
                findings.append({
                    'severity': 'MEDIUM',
                    'category': 'NETWORK_SECURITY',
                    'message': 'Function URL allows all origins (*)'
                })
                recommendations.append({
                    'category': 'CORS',
                    'action': 'RESTRICT_ORIGINS',
                    'description': 'Restrict CORS origins to specific domains'
                })
                score -= 2

        except self.lambda_client.exceptions.ResourceNotFoundException:
            # Function doesn't have URL configured, which is good
            pass
        except Exception as e:
            findings.append({
                'severity': 'LOW',
                'category': 'NETWORK_CHECK_ERROR',
                'message': f'Failed to check function URL configuration: {str(e)}'
            })

        return {
            'check_name': 'Network Access Check',
            'findings': findings,
            'recommendations': recommendations,
            'score': max(0, score)
        }

    def _get_role_policies(self, role_name: str) -> List[Dict[str, Any]]:
        """Get role policies"""
        policies = []

        try:
            # Get attached managed policies
            attached_policies = self.iam_client.list_attached_role_policies(
                RoleName=role_name
            )['AttachedPolicies']

            for policy in attached_policies:
                policy_doc = self.iam_client.get_policy_version(
                    PolicyArn=policy['PolicyArn'],
                    VersionId=self.iam_client.get_policy(
                        PolicyArn=policy['PolicyArn']
                    )['Policy']['DefaultVersionId']
                )

                policies.append({
                    'type': 'managed',
                    'name': policy['PolicyName'],
                    'arn': policy['PolicyArn'],
                    'document': policy_doc['PolicyVersion']['Document']
                })

            # Get inline policies
            inline_policies = self.iam_client.list_role_policies(
                RoleName=role_name
            )['PolicyNames']

            for policy_name in inline_policies:
                policy_doc = self.iam_client.get_role_policy(
                    RoleName=role_name,
                    PolicyName=policy_name
                )

                policies.append({
                    'type': 'inline',
                    'name': policy_name,
                    'document': policy_doc['PolicyDocument']
                })

        except Exception as e:
            print(f"Error getting role policies: {str(e)}")

        return policies

    def _check_overprivileged_actions(self, policies: List[Dict[str, Any]]) -> List[str]:
        """Check for overprivileged actions"""
        dangerous_actions = [
            '*',
            'iam:*',
            'sts:AssumeRole',
            'lambda:InvokeFunction',
            's3:*',
            'dynamodb:*',
            'rds:*',
            'ec2:*'
        ]

        found_actions = []

        for policy in policies:
            statements = policy['document'].get('Statement', [])
            if not isinstance(statements, list):
                statements = [statements]

            for statement in statements:
                if statement.get('Effect') == 'Allow':
                    actions = statement.get('Action', [])
                    if isinstance(actions, str):
                        actions = [actions]

                    for action in actions:
                        if action in dangerous_actions:
                            found_actions.append(action)

        return list(set(found_actions))

    def _check_wildcard_permissions(self, policies: List[Dict[str, Any]]) -> List[str]:
        """Check for wildcard permissions"""
        wildcard_resources = []

        for policy in policies:
            statements = policy['document'].get('Statement', [])
            if not isinstance(statements, list):
                statements = [statements]

            for statement in statements:
                if statement.get('Effect') == 'Allow':
                    resources = statement.get('Resource', [])
                    if isinstance(resources, str):
                        resources = [resources]

                    for resource in resources:
                        if resource == '*':
                            wildcard_resources.append(f"Policy: {policy['name']}")

        return wildcard_resources

    def _check_inline_policies(self, role_name: str) -> List[str]:
        """Check for inline policies"""
        try:
            inline_policies = self.iam_client.list_role_policies(
                RoleName=role_name
            )['PolicyNames']
            return inline_policies
        except Exception:
            return []

    def generate_security_report(self, audit_report: Dict[str, Any]) -> str:
        """Generate security report"""
        report = f"""
AWS LAMBDA SECURITY AUDIT REPORT
================================

Function: {audit_report['function_name']}
Audit Date: {audit_report['audit_timestamp']}
Security Score: {audit_report['security_score']}/100

"""

        # High severity issues
        high_severity = [f for f in audit_report['findings'] if f['severity'] == 'HIGH']
        if high_severity:
            report += f"HIGH SEVERITY ISSUES ({len(high_severity)}):\n"
            for finding in high_severity:
                report += f"  - {finding['category']}: {finding['message']}\n"
            report += "\n"

        # Medium severity issues
        medium_severity = [f for f in audit_report['findings'] if f['severity'] == 'MEDIUM']
        if medium_severity:
            report += f"MEDIUM SEVERITY ISSUES ({len(medium_severity)}):\n"
            for finding in medium_severity:
                report += f"  - {finding['category']}: {finding['message']}\n"
            report += "\n"

        # Recommendations
        if audit_report['recommendations']:
            report += "RECOMMENDATIONS:\n"
            for rec in audit_report['recommendations'][:10]:  # Show top 10 recommendations
                report += f"  - {rec['action']}: {rec['description']}\n"
            report += "\n"

        # Checks summary
        report += "CHECKS PERFORMED:\n"
        for check in audit_report['checks_performed']:
            report += f"  ✓ {check}\n"

        return report

def main():
    """Main security check function"""
    function_name = input("Enter Lambda function name for security audit: ").strip()

    checker = LambdaSecurityChecker()
    audit_report = checker.comprehensive_security_audit(function_name)

    # Print report
    report_text = checker.generate_security_report(audit_report)
    print(report_text)

    # Save detailed report
    with open(f"{function_name}_security_audit.json", 'w') as f:
        json.dump(audit_report, f, indent=2, default=str)

    print(f"\nDetailed security audit saved to {function_name}_security_audit.json")

if __name__ == "__main__":
    main()
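
The exact-match comparison in `_check_overprivileged_actions` only catches actions written exactly as they appear in its list, so a pattern such as `iam:Pass*` or `S3:*` goes unnoticed. Below is a minimal sketch of a pattern-aware variant. The `SENSITIVE_ACTIONS` watch list and the helper names are hypothetical and not part of the original checker. The sketch expands IAM's `*` and `?` wildcards with `fnmatch` and compares case-insensitively. It ignores `NotAction`, conditions, and Deny overrides, so treat it as a starting point rather than a full policy evaluator.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict, Iterable, List

# Hypothetical watch list: concrete actions a function role should rarely hold
SENSITIVE_ACTIONS = (
    "iam:PassRole",
    "iam:CreateUser",
    "s3:DeleteBucket",
    "kms:ScheduleKeyDeletion",
)


def _as_list(value: Any) -> List[Any]:
    """IAM accepts a single string/object wherever a list is allowed."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def find_sensitive_grants(document: Dict[str, Any],
                          sensitive: Iterable[str] = SENSITIVE_ACTIONS) -> List[str]:
    """Return the sensitive actions covered by any Allow statement's Action patterns."""
    granted = set()
    for statement in _as_list(document.get("Statement")):
        if statement.get("Effect") != "Allow":
            continue
        for pattern in _as_list(statement.get("Action")):
            for action in sensitive:
                # Action names are case-insensitive; '*' and '?' act as wildcards
                if fnmatchcase(action.lower(), pattern.lower()):
                    granted.add(action)
    return sorted(granted)


policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["s3:GetObject", "IAM:Pass*"], "Resource": "*"},
        {"Effect": "Deny", "Action": "kms:*", "Resource": "*"},
    ],
}
print(find_sensitive_grants(policy))  # ['iam:PassRole']
```

Checking which concrete actions a pattern covers, rather than whether the pattern string appears in a fixed list, keeps the watch list short while still catching broad grants such as `*` or `iam:*`.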

12.2 Data Encryption and Key Management

12.2.1 End-to-End Encryption Implementation

# security/encryption_manager.py
import json
import boto3
import base64
import os
from typing import Dict, Any, Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class LambdaEncryptionManager:
    """Lambda Encryption Manager"""

    def __init__(self, kms_key_id: str = None, region: str = 'us-east-1'):
        self.kms_key_id = kms_key_id
        self.region = region
        self.kms_client = boto3.client('kms', region_name=region)
        self.secrets_client = boto3.client('secretsmanager', region_name=region)
        self.ssm_client = boto3.client('ssm', region_name=region)

    def encrypt_data(self, plaintext: str, context: Dict[str, str] = None) -> Dict[str, Any]:
        """Encrypt data using KMS"""
        try:
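            # KMS Encrypt accepts at most 4096 bytes of plaintext; use envelope
            # encryption for larger payloads. AWS managed keys such as
            # alias/aws/lambda are generally usable only through the owning
            # service, so pass a customer-managed key via kms_key_id.
            encrypt_kwargs = {
                'KeyId': self.kms_key_id or 'alias/aws/lambda',
                'Plaintext': plaintext.encode('utf-8')
            }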

            if context:
                encrypt_kwargs['EncryptionContext'] = context

            response = self.kms_client.encrypt(**encrypt_kwargs)

            return {
                'ciphertext': base64.b64encode(response['CiphertextBlob']).decode('utf-8'),
                'key_id': response['KeyId'],
                'encryption_context': context or {}
            }

        except Exception as e:
            logger.error(f"Encryption failed: {str(e)}")
            raise

    def decrypt_data(self, ciphertext: str, context: Dict[str, str] = None) -> str:
        """Decrypt data using KMS"""
        try:
            decrypt_kwargs = {
                'CiphertextBlob': base64.b64decode(ciphertext.encode('utf-8'))
            }

            if context:
                decrypt_kwargs['EncryptionContext'] = context

            response = self.kms_client.decrypt(**decrypt_kwargs)

            return response['Plaintext'].decode('utf-8')

        except Exception as e:
            logger.error(f"Decryption failed: {str(e)}")
            raise

    def create_data_key(self, key_spec: str = 'AES_256') -> Dict[str, Any]:
        """Create data key"""
        try:
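            # Note: the alias/aws/lambda fallback is an AWS managed key, which
            # is generally not usable for direct GenerateDataKey calls; supply
            # a customer-managed key via kms_key_id.
            response = self.kms_client.generate_data_key(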
                KeyId=self.kms_key_id or 'alias/aws/lambda',
                KeySpec=key_spec
            )

            return {
                'plaintext_key': response['Plaintext'],
                'encrypted_key': base64.b64encode(response['CiphertextBlob']).decode('utf-8')
            }

        except Exception as e:
            logger.error(f"Data key creation failed: {str(e)}")
            raise

    def envelope_encrypt(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Envelope encryption for large data"""
        try:
            # Create data key
            data_key_response = self.create_data_key()
            plaintext_key = data_key_response['plaintext_key']
            encrypted_key = data_key_response['encrypted_key']

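            # Encrypt data using data key. Fernet takes a url-safe base64
            # encoding of 32 key bytes and uses AES-128-CBC with HMAC-SHA256,
            # so half of the key is used for signing and half for encryption.
            fernet = Fernet(base64.urlsafe_b64encode(plaintext_key[:32]))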
            encrypted_data = fernet.encrypt(json.dumps(data).encode('utf-8'))

            return {
                'encrypted_data': base64.b64encode(encrypted_data).decode('utf-8'),
                'encrypted_data_key': encrypted_key
            }

        except Exception as e:
            logger.error(f"Envelope encryption failed: {str(e)}")
            raise

    def envelope_decrypt(self, encrypted_data: str, encrypted_data_key: str) -> Dict[str, Any]:
        """Envelope decryption for large data"""
        try:
            # Decrypt data key
            data_key_response = self.kms_client.decrypt(
                CiphertextBlob=base64.b64decode(encrypted_data_key.encode('utf-8'))
            )
            plaintext_key = data_key_response['Plaintext']

            # Decrypt data using data key
            fernet = Fernet(base64.urlsafe_b64encode(plaintext_key[:32]))
            decrypted_data = fernet.decrypt(base64.b64decode(encrypted_data.encode('utf-8')))

            return json.loads(decrypted_data.decode('utf-8'))

        except Exception as e:
            logger.error(f"Envelope decryption failed: {str(e)}")
            raise

    def store_encrypted_secret(self, secret_name: str, secret_value: str,
                             description: str = None) -> str:
        """Store encrypted secret to Secrets Manager"""
        try:
            create_kwargs = {
                'Name': secret_name,
                'SecretString': secret_value
            }

            if self.kms_key_id:
                create_kwargs['KmsKeyId'] = self.kms_key_id

            if description:
                create_kwargs['Description'] = description

            response = self.secrets_client.create_secret(**create_kwargs)

            return response['ARN']

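        except self.secrets_client.exceptions.ResourceExistsException:
            # If secret already exists, update it. Optional arguments are only
            # passed when set, because boto3 rejects None for string parameters.
            update_kwargs = {
                'SecretId': secret_name,
                'SecretString': secret_value
            }

            if self.kms_key_id:
                update_kwargs['KmsKeyId'] = self.kms_key_id

            if description:
                update_kwargs['Description'] = description

            response = self.secrets_client.update_secret(**update_kwargs)
            return response['ARN']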

        except Exception as e:
            logger.error(f"Failed to store secret: {str(e)}")
            raise

    def retrieve_secret(self, secret_name: str) -> str:
        """Retrieve secret from Secrets Manager"""
        try:
            response = self.secrets_client.get_secret_value(SecretId=secret_name)
            return response['SecretString']

        except Exception as e:
            logger.error(f"Failed to retrieve secret: {str(e)}")
            raise

    def store_encrypted_parameter(self, parameter_name: str, parameter_value: str,
                                 description: str = None, parameter_type: str = 'SecureString') -> str:
        """Store encrypted parameter to Parameter Store"""
        try:
            put_kwargs = {
                'Name': parameter_name,
                'Value': parameter_value,
                'Type': parameter_type,
                'Overwrite': True
            }

            if self.kms_key_id and parameter_type == 'SecureString':
                put_kwargs['KeyId'] = self.kms_key_id

            if description:
                put_kwargs['Description'] = description

            self.ssm_client.put_parameter(**put_kwargs)

            return parameter_name

        except Exception as e:
            logger.error(f"Failed to store parameter: {str(e)}")
            raise

    def retrieve_parameter(self, parameter_name: str, decrypt: bool = True) -> str:
        """Retrieve parameter from Parameter Store"""
        try:
            response = self.ssm_client.get_parameter(
                Name=parameter_name,
                WithDecryption=decrypt
            )
            return response['Parameter']['Value']

        except Exception as e:
            logger.error(f"Failed to retrieve parameter: {str(e)}")
            raise

class SecureDataProcessor:
    """Secure Data Processor"""

    def __init__(self, encryption_manager: LambdaEncryptionManager):
        self.encryption_manager = encryption_manager

    def process_sensitive_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process sensitive data"""
        try:
            # Identify sensitive fields
            sensitive_fields = self._identify_sensitive_fields(data)

            # Create processing result
            processed_data = data.copy()
            encrypted_fields = {}

            # Encrypt sensitive fields
            for field in sensitive_fields:
                if field in processed_data:
                    field_value = str(processed_data[field])

                    # Use field name as encryption context
                    context = {
                        'field_name': field,
                        'data_type': 'sensitive'
                    }

                    encrypted_value = self.encryption_manager.encrypt_data(
                        field_value, context
                    )

                    # Replace original value
                    processed_data[field] = f"ENCRYPTED:{encrypted_value['ciphertext']}"
                    encrypted_fields[field] = encrypted_value

            return {
                'processed_data': processed_data,
                'encrypted_fields': encrypted_fields,
                'encryption_metadata': {
                    'encrypted_field_count': len(encrypted_fields),
                    'encryption_timestamp': datetime.utcnow().isoformat()
                }
            }

        except Exception as e:
            logger.error(f"Sensitive data processing failed: {str(e)}")
            raise

    def restore_sensitive_data(self, processed_data: Dict[str, Any],
                             encrypted_fields: Dict[str, Any]) -> Dict[str, Any]:
        """Restore sensitive data"""
        try:
            restored_data = processed_data.copy()

            for field, encryption_info in encrypted_fields.items():
                if field in restored_data:
                    ciphertext = encryption_info['ciphertext']
                    context = encryption_info['encryption_context']

                    # Decrypt field value
                    decrypted_value = self.encryption_manager.decrypt_data(
                        ciphertext, context
                    )

                    # Restore original value
                    restored_data[field] = decrypted_value

            return restored_data

        except Exception as e:
            logger.error(f"Data restoration failed: {str(e)}")
            raise

    def _identify_sensitive_fields(self, data: Dict[str, Any]) -> List[str]:
        """Identify sensitive fields"""
        sensitive_patterns = [
            'password', 'pwd', 'secret', 'key', 'token',
            'ssn', 'social_security', 'credit_card', 'cc_number',
            'phone', 'email', 'address', 'birth_date'
        ]

        sensitive_fields = []

        for field_name in data.keys():
            field_lower = field_name.lower()
            for pattern in sensitive_patterns:
                if pattern in field_lower:
                    sensitive_fields.append(field_name)
                    break

        return sensitive_fields

def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Secure data processing Lambda handler"""

    logger.info(f"Processing secure data operation: {event.get('action', 'unknown')}")

    try:
        # Initialize encryption manager
        kms_key_id = os.environ.get('KMS_KEY_ID')
        encryption_manager = LambdaEncryptionManager(kms_key_id)
        secure_processor = SecureDataProcessor(encryption_manager)

        action = event.get('action', 'process')

        if action == 'encrypt_data':
            # Encrypt data
            plaintext = event.get('data', '')
            # Named encryption_context so it does not shadow the Lambda context argument
            encryption_context = event.get('context', {})

            result = encryption_manager.encrypt_data(plaintext, encryption_context)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'result': result
                })
            }

        elif action == 'decrypt_data':
            # Decrypt data
            ciphertext = event.get('ciphertext', '')
            # Named encryption_context so it does not shadow the Lambda context argument
            encryption_context = event.get('context', {})

            result = encryption_manager.decrypt_data(ciphertext, encryption_context)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'result': result
                })
            }

        elif action == 'process_sensitive':
            # Process sensitive data
            data = event.get('data', {})

            result = secure_processor.process_sensitive_data(data)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'result': result
                }, default=str)
            }

        elif action == 'envelope_encrypt':
            # Envelope encryption
            data = event.get('data', {})

            result = encryption_manager.envelope_encrypt(data)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'result': result
                })
            }

        elif action == 'envelope_decrypt':
            # Envelope decryption
            encrypted_data = event.get('encrypted_data', '')
            encrypted_data_key = event.get('encrypted_data_key', '')

            result = encryption_manager.envelope_decrypt(encrypted_data, encrypted_data_key)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'result': result
                }, default=str)
            }

        elif action == 'store_secret':
            # Store secret
            secret_name = event.get('secret_name', '')
            secret_value = event.get('secret_value', '')
            description = event.get('description')

            result = encryption_manager.store_encrypted_secret(
                secret_name, secret_value, description
            )

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'secret_arn': result
                })
            }

        elif action == 'retrieve_secret':
            # Retrieve secret
            secret_name = event.get('secret_name', '')

            result = encryption_manager.retrieve_secret(secret_name)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'success': True,
                    'secret_value': result
                })
            }

        else:
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'success': False,
                    'error': f'Unknown action: {action}'
                })
            }

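    except Exception as e:
        # Log the details, but return a generic message so that internal error
        # text (ARNs, key IDs, service responses) is not exposed to the caller
        logger.error(f"Error processing request: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'success': False,
                'error': 'Internal error while processing request'
            })
        }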
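
The `_identify_sensitive_fields` helper above matches patterns as substrings, so a short pattern such as `key` also flags unrelated names like `monkey_count`. The following is a minimal sketch of a token-based alternative, assuming field names use snake_case, kebab-case, or camelCase. The names `SENSITIVE_TOKENS`, `SENSITIVE_PHRASES`, `split_tokens`, and `identify_sensitive_fields` are hypothetical and not part of the chapter's classes.

```python
import re
from typing import Any, Dict, List

# Hypothetical single-word tokens, mirroring the patterns used above.
SENSITIVE_TOKENS = {
    "password", "pwd", "secret", "key", "token",
    "ssn", "phone", "email", "address",
}
# Multi-word patterns, matched as substrings of the normalized field name.
SENSITIVE_PHRASES = ("social_security", "credit_card", "cc_number", "birth_date")


def split_tokens(field_name: str) -> List[str]:
    """Split snake_case, kebab-case, and camelCase names into lowercase tokens."""
    spaced = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", field_name)
    return [t for t in re.split(r"[^A-Za-z0-9]+", spaced.lower()) if t]


def identify_sensitive_fields(data: Dict[str, Any]) -> List[str]:
    """Return field names with a sensitive token or phrase, in input order."""
    matches = []
    for field_name in data:
        tokens = split_tokens(field_name)
        normalized = "_".join(tokens)
        if SENSITIVE_TOKENS.intersection(tokens) or any(
            phrase in normalized for phrase in SENSITIVE_PHRASES
        ):
            matches.append(field_name)
    return matches
```

Matching whole tokens trades recall for precision: `apikey` written as one word would no longer match, so the token list needs to reflect the naming conventions actually used in the data.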

12.3 Network Security

12.3.1 VPC Security Configuration

# stacks/secure_vpc_stack.py
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_lambda as _lambda,
    aws_logs as logs,
    Duration,
    RemovalPolicy
)
from constructs import Construct

class SecureVpcStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create secure VPC
        self._create_secure_vpc()

        # Create security groups
        self._create_security_groups()

        # Create network ACLs
        self._create_network_acls()

        # Create VPC Flow Logs
        self._create_vpc_flow_logs()

        # Create NAT gateways
        self._create_nat_gateways()

        # Create VPC endpoints
        self._create_vpc_endpoints()

    def _create_secure_vpc(self):
        """Create secure VPC"""
        self.vpc = ec2.Vpc(
            self, "SecureVpc",
            max_azs=2,
            ip_addresses=ec2.IpAddresses.cidr("10.0.0.0/16"),  # replaces the deprecated cidr argument
            subnet_configuration=[
                # Public subnet (only for NAT gateways)
                ec2.SubnetConfiguration(
                    name="Public",
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24
                ),
                # Private subnet (Lambda functions)
                ec2.SubnetConfiguration(
                    name="Private",
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
                    cidr_mask=24
                ),
                # Isolated subnet (databases, etc.)
                ec2.SubnetConfiguration(
                    name="Isolated",
                    subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
                    cidr_mask=24
                )
            ],
            enable_dns_hostnames=True,
            enable_dns_support=True
        )

    def _create_security_groups(self):
        """Create security groups"""

        # Lambda function security group
        self.lambda_sg = ec2.SecurityGroup(
            self, "LambdaSecurityGroup",
            vpc=self.vpc,
            description="Security group for Lambda functions",
            allow_all_outbound=False  # Explicitly control outbound traffic
        )

        # Allow HTTPS outbound traffic
        self.lambda_sg.add_egress_rule(
            peer=ec2.Peer.any_ipv4(),
            connection=ec2.Port.tcp(443),
            description="HTTPS outbound for API calls"
        )

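        # Allow HTTP outbound traffic (only when necessary).
        # The private subnet NACL in _create_network_acls only allows outbound
        # 443 and ephemeral ports, so port 80 also needs a NACL entry to work.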
        self.lambda_sg.add_egress_rule(
            peer=ec2.Peer.any_ipv4(),
            connection=ec2.Port.tcp(80),
            description="HTTP outbound for specific services"
        )

        # Database security group
        self.database_sg = ec2.SecurityGroup(
            self, "DatabaseSecurityGroup",
            vpc=self.vpc,
            description="Security group for databases",
            allow_all_outbound=False
        )

        # Only allow Lambda security group to access database.
        # allow_to adds the ingress rule on the database group and the matching
        # egress rule on the Lambda group, which is required because the Lambda
        # group is created with allow_all_outbound=False.
        self.lambda_sg.connections.allow_to(
            self.database_sg,
            ec2.Port.tcp(5432),  # PostgreSQL
            "Allow Lambda access to PostgreSQL"
        )

        self.lambda_sg.connections.allow_to(
            self.database_sg,
            ec2.Port.tcp(3306),  # MySQL
            "Allow Lambda access to MySQL"
        )

        # Cache security group
        self.cache_sg = ec2.SecurityGroup(
            self, "CacheSecurityGroup",
            vpc=self.vpc,
            description="Security group for cache services",
            allow_all_outbound=False
        )

        # Allow Lambda to access Redis (ingress on the cache group plus the
        # matching egress rule on the Lambda group)
        self.lambda_sg.connections.allow_to(
            self.cache_sg,
            ec2.Port.tcp(6379),
            "Allow Lambda access to Redis"
        )

    def _create_network_acls(self):
        """Create network ACLs"""

        # Private subnet ACL
        self.private_nacl = ec2.NetworkAcl(
            self, "PrivateNetworkAcl",
            vpc=self.vpc,
            subnet_selection=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            )
        )

        # Inbound rules
        self.private_nacl.add_entry(
            "AllowInboundHTTPS",
            rule_number=100,
            cidr=ec2.AclCidr.any_ipv4(),
            traffic=ec2.AclTraffic.tcp_port(443),
            direction=ec2.TrafficDirection.INGRESS,
            rule_action=ec2.Action.ALLOW
        )

        self.private_nacl.add_entry(
            "AllowInboundEphemeral",
            rule_number=110,
            cidr=ec2.AclCidr.any_ipv4(),
            traffic=ec2.AclTraffic.tcp_port_range(1024, 65535),
            direction=ec2.TrafficDirection.INGRESS,
            rule_action=ec2.Action.ALLOW
        )

        # Outbound rules
        self.private_nacl.add_entry(
            "AllowOutboundHTTPS",
            rule_number=100,
            cidr=ec2.AclCidr.any_ipv4(),
            traffic=ec2.AclTraffic.tcp_port(443),
            direction=ec2.TrafficDirection.EGRESS,
            rule_action=ec2.Action.ALLOW
        )

        self.private_nacl.add_entry(
            "AllowOutboundEphemeral",
            rule_number=110,
            cidr=ec2.AclCidr.any_ipv4(),
            traffic=ec2.AclTraffic.tcp_port_range(1024, 65535),
            direction=ec2.TrafficDirection.EGRESS,
            rule_action=ec2.Action.ALLOW
        )

        # Isolated subnet ACL (stricter)
        self.isolated_nacl = ec2.NetworkAcl(
            self, "IsolatedNetworkAcl",
            vpc=self.vpc,
            subnet_selection=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
            )
        )

        # Only allow VPC internal traffic
        self.isolated_nacl.add_entry(
            "AllowInboundVPC",
            rule_number=100,
            cidr=ec2.AclCidr.ipv4("10.0.0.0/16"),
            traffic=ec2.AclTraffic.all_traffic(),
            direction=ec2.TrafficDirection.INGRESS,
            rule_action=ec2.Action.ALLOW
        )

        self.isolated_nacl.add_entry(
            "AllowOutboundVPC",
            rule_number=100,
            cidr=ec2.AclCidr.ipv4("10.0.0.0/16"),
            traffic=ec2.AclTraffic.all_traffic(),
            direction=ec2.TrafficDirection.EGRESS,
            rule_action=ec2.Action.ALLOW
        )

    def _create_vpc_flow_logs(self):
        """Create VPC flow logs"""

        # Create CloudWatch log group
        self.flow_log_group = logs.LogGroup(
            self, "VpcFlowLogGroup",
            log_group_name="/aws/vpc/flowlogs",
            retention=logs.RetentionDays.ONE_MONTH,
            removal_policy=RemovalPolicy.DESTROY
        )

        # Create flow log
        self.vpc.add_flow_log(
            "VpcFlowLog",
            destination=ec2.FlowLogDestination.to_cloud_watch_logs(
                self.flow_log_group
            ),
            traffic_type=ec2.FlowLogTrafficType.ALL
        )

    def _create_nat_gateways(self):
        """Create NAT gateways (automatically created by CDK)"""
        # NAT gateways are automatically created by CDK in PRIVATE_WITH_EGRESS subnet configuration
        pass

    def _create_vpc_endpoints(self):
        """Create VPC endpoints"""

        # S3 gateway endpoint
        self.vpc.add_gateway_endpoint(
            "S3Endpoint",
            service=ec2.GatewayVpcEndpointAwsService.S3,
            subnets=[
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
            ]
        )

        # DynamoDB gateway endpoint
        self.vpc.add_gateway_endpoint(
            "DynamoDbEndpoint",
            service=ec2.GatewayVpcEndpointAwsService.DYNAMODB,
            subnets=[
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS),
                ec2.SubnetSelection(subnet_type=ec2.SubnetType.PRIVATE_ISOLATED)
            ]
        )

        # Lambda interface endpoint
        self.vpc.add_interface_endpoint(
            "LambdaEndpoint",
            service=ec2.InterfaceVpcEndpointAwsService.LAMBDA_,
            subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            security_groups=[self.lambda_sg]
        )

        # Secrets Manager interface endpoint
        self.vpc.add_interface_endpoint(
            "SecretsManagerEndpoint",
            service=ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER,
            subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            security_groups=[self.lambda_sg]
        )

        # KMS interface endpoint
        self.vpc.add_interface_endpoint(
            "KmsEndpoint",
            service=ec2.InterfaceVpcEndpointAwsService.KMS,
            subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            security_groups=[self.lambda_sg]
        )

class SecureLambdaStack(Stack):
    def __init__(self, scope: Construct, construct_id: str,
                 vpc_stack: SecureVpcStack, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        self.vpc_stack = vpc_stack

        # Create secure Lambda function
        self._create_secure_lambda_function()

    def _create_secure_lambda_function(self):
        """Create secure Lambda function"""

        self.secure_function = _lambda.Function(
            self, "SecureFunction",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="index.handler",
            code=_lambda.Code.from_asset("lambda_functions/secure_function"),
            vpc=self.vpc_stack.vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            security_groups=[self.vpc_stack.lambda_sg],
            timeout=Duration.minutes(5),
            memory_size=512,
            environment={
                'VPC_ENABLED': 'true',
                'ENVIRONMENT': 'production'
            },
            # Enable X-Ray tracing
            tracing=_lambda.Tracing.ACTIVE,
            # Set reserved concurrency
            reserved_concurrent_executions=10
        )
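
Network ACLs are stateless and evaluated in ascending rule-number order, with the first matching entry deciding the outcome. That is why the private subnet ACL above pairs each port 443 entry with an ephemeral-port entry. The following is a minimal sketch of that evaluation logic in plain Python. `AclEntry`, `evaluate`, and `https_round_trip_allowed` are hypothetical names for illustration, and the model considers TCP destination ports only.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class AclEntry:
    rule_number: int
    port_from: int
    port_to: int
    allow: bool


def evaluate(entries: List[AclEntry], port: int) -> bool:
    """Return True if traffic to `port` is allowed.

    Entries are checked in ascending rule-number order and the first match
    wins. Traffic that matches no entry is denied, like the implicit final rule.
    """
    for entry in sorted(entries, key=lambda e: e.rule_number):
        if entry.port_from <= port <= entry.port_to:
            return entry.allow
    return False


# Entries mirroring the private subnet ACL defined in the stack above.
PRIVATE_INGRESS = [AclEntry(100, 443, 443, True), AclEntry(110, 1024, 65535, True)]
PRIVATE_EGRESS = [AclEntry(100, 443, 443, True), AclEntry(110, 1024, 65535, True)]


def https_round_trip_allowed(ingress: List[AclEntry], egress: List[AclEntry],
                             client_port: int) -> bool:
    """An outbound HTTPS call needs egress to port 443 and ingress for the
    reply on the client's ephemeral port, because no connection state is kept."""
    return evaluate(egress, 443) and evaluate(ingress, client_port)
```

With these entries, an HTTPS call from an ephemeral port such as 50000 passes in both directions, while outbound traffic to port 80 matches no entry and is denied.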

12.4 Security Auditing and Compliance

12.4.1 Compliance Check Framework
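
Each framework check in this section reports `(score / total_checks) * 100` and compares it against an 80% threshold. The following is a minimal sketch of deriving the same percentage from the findings list itself, so that the denominator always equals the number of checks actually evaluated. `summarize_findings` is a hypothetical helper, not part of the chapter's `ComplianceChecker`, and excluding `UNKNOWN` findings from the denominator is an assumption made for this sketch.

```python
from typing import Any, Dict, List


def summarize_findings(findings: List[Dict[str, Any]],
                       threshold: float = 80.0) -> Dict[str, Any]:
    """Score a framework from its findings list.

    Only PASS and FAIL findings count toward the denominator; UNKNOWN
    findings are reported separately instead of lowering the score.
    """
    passed = sum(1 for f in findings if f.get("status") == "PASS")
    failed = sum(1 for f in findings if f.get("status") == "FAIL")
    unknown = sum(1 for f in findings if f.get("status") == "UNKNOWN")
    evaluated = passed + failed

    percentage = (passed / evaluated) * 100 if evaluated else 0.0
    return {
        "compliance_percentage": percentage,
        "checks_evaluated": evaluated,
        "checks_unknown": unknown,
        "status": "COMPLIANT" if percentage >= threshold else "NON_COMPLIANT",
    }
```

A score like this is a configuration heuristic for triage and does not by itself establish compliance with any framework.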

# security/compliance_checker.py
import json
import boto3
from typing import Dict, Any, List
from datetime import datetime, timedelta
import re

class ComplianceChecker:
    """Compliance Checker"""

    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.iam_client = boto3.client('iam')
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
        self.config_client = boto3.client('config', region_name=region)

        # Compliance framework definitions
        self.compliance_frameworks = {
            'SOC2': self._soc2_checks,
            'GDPR': self._gdpr_checks,
            'HIPAA': self._hipaa_checks,
            'PCI_DSS': self._pci_dss_checks,
            'ISO27001': self._iso27001_checks
        }

    def comprehensive_compliance_check(self, function_name: str,
                                     frameworks: List[str] = None) -> Dict[str, Any]:
        """Comprehensive compliance check"""
        if frameworks is None:
            frameworks = list(self.compliance_frameworks.keys())

        compliance_report = {
            'function_name': function_name,
            'check_timestamp': datetime.utcnow().isoformat(),
            'frameworks_checked': frameworks,
            'compliance_status': {},
            'recommendations': [],
            'audit_trail': []
        }

        # Get function configuration
        function_config = self._get_function_config(function_name)

        # Execute checks for each framework
        for framework in frameworks:
            if framework in self.compliance_frameworks:
                check_func = self.compliance_frameworks[framework]
                framework_result = check_func(function_name, function_config)
                compliance_report['compliance_status'][framework] = framework_result

        # Generate comprehensive recommendations
        compliance_report['recommendations'] = self._generate_compliance_recommendations(
            compliance_report['compliance_status']
        )

        # Get audit trail
        compliance_report['audit_trail'] = self._get_audit_trail(function_name)

        return compliance_report

    def _get_function_config(self, function_name: str) -> Dict[str, Any]:
        """Get function configuration"""
        response = self.lambda_client.get_function(FunctionName=function_name)
        return response['Configuration']

    def _soc2_checks(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """SOC 2 compliance checks"""
        findings = []
        recommendations = []
        score = 0
        total_checks = 8

        # Trust Services Criteria checks

        # Security
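        # VpcConfig can be present with empty values after a function is
        # detached from its VPC, so check for an actual VpcId
        if config.get('VpcConfig', {}).get('VpcId'):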
            score += 1
            findings.append({
                'criterion': 'Security',
                'check': 'VPC Configuration',
                'status': 'PASS',
                'message': 'Function is configured to run in VPC'
            })
        else:
            findings.append({
                'criterion': 'Security',
                'check': 'VPC Configuration',
                'status': 'FAIL',
                'message': 'Function is not configured to run in VPC'
            })
            recommendations.append('Configure function to run in VPC for network isolation')

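        # Availability
        # Reserved concurrency is not part of the function configuration;
        # it is returned by GetFunctionConcurrency.
        concurrency = self.lambda_client.get_function_concurrency(
            FunctionName=function_name
        )
        reserved_concurrency = concurrency.get('ReservedConcurrentExecutions')
        if reserved_concurrency is not None: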
            score += 1
            findings.append({
                'criterion': 'Availability',
                'check': 'Concurrency Limits',
                'status': 'PASS',
                'message': 'Reserved concurrency configured'
            })
        else:
            findings.append({
                'criterion': 'Availability',
                'check': 'Concurrency Limits',
                'status': 'FAIL',
                'message': 'No reserved concurrency configured'
            })
            recommendations.append('Configure reserved concurrency to ensure availability')

        # Processing Integrity
        if config.get('TracingConfig', {}).get('Mode') == 'Active':
            score += 1
            findings.append({
                'criterion': 'Processing Integrity',
                'check': 'Tracing Configuration',
                'status': 'PASS',
                'message': 'X-Ray tracing is enabled'
            })
        else:
            findings.append({
                'criterion': 'Processing Integrity',
                'check': 'Tracing Configuration',
                'status': 'FAIL',
                'message': 'X-Ray tracing is not enabled'
            })
            recommendations.append('Enable X-Ray tracing for processing integrity')

        # Confidentiality
        if config.get('KMSKeyArn'):
            score += 1
            findings.append({
                'criterion': 'Confidentiality',
                'check': 'Encryption at Rest',
                'status': 'PASS',
                'message': 'Function uses customer-managed KMS key'
            })
        else:
            findings.append({
                'criterion': 'Confidentiality',
                'check': 'Encryption at Rest',
                'status': 'FAIL',
                'message': 'Function does not use customer-managed KMS key'
            })
            recommendations.append('Configure customer-managed KMS key for encryption')

        # More SOC 2 checks...
        # (total_checks counts all eight checks; only four are shown here)

        compliance_percentage = (score / total_checks) * 100

        return {
            'framework': 'SOC2',
            'compliance_percentage': compliance_percentage,
            'findings': findings,
            'recommendations': recommendations,
            'status': 'COMPLIANT' if compliance_percentage >= 80 else 'NON_COMPLIANT'
        }

    def _gdpr_checks(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """GDPR compliance checks"""
        findings = []
        recommendations = []
        score = 0
        total_checks = 6

        # Data protection principles checks

        # Data encryption
        if config.get('KMSKeyArn'):
            score += 1
            findings.append({
                'article': 'Article 32',
                'check': 'Data Encryption',
                'status': 'PASS',
                'message': 'Personal data is encrypted at rest'
            })
        else:
            findings.append({
                'article': 'Article 32',
                'check': 'Data Encryption',
                'status': 'FAIL',
                'message': 'Personal data may not be properly encrypted'
            })
            recommendations.append('Implement encryption for personal data protection')

        # Records of processing
        try:
            # Check CloudTrail logs
            trail_response = self.cloudtrail_client.describe_trails()
            trails = trail_response.get('trailList', [])

            if trails:
                score += 1
                findings.append({
                    'article': 'Article 30',
                    'check': 'Records of Processing',
                    'status': 'PASS',
                    'message': 'CloudTrail logging is configured'
                })
            else:
                findings.append({
                    'article': 'Article 30',
                    'check': 'Records of Processing',
                    'status': 'FAIL',
                    'message': 'No CloudTrail logging detected'
                })
                recommendations.append('Enable CloudTrail for processing activity records')
        except Exception:
            findings.append({
                'article': 'Article 30',
                'check': 'Records of Processing',
                'status': 'UNKNOWN',
                'message': 'Could not verify CloudTrail configuration'
            })

        # Data minimization
        timeout = config.get('Timeout', 3)
        if timeout <= 300:  # 5 minutes or less
            score += 1
            findings.append({
                'article': 'Article 5',
                'check': 'Data Minimization',
                'status': 'PASS',
                'message': 'Function timeout supports data minimization principle'
            })
        else:
            findings.append({
                'article': 'Article 5',
                'check': 'Data Minimization',
                'status': 'FAIL',
                'message': 'Long timeout may indicate excessive data processing'
            })
            recommendations.append('Review function timeout to ensure data minimization')

        # More GDPR checks...
        # (total_checks counts all six checks; only three are shown here)

        compliance_percentage = (score / total_checks) * 100

        return {
            'framework': 'GDPR',
            'compliance_percentage': compliance_percentage,
            'findings': findings,
            'recommendations': recommendations,
            'status': 'COMPLIANT' if compliance_percentage >= 80 else 'NON_COMPLIANT'
        }

    def _hipaa_checks(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """HIPAA compliance checks"""
        findings = []
        recommendations = []
        score = 0
        total_checks = 5

        # HIPAA Safeguards checks

        # Administrative Safeguards
        role_arn = config.get('Role', '')
        if role_arn:
            # Every function has an execution role, so this only confirms that one
            # is set; the permissions attached to the role are not inspected here
            score += 1
            findings.append({
                'safeguard': 'Administrative',
                'check': 'Access Control',
                'status': 'PASS',
                'message': 'Function has an IAM execution role'
            })

        # Physical Safeguards (AWS responsibility)
        score += 1
        findings.append({
            'safeguard': 'Physical',
            'check': 'Facility Controls',
            'status': 'PASS',
            'message': 'AWS provides physical safeguards for infrastructure'
        })

        # Technical Safeguards
        if config.get('KMSKeyArn'):
            score += 1
            findings.append({
                'safeguard': 'Technical',
                'check': 'Encryption',
                'status': 'PASS',
                'message': 'PHI is encrypted at rest'
            })
        else:
            findings.append({
                'safeguard': 'Technical',
                'check': 'Encryption',
                'status': 'FAIL',
                'message': 'PHI may not be properly encrypted'
            })
            recommendations.append('Implement encryption for PHI protection')

        # Audit Controls
        if config.get('TracingConfig', {}).get('Mode') == 'Active':
            score += 1
            findings.append({
                'safeguard': 'Technical',
                'check': 'Audit Controls',
                'status': 'PASS',
                'message': 'X-Ray tracing provides audit capabilities'
            })
        else:
            findings.append({
                'safeguard': 'Technical',
                'check': 'Audit Controls',
                'status': 'FAIL',
                'message': 'Insufficient audit controls'
            })
            recommendations.append('Enable X-Ray tracing for audit controls')

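        # Data Integrity
        # A function detached from its VPC can still report an empty
        # VpcConfig dict, so test VpcId rather than the dict itself
        if (config.get('VpcConfig') or {}).get('VpcId'):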
            score += 1
            findings.append({
                'safeguard': 'Technical',
                'check': 'Data Integrity',
                'status': 'PASS',
                'message': 'VPC configuration supports data integrity'
            })
        else:
            findings.append({
                'safeguard': 'Technical',
                'check': 'Data Integrity',
                'status': 'FAIL',
                'message': 'No network isolation configured'
            })
            recommendations.append('Configure VPC for network isolation')

        compliance_percentage = (score / total_checks) * 100

        return {
            'framework': 'HIPAA',
            'compliance_percentage': compliance_percentage,
            'findings': findings,
            'recommendations': recommendations,
            'status': 'COMPLIANT' if compliance_percentage >= 80 else 'NON_COMPLIANT'
        }

    def _pci_dss_checks(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """PCI DSS compliance checks"""
        findings = []
        recommendations = []
        score = 0
        total_checks = 4

        # PCI DSS Requirements checks

        # Requirement 3: Protect stored cardholder data
        # KMSKeyArn covers the function's environment variables only; data
        # stored in other services needs its own encryption check
        if config.get('KMSKeyArn'):
            score += 1
            findings.append({
                'requirement': '3',
                'check': 'Data Encryption',
                'status': 'PASS',
                'message': 'Environment variables are encrypted with a customer managed KMS key'
            })
        else:
            findings.append({
                'requirement': '3',
                'check': 'Data Encryption',
                'status': 'FAIL',
                'message': 'No customer managed KMS key configured; cardholder data may not be encrypted'
            })
            recommendations.append('Encrypt cardholder data at rest using KMS')

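        # Requirement 4: Encrypt transmission of cardholder data
        # VPC attachment is used as a proxy here; it does not by itself prove
        # that traffic is encrypted. Test VpcId because a detached function
        # can still report an empty VpcConfig dict
        if (config.get('VpcConfig') or {}).get('VpcId'):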
            score += 1
            findings.append({
                'requirement': '4',
                'check': 'Network Security',
                'status': 'PASS',
                'message': 'VPC provides network isolation'
            })
        else:
            findings.append({
                'requirement': '4',
                'check': 'Network Security',
                'status': 'FAIL',
                'message': 'No network isolation configured'
            })
            recommendations.append('Configure VPC for secure data transmission')

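        # Requirement 7: Restrict access to cardholder data
        # NOTE: reserved concurrency caps how many instances run at once; it
        # does not control who may invoke the function. The key is also not
        # part of the GetFunctionConfiguration response, so this check
        # assumes config was enriched with it beforehand
        reserved_concurrency = config.get('ReservedConcurrencyConfig')
        if reserved_concurrency:
            score += 1
            findings.append({
                'requirement': '7',
                'check': 'Access Control',
                'status': 'PASS',
                'message': 'Reserved concurrency limit is configured'
            })
        else:
            findings.append({
                'requirement': '7',
                'check': 'Access Control',
                'status': 'FAIL',
                'message': 'No reserved concurrency limit configured'
            })
            recommendations.append('Set reserved concurrency and review the IAM and resource-based policies that grant access')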

        # Requirement 10: Track and monitor access
        if config.get('TracingConfig', {}).get('Mode') == 'Active':
            score += 1
            findings.append({
                'requirement': '10',
                'check': 'Monitoring',
                'status': 'PASS',
                'message': 'X-Ray tracing enables monitoring'
            })
        else:
            findings.append({
                'requirement': '10',
                'check': 'Monitoring',
                'status': 'FAIL',
                'message': 'Insufficient monitoring configured'
            })
            recommendations.append('Enable comprehensive monitoring and logging')

        compliance_percentage = (score / total_checks) * 100

        return {
            'framework': 'PCI_DSS',
            'compliance_percentage': compliance_percentage,
            'findings': findings,
            'recommendations': recommendations,
            'status': 'COMPLIANT' if compliance_percentage >= 80 else 'NON_COMPLIANT'
        }

    def _iso27001_checks(self, function_name: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """ISO 27001 compliance checks"""
        findings = []
        recommendations = []
        score = 0
        total_checks = 6

        # ISO 27001 Controls checks

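        # A.9.1 Business requirements for access control
        if config.get('Role'):
            score += 1
            findings.append({
                'control': 'A.9.1',
                'check': 'Access Control Policy',
                'status': 'PASS',
                'message': 'IAM role implements access control'
            })
        else:
            findings.append({
                'control': 'A.9.1',
                'check': 'Access Control Policy',
                'status': 'FAIL',
                'message': 'No IAM execution role found in configuration'
            })
            recommendations.append('Assign an IAM execution role with least-privilege permissions')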

        # A.10.1 Cryptographic controls
        if config.get('KMSKeyArn'):
            score += 1
            findings.append({
                'control': 'A.10.1',
                'check': 'Cryptographic Controls',
                'status': 'PASS',
                'message': 'Cryptographic controls implemented'
            })
        else:
            findings.append({
                'control': 'A.10.1',
                'check': 'Cryptographic Controls',
                'status': 'FAIL',
                'message': 'No cryptographic controls detected'
            })
            recommendations.append('Implement cryptographic controls')

        # A.12.4 Logging and monitoring
        if config.get('TracingConfig', {}).get('Mode') == 'Active':
            score += 1
            findings.append({
                'control': 'A.12.4',
                'check': 'Logging and Monitoring',
                'status': 'PASS',
                'message': 'Logging and monitoring implemented'
            })
        else:
            findings.append({
                'control': 'A.12.4',
                'check': 'Logging and Monitoring',
                'status': 'FAIL',
                'message': 'Insufficient logging and monitoring'
            })
            recommendations.append('Implement comprehensive logging and monitoring')

        # A.13.1 Network security management
        # Test VpcId: a detached function can still report an empty VpcConfig
        if (config.get('VpcConfig') or {}).get('VpcId'):
            score += 1
            findings.append({
                'control': 'A.13.1',
                'check': 'Network Security',
                'status': 'PASS',
                'message': 'Network security controls implemented'
            })
        else:
            findings.append({
                'control': 'A.13.1',
                'check': 'Network Security',
                'status': 'FAIL',
                'message': 'No network security controls'
            })
            recommendations.append('Implement network security controls')

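        # A.14.1 Security requirements analysis
        runtime = config.get('Runtime', '')
        # Illustrative subset of deprecated runtimes, not an exhaustive list;
        # keep it in sync with the AWS Lambda runtime deprecation schedule
        deprecated_runtimes = [
            'python2.7', 'python3.6', 'python3.7',
            'nodejs8.10', 'nodejs10.x', 'nodejs12.x', 'nodejs14.x'
        ]
        if runtime not in deprecated_runtimes: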
            score += 1
            findings.append({
                'control': 'A.14.1',
                'check': 'Security Requirements',
                'status': 'PASS',
                'message': 'Using supported runtime version'
            })
        else:
            findings.append({
                'control': 'A.14.1',
                'check': 'Security Requirements',
                'status': 'FAIL',
                'message': 'Using deprecated runtime version'
            })
            recommendations.append('Update to supported runtime version')

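        # A.17.1 Information security continuity
        # Reserved concurrency is not returned by GetFunctionConfiguration;
        # this check assumes config was enriched with the key beforehand
        reserved_concurrency = config.get('ReservedConcurrencyConfig')
        if reserved_concurrency: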
            score += 1
            findings.append({
                'control': 'A.17.1',
                'check': 'Continuity Planning',
                'status': 'PASS',
                'message': 'Continuity measures implemented'
            })
        else:
            findings.append({
                'control': 'A.17.1',
                'check': 'Continuity Planning',
                'status': 'FAIL',
                'message': 'No continuity measures detected'
            })
            recommendations.append('Implement business continuity measures')

        compliance_percentage = (score / total_checks) * 100

        return {
            'framework': 'ISO27001',
            'compliance_percentage': compliance_percentage,
            'findings': findings,
            'recommendations': recommendations,
            'status': 'COMPLIANT' if compliance_percentage >= 80 else 'NON_COMPLIANT'
        }

    def _generate_compliance_recommendations(self, compliance_status: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate compliance recommendations"""
        all_recommendations = []

        for framework, status in compliance_status.items():
            for rec in status.get('recommendations', []):
                all_recommendations.append({
                    'framework': framework,
                    'recommendation': rec,
                    'priority': 'HIGH' if status['compliance_percentage'] < 50 else 'MEDIUM'
                })

        return all_recommendations

    def _get_audit_trail(self, function_name: str, days: int = 30) -> List[Dict[str, Any]]:
        """Get audit trail"""
        audit_events = []

        try:
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(days=days)

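            # Find CloudTrail events (LookupEvents covers management events
            # from the last 90 days)
            response = self.cloudtrail_client.lookup_events(
                LookupAttributes=[
                    {
                        'AttributeKey': 'ResourceName',
                        'AttributeValue': function_name
                    }
                ],
                StartTime=start_time,
                EndTime=end_time,
                MaxResults=50  # LookupEvents takes MaxResults (1-50), not MaxItems
            )

            for event in response.get('Events', []):
                # Source IP and user agent are not top-level keys; they live
                # inside the JSON-encoded CloudTrailEvent payload
                detail = json.loads(event.get('CloudTrailEvent') or '{}')
                audit_events.append({
                    'event_time': event['EventTime'].isoformat(),
                    'event_name': event['EventName'],
                    'user_name': event.get('Username', 'Unknown'),
                    'source_ip': detail.get('sourceIPAddress', 'Unknown'),
                    'user_agent': detail.get('userAgent', 'Unknown')
                })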

        except Exception as e:
            audit_events.append({
                'error': f'Failed to retrieve audit trail: {str(e)}'
            })

        return audit_events

    def generate_compliance_report(self, compliance_result: Dict[str, Any]) -> str:
        """Generate compliance report"""
        report = f"""
LAMBDA COMPLIANCE AUDIT REPORT
==============================

Function: {compliance_result['function_name']}
Audit Date: {compliance_result['check_timestamp']}
Frameworks Checked: {', '.join(compliance_result['frameworks_checked'])}

COMPLIANCE STATUS:
"""

        for framework, status in compliance_result['compliance_status'].items():
            status_icon = "✓" if status['status'] == 'COMPLIANT' else "✗"
            report += f"  {status_icon} {framework}: {status['compliance_percentage']:.1f}% ({status['status']})\n"

        # High priority recommendations
        high_priority_recs = [r for r in compliance_result['recommendations'] if r['priority'] == 'HIGH']
        if high_priority_recs:
            report += "\nHIGH PRIORITY RECOMMENDATIONS:\n"
            for rec in high_priority_recs[:5]:
                report += f"  - [{rec['framework']}] {rec['recommendation']}\n"

        # Audit activity summary
        audit_trail = compliance_result['audit_trail']
        if audit_trail and not any('error' in event for event in audit_trail):
            report += f"\nRECENT AUDIT ACTIVITIES ({len(audit_trail)}):\n"
            # CloudTrail returns the newest events first, so take the head
            for event in audit_trail[:5]:
                report += f"  - {event['event_time']}: {event['event_name']} by {event['user_name']}\n"

        return report

def main():
    """Main compliance check function"""
    function_name = input("Enter Lambda function name for compliance check: ").strip()
    frameworks = input("Enter frameworks to check (comma-separated, or 'all' for all): ").strip()

    if frameworks.lower() == 'all':
        frameworks = None
    else:
        frameworks = [f.strip().upper() for f in frameworks.split(',')]

    checker = ComplianceChecker()
    compliance_result = checker.comprehensive_compliance_check(function_name, frameworks)

    # Print report
    report_text = checker.generate_compliance_report(compliance_result)
    print(report_text)

    # Save detailed report
    with open(f"{function_name}_compliance_report.json", 'w') as f:
        json.dump(compliance_result, f, indent=2, default=str)

    print(f"\nDetailed compliance report saved to {function_name}_compliance_report.json")

if __name__ == "__main__":
    main()
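
The Requirement 7 and A.17.1 checks above read `ReservedConcurrencyConfig` from `config`. The `GetFunctionConfiguration` response does not carry reserved concurrency, so that key has to be filled in separately. The following is a minimal sketch of one way to do that, assuming `config` is the raw configuration response. The helper name `enrich_with_concurrency` is hypothetical and not part of `ComplianceChecker`; `get_function_concurrency` is the boto3 Lambda client call that returns `ReservedConcurrentExecutions`.

```python
from typing import Any, Dict


def enrich_with_concurrency(lambda_client: Any, function_name: str,
                            config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config with 'ReservedConcurrencyConfig' populated.

    Hypothetical helper. The key name matches what the checks read; the
    value is None when no reserved concurrency is set on the function.
    """
    response = lambda_client.get_function_concurrency(FunctionName=function_name)
    reserved = response.get('ReservedConcurrentExecutions')

    enriched = dict(config)  # shallow copy so the caller's dict is untouched
    enriched['ReservedConcurrencyConfig'] = (
        {'ReservedConcurrentExecutions': reserved} if reserved is not None else None
    )
    return enriched
```

With a real client this could be called as `enrich_with_concurrency(boto3.client('lambda'), function_name, config)` before the framework checks run.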

12.5 Chapter Summary

Key Takeaways

Security Model:

  • Understand the AWS shared responsibility model
  • Implement a defense-in-depth strategy
  • Conduct regular security audits

Data Protection:

  • Use KMS to encrypt sensitive data at rest, including environment variables
  • Implement key rotation strategies
  • Protect sensitive data in transit
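
As an illustration of the key rotation point, here is a minimal sketch that turns on automatic rotation for a customer managed KMS key when it is off. The function name `ensure_key_rotation` is hypothetical; `get_key_rotation_status` and `enable_key_rotation` are boto3 KMS client calls. It assumes a symmetric encryption key, since automatic rotation does not apply to asymmetric keys.

```python
from typing import Any


def ensure_key_rotation(kms_client: Any, key_id: str) -> bool:
    """Enable automatic rotation for a customer managed KMS key if it is off.

    Hypothetical helper. Returns True when rotation had to be enabled and
    False when it was already on.
    """
    status = kms_client.get_key_rotation_status(KeyId=key_id)
    if status.get('KeyRotationEnabled'):
        return False
    kms_client.enable_key_rotation(KeyId=key_id)
    return True
```

With a real client, `kms_client` would be `boto3.client('kms')` and `key_id` the key ID or ARN taken from the function's `KMSKeyArn`.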

Network Security:

  • Properly configure VPC and security groups
  • Use network ACLs for enhanced protection
  • Enable VPC Flow Logs for monitoring

Compliance Management:

  • Understand the requirements of each compliance framework (GDPR, HIPAA, PCI DSS, ISO 27001)
  • Implement automated compliance checks
  • Maintain complete audit trails
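
One way to automate the checks is to let a pipeline step fail when any framework is reported as non-compliant. The sketch below assumes the result dictionary has the same `compliance_status` shape that `generate_compliance_report` reads. The function names `failing_frameworks` and `exit_code` are hypothetical.

```python
from typing import Any, Dict, List


def failing_frameworks(compliance_result: Dict[str, Any]) -> List[str]:
    """Return the frameworks whose status is not 'COMPLIANT', sorted by name."""
    statuses = compliance_result.get('compliance_status', {})
    return sorted(
        name for name, status in statuses.items()
        if status.get('status') != 'COMPLIANT'
    )


def exit_code(compliance_result: Dict[str, Any]) -> int:
    """Map a compliance result to a process exit code for a CI step."""
    return 1 if failing_frameworks(compliance_result) else 0
```

A CI job could call `sys.exit(exit_code(compliance_result))` after saving the JSON report, so that a failing framework blocks the deployment.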

Best Practices:

  • Follow the principle of least privilege
  • Regularly update runtime versions
  • Implement continuous security monitoring
  • Establish incident response processes
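
Keeping runtimes current is easier with a periodic scan. The following is a rough sketch, assuming the same illustrative deprecated-runtime list used in the ISO 27001 check of this chapter. The function name `find_deprecated_functions` is hypothetical; `get_paginator('list_functions')` is the boto3 Lambda paginator.

```python
from typing import Any, Dict, Iterable, List

# Illustrative subset of deprecated runtimes, not an exhaustive list.
DEPRECATED_RUNTIMES = frozenset({'python2.7', 'python3.6', 'nodejs8.10'})


def find_deprecated_functions(lambda_client: Any,
                              deprecated: Iterable[str] = DEPRECATED_RUNTIMES
                              ) -> List[Dict[str, str]]:
    """List functions in the client's region that use a deprecated runtime."""
    deprecated_set = set(deprecated)
    flagged = []
    paginator = lambda_client.get_paginator('list_functions')
    for page in paginator.paginate():
        for fn in page.get('Functions', []):
            # Container-image functions have no 'Runtime' key and are skipped.
            runtime = fn.get('Runtime')
            if runtime in deprecated_set:
                flagged.append({
                    'function_name': fn['FunctionName'],
                    'runtime': runtime,
                })
    return flagged
```

The scan covers a single region, so it would need to run once per region with a client created for each.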

Lambda security is an ongoing process that requires integrating security considerations throughout the entire development lifecycle. With the security techniques and compliance checks covered in this chapter, you can build secure, compliant serverless applications that protect business data and user privacy.

This is also the final chapter of the complete AWS Lambda course. Through these 12 chapters, you have mastered comprehensive knowledge from Lambda fundamentals to advanced security practices, enabling you to build high-quality serverless applications in real-world projects.

Further Reading