Chapter 10: IAM Access Analysis and Auditing
9/1/25 · About 12 min read
Chapter 10: IAM Access Analysis and Auditing
Learning Objectives
By the end of this chapter, you will be able to:
- Understand the importance and core concepts of IAM access analysis
- Master IAM operation auditing using AWS CloudTrail
- Learn to use IAM Access Analyzer for permission risk analysis
- Implement automated IAM compliance checks and reporting
- Build an IAM security monitoring and alerting system
- Master IAM access pattern analysis and optimization techniques
10.1 IAM Access Analysis Basics
10.1.1 Access Analysis Overview
IAM access analysis is a key part of ensuring cloud security, helping to identify potential security risks and compliance issues.
10.1.2 Access Analysis Python Tool Framework
import csv
import fnmatch
import io
import json
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import boto3
import pandas as pd
class IAMAccessAnalyzer:
    """Collect IAM data for individual users and derive a heuristic risk score.

    The constructor creates boto3 clients for IAM, CloudTrail, Access Analyzer
    and STS; the methods below only use the IAM and CloudTrail clients.
    """

    # How many days of CloudTrail history to search for user activity.
    # CloudTrail event history is documented to cover the last 90 days, and the
    # risk score has a ">30 days inactive" tier that a shorter window could
    # never reach.
    ACTIVITY_LOOKBACK_DAYS = 90

    def __init__(self):
        self.iam = boto3.client('iam')
        self.cloudtrail = boto3.client('cloudtrail')
        self.access_analyzer = boto3.client('accessanalyzer')
        self.sts = boto3.client('sts')

    def get_account_summary(self) -> Dict[str, Any]:
        """Return the IAM account ``SummaryMap``, or ``{}`` if the call fails."""
        try:
            summary = self.iam.get_account_summary()
            return summary['SummaryMap']
        except Exception as e:
            print(f"Failed to get account summary: {e}")
            return {}

    def analyze_user_permissions(self, username: str) -> Dict[str, Any]:
        """Gather policies, groups, last activity and a risk score for a user.

        Args:
            username: IAM user name to inspect.

        Returns:
            A dict with the keys ``user``, ``attached_policies``,
            ``inline_policies``, ``group_memberships``, ``last_activity``,
            ``unused_permissions``, ``risk_score`` and (on success)
            ``created_date``. If any AWS call fails, the error is printed and
            the partially filled dict is returned as-is.
        """
        analysis = {
            'user': username,
            'attached_policies': [],
            'inline_policies': [],
            'group_memberships': [],
            'last_activity': None,
            'unused_permissions': [],
            'risk_score': 0
        }
        try:
            # Basic user information
            user_info = self.iam.get_user(UserName=username)
            analysis['created_date'] = user_info['User']['CreateDate']

            # Managed policies attached directly to the user
            attached_policies = self.iam.list_attached_user_policies(
                UserName=username
            )
            analysis['attached_policies'] = attached_policies['AttachedPolicies']

            # Inline policies, including their documents
            inline_policies = self.iam.list_user_policies(UserName=username)
            for policy_name in inline_policies['PolicyNames']:
                policy_doc = self.iam.get_user_policy(
                    UserName=username,
                    PolicyName=policy_name
                )
                analysis['inline_policies'].append({
                    'name': policy_name,
                    'document': policy_doc['PolicyDocument']
                })

            # Group memberships
            groups = self.iam.get_groups_for_user(UserName=username)
            analysis['group_memberships'] = groups['Groups']

            # Most recent CloudTrail activity, then the derived score
            analysis['last_activity'] = self._get_user_last_activity(username)
            analysis['risk_score'] = self._calculate_user_risk_score(analysis)
            return analysis
        except Exception as e:
            print(f"Failed to analyze user permissions: {e}")
            return analysis

    def _get_user_last_activity(self, username: str,
                                days: Optional[int] = None) -> Optional[datetime]:
        """Return the time of the user's most recent CloudTrail event.

        Args:
            username: IAM user name used as the CloudTrail ``Username`` filter.
            days: Size of the look-back window in days; defaults to
                ``ACTIVITY_LOOKBACK_DAYS``.

        Returns:
            The ``EventTime`` of the first event CloudTrail returns, or ``None``
            when no event is found or the lookup fails (the error is printed).
        """
        lookback_days = self.ACTIVITY_LOOKBACK_DAYS if days is None else days
        try:
            end_time = datetime.now(timezone.utc)
            start_time = end_time - timedelta(days=lookback_days)
            events = self.cloudtrail.lookup_events(
                LookupAttributes=[
                    {
                        'AttributeKey': 'Username',
                        'AttributeValue': username
                    }
                ],
                StartTime=start_time,
                EndTime=end_time,
                # The request parameter is MaxResults; "MaxItems" is only a
                # paginator option and is rejected by lookup_events itself.
                MaxResults=1
            )
            if events['Events']:
                return events['Events'][0]['EventTime']
            return None
        except Exception as e:
            print(f"Failed to get user activity time: {e}")
            return None

    def _calculate_user_risk_score(self, analysis: Dict[str, Any]) -> int:
        """Compute a 0-100 heuristic risk score from an analysis dict.

        Scoring: +20 for more than five policies (attached + inline); +30 / +15
        for more than 90 / 30 days since ``last_activity``; +40 when there is no
        recorded activity; +25 for every attached policy whose name contains
        ``Admin``. The total is capped at 100.
        """
        risk_score = 0

        # Over-permission risk
        total_policies = (
            len(analysis['attached_policies']) +
            len(analysis['inline_policies'])
        )
        if total_policies > 5:
            risk_score += 20

        # Long-term inactivity risk
        last_activity = analysis['last_activity']
        if last_activity:
            # Timestamps may be timezone-aware; naive values are treated as UTC
            # so that the subtraction below never mixes aware and naive values.
            if last_activity.tzinfo is None:
                last_activity = last_activity.replace(tzinfo=timezone.utc)
            days_inactive = (datetime.now(timezone.utc) - last_activity).days
            if days_inactive > 90:
                risk_score += 30
            elif days_inactive > 30:
                risk_score += 15
        else:
            risk_score += 40  # Never active

        # Administrator permission risk
        for policy in analysis['attached_policies']:
            if 'Admin' in policy['PolicyName']:
                risk_score += 25

        return min(risk_score, 100)
