Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions data_subscriber/rtc/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,42 +45,37 @@ def main(
min_num_bursts = 0

# query GRQ catalog
grq_es = es_conn_util.get_es_connection(logger)

if mgrs_set_id_acquisition_ts_cycle_indexes:
logger.info(f"Supplied {mgrs_set_id_acquisition_ts_cycle_indexes=}. Adding criteria to query")
logger.info(f"Supplied {sensor=}. Adding criteria to query")
if mgrs_set_id_acquisition_ts_cycle_indexes: # native-id / reprocessing mode use case
es_docs = []
for mgrs_set_id_acquisition_ts_cycle_idx in mgrs_set_id_acquisition_ts_cycle_indexes:
body = get_body(match_all=False)
body["query"]["bool"]["must"].append({"match": {"mgrs_set_id_acquisition_ts_cycle_index": mgrs_set_id_acquisition_ts_cycle_idx}})
body["query"]["bool"]["must"].append({"match": {"instrument": sensor}})
# this constraint seems redundant, but it results in more consistent results
body["query"]["bool"]["must"].append({"match": {"mgrs_set_id": mgrs_set_id_acquisition_ts_cycle_idx.split("$")[0]}})
tmp_es_docs = grq_es.query(body=body, index=RTCProductCatalog.ES_INDEX_PATTERNS)
# filter out any redundant results
tmp_es_docs = [
doc for doc in tmp_es_docs
if doc["_source"]["mgrs_set_id_acquisition_ts_cycle_index"] in mgrs_set_id_acquisition_ts_cycle_indexes and doc["_source"]["instrument"] == sensor
]
tmp_es_docs = get_rtc_catalog_products_by_mgrs_set_id_acquisition_ts_cycle_index_and_sensor(
sensor,
mgrs_set_id_acquisition_ts_cycle_idx,
mgrs_set_id_acquisition_ts_cycle_indexes
)
es_docs.extend(tmp_es_docs)
# NOTE: skipping job-submission filters to allow reprocessing
else:
else: # forward mode use case
# query 1: query for unsubmitted docs
body = get_body(match_all=False)
body["query"]["bool"]["must_not"].append({"exists": {"field": "download_job_ids"}})
body["query"]["bool"]["must"].append({"match": {"instrument": sensor}})
unsubmitted_docs = grq_es.query(body=body, index=RTCProductCatalog.ES_INDEX_PATTERNS)
unsubmitted_docs = get_unsubmitted_rtc_catalog_products_by_sensor(sensor)
logger.info(f"Found {len(unsubmitted_docs)=}")

now = current_evaluation_datetime()

def is_recent_unsubmitted(doc):
return (now - timedelta(days=30)) <= dateutil.parser.parse(doc["_source"]["creation_timestamp"]) < now
unsubmitted_docs = list(filter(is_recent_unsubmitted, unsubmitted_docs))
logger.info(f"Limiting unsubmitted granules by recent creation_timestamp. {len(unsubmitted_docs)=}")

# query 2: query for submitted but not 100%
body = get_body(match_all=False)
body["query"]["bool"]["must"].append({"exists": {"field": "download_job_ids"}})
body["query"]["bool"]["must"].append({"range": {"coverage": {"gte": 0, "lt": 100}}})
body["query"]["bool"]["must"].append({"match": {"instrument": sensor}})
submitted_but_incomplete_docs = grq_es.query(body=body, index=RTCProductCatalog.ES_INDEX_PATTERNS)
submitted_but_incomplete_docs = get_partial_submitted_rtc_catalog_products_by_sensor(sensor)
logger.info(f"Found {len(submitted_but_incomplete_docs)=}")

def is_recent_partial_submitted(doc):
return (now - timedelta(days=30)) <= dateutil.parser.parse(doc["_source"]["creation_timestamp"]) < now
submitted_but_incomplete_docs = list(filter(is_recent_partial_submitted, submitted_but_incomplete_docs))
logger.info(f"Limiting partial submitted granules by recent creation_timestamp. {len(submitted_but_incomplete_docs)=}")

es_docs = unsubmitted_docs + submitted_but_incomplete_docs

grouped_es_docs = defaultdict(list)
Expand Down Expand Up @@ -190,6 +185,48 @@ def main(
return evaluator_results


def get_rtc_catalog_products_by_mgrs_set_id_acquisition_ts_cycle_index_and_sensor(
    sensor: str,
    mgrs_set_id_acquisition_ts_cycle_idx: str,
    mgrs_set_id_acquisition_ts_cycle_indexes: set[str]
) -> list[dict]:
    """Query the RTC product catalog for docs matching one MGRS set ID +
    acquisition timestamp cycle index and the given sensor/instrument.

    Results are post-filtered so that only docs whose index value is in
    *mgrs_set_id_acquisition_ts_cycle_indexes* and whose instrument equals
    *sensor* are returned.
    """
    mgrs_set_id = mgrs_set_id_acquisition_ts_cycle_idx.split("$")[0]

    body = get_body(match_all=False)
    must_clauses = body["query"]["bool"]["must"]
    must_clauses.append({"match": {"mgrs_set_id_acquisition_ts_cycle_index": mgrs_set_id_acquisition_ts_cycle_idx}})
    must_clauses.append({"match": {"instrument": sensor}})
    # this constraint seems redundant, but it results in more consistent results
    must_clauses.append({"match": {"mgrs_set_id": mgrs_set_id}})

    es = es_conn_util.get_es_connection(logger)
    hits = es.query(body=body, index=RTCProductCatalog.ES_INDEX_PATTERNS)

    # filter out any redundant results
    return [
        hit for hit in hits
        if hit["_source"]["mgrs_set_id_acquisition_ts_cycle_index"] in mgrs_set_id_acquisition_ts_cycle_indexes
        and hit["_source"]["instrument"] == sensor
    ]


def get_unsubmitted_rtc_catalog_products_by_sensor(sensor: str) -> list[dict]:
    """Query the RTC product catalog for docs from *sensor* that have no
    download jobs submitted yet (no ``download_job_ids`` field)."""
    body = get_body(match_all=False)
    bool_query = body["query"]["bool"]
    bool_query["must_not"].append({"exists": {"field": "download_job_ids"}})
    bool_query["must"].append({"match": {"instrument": sensor}})

    es = es_conn_util.get_es_connection(logger)
    return es.query(body=body, index=RTCProductCatalog.ES_INDEX_PATTERNS)


def get_partial_submitted_rtc_catalog_products_by_sensor(sensor: str) -> list[dict]:
    """Query the RTC product catalog for docs from *sensor* whose download
    jobs were submitted (``download_job_ids`` exists) but whose coverage
    is still below 100%."""
    body = get_body(match_all=False)
    must_clauses = body["query"]["bool"]["must"]
    must_clauses.append({"exists": {"field": "download_job_ids"}})
    must_clauses.append({"range": {"coverage": {"gte": 0, "lt": 100}}})
    must_clauses.append({"match": {"instrument": sensor}})

    es = es_conn_util.get_es_connection(logger)
    return es.query(body=body, index=RTCProductCatalog.ES_INDEX_PATTERNS)


def current_evaluation_datetime():
    """Return the current local datetime.

    Thin wrapper around ``datetime.now()`` so unit tests can patch the
    evaluation clock.
    """
    now = datetime.now()
    return now
Expand Down