@@ -33,6 +33,12 @@
 from xfd_api.utils.chunk import chunk_list_by_bytes
 from xfd_api.utils.csv_utils import create_checksum
 from xfd_api.utils.hash import hash_ip
+from xfd_api.utils.scan_utils.alerting import (
+    IngestionError,
+    QueryError,
+    ScanExecutionError,
+    SyncError,
+)
 from xfd_api.utils.scan_utils.vuln_scanning_sync_utils import (
     enforce_latest_flag_port_scan,
     fetch_orgs_and_relations,
@@ -67,6 +73,7 @@
 )
 LOGGER = logging.getLogger(__name__)
 IS_LOCAL = os.getenv("IS_LOCAL")
+SCAN_NAME = "VulnScanningSync"

 VS_PULL_DATE_RANGE = os.getenv("VS_PULL_DATE_RANGE", "2")

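Note: the alerting exceptions imported above come from a new xfd_api/utils/scan_utils/alerting module that is not included in this diff. As a rough sketch of what the call sites below assume, each exception appears to take the scan name, the error message, and an optional context/detail argument. The class definitions here are illustrative guesses, not the actual module:

# Hypothetical sketch of xfd_api/utils/scan_utils/alerting.py (assumed, not shown in this diff).
class ScanAlertError(Exception):
    """Base error carrying the scan name, message, and optional context for alerting."""

    def __init__(self, scan_name, message, context=None):
        self.scan_name = scan_name
        self.message = message
        self.context = context
        super().__init__(f"[{scan_name}] {message}" + (f": {context}" if context else ""))


class ScanExecutionError(ScanAlertError):
    """Raised when the scan's top-level handler fails."""


class QueryError(ScanAlertError):
    """Raised when a database query or view build fails."""


class SyncError(ScanAlertError):
    """Raised when sending data to the DMZ sync endpoint fails."""


class IngestionError(ScanAlertError):
    """Raised when saving scan records to the data lake fails."""
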
@@ -89,8 +96,9 @@ def handler(event):
         main()
         return {"status_code": 200, "body": "VS Sync completed successfully"}
     except Exception as e:
-        LOGGER.info("Error occurred: %s", e)
-        return {"status_code": 500, "body": str(e)}
+        raise ScanExecutionError(SCAN_NAME, str(e), event) from e
+        # LOGGER.info("Error occurred: %s", e)
+        # return {"status_code": 500, "body": str(e)}


 def query_redshift(query, params=None):
@@ -111,6 +119,8 @@ def query_redshift(query, params=None):
             cursor.execute(query)  # <-- this avoids the IndexError
         results = cursor.fetchall()
         return [dict(row) for row in results]
+    except Exception as e:
+        raise QueryError(SCAN_NAME, str(e)) from e
     finally:
         cursor.close()
         conn.close()
@@ -217,12 +227,33 @@ def main():
             chunk_number - 1,
         )
     LOGGER.info("Finished processing tickets")
-    create_vuln_scan_summary()
+    try:
+        create_vuln_scan_summary()
+    except Exception as e:
+        raise QueryError(
+            SCAN_NAME, str(e), "Error creating vulnerability scan summary"
+        ) from e

-    create_domain_view("mini_data_lake")
-    create_service_view("mini_data_lake")
-    create_vuln_normal_views("mini_data_lake")
-    create_vuln_materialized_views("mini_data_lake")
+    try:
+        create_domain_view("mini_data_lake")
+    except Exception as e:
+        raise QueryError(SCAN_NAME, str(e), "Error creating domain view") from e
+    try:
+        create_service_view("mini_data_lake")
+    except Exception as e:
+        raise QueryError(SCAN_NAME, str(e), "Error creating service view") from e
+    try:
+        create_vuln_normal_views("mini_data_lake")
+    except Exception as e:
+        raise QueryError(
+            SCAN_NAME, str(e), "Error creating vulnerability normal views"
+        ) from e
+    try:
+        create_vuln_materialized_views("mini_data_lake")
+    except Exception as e:
+        raise QueryError(
+            SCAN_NAME, str(e), "Error creating vulnerability materialized views"
+        ) from e


 def detect_data_set(query):
@@ -291,6 +322,7 @@ def send_organizations_to_dmz():
             traceback.format_exc(),
         )
         print(e)
+        raise SyncError(SCAN_NAME, str(e), "Error sending organizations to dmz") from e


 def send_csv_to_sync(csv_data, bounds):
@@ -330,6 +362,10 @@ def send_csv_to_sync(csv_data, bounds):
         )
     except Exception as e:
         LOGGER.error("Unexpected error sending chunk: %s", str(e))
+        raise SyncError(
+            SCAN_NAME,
+            str(e),
+        ) from e


 def process_vulnerability_scans(vuln_scans, org_id_dict):
@@ -357,9 +393,14 @@ def process_vulnerability_scans(vuln_scans, org_id_dict):
         except Exception as e:
             LOGGER.error("Error saving vulnerability scan: %s", e)
             print(traceback.format_exc())
+            # Raise to catch in the outer block
+            raise e
     except Exception as e:
         LOGGER.error("Error processing Vulnerability Scan: %s", e)
         print(traceback.format_exc())
+        raise IngestionError(
+            SCAN_NAME, str(e), "Failed processing vulnerability scans"
+        ) from e


 def safe_fromisoformat(date_input) -> datetime.datetime | None:
@@ -518,6 +559,9 @@ def create_daily_host_summary(org_id_dict, summary_date=None):
                 owner_id,
                 e,
             )
+            raise QueryError(
+                SCAN_NAME, str(e), "Error creating daily host summary"
+            ) from e

     LOGGER.info("Completed host summary creation from Redshift.")

@@ -581,6 +625,7 @@ def create_port_scan_summary(summary_date=None):

     except Exception as e:
         print("Error creating port scan summary: {}".format(e))
+        raise QueryError(SCAN_NAME, str(e), "Error creating port scan summary") from e


 def create_port_scan_service_summaries(summary_date=None):
@@ -633,6 +678,9 @@ def create_port_scan_service_summaries(summary_date=None):
         )
     except Exception as e:
         print("Error creating port scan service summary: {}".format(e))
+        raise QueryError(
+            SCAN_NAME, str(e), "Error creating port scan service summary"
+        ) from e


 def process_tickets(tickets, org_id_dict):
@@ -706,6 +754,7 @@ def process_tickets(tickets, org_id_dict):
             print(
                 f"Error processing ticket data: {e} - {owner_id} - {ticket.get('owner')}"
             )
+            raise IngestionError(SCAN_NAME, str(e), "Failed processing tickets") from e


 def get_asset_owned_count(org):
@@ -1034,6 +1083,9 @@ def process_port_scans(port_scans, org_id_dict):
             save_port_scan_to_datalake(port_scan_dict)
         except Exception as e:
             print(f"Error processing port scan data: {e}")
+            raise IngestionError(
+                SCAN_NAME, str(e), "Failed processing port scans"
+            ) from e


 def process_orgs(request_list):
@@ -1044,16 +1096,23 @@ def process_orgs(request_list):
     parent_child_dict = {}

     # Process the request data
-    if request_list and isinstance(request_list, list):
-        process_request(request_list, sector_child_dict, parent_child_dict, org_id_dict)
+    try:
+        if request_list and isinstance(request_list, list):
+            process_request(
+                request_list, sector_child_dict, parent_child_dict, org_id_dict
+            )

-    # Link parent-child organizations
-    link_parent_child_organizations(parent_child_dict, org_id_dict)
+        # Link parent-child organizations
+        link_parent_child_organizations(parent_child_dict, org_id_dict)

-    # Assign organizations to sectors
-    assign_organizations_to_sectors(sector_child_dict, org_id_dict)
+        # Assign organizations to sectors
+        assign_organizations_to_sectors(sector_child_dict, org_id_dict)

-    return org_id_dict
+        return org_id_dict
+    except Exception as e:
+        raise IngestionError(
+            SCAN_NAME, str(e), "Failed processing organizations"
+        ) from e


 def link_parent_child_organizations(
@@ -1108,6 +1167,7 @@ def assign_organizations_to_sectors(
         except Exception as e:
             print("Error assigning organization to sectors:")
             print(e)
+            raise e


 def process_request(request_list, sector_child_dict, parent_child_dict, org_id_dict):
@@ -1264,6 +1324,9 @@ def process_organization(request, network_list, location_dict, org_id_dict):
         org_id_dict[request["_id"]] = org_record.id
     except Exception as e:
         LOGGER.info("Error saving organization: %s - %s", e, request["_id"])
+        raise IngestionError(
+            SCAN_NAME, str(e), "Failed processing organizations"
+        ) from e


 if __name__ == "__main__":
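
One behavioral note on the handler change at the top of this diff: failures no longer return a {"status_code": 500, ...} body; they are re-raised as ScanExecutionError, so the invocation itself fails and the alerting path can report it. A minimal sketch of how a local run might account for that (the import path and empty event payload are assumptions, not taken from this diff):

# Hypothetical local invocation; module path and event shape are assumed for illustration.
from xfd_api.tasks.vuln_scanning_sync import handler
from xfd_api.utils.scan_utils.alerting import ScanExecutionError

try:
    response = handler({})
    print(response)  # {"status_code": 200, "body": "VS Sync completed successfully"}
except ScanExecutionError as err:
    # Previously this path returned {"status_code": 500, "body": str(e)}; now it raises.
    print(f"Scan failed: {err}")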