Chapter 11: Advanced IAM Features and Tools
9/1/25About 13 min
Chapter 11: Advanced IAM Features and Tools
Learning Objectives
By the end of this chapter, you will be able to:
- Master the configuration and use of advanced IAM features
- Learn to use IAM Roles Anywhere for hybrid cloud identity management
- Understand and implement advanced uses of IAM condition keys
- Master the IAM policy simulator and evaluation tools
- Learn to build custom IAM management tools and automation workflows
- Implement deep integration of IAM with DevOps pipelines
11.1 IAM Roles Anywhere
11.1.1 IAM Roles Anywhere Overview
IAM Roles Anywhere allows your workloads outside of AWS to use IAM roles, providing unified identity management for hybrid and multi-cloud environments.
11.1.2 IAM Roles Anywhere Configuration and Usage
import boto3
import json
import base64
import subprocess
from typing import Dict, Any, Optional
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from datetime import datetime, timedelta
class IAMRolesAnywhereManager:
"""IAM Roles Anywhere Manager"""
def __init__(self):
self.rolesanywhere = boto3.client('rolesanywhere')
self.iam = boto3.client('iam')
def create_trust_anchor(self, name: str, certificate_body: str) -> Dict[str, Any]:
"""Create a trust anchor"""
try:
response = self.rolesanywhere.create_trust_anchor(
name=name,
source={
'sourceType': 'CERTIFICATE_BUNDLE',
'sourceData': {
'x509CertificateData': certificate_body
}
},
enabled=True,
tags=[
{
'key': 'Purpose',
'value': 'IAM-Roles-Anywhere'
},
{
'key': 'Environment',
'value': 'Production'
}
]
)
print(f"Trust anchor created successfully: {response['trustAnchor']['trustAnchorId']}")
return response
except Exception as e:
print(f"Failed to create trust anchor: {e}")
return {}
def create_profile(self,
name: str,
role_arns: List[str],
trust_anchor_id: str,
session_policy: Optional[str] = None) -> Dict[str, Any]:
"""Create a profile"""
try:
profile_config = {
'name': name,
'roleArns': role_arns,
'enabled': True,
'requireInstanceProperties': False,
'tags': [
{
'key': 'TrustAnchor',
'value': trust_anchor_id
}
]
}
if session_policy:
profile_config['sessionPolicy'] = session_policy
response = self.rolesanywhere.create_profile(**profile_config)
print(f"Profile created successfully: {response['profile']['profileId']}")
return response
except Exception as e:
print(f"Failed to create profile: {e}")
return {}
def generate_client_certificate(self,
ca_cert_path: str,
ca_key_path: str,
client_name: str) -> Dict[str, str]:
"""Generate a client certificate"""
try:
# Generate a client private key
client_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
# Load the CA certificate and private key
with open(ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
with open(ca_key_path, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), password=None)
# Create a client certificate request
subject = x509.Name([
x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "CA"),
x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "San Francisco"),
x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Example Corp"),
x509.NameAttribute(x509.NameOID.COMMON_NAME, client_name),
])
# Build the client certificate
cert_builder = x509.CertificateBuilder()
cert_builder = cert_builder.subject_name(subject)
cert_builder = cert_builder.issuer_name(ca_cert.subject)
cert_builder = cert_builder.public_key(client_key.public_key())
cert_builder = cert_builder.serial_number(x509.random_serial_number())
cert_builder = cert_builder.not_valid_before(datetime.utcnow())
cert_builder = cert_builder.not_valid_after(
datetime.utcnow() + timedelta(days=365)
)
# Add extensions
cert_builder = cert_builder.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(client_name),
]),
critical=False,
)
cert_builder = cert_builder.add_extension(
x509.KeyUsage(
key_encipherment=True,
digital_signature=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
# Sign the certificate with the CA private key
client_cert = cert_builder.sign(ca_key, hashes.SHA256())
# Serialize the certificate and private key
cert_pem = client_cert.public_bytes(serialization.Encoding.PEM)
key_pem = client_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return {
'certificate': cert_pem.decode('utf-8'),
'private_key': key_pem.decode('utf-8'),
'certificate_path': f"{client_name}.crt",
'private_key_path': f"{client_name}.key"
}
except Exception as e:
print(f"Failed to generate client certificate: {e}")
return {}
def create_credentials_helper(self,
profile_arn: str,
role_arn: str,
trust_anchor_arn: str,
certificate_path: str,
private_key_path: str) -> str:
"""Create a credential retrieval script"""
script_content = f'''#!/usr/bin/env python3
import boto3
import json
import sys
from rolesanywhere_credential_helper import CredentialHelper
def get_credentials():
"""Get IAM Roles Anywhere temporary credentials"""
try:
helper = CredentialHelper(
certificate='{certificate_path}',
private_key='{private_key_path}',
profile_arn='{profile_arn}',
role_arn='{role_arn}',
trust_anchor_arn='{trust_anchor_arn}',
region='us-east-1'
)
credentials = helper.get_credentials()
# Output formatted credentials
output = {{
"Version": 1,
"AccessKeyId": credentials["AccessKeyId"],
"SecretAccessKey": credentials["SecretAccessKey"],
"SessionToken": credentials["SessionToken"],
"Expiration": credentials["Expiration"]
}}
print(json.dumps(output))
return credentials
except Exception as e:
print(f"Failed to get credentials: {{e}}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
get_credentials()
'''
return script_content
def setup_credential_process(self,
profile_name: str,
script_path: str) -> str:
"""Set up the AWS CLI credential process configuration"""
aws_config = f'''
[profile {profile_name}]
credential_process = python3 {script_path}
region = us-east-1
'''
return aws_config
# Example usage
roles_anywhere_manager = IAMRolesAnywhereManager()
# Assuming a CA certificate already exists
ca_certificate = """-----BEGIN CERTIFICATE-----
MIIBkTCB+wIJAMlyFqk69v+9MA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNVBAMMCWxv
Y2FsaG9zdDAeFw0yNDAxMDEwMDAwMDBaFw0yNTAxMDEwMDAwMDBaMBQxEjAQBgNV
BAMMCWxvY2FsaG9zdDBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQDTgvwjlRHZ9osN
...
-----END CERTIFICATE-----"""
# Create a trust anchor
trust_anchor = roles_anywhere_manager.create_trust_anchor(
name="production-trust-anchor",
certificate_body=ca_certificate
)
# Create an IAM role (assuming it already exists)
role_arn = "arn:aws:iam::123456789012:role/RolesAnywhereTestRole"
# Create a profile
profile = roles_anywhere_manager.create_profile(
name="production-profile",
role_arns=[role_arn],
trust_anchor_id=trust_anchor.get('trustAnchor', {}).get('trustAnchorId', ''),
session_policy=json.dumps({
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject"],
"Resource": "*"
}
]
})
)
print("IAM Roles Anywhere configured successfully!")
11.2 Advanced Use of Condition Keys
11.2.1 Advanced Applications of Condition Keys
class AdvancedConditionKeys:
"""Advanced Condition Key Management"""
def __init__(self):
self.iam = boto3.client('iam')
def create_context_aware_policy(self) -> Dict[str, Any]:
"""Create a context-aware policy"""
# Access control policy based on time, location, and device
advanced_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "TimeBasedAccess",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::sensitive-data/*",
"Condition": {
"DateGreaterThan": {
"aws:CurrentTime": "2024-01-01T00:00:00Z"
},
"DateLessThan": {
"aws:CurrentTime": "2024-12-31T23:59:59Z"
},
"ForAllValues:StringEquals": {
"aws:RequestedRegion": [
"us-east-1",
"us-west-2"
]
}
}
},
{
"Sid": "IPBasedAccess",
"Effect": "Allow",
"Action": [
"ec2:DescribeInstances",
"ec2:StartInstances",
"ec2:StopInstances"
],
"Resource": "*",
"Condition": {
"IpAddress": {
"aws:SourceIp": [
"203.0.113.0/24", # Corporate network
"198.51.100.0/24" # VPN network
]
}
}
},
{
"Sid": "MFARequiredForSensitiveActions",
"Effect": "Allow",
"Action": [
"iam:CreateUser",
"iam:DeleteUser",
"iam:CreateRole"
],
"Resource": "*",
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "true"
},
"NumericLessThan": {
"aws:MultiFactorAuthAge": "3600" # MFA within 1 hour
}
}
},
{
"Sid": "TagBasedResourceAccess",
"Effect": "Allow",
"Action": [
"ec2:*"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"ec2:ResourceTag/Department": "${aws:PrincipalTag/Department}",
"ec2:ResourceTag/Project": "${aws:PrincipalTag/Project}"
}
}
},
{
"Sid": "BusinessHoursOnly",
"Effect": "Allow",
"Action": [
"rds:StartDBInstance",
"rds:StopDBInstance"
],
"Resource": "*",
"Condition": {
"ForAllValues:StringLike": {
"aws:RequestedTimezone": "America/New_York"
},
"DateGreaterThan": {
"aws:TokenIssueTime": "${aws:CurrentTime - 28800}" # 8 hours ago
},
"IpAddress": {
"aws:SourceIp": "10.0.0.0/8" # Internal network access
}
}
},
{
"Sid": "SecureTransportOnly",
"Effect": "Deny",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::secure-bucket",
"arn:aws:s3:::secure-bucket/*"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
return advanced_policy
def create_dynamic_policy_generator(self) -> str:
"""Create a dynamic policy generator"""
generator_code = '''
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
class DynamicPolicyGenerator:
"""Dynamic Policy Generator"""
def generate_temporary_access_policy(self,
actions: List[str],
resources: List[str],
duration_hours: int = 8,
source_ips: List[str] = None,
require_mfa: bool = True) -> Dict[str, Any]:
"""Generate a temporary access policy"""
# Calculate the expiration time
now = datetime.utcnow()
expiry = now + timedelta(hours=duration_hours)
# Basic conditions
conditions = {
"DateLessThan": {
"aws:CurrentTime": expiry.strftime("%Y-%m-%dT%H:%M:%SZ")
}
}
# Add IP restrictions
if source_ips:
conditions["IpAddress"] = {
"aws:SourceIp": source_ips
}
# Add MFA requirement
if require_mfa:
conditions["Bool"] = {
"aws:MultiFactorAuthPresent": "true"
}
conditions["NumericLessThan"] = {
"aws:MultiFactorAuthAge": "3600"
}
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": actions,
"Resource": resources,
"Condition": conditions
}
]
}
return policy
def generate_department_scoped_policy(self,
department: str,
base_permissions: List[str]) -> Dict[str, Any]:
"""Generate a department-scoped policy"""
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": base_permissions,
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:ResourceTag/Department": department,
"aws:PrincipalTag/Department": department
}
}
},
{
"Effect": "Allow",
"Action": [
"tag:GetResources",
"tag:TagResources",
"tag:UntagResources"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:RequestedTag/Department": department
}
}
}
]
}
return policy
def generate_cost_control_policy(self,
monthly_budget: float,
allowed_instance_types: List[str]) -> Dict[str, Any]:
"""Generate a cost control policy"""
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowInstanceLaunchWithTypeRestriction",
"Effect": "Allow",
"Action": [
"ec2:RunInstances"
],
"Resource": "arn:aws:ec2:*:*:instance/*",
"Condition": {
"ForAnyValue:StringEquals": {
"ec2:InstanceType": allowed_instance_types
}
}
},
{
"Sid": "PreventLargeInstanceLaunch",
"Effect": "Deny",
"Action": [
"ec2:RunInstances"
],
"Resource": "arn:aws:ec2:*:*:instance/*",
"Condition": {
"ForAnyValue:StringLike": {
"ec2:InstanceType": [
"*.8xlarge",
"*.12xlarge",
"*.16xlarge",
"*.24xlarge"
]
}
}
},
{
"Sid": "RequireCostAllocationTags",
"Effect": "Deny",
"NotAction": [
"iam:*",
"sts:*"
],
"Resource": "*",
"Condition": {
"Null": {
"aws:RequestedTag/CostCenter": "true"
}
}
}
]
}
return policy
# Example usage
generator = DynamicPolicyGenerator()
# Generate a temporary access policy
temp_policy = generator.generate_temporary_access_policy(
actions=["s3:GetObject", "s3:PutObject"],
resources=["arn:aws:s3:::my-bucket/*"],
duration_hours=4,
source_ips=["203.0.113.0/24"],
require_mfa=True
)
print("Temporary access policy:")
print(json.dumps(temp_policy, indent=2))
# Generate a department-scoped policy
dept_policy = generator.generate_department_scoped_policy(
department="Engineering",
base_permissions=["ec2:DescribeInstances", "s3:ListBucket"]
)
print("\nDepartment-scoped policy:")
print(json.dumps(dept_policy, indent=2))
'''
return generator_code
# Example usage
condition_manager = AdvancedConditionKeys()
# Create an advanced context-aware policy
advanced_policy = condition_manager.create_context_aware_policy()
print("Advanced condition policy:")
print(json.dumps(advanced_policy, indent=2))
# Create a dynamic policy generator
generator_code = condition_manager.create_dynamic_policy_generator()
print("\nDynamic policy generator created")
11.3 IAM Policy Simulator and Evaluation Tools
11.3.1 Using the Policy Simulator
class IAMPolicySimulator:
"""IAM Policy Simulator"""
def __init__(self):
self.iam = boto3.client('iam')
def simulate_principal_policy(self,
principal_arn: str,
action_names: List[str],
resource_arns: List[str],
context_entries: List[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Simulate a principal's policy"""
try:
simulation_params = {
'PolicySourceArn': principal_arn,
'ActionNames': action_names,
'ResourceArns': resource_arns
}
if context_entries:
simulation_params['ContextEntries'] = context_entries
response = self.iam.simulate_principal_policy(**simulation_params)
# Analyze the simulation results
results = {
'principal': principal_arn,
'total_evaluations': len(response['EvaluationResults']),
'allowed_actions': [],
'denied_actions': [],
'detailed_results': []
}
for result in response['EvaluationResults']:
action = result['EvalActionName']
resource = result['EvalResourceName']
decision = result['EvalDecision']
result_detail = {
'action': action,
'resource': resource,
'decision': decision,
'matched_statements': result.get('MatchedStatements', []),
'missing_context_values': result.get('MissingContextValues', [])
}
results['detailed_results'].append(result_detail)
if decision == 'allowed':
results['allowed_actions'].append(f"{action} on {resource}")
else:
results['denied_actions'].append(f"{action} on {resource}")
return results
except Exception as e:
print(f"Policy simulation failed: {e}")
return {}
def simulate_custom_policy(self,
policy_input_list: List[str],
action_names: List[str],
resource_arns: List[str],
context_entries: List[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Simulate a custom policy"""
try:
simulation_params = {
'PolicyInputList': policy_input_list,
'ActionNames': action_names,
'ResourceArns': resource_arns
}
if context_entries:
simulation_params['ContextEntries'] = context_entries
response = self.iam.simulate_custom_policy(**simulation_params)
# Analyze the simulation results
results = {
'policy_count': len(policy_input_list),
'total_evaluations': len(response['EvaluationResults']),
'permission_matrix': {},
'policy_effectiveness': {}
}
# Build a permission matrix
for result in response['EvaluationResults']:
action = result['EvalActionName']
resource = result['EvalResourceName']
decision = result['EvalDecision']
if action not in results['permission_matrix']:
results['permission_matrix'][action] = {}
results['permission_matrix'][action][resource] = {
'decision': decision,
'matched_statements': len(result.get('MatchedStatements', []))
}
return results
except Exception as e:
print(f"Custom policy simulation failed: {e}")
return {}
def batch_policy_evaluation(self,
policies: List[Dict[str, Any]],
test_scenarios: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Batch policy evaluation"""
evaluation_results = {
'policies_tested': len(policies),
'scenarios_tested': len(test_scenarios),
'results': {},
'coverage_analysis': {},
'recommendations': []
}
for policy_info in policies:
policy_name = policy_info['name']
policy_document = policy_info['document']
evaluation_results['results'][policy_name] = {
'scenarios': {},
'effectiveness_score': 0,
'coverage_percentage': 0
}
covered_scenarios = 0
for scenario in test_scenarios:
scenario_name = scenario['name']
actions = scenario['actions']
resources = scenario['resources']
context = scenario.get('context', [])
# Run the simulation
simulation = self.simulate_custom_policy(
policy_input_list=[json.dumps(policy_document)],
action_names=actions,
resource_arns=resources,
context_entries=context
)
# Analyze scenario coverage
scenario_coverage = self._analyze_scenario_coverage(
simulation, actions, resources
)
evaluation_results['results'][policy_name]['scenarios'][scenario_name] = {
'coverage': scenario_coverage,
'allowed_actions': scenario_coverage['allowed_count'],
'total_actions': scenario_coverage['total_actions']
}
if scenario_coverage['coverage_percentage'] > 50:
covered_scenarios += 1
# Calculate the policy effectiveness score
coverage_percentage = (covered_scenarios / len(test_scenarios)) * 100
evaluation_results['results'][policy_name]['coverage_percentage'] = coverage_percentage
evaluation_results['results'][policy_name]['effectiveness_score'] = (
coverage_percentage * 0.7 +
self._calculate_security_score(policy_document) * 0.3
)
# Generate recommendations
evaluation_results['recommendations'] = self._generate_policy_recommendations(
evaluation_results['results']
)
return evaluation_results
def _analyze_scenario_coverage(self,
simulation: Dict[str, Any],
actions: List[str],
resources: List[str]) -> Dict[str, Any]:
"""Analyze scenario coverage"""
total_combinations = len(actions) * len(resources)
allowed_count = 0
for action, resource_results in simulation.get('permission_matrix', {}).items():
for resource, result in resource_results.items():
if result['decision'] == 'allowed':
allowed_count += 1
coverage_percentage = (allowed_count / total_combinations * 100) if total_combinations > 0 else 0
return {
'total_actions': len(actions),
'total_resources': len(resources),
'total_combinations': total_combinations,
'allowed_count': allowed_count,
'coverage_percentage': coverage_percentage
}
def _calculate_security_score(self, policy_document: Dict[str, Any]) -> float:
"""Calculate a policy's security score"""
score = 100.0
statements = policy_document.get('Statement', [])
if not isinstance(statements, list):
statements = [statements]
for statement in statements:
# Check for wildcard usage
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
if '*' in actions:
score -= 20 # Using wildcard permissions
# Check resource restrictions
resources = statement.get('Resource', [])
if isinstance(resources, str):
resources = [resources]
if '*' in resources:
score -= 15 # Granting permissions to all resources
# Check for condition usage
if not statement.get('Condition'):
score -= 10 # Missing condition restrictions
return max(score, 0)
def _generate_policy_recommendations(self, results: Dict[str, Any]) -> List[str]:
"""Generate policy recommendations"""
recommendations = []
for policy_name, policy_result in results.items():
effectiveness = policy_result['effectiveness_score']
if effectiveness < 60:
recommendations.append(
f"Policy {policy_name} has low effectiveness ({effectiveness:.1f}%), "
"optimizing permission configurations is recommended"
)
if policy_result['coverage_percentage'] < 70:
recommendations.append(
f"Policy {policy_name} has low scenario coverage, "
"there may be insufficient permissions"
)
return recommendations
# Example usage
simulator = IAMPolicySimulator()
# Simulate an existing user policy
user_arn = "arn:aws:iam::123456789012:user/test-user"
actions_to_test = [
"s3:GetObject",
"s3:PutObject",
"ec2:DescribeInstances",
"ec2:StartInstances"
]
resources_to_test = [
"arn:aws:s3:::my-bucket/*",
"arn:aws:ec2:*:*:instance/*"
]
# Add context conditions
context_entries = [
{
'ContextKeyName': 'aws:SourceIp',
'ContextKeyValues': ['203.0.113.0/24'],
'ContextKeyType': 'string'
},
{
'ContextKeyName': 'aws:MultiFactorAuthPresent',
'ContextKeyValues': ['true'],
'ContextKeyType': 'boolean'
}
]
simulation_result = simulator.simulate_principal_policy(
principal_arn=user_arn,
action_names=actions_to_test,
resource_arns=resources_to_test,
context_entries=context_entries
)
print("Policy simulation results:")
print(f"Principal: {simulation_result.get('principal')}")
print(f"Total evaluations: {simulation_result.get('total_evaluations')}")
print(f"Allowed actions: {len(simulation_result.get('allowed_actions', []))}")
print(f"Denied actions: {len(simulation_result.get('denied_actions', []))}")
print("\nAllowed actions:")
for action in simulation_result.get('allowed_actions', [])[:5]:
print(f" - {action}")
print("\nDenied actions:")
for action in simulation_result.get('denied_actions', [])[:5]:
print(f" - {action}")
11.4 Custom IAM Management Tools
11.4.1 IAM Resource Management Tool
import csv
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
class CustomIAMManager:
"""Custom IAM Management Tool"""
def __init__(self):
self.iam = boto3.client('iam')
self.sts = boto3.client('sts')
def bulk_user_management(self, operations_file: str) -> Dict[str, Any]:
"""Bulk user management"""
results = {
'created_users': [],
'updated_users': [],
'deleted_users': [],
'errors': [],
'total_operations': 0
}
try:
with open(operations_file, 'r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
results['total_operations'] += 1
operation = row.get('operation', '').lower()
username = row.get('username', '')
try:
if operation == 'create':
result = self._create_user_with_details(row)
results['created_users'].append(result)
elif operation == 'update':
result = self._update_user_details(row)
results['updated_users'].append(result)
elif operation == 'delete':
result = self._delete_user_safely(username)
results['deleted_users'].append(result)
except Exception as e:
results['errors'].append({
'username': username,
'operation': operation,
'error': str(e)
})
return results
except Exception as e:
print(f"Bulk operation failed: {e}")
return results
def _create_user_with_details(self, user_details: Dict[str, str]) -> Dict[str, Any]:
"""Create a user and related configurations"""
username = user_details['username']
email = user_details.get('email', '')
department = user_details.get('department', '')
groups = user_details.get('groups', '').split(',')
policies = user_details.get('policies', '').split(',')
# Create the user
user_response = self.iam.create_user(
UserName=username,
Tags=[
{'Key': 'Email', 'Value': email},
{'Key': 'Department', 'Value': department},
{'Key': 'CreatedBy', 'Value': 'BulkManager'},
{'Key': 'CreatedAt', 'Value': datetime.utcnow().isoformat()}
]
)
# Add to groups
for group in groups:
if group.strip():
try:
self.iam.add_user_to_group(
GroupName=group.strip(),
UserName=username
)
except:
pass
# Attach policies
for policy in policies:
if policy.strip():
try:
self.iam.attach_user_policy(
UserName=username,
PolicyArn=policy.strip()
)
except:
pass
# Create an access key (if needed)
access_key = None
if user_details.get('create_access_key', '').lower() == 'true':
key_response = self.iam.create_access_key(UserName=username)
access_key = {
'AccessKeyId': key_response['AccessKey']['AccessKeyId'],
'SecretAccessKey': key_response['AccessKey']['SecretAccessKey']
}
return {
'username': username,
'user_arn': user_response['User']['Arn'],
'access_key': access_key,
'groups_added': [g.strip() for g in groups if g.strip()],
'policies_attached': [p.strip() for p in policies if p.strip()]
}
def _update_user_details(self, user_details: Dict[str, str]) -> Dict[str, Any]:
"""Update user details"""
username = user_details['username']
updates = {}
# Update tags
new_tags = []
if user_details.get('email'):
new_tags.append({'Key': 'Email', 'Value': user_details['email']})
if user_details.get('department'):
new_tags.append({'Key': 'Department', 'Value': user_details['department']})
if new_tags:
self.iam.tag_user(UserName=username, Tags=new_tags)
updates['tags_updated'] = True
# Update group memberships
if user_details.get('groups'):
current_groups = self.iam.get_groups_for_user(UserName=username)
current_group_names = {g['GroupName'] for g in current_groups['Groups']}
target_groups = {g.strip() for g in user_details['groups'].split(',') if g.strip()}
# Remove from unnecessary groups
for group in current_group_names - target_groups:
self.iam.remove_user_from_group(UserName=username, GroupName=group)
# Add to new groups
for group in target_groups - current_group_names:
try:
self.iam.add_user_to_group(UserName=username, GroupName=group)
except:
pass
updates['groups_updated'] = True
return {
'username': username,
'updates': updates
}
def _delete_user_safely(self, username: str) -> Dict[str, Any]:
"""Safely delete a user"""
cleanup_actions = []
try:
# Remove from group memberships
groups = self.iam.get_groups_for_user(UserName=username)
for group in groups['Groups']:
self.iam.remove_user_from_group(
UserName=username,
GroupName=group['GroupName']
)
cleanup_actions.append(f"Removed from group {group['GroupName']}")
# Detach attached policies
attached_policies = self.iam.list_attached_user_policies(UserName=username)
for policy in attached_policies['AttachedPolicies']:
self.iam.detach_user_policy(
UserName=username,
PolicyArn=policy['PolicyArn']
)
cleanup_actions.append(f"Detached policy {policy['PolicyName']}")
# Delete inline policies
inline_policies = self.iam.list_user_policies(UserName=username)
for policy_name in inline_policies['PolicyNames']:
self.iam.delete_user_policy(
UserName=username,
PolicyName=policy_name
)
cleanup_actions.append(f"Deleted inline policy {policy_name}")
# Delete access keys
access_keys = self.iam.list_access_keys(UserName=username)
for key in access_keys['AccessKeyMetadata']:
self.iam.delete_access_key(
UserName=username,
AccessKeyId=key['AccessKeyId']
)
cleanup_actions.append(f"Deleted access key {key['AccessKeyId']}")
# Delete MFA devices
mfa_devices = self.iam.list_mfa_devices(UserName=username)
for device in mfa_devices['MFADevices']:
self.iam.deactivate_mfa_device(
UserName=username,
SerialNumber=device['SerialNumber']
)
cleanup_actions.append(f"Deactivated MFA device {device['SerialNumber']}")
# Finally, delete the user
self.iam.delete_user(UserName=username)
cleanup_actions.append("Deleted user")
return {
'username': username,
'deleted': True,
'cleanup_actions': cleanup_actions
}
except Exception as e:
return {
'username': username,
'deleted': False,
'error': str(e),
'cleanup_actions': cleanup_actions
}
def generate_access_report(self, output_format: str = 'csv') -> str:
"""Generate an access report"""
# Generate the credential report
self.iam.generate_credential_report()
# Wait for the report to complete
import time
while True:
try:
report = self.iam.get_credential_report()
if report['State'] == 'COMPLETE':
break
except:
pass
time.sleep(2)
# Parse the report content
report_content = report['Content'].decode('utf-8')
csv_data = list(csv.DictReader(io.StringIO(report_content)))
if output_format.lower() == 'json':
return json.dumps(csv_data, indent=2, default=str)
elif output_format.lower() == 'html':
return self._generate_html_report(csv_data)
else:
return report_content
def _generate_html_report(self, data: List[Dict[str, Any]]) -> str:
"""Generate an HTML format report"""
html_template = '''
<!DOCTYPE html>
<html>
<head>
<title>IAM Access Report</title>
<style>
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.risk-high { background-color: #ffebee; }
.risk-medium { background-color: #fff3e0; }
.risk-low { background-color: #e8f5e8; }
</style>
</head>
<body>
<h1>IAM User Access Report</h1>
<table>
<tr>
<th>Username</th>
<th>Creation Time</th>
<th>Password Last Used</th>
<th>Access Key 1 Status</th>
<th>Access Key 1 Last Used</th>
<th>MFA Enabled</th>
<th>Risk Level</th>
</tr>
'''
for user in data:
username = user.get('user', '')
created = user.get('user_creation_time', '')
password_used = user.get('password_last_used', 'N/A')
key1_active = user.get('access_key_1_active', 'false')
key1_used = user.get('access_key_1_last_used_date', 'N/A')
mfa_active = user.get('mfa_active', 'false')
# Calculate risk level
risk_class = self._calculate_risk_class(user)
html_template += f'''
<tr class="{risk_class}">
<td>{username}</td>
<td>{created}</td>
<td>{password_used}</td>
<td>{key1_active}</td>
<td>{key1_used}</td>
<td>{mfa_active}</td>
<td>{risk_class.replace('risk-', '').upper()}</td>
</tr>
'''
html_template += '''
</table>
</body>
</html>
'''
return html_template
def _calculate_risk_class(self, user_data: Dict[str, str]) -> str:
"""Calculate a user's risk level"""
risk_score = 0
# Check password usage
password_used = user_data.get('password_last_used', 'N/A')
if password_used == 'N/A':
risk_score += 30
elif password_used != 'no_information':
try:
last_used = datetime.strptime(password_used.split('T')[0], '%Y-%m-%d')
days_since = (datetime.now() - last_used).days
if days_since > 90:
risk_score += 20
elif days_since > 30:
risk_score += 10
except:
pass
# Check MFA status
if user_data.get('mfa_active', 'false').lower() != 'true':
risk_score += 25
# Check access keys
if user_data.get('access_key_1_active', 'false').lower() == 'true':
key_used = user_data.get('access_key_1_last_used_date', 'N/A')
if key_used == 'N/A':
risk_score += 15
if risk_score >= 50:
return 'risk-high'
elif risk_score >= 25:
return 'risk-medium'
else:
return 'risk-low'
# Example usage
iam_manager = CustomIAMManager()
# Example of a bulk operations CSV file
csv_content = """operation,username,email,department,groups,policies,create_access_key
create,john.doe,john@example.com,Engineering,"Developers,TeamLead",arn:aws:iam::aws:policy/PowerUserAccess,true
create,jane.smith,jane@example.com,Marketing,"Marketing,ContentCreators",arn:aws:iam::aws:policy/ReadOnlyAccess,false
update,john.doe,john.doe@example.com,Engineering,"Developers,SeniorDev",,false
delete,old.user,,,,false"""
# Save the CSV file
with open('bulk_operations.csv', 'w') as f:
f.write(csv_content)
# Execute bulk operations
bulk_results = iam_manager.bulk_user_management('bulk_operations.csv')
print("Bulk operation results:")
print(f"Users created: {len(bulk_results['created_users'])}")
print(f"Users updated: {len(bulk_results['updated_users'])}")
print(f"Users deleted: {len(bulk_results['deleted_users'])}")
print(f"Number of errors: {len(bulk_results['errors'])}")
# Generate an access report
access_report = iam_manager.generate_access_report(output_format='json')
print("\nAccess report generated")
11.5 DevOps Integration
11.5.1 CI/CD Pipeline IAM Integration
class DevOpsIAMIntegration:
"""DevOps IAM Integration"""
def __init__(self):
self.iam = boto3.client('iam')
self.sts = boto3.client('sts')
def create_cicd_role_template(self) -> Dict[str, Any]:
"""Create a CI/CD role template"""
# Basic deployment role
deployment_role = {
"trust_policy": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "codebuild.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT-ID:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "unique-external-id"
}
}
}
]
},
"permissions_policy": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::deployment-artifacts/*",
"arn:aws:s3:::application-configs/*"
]
},
{
"Effect": "Allow",
"Action": [
"lambda:UpdateFunctionCode",
"lambda:UpdateFunctionConfiguration",
"lambda:PublishVersion",
"lambda:CreateAlias",
"lambda:UpdateAlias"
],
"Resource": "arn:aws:lambda:*:*:function:app-*"
},
{
"Effect": "Allow",
"Action": [
"cloudformation:CreateStack",
"cloudformation:UpdateStack",
"cloudformation:DeleteStack",
"cloudformation:DescribeStacks",
"cloudformation:DescribeStackEvents"
],
"Resource": "arn:aws:cloudformation:*:*:stack/app-*/*"
}
]
}
}
return deployment_role
def setup_oidc_provider_github(self, github_repo: str) -> Dict[str, Any]:
"""Set up an OIDC provider for GitHub Actions"""
try:
# Create an OIDC identity provider
oidc_response = self.iam.create_open_id_connect_provider(
Url='https://token.actions.githubusercontent.com',
ClientIDList=['sts.amazonaws.com'],
ThumbprintList=[
'6938fd4d98bab03faadb97b34396831e3780aea1',
'1c58a3a8518e8759bf075b76b750d4f2df264fcd'
],
Tags=[
{
'Key': 'Purpose',
'Value': 'GitHub-Actions-OIDC'
}
]
)
provider_arn = oidc_response['OpenIDConnectProviderArn']
# Create a GitHub Actions role
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": provider_arn
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"token.actions.githubusercontent.com:aud": "sts.amazonaws.com",
"token.actions.githubusercontent.com:sub": f"repo:{github_repo}:ref:refs/heads/main"
}
}
}
]
}
role_response = self.iam.create_role(
RoleName='GitHubActionsDeploymentRole',
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description='Role for GitHub Actions deployments',
Tags=[
{
'Key': 'Repository',
'Value': github_repo
},
{
'Key': 'Purpose',
'Value': 'CICD-Deployment'
}
]
)
# Attach a deployment permission policy
deployment_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::github-actions-*",
"arn:aws:s3:::github-actions-*/*"
]
},
{
"Effect": "Allow",
"Action": [
"cloudformation:*"
],
"Resource": "*",
"Condition": {
"StringLike": {
"cloudformation:TemplateURL": [
"https://github.com/*/*"
]
}
}
}
]
}
self.iam.put_role_policy(
RoleName='GitHubActionsDeploymentRole',
PolicyName='DeploymentPermissions',
PolicyDocument=json.dumps(deployment_policy)
)
return {
'provider_arn': provider_arn,
'role_arn': role_response['Role']['Arn'],
'github_workflow_config': self._generate_github_workflow_config(
role_response['Role']['Arn']
)
}
except Exception as e:
print(f"Failed to set up GitHub OIDC: {e}")
return {}
def _generate_github_workflow_config(self, role_arn: str) -> str:
"""Generate a GitHub Actions workflow configuration"""
workflow_config = f'''
name: Deploy to AWS
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
permissions:
id-token: write
contents: read
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: {role_arn}
role-session-name: GitHubActionsSession
aws-region: us-east-1
- name: Deploy to AWS
run: |
# Deployment script
aws sts get-caller-identity
aws s3 ls
# Add your deployment commands here
- name: Validate deployment
run: |
# Validate the deployment
echo "Deployment validation completed"
'''
return workflow_config
def create_environment_roles(self, environments: List[str]) -> Dict[str, Dict[str, Any]]:
"""Create roles for different environments"""
environment_roles = {}
for env in environments:
role_name = f"DeploymentRole-{env.title()}"
# Environment-specific trust policy
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{self._get_account_id()}:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"aws:RequestedTag/Environment": env,
"aws:PrincipalTag/Department": ["DevOps", "Engineering"]
}
}
}
]
}
# Environment-specific permission policy
permissions = self._generate_environment_permissions(env)
try:
# Create the role
role_response = self.iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=f'Deployment role for {env} environment',
Tags=[
{
'Key': 'Environment',
'Value': env
},
{
'Key': 'Purpose',
'Value': 'Environment-Deployment'
}
]
)
# Attach the permission policy
self.iam.put_role_policy(
RoleName=role_name,
PolicyName=f'{env}DeploymentPermissions',
PolicyDocument=json.dumps(permissions)
)
environment_roles[env] = {
'role_arn': role_response['Role']['Arn'],
'role_name': role_name,
'permissions': permissions
}
except Exception as e:
print(f"Failed to create {env} environment role: {e}")
environment_roles[env] = {'error': str(e)}
return environment_roles
def _generate_environment_permissions(self, environment: str) -> Dict[str, Any]:
"""Generate environment-specific permissions"""
base_permissions = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": f"arn:aws:s3:::app-{environment}-*/*"
},
{
"Effect": "Allow",
"Action": [
"cloudformation:CreateStack",
"cloudformation:UpdateStack",
"cloudformation:DescribeStacks"
],
"Resource": f"arn:aws:cloudformation:*:*:stack/app-{environment}-*/*"
}
]
}
# Additional restrictions for the production environment
if environment.lower() == 'production':
base_permissions["Statement"].append({
"Effect": "Deny",
"Action": [
"cloudformation:DeleteStack"
],
"Resource": "*",
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "false"
}
}
})
# More lenient permissions for the development environment
elif environment.lower() == 'development':
base_permissions["Statement"].append({
"Effect": "Allow",
"Action": [
"cloudformation:DeleteStack"
],
"Resource": f"arn:aws:cloudformation:*:*:stack/app-{environment}-*/*"
})
return base_permissions
def _get_account_id(self) -> str:
"""Get the current account ID"""
try:
return self.sts.get_caller_identity()['Account']
except:
return "123456789012" # Default value
# Example usage
devops_integration = DevOpsIAMIntegration()
# Create a CI/CD role template
cicd_template = devops_integration.create_cicd_role_template()
print("CI/CD Role Template:")
print(f"Number of trust policy statements: {len(cicd_template['trust_policy']['Statement'])}")
print(f"Number of permission policy statements: {len(cicd_template['permissions_policy']['Statement'])}")
# Set up GitHub Actions OIDC
github_setup = devops_integration.setup_oidc_provider_github('myorg/myapp')
if github_setup:
print(f"\nGitHub OIDC setup complete:")
print(f"Provider ARN: {github_setup.get('provider_arn')}")
print(f"Role ARN: {github_setup.get('role_arn')}")
# Create environment roles
environment_roles = devops_integration.create_environment_roles([
'development',
'staging',
'production'
])
print("\nEnvironment role creation results:")
for env, role_info in environment_roles.items():
if 'error' in role_info:
print(f" {env}: Creation failed - {role_info['error']}")
else:
print(f" {env}: {role_info['role_arn']}")