From 4b7817f81b3a9ff449e7a007138c2fcb073c0c89 Mon Sep 17 00:00:00 2001
From: Munna Shaik
Date: Tue, 11 Feb 2025 14:43:59 +0530
Subject: [PATCH] trivial: fix the dataset query data input time

---
 TA_dataset/bin/dataset_query.py | 41 +++++++++++++++------------------
 1 file changed, 19 insertions(+), 22 deletions(-)

diff --git a/TA_dataset/bin/dataset_query.py b/TA_dataset/bin/dataset_query.py
index 0f1b51c0..1ce6964c 100755
--- a/TA_dataset/bin/dataset_query.py
+++ b/TA_dataset/bin/dataset_query.py
@@ -128,10 +128,22 @@ def stream_events(self, inputs, ew):
                 ds_columns = input_items.get("dataset_query_columns")
                 maxcount = input_items.get("max_count")
 
-                ds_st = relative_to_epoch(ds_start)
+                # Fetch the start time from the checkpoint (fall back to config)
+                checkpoint = checkpointer.KVStoreCheckpointer(
+                    input_name, session_key, APP_NAME
+                )
+                get_checkpoint = checkpoint.get(input_name)
+                if get_checkpoint is None:
+                    logger.info("No checkpoint found; using the configured start time.")
+                    ds_st = relative_to_epoch(ds_start)
+                else:
+                    logger.info("Checkpoint found; using its saved start time.")
+                    ds_st = get_checkpoint["start_time"]
                 if ds_end:
+                    logger.info("Query end time provided in the configuration.")
                     ds_et = relative_to_epoch(ds_end)
                 else:
+                    logger.info("Query end time not provided; using the default.")
                     ds_et = relative_to_epoch("1s")
                 if maxcount:
                     ds_maxcount = int(maxcount)
@@ -141,7 +153,7 @@ def stream_events(self, inputs, ew):
                 ds_payload = build_payload(
                     ds_st, ds_et, "query", ds_search, ds_columns, ds_maxcount
                 )
-                logger.debug("ds_payload = {}".format(ds_payload))
+                logger.info("Data input query payload: {}".format(ds_payload))
                 proxy = get_proxy(session_key, logger)
                 acct_dict = get_acct_info(self, logger, ds_account)
                 for ds_acct in acct_dict.keys():
@@ -158,10 +170,6 @@ def stream_events(self, inputs, ew):
                         "User-Agent": get_user_agent(),
                     }
 
-                    # Create checkpointer
-                    checkpoint = checkpointer.KVStoreCheckpointer(
-                        input_name, session_key, APP_NAME
-                    )
                     ds_api_max = query_api_max()
                     ds_iterations = math.ceil(ds_maxcount / ds_api_max)
 
@@ -199,16 +207,10 @@ def stream_events(self, inputs, ew):
                                 ds_event, splunk_dt = parse_query(
                                     ds_columns, match_list, sessions
                                 )
-                                get_checkpoint = checkpoint.get(input_name)
+
+                                checkpoint_time = ds_st
 
-                                # if checkpoint doesn't exist, set to 0
-                                if get_checkpoint is None:
-                                    checkpoint.update(input_name, {"timestamp": 0})
-                                    checkpoint_time = 0
-                                else:
-                                    checkpoint_time = get_checkpoint["timestamp"]
-
-                                if splunk_dt > checkpoint_time:
+                                if splunk_dt >= checkpoint_time:
                                     # if greater than current checkpoint,
                                     # write event and update checkpoint
                                     event = smi.Event(
@@ -222,13 +224,6 @@ def stream_events(self, inputs, ew):
                                         .format(splunk_dt, checkpoint_time)
                                     )
                                     ew.write_event(event)
-
-                                    logger.debug(
-                                        "saving checkpoint {}".format(splunk_dt)
-                                    )
-                                    checkpoint.update(
-                                        input_name, {"timestamp": splunk_dt}
-                                    )
                                 else:
                                     logger.debug(
                                         "skipping due to splunk_dt={} is less than"
@@ -256,6 +251,8 @@ def stream_events(self, inputs, ew):
                         else:
                             logger.warning(r_json)
                             break
+                    logger.info("saving checkpoint {}".format(ds_et))
+                    checkpoint.update(input_name, {"start_time": ds_et})
 
             except Exception as e:
                 logger.exception(e)
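
Review note: the core behavioral change in this patch is that the query
window's start time now comes from the KV Store checkpoint when one
exists, and the window's end time is written back as the next run's
start. Below is a minimal runnable sketch of that hand-off, independent
of Splunk; FakeCheckpointer and resolve_window are hypothetical
stand-ins for solnlib's checkpointer.KVStoreCheckpointer and the
add-on's inline logic, not add-on APIs.

    from typing import Optional


    class FakeCheckpointer:
        """In-memory stand-in for solnlib's KVStoreCheckpointer."""

        def __init__(self) -> None:
            self._store = {}

        def get(self, key: str) -> Optional[dict]:
            return self._store.get(key)

        def update(self, key: str, value: dict) -> None:
            self._store[key] = value


    def resolve_window(checkpoint, input_name, configured_start, configured_end):
        """Start comes from the checkpoint when present, else from config."""
        state = checkpoint.get(input_name)
        start = configured_start if state is None else state["start_time"]
        return start, configured_end


    checkpoint = FakeCheckpointer()

    # First run: no checkpoint yet, so the configured start time is used.
    st, et = resolve_window(checkpoint, "query_input", 1000, 2000)
    assert (st, et) == (1000, 2000)

    # After a successful run, the patch saves the end time as the next start.
    checkpoint.update("query_input", {"start_time": et})

    # Second run: the window begins where the previous one ended.
    st, et = resolve_window(checkpoint, "query_input", 1000, 3000)
    assert (st, et) == (2000, 3000)

Seeding each run's start from the previous run's end keeps consecutive
query windows contiguous, which is what allows the per-event checkpoint
writes to be dropped.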
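
A second, equally hypothetical sketch for the other half of the change:
events are filtered against the window start (now with >=, so an event
exactly on the boundary is kept), and the checkpoint is saved once after
the pagination loop rather than once per event. stream_window, sink, and
state are illustrative names, not add-on APIs.

    def stream_window(events, window_start, window_end, sink, state, key):
        """Emit events timestamped at or after window_start, then record
        window_end as the next run's start time (one save per run)."""
        for timestamp, payload in events:
            if timestamp >= window_start:
                sink.append(payload)
            # Older events were covered by a previous window.

        # Saving once, after the loop, means a failure mid-run leaves the
        # old start time intact and the whole window is retried next run.
        state[key] = {"start_time": window_end}


    sink, state = [], {}
    stream_window(
        events=[(900, "old"), (1000, "boundary"), (1500, "new")],
        window_start=1000,
        window_end=2000,
        sink=sink,
        state=state,
        key="query_input",
    )
    assert sink == ["boundary", "new"]  # >= keeps the boundary event
    assert state["query_input"] == {"start_time": 2000}

Because the save happens only after the loop, a crash mid-run re-queries
the whole window on the next run, and the >= comparison then re-emits
boundary events; downstream consumers should tolerate the occasional
duplicate.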