-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathAzurePEASS.py
276 lines (224 loc) · 13.6 KB
/
AzurePEASS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import argparse
import requests
import jwt
import time
import os
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from colorama import Fore, Style, init, Back
init(autoreset=True)
from src.CloudPEASS.cloudpeass import CloudPEASS
from src.sensitive_permissions.azure import very_sensitive_combinations, sensitive_combinations
from src.azure.entraid import EntraIDPEASS
# Sample JSON answer describing a privilege-escalation finding (Title /
# Description / Commands). It is handed to the CloudPEASS base class by
# AzurePEASS.__init__ as the "malicious response" example.
# NOTE(review): presumably used as a format example for the AI analysis -
# confirm in CloudPEASS. The text is runtime data: do not reformat it.
AZURE_MALICIOUS_RESPONSE_EXAMPLE = """[
{
"Title": "Privilege Escalationto arbitrary Managed Identities ",
"Description": " Using the permissions Microsoft.Compute/virtualMachines/write and Microsoft.ManagedIdentity/userAssignedIdentities/assign/action among other it's possible to escalate privileges to arbitrary Managed Identities by creating a VM, assigning Managed Identities and then get tokens from the assigned Managed Identities from the metadata.",
"Commands": "az vm create \\
--resource-group Resource_Group_1 \\
--name cli_vm \\
--image Ubuntu2204 \\
--admin-username azureuser \\
--generate-ssh-keys \\
--assign-identity /subscriptions/<sub-id>/resourcegroups/<res-group>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<mi-name> \\
--nsg-rule ssh \\
--location centralus"
},
[...]
]"""
# Sample JSON answer classifying a single permission (permission /
# is_very_sensitive / is_sensitive / description). It is handed to the
# CloudPEASS base class by AzurePEASS.__init__ as the "sensitive response"
# example.
# NOTE(review): presumably used as a format example for the AI analysis -
# confirm in CloudPEASS. The text is runtime data: do not reformat it.
AZURE_SENSITIVE_RESPONSE_EXAMPLE = """[
{
"permission": "Microsoft.Web/sites/host/listkeys/action",
"is_very_sensitive": true,
"is_sensitive": false,
"description": "This permission allows to list the keys of a web app, which can be used to access sensitive information and modify the code and escalate privleges to the managed identity."
},
[...]
]"""
class AzurePEASS(CloudPEASS):
    """Collect the Azure (ARM) and Entra ID permissions reachable with the given tokens.

    The ARM token drives the subscription / resource permission enumeration
    against ``management.azure.com``; the Graph token drives the Entra ID
    enumeration, which is delegated to ``EntraIDPEASS``. At least one of the
    two tokens must be supplied.
    """

    def __init__(self, arm_token, graph_token, very_sensitive_combos, sensitive_combos, not_use_ht_ai, num_threads, out_path=None):
        """Store the tokens, initialise the base class and validate the tokens.

        Args:
            arm_token: Bearer token for the Azure management API (may be falsy).
            graph_token: Bearer token for Microsoft Graph (may be falsy).
            very_sensitive_combos: Forwarded unchanged to ``CloudPEASS``.
            sensitive_combos: Forwarded unchanged to ``CloudPEASS``.
            not_use_ht_ai: Forwarded unchanged to ``CloudPEASS``.
            num_threads: Worker count, forwarded to ``EntraIDPEASS`` and ``CloudPEASS``.
            out_path: Optional output path, forwarded to ``CloudPEASS``.

        Raises:
            SystemExit: If neither token was provided.
            ValueError: If a provided token is malformed, expired or has an
                unexpected audience (see ``check_jwt_token``).
        """
        self.arm_token = arm_token
        self.graph_token = graph_token
        self.EntraIDPEASS = EntraIDPEASS(graph_token, num_threads)
        super().__init__(very_sensitive_combos, sensitive_combos, "Azure", not_use_ht_ai, num_threads, AZURE_MALICIOUS_RESPONSE_EXAMPLE, AZURE_SENSITIVE_RESPONSE_EXAMPLE, out_path)

        if not self.arm_token and not self.graph_token:
            print(f"{Fore.RED}At lest an ARM token or Graph token is needed. Exiting.")
            # SystemExit is what the interactive-only ``exit()`` helper raises;
            # raising it directly also works when the ``site`` module is not loaded.
            raise SystemExit(1)

        if not self.arm_token:
            print(f"{Fore.RED}ARM token not provided. Skipping Azure permissions analysis")

        if not self.graph_token:
            print(f"{Fore.RED}Graph token not provided. Skipping EntraID permissions analysis")

        if self.arm_token:
            self.check_jwt_token(self.arm_token, ["https://management.azure.com/", "https://management.core.windows.net/"])

        if self.graph_token:
            self.check_jwt_token(self.graph_token, ["https://graph.microsoft.com/", "00000003-0000-0000-c000-000000000000"])

    @staticmethod
    def _normalize_audience(audience):
        """Return *audience* as a string without trailing slashes, for comparison."""
        return str(audience).rstrip("/")

    def _arm_headers(self):
        """Return the Authorization header used for every ARM request."""
        return {"Authorization": f"Bearer {self.arm_token}"}

    def check_jwt_token(self, token, expected_audiences):
        """Validate the audience and expiry of *token* without verifying its signature.

        Audiences are compared ignoring a trailing slash, so
        ``https://graph.microsoft.com`` matches ``https://graph.microsoft.com/``.
        An ``aud`` claim that is a list is accepted when any entry matches.

        Args:
            token: Encoded JWT.
            expected_audiences: Iterable of acceptable ``aud`` values.

        Returns:
            True when the token passes both checks.

        Raises:
            ValueError: If the token cannot be decoded, its audience is not
                accepted, or it expires within the next 30 seconds.
        """
        try:
            # Decode the token without verifying the signature
            decoded = jwt.decode(token, options={"verify_signature": False, "verify_aud": False})
        except jwt.DecodeError as err:
            raise ValueError("Token is invalid or badly formatted") from err

        # Check if "aud" matches (the claim may be a single value or a list)
        token_aud = decoded.get("aud")
        token_auds = token_aud if isinstance(token_aud, (list, tuple)) else [token_aud]
        accepted = {self._normalize_audience(aud) for aud in expected_audiences}
        if not any(self._normalize_audience(aud) in accepted for aud in token_auds if aud is not None):
            raise ValueError(f"Invalid audience. Expected '{expected_audiences}', got '{decoded.get('aud')}'")

        # Check if token has expired; require at least 30 secs of remaining
        # validity to account for clock skew
        current_time = int(time.time() + 30)
        if decoded.get("exp", 0) < current_time:
            raise ValueError(f"Token {decoded.get('exp')} has expired")

        return True

    def list_subscriptions(self):
        """Return the IDs of all subscriptions visible to the ARM token.

        Follows ``nextLink`` so that results spanning several pages are not
        truncated.

        Raises:
            requests.HTTPError: If any page request fails.
        """
        subs = []
        url = "https://management.azure.com/subscriptions?api-version=2020-01-01"
        while url:
            resp = requests.get(url, headers=self._arm_headers())
            resp.raise_for_status()
            data = resp.json()
            subs.extend(sub["subscriptionId"] for sub in data.get("value", []))
            url = data.get("nextLink")
        return subs

    def list_resources_in_subscription(self, subscription_id):
        """Return every resource object listed under *subscription_id*.

        Returns an empty list when the first page cannot be fetched.

        Raises:
            requests.HTTPError: If a follow-up page request fails.
        """
        resources = []
        url = f"https://management.azure.com/subscriptions/{subscription_id}/resources?api-version=2021-04-01"
        resp = requests.get(url, headers=self._arm_headers())
        if resp.status_code != 200:
            return resources

        data = resp.json()
        resources.extend(data.get("value", []))

        # A missing, null or empty nextLink all mean "no more pages"
        next_url = data.get("nextLink")
        while next_url:
            resp = requests.get(next_url, headers=self._arm_headers())
            resp.raise_for_status()
            data = resp.json()
            resources.extend(data.get("value", []))
            next_url = data.get("nextLink")

        return resources

    @staticmethod
    def _effective_permissions(perm_blocks):
        """Return the set of actions and data actions granted by *perm_blocks*.

        For each block, ``notActions`` / ``notDataActions`` are removed from
        ``actions`` / ``dataActions`` by exact string match; wildcard patterns
        are not expanded.
        """
        perms = set()
        for block in perm_blocks:
            perms.update(set(block.get("actions", [])) - set(block.get("notActions", [])))
            perms.update(set(block.get("dataActions", [])) - set(block.get("notDataActions", [])))
        return perms

    def get_permissions_for_resource(self, resource_id):
        """Return the active and eligible permissions on *resource_id*.

        Args:
            resource_id: ARM resource path starting with ``/`` (e.g.
                ``/subscriptions/<id>``).

        Returns:
            A list of permission strings (unordered, without duplicates).

        Raises:
            Exception: If the active permissions cannot be fetched.
        """
        # Retrieve active permissions
        permissions_url = f"https://management.azure.com{resource_id}/providers/Microsoft.Authorization/permissions?api-version=2022-04-01"
        resp = requests.get(permissions_url, headers=self._arm_headers())
        if resp.status_code != 200:
            raise Exception(f"Failed fetching permissions: {resp.text}")

        perms = self._effective_permissions(resp.json().get('value', []))

        # Retrieve eligible roles for the resource
        eligible_roles_url = f"https://management.azure.com{resource_id}/providers/Microsoft.Authorization/roleEligibilitySchedules?api-version=2020-10-01&$filter=asTarget()"
        resp_eligible = requests.get(eligible_roles_url, headers=self._arm_headers())
        if resp_eligible.status_code != 200:
            print(f"Unable to retrieve eligible roles: {resp_eligible.status_code} {resp_eligible.text} ( This is common, you need an Azure permission to list eligible roles )")
            return list(perms)

        for eligible in resp_eligible.json().get('value', []):
            role_definition_id = (eligible.get('properties') or {}).get('roleDefinitionId')
            if not role_definition_id:
                # Incomplete entry: nothing to resolve
                continue

            # Fetch granular permissions for each eligible role
            role_def_url = f"https://management.azure.com{role_definition_id}?api-version=2022-04-01"
            resp_role = requests.get(role_def_url, headers=self._arm_headers())
            if resp_role.status_code == 200:
                role_properties = resp_role.json().get("properties", {})
                perms.update(self._effective_permissions(role_properties.get("permissions", [])))

        return list(perms)

    def _print_token_info(self, token, label):
        """Decode *token* (signature not verified) and print its identity claims.

        *label* is the token name shown in the output ("ARM" or "Graph").
        Any failure is reported on stdout instead of being raised.
        """
        try:
            decoded = jwt.decode(token, options={"verify_signature": False, "verify_aud": False})
            print(f"{Fore.BLUE}Current Principal ID ({label} Token): {Fore.WHITE}{decoded.get('oid', 'Unknown')}")
            print(f"{Fore.BLUE}Current Audience ({label} Token): {Fore.WHITE}{decoded.get('aud', 'Unknown')}")
            if 'upn' in decoded:
                print(f"{Fore.BLUE}User Principal Name (UPN) ({label} Token): {Fore.WHITE}{decoded.get('upn', 'Unknown')}")
            if 'email' in decoded:
                print(f"{Fore.BLUE}Email ({label} Token): {Fore.WHITE}{decoded.get('email', 'Unknown')}")
            if 'groups' in decoded:
                groups = decoded.get('groups', [])
                print(f"{Fore.BLUE}Groups ({label} Token): {Fore.WHITE}{', '.join(groups) if groups else 'None'}")
            if 'exp' in decoded:
                expiration_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(decoded.get('exp')))
                print(f"{Fore.BLUE}Token Expiration Time ({label} Token): {Fore.WHITE}{expiration_time}")
        except Exception as e:
            # Best-effort diagnostics: never abort the run because of whoami output
            print(f"{Fore.RED}Failed to decode {label} token: {str(e)}")

    def print_whoami_info(self):
        """
        Prints the current principal information.
        This is useful for debugging and understanding the context of the permissions being analyzed.
        """
        if self.arm_token:
            self._print_token_info(self.arm_token, "ARM")

        if self.graph_token:
            self._print_token_info(self.graph_token, "Graph")

    def _process_subscription(self, sub_id):
        """Return the permission records for one subscription and its resources.

        The subscription itself is included only when it has permissions;
        every listed resource is included regardless.
        """
        sub_resources = []
        raw_resources = self.list_resources_in_subscription(sub_id)

        perms = self.get_permissions_for_resource(f"/subscriptions/{sub_id}")
        if perms:
            sub_resources.append({
                "id": f"/subscriptions/{sub_id}",
                "name": sub_id,
                "type": "subscription",
                "permissions": perms
            })

        for res in raw_resources:
            res_id = res.get("id")
            sub_resources.append({
                "id": res_id,
                "name": res.get("name"),
                "type": res.get("type"),
                "permissions": self.get_permissions_for_resource(res_id)
            })

        return sub_resources

    def get_resources_and_permissions(self):
        """Return the list of resource records with the permissions held on each.

        With an ARM token, subscriptions are processed in parallel using
        ``self.num_threads`` workers. With a Graph token, the Entra ID
        memberships, eligible roles and owned objects reported by
        ``EntraIDPEASS`` are appended.
        """
        resources_data = []

        if self.arm_token:
            subs = self.list_subscriptions()
            with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
                results = list(tqdm(executor.map(self._process_subscription, subs), total=len(subs), desc="Processing Subscriptions"))
            for sub_result in results:
                resources_data.extend(sub_result)

        if self.graph_token:
            print(f"{Fore.MAGENTA}Getting Permissions from EntraID...")
            # If None, then it's a MI token without access to get its Entra ID permissions (probably it doesn't have them)
            ## Important: Keep this Entra ID check first
            memberships = self.EntraIDPEASS.get_entraid_memberships()
            if memberships is None:
                return resources_data

            resources_data += memberships
            resources_data += self.EntraIDPEASS.get_eligible_roles()
            resources_data += self.EntraIDPEASS.get_entraid_owns()

        return resources_data
