Skip to content

Commit 1c8292b

Browse files
committed
add clean up for synapse table
1 parent 8f936d4 commit 1c8292b

File tree

1 file changed

+94
-8
lines changed

1 file changed

+94
-8
lines changed

dags/synapse_dataset_to_croissant_minimal.py

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ def extract_s3_objects_to_delete(bucket_objects: List[str], combined_dataset_nam
421421

422422
@task
423423
def delete_non_current_croissant_file_in_s3(
424-
root_carrier_context: Dict, dataset_ids: List[str], **context
424+
root_carrier_context: Dict, combined_dataset_name_and_id: List[Dict[str, str]], **context
425425
) -> None:
426426
"""
427427
Delete the non-current croissant files from S3.
@@ -434,7 +434,7 @@ def delete_non_current_croissant_file_in_s3(
434434
435435
Arguments:
436436
root_carrier_context: The root carrier context to use for the trace context.
437-
dataset_ids: a set of dataset ids that are currently present in the data catalog
437+
combined_dataset_name_and_id: a list of dictionaries containing dataset_id and dataset_name for all current datasets
438438
439439
Returns:
440440
None
@@ -446,7 +446,7 @@ def delete_non_current_croissant_file_in_s3(
446446

447447
objects_to_delete = extract_s3_objects_to_delete(
448448
bucket_objects=bucket_objects,
449-
combined_dataset_name_and_id=dataset_ids,
449+
combined_dataset_name_and_id=combined_dataset_name_and_id,
450450
)
451451

452452
if objects_to_delete:
@@ -465,6 +465,87 @@ def delete_non_current_croissant_file_in_s3(
465465
otel_tracer.span_processor.force_flush()
466466
otel_logger.handlers[0].flush()
467467
return None
468+
469+
def extract_synapse_rows_to_delete(synapse_rows: DataFrame, combined_dataset_name_and_id: List[Dict[str, str]]) -> List[str]:
470+
"""
471+
Extract the Synapse table rows to delete. This is done by comparing
472+
the list of dataset IDs in the Synapse table to the list of current datasets.
473+
If a dataset ID is found in the Synapse table that does not correspond to a current dataset,
474+
it is marked for deletion.
475+
476+
Arguments:
477+
synapse_rows: The DataFrame containing the rows from the Synapse table.
478+
combined_dataset_name_and_id: A list of dictionaries containing dataset_id and dataset_name for all current datasets.
479+
"""
480+
rows_to_delete = []
481+
482+
if synapse_rows.empty:
483+
otel_logger.info("No rows found in Synapse table.")
484+
return rows_to_delete
485+
486+
# Create a set of current dataset IDs for fast lookup
487+
current_dataset_ids = set(d["dataset_id"] for d in combined_dataset_name_and_id)
488+
489+
for index, row in synapse_rows.iterrows():
490+
dataset_id_in_row = str(row["dataset"])
491+
if dataset_id_in_row not in current_dataset_ids:
492+
rows_to_delete.append(str(row["ROW_ID"]))
493+
494+
return rows_to_delete
495+
496+
@task
497+
def delete_non_current_croissant_file_in_synapse(
498+
root_carrier_context: Dict, combined_dataset_name_and_id: List[Dict[str, str]], **context
499+
) -> None:
500+
"""
501+
Delete the non-current croissant file links from Synapse table.
502+
This is used to remove the old links from Synapse table that are no longer needed.
503+
A "non-current" link is defined as a croissant file link which is no longer
504+
present in the data catalog.
505+
506+
This can occur if the dataset has been removed from the data catalog and thus synapse table is out of date.
507+
508+
Arguments:
509+
root_carrier_context: The root carrier context to use for the trace context.
510+
combined_dataset_name_and_id: a list of dictionaries containing dataset_id and dataset_name for all current datasets
511+
512+
Returns:
513+
None
514+
"""
515+
# Warning: Using an authenticated Synapse Client during this section of code
516+
with otel_tracer.start_as_current_span("delete_non_current_files_from_s3", context=TraceContextTextMapPropagator().extract(root_carrier_context)) as span:
517+
syn_hook = SynapseHook(context["params"]["synapse_conn_id"])
518+
519+
authenticated_syn_client: Synapse = syn_hook.client
520+
authenticated_syn_client._rest_call = MethodType(
521+
_rest_call_replacement, authenticated_syn_client)
522+
523+
query_string = f"SELECT * FROM {SYNAPSE_TABLE_FOR_CROISSANT_LINKS}"
524+
table_dataframe= query(query=query_string, synapse_client=authenticated_syn_client)
525+
526+
rows_to_delete = extract_synapse_rows_to_delete(
527+
synapse_rows=table_dataframe,
528+
combined_dataset_name_and_id=combined_dataset_name_and_id,
529+
)
530+
531+
if rows_to_delete:
532+
delete_out_of_date_from_synapse = context["params"]["delete_out_of_date_from_synapse"]
533+
if delete_out_of_date_from_synapse:
534+
otel_logger.info(
535+
f"Deleting the following rows from Synapse: {rows_to_delete}")
536+
# Delete rows from the Synapse table
537+
table_to_delete_from = Table(id=SYNAPSE_TABLE_FOR_CROISSANT_LINKS)
538+
table_to_delete_from.delete_rows(row_ids=[int(row_id) for row_id in rows_to_delete])
539+
else:
540+
otel_logger.info(
541+
f"Found rows to delete from Synapse, but not deleting due to `delete_out_of_date_from_synapse` param: {rows_to_delete}")
542+
else:
543+
otel_logger.info(
544+
"No rows to delete from Synapse. All rows are current.")
545+
otel_tracer.span_processor.force_flush()
546+
otel_logger.handlers[0].flush()
547+
return None
548+
468549

469550
@task
470551
def create_and_save_jsonld(**context) -> None:
@@ -485,7 +566,7 @@ def create_and_save_jsonld(**context) -> None:
485566
push_to_s3 = context["params"]["push_results_to_s3"]
486567
push_to_synapse = context["params"]["push_links_to_synapse"]
487568

488-
existing_dataset_ids = []
569+
combined_dataset_name_and_id = []
489570
for index, row in table.iterrows():
490571
if pd.isnull(row["id"]):
491572
continue
@@ -494,7 +575,7 @@ def create_and_save_jsonld(**context) -> None:
494575
dataset_name = row["name"]
495576

496577
# save all the active dataset ids and names to prepare for cleanup later
497-
existing_dataset_ids.append({
578+
combined_dataset_name_and_id.append({
498579
"dataset_id": dataset_id,
499580
"dataset_name": dataset_name
500581
})
@@ -522,14 +603,19 @@ def create_and_save_jsonld(**context) -> None:
522603
s3_url = f"https://{BUCKET_NAME}.s3.us-east-1.amazonaws.com/{quote_plus(s3_key)}"
523604
execute_push_to_synapse(push_to_synapse=push_to_synapse, dataset=dataset, dataset_id=dataset_id, s3_url=s3_url, **context)
524605

525-
return existing_dataset_ids
606+
return combined_dataset_name_and_id
526607

527608
root_carrier_context = create_root_span()
528-
dataset_ids = create_and_save_jsonld()
609+
combined_dataset_name_and_id = create_and_save_jsonld()
529610

530611
delete_non_current_croissant_file_in_s3(
531612
root_carrier_context=root_carrier_context,
532-
dataset_ids=dataset_ids
613+
combined_dataset_name_and_id=combined_dataset_name_and_id,
533614
)
534615

616+
delete_non_current_croissant_file_in_synapse(root_carrier_context=root_carrier_context,
617+
combined_dataset_name_and_id=combined_dataset_name_and_id,
618+
)
619+
620+
535621
save_minimal_jsonld_to_s3()

0 commit comments

Comments
 (0)