Chapter 03: In-depth Understanding of IAM Roles
9/1/25About 15 min
Chapter 03: In-depth Understanding of IAM Roles
Learning Objectives
- Understand the concept and purpose of IAM roles
- Create and configure service roles
- Set up cross-account role access
- Configure federated identity roles
- Master role switching and temporary credentials
Role System Architecture Diagram
3.1 Basic Concepts of IAM Roles
3.1.1 Difference Between Roles and Users
Core Differences
Feature | IAM User | IAM Role |
---|---|---|
Identity Type | Permanent Identity | Temporary Identity |
Credentials | Long-term Credentials | Temporary Credentials |
Use Case | Human Access | Inter-service Access |
Permission Acquisition | Directly Attached | Assumed |
Security | Requires Key Management | Automatic Rotation |
3.1.2 How Roles Work
3.1.3 Creating a Basic IAM Role
import boto3
import json
from datetime import datetime, timedelta
def create_basic_role(role_name, trusted_entity, description=""):
"""
Create a basic IAM role
Args:
role_name: The role name
trusted_entity: The trusted entity (service or account)
description: The role description
"""
iam = boto3.client('iam')
# Basic trust policy templates
trust_policy_templates = {
'ec2': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
'lambda': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
'cross-account': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{trusted_entity}:root"
},
"Action": "sts:AssumeRole"
}
]
}
}
try:
# Select the trust policy
if trusted_entity in trust_policy_templates:
trust_policy = trust_policy_templates[trusted_entity]
elif trusted_entity.startswith('arn:aws:iam::'):
# Cross-account role
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": trusted_entity
},
"Action": "sts:AssumeRole"
}
]
}
else:
# Custom service
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": trusted_entity
},
"Action": "sts:AssumeRole"
}
]
}
# Create the role
response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=description,
MaxSessionDuration=3600, # 1 hour
Tags=[
{
'Key': 'CreatedBy',
'Value': 'IAM-Automation'
},
{
'Key': 'CreatedDate',
'Value': datetime.now().strftime('%Y-%m-%d')
}
]
)
print(f"✅ Role {role_name} created successfully")
print(f" ARN: {response['Role']['Arn']}")
return response['Role']
except Exception as e:
print(f"❌ Failed to create role: {str(e)}")
return None
# Example of creating different types of roles
ec2_role = create_basic_role("EC2-S3-Access-Role", "ec2", "Role for EC2 instances to access S3")
lambda_role = create_basic_role("Lambda-Execution-Role", "lambda", "Execution role for Lambda functions")
3.2 Service Roles Explained
3.2.1 EC2 Instance Role
def create_ec2_instance_role():
"""
Create an EC2 instance role and instance profile
"""
iam = boto3.client('iam')
# 1. Create the role
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
try:
# Create the role
role_response = iam.create_role(
RoleName='EC2-WebServer-Role',
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description='Role for EC2 web servers with S3 and CloudWatch access'
)
role_arn = role_response['Role']['Arn']
print(f"✅ EC2 role created successfully: {role_arn}")
# 2. Attach AWS managed policies
managed_policies = [
'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
'arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy'
]
for policy_arn in managed_policies:
iam.attach_role_policy(
RoleName='EC2-WebServer-Role',
PolicyArn=policy_arn
)
print(f"✅ Policy attached: {policy_arn}")
# 3. Create a custom policy
custom_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": "arn:aws:s3:::my-web-logs/*"
},
{
"Effect": "Allow",
"Action": [
"ssm:GetParameter",
"ssm:GetParameters",
"ssm:GetParametersByPath"
],
"Resource": "arn:aws:ssm:*:*:parameter/webserver/*"
}
]
}
policy_response = iam.create_policy(
PolicyName='EC2-WebServer-CustomPolicy',
PolicyDocument=json.dumps(custom_policy),
Description='Custom policy for web server specific permissions'
)
# Attach the custom policy
iam.attach_role_policy(
RoleName='EC2-WebServer-Role',
PolicyArn=policy_response['Policy']['Arn']
)
# 4. Create an instance profile
iam.create_instance_profile(
InstanceProfileName='EC2-WebServer-Profile'
)
# 5. Add the role to the instance profile
iam.add_role_to_instance_profile(
InstanceProfileName='EC2-WebServer-Profile',
RoleName='EC2-WebServer-Role'
)
print("✅ EC2 instance profile created successfully")
# 6. Example of using the role when launching an EC2 instance
ec2_launch_example = """
# Specify the instance profile when launching an EC2 instance
import boto3
ec2 = boto3.client('ec2')
response = ec2.run_instances(
ImageId='ami-0abcdef1234567890',
MinCount=1,
MaxCount=1,
InstanceType='t3.micro',
IamInstanceProfile={
'Name': 'EC2-WebServer-Profile'
},
SecurityGroupIds=['sg-12345678'],
SubnetId='subnet-12345678'
)
"""
print("📝 Example code for launching an EC2 instance:")
print(ec2_launch_example)
return role_arn
except Exception as e:
print(f"❌ Failed to create EC2 role: {str(e)}")
return None
# Test permissions in the EC2 instance role
def test_ec2_role_permissions():
"""
Example code for testing role permissions within an EC2 instance
"""
test_code = """
# This code should be run within an EC2 instance
import boto3
# The EC2 instance will automatically use the attached IAM role
s3 = boto3.client('s3')
ssm = boto3.client('ssm')
try:
# Test S3 read permission
buckets = s3.list_buckets()
print(f"✅ S3 access successful, found {len(buckets['Buckets'])} buckets")
# Test SSM parameter access
parameters = ssm.get_parameters_by_path(
Path='/webserver/',
Recursive=True
)
print(f"✅ SSM parameter access successful, found {len(parameters['Parameters'])} parameters")
# Test uploading logs to S3
s3.put_object(
Bucket='my-web-logs',
Key=f'access-logs/{datetime.now().strftime("%Y/%m/%d")}/access.log',
Body='Example log content'
)
print("✅ Log upload to S3 successful")
except Exception as e:
print(f"❌ Permission test failed: {str(e)}")
"""
print("📝 Permission test code within an EC2 instance:")
print(test_code)
create_ec2_instance_role()
test_ec2_role_permissions()
3.2.2 Lambda Execution Role
def create_lambda_execution_role(function_name, additional_permissions=None):
"""
Create a Lambda function execution role
Args:
function_name: The Lambda function name
additional_permissions: A list of additional permissions
"""
iam = boto3.client('iam')
role_name = f"Lambda-{function_name}-ExecutionRole"
# Basic Lambda trust policy
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
try:
# 1. Create the role
role_response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=f'Execution role for Lambda function {function_name}'
)
role_arn = role_response['Role']['Arn']
print(f"✅ Lambda execution role created successfully: {role_arn}")
# 2. Attach the basic Lambda execution policy
iam.attach_role_policy(
RoleName=role_name,
PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
)
# 3. Attach additional permissions as needed
if additional_permissions:
# Create a custom policy
custom_policy = {
"Version": "2012-10-17",
"Statement": additional_permissions
}
policy_response = iam.create_policy(
PolicyName=f'Lambda-{function_name}-CustomPolicy',
PolicyDocument=json.dumps(custom_policy),
Description=f'Custom permissions for Lambda function {function_name}'
)
# Attach the custom policy
iam.attach_role_policy(
RoleName=role_name,
PolicyArn=policy_response['Policy']['Arn']
)
print(f"✅ Custom policy attached")
return role_arn
except Exception as e:
print(f"❌ Failed to create Lambda role: {str(e)}")
return None
# Create Lambda roles for different scenarios
def create_specialized_lambda_roles():
"""
Create Lambda roles for specific scenarios
"""
# 1. S3 processing Lambda role
s3_permissions = [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::my-lambda-bucket/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::my-lambda-bucket"
}
]
s3_role = create_lambda_execution_role("S3Processor", s3_permissions)
# 2. DynamoDB processing Lambda role
dynamodb_permissions = [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Query",
"dynamodb:Scan"
],
"Resource": "arn:aws:dynamodb:*:*:table/MyTable*"
}
]
dynamodb_role = create_lambda_execution_role("DynamoDBProcessor", dynamodb_permissions)
# 3. SNS/SQS processing Lambda role
messaging_permissions = [
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": "arn:aws:sns:*:*:MyTopic"
},
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:SendMessage"
],
"Resource": "arn:aws:sqs:*:*:MyQueue"
}
]
messaging_role = create_lambda_execution_role("MessagingProcessor", messaging_permissions)
print("\n📋 Summary of created Lambda roles:")
print(f" S3 processing role: {s3_role}")
print(f" DynamoDB processing role: {dynamodb_role}")
print(f" Messaging processing role: {messaging_role}")
create_specialized_lambda_roles()
3.3 Cross-Account Role Access
3.3.1 Setting up a Cross-Account Role
def create_cross_account_role(trusted_account_id, role_name, external_id=None):
"""
Create a cross-account access role
Args:
trusted_account_id: The trusted AWS account ID
role_name: The role name
external_id: An external ID (optional, for enhanced security)
"""
iam = boto3.client('iam')
# Build the trust policy
trust_statement = {
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{trusted_account_id}:root"
},
"Action": "sts:AssumeRole"
}
# Add a condition if an external ID is provided
if external_id:
trust_statement["Condition"] = {
"StringEquals": {
"sts:ExternalId": external_id
}
}
trust_policy = {
"Version": "2012-10-17",
"Statement": [trust_statement]
}
try:
# Create the cross-account role
role_response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description=f'Cross-account role for account {trusted_account_id}',
MaxSessionDuration=3600,
Tags=[
{
'Key': 'Type',
'Value': 'CrossAccount'
},
{
'Key': 'TrustedAccount',
'Value': trusted_account_id
}
]
)
role_arn = role_response['Role']['Arn']
print(f"✅ Cross-account role created successfully: {role_arn}")
# Attach an example permission policy
cross_account_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::shared-data-bucket",
"arn:aws:s3:::shared-data-bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeInstances",
"ec2:DescribeSecurityGroups"
],
"Resource": "*"
}
]
}
# Create and attach the policy
policy_response = iam.create_policy(
PolicyName=f'{role_name}-Policy',
PolicyDocument=json.dumps(cross_account_policy),
Description=f'Policy for cross-account role {role_name}'
)
iam.attach_role_policy(
RoleName=role_name,
PolicyArn=policy_response['Policy']['Arn']
)
print(f"✅ Cross-account policy attached")
# Provide usage instructions
usage_info = f"""
📝 Instructions for using the cross-account role:
1. Role ARN: {role_arn}
2. External ID: {external_id if external_id else 'None'}
3. Trusted account: {trusted_account_id}
To use this role in the trusted account:
import boto3
sts = boto3.client('sts')
# Assume the role
response = sts.assume_role(
RoleArn='{role_arn}',
RoleSessionName='CrossAccountSession'{external_id_param}
)
# Use the temporary credentials
credentials = response['Credentials']
s3 = boto3.client(
's3',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
"""
external_id_param = f",\n ExternalId='{external_id}'" if external_id else ""
print(usage_info.format(
role_arn=role_arn,
external_id_param=external_id_param
))
return role_arn
except Exception as e:
print(f"❌ Failed to create cross-account role: {str(e)}")
return None
def assume_cross_account_role(role_arn, session_name, external_id=None, duration=3600):
"""
Assume a cross-account role
Args:
role_arn: The role ARN
session_name: The session name
external_id: The external ID
duration: The session duration in seconds
"""
sts = boto3.client('sts')
try:
# Build the AssumeRole parameters
assume_role_params = {
'RoleArn': role_arn,
'RoleSessionName': session_name,
'DurationSeconds': duration
}
if external_id:
assume_role_params['ExternalId'] = external_id
# Assume the role
response = sts.assume_role(**assume_role_params)
credentials = response['Credentials']
assumed_role_user = response['AssumedRoleUser']
print(f"✅ Successfully assumed role")
print(f" User ARN: {assumed_role_user['Arn']}")
print(f" Credential expiration: {credentials['Expiration']}")
# Create a client example using the temporary credentials
temp_session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
# Test permissions
s3_client = temp_session.client('s3')
try:
buckets = s3_client.list_buckets()
print(f"✅ S3 access test successful, can access {len(buckets['Buckets'])} buckets")
except Exception as e:
print(f"⚠️ S3 access test failed: {str(e)}")
return temp_session
except Exception as e:
print(f"❌ Failed to assume role: {str(e)}")
return None
# Example of creating a cross-account role
cross_account_role = create_cross_account_role(
trusted_account_id="123456789012",
role_name="CrossAccount-DataAccess-Role",
external_id="unique-external-id-12345"
)
3.3.2 Advanced Cross-Account Role Configuration
def create_advanced_cross_account_role():
"""
Create a cross-account role with advanced security configurations
"""
iam = boto3.client('iam')
# Advanced trust policy with multiple security conditions
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "secure-external-id-2024"
},
"IpAddress": {
"aws:SourceIp": [
"203.0.113.0/24", # Allowed IP range
"198.51.100.0/24"
]
},
"DateGreaterThan": {
"aws:CurrentTime": "2024-01-01T00:00:00Z"
},
"DateLessThan": {
"aws:CurrentTime": "2024-12-31T23:59:59Z"
},
"StringLike": {
"aws:userid": "AIDACKCEVSQ6C2EXAMPLE:*"
}
}
},
{
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::123456789012:user/TrustedUser1",
"arn:aws:iam::123456789012:user/TrustedUser2"
]
},
"Action": "sts:AssumeRole",
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "true"
},
"NumericLessThan": {
"aws:MultiFactorAuthAge": "3600" # MFA must be within 1 hour
}
}
}
]
}
try:
# Create the role
role_response = iam.create_role(
RoleName='SecureCrossAccount-Role',
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description='Secure cross-account role with advanced conditions',
MaxSessionDuration=3600, # Max session duration of 1 hour
Tags=[
{'Key': 'SecurityLevel', 'Value': 'High'},
{'Key': 'Purpose', 'Value': 'SecureCrossAccountAccess'}
]
)
print(f"✅ Advanced cross-account role created successfully")
# Create tiered permission policies
policies = {
'ReadOnlyPolicy': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket",
"ec2:Describe*",
"rds:Describe*"
],
"Resource": "*"
}
]
},
'LimitedWritePolicy': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": "arn:aws:s3:::shared-data/*",
"Condition": {
"StringEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
}
]
}
}
# Create and attach policies
for policy_name, policy_doc in policies.items():
policy_response = iam.create_policy(
PolicyName=f'SecureCrossAccount-{policy_name}',
PolicyDocument=json.dumps(policy_doc),
Description=f'Secure cross-account {policy_name.lower()}'
)
iam.attach_role_policy(
RoleName='SecureCrossAccount-Role',
PolicyArn=policy_response['Policy']['Arn']
)
print(f"✅ Policy {policy_name} attached")
return role_response['Role']['Arn']
except Exception as e:
print(f"❌ Failed to create advanced cross-account role: {str(e)}")
return None
# Function to monitor cross-account role usage
def monitor_cross_account_role_usage():
"""
Monitor the usage of cross-account roles
"""
cloudtrail = boto3.client('cloudtrail')
try:
# Query AssumeRole events
response = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': 'AssumeRole'
}
],
StartTime=datetime.now() - timedelta(days=7), # Last 7 days
EndTime=datetime.now()
)
print("🔍 Cross-account role usage monitoring report")
print("=" * 50)
assume_role_events = []
for event in response['Events']:
if 'AssumeRole' in event['EventName']:
assume_role_events.append({
'time': event['EventTime'],
'user': event.get('Username', 'Unknown'),
'source_ip': event.get('SourceIPAddress', 'Unknown'),
'resources': event.get('Resources', [])
})
print(f"📊 Total AssumeRole events: {len(assume_role_events)}")
# Group statistics by user
user_stats = {}
for event in assume_role_events:
user = event['user']
if user not in user_stats:
user_stats[user] = {'count': 0, 'last_access': None}
user_stats[user]['count'] += 1
if not user_stats[user]['last_access'] or event['time'] > user_stats[user]['last_access']:
user_stats[user]['last_access'] = event['time']
print("\n👤 User usage statistics:")
for user, stats in user_stats.items():
print(f" {user}: {stats['count']} times, last access: {stats['last_access']}")
return assume_role_events
except Exception as e:
print(f"❌ Monitoring query failed: {str(e)}")
return []
# Execute advanced configuration
advanced_role_arn = create_advanced_cross_account_role()
usage_events = monitor_cross_account_role_usage()
3.4 Federated Identity Roles
3.4.1 SAML 2.0 Federation
def create_saml_identity_provider(provider_name, metadata_document):
"""
Create a SAML 2.0 identity provider
Args:
provider_name: The identity provider name
metadata_document: The SAML metadata document
"""
iam = boto3.client('iam')
try:
# Create the SAML identity provider
response = iam.create_saml_provider(
SAMLMetadataDocument=metadata_document,
Name=provider_name,
Tags=[
{'Key': 'Type', 'Value': 'SAML'},
{'Key': 'Purpose', 'Value': 'Federation'}
]
)
provider_arn = response['SAMLProviderArn']
print(f"✅ SAML identity provider created successfully: {provider_arn}")
return provider_arn
except Exception as e:
print(f"❌ Failed to create SAML identity provider: {str(e)}")
return None
def create_saml_federated_role(provider_arn, role_name):
"""
Create a SAML federated identity role
Args:
provider_arn: The SAML identity provider ARN
role_name: The role name
"""
iam = boto3.client('iam')
# SAML federated identity trust policy
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": provider_arn
},
"Action": "sts:AssumeRoleWithSAML",
"Condition": {
"StringEquals": {
"SAML:aud": "https://signin.aws.amazon.com/saml"
}
}
}
]
}
try:
# Create the federated identity role
role_response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description='SAML federated role for enterprise users'
)
role_arn = role_response['Role']['Arn']
print(f"✅ SAML federated identity role created successfully: {role_arn}")
# Attach a permission policy
saml_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:Describe*",
"s3:ListBucket",
"s3:GetObject"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"iam:GetUser",
"iam:ListMFADevices"
],
"Resource": "arn:aws:iam::*:user/${saml:uid}"
}
]
}
# Create and attach the policy
policy_response = iam.create_policy(
PolicyName=f'{role_name}-Policy',
PolicyDocument=json.dumps(saml_policy),
Description=f'Policy for SAML federated role {role_name}'
)
iam.attach_role_policy(
RoleName=role_name,
PolicyArn=policy_response['Policy']['Arn']
)
print(f"✅ SAML federated identity policy attached")
# Provide integration instructions
integration_guide = f"""
📝 SAML Integration Configuration Guide:
1. Identity Provider ARN: {provider_arn}
2. Role ARN: {role_arn}
Configure the following attribute mapping in your IdP:
- https://aws.amazon.com/SAML/Attributes/Role: {role_arn},{provider_arn}
- https://aws.amazon.com/SAML/Attributes/RoleSessionName: {{user.email}}
User login flow:
1. The user authenticates with the IdP
2. The IdP generates a SAML assertion
3. The user is redirected to the AWS SAML endpoint
4. AWS validates the assertion and provides temporary credentials
"""
print(integration_guide)
return role_arn
except Exception as e:
print(f"❌ Failed to create SAML federated identity role: {str(e)}")
return None
# Example SAML metadata document (simplified)
saml_metadata = '''<?xml version="1.0" encoding="UTF-8"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
entityID="https://example.com/saml">
<md:IDPSSODescriptor WantAuthnRequestsSigned="false"
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<md:KeyDescriptor use="signing">
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>
<!-- Certificate content -->
</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</md:KeyDescriptor>
<md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
Location="https://example.com/sso"/>
</md:IDPSSODescriptor>
</md:EntityDescriptor>'''
# Create SAML federation
# provider_arn = create_saml_identity_provider("CompanySAML", saml_metadata)
# saml_role = create_saml_federated_role(provider_arn, "SAML-FederatedUser-Role")
3.4.2 OpenID Connect (OIDC) Federation
def create_oidc_identity_provider(provider_url, client_ids, thumbprints):
"""
Create an OpenID Connect identity provider
Args:
provider_url: The OIDC provider URL
client_ids: A list of client IDs
thumbprints: A list of certificate thumbprints
"""
iam = boto3.client('iam')
try:
# Create the OIDC identity provider
response = iam.create_open_id_connect_provider(
Url=provider_url,
ClientIDList=client_ids,
ThumbprintList=thumbprints,
Tags=[
{'Key': 'Type', 'Value': 'OIDC'},
{'Key': 'Purpose', 'Value': 'WebIdentityFederation'}
]
)
provider_arn = response['OpenIDConnectProviderArn']
print(f"✅ OIDC identity provider created successfully: {provider_arn}")
return provider_arn
except Exception as e:
print(f"❌ Failed to create OIDC identity provider: {str(e)}")
return None
def create_oidc_federated_role(provider_arn, role_name, audience):
"""
Create an OIDC federated identity role
Args:
provider_arn: The OIDC identity provider ARN
role_name: The role name
audience: The audience (usually the client ID)
"""
iam = boto3.client('iam')
# OIDC federated identity trust policy
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": provider_arn
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
f"{provider_arn.split('/')[-1]}:aud": audience
},
"StringLike": {
f"{provider_arn.split('/')[-1]}:sub": "*"
}
}
}
]
}
try:
# Create the federated identity role
role_response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description='OIDC federated role for web identity federation'
)
role_arn = role_response['Role']['Arn']
print(f"✅ OIDC federated identity role created successfully: {role_arn}")
# Attach a permission policy
oidc_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::user-data-bucket/${www.example.com:sub}/*"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem"
],
"Resource": "arn:aws:dynamodb:*:*:table/UserProfiles",
"Condition": {
"ForAllValues:StringEquals": {
"dynamodb:Attributes": [
"UserId",
"ProfileData"
]
},
"StringEquals": {
"dynamodb:LeadingKeys": ["${www.example.com:sub}"]
}
}
}
]
}
# Create and attach the policy
policy_response = iam.create_policy(
PolicyName=f'{role_name}-Policy',
PolicyDocument=json.dumps(oidc_policy),
Description=f'Policy for OIDC federated role {role_name}'
)
iam.attach_role_policy(
RoleName=role_name,
PolicyArn=policy_response['Policy']['Arn']
)
print(f"✅ OIDC federated identity policy attached")
return role_arn
except Exception as e:
print(f"❌ Failed to create OIDC federated identity role: {str(e)}")
return None
# Example of setting up OIDC federation for GitHub Actions
def setup_github_actions_oidc():
"""
Set up OIDC federation for GitHub Actions
"""
# GitHub's OIDC provider information
github_provider_url = "https://token.actions.githubusercontent.com"
github_thumbprints = ["6938fd4d98bab03faadb97b34396831e3780aea1"] # GitHub's certificate thumbprint
# Create the GitHub OIDC provider
provider_arn = create_oidc_identity_provider(
provider_url=github_provider_url,
client_ids=["sts.amazonaws.com"],
thumbprints=github_thumbprints
)
if provider_arn:
# Create the GitHub Actions role
github_role = create_oidc_federated_role(
provider_arn=provider_arn,
role_name="GitHubActions-DeploymentRole",
audience="sts.amazonaws.com"
)
# Provide an example GitHub Actions workflow configuration
workflow_example = f"""
# Example GitHub Actions workflow configuration
name: Deploy to AWS
on:
push:
branches: [main]
permissions:
id-token: write
contents: read
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: {github_role}
role-session-name: GitHubActions-Deploy
aws-region: us-east-1
- name: Deploy to S3
run: |
aws s3 sync ./dist s3://my-deployment-bucket
"""
print("📝 Example GitHub Actions workflow configuration:")
print(workflow_example)
# Execute the GitHub Actions OIDC setup
setup_github_actions_oidc()
3.5 Role Sessions and Temporary Credentials
3.5.1 Managing Role Sessions
def create_role_session_manager():
"""
Role session manager
"""
class RoleSessionManager:
def __init__(self):
self.sts = boto3.client('sts')
self.active_sessions = {}
def assume_role_with_session_policy(self, role_arn, session_name,
session_policy=None, duration=3600):
"""
Assume a role with a session policy
Args:
role_arn: The role ARN
session_name: The session name
session_policy: A session policy (optional, to further restrict permissions)
duration: The session duration
"""
try:
assume_role_params = {
'RoleArn': role_arn,
'RoleSessionName': session_name,
'DurationSeconds': duration
}
# Add the session policy to the parameters if provided
if session_policy:
assume_role_params['Policy'] = json.dumps(session_policy)
response = self.sts.assume_role(**assume_role_params)
credentials = response['Credentials']
session_info = {
'credentials': credentials,
'assumed_role_user': response['AssumedRoleUser'],
'session_name': session_name,
'role_arn': role_arn,
'expires_at': credentials['Expiration']
}
# Store the session information
self.active_sessions[session_name] = session_info
print(f"✅ Role session created successfully: {session_name}")
print(f" Expiration time: {credentials['Expiration']}")
return session_info
except Exception as e:
print(f"❌ Failed to create role session: {str(e)}")
return None
def get_session_credentials(self, session_name):
"""
Get session credentials
"""
if session_name in self.active_sessions:
session = self.active_sessions[session_name]
# Check if the credentials have expired
if datetime.now(session['expires_at'].tzinfo) < session['expires_at']:
return session['credentials']
else:
print(f"⚠️ Session {session_name} has expired")
del self.active_sessions[session_name]
return None
else:
print(f"❌ Session {session_name} does not exist")
return None
def create_boto3_session(self, session_name):
"""
Create a boto3 session using role credentials
"""
credentials = self.get_session_credentials(session_name)
if credentials:
return boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return None
def refresh_session(self, session_name):
"""
Refresh session credentials
"""
if session_name in self.active_sessions:
session_info = self.active_sessions[session_name]
return self.assume_role_with_session_policy(
session_info['role_arn'],
session_name
)
return None
def list_active_sessions(self):
"""
List active sessions
"""
print("📋 Active role sessions:")
for session_name, session_info in self.active_sessions.items():
expires_at = session_info['expires_at']
remaining = expires_at - datetime.now(expires_at.tzinfo)
print(f" {session_name}:")
print(f" Role: {session_info['role_arn']}")
print(f" Expiration time: {expires_at}")
print(f" Time remaining: {remaining}")
def cleanup_expired_sessions(self):
"""
Clean up expired sessions
"""
expired_sessions = []
current_time = datetime.now()
for session_name, session_info in self.active_sessions.items():
expires_at = session_info['expires_at']
if current_time.replace(tzinfo=expires_at.tzinfo) >= expires_at:
expired_sessions.append(session_name)
for session_name in expired_sessions:
del self.active_sessions[session_name]
print(f"🧹 Cleaned up expired session: {session_name}")
return len(expired_sessions)
return RoleSessionManager()
# Example usage
session_manager = create_role_session_manager()
# Create a restricted session policy
restricted_session_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": "arn:aws:s3:::specific-bucket/*"
},
{
"Effect": "Deny",
"Action": [
"s3:DeleteObject"
],
"Resource": "*"
}
]
}
# Assume a role with a session policy
session_info = session_manager.assume_role_with_session_policy(
role_arn="arn:aws:iam::123456789012:role/MyRole",
session_name="RestrictedSession",
session_policy=restricted_session_policy,
duration=1800 # 30 minutes
)
# Create a client using the temporary credentials
if session_info:
temp_session = session_manager.create_boto3_session("RestrictedSession")
s3_client = temp_session.client('s3')
# Access S3 with restricted permissions
try:
objects = s3_client.list_objects_v2(Bucket='specific-bucket')
print(f"✅ Can list objects, {objects.get('KeyCount', 0)} total")
except Exception as e:
print(f"❌ Access failed: {str(e)}")
# List active sessions
session_manager.list_active_sessions()
3.6 Role Best Practices and Security Considerations
3.6.1 Role Security Best Practices
Security Principles
- Principle of Least Privilege: Grant only the minimum permissions required to complete a task
- Temporary Credentials: Prefer using roles over long-term access keys
- External ID: Use an external ID for cross-account access to prevent the confused deputy problem
- Condition Constraints: Use IAM conditions to restrict the use cases for roles
- Session Duration: Set an appropriate maximum session duration
3.6.2 Role Permission Auditing
def audit_role_permissions():
"""
Audit IAM role permissions
"""
iam = boto3.client('iam')
try:
# Get all roles
paginator = iam.get_paginator('list_roles')
audit_report = {
'high_risk_roles': [],
'cross_account_roles': [],
'service_roles': [],
'unused_roles': [],
'roles_without_mfa': []
}
for page in paginator.paginate():
for role in page['Roles']:
role_name = role['RoleName']
print(f"🔍 Auditing role: {role_name}")
# Get the role's trust policy
trust_policy = json.loads(role['AssumeRolePolicyDocument'])
# Analyze the trust policy
for statement in trust_policy.get('Statement', []):
principal = statement.get('Principal', {})
# Check for cross-account roles
if isinstance(principal.get('AWS'), str):
if 'arn:aws:iam::' in principal['AWS'] and ':root' in principal['AWS']:
audit_report['cross_account_roles'].append({
'role_name': role_name,
'trusted_account': principal['AWS'],
'conditions': statement.get('Condition', {})
})
# Check for service roles
if 'Service' in principal:
audit_report['service_roles'].append({
'role_name': role_name,
'service': principal['Service']
})
# Check for MFA requirements
conditions = statement.get('Condition', {})
has_mfa_condition = False
for condition_operator, condition_block in conditions.items():
if 'aws:MultiFactorAuthPresent' in str(condition_block):
has_mfa_condition = True
break
if not has_mfa_condition and 'AWS' in principal:
audit_report['roles_without_mfa'].append(role_name)
# Get the role's attached policies
attached_policies = iam.list_attached_role_policies(RoleName=role_name)
inline_policies = iam.list_role_policies(RoleName=role_name)
# Check for high-risk permissions
high_risk_actions = [
'iam:*',
'*:*',
'sts:AssumeRole',
's3:DeleteBucket',
'ec2:TerminateInstances',
'rds:DeleteDBInstance'
]
has_high_risk = False
all_policies = []
# Check managed policies
for policy in attached_policies['AttachedPolicies']:
policy_version = iam.get_policy_version(
PolicyArn=policy['PolicyArn'],
VersionId=iam.get_policy(PolicyArn=policy['PolicyArn'])['Policy']['DefaultVersionId']
)
all_policies.append(policy_version['PolicyVersion']['Document'])
# Check inline policies
for policy_name in inline_policies['PolicyNames']:
policy_doc = iam.get_role_policy(RoleName=role_name, PolicyName=policy_name)
all_policies.append(policy_doc['PolicyDocument'])
# Analyze policy content
for policy_doc in all_policies:
for statement in policy_doc.get('Statement', []):
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
for action in actions:
if any(risk_action in action for risk_action in high_risk_actions):
has_high_risk = True
break
if has_high_risk:
break
if has_high_risk:
break
if has_high_risk:
audit_report['high_risk_roles'].append({
'role_name': role_name,
'last_used': role.get('RoleLastUsed', {}).get('LastUsedDate'),
'attached_policies': len(attached_policies['AttachedPolicies']),
'inline_policies': len(inline_policies['PolicyNames'])
})
# Check for unused roles (not used in the last 30 days)
last_used = role.get('RoleLastUsed', {}).get('LastUsedDate')
if not last_used or (datetime.now().replace(tzinfo=None) - last_used.replace(tzinfo=None)).days > 30:
audit_report['unused_roles'].append({
'role_name': role_name,
'last_used': last_used,
'created_date': role['CreateDate']
})
# Generate the audit report
print("\n📊 IAM Role Security Audit Report")
print("=" * 50)
print(f"\n⚠️ High-risk roles ({len(audit_report['high_risk_roles'])}):")
for role_info in audit_report['high_risk_roles']:
print(f" - {role_info['role_name']} (Last used: {role_info['last_used']})")
print(f"\n🔗 Cross-account roles ({len(audit_report['cross_account_roles'])}):")
for role_info in audit_report['cross_account_roles']:
print(f" - {role_info['role_name']} -> {role_info['trusted_account']}")
print(f"\n🤖 Service roles ({len(audit_report['service_roles'])}):")
for role_info in audit_report['service_roles']:
print(f" - {role_info['role_name']} ({role_info['service']})")
print(f"\n🔒 Roles without MFA requirement ({len(audit_report['roles_without_mfa'])}):")
for role_name in audit_report['roles_without_mfa']:
print(f" - {role_name}")
print(f"\n💤 Unused roles ({len(audit_report['unused_roles'])}):")
for role_info in audit_report['unused_roles']:
print(f" - {role_info['role_name']} (Created on: {role_info['created_date']})")
return audit_report
except Exception as e:
print(f"❌ Role audit failed: {str(e)}")
return None
# Execute the role permission audit
audit_results = audit_role_permissions()
3.6.3 Role Usage Monitoring and Alerting
def setup_role_monitoring():
"""
Set up role usage monitoring and alerting
"""
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')
try:
# Create an SNS topic for alerts
topic_response = sns.create_topic(
Name='IAM-Role-Security-Alerts'
)
topic_arn = topic_response['TopicArn']
# Create a CloudWatch alarm to monitor for unusual role assumptions
cloudwatch.put_metric_alarm(
AlarmName='IAM-UnusualRoleAssumption',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=1,
MetricName='AssumeRole',
Namespace='AWS/CloudTrail',
Period=300, # 5 minutes
Statistic='Sum',
Threshold=10.0, # More than 10 AssumeRole calls in 5 minutes
ActionsEnabled=True,
AlarmActions=[topic_arn],
AlarmDescription='Alert when unusual role assumption activity is detected',
Dimensions=[
{
'Name': 'EventName',
'Value': 'AssumeRole'
}
],
Unit='Count'
)
print("✅ Role usage monitoring alarm set up")
# Create a custom metric monitoring script
monitoring_script = '''
# Role usage monitoring script
import boto3
import json
from datetime import datetime, timedelta
def monitor_role_usage():
cloudtrail = boto3.client('cloudtrail')
cloudwatch = boto3.client('cloudwatch')
# Query AssumeRole events for the last hour
end_time = datetime.now()
start_time = end_time - timedelta(hours=1)
events = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': 'AssumeRole'
}
],
StartTime=start_time,
EndTime=end_time
)
# Tally various unusual patterns
unusual_patterns = {
'failed_attempts': 0,
'unusual_source_ips': set(),
'high_frequency_users': {},
'cross_account_assumptions': 0
}
for event in events['Events']:
# Check for failed AssumeRole attempts
if event.get('ErrorCode'):
unusual_patterns['failed_attempts'] += 1
# Check for unusual source IPs
source_ip = event.get('SourceIPAddress', '')
if source_ip and not source_ip.startswith('AWS Internal'):
unusual_patterns['unusual_source_ips'].add(source_ip)
# Check for high-frequency users
user = event.get('Username', 'Unknown')
unusual_patterns['high_frequency_users'][user] = unusual_patterns['high_frequency_users'].get(user, 0) + 1
# Check for cross-account role assumptions
if 'AssumeRole' in event['EventName']:
for resource in event.get('Resources', []):
if resource.get('ResourceType') == 'AWS::IAM::Role':
resource_arn = resource.get('ResourceName', '')
if '::' in resource_arn:
account_id = resource_arn.split(':')[4]
# This would require getting the current account ID for comparison
# if account_id != current_account_id:
# unusual_patterns['cross_account_assumptions'] += 1
# Send custom metrics to CloudWatch
metrics = [
{
'MetricName': 'FailedAssumeRoleAttempts',
'Value': unusual_patterns['failed_attempts'],
'Unit': 'Count'
},
{
'MetricName': 'UnusualSourceIPs',
'Value': len(unusual_patterns['unusual_source_ips']),
'Unit': 'Count'
}
]
for metric in metrics:
cloudwatch.put_metric_data(
Namespace='IAM/Security',
MetricData=[
{
'MetricName': metric['MetricName'],
'Value': metric['Value'],
'Unit': metric['Unit'],
'Timestamp': datetime.now()
}
]
)
print(f"Monitoring data sent to CloudWatch")
return unusual_patterns
# Execute monitoring periodically
if __name__ == "__main__":
monitor_role_usage()
'''
print("📝 Role monitoring script:")
print(monitoring_script)
return topic_arn
except Exception as e:
print(f"❌ Failed to set up role monitoring: {str(e)}")
return None
# Set up role monitoring
monitoring_topic = setup_role_monitoring()
Summary
This chapter provided an in-depth look at various aspects of AWS IAM roles:
- Role Basics: Understanding the differences between roles and users and how they work
- Service Roles: Creating and configuring EC2 instance roles and Lambda execution roles
- Cross-Account Access: Securely setting up cross-account roles and advanced condition configurations
- Federated Identity: Implementing SAML 2.0 and OIDC federation
- Session Management: Using temporary credentials and session policies
- Security Best Practices: Auditing role permissions and setting up monitoring and alerting
By completing this chapter, you should be able to:
- Skillfully create and manage various types of IAM roles
- Implement secure cross-account access
- Configure federated identity integration
- Manage role sessions and temporary credentials
- Audit and monitor role usage
In the next chapter, we will learn about the detailed syntax and advanced features of IAM policies.