1
1
import insightconnect_plugin_runtime
2
2
from insightconnect_plugin_runtime .exceptions import APIException , PluginException
3
+ from insightconnect_plugin_runtime .helper import compare_and_dedupe_hashes , hash_sha1
3
4
from .schema import MonitorSiemLogsInput , MonitorSiemLogsOutput , MonitorSiemLogsState , Input , Output , Component , State
4
5
from typing import Dict , List , Tuple
5
6
from datetime import datetime , timezone , timedelta
6
7
import copy
7
8
9
+ # Date format for conversion
10
+ DATE_FORMAT = "%Y-%m-%d"
11
+ # Default and max values
8
12
LOG_TYPES = ["receipt" , "url protect" , "attachment protect" ]
13
+ DEFAULT_THREAD_COUNT = 10
14
+ DEFAULT_PAGE_SIZE = 100
9
15
MAX_LOOKBACK_DAYS = 7
10
16
INITIAL_MAX_LOOKBACK_DAYS = 1
17
+ # Run type
11
18
INITIAL_RUN = "initial_run"
12
19
SUBSEQUENT_RUN = "subsequent_run"
13
20
PAGINATION_RUN = "pagination_run"
21
+ # Access keys for state and custom config
22
+ LOG_HASHES = "log_hashes"
23
+ QUERY_CONFIG = "query_config"
24
+ QUERY_DATE = "query_date"
25
+ CAUGHT_UP = "caught_up"
26
+ NEXT_PAGE = "next_page"
27
+ # Access keys for custom config
28
+ THREAD_COUNT = "thread_count"
29
+ PAGE_SIZE = "page_size"
14
30
15
31
16
32
class MonitorSiemLogs (insightconnect_plugin_runtime .Task ):
@@ -24,21 +40,21 @@ def __init__(self):
24
40
)
25
41
26
42
def run (self , params = {}, state = {}, custom_config = {}): # pylint: disable=unused-argument
27
- self .logger .info (f"TASK: Received State: { state } " )
43
+ self .logger .info (f"TASK: Received State: { state . get ( QUERY_CONFIG ) } " )
28
44
existing_state = state .copy ()
29
45
try :
30
- # TODO: Additional error handling
31
- run_condition = self .detect_run_condition (state .get ("query_config" , {}))
32
- self .logger .info (f"TASK: Current run state is { run_condition } " )
33
- state = self .update_state (state , custom_config )
34
- self .logger .info (f"NEW STATE: { state } " )
35
46
now_date = datetime .now (tz = timezone .utc ).date ()
47
+ run_condition = self .detect_run_condition (state .get (QUERY_CONFIG , {}), now_date )
48
+ self .logger .info (f"TASK: Run state is { run_condition } " )
49
+ state = self .update_state (state )
50
+ page_size , thead_count = self .apply_custom_config (state , custom_config )
36
51
max_run_lookback_date = self .get_max_lookback_date (now_date , run_condition , bool (custom_config ))
37
- query_config = self .prepare_query_params (state .get ("query_config" , {}), max_run_lookback_date , now_date )
38
- logs , query_config = self .get_all_logs (run_condition , query_config )
39
- # TODO: Dedupe
52
+ query_config = self .prepare_query_params (state .get (QUERY_CONFIG , {}), max_run_lookback_date , now_date )
53
+ logs , query_config = self .get_all_logs (run_condition , query_config , page_size , thead_count )
40
54
self .logger .info (f"TASK: Total logs collected this run { len (logs )} " )
41
- exit_state , has_more_pages = self .prepare_exit_state (state , query_config , now_date )
55
+ logs , log_hashes = compare_and_dedupe_hashes (state .get (LOG_HASHES , []), logs )
56
+ self .logger .info (f"TASK: Total logs after deduplication { len (logs )} " )
57
+ exit_state , has_more_pages = self .prepare_exit_state (state , query_config , now_date , log_hashes )
42
58
return logs , exit_state , has_more_pages , 200 , None
43
59
except APIException as error :
44
60
self .logger .info (
@@ -47,39 +63,40 @@ def run(self, params={}, state={}, custom_config={}): # pylint: disable=unused-
47
63
return [], existing_state , False , error .status_code , error
48
64
except PluginException as error :
49
65
self .logger .info (f"Error: A Plugin exception has occurred. Cause: { error .cause } Error data: { error .data } ." )
50
- return [], existing_state , False , error . status_code , error
66
+ return [], existing_state , False , 500 , error
51
67
except Exception as error :
52
68
self .logger .info (f"Error: Unknown exception has occurred. No results returned. Error Data: { error } " )
53
69
return [], existing_state , False , 500 , PluginException (preset = PluginException .Preset .UNKNOWN , data = error )
54
70
55
- def detect_run_condition (self , query_config : Dict ) -> str :
71
+ def detect_run_condition (self , query_config : Dict , now_date : datetime ) -> str :
56
72
"""
57
73
Return runtype based on query configuration
58
74
:param query_config:
75
+ :param now_date:
59
76
:return: runtype string
60
77
"""
61
78
if not query_config :
62
79
return INITIAL_RUN
63
80
for log_type_config in query_config .values ():
64
- if not log_type_config .get ("caught_up" ):
81
+ if not log_type_config .get (CAUGHT_UP ) or log_type_config . get ( QUERY_DATE ) not in str ( now_date ):
65
82
return PAGINATION_RUN
66
83
return SUBSEQUENT_RUN
67
84
68
- def update_state (self , state : Dict , custom_config : Dict ) -> Dict :
85
+ def update_state (self , state : Dict ) -> Dict :
69
86
"""
70
87
Initialise state, validate state, apply custom config
71
88
:param state:
72
- :param custom_config:
73
- :return:
89
+ :return: State
74
90
"""
75
- initial_log_type_config = {"caught_up" : False }
91
+ initial_log_type_config = {CAUGHT_UP : False }
76
92
if not state :
77
- state = { "query_config" : { log_type : copy . deepcopy ( initial_log_type_config ) for log_type in LOG_TYPES }}
78
- self . apply_custom_config ( state , custom_config )
93
+ self . logger . info ( "TASK: Initializing first state..." )
94
+ state = { QUERY_CONFIG : { log_type : copy . deepcopy ( initial_log_type_config ) for log_type in LOG_TYPES }}
79
95
else :
80
96
for log_type in LOG_TYPES :
81
- if log_type not in state .get ("query_config" , {}).keys ():
82
- state ["query_config" ][log_type ] = copy .deepcopy (initial_log_type_config )
97
+ if log_type not in state .get (QUERY_CONFIG , {}).keys ():
98
+ self .logger .info (f"TASK: { log_type } missing from state. Initializing..." )
99
+ state [QUERY_CONFIG ][log_type ] = copy .deepcopy (initial_log_type_config )
83
100
return state
84
101
85
102
def get_max_lookback_date (self , now_date : datetime , run_condition : str , custom_config : bool ) -> datetime :
@@ -97,18 +114,23 @@ def get_max_lookback_date(self, now_date: datetime, run_condition: str, custom_c
97
114
max_run_lookback_date = now_date - timedelta (days = max_run_lookback_days )
98
115
return max_run_lookback_date
99
116
100
- def apply_custom_config (self , state : Dict , custom_config : Dict ) -> None :
117
+ def apply_custom_config (self , state : Dict , custom_config : Dict = {} ) -> Tuple [ int , int ] :
101
118
"""
102
119
Apply custom configuration for lookback, query date applies to start and end time of query
103
120
:param current_query_config:
104
121
:param custom_config:
105
- :return: N/A
122
+ :return:
106
123
"""
107
- # TODO: Additional custom config for page size, thread size, limit
108
- current_query_config = state .get ("query_config" )
109
- for log_type , lookback_date_string in custom_config .items ():
110
- self .logger .info (f"TASK: Supplied lookback date of { lookback_date_string } for { log_type } log type" )
111
- current_query_config [log_type ] = {"query_date" : lookback_date_string }
124
+ if custom_config :
125
+ self .logger .info ("TASK: Custom config detected" )
126
+ if not state :
127
+ current_query_config = state .get (QUERY_CONFIG )
128
+ for log_type , query_date_string in custom_config .items ():
129
+ self .logger .info (f"TASK: Supplied lookback date of { query_date_string } for log type { log_type } " )
130
+ current_query_config [log_type ] = {QUERY_DATE : query_date_string }
131
+ page_size = max (1 , min (custom_config .get (PAGE_SIZE , DEFAULT_PAGE_SIZE ), DEFAULT_PAGE_SIZE ))
132
+ thread_count = max (1 , custom_config .get (THREAD_COUNT , DEFAULT_THREAD_COUNT ))
133
+ return page_size , thread_count
112
134
113
135
def prepare_query_params (self , query_config : Dict , max_lookback_date : Dict , now_date : datetime ) -> Dict :
114
136
"""
@@ -119,18 +141,19 @@ def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_
119
141
:return:
120
142
"""
121
143
for log_type , log_type_config in query_config .items ():
122
- query_date_str = log_type_config .get ("query_date" )
123
- self .logger .info (f"PREPPING { log_type_config } " )
124
- self .logger .info (f"{ log_type } , { query_date_str } " )
144
+ query_date_str = log_type_config .get (QUERY_DATE )
125
145
if query_date_str :
126
- query_date = datetime .strptime (query_date_str , "%Y-%m-%d" ).date ()
146
+ query_date = datetime .strptime (query_date_str , DATE_FORMAT ).date ()
127
147
if not query_date_str :
128
- log_type_config ["query_date" ] = max_lookback_date
129
- elif query_date < now_date and log_type_config .get ("caught_up" ) is True :
148
+ self .logger .info (
149
+ f"TASK: Query date for { log_type } log type is not present. Initializing a { max_lookback_date } "
150
+ )
151
+ log_type_config [QUERY_DATE ] = max_lookback_date
152
+ elif query_date < now_date and log_type_config .get (CAUGHT_UP ) is True :
130
153
self .logger .info (f"TASK: Log type { log_type } has caught up for { query_date } " )
131
- log_type_config ["query_date" ] = query_date + timedelta (days = 1 )
132
- log_type_config ["caught_up" ] = False
133
- log_type_config .pop ("next_page" )
154
+ log_type_config [QUERY_DATE ] = query_date + timedelta (days = 1 )
155
+ log_type_config [CAUGHT_UP ] = False
156
+ log_type_config .pop (NEXT_PAGE )
134
157
query_config [log_type ] = self .validate_config_lookback (log_type_config , max_lookback_date , now_date )
135
158
return query_config
136
159
@@ -142,51 +165,61 @@ def validate_config_lookback(self, log_type_config: Dict, max_lookback_date: dat
142
165
:param now_date:
143
166
:return: log_type_config
144
167
"""
145
- query_date = log_type_config .get ("query_date" )
168
+ query_date = log_type_config .get (QUERY_DATE )
146
169
if isinstance (query_date , str ):
147
- query_date = datetime .strptime (query_date , "%Y-%m-%d" ).date ()
170
+ query_date = datetime .strptime (query_date , DATE_FORMAT ).date ()
148
171
if query_date < max_lookback_date :
149
- return {"query_date" : max_lookback_date }
172
+ return {QUERY_DATE : max_lookback_date }
150
173
if query_date > now_date :
151
- log_type_config ["query_date" ] = now_date
174
+ log_type_config [QUERY_DATE ] = now_date
152
175
return log_type_config
153
176
154
- def get_all_logs (self , run_condition : str , query_config : Dict ) -> Tuple [List , Dict ]:
177
+ def get_all_logs (
178
+ self , run_condition : str , query_config : Dict , page_size : int , thead_count : int
179
+ ) -> Tuple [List , Dict ]:
155
180
"""
156
181
Gets all logs of provided log type. First retrieves batch URLs. Then downloads and reads batches, pooling logs.
157
182
:param run_condition:
158
183
:param query_config:
184
+ :param page_size:
185
+ :param thead_count:
159
186
:return: Logs, updated query configuration (state)
160
187
"""
161
188
complete_logs = []
162
189
for log_type , log_type_config in query_config .items ():
163
- if (not log_type_config .get ("caught_up" )) or (run_condition != PAGINATION_RUN ):
190
+ if (not log_type_config .get (CAUGHT_UP )) or (run_condition != PAGINATION_RUN ):
164
191
logs , results_next_page , caught_up = self .connection .api .get_siem_logs (
165
192
log_type = log_type ,
166
- query_date = log_type_config .get ("query_date" ),
167
- next_page = log_type_config .get ("next_page" ),
193
+ query_date = log_type_config .get (QUERY_DATE ),
194
+ next_page = log_type_config .get (NEXT_PAGE ),
195
+ page_size = page_size ,
196
+ max_threads = thead_count ,
168
197
)
169
198
complete_logs .extend (logs )
170
- log_type_config .update ({"next_page" : results_next_page , "caught_up" : caught_up })
199
+ log_type_config .update ({NEXT_PAGE : results_next_page , CAUGHT_UP : caught_up })
171
200
else :
172
201
self .logger .info (f"TASK: Query for { log_type } is caught up. Skipping as we are currently paginating" )
173
202
return complete_logs , query_config
174
203
175
- def prepare_exit_state (self , state : dict , query_config : dict , now_date : datetime ) -> Tuple [Dict , bool ]:
204
+ def prepare_exit_state (
205
+ self , state : dict , query_config : dict , now_date : datetime , log_hashes : List [str ]
206
+ ) -> Tuple [Dict , bool ]:
176
207
"""
177
208
Prepare state and pagination for task completion. Format date time.
178
209
:param state:
179
210
:param query_config:
180
211
:param now_date:
212
+ :param log_hashes:
181
213
:return: state, has_more_pages
182
214
"""
183
215
has_more_pages = False
184
216
for log_type_config in query_config .values ():
185
- query_date = log_type_config .get ("query_date" )
217
+ query_date = log_type_config .get (QUERY_DATE )
186
218
if isinstance (query_date , str ):
187
- query_date = datetime .strptime (query_date , "%Y-%m-%d" ).date ()
188
- if (not log_type_config .get ("caught_up" )) or query_date < now_date :
219
+ query_date = datetime .strptime (query_date , DATE_FORMAT ).date ()
220
+ if (not log_type_config .get (CAUGHT_UP )) or query_date < now_date :
189
221
has_more_pages = True
190
- log_type_config ["query_date" ] = query_date .strftime ("%Y-%m-%d" )
191
- state ["query_config" ] = query_config
222
+ log_type_config [QUERY_DATE ] = query_date .strftime (DATE_FORMAT )
223
+ state [QUERY_CONFIG ] = query_config
224
+ state [LOG_HASHES ] = log_hashes
192
225
return state , has_more_pages
0 commit comments