
Commit 16aeb2d

Merge pull request #3624 from OpenNeuroOrg/export-states
feat(worker): Only fsck remotes on exports and run drop if zero errors occur
2 parents (b8b4608 + 45347b5) · commit 16aeb2d

5 files changed: +56 −31 lines


services/datalad/datalad_service/broker/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 from taskiq import InMemoryBroker, SmartRetryMiddleware
 from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
+from taskiq_pipelines import PipelineMiddleware
 
 
 from datalad_service import config
@@ -37,5 +38,6 @@
             use_delay_exponent=True,
             max_delay_exponent=120,
         ),
+        PipelineMiddleware(),
     )
 )
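
For context, registering PipelineMiddleware is what allows the broker to execute taskiq-pipelines Pipeline chains, where each step's return value becomes the next task's first argument. A minimal sketch of that pattern, assuming an in-memory broker and illustrative task names (nothing below is from the OpenNeuro codebase):

# Minimal sketch of the taskiq-pipelines pattern this middleware enables.
# step_one/step_two are illustrative tasks, not OpenNeuro code.
import asyncio

from taskiq import InMemoryBroker
from taskiq_pipelines import Pipeline, PipelineMiddleware

broker = InMemoryBroker().with_middlewares(PipelineMiddleware())

@broker.task
def step_one(value: int) -> int:
    return value * 2

@broker.task
def step_two(previous_result: int) -> int:
    # The previous step's return value arrives as the first argument
    return previous_result + 1

async def main():
    pipeline = Pipeline(broker, step_one).call_next(step_two)
    task = await pipeline.kiq(20)
    result = await task.wait_result()
    print(result.return_value)  # 41

asyncio.run(main())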

services/datalad/datalad_service/tasks/fsck.py

Lines changed: 13 additions & 22 deletions
@@ -57,39 +57,27 @@ def git_annex_fsck_local(dataset_path):
 
 
 @broker.task
-def git_annex_fsck_remote(dataset_path, branch=None, remote='s3-PUBLIC'):
-    """Run incremental fsck for one branch (tag) and remote."""
+def git_annex_fsck_remote(dataset_path, branch, remote='s3-PUBLIC'):
+    """
+    Run incremental fsck for one branch (tag) and remote.
+
+    Returns True if the check ran and no files failed, False otherwise.
+    """
     try:
         repo = pygit2.Repository(dataset_path)
         commit, references = get_head_commit_and_references(repo)
     except pygit2.GitError:
         logging.error(f'Could not open git repository for {dataset_path}')
-        return
-    if not branch:
-        # Find the newest tag chronologically
-        all_tags = sorted(
-            [tag for tag in repo.references if tag.startswith('refs/tags/')],
-            key=lambda tag: repo.lookup_reference(tag).target.commit_time,
-            reverse=True,
-        )
-        if all_tags:
-            branch = all_tags[0]
-        else:
-            logging.info(
-                f'No tags found for dataset: {dataset_path}. Skipping remote fsck.'
-            )
-            return
+        return False
 
-    # Run at most once per month per dataset
     annex_command = (
         'git-annex',
         'fsck',
         f'--branch={branch}',
-        '--from={remote}',
-        '--fast',
+        f'--from={remote}',
         '--json',
         '--json-error-messages',
-        '--incremental-schedule=7d',
+        '--incremental-schedule=30d',
     )
     annex_process = subprocess.Popen(
         annex_command, cwd=dataset_path, stdout=subprocess.PIPE
@@ -99,8 +87,11 @@ def git_annex_fsck_remote(dataset_path, branch=None, remote='s3-PUBLIC'):
         annexed_file = json.loads(annexed_file_json)
         if not annexed_file['success']:
             bad_files.append(annexed_file)
+    update_file_check(dataset_path, commit, references, bad_files, remote)
     if len(bad_files) > 0:
         logging.error(
             f'{dataset_path} remote {remote} has missing or corrupt annexed objects'
         )
-        update_file_check(dataset_path, commit, references, bad_files, remote)
+        return False
+    else:
+        return True
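
For reference, git-annex fsck --json emits one JSON object per checked file with a boolean "success" field (plus an "error-messages" list when --json-error-messages is passed), which is what the loop above consumes. A stripped-down sketch of that consumption, with an illustrative dataset path, tag, and remote name:

# Sketch of consuming `git-annex fsck --json` output, as the task above does.
# The path, tag, and remote name are illustrative placeholders.
import json
import subprocess

annex_process = subprocess.Popen(
    (
        'git-annex', 'fsck',
        '--branch=1.0.0',
        '--from=s3-PUBLIC',
        '--json', '--json-error-messages',
        # Limits how often a new incremental pass starts (here, every 30 days)
        '--incremental-schedule=30d',
    ),
    cwd='/datasets/ds000001',
    stdout=subprocess.PIPE,
)
bad_files = []
for line in annex_process.stdout:
    record = json.loads(line)  # e.g. {"success": true, "file": "...", "key": "..."}
    if not record['success']:
        bad_files.append(record)
print(f'{len(bad_files)} annexed files failed fsck')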

services/datalad/datalad_service/tasks/publish.py

Lines changed: 23 additions & 8 deletions
@@ -8,6 +8,7 @@
 import pygit2
 import boto3
 from github import Github
+from taskiq_pipelines import Pipeline
 
 import datalad_service.common.s3
 import datalad_service.common.github
@@ -33,6 +34,7 @@
     update_s3_sibling,
 )
 from datalad_service.broker import broker
+from datalad_service.tasks.fsck import git_annex_fsck_remote
 
 logger = logging.getLogger('datalad_service.' + __name__)
 
@@ -98,16 +100,28 @@ async def export_dataset(
     update_s3_sibling(dataset_path)
     # Push the most recent tag
     if tags:
-        s3_export(dataset_path, get_s3_remote(), tags[-1].name)
+        new_tag = tags[-1].name
+        s3_export(dataset_path, get_s3_remote(), new_tag)
         s3_backup_push(dataset_path)
         # Once all S3 tags are exported, update GitHub
         if github_enabled:
             # Perform all GitHub export steps
-            github_export(dataset_id, dataset_path, tags[-1].name)
-    # Drop cache once all exports are complete
-    clear_dataset_cache(dataset_id)
-    # Clean local annexed files once export is complete
-    await annex_drop.kiq(dataset_path)
+            github_export(dataset_id, dataset_path, new_tag)
+        # Drop cache once all exports are complete
+        clear_dataset_cache(dataset_id)
+        # Check and clean local annexed files once export is complete
+        pipeline = Pipeline(broker, git_annex_fsck_remote).call_next(
+            annex_drop, dataset_path=dataset_path, branch=new_tag
+        )
+        # Call the pipeline (arguments for git_annex_fsck_remote)
+        await pipeline.kiq(
+            dataset_path,
+            branch=new_tag,  # Check the history from the new tag just exported
+            remote=get_s3_remote(),
+        )
+    else:
+        # Clear the cache even if only sibling updates occurred
+        clear_dataset_cache(dataset_id)
 
 
 def check_remote_has_version(dataset_path, remote, tag):
@@ -214,7 +228,7 @@ def monitor_remote_configs(dataset_path):
 
 
 @broker.task
-async def annex_drop(dataset_path):
+async def annex_drop(fsck_success, dataset_path, branch):
     """Drop local contents from the annex."""
     # Ensure numcopies is set to 2 before running drop
     await run_check(['git-annex', 'numcopies', '2'], dataset_path)
@@ -226,4 +240,5 @@ async def annex_drop(dataset_path):
         # Not an issue if this fails
         pass
     # Drop will only drop successfully exported files present on both remotes
-    await run_check(['git-annex', 'drop'], dataset_path)
+    if fsck_success:
+        await run_check(['git-annex', 'drop', '--branch', branch], dataset_path)
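
A note on the wiring above, per taskiq-pipelines' documented behavior: call_next delivers the previous step's return value as the next task's first positional argument, which is why annex_drop gains the leading fsck_success parameter while dataset_path and branch are bound as keyword arguments when the pipeline is built. The resulting call chain, sketched with an illustrative dataset path and tag:

# 1. git_annex_fsck_remote('/datasets/ds000001', branch='1.0.0', remote='s3-PUBLIC')
#    -> returns True only if zero annexed files fail the check
# 2. annex_drop(True, dataset_path='/datasets/ds000001', branch='1.0.0')
#    -> runs `git-annex drop --branch 1.0.0` only when fsck_success is True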

services/datalad/pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ dependencies = [
     "taskiq-redis>=1.0.9",
     "uvicorn[standard]>=0.34.3",
     "httpx>=0.28.1",
+    "taskiq-pipelines>=0.1.4",
 ]
 dynamic = ["version"]
 

services/datalad/uv.lock

Lines changed: 17 additions & 1 deletion
(Generated lock file; diff not rendered.)
