Skip to content

Check if aggregated summary vector is stale and recompute if needed #979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,34 @@ async def get_aggregated_single_column_async(
if existing_agg_table_count > 1:
raise MultipleDataMatchesError(f"Multiple tables found for: {self._make_req_info_str()}", Service.SUMO)

sumo_table_obj: Table
sumo_table_obj: Table | None = None

if existing_agg_table_count == 1:
# We're good, just get hold of the single object and fetch the blob into the object
sumo_table_obj = await sc_existing_agg_tables.getitem_async(0)
await sumo_table_obj.blob_async
perf_metrics.record_lap("fetch")
else:
# No aggregated table exists for the column, we need to aggregate it
# We're probably good and have an already aggregated table, but we also need to check if the
# aggregation has become stale with regards to the underlying realizations.
# Since we assume that most of the time we will have valid aggregations, we optimistically do
# fetching of the blob payload contents and validation concurrently.
existing_agg_table_obj: Table = await sc_existing_agg_tables.getitem_async(0)
perf_metrics.record_lap("get-sumo-obj")

async with asyncio.TaskGroup() as tg:
tg.create_task(existing_agg_table_obj.blob_async)
validate_task = tg.create_task(_is_agg_valid_for_reals_async(existing_agg_table_obj, sc_tables_basis))
perf_metrics.record_lap("fetch-and-validate")

if validate_task.result():
sumo_table_obj = existing_agg_table_obj
else:
LOGGER.debug(
f"ArrowTableLoader.get_aggregated_single_column() found stale aggregation for: {column_name=}, {self._make_req_info_str()}"
)

if not sumo_table_obj:
# No valid aggregated table exists for the column, we need to aggregate it
LOGGER.debug(
f"ArrowTableLoader.get_aggregated_single_column() doing aggregation for: {column_name=}, {self._make_req_info_str()}"
)

sc_agg_input_tables = sc_tables_basis.filter(aggregation=False, realization=True)

# If we have a wildcard table name, we must ensure that we're down to a single table name in the search context
Expand All @@ -72,6 +92,11 @@ async def get_aggregated_single_column_async(
f"Multiple tables found for {self._make_req_info_str()}", Service.SUMO
)

if await sc_agg_input_tables.length_async() == 0:
raise NoDataError(
f"No per-realization tables found for: {column_name=}, {self._make_req_info_str()}", Service.SUMO
)

# Does the aggregation and gets the blob (also writes the resulting aggregation back into Sumo)
sumo_table_obj = await sc_agg_input_tables.aggregate_async(columns=[column_name], operation="collection")
perf_metrics.record_lap("aggregate")
Expand Down Expand Up @@ -147,3 +172,30 @@ def _make_req_info_str(self) -> str:
if self._req_tagname is not None:
info_str += f", tagname={self._req_tagname}"
return info_str


async def _is_agg_valid_for_reals_async(agg_sumo_table_obj: Table, sc_tables: SearchContext) -> bool:
"""
Check if the aggregation is valid with regards to the underlying realizations.
"""
agg_ts = agg_sumo_table_obj.metadata["_sumo"]["timestamp"]

sc_real_tables = sc_tables.filter(realization=True, aggregation=False)
sc_real_tables_older_than_agg = sc_real_tables.filter(complex={"range": {"_sumo.timestamp": {"lt": agg_ts}}})

# Get the realization ids for the tables that are older than the aggregation
real_ids_older_than_agg = await sc_real_tables_older_than_agg.realizationids_async

# Get the current realization count
current_real_count = await sc_real_tables.filter().length_async()

# If there are any new realizations the aggregation is invalid
if current_real_count != len(real_ids_older_than_agg):
return False

# Compare the set of realization ids that are older than the aggregation with the realization
# ids that were actually used to construct the aggregation.
if set(real_ids_older_than_agg) != set(agg_sumo_table_obj.metadata["fmu"]["aggregation"]["realization_ids"]):
return False

return True
Loading