# Example usage: print the account summary, then the analysis of one user.
analyzer = IAMAccessAnalyzer()

# Account-wide IAM counters
account_summary = analyzer.get_account_summary()
print("IAM Account Summary:")
for key, value in account_summary.items():
    print(" {}: {}".format(key, value))

# Per-user analysis
user_analysis = analyzer.analyze_user_permissions('example-user')
print("\nUser Analysis Results:")
print("User: {}".format(user_analysis['user']))
print("Risk Score: {}/100".format(user_analysis['risk_score']))
print("Number of attached policies: {}".format(len(user_analysis['attached_policies'])))
print("Last activity: {}".format(user_analysis['last_activity']))
10.2 Auditing Operations with CloudTrail
10.2.1 CloudTrail IAM Event Monitoring
class CloudTrailIAMMonitor:
    """Classify recent IAM/STS CloudTrail events and flag risky patterns."""

    # Event names tracked by analyze_iam_events, grouped by result bucket.
    USER_EVENTS = ('CreateUser', 'DeleteUser')
    POLICY_EVENTS = ('CreatePolicy', 'DeletePolicy',
                     'PutUserPolicy', 'DeleteUserPolicy')
    ROLE_EVENTS = ('AssumeRole',)

    def __init__(self):
        self.cloudtrail = boto3.client('cloudtrail')
        self.logs = boto3.client('logs')

    def analyze_iam_events(self, days: int = 7) -> Dict[str, Any]:
        """Collect and classify tracked IAM events from the last ``days`` days.

        One CloudTrail lookup is issued per tracked event name, because a
        lookup accepts a single attribute filter; querying only ``CreateUser``
        would leave every other bucket permanently empty.

        Args:
            days: Size of the look-back window in days.

        Returns:
            A dict with ``total_events`` (number of events seen) and the lists
            ``user_creation``, ``policy_changes``, ``role_assumptions``,
            ``failed_attempts`` (events whose record carries an ``errorCode``),
            ``high_risk_events`` and ``permission_changes`` (kept for
            compatibility; nothing in this class fills it). If a lookup fails,
            the error is printed and whatever was collected so far is returned
            without ``high_risk_events`` being computed.
        """
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=days)
        iam_events = {
            'total_events': 0,
            'user_creation': [],
            'policy_changes': [],
            'role_assumptions': [],
            'permission_changes': [],
            'failed_attempts': [],
            'high_risk_events': []
        }
        tracked_names = self.USER_EVENTS + self.POLICY_EVENTS + self.ROLE_EVENTS
        try:
            paginator = self.cloudtrail.get_paginator('lookup_events')
            for tracked_name in tracked_names:
                pages = paginator.paginate(
                    LookupAttributes=[
                        {
                            'AttributeKey': 'EventName',
                            'AttributeValue': tracked_name
                        }
                    ],
                    StartTime=start_time,
                    EndTime=end_time
                )
                for page in pages:
                    for event in page['Events']:
                        iam_events['total_events'] += 1
                        self._classify_event(event, iam_events)

            # Identify high-risk events
            iam_events['high_risk_events'] = self._identify_high_risk_events(
                iam_events
            )
            return iam_events
        except Exception as e:
            print(f"Failed to analyze IAM events: {e}")
            return iam_events

    def _classify_event(self, event: Dict[str, Any],
                        iam_events: Dict[str, Any]) -> None:
        """Append one CloudTrail event to the matching bucket(s) of ``iam_events``."""
        event_name = event['EventName']
        username = event.get('Username', 'Unknown')
        record = self._load_record(event) or {}
        # The caller IP is carried inside the CloudTrailEvent JSON document
        # ("sourceIPAddress"); a top-level key is still honoured if present.
        source_ip = event.get('SourceIPAddress') or record.get('sourceIPAddress')

        if event_name in self.USER_EVENTS:
            iam_events['user_creation'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'source_ip': source_ip
            })
        elif event_name in self.POLICY_EVENTS:
            iam_events['policy_changes'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'details': self._extract_policy_details(event)
            })
        elif event_name in self.ROLE_EVENTS:
            iam_events['role_assumptions'].append({
                'time': event['EventTime'],
                'user': username,
                'role': self._extract_role_name(event),
                'source_ip': source_ip
            })

        # A record with an errorCode is additionally listed as a failed attempt.
        if record.get('errorCode'):
            iam_events['failed_attempts'].append({
                'event': event_name,
                'time': event['EventTime'],
                'user': username,
                'error_code': record['errorCode'],
                'source_ip': source_ip
            })

    @staticmethod
    def _load_record(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Parse the ``CloudTrailEvent`` JSON string; ``None`` if it is unusable."""
        try:
            record = json.loads(event.get('CloudTrailEvent', '{}'))
        except (TypeError, ValueError, AttributeError):
            return None
        return record if isinstance(record, dict) else None

    def _extract_policy_details(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Extract policy change details from the event's request parameters.

        Returns a dict with ``policy_name``, ``policy_arn``, ``user_name`` and
        ``role_name`` (each ``None`` when absent), or ``{}`` when the event
        record cannot be parsed.
        """
        record = self._load_record(event)
        if record is None:
            return {}
        # requestParameters may be null in the record; treat that as empty.
        request_params = record.get('requestParameters') or {}
        if not isinstance(request_params, dict):
            return {}
        return {
            'policy_name': request_params.get('policyName'),
            'policy_arn': request_params.get('policyArn'),
            'user_name': request_params.get('userName'),
            'role_name': request_params.get('roleName')
        }

    def _extract_role_name(self, event: Dict[str, Any]) -> str:
        """Return the last path segment of the requested role ARN, or ``'Unknown'``."""
        record = self._load_record(event)
        if record is None:
            return 'Unknown'
        request_params = record.get('requestParameters') or {}
        if not isinstance(request_params, dict):
            return 'Unknown'
        role_arn = request_params.get('roleArn', '')
        if not isinstance(role_arn, str) or not role_arn:
            return 'Unknown'
        return role_arn.split('/')[-1]

    def _identify_high_risk_events(self, events: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Derive high-risk findings from the classified event buckets.

        Flags more than 10 user creation/deletion events (high), more than 20
        policy changes (medium) and any single source IP with more than 100
        role assumptions (high).
        """
        high_risk = []

        # Check user creation frequency
        if len(events['user_creation']) > 10:
            high_risk.append({
                'type': 'frequent_user_creation',
                'count': len(events['user_creation']),
                'risk_level': 'high'
            })

        # Check policy change frequency
        if len(events['policy_changes']) > 20:
            high_risk.append({
                'type': 'frequent_policy_changes',
                'count': len(events['policy_changes']),
                'risk_level': 'medium'
            })

        # Check for unusual IP addresses
        ip_frequencies = defaultdict(int)
        for event in events['role_assumptions']:
            if event.get('source_ip'):
                ip_frequencies[event['source_ip']] += 1
        for ip, count in ip_frequencies.items():
            if count > 100:  # Excessive use from a single IP
                high_risk.append({
                    'type': 'suspicious_ip_activity',
                    'ip_address': ip,
                    'count': count,
                    'risk_level': 'high'
                })
        return high_risk
# Example usage: analyze the last week of events and print the tallies.
monitor = CloudTrailIAMMonitor()
iam_events = monitor.analyze_iam_events(days=7)

print("IAM Event Analysis Results:")
print("Total events: {}".format(iam_events['total_events']))
print("User creation events: {}".format(len(iam_events['user_creation'])))
print("Policy change events: {}".format(len(iam_events['policy_changes'])))
print("Role assumption events: {}".format(len(iam_events['role_assumptions'])))
print("High-risk events: {}".format(len(iam_events['high_risk_events'])))

# One short entry per high-risk finding
for risk_event in iam_events['high_risk_events']:
    print("\nHigh-risk event:")
    print(" Type: {}".format(risk_event['type']))
    print(" Risk level: {}".format(risk_event['risk_level']))
10.2.2 Automated Audit Report Generation
import matplotlib.pyplot as plt
import seaborn as sns
from jinja2 import Template
class IAMAuditReporter:
    """Build an HTML audit report (plus a chart image) from IAM analysis data."""

    # A user counts as high-risk when its score is strictly above this value.
    # The same threshold is used for the summary count, the pie chart and the
    # HTML table so that the three views agree with each other.
    HIGH_RISK_THRESHOLD = 70

    # File name of the chart image; the HTML references it by relative path.
    CHART_FILENAME = 'iam_analysis_charts.png'

    def __init__(self):
        self.analyzer = IAMAccessAnalyzer()
        self.monitor = CloudTrailIAMMonitor()

    def generate_comprehensive_report(self, output_path: str = "iam_audit_report.html"):
        """Generate the full audit report and write it to ``output_path``.

        Collects the account summary, 30 days of IAM events and a per-user
        analysis for every IAM user, renders the charts next to the report and
        writes the HTML file.

        Args:
            output_path: Destination of the HTML report.

        Returns:
            A dict with ``report_path``, ``total_users``, ``high_risk_users``
            and ``total_events``.
        """
        # Collect data
        account_summary = self.analyzer.get_account_summary()
        iam_events = self.monitor.analyze_iam_events(days=30)

        # Analyze every user; list_users is paginated, so walk all pages.
        users_analysis = []
        paginator = self.analyzer.iam.get_paginator('list_users')
        for page in paginator.paginate():
            for user in page['Users']:
                users_analysis.append(
                    self.analyzer.analyze_user_permissions(user['UserName'])
                )

        # The HTML links the chart by bare file name, so the image has to live
        # in the same directory as the report.
        chart_path = os.path.join(os.path.dirname(output_path), self.CHART_FILENAME)
        self._generate_charts(users_analysis, iam_events, chart_path)

        # Generate an HTML report
        html_content = self._generate_html_report(
            account_summary,
            users_analysis,
            iam_events
        )
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"Audit report generated: {output_path}")
        return {
            'report_path': output_path,
            'total_users': len(users_analysis),
            'high_risk_users': len(self._high_risk_rows(users_analysis)),
            'total_events': iam_events['total_events']
        }

    @classmethod
    def _high_risk_rows(cls, users_analysis: List[Dict]) -> List[Dict]:
        """Return the table rows for users scoring above ``HIGH_RISK_THRESHOLD``."""
        return [
            {
                'user': user['user'],
                'risk_score': user['risk_score'],
                'policy_count': len(user['attached_policies']) + len(user['inline_policies']),
                'last_activity': user['last_activity']
            }
            for user in users_analysis
            if user['risk_score'] > cls.HIGH_RISK_THRESHOLD
        ]

    def _generate_charts(self, users_analysis: List[Dict], iam_events: Dict,
                         chart_path: Optional[str] = None):
        """Render a 2x2 grid of statistical charts and save it as a PNG.

        Args:
            users_analysis: Per-user analysis dicts.
            iam_events: Classified event buckets; ``user_creation`` and
                ``policy_changes`` feed the trend chart.
            chart_path: Where to save the image; defaults to
                ``CHART_FILENAME`` in the current working directory.
        """
        if chart_path is None:
            chart_path = self.CHART_FILENAME

        plt.style.use('seaborn-v0_8')

        # User risk score distribution chart
        risk_scores = [user['risk_score'] for user in users_analysis]
        plt.figure(figsize=(12, 8))
        plt.subplot(2, 2, 1)
        plt.hist(risk_scores, bins=10, alpha=0.7, color='skyblue')
        plt.title('User Risk Score Distribution')
        plt.xlabel('Risk Score')
        plt.ylabel('Number of Users')

        # Policy count distribution chart
        policy_counts = [
            len(user['attached_policies']) + len(user['inline_policies'])
            for user in users_analysis
        ]
        plt.subplot(2, 2, 2)
        plt.hist(policy_counts, bins=10, alpha=0.7, color='lightcoral')
        plt.title('User Policy Count Distribution')
        plt.xlabel('Number of Policies')
        plt.ylabel('Number of Users')

        # IAM event time trend chart
        plt.subplot(2, 2, 3)
        event_dates = []
        for event_list in [iam_events['user_creation'],
                           iam_events['policy_changes']]:
            event_dates.extend([event['time'].date() for event in event_list])
        if event_dates:
            event_dates_df = pd.DataFrame({'date': event_dates})
            event_counts = event_dates_df.groupby('date').size()
            event_counts.plot(kind='line', color='green')
            plt.title('IAM Event Time Trend')
            plt.xlabel('Date')
            plt.ylabel('Number of Events')

        # Risk level pie chart
        plt.subplot(2, 2, 4)
        risk_levels = {
            'Low Risk (0-30)': sum(1 for score in risk_scores if score <= 30),
            'Medium Risk (31-70)': sum(
                1 for score in risk_scores if 31 <= score <= self.HIGH_RISK_THRESHOLD
            ),
            'High Risk (71-100)': sum(
                1 for score in risk_scores if score > self.HIGH_RISK_THRESHOLD
            )
        }
        # With no users every wedge is zero and the shares are undefined, so
        # the pie is only drawn when there is something to show.
        if risk_scores:
            plt.pie(list(risk_levels.values()), labels=list(risk_levels.keys()),
                    autopct='%1.1f%%')
        plt.title('User Risk Level Distribution')

        plt.tight_layout()
        plt.savefig(chart_path, dpi=300, bbox_inches='tight')
        plt.close()

    def _generate_html_report(self, account_summary: Dict,
                              users_analysis: List[Dict],
                              iam_events: Dict) -> str:
        """Render the HTML audit report and return it as a string.

        The template is rendered with autoescaping enabled so that values
        coming from the account (user names, summary keys) cannot inject
        markup into the report.
        """
        template_str = """
<!DOCTYPE html>
<html>
<head>
<title>IAM Access Audit Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.header { background-color: #f4f4f4; padding: 20px; border-radius: 5px; }
.section { margin: 20px 0; }
.risk-high { color: red; font-weight: bold; }
.risk-medium { color: orange; font-weight: bold; }
.risk-low { color: green; font-weight: bold; }
table { border-collapse: collapse; width: 100%; margin: 10px 0; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.chart { text-align: center; margin: 20px 0; }
</style>
</head>
<body>
<div class="header">
<h1>IAM Access Audit Report</h1>
<p>Generated on: {{ report_time }}</p>
<p>Audit period: Last 30 days</p>
</div>
<div class="section">
<h2>Account Summary</h2>
<table>
<tr><th>Item</th><th>Count</th></tr>
{% for key, value in account_summary.items() %}
<tr><td>{{ key }}</td><td>{{ value }}</td></tr>
{% endfor %}
</table>
</div>
<div class="section">
<h2>High-risk Users</h2>
<table>
<tr><th>Username</th><th>Risk Score</th><th>Policy Count</th><th>Last Activity</th></tr>
{% for user in high_risk_users %}
<tr>
<td>{{ user.user }}</td>
<td class="risk-high">{{ user.risk_score }}</td>
<td>{{ user.policy_count }}</td>
<td>{{ user.last_activity or 'Never active' }}</td>
</tr>
{% endfor %}
</table>
</div>
<div class="section">
<h2>IAM Event Statistics</h2>
<table>
<tr><th>Event Type</th><th>Count</th></tr>
<tr><td>User Creation/Deletion</td><td>{{ iam_events.user_creation|length }}</td></tr>
<tr><td>Policy Changes</td><td>{{ iam_events.policy_changes|length }}</td></tr>
<tr><td>Role Assumptions</td><td>{{ iam_events.role_assumptions|length }}</td></tr>
<tr><td>High-risk Events</td><td class="risk-high">{{ iam_events.high_risk_events|length }}</td></tr>
</table>
</div>
<div class="chart">
<h2>Analysis Charts</h2>
<img src="iam_analysis_charts.png" alt="IAM analysis charts" style="max-width: 100%;">
</div>
<div class="section">
<h2>Recommendations and Next Steps</h2>
<ul>
<li>Regularly review the permission configurations of high-risk users</li>
<li>Clean up user accounts that have been inactive for a long time</li>
<li>Implement the principle of least privilege and remove unnecessary policies</li>
<li>Enable MFA for all administrative users</li>
<li>Set up CloudWatch alarms to monitor for unusual IAM activity</li>
</ul>
</div>
</body>
</html>
"""
        template = Template(template_str, autoescape=True)
        return template.render(
            report_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            account_summary=account_summary,
            high_risk_users=self._high_risk_rows(users_analysis),
            iam_events=iam_events
        )
# Example usage: build the report with default settings and echo the summary.
reporter = IAMAuditReporter()
report_result = reporter.generate_comprehensive_report()

print("Audit report generated:")
print(" Report path: {}".format(report_result['report_path']))
print(" Total users: {}".format(report_result['total_users']))
print(" High-risk users: {}".format(report_result['high_risk_users']))
print(" Total events: {}".format(report_result['total_events']))
10.3 Using IAM Access Analyzer
10.3.1 Access Analyzer Configuration and Usage
class IAMAccessAnalyzerService:
    """Create an IAM Access Analyzer and summarise its findings."""

    def __init__(self):
        self.access_analyzer = boto3.client('accessanalyzer')
        self.iam = boto3.client('iam')

    def create_analyzer(self, analyzer_name: str, type: str = 'ACCOUNT') -> Dict[str, Any]:
        """Create an access analyzer.

        Args:
            analyzer_name: Name of the analyzer to create.
            type: Analyzer type passed through to the API (the parameter name
                mirrors the API field and is kept for compatibility).

        Returns:
            The API response, or ``{}`` if creation fails (the error is printed).
        """
        try:
            response = self.access_analyzer.create_analyzer(
                analyzerName=analyzer_name,
                type=type,
                tags={
                    'Purpose': 'IAM-Security-Analysis',
                    'Environment': 'Production'
                }
            )
            print(f"Access analyzer created successfully: {response['arn']}")
            return response
        except Exception as e:
            print(f"Failed to create access analyzer: {e}")
            return {}

    def analyze_external_access(self, analyzer_name: str) -> Dict[str, Any]:
        """Summarise all findings of the named analyzer.

        Returns:
            A dict with ``total_findings``, ``active_findings``,
            ``archived_findings`` (every finding whose status is not
            ``ACTIVE``), the counters ``by_resource_type`` and ``by_severity``,
            and ``recommendations``. Returns ``{}`` when the analyzer cannot be
            found or an API call fails (the reason is printed).
        """
        try:
            analyzer_arn = self._get_analyzer_arn(analyzer_name)
            if not analyzer_arn:
                print(f"Failed to analyze external access: analyzer '{analyzer_name}' not found")
                return {}

            # Get all findings; the listing is paginated, so walk every page.
            findings = []
            paginator = self.access_analyzer.get_paginator('list_findings')
            for page in paginator.paginate(analyzerArn=analyzer_arn):
                findings.extend(page['findings'])

            analysis_result = {
                'total_findings': len(findings),
                'active_findings': [],
                'archived_findings': [],
                'by_resource_type': defaultdict(int),
                'by_severity': defaultdict(int),
                'recommendations': []
            }
            for finding in findings:
                # Classify by status
                if finding['status'] == 'ACTIVE':
                    analysis_result['active_findings'].append(finding)
                else:
                    analysis_result['archived_findings'].append(finding)

                # Tally by resource type
                resource_type = finding['resourceType']
                analysis_result['by_resource_type'][resource_type] += 1

                # Classify by severity (judged by the number of conditions):
                # fewer conditions on the grant means a higher severity.
                condition_count = len(finding.get('condition', {}))
                if condition_count == 0:
                    severity = 'HIGH'
                elif condition_count <= 2:
                    severity = 'MEDIUM'
                else:
                    severity = 'LOW'
                analysis_result['by_severity'][severity] += 1

            # Generate recommendations
            analysis_result['recommendations'] = self._generate_access_recommendations(
                analysis_result
            )
            return analysis_result
        except Exception as e:
            print(f"Failed to analyze external access: {e}")
            return {}

    def _get_analyzer_arn(self, analyzer_name: str) -> str:
        """Return the ARN of the analyzer named ``analyzer_name``.

        Returns ``""`` when no analyzer has that name or when listing the
        analyzers fails (the error is printed).
        """
        try:
            paginator = self.access_analyzer.get_paginator('list_analyzers')
            for page in paginator.paginate():
                for analyzer in page['analyzers']:
                    if analyzer['name'] == analyzer_name:
                        return analyzer['arn']
            return ""
        except Exception as e:
            print(f"Failed to look up access analyzer: {e}")
            return ""

    def _generate_access_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
        """Generate access optimization recommendations from the tallies.

        The counters are read with ``.get`` so that a ``defaultdict`` input is
        not mutated: plain indexing would insert zero-count entries that then
        show up when callers iterate the distribution.
        """
        recommendations = []
        by_severity = analysis['by_severity']
        by_resource_type = analysis['by_resource_type']

        high_count = by_severity.get('HIGH', 0)
        if high_count > 0:
            recommendations.append(
                f"Found {high_count} high-risk external access permissions, "
                "immediate review and restriction of access conditions is recommended"
            )
        if by_resource_type.get('AWS::S3::Bucket', 0) > 5:
            recommendations.append(
                "Too much external access to S3 buckets, enabling S3 Block Public Access is recommended"
            )
        if by_resource_type.get('AWS::IAM::Role', 0) > 0:
            recommendations.append(
                "Found IAM roles being accessed by external accounts, reviewing trust relationship configurations is recommended"
            )
        return recommendations

    def monitor_access_changes(self, analyzer_name: str) -> Dict[str, Any]:
        """Summarise the 50 most recently updated findings.

        Returns:
            A dict with ``new_findings_today`` (findings updated today, UTC,
            that are ``ACTIVE``), ``resolved_findings_today`` (updated today
            with any other status), ``critical_changes`` (findings on IAM roles
            or KMS keys) and ``trend_analysis`` (left empty here). Returns
            ``{}`` if an API call fails (the error is printed).
        """
        try:
            analyzer_arn = self._get_analyzer_arn(analyzer_name)

            # Get recent findings
            findings = self.access_analyzer.list_findings(
                analyzerArn=analyzer_arn,
                sort={
                    'attributeName': 'updatedAt',
                    'orderBy': 'DESC'
                },
                maxResults=50
            )

            # Analyze change trends
            recent_changes = {
                'new_findings_today': 0,
                'resolved_findings_today': 0,
                'critical_changes': [],
                'trend_analysis': {}
            }
            today = datetime.now(timezone.utc).date()
            for finding in findings['findings']:
                # Compare calendar days in UTC on both sides; a timezone-aware
                # timestamp in another zone can otherwise fall on a different
                # date. Naive timestamps are taken to be UTC already.
                updated_at = finding['updatedAt']
                if updated_at.tzinfo is not None:
                    updated_at = updated_at.astimezone(timezone.utc)
                if updated_at.date() == today:
                    if finding['status'] == 'ACTIVE':
                        recent_changes['new_findings_today'] += 1
                    else:
                        recent_changes['resolved_findings_today'] += 1

                # Identify critical changes
                if finding['resourceType'] in ['AWS::IAM::Role', 'AWS::KMS::Key']:
                    recent_changes['critical_changes'].append({
                        'resource': finding['resource'],
                        'type': finding['resourceType'],
                        'principal': finding.get('principal', {}),
                        'action': finding.get('action', [])
                    })
            return recent_changes
        except Exception as e:
            print(f"Failed to monitor access changes: {e}")
            return {}
# Example usage: create an analyzer, summarise its findings, then report
# the changes seen today.
access_analyzer_service = IAMAccessAnalyzerService()

# Create an access analyzer
analyzer_result = access_analyzer_service.create_analyzer('security-analyzer')

# Analyze external access
access_analysis = access_analyzer_service.analyze_external_access('security-analyzer')
print("External Access Analysis Results:")
print(" Total findings: {}".format(access_analysis.get('total_findings', 0)))
print(" Active findings: {}".format(len(access_analysis.get('active_findings', []))))
print(" High-risk findings: {}".format(access_analysis.get('by_severity', {}).get('HIGH', 0)))

print("\nDistribution by resource type:")
for resource_type, count in access_analysis.get('by_resource_type', {}).items():
    print(" {}: {}".format(resource_type, count))

print("\nOptimization recommendations:")
for recommendation in access_analysis.get('recommendations', []):
    print(" - {}".format(recommendation))

# Monitor access changes
changes = access_analyzer_service.monitor_access_changes('security-analyzer')
print("\nNew findings today: {}".format(changes.get('new_findings_today', 0)))
print("Resolved today: {}".format(changes.get('resolved_findings_today', 0)))
print("Number of critical changes: {}".format(len(changes.get('critical_changes', []))))
10.4 Automated Compliance Checks
10.4.1 IAM Compliance Check Framework
class IAMComplianceChecker:
    """Run a set of IAM compliance checks and aggregate them into a score.

    Every ``_check_*`` method returns a dict with at least ``status``
    (``PASS``, ``FAIL`` or ``ERROR``), ``message`` and ``severity``.
    """

    # Substrings of a managed policy name that mark its holder as privileged.
    PRIVILEGED_POLICY_MARKERS = ('Admin', 'PowerUser')

    # Credentials idle for more than this many days are reported as unused.
    UNUSED_THRESHOLD_DAYS = 90

    # Actions that allow a principal to widen its own permissions.
    DANGEROUS_ACTIONS = (
        'iam:CreatePolicy',
        'iam:CreateRole',
        'iam:PutUserPolicy',
        'iam:PutRolePolicy',
        'iam:AttachUserPolicy',
        'iam:AttachRolePolicy',
        'iam:SetDefaultPolicyVersion',
        'sts:AssumeRole'
    )

    def __init__(self):
        self.iam = boto3.client('iam')
        self.config = boto3.client('config')

    def run_comprehensive_compliance_check(self) -> Dict[str, Any]:
        """Run every check and aggregate the results.

        Returns:
            A dict with ``overall_score`` (percentage of checks whose status is
            ``PASS``), ``checks`` (result per check name), ``recommendations``
            (collected from failed checks) and ``critical_issues`` (failed
            checks whose severity is ``HIGH``). Checks that end in ``ERROR``
            lower the score but contribute no recommendations.
        """
        compliance_result = {
            'overall_score': 0,
            'checks': {
                'password_policy': self._check_password_policy(),
                'mfa_enforcement': self._check_mfa_enforcement(),
                'unused_credentials': self._check_unused_credentials(),
                'privilege_escalation': self._check_privilege_escalation(),
                'cross_account_access': self._check_cross_account_access(),
                'service_accounts': self._check_service_accounts(),
                'policy_compliance': self._check_policy_compliance()
            },
            'recommendations': [],
            'critical_issues': []
        }

        # Calculate the overall compliance score
        passed_checks = sum(1 for check in compliance_result['checks'].values()
                            if check['status'] == 'PASS')
        total_checks = len(compliance_result['checks'])
        compliance_result['overall_score'] = int((passed_checks / total_checks) * 100)

        # Generate recommendations and critical issues
        for check_name, check_result in compliance_result['checks'].items():
            if check_result['status'] == 'FAIL':
                compliance_result['recommendations'].extend(
                    check_result.get('recommendations', [])
                )
                if check_result.get('severity') == 'HIGH':
                    compliance_result['critical_issues'].append({
                        'check': check_name,
                        'issue': check_result.get('message', ''),
                        'impact': check_result.get('impact', '')
                    })
        return compliance_result

    def _check_password_policy(self) -> Dict[str, Any]:
        """Check the account password policy against fixed requirements.

        Requires a minimum length of 12, all four character classes, a maximum
        password age of 90 days and reuse prevention for 12 passwords. A
        missing policy is reported as ``FAIL``; other errors as ``ERROR``.
        """
        try:
            policy = self.iam.get_account_password_policy()
            password_policy = policy['PasswordPolicy']
            compliance_issues = []

            # Check password length
            if password_policy.get('MinimumPasswordLength', 0) < 12:
                compliance_issues.append("Minimum password length should be at least 12 characters")

            # Check complexity requirements
            if not password_policy.get('RequireUppercaseCharacters', False):
                compliance_issues.append("Should require uppercase letters")
            if not password_policy.get('RequireLowercaseCharacters', False):
                compliance_issues.append("Should require lowercase letters")
            if not password_policy.get('RequireNumbers', False):
                compliance_issues.append("Should require numbers")
            if not password_policy.get('RequireSymbols', False):
                compliance_issues.append("Should require special symbols")

            # Check password rotation; a policy without MaxPasswordAge is
            # treated as 365 days and therefore flagged.
            if password_policy.get('MaxPasswordAge', 365) > 90:
                compliance_issues.append("Password validity should not exceed 90 days")

            # Check password history
            if password_policy.get('PasswordReusePrevention', 0) < 12:
                compliance_issues.append("Should prevent reuse of the last 12 passwords")

            status = 'PASS' if not compliance_issues else 'FAIL'
            return {
                'status': status,
                'message': f"Password policy check - {status}",
                'issues': compliance_issues,
                'severity': 'HIGH' if compliance_issues else 'LOW',
                'recommendations': [
                    f"Fix password policy issue: {issue}" for issue in compliance_issues
                ]
            }
        except self.iam.exceptions.NoSuchEntityException:
            return {
                'status': 'FAIL',
                'message': "No account password policy configured",
                'severity': 'HIGH',
                'recommendations': ["Create and configure a password policy that meets security requirements"]
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"Failed to check password policy: {e}",
                'severity': 'MEDIUM'
            }

    def _check_mfa_enforcement(self) -> Dict[str, Any]:
        """Check that every privileged user has an MFA device.

        The check passes when no privileged user lacks MFA. The result also
        reports ``mfa_adoption_rate`` (percentage of all users with at least
        one MFA device) and the number of users without MFA.
        """
        try:
            users_without_mfa = []
            privileged_users_without_mfa = []
            total_users = 0  # counted explicitly; needed for the adoption rate

            # Get all users
            paginator = self.iam.get_paginator('list_users')
            for page in paginator.paginate():
                for user in page['Users']:
                    total_users += 1
                    username = user['UserName']

                    # Check for MFA devices
                    mfa_devices = self.iam.list_mfa_devices(UserName=username)
                    if not mfa_devices['MFADevices']:
                        users_without_mfa.append(username)

                        # Check if the user is privileged
                        if self._is_privileged_user(username):
                            privileged_users_without_mfa.append(username)

            # Evaluate compliance
            mfa_adoption_rate = 0 if total_users == 0 else (
                (total_users - len(users_without_mfa)) / total_users * 100
            )
            status = 'PASS' if len(privileged_users_without_mfa) == 0 else 'FAIL'
            return {
                'status': status,
                'message': f"MFA enforcement check - {status}",
                'mfa_adoption_rate': round(mfa_adoption_rate, 2),
                'users_without_mfa': len(users_without_mfa),
                'privileged_users_without_mfa': privileged_users_without_mfa,
                'severity': 'HIGH' if privileged_users_without_mfa else 'MEDIUM',
                'recommendations': [
                    f"Enable MFA for privileged users: {', '.join(privileged_users_without_mfa)}"
                ] if privileged_users_without_mfa else []
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"Failed to check MFA enforcement: {e}",
                'severity': 'MEDIUM'
            }

    def _is_privileged_policy(self, policy_name: str) -> bool:
        """Return True if the policy name contains a privileged marker."""
        return any(marker in policy_name for marker in self.PRIVILEGED_POLICY_MARKERS)

    def _is_privileged_user(self, username: str) -> bool:
        """Determine if a user is privileged.

        A user is privileged when a managed policy attached directly, or to
        one of the user's groups, has a name containing one of
        ``PRIVILEGED_POLICY_MARKERS``. Lookup failures are printed and treated
        as "not privileged" (best effort).
        """
        try:
            # Check directly attached policies
            attached_policies = self.iam.list_attached_user_policies(UserName=username)
            for policy in attached_policies['AttachedPolicies']:
                if self._is_privileged_policy(policy['PolicyName']):
                    return True

            # Check group memberships
            groups = self.iam.get_groups_for_user(UserName=username)
            for group in groups['Groups']:
                group_policies = self.iam.list_attached_group_policies(
                    GroupName=group['GroupName']
                )
                for policy in group_policies['AttachedPolicies']:
                    if self._is_privileged_policy(policy['PolicyName']):
                        return True
            return False
        except Exception as e:
            print(f"Failed to determine privileges for {username}: {e}")
            return False

    def _fetch_credential_report(self, max_attempts: int = 30,
                                 delay_seconds: float = 2.0) -> str:
        """Generate the credential report and return its CSV text.

        ``generate_credential_report`` is polled until it reports ``COMPLETE``;
        only then is ``get_credential_report`` called, because that response
        carries the content but no state.

        Raises:
            TimeoutError: if the report is not complete after ``max_attempts``
                polls.
        """
        for _ in range(max_attempts):
            state = self.iam.generate_credential_report()['State']
            if state == 'COMPLETE':
                break
            time.sleep(delay_seconds)
        else:
            raise TimeoutError("credential report was not ready in time")
        report = self.iam.get_credential_report()
        return report['Content'].decode('utf-8')

    @staticmethod
    def _days_since(timestamp: Optional[str]) -> Optional[int]:
        """Return whole days elapsed since the date part of ``timestamp``.

        Returns ``None`` for placeholders such as ``N/A`` / ``no_information``
        and for values whose date part is not ``YYYY-MM-DD``.
        """
        if not timestamp or timestamp in ('N/A', 'no_information'):
            return None
        try:
            day = datetime.strptime(timestamp.split('T')[0], '%Y-%m-%d')
        except ValueError:
            return None
        # NOTE(review): report timestamps are assumed to be UTC — confirm.
        return (datetime.now(timezone.utc) - day.replace(tzinfo=timezone.utc)).days

    def _check_unused_credentials(self) -> Dict[str, Any]:
        """Check for passwords and active access keys unused for 90+ days.

        Credentials that have never been used (no usable timestamp in the
        report) are not flagged by this check.
        """
        try:
            unused_users = []
            unused_access_keys = []

            report_content = self._fetch_credential_report()
            for row in csv.DictReader(io.StringIO(report_content)):
                username = row['user']

                # Check password usage
                password_last_used = row.get('password_last_used', 'N/A')
                idle_days = self._days_since(password_last_used)
                if idle_days is not None and idle_days > self.UNUSED_THRESHOLD_DAYS:
                    unused_users.append({
                        'username': username,
                        'last_used': password_last_used,
                        'days_unused': idle_days
                    })

                # Check access key usage (the report has two key slots)
                for key_num in ('1', '2'):
                    if row.get(f'access_key_{key_num}_active', 'false') != 'true':
                        continue
                    key_last_used = row.get(f'access_key_{key_num}_last_used_date', 'N/A')
                    idle_days = self._days_since(key_last_used)
                    if idle_days is not None and idle_days > self.UNUSED_THRESHOLD_DAYS:
                        unused_access_keys.append({
                            'username': username,
                            # NOTE(review): the report may not contain an
                            # "access_key_N" column, in which case this is
                            # None — confirm against the report format.
                            'key_id': row.get(f'access_key_{key_num}'),
                            'last_used': key_last_used,
                            'days_unused': idle_days
                        })

            status = 'PASS' if not unused_users and not unused_access_keys else 'FAIL'
            return {
                'status': status,
                'message': f"Unused credentials check - {status}",
                'unused_users': unused_users,
                'unused_access_keys': unused_access_keys,
                'severity': 'MEDIUM',
                'recommendations': [
                    "Disable or delete user accounts that have not been used for 90 days",
                    "Rotate or delete access keys that have not been used for 90 days"
                ] if status == 'FAIL' else []
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"Failed to check for unused credentials: {e}",
                'severity': 'MEDIUM'
            }

    def _check_privilege_escalation(self) -> Dict[str, Any]:
        """Check customer-managed policies for privilege escalation risks."""
        try:
            risky_policies = []

            # Check all custom policies
            paginator = self.iam.get_paginator('list_policies')
            for page in paginator.paginate(Scope='Local'):
                for policy in page['Policies']:
                    # Get the policy document
                    policy_version = self.iam.get_policy_version(
                        PolicyArn=policy['Arn'],
                        VersionId=policy['DefaultVersionId']
                    )
                    policy_doc = policy_version['PolicyVersion']['Document']

                    # Check for dangerous permissions
                    if self._has_privilege_escalation_risk(policy_doc):
                        risky_policies.append({
                            'policy_name': policy['PolicyName'],
                            'policy_arn': policy['Arn'],
                            'risk_factors': self._analyze_escalation_risks(policy_doc)
                        })

            status = 'PASS' if not risky_policies else 'FAIL'
            return {
                'status': status,
                'message': f"Privilege escalation risk check - {status}",
                'risky_policies': risky_policies,
                'severity': 'HIGH' if risky_policies else 'LOW',
                'recommendations': [
                    f"Review the permission configuration of policy {policy['policy_name']}"
                    for policy in risky_policies
                ]
            }
        except Exception as e:
            return {
                'status': 'ERROR',
                'message': f"Failed to check for privilege escalation risks: {e}",
                'severity': 'MEDIUM'
            }

    def _has_privilege_escalation_risk(self, policy_doc: Dict[str, Any]) -> bool:
        """Determine if a policy has a privilege escalation risk.

        An ``Allow`` statement is risky when one of its actions is ``*``,
        contains one of ``DANGEROUS_ACTIONS`` as a substring, or is a wildcard
        pattern (for example ``iam:*`` or ``iam:Put*``) that matches one of
        them. Matching ignores case.
        """
        dangerous_actions = [name.lower() for name in self.DANGEROUS_ACTIONS]

        statements = policy_doc.get('Statement', [])
        if not isinstance(statements, list):
            statements = [statements]

        for statement in statements:
            if statement.get('Effect') != 'Allow':
                continue
            actions = statement.get('Action', [])
            if isinstance(actions, str):
                actions = [actions]
            for action in actions:
                if action == '*':
                    return True
                pattern = action.lower()
                for dangerous in dangerous_actions:
                    # Substring test keeps e.g. iam:CreatePolicyVersion flagged;
                    # fnmatch covers wildcard grants that include the action.
                    if dangerous in pattern or fnmatch.fnmatchcase(dangerous, pattern):
                        return True
        return False

    def _analyze_escalation_risks(self, policy_doc: Dict[str, Any]) -> List[str]:
        """List human-readable risk factors found in the ``Allow`` statements."""
        risk_factors = []
        statements = policy_doc.get('Statement', [])
        if not isinstance(statements, list):
            statements = [statements]

        for statement in statements:
            if statement.get('Effect') == 'Allow':
                actions = statement.get('Action', [])
                resources = statement.get('Resource', [])
                if isinstance(actions, str):
                    actions = [actions]
                if isinstance(resources, str):
                    resources = [resources]

                # Check for wildcard usage
                if '*' in actions:
                    risk_factors.append("Grants all permissions using a wildcard")
                if '*' in resources:
                    risk_factors.append("Grants permissions to all resources")

                # Check for IAM management permissions
                iam_actions = [action for action in actions if action.startswith('iam:')]
                if iam_actions:
                    risk_factors.append(f"Contains IAM management permissions: {', '.join(iam_actions[:3])}")

                # Check for STS permissions
                sts_actions = [action for action in actions if action.startswith('sts:')]
                if sts_actions:
                    risk_factors.append(f"Contains STS permissions: {', '.join(sts_actions[:3])}")
        return risk_factors

    def _check_cross_account_access(self) -> Dict[str, Any]:
        """Check cross-account access configurations.

        Placeholder: no inspection is performed yet and the check always
        reports ``PASS``.
        """
        # Implement cross-account access check logic
        return {
            'status': 'PASS',
            'message': 'Cross-account access check - PASS',
            'severity': 'LOW'
        }

    def _check_service_accounts(self) -> Dict[str, Any]:
        """Check service account configurations.

        Placeholder: no inspection is performed yet and the check always
        reports ``PASS``.
        """
        # Implement service account check logic
        return {
            'status': 'PASS',
            'message': 'Service account check - PASS',
            'severity': 'LOW'
        }

    def _check_policy_compliance(self) -> Dict[str, Any]:
        """Check policy compliance.

        Placeholder: no inspection is performed yet and the check always
        reports ``PASS``.
        """
        # Implement policy compliance check logic
        return {
            'status': 'PASS',
            'message': 'Policy compliance check - PASS',
            'severity': 'LOW'
        }
# Example usage: run every check and print the score, the per-check status,
# the first five recommendations and any critical issues.
compliance_checker = IAMComplianceChecker()
compliance_result = compliance_checker.run_comprehensive_compliance_check()

print("IAM Compliance Check Results:")
print("Overall compliance score: {}/100".format(compliance_result['overall_score']))
print("Number of critical issues: {}".format(len(compliance_result['critical_issues'])))

print("\nCheck results:")
for check_name, result in compliance_result['checks'].items():
    print(" {}: {} ({})".format(check_name, result['status'], result.get('severity', 'N/A')))

print("\nRecommended actions:")
for recommendation in compliance_result['recommendations'][:5]:
    print(" - {}".format(recommendation))

if compliance_result['critical_issues']:
    print("\nCritical issues:")
    for issue in compliance_result['critical_issues']:
        print(" - {}: {}".format(issue['check'], issue['issue']))