14
14
import click
15
15
import click_log
16
16
from cloudsplaining .shared .constants import EXCLUSIONS_FILE
17
- from cloudsplaining .shared .validation import (
18
- check_exclusions_schema ,
19
- check_authorization_details_schema ,
20
- )
17
+ from cloudsplaining .shared .validation import check_authorization_details_schema
18
+ from cloudsplaining .shared .exclusions import Exclusions , DEFAULT_EXCLUSIONS
21
19
from cloudsplaining .scan .authorization_details import AuthorizationDetails
22
20
from cloudsplaining .output .html_report import generate_html_report
23
21
from cloudsplaining .output .data_file import write_results_data_file
67
65
)
68
66
@click_log .simple_verbosity_option ()
69
67
# pylint: disable=redefined-builtin
70
- def scan (input , exclusions_file , output , all_access_levels , skip_open_report ): # pragma: no cover
68
+ def scan (
69
+ input , exclusions_file , output , all_access_levels , skip_open_report
70
+ ): # pragma: no cover
71
71
"""
72
72
Given the path to account authorization details files and the exclusions config file, scan all inline and
73
73
managed policies in the account to identify actions that do not leverage resource constraints.
74
74
"""
75
75
if exclusions_file :
76
+ # Get the exclusions configuration
76
77
with open (exclusions_file , "r" ) as yaml_file :
77
78
try :
78
79
exclusions_cfg = yaml .safe_load (yaml_file )
79
80
except yaml .YAMLError as exc :
80
81
logger .critical (exc )
81
- check_exclusions_schema (exclusions_cfg )
82
+ exclusions = Exclusions (exclusions_cfg )
83
+ else :
84
+ exclusions = DEFAULT_EXCLUSIONS
85
+
82
86
if os .path .isfile (input ):
83
- scan_account_authorization_file (input , exclusions_cfg , output , all_access_levels , skip_open_report )
87
+ scan_account_authorization_file (
88
+ input , exclusions , output , all_access_levels , skip_open_report
89
+ )
84
90
if os .path .isdir (input ):
85
- logger .info ("The path given is a directory. Scanning for account authorization files and generating report." )
91
+ logger .info (
92
+ "The path given is a directory. Scanning for account authorization files and generating report."
93
+ )
86
94
input_files = get_authorization_files_in_directory (input )
87
95
for file in input_files :
88
96
logger .info (f"Scanning file: { file } " )
89
- scan_account_authorization_file (file , exclusions_cfg , output , all_access_levels , skip_open_report )
97
+ scan_account_authorization_file (
98
+ file , exclusions , output , all_access_levels , skip_open_report
99
+ )
90
100
91
101
92
- def scan_account_authorization_file (input_file , exclusions_cfg , output , all_access_levels , skip_open_report ): # pragma: no cover
102
+ def scan_account_authorization_file (
103
+ input_file , exclusions , output , all_access_levels , skip_open_report
104
+ ): # pragma: no cover
93
105
"""
94
106
Given the path to account authorization details files and the exclusions config file, scan all inline and
95
107
managed policies in the account to identify actions that do not leverage resource constraints.
@@ -107,7 +119,7 @@ def scan_account_authorization_file(input_file, exclusions_cfg, output, all_acce
107
119
)
108
120
authorization_details = AuthorizationDetails (account_authorization_details_cfg )
109
121
results = authorization_details .missing_resource_constraints (
110
- exclusions_cfg , modify_only = False
122
+ exclusions , modify_only = False
111
123
)
112
124
else :
113
125
logger .debug (
@@ -116,10 +128,26 @@ def scan_account_authorization_file(input_file, exclusions_cfg, output, all_acce
116
128
)
117
129
authorization_details = AuthorizationDetails (account_authorization_details_cfg )
118
130
results = authorization_details .missing_resource_constraints (
119
- exclusions_cfg , modify_only = True
131
+ exclusions , modify_only = True
120
132
)
121
133
122
134
principal_policy_mapping = authorization_details .principal_policy_mapping
135
+ for principal_policy_entry in principal_policy_mapping :
136
+ for finding_result in results :
137
+ if (
138
+ principal_policy_entry .get ("PolicyName" ).lower ()
139
+ == finding_result .get ("PolicyName" ).lower ()
140
+ ):
141
+ principal_policy_entry ["Actions" ] = len (finding_result ["Actions" ])
142
+ principal_policy_entry ["PrivilegeEscalation" ] = len (
143
+ finding_result ["PrivilegeEscalation" ]
144
+ )
145
+ principal_policy_entry ["DataExfiltrationActions" ] = len (
146
+ finding_result ["DataExfiltrationActions" ]
147
+ )
148
+ principal_policy_entry ["PermissionsManagementActions" ] = len (
149
+ finding_result ["PermissionsManagementActions" ]
150
+ )
123
151
124
152
account_name = Path (input_file ).stem
125
153
@@ -149,8 +177,12 @@ def scan_account_authorization_file(input_file, exclusions_cfg, output, all_acce
149
177
print (f"Raw data file saved: { str (raw_data_filepath )} " )
150
178
151
179
# Principal policy mapping
152
- principal_policy_mapping_file = os .path .join (output , f"iam-principals-{ account_name } .json" )
153
- principal_policy_mapping_filepath = write_results_data_file (principal_policy_mapping , principal_policy_mapping_file )
180
+ principal_policy_mapping_file = os .path .join (
181
+ output , f"iam-principals-{ account_name } .json"
182
+ )
183
+ principal_policy_mapping_filepath = write_results_data_file (
184
+ principal_policy_mapping , principal_policy_mapping_file
185
+ )
154
186
print (f"Principals data file saved: { str (principal_policy_mapping_filepath )} " )
155
187
156
188
print ("Creating the HTML Report" )
@@ -159,24 +191,30 @@ def scan_account_authorization_file(input_file, exclusions_cfg, output, all_acce
159
191
results ,
160
192
principal_policy_mapping ,
161
193
output_directory ,
162
- exclusions_cfg ,
194
+ exclusions . config ,
163
195
skip_open_report = skip_open_report ,
164
196
)
165
197
166
198
167
199
def get_authorization_files_in_directory (directory ): # pragma: no cover
168
200
"""Get a list of download-account-authorization-files in a directory"""
169
- file_list = [f for f in os .listdir (directory ) if os .path .isfile (os .path .join (directory , f ))]
201
+ file_list = [
202
+ f for f in os .listdir (directory ) if os .path .isfile (os .path .join (directory , f ))
203
+ ]
170
204
file_list_with_full_path = []
171
205
for file in file_list :
172
206
if file .endswith (".json" ):
173
- file_list_with_full_path .append (os .path .abspath (os .path .join (directory , file )))
207
+ file_list_with_full_path .append (
208
+ os .path .abspath (os .path .join (directory , file ))
209
+ )
174
210
new_file_list = []
175
211
for file in file_list_with_full_path :
176
212
with open (file ) as f :
177
213
contents = f .read ()
178
214
account_authorization_details_cfg = json .loads (contents )
179
- valid_schema = check_authorization_details_schema (account_authorization_details_cfg )
215
+ valid_schema = check_authorization_details_schema (
216
+ account_authorization_details_cfg
217
+ )
180
218
if valid_schema :
181
219
new_file_list .append (file )
182
220
return new_file_list
0 commit comments