41 changes: 19 additions & 22 deletions TA_dataset/bin/dataset_query.py
@@ -128,10 +128,22 @@ def stream_events(self, inputs, ew):
         ds_columns = input_items.get("dataset_query_columns")
         maxcount = input_items.get("max_count")

-        ds_st = relative_to_epoch(ds_start)
+        # Fetching the start and end time from the checkpoint
+        checkpoint = checkpointer.KVStoreCheckpointer(
+            input_name, session_key, APP_NAME
+        )
+        get_checkpoint = checkpoint.get(input_name)
+        if get_checkpoint is None:
+            logger.info("The checkpoint object is None, so we are retrieving the values from the configuration instead.")
+            ds_st = relative_to_epoch(ds_start)
+        else:
+            logger.info("The checkpoint object exists, so we are retrieving the values from it.")
+            ds_st = get_checkpoint["start_time"]
         if ds_end:
+            logger.info("The query end time is provided in the configuration.")
             ds_et = relative_to_epoch(ds_end)
         else:
+            logger.info("The query end time is not provided in the configuration.")
             ds_et = relative_to_epoch("1s")
         if maxcount:
             ds_maxcount = int(maxcount)
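Worth spelling out what this hunk does: the input now resumes from the last saved query window instead of always recomputing `ds_st` from the configured relative start time. A minimal sketch of the pattern, assuming a KVStore-backed checkpointer whose `get(name)` returns a dict or `None` (the helper function below is illustrative, not part of the diff):

```python
# Sketch of the resume-from-checkpoint logic introduced above.
# `checkpoint` is assumed to expose get(name) -> dict | None, as the
# solnlib-style KVStoreCheckpointer in the diff does.
def resolve_start_time(checkpoint, input_name, ds_start, relative_to_epoch):
    state = checkpoint.get(input_name)
    if state is None:
        # First run: no checkpoint yet, fall back to the configured start.
        return relative_to_epoch(ds_start)
    # Later runs: resume from where the previous query window ended.
    return state["start_time"]
```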
@@ -141,7 +153,7 @@ def stream_events(self, inputs, ew):
         ds_payload = build_payload(
             ds_st, ds_et, "query", ds_search, ds_columns, ds_maxcount
         )
-        logger.debug("ds_payload = {}".format(ds_payload))
+        logger.info("Data input query payload: {}".format(ds_payload))
         proxy = get_proxy(session_key, logger)
         acct_dict = get_acct_info(self, logger, ds_account)
         for ds_acct in acct_dict.keys():
@@ -158,10 +170,6 @@
"User-Agent": get_user_agent(),
}

# Create checkpointer
checkpoint = checkpointer.KVStoreCheckpointer(
input_name, session_key, APP_NAME
)

ds_api_max = query_api_max()
ds_iterations = math.ceil(ds_maxcount / ds_api_max)
@@ -199,16 +207,10 @@
                             ds_event, splunk_dt = parse_query(
                                 ds_columns, match_list, sessions
                             )
-                            get_checkpoint = checkpoint.get(input_name)
-
-                            # if checkpoint doesn't exist, set to 0
-                            if get_checkpoint is None:
-                                checkpoint.update(input_name, {"timestamp": 0})
-                                checkpoint_time = 0
-                            else:
-                                checkpoint_time = get_checkpoint["timestamp"]
+                            checkpoint_time = ds_st

-                            if splunk_dt > checkpoint_time:
+                            if splunk_dt >= checkpoint_time:
                                 # if greater than current checkpoint,
                                 # write event and update checkpoint
                                 event = smi.Event(
@@ -222,13 +224,6 @@
                                     .format(splunk_dt, checkpoint_time)
                                 )
                                 ew.write_event(event)
-
-                                logger.debug(
-                                    "saving checkpoint {}".format(splunk_dt)
-                                )
-                                checkpoint.update(
-                                    input_name, {"timestamp": splunk_dt}
-                                )
                             else:
                                 logger.debug(
                                     "skipping due to splunk_dt={} is less than"
@@ -256,6 +251,8 @@
                     else:
                         logger.warning(r_json)
                         break
+            logger.info("saving checkpoint {}".format(ds_et))
+            checkpoint.update(input_name, {"start_time": ds_et})

         except Exception as e:
             logger.exception(e)
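Taken together, the hunks move the checkpoint write from once-per-event to once-per-run: the query end time is stored under `start_time`, so the next invocation's window begins where this one ended. A condensed sketch of the revised lifecycle (simplified; the API paging loop, error handling, and Splunk plumbing are elided, and `fetch_events`/`write_event` are hypothetical stand-ins):

```python
# Condensed sketch of the per-run checkpoint lifecycle after this change.
# fetch_events and write_event stand in for the API paging loop and
# ew.write_event in the real modular input.
def run_once(checkpoint, input_name, ds_start, ds_end,
             relative_to_epoch, fetch_events, write_event):
    state = checkpoint.get(input_name)
    ds_st = relative_to_epoch(ds_start) if state is None else state["start_time"]
    ds_et = relative_to_epoch(ds_end) if ds_end else relative_to_epoch("1s")

    for splunk_dt, event in fetch_events(ds_st, ds_et):
        if splunk_dt >= ds_st:  # window-start filter, as in the diff
            write_event(event)

    # One write per run: the next run's window resumes at ds_et.
    checkpoint.update(input_name, {"start_time": ds_et})
```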