学习目标
- 理解IAM角色的概念和用途
- 创建和配置服务角色
- 设置跨账户角色访问
- 配置联合身份角色
- 掌握角色切换和临时凭证
角色系统架构图
3.1 IAM角色基础概念
3.1.1 角色与用户的区别
核心差异
| 特征 | IAM用户 | IAM角色 |
|---|---|---|
| 身份类型 | 永久身份 | 临时身份 |
| 凭证 | 长期凭证 | 临时凭证 |
| 使用场景 | 人员访问 | 服务间访问 |
| 权限获取 | 直接附加 | 代入获取 |
| 安全性 | 需要密钥管理 | 自动轮换 |
9/1/25About 19 min
核心差异
| 特征 | IAM用户 | IAM角色 |
|---|---|---|
| 身份类型 | 永久身份 | 临时身份 |
| 凭证 | 长期凭证 | 临时凭证 |
| 使用场景 | 人员访问 | 服务间访问 |
| 权限获取 | 直接附加 | 代入获取 |
| 安全性 | 需要密钥管理 | 自动轮换 |
策略基本结构
IAM策略使用JSON格式,包含以下基本元素:
权限边界核心概念
权限边界是一种高级功能,用于设置IAM实体(用户或角色)的最大权限。它不授予权限,而是定义了权限的上限。
import boto3
import json
from datetime import datetime, timedelta
import hashlib
import uuid
def implement_security_principles():
"""
实施IAM安全设计原则
"""
security_principles = {
"least_privilege": {
"principle": "最小权限原则",
"description": "只授予执行任务所需的最小权限",
"implementation_strategies": [
"从拒绝所有开始,逐步添加必需权限",
"定期审查和清理不必要的权限",
"使用权限边界限制最大权限范围",
"实施Just-In-Time访问模式",
"基于实际使用情况调整权限"
],
"best_practices": [
"避免使用通配符权限(*)",
"使用具体的资源ARN而非*",
"定期运行Access Analyzer",
"监控权限使用情况",
"实施权限生命周期管理"
]
},
"defense_in_depth": {
"principle": "深度防御原则",
"description": "实施多层安全控制机制",
"implementation_strategies": [
"组织级SCP + 账户级策略",
"权限边界 + 身份策略",
"网络安全 + 应用安全",
"加密传输 + 加密存储",
"监控 + 告警 + 响应"
],
"security_layers": [
"AWS组织和SCP策略",
"网络访问控制(VPC、安全组)",
"IAM身份和权限管理",
"资源级权限控制",
"应用程序安全控制",
"数据加密和保护",
"监控和审计系统"
]
},
"zero_trust": {
"principle": "零信任原则",
"description": "不信任任何用户或系统,始终验证",
"implementation_strategies": [
"强制身份验证和授权",
"基于上下文的访问控制",
"持续监控和验证",
"最小爆炸半径设计",
"动态权限调整"
],
"verification_factors": [
"用户身份(Who)",
"设备状态(Where)",
"应用程序(What)",
"时间上下文(When)",
"访问行为(How)",
"数据分类(Why)"
]
},
"separation_of_duties": {
"principle": "职责分离原则",
"description": "将关键操作分散给不同的人员或系统",
"implementation_strategies": [
"管理员和操作员角色分离",
"开发和生产环境分离",
"读权限和写权限分离",
"审计和执行功能分离",
"审批和执行流程分离"
],
"separation_examples": [
"数据库管理员不能直接访问应用数据",
"开发人员不能直接部署到生产环境",
"安全团队独立于开发团队",
"审计员独立于被审计系统",
"备份恢复需要双人授权"
]
}
}
print("🔐 IAM安全设计原则:")
for principle_key, principle in security_principles.items():
print(f"\n{principle['principle']}:")
print(f" 描述: {principle['description']}")
if 'implementation_strategies' in principle:
print(" 实施策略:")
for strategy in principle['implementation_strategies']:
print(f" • {strategy}")
if 'best_practices' in principle:
print(" 最佳实践:")
for practice in principle['best_practices']:
print(f" ✓ {practice}")
return security_principles
def create_secure_iam_architecture():
"""
创建安全的IAM架构设计
"""
class SecureIAMArchitecture:
def __init__(self):
self.iam = boto3.client('iam')
self.organizations = boto3.client('organizations')
self.sts = boto3.client('sts')
def design_role_hierarchy(self):
"""设计角色层次结构"""
role_hierarchy = {
"administrative_roles": {
"OrganizationAdmin": {
"description": "组织管理员,管理AWS组织结构",
"permissions": ["organizations:*", "account:*"],
"restrictions": ["不能直接访问工作负载", "需要MFA"],
"assume_conditions": {
"Bool": {"aws:MultiFactorAuthPresent": "true"},
"NumericLessThan": {"aws:MultiFactorAuthAge": "1800"}
}
},
"SecurityAdmin": {
"description": "安全管理员,管理IAM和安全配置",
"permissions": ["iam:*", "config:*", "cloudtrail:*"],
"restrictions": ["不能访问应用数据", "独立于开发团队"],
"assume_conditions": {
"Bool": {"aws:MultiFactorAuthPresent": "true"},
"StringEquals": {"aws:PrincipalTag/Department": "Security"}
}
},
"AuditAdmin": {
"description": "审计管理员,只读访问审计日志",
"permissions": ["*:Get*", "*:List*", "*:Describe*"],
"restrictions": ["只读权限", "不能修改配置"],
"assume_conditions": {
"Bool": {"aws:MultiFactorAuthPresent": "true"}
}
}
},
"operational_roles": {
"DevOpsEngineer": {
"description": "DevOps工程师,管理基础设施",
"permissions": ["ec2:*", "cloudformation:*", "lambda:*"],
"restrictions": ["不能修改IAM", "环境隔离"],
"assume_conditions": {
"StringEquals": {"aws:ResourceTag/Environment": "${aws:PrincipalTag/AllowedEnvironment}"}
}
},
"DatabaseAdmin": {
"description": "数据库管理员,管理数据库资源",
"permissions": ["rds:*", "dynamodb:*"],
"restrictions": ["不能访问应用数据", "只能管理数据库实例"],
"assume_conditions": {
"Bool": {"aws:MultiFactorAuthPresent": "true"}
}
}
},
"application_roles": {
"WebServerRole": {
"description": "Web服务器角色",
"permissions": ["s3:GetObject", "dynamodb:GetItem"],
"restrictions": ["只能访问指定资源"],
"assume_conditions": {
"StringEquals": {"ec2:SourceInstanceARN": "${aws:TokenIssueTime}"}
}
},
"LambdaExecutionRole": {
"description": "Lambda函数执行角色",
"permissions": ["logs:*", "s3:GetObject"],
"restrictions": ["最小化权限"],
"assume_conditions": {
"StringEquals": {"aws:SourceArn": "arn:aws:lambda:*"}
}
}
}
}
return role_hierarchy
def implement_mfa_enforcement(self):
"""实施MFA强制策略"""
mfa_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowViewAccountInfo",
"Effect": "Allow",
"Action": [
"iam:GetAccountPasswordPolicy",
"iam:GetAccountSummary",
"iam:ListVirtualMFADevices"
],
"Resource": "*"
},
{
"Sid": "AllowManageOwnPasswords",
"Effect": "Allow",
"Action": [
"iam:ChangePassword",
"iam:GetUser"
],
"Resource": "arn:aws:iam::*:user/${aws:username}"
},
{
"Sid": "AllowManageOwnMFA",
"Effect": "Allow",
"Action": [
"iam:CreateVirtualMFADevice",
"iam:DeleteVirtualMFADevice",
"iam:EnableMFADevice",
"iam:ListMFADevices",
"iam:ResyncMFADevice"
],
"Resource": [
"arn:aws:iam::*:mfa/${aws:username}",
"arn:aws:iam::*:user/${aws:username}"
]
},
{
"Sid": "DenyAllExceptUnlessSignedInWithMFA",
"Effect": "Deny",
"NotAction": [
"iam:ChangePassword",
"iam:CreateVirtualMFADevice",
"iam:EnableMFADevice",
"iam:GetUser",
"iam:ListMFADevices",
"iam:ListVirtualMFADevices",
"iam:ResyncMFADevice",
"sts:GetSessionToken"
],
"Resource": "*",
"Condition": {
"BoolIfExists": {
"aws:MultiFactorAuthPresent": "false"
}
}
}
]
}
print("🔐 MFA强制策略:")
print(" - 允许查看账户信息")
print(" - 允许管理自己的密码和MFA设备")
print(" - 拒绝所有其他操作,除非已通过MFA认证")
return mfa_policy
def create_break_glass_procedure(self):
"""创建紧急访问(破玻璃)程序"""
break_glass_config = {
"emergency_role": {
"name": "EmergencyBreakGlassRole",
"description": "紧急情况下的全权限角色",
"permissions": ["*:*"],
"conditions": {
"StringEquals": {
"aws:RequestTag/Emergency": "true",
"aws:RequestTag/RequestedBy": "${aws:username}",
"aws:RequestTag/Reason": "EMERGENCY"
},
"DateGreaterThan": {
"aws:CurrentTime": "${aws:RequestTag/ValidFrom}"
},
"DateLessThan": {
"aws:CurrentTime": "${aws:RequestTag/ValidUntil}"
}
},
"monitoring": {
"cloudwatch_alarm": "EmergencyAccessAlarm",
"sns_notification": "SecurityTeamTopic",
"audit_logging": "EmergencyAccessLogGroup"
}
},
"activation_process": [
"1. 安全团队成员申请紧急访问",
"2. 提供详细的紧急情况描述",
"3. 获得安全主管的批准",
"4. 系统自动创建临时访问令牌",
"5. 所有操作被完整记录和监控",
"6. 紧急情况结束后立即撤销访问"
],
"automatic_controls": {
"max_duration": "4小时",
"automatic_revocation": True,
"real_time_monitoring": True,
"approval_required": True,
"full_audit_trail": True
}
}
print("🚨 紧急访问(破玻璃)程序:")
print(f" 角色名称: {break_glass_config['emergency_role']['name']}")
print(f" 最大持续时间: {break_glass_config['automatic_controls']['max_duration']}")
print(" 激活流程:")
for step in break_glass_config['activation_process']:
print(f" {step}")
return break_glass_config
return SecureIAMArchitecture()
# 实施安全原则
security_principles = implement_security_principles()
# 创建安全架构
secure_arch = create_secure_iam_architecture()
role_hierarchy = secure_arch.design_role_hierarchy()
mfa_policy = secure_arch.implement_mfa_enforcement()
break_glass_config = secure_arch.create_break_glass_procedure()
print("\n📋 角色层次结构示例:")
for category, roles in role_hierarchy.items():
print(f"\n{category.replace('_', ' ').title()}:")
for role_name, role_config in roles.items():
print(f" {role_name}: {role_config['description']}")
# cdk_project/app.py
#!/usr/bin/env python3
import aws_cdk as cdk
from iam_stack import IamStack
app = cdk.App()
IamStack(app, "IamStack",
env=cdk.Environment(
account="123456789012", # 替换为您的账户ID
region="us-east-1"
)
)
app.synth()
import boto3
import json
from datetime import datetime
def configure_ec2_iam_integration():
"""
配置EC2与IAM的集成
"""
class EC2IAMIntegration:
def __init__(self):
self.iam = boto3.client('iam')
self.ec2 = boto3.client('ec2')
self.sts = boto3.client('sts')
def create_ec2_service_role(self, role_name, permissions):
"""创建EC2服务角色"""
# EC2信任策略
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
try:
# 创建角色
role_response = self.iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=f'Service role for EC2 instances: {role_name}',
Tags=[
{'Key': 'Service', 'Value': 'EC2'},
{'Key': 'ManagedBy', 'Value': 'IAM-Automation'},
{'Key': 'CreatedDate', 'Value': datetime.now().strftime('%Y-%m-%d')}
]
)
role_arn = role_response['Role']['Arn']
print(f"✅ EC2角色创建成功: {role_arn}")
# 附加权限策略
for permission in permissions:
if permission['type'] == 'managed':
self.iam.attach_role_policy(
RoleName=role_name,
PolicyArn=permission['arn']
)
elif permission['type'] == 'inline':
self.iam.put_role_policy(
RoleName=role_name,
PolicyName=permission['name'],
PolicyDocument=json.dumps(permission['document'])
)
# 创建实例配置文件
self.iam.create_instance_profile(
InstanceProfileName=role_name
)
# 将角色添加到实例配置文件
self.iam.add_role_to_instance_profile(
InstanceProfileName=role_name,
RoleName=role_name
)
print(f"✅ 实例配置文件创建成功: {role_name}")
return {
'role_arn': role_arn,
'instance_profile_name': role_name
}
except Exception as e:
print(f"❌ 创建EC2角色失败: {str(e)}")
return None
def create_web_server_role(self):
"""创建Web服务器角色"""
permissions = [
{
'type': 'managed',
'arn': 'arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy'
},
{
'type': 'inline',
'name': 'WebServerCustomPolicy',
'document': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::my-web-content/*"
},
{
"Effect": "Allow",
"Action": [
"ssm:GetParameter",
"ssm:GetParameters",
"ssm:GetParametersByPath"
],
"Resource": "arn:aws:ssm:*:*:parameter/webserver/*"
},
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": "arn:aws:secretsmanager:*:*:secret:webserver/*"
}
]
}
}
]
return self.create_ec2_service_role("WebServerRole", permissions)
def create_database_server_role(self):
"""创建数据库服务器角色"""
permissions = [
{
'type': 'managed',
'arn': 'arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy'
},
{
'type': 'inline',
'name': 'DatabaseServerPolicy',
'document': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::database-backups/*",
"arn:aws:s3:::database-logs/*"
]
},
{
"Effect": "Allow",
"Action": [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:*:*:key/database-encryption-key"
},
{
"Effect": "Allow",
"Action": [
"ssm:GetParameter",
"ssm:PutParameter"
],
"Resource": "arn:aws:ssm:*:*:parameter/database/*"
}
]
}
}
]
return self.create_ec2_service_role("DatabaseServerRole", permissions)
def launch_instance_with_role(self, instance_profile_name, **kwargs):
"""使用IAM角色启动EC2实例"""
launch_params = {
'ImageId': kwargs.get('image_id', 'ami-0abcdef1234567890'),
'InstanceType': kwargs.get('instance_type', 't3.micro'),
'MinCount': 1,
'MaxCount': 1,
'IamInstanceProfile': {
'Name': instance_profile_name
},
'SecurityGroupIds': kwargs.get('security_groups', []),
'SubnetId': kwargs.get('subnet_id'),
'KeyName': kwargs.get('key_name'),
'UserData': kwargs.get('user_data', ''),
'TagSpecifications': [
{
'ResourceType': 'instance',
'Tags': [
{'Key': 'Name', 'Value': kwargs.get('name', 'IAM-Managed-Instance')},
{'Key': 'IAMRole', 'Value': instance_profile_name},
{'Key': 'ManagedBy', 'Value': 'IAM-Integration'}
]
}
]
}
try:
response = self.ec2.run_instances(**launch_params)
instance_id = response['Instances'][0]['InstanceId']
print(f"✅ EC2实例启动成功: {instance_id}")
print(f" 使用IAM角色: {instance_profile_name}")
return instance_id
except Exception as e:
print(f"❌ 启动EC2实例失败: {str(e)}")
return None
def test_instance_permissions(self, instance_id):
"""测试实例权限"""
# 获取实例元数据中的角色信息
test_script = f'''
#!/bin/bash
echo "=== 测试EC2实例IAM权限 ==="
# 获取实例元数据
echo "1. 获取实例IAM角色信息:"
curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/
echo -e "\\n2. 测试AWS CLI权限:"
aws sts get-caller-identity
echo -e "\\n3. 测试S3权限:"
aws s3 ls
echo -e "\\n4. 测试SSM参数访问:"
aws ssm get-parameters-by-path --path "/webserver" --recursive
echo -e "\\n5. 测试CloudWatch日志权限:"
aws logs describe-log-groups --limit 5
'''
print(f"📋 实例权限测试脚本 (Instance ID: {instance_id}):")
print(test_script)
return test_script
return EC2IAMIntegration()
# 演示EC2 IAM集成
ec2_iam = configure_ec2_iam_integration()
# 创建Web服务器角色
web_role = ec2_iam.create_web_server_role()
# 创建数据库服务器角色
db_role = ec2_iam.create_database_server_role()
print("\n📋 EC2 IAM集成配置完成:")
print(" ✅ Web服务器角色已创建")
print(" ✅ 数据库服务器角色已创建")
print(" ✅ 实例配置文件已配置")
import boto3
import json
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import base64
import urllib.parse
def understand_identity_federation():
"""
理解联合身份的概念和架构
"""
federation_concepts = {
"identity_federation": {
"definition": "联合身份允许外部身份提供商的用户访问AWS资源,无需在AWS中创建IAM用户",
"benefits": [
"单点登录体验",
"集中的身份管理",
"减少密码管理复杂性",
"支持企业级身份提供商",
"临时凭证提高安全性"
],
"key_components": [
"身份提供商 (IdP)",
"服务提供商 (SP) - AWS",
"身份断言",
"信任关系",
"角色映射"
]
},
"federation_types": {
"saml_federation": {
"description": "基于SAML 2.0协议的企业级联合",
"use_cases": ["企业Active Directory", "ADFS", "第三方IdP"],
"workflow": [
"用户在企业IdP认证",
"IdP生成SAML断言",
"用户重定向到AWS SAML端点",
"AWS验证断言并提供临时凭证",
"用户使用临时凭证访问AWS资源"
]
},
"oidc_federation": {
"description": "基于OpenID Connect的Web身份联合",
"use_cases": ["移动应用", "Web应用", "第三方登录"],
"workflow": [
"用户通过OIDC提供商认证",
"获取身份令牌",
"使用令牌调用AWS STS",
"获取临时AWS凭证",
"访问AWS资源"
]
},
"web_identity_federation": {
"description": "直接与Web身份提供商集成",
"use_cases": ["Amazon Cognito", "Google", "Facebook登录"],
"workflow": [
"用户通过Web IdP登录",
"获取身份令牌",
"直接使用AssumeRoleWithWebIdentity",
"获取AWS临时凭证"
]
}
}
}
# 联合身份的信任模型
trust_model = {
"trust_establishment": {
"identity_provider_setup": "在AWS中配置身份提供商",
"trust_policy_creation": "创建信任策略允许IdP代入角色",
"attribute_mapping": "映射IdP属性到AWS角色",
"condition_validation": "验证联合条件和约束"
},
"security_considerations": [
"验证SAML断言的完整性和真实性",
"使用适当的条件限制角色访问",
"定期轮换加密密钥",
"监控和审计联合访问",
"实施最小权限原则"
]
}
print("📋 联合身份概念:")
print(f"定义: {federation_concepts['identity_federation']['definition']}")
print("\n优势:")
for benefit in federation_concepts['identity_federation']['benefits']:
print(f" • {benefit}")
print("\n关键组件:")
for component in federation_concepts['identity_federation']['key_components']:
print(f" • {component}")
print("\n📋 联合类型:")
for fed_type, details in federation_concepts['federation_types'].items():
print(f"\n{fed_type.replace('_', ' ').title()}:")
print(f" 描述: {details['description']}")
print(f" 用例: {', '.join(details['use_cases'])}")
return federation_concepts, trust_model
class IdentityFederationManager:
"""联合身份管理器"""
def __init__(self):
self.iam = boto3.client('iam')
self.sts = boto3.client('sts')
def create_saml_identity_provider(self, provider_name, metadata_document):
"""创建SAML身份提供商"""
try:
response = self.iam.create_saml_provider(
SAMLMetadataDocument=metadata_document,
Name=provider_name,
Tags=[
{'Key': 'Type', 'Value': 'SAML'},
{'Key': 'Purpose', 'Value': 'Federation'},
{'Key': 'CreatedDate', 'Value': datetime.now().strftime('%Y-%m-%d')}
]
)
provider_arn = response['SAMLProviderArn']
print(f"✅ SAML身份提供商创建成功: {provider_arn}")
return provider_arn
except Exception as e:
print(f"❌ 创建SAML身份提供商失败: {str(e)}")
return None
def create_oidc_identity_provider(self, provider_url, client_ids, thumbprints):
"""创建OIDC身份提供商"""
try:
response = self.iam.create_open_id_connect_provider(
Url=provider_url,
ClientIDList=client_ids,
ThumbprintList=thumbprints,
Tags=[
{'Key': 'Type', 'Value': 'OIDC'},
{'Key': 'Purpose', 'Value': 'WebIdentityFederation'}
]
)
provider_arn = response['OpenIDConnectProviderArn']
print(f"✅ OIDC身份提供商创建成功: {provider_arn}")
return provider_arn
except Exception as e:
print(f"❌ 创建OIDC身份提供商失败: {str(e)}")
return None
# 演示联合身份概念
federation_concepts, trust_model = understand_identity_federation()
federation_manager = IdentityFederationManager()
print("\n📋 信任模型组件:")
for component, description in trust_model['trust_establishment'].items():
print(f" {component.replace('_', ' ').title()}: {description}")
通过本章学习,你将能够:
IAM访问分析是确保云安全的关键环节,帮助识别潜在的安全风险和合规问题。
通过本章学习,你将能够:
IAM Roles Anywhere允许您的工作负载在AWS之外使用IAM角色,为混合云和多云环境提供统一的身份管理。
学习目标
假设我们为一家中型科技公司设计IAM架构,该公司具有以下特点: