第10章:IAM访问分析和审计
发布日期: 2025/9/1 · 阅读时间: 大约 15 分钟
第10章:IAM访问分析和审计
学习目标
通过本章学习,你将能够:
- 理解IAM访问分析的重要性和核心概念
- 掌握使用AWS CloudTrail进行IAM操作审计
- 学会使用IAM Access Analyzer进行权限风险分析
- 实现IAM合规性检查和报告自动化
- 构建IAM安全监控和告警系统
- 掌握IAM访问模式分析和优化技术
10.1 IAM访问分析基础
10.1.1 访问分析概述
IAM访问分析是确保云安全的关键环节,帮助识别潜在的安全风险和合规问题。
10.1.2 访问分析Python工具框架
import csv
import io
import json
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from fnmatch import fnmatchcase
from pathlib import Path
from typing import Any, Dict, List, Optional

import boto3
import pandas as pd
class IAMAccessAnalyzer:
    """Gather per-user IAM permission data and derive a heuristic risk score.

    AWS errors are printed and converted into empty or partial results
    instead of being raised.
    """

    # Default CloudTrail look-back window in days. The risk score
    # distinguishes more than 30 days of inactivity, which a 30-day window
    # could never observe.
    # NOTE(review): CloudTrail event history is documented as covering the
    # last 90 days - confirm before raising this value.
    ACTIVITY_LOOKBACK_DAYS = 90

    def __init__(self):
        self.iam = boto3.client('iam')
        self.cloudtrail = boto3.client('cloudtrail')
        self.access_analyzer = boto3.client('accessanalyzer')
        self.sts = boto3.client('sts')

    def get_account_summary(self) -> Dict[str, Any]:
        """Return the IAM account summary map, or ``{}`` if the call fails."""
        try:
            summary = self.iam.get_account_summary()
            return summary['SummaryMap']
        except Exception as e:
            print(f"获取账户概览失败: {e}")
            return {}

    def _list_all(self, operation: str, result_key: str, **kwargs: Any) -> List[Any]:
        """Concatenate ``result_key`` from every page of an IAM list operation."""
        items: List[Any] = []
        for page in self.iam.get_paginator(operation).paginate(**kwargs):
            items.extend(page[result_key])
        return items

    def analyze_user_permissions(self, username: str) -> Dict[str, Any]:
        """Collect policies, groups, last activity and a risk score for a user.

        Args:
            username: IAM user name to analyse.

        Returns:
            A dict with the keys initialised below plus ``created_date``.
            If any AWS call fails, the error is printed and the partially
            filled dict is returned (``risk_score`` stays 0 in that case).
        """
        analysis: Dict[str, Any] = {
            'user': username,
            'attached_policies': [],
            'inline_policies': [],
            'group_memberships': [],
            'last_activity': None,
            'unused_permissions': [],
            'risk_score': 0
        }
        try:
            user_info = self.iam.get_user(UserName=username)
            analysis['created_date'] = user_info['User']['CreateDate']

            # Managed policies attached directly to the user.
            analysis['attached_policies'] = self._list_all(
                'list_attached_user_policies', 'AttachedPolicies',
                UserName=username
            )

            # Inline policies, including their documents.
            for policy_name in self._list_all(
                    'list_user_policies', 'PolicyNames', UserName=username):
                policy_doc = self.iam.get_user_policy(
                    UserName=username,
                    PolicyName=policy_name
                )
                analysis['inline_policies'].append({
                    'name': policy_name,
                    'document': policy_doc['PolicyDocument']
                })

            # boto3 exposes this operation as list_groups_for_user;
            # get_groups_for_user does not exist on the boto3 IAM client.
            analysis['group_memberships'] = self._list_all(
                'list_groups_for_user', 'Groups', UserName=username
            )

            analysis['last_activity'] = self._get_user_last_activity(username)
            analysis['risk_score'] = self._calculate_user_risk_score(analysis)
            return analysis
        except Exception as e:
            print(f"分析用户权限失败: {e}")
            return analysis

    def _get_user_last_activity(self, username: str,
                                lookback_days: Optional[int] = None
                                ) -> Optional[datetime]:
        """Return the time of the user's most recent CloudTrail event.

        Args:
            username: IAM user name to look up.
            lookback_days: Size of the search window in days; defaults to
                ``ACTIVITY_LOOKBACK_DAYS``.

        Returns:
            The ``EventTime`` of the first event returned, or ``None`` when
            no event is found or the lookup fails.
        """
        if lookback_days is None:
            lookback_days = self.ACTIVITY_LOOKBACK_DAYS
        try:
            end_time = datetime.now(timezone.utc)
            start_time = end_time - timedelta(days=lookback_days)
            events = self.cloudtrail.lookup_events(
                LookupAttributes=[
                    {
                        'AttributeKey': 'Username',
                        'AttributeValue': username
                    }
                ],
                StartTime=start_time,
                EndTime=end_time,
                # lookup_events takes MaxResults; MaxItems is only a
                # paginator option and is rejected by the client call.
                MaxResults=1
            )
            if events['Events']:
                return events['Events'][0]['EventTime']
            return None
        except Exception as e:
            print(f"获取用户活动时间失败: {e}")
            return None

    def _calculate_user_risk_score(self, analysis: Dict[str, Any]) -> int:
        """Compute a 0-100 heuristic risk score from an analysis dict.

        Scoring: +20 for more than 5 policies; +30 / +15 for more than
        90 / 30 days of inactivity, or +40 when no activity is known;
        +25 for each attached policy whose name contains ``Admin``.
        """
        risk_score = 0

        # Too many policies.
        total_policies = (
            len(analysis['attached_policies']) +
            len(analysis['inline_policies'])
        )
        if total_policies > 5:
            risk_score += 20

        # Inactivity.
        last_activity = analysis.get('last_activity')
        if last_activity:
            # Timestamps may be tz-aware; mixing them with a naive "now"
            # raises TypeError, so compare in UTC. Naive values are assumed
            # to already be UTC.
            if last_activity.tzinfo is None:
                last_activity = last_activity.replace(tzinfo=timezone.utc)
            days_inactive = (datetime.now(timezone.utc) - last_activity).days
            if days_inactive > 90:
                risk_score += 30
            elif days_inactive > 30:
                risk_score += 15
        else:
            risk_score += 40  # no recorded activity

        # Administrator-style policies.
        for policy in analysis['attached_policies']:
            if 'Admin' in policy['PolicyName']:
                risk_score += 25

        return min(risk_score, 100)
# Usage example: build the analyzer, then print the account overview and a
# single-user analysis.
analyzer = IAMAccessAnalyzer()

# Account overview
account_summary = analyzer.get_account_summary()
print("IAM账户概览:")
for key in account_summary:
    value = account_summary[key]
    print(f" {key}: {value}")

# Analyse one specific user
user_analysis = analyzer.analyze_user_permissions('example-user')
attached_count = len(user_analysis['attached_policies'])
print("\n用户分析结果:")
for summary_line in (
    f"用户: {user_analysis['user']}",
    f"风险评分: {user_analysis['risk_score']}/100",
    f"附加策略数量: {attached_count}",
    f"最后活动: {user_analysis['last_activity']}",
):
    print(summary_line)
10.2 使用CloudTrail进行操作审计
10.2.1 CloudTrail IAM事件监控
class CloudTrailIAMMonitor:
    """Summarise IAM-related CloudTrail events and flag suspicious volumes."""

    # Event names fetched from CloudTrail, grouped by result bucket.
    # lookup_events is queried once per name.
    _USER_EVENTS = ('CreateUser', 'DeleteUser')
    _POLICY_EVENTS = ('CreatePolicy', 'DeletePolicy',
                      'PutUserPolicy', 'DeleteUserPolicy')
    _PERMISSION_EVENTS = ('AttachUserPolicy', 'DetachUserPolicy',
                          'AttachRolePolicy', 'DetachRolePolicy',
                          'AttachGroupPolicy', 'DetachGroupPolicy')
    _ROLE_EVENTS = ('AssumeRole',)
    _TRACKED_EVENTS = (_USER_EVENTS + _POLICY_EVENTS +
                       _PERMISSION_EVENTS + _ROLE_EVENTS)

    def __init__(self):
        self.cloudtrail = boto3.client('cloudtrail')
        self.logs = boto3.client('logs')

    def analyze_iam_events(self, days: int = 7) -> Dict[str, Any]:
        """Fetch and classify IAM-related events from the last ``days`` days.

        Args:
            days: Size of the look-back window in days.

        Returns:
            A dict with ``total_events`` and one list per category. If a
            CloudTrail call fails, the error is printed and whatever was
            collected so far is returned (``high_risk_events`` stays empty).
        """
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=days)
        iam_events: Dict[str, Any] = {
            'total_events': 0,
            'user_creation': [],
            'policy_changes': [],
            'role_assumptions': [],
            'permission_changes': [],
            'failed_attempts': [],
            'high_risk_events': []
        }
        try:
            paginator = self.cloudtrail.get_paginator('lookup_events')
            # Query once per event name; filtering on 'CreateUser' alone
            # would leave every other category permanently empty.
            for tracked_name in self._TRACKED_EVENTS:
                pages = paginator.paginate(
                    LookupAttributes=[
                        {
                            'AttributeKey': 'EventName',
                            'AttributeValue': tracked_name
                        }
                    ],
                    StartTime=start_time,
                    EndTime=end_time
                )
                for page in pages:
                    for event in page['Events']:
                        iam_events['total_events'] += 1
                        self._classify_event(event, iam_events)

            iam_events['high_risk_events'] = self._identify_high_risk_events(
                iam_events
            )
            return iam_events
        except Exception as e:
            print(f"分析IAM事件失败: {e}")
            return iam_events

    def _classify_event(self, event: Dict[str, Any],
                        iam_events: Dict[str, Any]) -> None:
        """Append ``event`` to the matching category list(s) in ``iam_events``."""
        detail = self._parse_cloudtrail_event(event)
        event_name = event['EventName']
        username = event.get('Username', 'Unknown')
        # The caller IP is read from the raw CloudTrail record rather than
        # from the top-level lookup_events entry.
        source_ip = detail.get('sourceIPAddress')

        if event_name in self._USER_EVENTS:
            iam_events['user_creation'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'source_ip': source_ip
            })
        elif event_name in self._POLICY_EVENTS:
            iam_events['policy_changes'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'details': self._extract_policy_details(event)
            })
        elif event_name in self._PERMISSION_EVENTS:
            iam_events['permission_changes'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'details': self._extract_policy_details(event)
            })
        elif event_name in self._ROLE_EVENTS:
            iam_events['role_assumptions'].append({
                'time': event['EventTime'],
                'user': username,
                'role': self._extract_role_name(event),
                'source_ip': source_ip
            })

        # A record carrying an errorCode is also listed as a failed attempt.
        if detail.get('errorCode'):
            iam_events['failed_attempts'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'error_code': detail['errorCode'],
                'source_ip': source_ip
            })

    @staticmethod
    def _parse_cloudtrail_event(event: Dict[str, Any]) -> Dict[str, Any]:
        """Decode the ``CloudTrailEvent`` JSON string; return ``{}`` if unusable."""
        try:
            parsed = json.loads(event.get('CloudTrailEvent') or '{}')
        except (TypeError, ValueError):
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def _extract_policy_details(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Return policy/user/role identifiers from the request parameters.

        Missing parameters yield ``None`` values.
        """
        # requestParameters may be JSON null, hence the "or {}".
        request_params = (
            self._parse_cloudtrail_event(event).get('requestParameters') or {}
        )
        if not isinstance(request_params, dict):
            return {}
        return {
            'policy_name': request_params.get('policyName'),
            'policy_arn': request_params.get('policyArn'),
            'user_name': request_params.get('userName'),
            'role_name': request_params.get('roleName')
        }

    def _extract_role_name(self, event: Dict[str, Any]) -> str:
        """Return the last path segment of the requested role ARN, or 'Unknown'."""
        request_params = (
            self._parse_cloudtrail_event(event).get('requestParameters') or {}
        )
        if not isinstance(request_params, dict):
            return 'Unknown'
        role_arn = request_params.get('roleArn', '')
        if not role_arn or not isinstance(role_arn, str):
            return 'Unknown'
        return role_arn.split('/')[-1]

    def _identify_high_risk_events(self, events: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Flag unusually high event volumes.

        Thresholds: more than 10 user events (high), more than 20 policy
        changes (medium), more than 100 role assumptions from one IP (high).
        """
        high_risk = []

        # User creation/deletion frequency.
        if len(events['user_creation']) > 10:
            high_risk.append({
                'type': 'frequent_user_creation',
                'count': len(events['user_creation']),
                'risk_level': 'high'
            })

        # Policy change frequency.
        if len(events['policy_changes']) > 20:
            high_risk.append({
                'type': 'frequent_policy_changes',
                'count': len(events['policy_changes']),
                'risk_level': 'medium'
            })

        # Role assumptions per source IP.
        ip_frequencies = defaultdict(int)
        for event in events['role_assumptions']:
            if event.get('source_ip'):
                ip_frequencies[event['source_ip']] += 1
        for ip, count in ip_frequencies.items():
            if count > 100:  # a single IP is used excessively
                high_risk.append({
                    'type': 'suspicious_ip_activity',
                    'ip_address': ip,
                    'count': count,
                    'risk_level': 'high'
                })
        return high_risk
# Usage example: analyse the last 7 days of IAM events and print a summary.
monitor = CloudTrailIAMMonitor()
iam_events = monitor.analyze_iam_events(days=7)

print("IAM事件分析结果:")
print(f"总事件数: {iam_events['total_events']}")
for caption, bucket in (
    ("用户创建事件", 'user_creation'),
    ("策略变更事件", 'policy_changes'),
    ("角色假设事件", 'role_assumptions'),
    ("高风险事件", 'high_risk_events'),
):
    print(f"{caption}: {len(iam_events[bucket])}")

# Print the details of each high-risk event
for risk_event in iam_events['high_risk_events']:
    event_type = risk_event['type']
    level = risk_event['risk_level']
    print("\n高风险事件:")
    print(f" 类型: {event_type}")
    print(f" 风险级别: {level}")
10.2.2 自动化审计报告生成
import matplotlib.pyplot as plt
import seaborn as sns
from jinja2 import Template
class IAMAuditReporter:
    """Build an HTML IAM audit report together with a PNG of summary charts."""

    # A user is "high risk" above this score. The same limit is used for the
    # summary count, the pie chart and the HTML table.
    HIGH_RISK_THRESHOLD = 70
    # File name referenced by the <img> tag in the HTML template.
    CHART_FILENAME = 'iam_analysis_charts.png'

    def __init__(self):
        self.analyzer = IAMAccessAnalyzer()
        self.monitor = CloudTrailIAMMonitor()

    def generate_comprehensive_report(self, output_path: str = "iam_audit_report.html"):
        """Collect data, render charts and write the HTML report.

        Args:
            output_path: Destination of the HTML file. The chart image is
                written into the same directory so the report's relative
                ``<img>`` reference resolves.

        Returns:
            A dict with ``report_path``, ``total_users``,
            ``high_risk_users`` and ``total_events``.
        """
        account_summary = self.analyzer.get_account_summary()
        iam_events = self.monitor.analyze_iam_events(days=30)

        # Analyse every user; a single list_users call returns one page only.
        users_analysis = []
        paginator = self.analyzer.iam.get_paginator('list_users')
        for page in paginator.paginate():
            for user in page['Users']:
                users_analysis.append(
                    self.analyzer.analyze_user_permissions(user['UserName'])
                )

        chart_path = Path(output_path).resolve().parent / self.CHART_FILENAME
        self._generate_charts(users_analysis, iam_events, str(chart_path))

        html_content = self._generate_html_report(
            account_summary,
            users_analysis,
            iam_events
        )
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"审计报告已生成: {output_path}")

        return {
            'report_path': output_path,
            'total_users': len(users_analysis),
            'high_risk_users': sum(
                1 for u in users_analysis
                if u['risk_score'] > self.HIGH_RISK_THRESHOLD
            ),
            'total_events': iam_events['total_events']
        }

    @staticmethod
    def _policy_count(user: Dict) -> int:
        """Return the number of attached plus inline policies of a user."""
        return len(user['attached_policies']) + len(user['inline_policies'])

    def _high_risk_rows(self, users_analysis: List[Dict]) -> List[Dict]:
        """Return table rows for users scoring above ``HIGH_RISK_THRESHOLD``."""
        return [
            {
                'user': user['user'],
                'risk_score': user['risk_score'],
                'policy_count': self._policy_count(user),
                'last_activity': user['last_activity']
            }
            for user in users_analysis
            if user['risk_score'] > self.HIGH_RISK_THRESHOLD
        ]

    def _generate_charts(self, users_analysis: List[Dict], iam_events: Dict,
                         chart_path: str = CHART_FILENAME):
        """Render the 2x2 chart grid and save it to ``chart_path``."""
        plt.style.use('seaborn-v0_8')
        risk_scores = [user['risk_score'] for user in users_analysis]
        plt.figure(figsize=(12, 8))

        # Risk score distribution.
        plt.subplot(2, 2, 1)
        plt.hist(risk_scores, bins=10, alpha=0.7, color='skyblue')
        plt.title('用户风险评分分布')
        plt.xlabel('风险评分')
        plt.ylabel('用户数量')

        # Policies per user.
        policy_counts = [self._policy_count(user) for user in users_analysis]
        plt.subplot(2, 2, 2)
        plt.hist(policy_counts, bins=10, alpha=0.7, color='lightcoral')
        plt.title('用户策略数量分布')
        plt.xlabel('策略数量')
        plt.ylabel('用户数量')

        # IAM events per day.
        plt.subplot(2, 2, 3)
        event_dates = []
        for event_list in [iam_events['user_creation'],
                           iam_events['policy_changes']]:
            event_dates.extend([event['time'].date() for event in event_list])
        if event_dates:
            event_dates_df = pd.DataFrame({'date': event_dates})
            event_counts = event_dates_df.groupby('date').size()
            event_counts.plot(kind='line', color='green')
        plt.title('IAM事件时间趋势')
        plt.xlabel('日期')
        plt.ylabel('事件数量')

        # Risk level shares.
        plt.subplot(2, 2, 4)
        risk_levels = {
            '低风险 (0-30)': sum(1 for score in risk_scores if score <= 30),
            '中风险 (31-70)': sum(1 for score in risk_scores if 31 <= score <= 70),
            '高风险 (71-100)': sum(
                1 for score in risk_scores if score > self.HIGH_RISK_THRESHOLD
            )
        }
        # Skip the pie when there are no users: all-zero wedge sizes cannot
        # be normalised into percentages.
        if risk_scores:
            plt.pie(list(risk_levels.values()), labels=list(risk_levels.keys()),
                    autopct='%1.1f%%')
        plt.title('用户风险等级分布')

        plt.tight_layout()
        plt.savefig(chart_path, dpi=300, bbox_inches='tight')
        plt.close()

    def _generate_html_report(self, account_summary: Dict,
                              users_analysis: List[Dict],
                              iam_events: Dict) -> str:
        """Render the audit report as an HTML string."""
        template_str = """
<!DOCTYPE html>
<html>
<head>
<title>IAM访问审计报告</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.header { background-color: #f4f4f4; padding: 20px; border-radius: 5px; }
.section { margin: 20px 0; }
.risk-high { color: red; font-weight: bold; }
.risk-medium { color: orange; font-weight: bold; }
.risk-low { color: green; font-weight: bold; }
table { border-collapse: collapse; width: 100%; margin: 10px 0; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.chart { text-align: center; margin: 20px 0; }
</style>
</head>
<body>
<div class="header">
<h1>IAM访问审计报告</h1>
<p>生成时间: {{ report_time }}</p>
<p>审计周期: 最近30天</p>
</div>
<div class="section">
<h2>账户概览</h2>
<table>
<tr><th>项目</th><th>数量</th></tr>
{% for key, value in account_summary.items() %}
<tr><td>{{ key }}</td><td>{{ value }}</td></tr>
{% endfor %}
</table>
</div>
<div class="section">
<h2>高风险用户</h2>
<table>
<tr><th>用户名</th><th>风险评分</th><th>策略数量</th><th>最后活动</th></tr>
{% for user in high_risk_users %}
<tr>
<td>{{ user.user }}</td>
<td class="risk-high">{{ user.risk_score }}</td>
<td>{{ user.policy_count }}</td>
<td>{{ user.last_activity or '从未活动' }}</td>
</tr>
{% endfor %}
</table>
</div>
<div class="section">
<h2>IAM事件统计</h2>
<table>
<tr><th>事件类型</th><th>数量</th></tr>
<tr><td>用户创建/删除</td><td>{{ iam_events.user_creation|length }}</td></tr>
<tr><td>策略变更</td><td>{{ iam_events.policy_changes|length }}</td></tr>
<tr><td>角色假设</td><td>{{ iam_events.role_assumptions|length }}</td></tr>
<tr><td>高风险事件</td><td class="risk-high">{{ iam_events.high_risk_events|length }}</td></tr>
</table>
</div>
<div class="chart">
<h2>分析图表</h2>
<img src="iam_analysis_charts.png" alt="IAM分析图表" style="max-width: 100%;">
</div>
<div class="section">
<h2>建议和后续行动</h2>
<ul>
<li>定期审查高风险评分用户的权限配置</li>
<li>清理长时间未活动的用户账户</li>
<li>实施最小权限原则,移除不必要的策略</li>
<li>启用MFA为所有管理用户</li>
<li>设置CloudWatch告警监控异常IAM活动</li>
</ul>
</div>
</body>
</html>
"""
        # Autoescape so account-derived values cannot inject markup.
        template = Template(template_str, autoescape=True)
        return template.render(
            report_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            account_summary=account_summary,
            high_risk_users=self._high_risk_rows(users_analysis),
            iam_events=iam_events
        )
# Usage example: generate the report and echo the returned summary.
reporter = IAMAuditReporter()
report_result = reporter.generate_comprehensive_report()

print("审计报告生成完成:")
for caption, field in (
    ("报告路径", 'report_path'),
    ("总用户数", 'total_users'),
    ("高风险用户", 'high_risk_users'),
    ("总事件数", 'total_events'),
):
    print(f" {caption}: {report_result[field]}")
10.3 IAM Access Analyzer使用
10.3.1 Access Analyzer配置和使用
class IAMAccessAnalyzerService:
    """Create an IAM Access Analyzer and summarise its findings.

    AWS errors are printed and turned into empty results, not raised.
    """

    def __init__(self):
        self.access_analyzer = boto3.client('accessanalyzer')
        self.iam = boto3.client('iam')

    def create_analyzer(self, analyzer_name: str, type: str = 'ACCOUNT') -> Dict[str, Any]:
        """Create an analyzer and return the API response, or ``{}`` on failure.

        Args:
            analyzer_name: Name of the analyzer to create.
            type: Analyzer type passed through to the API. The name shadows
                the builtin but is kept so keyword callers keep working.
        """
        try:
            response = self.access_analyzer.create_analyzer(
                analyzerName=analyzer_name,
                type=type,
                tags={
                    'Purpose': 'IAM-Security-Analysis',
                    'Environment': 'Production'
                }
            )
            print(f"访问分析器创建成功: {response['arn']}")
            return response
        except Exception as e:
            print(f"创建访问分析器失败: {e}")
            return {}

    @staticmethod
    def _classify_severity(finding: Dict[str, Any]) -> str:
        """Derive a severity from how many conditions restrict the access.

        No conditions -> HIGH, one or two -> MEDIUM, more -> LOW.
        """
        condition_count = len(finding.get('condition') or {})
        if condition_count == 0:
            return 'HIGH'
        if condition_count <= 2:
            return 'MEDIUM'
        return 'LOW'

    @staticmethod
    def _utc_date(moment: datetime):
        """Return the UTC calendar date of ``moment``.

        Naive values are assumed to already be in UTC.
        """
        if moment.tzinfo is None:
            return moment.date()
        return moment.astimezone(timezone.utc).date()

    def analyze_external_access(self, analyzer_name: str) -> Dict[str, Any]:
        """Summarise all findings of the named analyzer.

        Returns:
            A dict with totals, active/archived findings, counts by resource
            type and derived severity, and recommendations; ``{}`` on failure.
        """
        try:
            analyzer_arn = self._get_analyzer_arn(analyzer_name)
            # Walk every page; one list_findings call returns a single page.
            findings: List[Dict[str, Any]] = []
            paginator = self.access_analyzer.get_paginator('list_findings')
            for page in paginator.paginate(analyzerArn=analyzer_arn):
                findings.extend(page['findings'])

            analysis_result = {
                'total_findings': len(findings),
                'active_findings': [],
                'archived_findings': [],
                'by_resource_type': defaultdict(int),
                'by_severity': defaultdict(int),
                'recommendations': []
            }
            for finding in findings:
                # Every non-ACTIVE finding is filed under archived_findings.
                if finding['status'] == 'ACTIVE':
                    analysis_result['active_findings'].append(finding)
                else:
                    analysis_result['archived_findings'].append(finding)
                analysis_result['by_resource_type'][finding['resourceType']] += 1
                analysis_result['by_severity'][self._classify_severity(finding)] += 1

            analysis_result['recommendations'] = self._generate_access_recommendations(
                analysis_result
            )
            return analysis_result
        except Exception as e:
            print(f"分析外部访问失败: {e}")
            return {}

    def _get_analyzer_arn(self, analyzer_name: str) -> str:
        """Return the ARN of the named analyzer, or ``""`` if it is not found."""
        try:
            paginator = self.access_analyzer.get_paginator('list_analyzers')
            for page in paginator.paginate():
                for analyzer in page['analyzers']:
                    if analyzer['name'] == analyzer_name:
                        return analyzer['arn']
            return ""
        except Exception as e:
            print(f"获取分析器ARN失败: {e}")
            return ""

    def _generate_access_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
        """Turn severity and resource-type counts into recommendation strings."""
        recommendations = []
        # .get() keeps this usable with plain dicts as well as defaultdicts.
        by_severity = analysis.get('by_severity', {})
        by_resource_type = analysis.get('by_resource_type', {})

        high_count = by_severity.get('HIGH', 0)
        if high_count > 0:
            recommendations.append(
                f"发现 {high_count} 个高风险外部访问权限,"
                "建议立即审查并限制访问条件"
            )
        if by_resource_type.get('AWS::S3::Bucket', 0) > 5:
            recommendations.append(
                "S3存储桶外部访问过多,建议启用S3 Block Public Access"
            )
        if by_resource_type.get('AWS::IAM::Role', 0) > 0:
            recommendations.append(
                "发现IAM角色被外部账户访问,建议审查信任关系配置"
            )
        return recommendations

    def monitor_access_changes(self, analyzer_name: str) -> Dict[str, Any]:
        """Inspect up to 50 recently updated findings for changes made today.

        Returns:
            A dict with today's new and resolved counts plus findings on IAM
            roles and KMS keys; ``{}`` on failure.
        """
        try:
            analyzer_arn = self._get_analyzer_arn(analyzer_name)
            findings = self.access_analyzer.list_findings(
                analyzerArn=analyzer_arn,
                sort={
                    'attributeName': 'updatedAt',
                    'orderBy': 'DESC'
                },
                maxResults=50
            )
            recent_changes = {
                'new_findings_today': 0,
                'resolved_findings_today': 0,
                'critical_changes': [],
                'trend_analysis': {}
            }
            today = datetime.now(timezone.utc).date()
            for finding in findings['findings']:
                # Compare dates in UTC; a tz-aware timestamp's .date() is in
                # its own zone and can be off by one day.
                if self._utc_date(finding['updatedAt']) == today:
                    if finding['status'] == 'ACTIVE':
                        recent_changes['new_findings_today'] += 1
                    elif finding['status'] == 'RESOLVED':
                        # ARCHIVED findings are not counted as resolved.
                        recent_changes['resolved_findings_today'] += 1

                # NOTE(review): the flattened source does not show whether
                # this was limited to today's findings; it is applied to all
                # returned findings here - confirm the intent.
                if finding['resourceType'] in ['AWS::IAM::Role', 'AWS::KMS::Key']:
                    recent_changes['critical_changes'].append({
                        'resource': finding['resource'],
                        'type': finding['resourceType'],
                        'principal': finding.get('principal', {}),
                        'action': finding.get('action', [])
                    })
            return recent_changes
        except Exception as e:
            print(f"监控访问变化失败: {e}")
            return {}
