@@ -128,10 +128,22 @@ def stream_events(self, inputs, ew):
128128 ds_columns = input_items .get ("dataset_query_columns" )
129129 maxcount = input_items .get ("max_count" )
130130
131- ds_st = relative_to_epoch (ds_start )
131+ # Fetching the start and endtime from the checkpoint
132+ checkpoint = checkpointer .KVStoreCheckpointer (
133+ input_name , session_key , APP_NAME
134+ )
135+ get_checkpoint = checkpoint .get (input_name )
136+ if get_checkpoint is None :
137+ logger .info ("The checkpoint object is None, so we are retrieving the values from the configuration instead." )
138+ ds_st = relative_to_epoch (ds_start )
139+ else :
140+ logger .info ("The checkpoint object exists, so we are retrieving the values from it." )
141+ ds_st = get_checkpoint ["start_time" ]
132142 if ds_end :
143+ logger .info ("The query end time is provided in the configuration." )
133144 ds_et = relative_to_epoch (ds_end )
134145 else :
146+ logger .info ("The query end time is not provided in the configuration." )
135147 ds_et = relative_to_epoch ("1s" )
136148 if maxcount :
137149 ds_maxcount = int (maxcount )
@@ -141,7 +153,7 @@ def stream_events(self, inputs, ew):
141153 ds_payload = build_payload (
142154 ds_st , ds_et , "query" , ds_search , ds_columns , ds_maxcount
143155 )
144- logger .debug ( "ds_payload = {}" .format (ds_payload ))
156+ logger .info ( "Data input query payload: {}" .format (ds_payload ))
145157 proxy = get_proxy (session_key , logger )
146158 acct_dict = get_acct_info (self , logger , ds_account )
147159 for ds_acct in acct_dict .keys ():
@@ -158,10 +170,6 @@ def stream_events(self, inputs, ew):
158170 "User-Agent" : get_user_agent (),
159171 }
160172
161- # Create checkpointer
162- checkpoint = checkpointer .KVStoreCheckpointer (
163- input_name , session_key , APP_NAME
164- )
165173
166174 ds_api_max = query_api_max ()
167175 ds_iterations = math .ceil (ds_maxcount / ds_api_max )
@@ -199,16 +207,10 @@ def stream_events(self, inputs, ew):
199207 ds_event , splunk_dt = parse_query (
200208 ds_columns , match_list , sessions
201209 )
202- get_checkpoint = checkpoint .get (input_name )
210+
211+ checkpoint_time = ds_st
203212
204- # if checkpoint doesn't exist, set to 0
205- if get_checkpoint is None :
206- checkpoint .update (input_name , {"timestamp" : 0 })
207- checkpoint_time = 0
208- else :
209- checkpoint_time = get_checkpoint ["timestamp" ]
210-
211- if splunk_dt > checkpoint_time :
213+ if splunk_dt >= checkpoint_time :
212214 # if greater than current checkpoint,
213215 # write event and update checkpoint
214216 event = smi .Event (
@@ -222,13 +224,6 @@ def stream_events(self, inputs, ew):
222224 .format (splunk_dt , checkpoint_time )
223225 )
224226 ew .write_event (event )
225-
226- logger .debug (
227- "saving checkpoint {}" .format (splunk_dt )
228- )
229- checkpoint .update (
230- input_name , {"timestamp" : splunk_dt }
231- )
232227 else :
233228 logger .debug (
234229 "skipping due to splunk_dt={} is less than"
@@ -256,6 +251,8 @@ def stream_events(self, inputs, ew):
256251 else :
257252 logger .warning (r_json )
258253 break
254+ logger .info ("saving checkpoint {}" .format (ds_et ))
255+ checkpoint .update (input_name , {"start_time" : ds_et })
259256
260257 except Exception as e :
261258 logger .exception (e )
0 commit comments