@@ -1,6 +1,7 @@
 # encoding = utf-8
 import json
 import sys
+import time
 from datetime import datetime, timezone, timedelta

 import six
@@ -141,7 +142,6 @@ def enrich_payload(splunk_helper, payload):
         return None
     payload["type"] = parsed_stix["type"]
     payload["value"] = parsed_stix["value"]
-    payload["value"] = parsed_stix["value"]

     if "extensions" in payload:
         for extension_definition in payload["extensions"].values():
@@ -160,6 +160,10 @@ def enrich_payload(splunk_helper, payload):
                     payload["_key"] = attribute_value
                 else:
                     payload[attribute_name] = attribute_value
+
+        if "detection" not in payload:
+            payload["detection"] = False
+
         # remove extensions
         del payload["extensions"]

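A minimal sketch of what this hunk changes in `enrich_payload`, using an invented payload (the extension key and `score` field below are assumptions, not values from this add-on): OpenCTI extension attributes are flattened onto the payload, the extension `id` becomes the KV store `_key`, and, new in this commit, a missing `detection` flag defaults to `False` so the field is always present in the collection.

```python
# Illustration only: the payload shape below is assumed for the example.
payload = {
    "extensions": {
        "extension-definition--0000": {  # hypothetical extension id
            "id": "indicator--1234",     # becomes the KV store _key
            "score": 75,
        },
    },
}

for extension_definition in payload["extensions"].values():
    for attribute_name, attribute_value in extension_definition.items():
        if attribute_name == "id":
            payload["_key"] = attribute_value
        else:
            payload[attribute_name] = attribute_value

# the added default: guarantee the flag even when the extension omits it
if "detection" not in payload:
    payload["detection"] = False

del payload["extensions"]  # extensions were flattened above
print(payload)  # {'_key': 'indicator--1234', 'score': 75, 'detection': False}
```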
@@ -324,44 +328,54 @@ def collect_events(helper, ew):
         )

         for msg in messages:
-            if msg.event in ["create", "update", "delete"]:
-                data = json.loads(msg.data)["data"]
-                if data['type'] == "indicator" and data['pattern_type'] == "stix":
-                    parsed_stix = enrich_payload(helper, data)
-                    if parsed_stix is None:
-                        helper.log_error(f"Unable to process indicator: {data['name']} - {data['pattern']}")
-                        continue
-                    helper.log_info("processing msg: " + msg.event + " - " + msg.id + " - " + parsed_stix['name']
-                                    + " - " + parsed_stix['pattern'])
-                    if msg.event == "create" or msg.event == "update":
-                        exist = exist_in_kvstore(kv_store, parsed_stix["_key"])
-                        if exist:
-                            kv_store.update(parsed_stix["_key"], parsed_stix)
-                        else:
+            try:
+                if msg.event in ["create", "update", "delete"]:
+                    data = json.loads(msg.data)["data"]
+                    if data['type'] == "indicator" and data['pattern_type'] == "stix":
+                        parsed_stix = enrich_payload(helper, data)
+                        if parsed_stix is None:
+                            helper.log_error(f"Unsupported indicator pattern: {data['name']} - {data['pattern']}")
+                            continue
+                        helper.log_info("processing msg: " + msg.event + " - " + msg.id + " - " + parsed_stix['name']
+                                        + " - " + parsed_stix['pattern'])
+                        if msg.event == "create" or msg.event == "update":
+                            # use batch_save to insert or update the record in one call
                             parsed_stix['added_at'] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
-                            kv_store.insert(parsed_stix)
-                    if msg.event == "delete":
-                        exist = exist_in_kvstore(kv_store, parsed_stix["_key"])
-                        if exist:
-                            kv_store.delete_by_id(parsed_stix["_key"])
-
-                if data['type'] == "marking-definition":
-                    helper.log_info("processing msg: " + msg.event + " - " + msg.id + " - "
-                                    + data['name'] + " - " + data['id'])
-                    if msg.event == "create" or msg.event == "update":
-                        if data['id'] not in MARKING_DEFs:
-                            MARKING_DEFs[data['id']] = data['name']
-
-                if data['type'] == "identity":
-                    helper.log_info("processing msg: " + msg.event + " - " + msg.id + " - "
-                                    + data['name'] + " - " + data['id'])
-                    if msg.event == "create" or msg.event == "update":
-                        if data['id'] not in IDENTITY_DEFs:
-                            IDENTITY_DEFs[data['id']] = data['name']
-
-            # update checkpoint (take 0:00:00.005544 to update)
-            state["start_from"] = msg.id
-            helper.save_check_point(input_name, json.dumps(state))
+                            kv_store.batch_save(parsed_stix)
+                            """
+                            exist = exist_in_kvstore(kv_store, parsed_stix["_key"])
+                            if exist:
+                                parsed_stix['added_at'] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+                                kv_store.update(parsed_stix["_key"], parsed_stix)
+                            else:
+                                parsed_stix['added_at'] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+                                kv_store.insert(parsed_stix)
+                            """
+                        if msg.event == "delete":
+                            exist = exist_in_kvstore(kv_store, parsed_stix["_key"])
+                            if exist:
+                                kv_store.delete_by_id(parsed_stix["_key"])
+
+                    if data['type'] == "marking-definition":
+                        helper.log_info("processing msg: " + msg.event + " - " + msg.id + " - "
+                                        + data['name'] + " - " + data['id'])
+                        if msg.event == "create" or msg.event == "update":
+                            if data['id'] not in MARKING_DEFs:
+                                MARKING_DEFs[data['id']] = data['name']
+
+                    if data['type'] == "identity":
+                        helper.log_info("processing msg: " + msg.event + " - " + msg.id + " - "
+                                        + data['name'] + " - " + data['id'])
+                        if msg.event == "create" or msg.event == "update":
+                            if data['id'] not in IDENTITY_DEFs:
+                                IDENTITY_DEFs[data['id']] = data['name']
+
+                # update checkpoint (take 0:00:00.005544 to update)
+                state["start_from"] = msg.id
+                helper.save_check_point(input_name, json.dumps(state))
+            except Exception as ex:
+                helper.log_debug(f"Error when processing message, reason: {ex}, msg: {msg}")
+
     except Exception as ex:
         helper.log_error(f"Error in ListenStream loop, exit, reason: {ex}")
         sys.excepthook(*sys.exc_info())
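The `exist_in_kvstore`/`insert`/`update` round-trip can be replaced by a single `batch_save` because the KV store's `batch_save` endpoint upserts by `_key`. A minimal sketch of that behavior with the splunklib SDK (host, credentials, and collection name are placeholders, not values from this add-on):

```python
# Sketch assuming a reachable Splunk instance and an existing KV store
# collection; connection details below are placeholders.
import splunklib.client as client

service = client.connect(
    host="localhost",
    port=8089,
    username="admin",
    password="changeme",
)
kv_store = service.kvstore["demo_indicators"].data  # hypothetical collection

record = {"_key": "indicator--1234", "name": "demo", "detection": False}

# batch_save() inserts a document whose _key is new and replaces one whose
# _key already exists; it also accepts several documents in one call.
kv_store.batch_save(record)
```

The per-message `try`/`except` added in the same hunk pairs with the checkpoint update: `state["start_from"]` is saved only after a message is fully handled, so a malformed event is logged and skipped without killing the stream, and a restart resumes from the last id that was processed successfully.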