-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEc2.py
More file actions
123 lines (109 loc) · 5.27 KB
/
Ec2.py
File metadata and controls
123 lines (109 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import boto3
import argparse
import json
def check_perms(access_key_id, secret_access_key, region, session_token=None):
"""
Check IAM and EC2 permissions for the caller.
"""
# Create STS client with optional session token
sts_kwargs = {
'aws_access_key_id': access_key_id,
'aws_secret_access_key': secret_access_key,
'region_name': region
}
if session_token:
sts_kwargs['aws_session_token'] = session_token
sts = boto3.client('sts', **sts_kwargs)
# Get caller identity
try:
response = sts.get_caller_identity()
except sts.exceptions.ClientError as e:
print(f"Error: {e.response['Error']['Message']}")
return
caller_arn = response['Arn']
principal_type = caller_arn.split(":")[5].split("/")[-2]
principal_name = caller_arn.split("/")[-1]
# List policies attached to the user or role
iam = boto3.client('iam', **sts_kwargs)
attached_policies = []
inline_policies = []
if principal_type == 'user':
attached_policies = iam.list_attached_user_policies(UserName=principal_name)
inline_policies = iam.list_user_policies(UserName=principal_name)
elif principal_type == 'role':
attached_policies = iam.list_attached_role_policies(RoleName=principal_name)
inline_policies = iam.list_role_policies(RoleName=principal_name)
# Required permissions to check
required_permissions = [
'iam:ListRoles',
'iam:PassRole',
'iam:ListInstanceProfiles',
'iam:AddRoleToInstanceProfile',
'iam:RemoveRoleFromInstanceProfile',
'ec2:AssociateIamInstanceProfile',
'ec2:DescribeIamInstanceProfileAssociations',
'ec2:RunInstances',
'ec2:CreateKeyPair',
'ec2:DescribeInstances',
'ec2:DescribeVpcs',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups'
]
# Function to check permissions in a policy document
def check_policy_document(policy_document, required_permissions):
if isinstance(policy_document, str):
# Converts it into dictionary to easily traverse through the policy.
policy_document = json.loads(policy_document)
matched_permissions = []
for statement in policy_document.get('Statement', []):
if isinstance(statement, dict) and statement.get('Effect') == 'Allow':
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
# Checks each action in the actions list against the required_permissions list. If matches, then add it to matched_permissions.
matched_permissions.extend([action for action in actions if action in required_permissions])
return matched_permissions
# Get policy versions and check permissions
# Initializing set ds to collect unique elements
found_permissions = set()
for policy in attached_policies.get('AttachedPolicies', []):
policy_arn = policy['PolicyArn']
policy_name = policy['PolicyName']
policy_versions = iam.list_policy_versions(PolicyArn=policy_arn)
for version in policy_versions['Versions']:
version_id = version['VersionId']
try:
policy_version_details = iam.get_policy_version(PolicyArn=policy_arn, VersionId=version_id)
policy_document = policy_version_details['PolicyVersion']['Document']
matched_permissions = check_policy_document(policy_document, required_permissions)
if matched_permissions:
found_permissions.update(matched_permissions)
except iam.exceptions.NoSuchEntityException:
continue
# Check inline policies
for policy_name in inline_policies.get('PolicyNames', []):
try:
if principal_type == 'user':
policy_document = iam.get_user_policy(UserName=principal_name, PolicyName=policy_name)['PolicyDocument']
elif principal_type == 'role':
policy_document = iam.get_role_policy(RoleName=principal_name, PolicyName=policy_name)['PolicyDocument']
matched_permissions = check_policy_document(policy_document, required_permissions)
if matched_permissions:
found_permissions.update(matched_permissions)
except iam.exceptions.NoSuchEntityException:
continue
if found_permissions:
print(f"{principal_type} {principal_name} has the following permissions which can lead to potential attack pathways:")
for permission in found_permissions:
print(f" - {permission}")
else:
print(f"{principal_type} {principal_name} does not have any of the specified permissions.")
if __name__ == '__main__':
# Parse input through command line args
parser = argparse.ArgumentParser(description='Check IAM and EC2 permissions for the caller')
parser.add_argument('--access-key-id', required=True)
parser.add_argument('--secret-access-key', required=True)
parser.add_argument('--region', required=True)
parser.add_argument('--session-token', help='AWS session token (optional)', default=None)
args = parser.parse_args()
check_perms(args.access_key_id, args.secret_access_key, args.region, args.session_token)