# Usage example: create an analyzer, summarise its findings, then check
# today's changes.
access_analyzer_service = IAMAccessAnalyzerService()

# Create the analyzer
analyzer_result = access_analyzer_service.create_analyzer('security-analyzer')

# Analyse external access
access_analysis = access_analyzer_service.analyze_external_access('security-analyzer')
active_total = len(access_analysis.get('active_findings', []))
high_total = access_analysis.get('by_severity', {}).get('HIGH', 0)
print("外部访问分析结果:")
print(f" 总发现数: {access_analysis.get('total_findings', 0)}")
print(f" 活跃发现: {active_total}")
print(f" 高风险发现: {high_total}")

print("\n按资源类型分布:")
type_counts = access_analysis.get('by_resource_type', {})
for resource_type in type_counts:
    count = type_counts[resource_type]
    print(f" {resource_type}: {count}")

print("\n优化建议:")
for recommendation in access_analysis.get('recommendations', []):
    print(f" - {recommendation}")

# Monitor access changes
changes = access_analyzer_service.monitor_access_changes('security-analyzer')
critical_total = len(changes.get('critical_changes', []))
print(f"\n今日新发现: {changes.get('new_findings_today', 0)}")
print(f"今日解决: {changes.get('resolved_findings_today', 0)}")
print(f"关键变化数量: {critical_total}")
10.4 合规性检查自动化
10.4.1 IAM合规性检查框架
class IAMComplianceChecker:
    """Run a fixed set of IAM compliance checks and aggregate them into a score.

    Each ``_check_*`` method returns a dict with at least ``status``
    (``PASS`` / ``FAIL`` / ``ERROR``), ``message`` and ``severity``.
    """

    # Actions treated as privilege-escalation vectors.
    _DANGEROUS_ACTIONS = (
        'iam:CreatePolicy',
        'iam:CreateRole',
        'iam:PutUserPolicy',
        'iam:PutRolePolicy',
        'iam:AttachUserPolicy',
        'iam:AttachRolePolicy',
        'iam:SetDefaultPolicyVersion',
        'sts:AssumeRole'
    )
    # Policy-name fragments that mark a user as privileged.
    _PRIVILEGED_MARKERS = ('Admin', 'PowerUser')
    # Credentials idle for longer than this many days count as unused.
    _UNUSED_AFTER_DAYS = 90
    # Credential report polling: delay between polls and maximum polls.
    _REPORT_POLL_SECONDS = 2
    _REPORT_MAX_POLLS = 30

    def __init__(self):
        self.iam = boto3.client('iam')
        self.config = boto3.client('config')

    def run_comprehensive_compliance_check(self) -> Dict[str, Any]:
        """Run every check and build the aggregated result.

        Returns:
            A dict with ``overall_score`` (percentage of checks whose status
            is ``PASS``), the per-check results, recommendations from failed
            checks, and critical issues (failed checks of HIGH severity).
        """
        compliance_result = {
            'overall_score': 0,
            'checks': {
                'password_policy': self._check_password_policy(),
                'mfa_enforcement': self._check_mfa_enforcement(),
                'unused_credentials': self._check_unused_credentials(),
                'privilege_escalation': self._check_privilege_escalation(),
                'cross_account_access': self._check_cross_account_access(),
                'service_accounts': self._check_service_accounts(),
                'policy_compliance': self._check_policy_compliance()
            },
            'recommendations': [],
            'critical_issues': []
        }

        # Checks with status ERROR count as not passed.
        passed_checks = sum(1 for check in compliance_result['checks'].values()
                            if check['status'] == 'PASS')
        total_checks = len(compliance_result['checks'])
        compliance_result['overall_score'] = int((passed_checks / total_checks) * 100)

        for check_name, check_result in compliance_result['checks'].items():
            if check_result['status'] == 'FAIL':
                compliance_result['recommendations'].extend(
                    check_result.get('recommendations', [])
                )
                if check_result.get('severity') == 'HIGH':
                    compliance_result['critical_issues'].append({
                        'check': check_name,
                        'issue': check_result.get('message', ''),
                        'impact': check_result.get('impact', '')
                    })
        return compliance_result

    def _check_password_policy(self) -> Dict[str, Any]:
        """Check the account password policy against fixed minimums."""
        try:
            policy = self.iam.get_account_password_policy()
            password_policy = policy['PasswordPolicy']
            compliance_issues = []

            # Minimum length.
            if password_policy.get('MinimumPasswordLength', 0) < 12:
                compliance_issues.append("密码最小长度应不少于12位")

            # Complexity requirements.
            complexity_rules = (
                ('RequireUppercaseCharacters', "应要求包含大写字母"),
                ('RequireLowercaseCharacters', "应要求包含小写字母"),
                ('RequireNumbers', "应要求包含数字"),
                ('RequireSymbols', "应要求包含特殊符号"),
            )
            for flag, issue in complexity_rules:
                if not password_policy.get(flag, False):
                    compliance_issues.append(issue)

            # Rotation: a missing MaxPasswordAge is treated as 365 days.
            if password_policy.get('MaxPasswordAge', 365) > 90:
                compliance_issues.append("密码有效期应不超过90天")

            # Password history.
            if password_policy.get('PasswordReusePrevention', 0) < 12:
                compliance_issues.append("应防止重复使用最近12个密码")

            status = 'PASS' if not compliance_issues else 'FAIL'
            return {
                'status': status,
                'message': f"密码策略检查 - {status}",
                'issues': compliance_issues,
                'severity': 'HIGH' if compliance_issues else 'LOW',
                'recommendations': [
                    f"修复密码策略问题: {issue}" for issue in compliance_issues
                ]
            }
        except self.iam.exceptions.NoSuchEntityException:
            return {
                'status': 'FAIL',
                'message': "未配置账户密码策略",
                'severity': 'HIGH',
                'recommendations': ["创建并配置符合安全要求的密码策略"]
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"检查密码策略失败: {e}",
                'severity': 'MEDIUM'
            }

    def _check_mfa_enforcement(self) -> Dict[str, Any]:
        """Check that privileged users have an MFA device.

        The check fails when any privileged user lacks MFA; the adoption
        rate over all users is reported as well.
        """
        try:
            users_without_mfa = []
            privileged_users_without_mfa = []
            total_users = 0

            paginator = self.iam.get_paginator('list_users')
            for page in paginator.paginate():
                for user in page['Users']:
                    total_users += 1
                    username = user['UserName']
                    mfa_devices = self.iam.list_mfa_devices(UserName=username)
                    if not mfa_devices['MFADevices']:
                        users_without_mfa.append(username)
                        if self._is_privileged_user(username):
                            privileged_users_without_mfa.append(username)

            # Count users in the loop: deriving the total from the last
            # user's device list is wrong and unbound with zero users.
            mfa_adoption_rate = 0 if total_users == 0 else (
                (total_users - len(users_without_mfa)) / total_users * 100
            )
            status = 'PASS' if len(privileged_users_without_mfa) == 0 else 'FAIL'
            return {
                'status': status,
                'message': f"MFA强制执行检查 - {status}",
                'mfa_adoption_rate': round(mfa_adoption_rate, 2),
                'users_without_mfa': len(users_without_mfa),
                'privileged_users_without_mfa': privileged_users_without_mfa,
                'severity': 'HIGH' if privileged_users_without_mfa else 'MEDIUM',
                'recommendations': [
                    f"为特权用户启用MFA: {', '.join(privileged_users_without_mfa)}"
                ] if privileged_users_without_mfa else []
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"检查MFA强制执行失败: {e}",
                'severity': 'MEDIUM'
            }

    def _is_privileged_user(self, username: str) -> bool:
        """Return True if a directly or group-attached policy looks privileged.

        Best effort: on an AWS error the problem is printed and False is
        returned.
        """
        def looks_privileged(policies: List[Dict[str, Any]]) -> bool:
            return any(
                marker in policy['PolicyName']
                for policy in policies
                for marker in self._PRIVILEGED_MARKERS
            )

        try:
            attached_policies = self.iam.list_attached_user_policies(UserName=username)
            if looks_privileged(attached_policies['AttachedPolicies']):
                return True

            # boto3 exposes this operation as list_groups_for_user.
            groups = self.iam.list_groups_for_user(UserName=username)
            for group in groups['Groups']:
                group_policies = self.iam.list_attached_group_policies(
                    GroupName=group['GroupName']
                )
                if looks_privileged(group_policies['AttachedPolicies']):
                    return True
            return False
        except Exception as e:
            print(f"判断特权用户失败: {e}")
            return False

    @staticmethod
    def _days_since(timestamp: str, today) -> Optional[int]:
        """Return whole days between ``timestamp``'s date and ``today``.

        Returns ``None`` for placeholder values (``N/A``,
        ``no_information``) or anything without a ``YYYY-MM-DD`` date prefix.
        """
        if timestamp in ('N/A', 'no_information'):
            return None
        try:
            used_on = datetime.strptime(timestamp.split('T')[0], '%Y-%m-%d').date()
        except ValueError:
            return None
        return (today - used_on).days

    def _fetch_credential_report(self) -> str:
        """Generate the credential report if needed and return its CSV text."""
        # generate_credential_report reports the state;
        # get_credential_report returns the content once it is COMPLETE.
        for _ in range(self._REPORT_MAX_POLLS):
            if self.iam.generate_credential_report()['State'] == 'COMPLETE':
                break
            time.sleep(self._REPORT_POLL_SECONDS)
        else:
            raise TimeoutError("credential report was not ready in time")
        report = self.iam.get_credential_report()
        return report['Content'].decode('utf-8')

    def _check_unused_credentials(self) -> Dict[str, Any]:
        """Flag passwords and active access keys unused for over 90 days."""
        try:
            unused_users = []
            unused_access_keys = []
            # NOTE(review): report timestamps are assumed to be UTC - confirm.
            today = datetime.now(timezone.utc).date()

            csv_reader = csv.DictReader(io.StringIO(self._fetch_credential_report()))
            for row in csv_reader:
                username = row['user']

                # Password usage.
                password_last_used = row.get('password_last_used', 'N/A')
                idle_days = self._days_since(password_last_used, today)
                if idle_days is not None and idle_days > self._UNUSED_AFTER_DAYS:
                    unused_users.append({
                        'username': username,
                        'last_used': password_last_used,
                        'days_unused': idle_days
                    })

                # Access key usage (a user has at most two key slots).
                for key_num in ['1', '2']:
                    if row.get(f'access_key_{key_num}_active', 'false') != 'true':
                        continue
                    key_last_used = row.get(f'access_key_{key_num}_last_used_date', 'N/A')
                    idle_days = self._days_since(key_last_used, today)
                    if idle_days is not None and idle_days > self._UNUSED_AFTER_DAYS:
                        unused_access_keys.append({
                            'username': username,
                            # NOTE(review): the report may not carry a key-id
                            # column, in which case this is None - confirm.
                            'key_id': row.get(f'access_key_{key_num}'),
                            'last_used': key_last_used,
                            'days_unused': idle_days
                        })

            status = 'PASS' if not unused_users and not unused_access_keys else 'FAIL'
            return {
                'status': status,
                'message': f"未使用凭证检查 - {status}",
                'unused_users': unused_users,
                'unused_access_keys': unused_access_keys,
                'severity': 'MEDIUM',
                'recommendations': [
                    "禁用或删除90天未使用的用户账户",
                    "轮换或删除90天未使用的访问密钥"
                ] if status == 'FAIL' else []
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"检查未使用凭证失败: {e}",
                'severity': 'MEDIUM'
            }

    def _check_privilege_escalation(self) -> Dict[str, Any]:
        """Scan customer-managed policies for privilege-escalation risks."""
        try:
            risky_policies = []
            paginator = self.iam.get_paginator('list_policies')
            for page in paginator.paginate(Scope='Local'):
                for policy in page['Policies']:
                    policy_version = self.iam.get_policy_version(
                        PolicyArn=policy['Arn'],
                        VersionId=policy['DefaultVersionId']
                    )
                    policy_doc = policy_version['PolicyVersion']['Document']
                    if self._has_privilege_escalation_risk(policy_doc):
                        risky_policies.append({
                            'policy_name': policy['PolicyName'],
                            'policy_arn': policy['Arn'],
                            'risk_factors': self._analyze_escalation_risks(policy_doc)
                        })

            status = 'PASS' if not risky_policies else 'FAIL'
            return {
                'status': status,
                'message': f"特权升级风险检查 - {status}",
                'risky_policies': risky_policies,
                'severity': 'HIGH' if risky_policies else 'LOW',
                'recommendations': [
                    f"审查策略 {policy['policy_name']} 的权限配置"
                    for policy in risky_policies
                ]
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"检查特权升级风险失败: {e}",
                'severity': 'MEDIUM'
            }

    @staticmethod
    def _allow_statements(policy_doc: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the policy's ``Allow`` statements as a list."""
        statements = policy_doc.get('Statement', [])
        if not isinstance(statements, list):
            statements = [statements]
        return [s for s in statements if s.get('Effect') == 'Allow']

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Wrap a single string in a list; pass lists through unchanged."""
        return [value] if isinstance(value, str) else value

    def _has_privilege_escalation_risk(self, policy_doc: Dict[str, Any]) -> bool:
        """Return True if an ``Allow`` statement grants a dangerous action.

        An action is risky if it contains a dangerous action name or is a
        wildcard pattern (``*``, ``iam:*``, ``iam:Put*Policy``) matching one.
        Comparison is case-insensitive.
        """
        dangerous = [name.lower() for name in self._DANGEROUS_ACTIONS]
        for statement in self._allow_statements(policy_doc):
            for action in self._as_list(statement.get('Action', [])):
                pattern = action.lower()
                if any(name in pattern or fnmatchcase(name, pattern)
                       for name in dangerous):
                    return True
        return False

    def _analyze_escalation_risks(self, policy_doc: Dict[str, Any]) -> List[str]:
        """List human-readable risk factors found in ``Allow`` statements."""
        risk_factors = []
        for statement in self._allow_statements(policy_doc):
            actions = self._as_list(statement.get('Action', []))
            resources = self._as_list(statement.get('Resource', []))

            # Wildcards.
            if '*' in actions:
                risk_factors.append("使用通配符授予所有权限")
            if '*' in resources:
                risk_factors.append("对所有资源授予权限")

            # IAM management permissions (prefix compared case-insensitively).
            iam_actions = [a for a in actions if a.lower().startswith('iam:')]
            if iam_actions:
                risk_factors.append(f"包含IAM管理权限: {', '.join(iam_actions[:3])}")

            # STS permissions.
            sts_actions = [a for a in actions if a.lower().startswith('sts:')]
            if sts_actions:
                risk_factors.append(f"包含STS权限: {', '.join(sts_actions[:3])}")
        return risk_factors

    def _check_cross_account_access(self) -> Dict[str, Any]:
        """Placeholder: always reports PASS without inspecting anything."""
        # TODO: implement the cross-account access check; until then this
        # result inflates the overall score.
        return {
            'status': 'PASS',
            'message': '跨账户访问检查 - PASS',
            'severity': 'LOW'
        }

    def _check_service_accounts(self) -> Dict[str, Any]:
        """Placeholder: always reports PASS without inspecting anything."""
        # TODO: implement the service account check.
        return {
            'status': 'PASS',
            'message': '服务账户检查 - PASS',
            'severity': 'LOW'
        }

    def _check_policy_compliance(self) -> Dict[str, Any]:
        """Placeholder: always reports PASS without inspecting anything."""
        # TODO: implement the policy compliance check.
        return {
            'status': 'PASS',
            'message': '策略合规性检查 - PASS',
            'severity': 'LOW'
        }
# Usage example: run all compliance checks and print the outcome.
compliance_checker = IAMComplianceChecker()
compliance_result = compliance_checker.run_comprehensive_compliance_check()

critical_issues = compliance_result['critical_issues']
print("IAM合规性检查结果:")
print(f"总体合规分数: {compliance_result['overall_score']}/100")
print(f"关键问题数量: {len(critical_issues)}")

print("\n各项检查结果:")
for check_name, result in compliance_result['checks'].items():
    severity = result.get('severity', 'N/A')
    print(f" {check_name}: {result['status']} ({severity})")

# Only the first five recommendations are shown
print("\n建议措施:")
for recommendation in compliance_result['recommendations'][:5]:
    print(f" - {recommendation}")

if critical_issues:
    print("\n关键问题:")
    for issue in critical_issues:
        print(f" - {issue['check']}: {issue['issue']}")
本章小结: 本章介绍了IAM访问分析的基础工具框架、基于CloudTrail的操作审计与审计报告生成、IAM Access Analyzer的使用,以及IAM合规性检查的自动化实现。