Skip to content

Commit

Permalink
feat(registry): add remove latest partition job (#44795)
Browse files Browse the repository at this point in the history
## What
Provides a job to delete all latest partitions.

Why?

If you remove the partitions they will be readded **triggering a reprocess of latest**
## Review guide
<!--
1. `x.py`
2. `y.py`
-->

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them. 
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [ ] YES 💚
- [ ] NO ❌
  • Loading branch information
bnchrch authored Aug 27, 2024
1 parent 94ceb17 commit 6d4ed9d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from orchestrator.jobs.registry import (
add_new_metadata_partitions,
remove_stale_metadata_partitions,
remove_latest_metadata_partitions,
generate_cloud_registry,
generate_oss_registry,
generate_registry_entry,
Expand Down Expand Up @@ -203,6 +204,7 @@
generate_nightly_reports,
add_new_metadata_partitions,
remove_stale_metadata_partitions,
remove_latest_metadata_partitions,
generate_stale_gcs_latest_metadata_file,
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,35 @@ def remove_stale_metadata_partitions():
remove_stale_metadata_partitions_op()


@op(required_resource_keys={"latest_metadata_file_blobs"})
def remove_latest_metadata_partitions_op(context):
"""
This op is responsible for removing for latest metadata files. (Generally used to reprocess metadata files).
"""
latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs
partition_name = registry_entry.metadata_partitions_def.name

all_latest = [blob.etag for blob in latest_metadata_file_blobs]
context.log.info(f"Found {len(all_latest)} latest metadata files found in GCS bucket")

all_etag_partitions = context.instance.get_dynamic_partitions(partition_name)
context.log.info(f"Found {len(all_etag_partitions)} existing metadata partitions")

for latest_etag in all_latest:
if latest_etag in all_etag_partitions:
context.log.info(f"Removing latest etag: {latest_etag}")
context.instance.delete_dynamic_partition(partition_name, latest_etag)
context.log.info(f"Removed latest etag: {latest_etag}")


@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
def remove_latest_metadata_partitions():
"""
This job is responsible for removing latest metadata partitions. (Generally used to reprocess metadata files).
"""
remove_latest_metadata_partitions_op()


@op(required_resource_keys={"slack", "all_metadata_file_blobs"})
def add_new_metadata_partitions_op(context):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "orchestrator"
version = "0.4.0"
version = "0.4.1"
description = ""
authors = ["Ben Church <[email protected]>"]
readme = "README.md"
Expand Down

0 comments on commit 6d4ed9d

Please sign in to comment.