Skip to content

Commit d2bc55f

Browse files
committed
create ResourcePolicyDocument with internet reachable check
1 parent 8acac6a commit d2bc55f

File tree

3 files changed

+421
-59
lines changed

3 files changed

+421
-59
lines changed

cloudsplaining/scan/assume_role_policy_document.py

+15-59
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,38 @@
44
# Licensed under the BSD 3-Clause license.
55
# For full license text, see the LICENSE file in the repo root
66
# or https://opensource.org/licenses/BSD-3-Clause
7+
from __future__ import annotations
8+
79
import logging
810
from typing import Dict, Any, List
911

12+
from cloudsplaining.scan.resource_policy_document import (
13+
ResourcePolicyDocument,
14+
ResourceStatement,
15+
)
1016
from cloudsplaining.shared.constants import SERVICE_PREFIXES_WITH_COMPUTE_ROLES
1117

1218
logger = logging.getLogger(__name__)
1319

1420

15-
class AssumeRolePolicyDocument:
16-
"""
17-
Holds the AssumeRole/Trust Policy document
21+
class AssumeRolePolicyDocument(ResourcePolicyDocument):
22+
"""Holds the AssumeRole/Trust Policy document
23+
24+
It is a specialized version of a Resource-based policy
1825
"""
1926

20-
def __init__(self, policy: Dict[str, Any]) -> None:
27+
def __init__(self, policy: dict[str, Any]) -> None:
2128
statement_structure = policy.get("Statement", [])
2229
self.policy = policy
23-
self.statements = []
30+
# We would actually need to define a proper base class with a generic type for statements
31+
self.statements: list[AssumeRoleStatement] = [] # type:ignore[assignment]
2432
# leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list
2533
if not isinstance(statement_structure, list): # pragma: no cover
2634
statement_structure = [statement_structure]
2735

2836
for statement in statement_structure:
2937
self.statements.append(AssumeRoleStatement(statement))
3038

31-
@property
32-
def json(self) -> Dict[str, Any]:
33-
"""Return the AssumeRole Policy in JSON"""
34-
return self.policy
35-
3639
@property
3740
def role_assumable_by_compute_services(self) -> List[str]:
3841
"""Determines whether or not the role is assumed from a compute service, and if so which ones."""
@@ -45,17 +48,13 @@ def role_assumable_by_compute_services(self) -> List[str]:
4548
return assumable_by_compute_services
4649

4750

48-
class AssumeRoleStatement:
51+
class AssumeRoleStatement(ResourceStatement):
4952
"""
5053
Statements in an AssumeRole/Trust Policy document
5154
"""
5255

5356
def __init__(self, statement: Dict[str, Any]) -> None:
54-
self.json = statement
55-
self.statement = statement
56-
self.effect = statement["Effect"]
57-
self.actions = self._assume_role_actions()
58-
self.principals = self._principals()
57+
super().__init__(statement=statement)
5958

6059
# self.not_principal = statement.get("NotPrincipal")
6160
if statement.get("NotPrincipal"):
@@ -76,49 +75,6 @@ def _assume_role_actions(self) -> List[str]:
7675

7776
return [actions]
7877

79-
def _principals(self) -> List[str]:
80-
"""Extracts all principals from IAM statement.
81-
Should handle these cases:
82-
"Principal": "value"
83-
"Principal": ["value"]
84-
"Principal": { "AWS": "value" }
85-
"Principal": { "AWS": ["value", "value"] }
86-
"Principal": { "Federated": "value" }
87-
"Principal": { "Federated": ["value", "value"] }
88-
"Principal": { "Service": "value" }
89-
"Principal": { "Service": ["value", "value"] }
90-
Return: Set of principals
91-
"""
92-
principals: List[str] = []
93-
principal = self.statement.get("Principal", None)
94-
if not principal:
95-
# It is possible not to define a principal, AWS ignores these statements.
96-
return principals # pragma: no cover
97-
98-
if isinstance(principal, dict):
99-
100-
if "AWS" in principal:
101-
if isinstance(principal["AWS"], list):
102-
principals.extend(principal["AWS"])
103-
else:
104-
principals.append(principal["AWS"])
105-
106-
if "Federated" in principal:
107-
if isinstance(principal["Federated"], list):
108-
principals.extend(principal["Federated"])
109-
else:
110-
principals.append(principal["Federated"])
111-
112-
if "Service" in principal:
113-
if isinstance(principal["Service"], list):
114-
principals.extend(principal["Service"])
115-
else:
116-
principals.append(principal["Service"])
117-
else:
118-
principals.append(principal)
119-
# principals = list(principals).sort()
120-
return principals
121-
12278
@property
12379
def role_assumable_by_compute_services(self) -> List[str]:
12480
"""Determines whether or not the role is assumed from a compute service, and if so which ones."""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
"""Represents the Resource-based policy"""
2+
from __future__ import annotations
3+
4+
import logging
5+
import re
6+
from typing import Any
7+
8+
from policy_sentry.util.arns import ARN
9+
10+
11+
CONDITION_KEY_CATEGORIES = {
12+
"aws:sourcearn": "arn",
13+
"aws:principalarn": "arn",
14+
"aws:sourceowner": "account",
15+
"aws:sourceaccount": "account",
16+
"aws:principalaccount": "account",
17+
"aws:principalorgid": "organization",
18+
"aws:principalorgpaths": "organization",
19+
"kms:calleraccount": "account",
20+
"aws:userid": "userid",
21+
"aws:sourceip": "cidr",
22+
"aws:sourcevpc": "vpc",
23+
"aws:sourcevpce": "vpce",
24+
# a key for SAML Federation trust policy.
25+
# https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-idp_saml.html
26+
# https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_providers_create_saml_assertions.html
27+
"saml:aud": "saml-endpoint",
28+
}
29+
RELEVANT_CONDITION_OPERATORS_PATTERN = re.compile(
30+
"((ForAllValues|ForAnyValue):)?(ARN(Equals|Like)|String(Equals|Like)(IgnoreCase)?|IpAddress)(IfExists)?",
31+
re.IGNORECASE,
32+
)
33+
34+
logger = logging.getLogger(__name__)
35+
36+
37+
class ResourcePolicyDocument:
38+
"""Holds the Resource Policy document"""
39+
40+
def __init__(self, policy: dict[str, Any]) -> None:
41+
statement_structure = policy.get("Statement", [])
42+
self.policy = policy
43+
self.statements = []
44+
# leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list
45+
if not isinstance(statement_structure, list): # pragma: no cover
46+
statement_structure = [statement_structure]
47+
48+
for statement in statement_structure:
49+
self.statements.append(ResourceStatement(statement))
50+
51+
@property
52+
def json(self) -> dict[str, Any]:
53+
"""Return the Resource Policy in JSON"""
54+
return self.policy
55+
56+
@property
57+
def internet_accessible_actions(self) -> list[str]:
58+
result = []
59+
for statement in self.statements:
60+
actions = statement.internet_accessible_actions
61+
if actions:
62+
result.extend(actions)
63+
64+
return result
65+
66+
67+
class ResourceStatement:
68+
"""Statements in a Resource Policy document"""
69+
70+
def __init__(self, statement: dict[str, Any]) -> None:
71+
self.json = statement
72+
self.statement = statement
73+
self.effect = statement["Effect"]
74+
self.actions = self._actions()
75+
self.principals = self._principals()
76+
self.conditions = self._conditions()
77+
78+
def _actions(self) -> list[str]:
79+
"""Extracts all actions"""
80+
actions = self.statement.get("Action", [])
81+
if not actions:
82+
return []
83+
84+
if isinstance(actions, list):
85+
return actions
86+
87+
return [actions]
88+
89+
def _principals(self) -> list[str]:
90+
"""Extracts all principals from IAM statement.
91+
Should handle these cases:
92+
"Principal": "value"
93+
"Principal": ["value"]
94+
"Principal": { "AWS": "value" }
95+
"Principal": { "AWS": ["value", "value"] }
96+
"Principal": { "Federated": "value" }
97+
"Principal": { "Federated": ["value", "value"] }
98+
"Principal": { "Service": "value" }
99+
"Principal": { "Service": ["value", "value"] }
100+
Return: Set of principals
101+
"""
102+
principals: list[str] = []
103+
principal = self.statement.get("Principal", None)
104+
if not principal:
105+
# It is possible not to define a principal, AWS ignores these statements.
106+
return principals # pragma: no cover
107+
108+
if isinstance(principal, dict):
109+
if "AWS" in principal:
110+
if isinstance(principal["AWS"], list):
111+
principals.extend(principal["AWS"])
112+
else:
113+
principals.append(principal["AWS"])
114+
115+
if "Federated" in principal:
116+
if isinstance(principal["Federated"], list):
117+
principals.extend(principal["Federated"])
118+
else:
119+
principals.append(principal["Federated"])
120+
121+
if "Service" in principal:
122+
if isinstance(principal["Service"], list):
123+
principals.extend(principal["Service"])
124+
else:
125+
principals.append(principal["Service"])
126+
else:
127+
principals.append(principal)
128+
129+
return principals
130+
131+
# Adapted version of policyuniverse's _condition_entries, here:
132+
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/statement.py#L146
133+
def _conditions(self) -> list[tuple[str, Any]]:
134+
"""Extracts any ARNs, Account Numbers, UserIDs, Usernames, CIDRs, VPCs, and VPC Endpoints from a condition block.
135+
136+
Ignores any negated condition operators like StringNotLike.
137+
Ignores weak condition keys like referer, date, etc.
138+
139+
Reason: A condition is meant to limit the principal in a statement. Often, resource policies use a wildcard principal
140+
and rely exclusively on the Condition block to limit access.
141+
142+
We would want to alert if the Condition had no limitations (like a non-existent Condition block), or very weak
143+
limitations. Any negation would be weak, and largely equivelant to having no condition block whatsoever.
144+
145+
The alerting code that relies on this data must ensure the condition has at least one of the following:
146+
- A limiting ARN
147+
- Account Identifier
148+
- AWS Organization Principal Org ID
149+
- User ID
150+
- Source IP / CIDR
151+
- VPC
152+
- VPC Endpoint
153+
154+
https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html
155+
"""
156+
157+
conditions: list[tuple[str, Any]] = []
158+
condition = self.statement.get("Condition")
159+
if not condition:
160+
return conditions
161+
162+
for condition_operator, condition_context in condition.items():
163+
if RELEVANT_CONDITION_OPERATORS_PATTERN.match(condition_operator):
164+
for key, value in condition_context.items():
165+
key_lower = key.lower()
166+
if key_lower in CONDITION_KEY_CATEGORIES:
167+
if isinstance(value, list):
168+
conditions.extend(
169+
(CONDITION_KEY_CATEGORIES[key_lower], v) for v in value
170+
)
171+
else:
172+
conditions.append(
173+
(CONDITION_KEY_CATEGORIES[key_lower], value)
174+
)
175+
176+
return conditions
177+
178+
@property
179+
def internet_accessible_actions(self) -> list[str]:
180+
"""Determines whether the actions can be used by everyone"""
181+
182+
# compared to policyuniverse's implementation,
183+
# there is no need to check for the existence of 'NotPrincipal',
184+
# because it is not support with self.effect == "Allow"
185+
if self.effect == "Deny":
186+
return []
187+
188+
for entry in self.conditions:
189+
if self._is_condition_entry_internet_accessible(entry=entry):
190+
return self.actions
191+
192+
if self.conditions:
193+
# this means we have conditions, but they protect the policy to be accessible by everyone
194+
return []
195+
196+
for principal in self.principals:
197+
if self._arn_internet_accessible(arn=principal):
198+
return self.actions
199+
200+
return []
201+
202+
# Adapted version of policyuniverse's _is_condition_entry_internet_accessible and the called methods, here:
203+
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/statement.py#L301
204+
# and onwards
205+
def _is_condition_entry_internet_accessible(self, entry: tuple[str, Any]) -> bool:
206+
category, condition_value = entry
207+
208+
if category == "arn":
209+
return self._arn_internet_accessible(arn=condition_value)
210+
elif category == "cidr":
211+
return self._cidr_internet_accessible(cidr=condition_value)
212+
elif category == "organization":
213+
return self._organization_internet_accessible(org=condition_value)
214+
elif category == "userid":
215+
return self._userid_internet_accessible(userid=condition_value)
216+
217+
return "*" in condition_value
218+
219+
def _arn_internet_accessible(self, arn: str) -> bool:
220+
if "*" == arn:
221+
return True
222+
223+
if not arn.startswith("arn:"):
224+
# probably an account ID or AWS service
225+
return False
226+
227+
try:
228+
parsed_arn = ARN(provided_arn=arn)
229+
except Exception:
230+
logger.info(f"ARN {arn} is not parsable")
231+
return "*" in arn
232+
233+
if parsed_arn.service_prefix == "s3":
234+
# S3 ARNs don't have account numbers
235+
return False
236+
237+
if not parsed_arn.account and not parsed_arn.service_prefix:
238+
logger.info(f"ARN {arn} is not valid")
239+
return True
240+
241+
if parsed_arn.account == "*":
242+
return True
243+
244+
return False
245+
246+
def _cidr_internet_accessible(self, cidr: str) -> bool:
247+
return cidr.endswith("/0")
248+
249+
def _organization_internet_accessible(self, org: str) -> bool:
250+
if "o-*" in org:
251+
return True
252+
return False
253+
254+
def _userid_internet_accessible(self, userid: str) -> bool:
255+
# Trailing wildcards are okay for user IDs:
256+
# AROAIIIIIIIIIIIIIIIII:*
257+
if userid.find("*") == len(userid) - 1:
258+
# note: this will also return False for a zero-length userid
259+
return False
260+
return True

0 commit comments

Comments
 (0)