
Commit 403fb1e

Merge pull request #3638 from OpenNeuroOrg/queue-export-drop
fix(worker): Update DropResource to run fsck and drop for each snapshot
2 parents: 47d1598 + 61180cf

2 files changed: +30 -3 lines

services/datalad/datalad_service/handlers/drop.py (2 additions, 2 deletions)

```diff
@@ -1,6 +1,6 @@
 import falcon
 
-from datalad_service.tasks.publish import annex_drop
+from datalad_service.tasks.publish import export_backup_and_drop
 
 
 class DropResource:
@@ -11,6 +11,6 @@ def __init__(self, store):
 
     async def on_post(self, req, resp, dataset):
         dataset_path = self.store.get_dataset_path(dataset)
-        await annex_drop.kiq(dataset_path)
+        await export_backup_and_drop.kiq(dataset_path)
         resp.media = {}
         resp.status = falcon.HTTP_OK
```
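The handler change only swaps which task gets queued: `.kiq()` serializes the call, hands it to the taskiq broker, and returns, so the HTTP response does not wait on the export. A minimal sketch of that enqueue pattern, using taskiq's `InMemoryBroker`; the broker, path, and task body below are illustrative stand-ins, not the OpenNeuro ones:

```python
# Sketch only: the real service defines its broker and tasks in
# datalad_service.tasks.publish; everything here is a stand-in.
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
async def export_backup_and_drop(dataset_path: str) -> None:
    # Stand-in body; the real task exports to S3, fscks, and drops.
    print(f'export and drop for {dataset_path}')


async def main() -> None:
    # .kiq() enqueues the task and returns once the message is sent;
    # with InMemoryBroker it runs inline, in production a worker
    # process picks it up.
    await export_backup_and_drop.kiq('/datasets/ds000001')


asyncio.run(main())
```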

services/datalad/datalad_service/tasks/publish.py (28 additions, 1 deletion)

```diff
@@ -79,6 +79,30 @@ def create_remotes(dataset_path):
     github_sibling(dataset_path, dataset)
 
 
+async def export_backup_and_drop(dataset_path):
+    """
+    Export dataset to S3 backup, verify s3-PUBLIC, and drop local data.
+    """
+    repo = pygit2.Repository(dataset_path)
+    tags = sorted(git_tag(repo), key=lambda tag: tag.name)
+    if tags:
+        s3_backup_push(dataset_path)
+        for tag in tags:
+            # Check and clean local annexed files once export is complete
+            pipeline = Pipeline(broker, git_annex_fsck_remote).call_next(
+                annex_drop,
+                dataset_path=dataset_path,
+                branch=tag,
+                remote=get_s3_remote(),
+            )
+            # Call the pipeline (arguments for git_annex_fsck_remote)
+            await pipeline.kiq(
+                dataset_path,
+                branch=tag,  # Check the history from the new tag just exported
+                remote=get_s3_remote(),
+            )
+
+
 @broker.task
 async def export_dataset(
     dataset_path,
@@ -111,7 +135,10 @@ async def export_dataset(
     clear_dataset_cache(dataset_id)
     # Check and clean local annexed files once export is complete
     pipeline = Pipeline(broker, git_annex_fsck_remote).call_next(
-        annex_drop, dataset_path=dataset_path, branch=new_tag
+        annex_drop,
+        dataset_path=dataset_path,
+        branch=new_tag,
+        remote=get_s3_remote(),
     )
     # Call the pipeline (arguments for git_annex_fsck_remote)
     await pipeline.kiq(
```
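Both call sites lean on taskiq-pipelines chaining: `call_next` binds the drop task as the second step with extra kwargs, `pipeline.kiq(...)` dispatches the arguments to the first step, and the first step's return value arrives as the next step's first positional argument. A minimal runnable sketch of that pattern under those assumptions; the task bodies, dataset path, and `s3-PUBLIC` remote name are illustrative stand-ins for `git_annex_fsck_remote` and `annex_drop`, not the real implementations:

```python
# Sketch only, assuming taskiq and taskiq-pipelines are installed.
import asyncio

from taskiq import InMemoryBroker
from taskiq_pipelines import Pipeline, PipelineMiddleware

broker = InMemoryBroker().with_middlewares(PipelineMiddleware())


@broker.task
async def fsck_remote(dataset_path: str, branch: str, remote: str) -> bool:
    # Stand-in for git_annex_fsck_remote: verify the remote holds
    # every annexed file for this snapshot.
    print(f'fsck {dataset_path}@{branch} against {remote}')
    return True


@broker.task
async def drop_local(fsck_ok: bool, dataset_path: str, branch: str, remote: str) -> None:
    # Stand-in for annex_drop: the previous step's return value arrives
    # as the first positional argument; the remaining kwargs were bound
    # via call_next().
    if fsck_ok:
        print(f'dropping local annexed data for {dataset_path}@{branch}')


async def main() -> None:
    pipeline = Pipeline(broker, fsck_remote).call_next(
        drop_local,
        dataset_path='/datasets/ds000001',
        branch='1.0.0',
        remote='s3-PUBLIC',
    )
    # Arguments here go to the first step (fsck_remote).
    await pipeline.kiq('/datasets/ds000001', branch='1.0.0', remote='s3-PUBLIC')


asyncio.run(main())
```

Gating the drop on a successful fsck of the S3 remote is what makes the per-snapshot loop safe: local annexed data is only removed after the exported copy for that tag has been verified.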