def build_arg_parser():
    """Return the command-line parser for the AzurePEASS script."""
    parser = argparse.ArgumentParser(description="Run AzurePEASS to find all your current privileges in Azure and EntraID and check for potential privilege escalation attacks.\nTo check for Azure permissions an ARM token is neded.\nTo check for Entra ID permissions a Graph token is needed.")
    parser.add_argument('--arm-token', help="Azure Management authentication token")
    parser.add_argument('--graph-token', help="Azure Graph authentication token")
    parser.add_argument('--out-json-path', default=None, help="Output JSON file path (e.g. /tmp/azure_results.json)")
    parser.add_argument('--threads', default=5, type=int, help="Number of threads to use")
    # store_true: the option is off by default and passing the flag turns it on.
    # (store_false with default=False could never yield True.)
    parser.add_argument('--not-use-hacktricks-ai', action="store_true", default=False, help="Don't use Hacktricks AI to analyze permissions")
    return parser


def main():
    """Parse the command line, resolve the tokens and run the analysis."""
    args = build_arg_parser().parse_args()

    # NOTE: when set, the AZURE_ARM_TOKEN / AZURE_GRAPH_TOKEN environment
    # variables take precedence over the corresponding command-line values.
    arm_token = os.getenv("AZURE_ARM_TOKEN", args.arm_token)
    graph_token = os.getenv("AZURE_GRAPH_TOKEN", args.graph_token)

    azure_peass = AzurePEASS(arm_token, graph_token, very_sensitive_combinations, sensitive_combinations, not_use_ht_ai=args.not_use_hacktricks_ai, num_threads=args.threads, out_path=args.out_json_path)
    azure_peass.run_analysis()


if __name__ == "__main__":
    main()