From bfe8ee656521fc05a2911576f8294044b29ae763 Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Wed, 30 Jul 2025 14:05:50 -0700
Subject: [PATCH 1/4] feat(worker): Add git-annex fsck tasks for local and remote siblings

---
 .../datalad/datalad_service/tasks/fsck.py | 119 ++++++++++++++++++
 1 file changed, 119 insertions(+)
 create mode 100644 services/datalad/datalad_service/tasks/fsck.py

diff --git a/services/datalad/datalad_service/tasks/fsck.py b/services/datalad/datalad_service/tasks/fsck.py
new file mode 100644
index 000000000..cc0369f7b
--- /dev/null
+++ b/services/datalad/datalad_service/tasks/fsck.py
@@ -0,0 +1,119 @@
+import json
+import io
+import logging
+import subprocess
+
+import pygit2
+import requests
+
+from datalad_service.broker import broker
+from datalad_service.config import GRAPHQL_ENDPOINT
+
+
+def get_head_commit_and_references(repo):
+    """
+    Returns the current HEAD commit and any references (tags) pointing to it.
+    """
+    head_commit = repo[repo.head.target]
+    references = []
+    for ref in repo.references:
+        if (
+            ref.startswith('refs/tags/')
+            and repo.lookup_reference(ref).target == head_commit.id
+        ):
+            references.append(ref)
+    return head_commit, references
+
+
+@broker.task
+def git_annex_fsck_local(dataset_path):
+    """Run git-annex fsck for local annexed objects in the draft. Runs on commits and verifies checksums."""
+    try:
+        commit, references = get_head_commit_and_references(
+            pygit2.Repository(dataset_path)
+        )
+    except pygit2.GitError:
+        logging.error(f'Could not open git repository for {dataset_path}')
+        return
+    annex_command = (
+        'git-annex',
+        'fsck',
+        '--json',
+        '--json-error-messages',
+        '--incremental-schedule',
+        '7d',
+        '--time-limit=15m',
+    )
+    annex_process = subprocess.Popen(
+        annex_command, cwd=dataset_path, stdout=subprocess.PIPE
+    )
+    bad_files = []
+    for annexed_file_json in io.TextIOWrapper(annex_process.stdout, encoding='utf-8'):
+        annexed_file = json.loads(annexed_file_json)
+        if not annexed_file['success']:
+            bad_files.append(annexed_file)
+    if len(bad_files) > 0:
+        logging.error(f'missing or corrupt annexed objects found in {dataset_path}')
+        # Send any bad files to updateFileChecks
+        requests.post(
+            url=GRAPHQL_ENDPOINT,
+            json={
+                'query': 'mutation ($datasetId: ID!, $hexsha: String!, $refs: [String], $annexFsck: [AnnexFsck]) { updateFileChecks(datasetId: $datasetId, hexsha: $hexsha, refs: $refs, annexFsck: $annexFsck) { datasetId, hexsha } }',
+                'variables': {
+                    'datasetId': dataset_path.split('/')[-1],
+                    'hexsha': str(commit.id),
+                    'refs': references,
+                    'annexFsck': bad_files,
+                },
+            },
+        )
+
+
+@broker.task
+def git_annex_fsck_remote(dataset_path, branch=None, remote='s3-PUBLIC'):
+    """Run incremental fsck for one branch (tag) and remote."""
+    try:
+        # Basic sanity check opening the repo before running git-annex
+        repo = pygit2.Repository(dataset_path)
+    except pygit2.GitError:
+        logging.error(f'Could not open git repository for {dataset_path}')
+        return
+    if not branch:
+        # Find the newest tag chronologically
+        all_tags = sorted(
+            [tag for tag in repo.references if tag.startswith('refs/tags/')],
+            key=lambda tag: repo.lookup_reference(tag).peel(pygit2.Commit).commit_time,
+            reverse=True,
+        )
+        if all_tags:
+            branch = all_tags[0]
+        else:
+            logging.info(
+                f'No tags found for dataset: {dataset_path}. Skipping remote fsck.'
+            )
+            return
+
+    # Run a full check at most once per week per dataset
+    annex_command = (
+        'git-annex',
+        'fsck',
+        f'--branch={branch}',
+        f'--from={remote}',
+        '--fast',
+        '--json',
+        '--json-error-messages',
+        '--incremental-schedule=7d',
+        '--time-limit=15m',
+    )
+    annex_process = subprocess.Popen(
+        annex_command, cwd=dataset_path, stdout=subprocess.PIPE
+    )
+    bad_files = []
+    for annexed_file_json in io.TextIOWrapper(annex_process.stdout, encoding='utf-8'):
+        annexed_file = json.loads(annexed_file_json)
+        if not annexed_file['success']:
+            bad_files.append(annexed_file)
+    if len(bad_files) > 0:
+        logging.error(
+            f'{dataset_path} remote {remote} has missing or corrupt annexed objects'
+        )

From b156885f394831b6a9cd54d16504d6af678581d7 Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Wed, 30 Jul 2025 14:06:31 -0700
Subject: [PATCH 2/4] feat(worker): Add onchange hooks for tags and HEAD updates

---
 services/datalad/datalad_service/common/onchange.py | 11 +++++++++++
 1 file changed, 11 insertions(+)
 create mode 100644 services/datalad/datalad_service/common/onchange.py

diff --git a/services/datalad/datalad_service/common/onchange.py b/services/datalad/datalad_service/common/onchange.py
new file mode 100644
index 000000000..08cd4b522
--- /dev/null
+++ b/services/datalad/datalad_service/common/onchange.py
@@ -0,0 +1,11 @@
+from datalad_service.tasks.fsck import git_annex_fsck_local, git_annex_fsck_remote
+
+
+async def on_head(dataset_path):
+    """Called after any change to HEAD."""
+    await git_annex_fsck_local.kiq(dataset_path)
+
+
+async def on_tag(dataset_path, tag):
+    """Called after any new tag."""
+    await git_annex_fsck_remote.kiq(dataset_path, tag)

From 51eec49782db1e6b63d8375eabeec5caebbc6320 Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Wed, 30 Jul 2025 14:07:13 -0700
Subject: [PATCH 3/4] refactor(worker): Use async/await for commits and enable onchange hooks

---
 services/datalad/datalad_service/common/git.py | 9 ++++++---
 .../datalad/datalad_service/handlers/dataset.py | 2 +-
 .../datalad/datalad_service/handlers/draft.py | 4 ++--
 .../datalad/datalad_service/handlers/files.py | 5 ++---
 .../datalad/datalad_service/handlers/git.py | 6 ++++--
 .../datalad/datalad_service/handlers/upload.py | 4 ++--
 .../datalad/datalad_service/tasks/dataset.py | 4 ++--
 .../datalad_service/tasks/description.py | 2 +-
 services/datalad/datalad_service/tasks/files.py | 12 +++++++-----
 .../datalad/datalad_service/tasks/snapshots.py | 17 ++++++++++-------
 services/datalad/tests/test_bids.py | 4 ++--
 services/datalad/tests/test_datalad.py | 15 +++++++--------
 12 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/services/datalad/datalad_service/common/git.py b/services/datalad/datalad_service/common/git.py
index 46f3fbc2f..48118b1aa 100644
--- a/services/datalad/datalad_service/common/git.py
+++ b/services/datalad/datalad_service/common/git.py
@@ -10,6 +10,8 @@
 import pygit2
 from charset_normalizer import from_bytes
 
+from datalad_service.common.onchange import on_head
+
 tag_ref = re.compile('^refs/tags/')
 
 COMMITTER_NAME = 'Git Worker'
@@ -130,7 +132,7 @@ def git_rename_master_to_main(repo):
         repo.references['HEAD'].set_target('refs/heads/main')
 
 
-def git_commit(
+async def git_commit(
     repo, file_paths, author=None, message='[OpenNeuro] Recorded changes', parents=None
 ):
     """Commit array of paths at HEAD."""
@@ -161,10 +163,10 @@ def git_commit(
             sentry_sdk.capture_exception(e)
             logger.error(f'Failed to read index after git-annex add: {e}')
             raise OpenNeuroGitError(f'Failed to read index: {e}') from e
-    return git_commit_index(repo, author, message, parents)
+    return await git_commit_index(repo, author, message, parents)
 
 
-def git_commit_index(
+async def git_commit_index(
     repo, author=None, message='[OpenNeuro] Recorded changes', parents=None
 ):
     """Commit any existing index changes."""
@@ -180,4 +182,5 @@ def git_commit_index(
         'refs/heads/main', author, committer, message, tree, parent_commits
     )
     repo.head.set_target(commit)
+    await on_head(repo.workdir)
     return commit
diff --git a/services/datalad/datalad_service/handlers/dataset.py b/services/datalad/datalad_service/handlers/dataset.py
index d5ba5d110..729dcf966 100644
--- a/services/datalad/datalad_service/handlers/dataset.py
+++ b/services/datalad/datalad_service/handlers/dataset.py
@@ -41,7 +41,7 @@ async def on_post(self, req, resp, dataset):
             author = pygit2.Signature(name, email)
         else:
             author = None
-        hexsha = create_dataset(self.store, dataset, author)
+        hexsha = await create_dataset(self.store, dataset, author)
         resp.media = {'hexsha': hexsha}
         resp.status = falcon.HTTP_OK
 
diff --git a/services/datalad/datalad_service/handlers/draft.py b/services/datalad/datalad_service/handlers/draft.py
index b6ac7e3d8..62d9e42fa 100644
--- a/services/datalad/datalad_service/handlers/draft.py
+++ b/services/datalad/datalad_service/handlers/draft.py
@@ -53,9 +53,9 @@ async def on_post(self, req, resp, dataset):
             # Add all changes to the index
             if name and email:
                 author = pygit2.Signature(name, email)
-                media_dict['ref'] = str(git_commit(repo, ['.'], author))
+                media_dict['ref'] = str(await git_commit(repo, ['.'], author))
             else:
-                media_dict['ref'] = str(git_commit(repo, ['.']))
+                media_dict['ref'] = str(await git_commit(repo, ['.']))
             resp.media = media_dict
             resp.status = falcon.HTTP_OK
         except:
diff --git a/services/datalad/datalad_service/handlers/files.py b/services/datalad/datalad_service/handlers/files.py
index bac61cf8c..b9b913b76 100644
--- a/services/datalad/datalad_service/handlers/files.py
+++ b/services/datalad/datalad_service/handlers/files.py
@@ -6,7 +6,6 @@
 import pygit2
 
 from datalad_service.common.git import (
-    git_show,
     git_show_content,
     git_tree,
     OpenNeuroGitError,
@@ -125,7 +124,7 @@ async def on_delete(self, req, resp, dataset):
             media_dict['email'] = email
         try:
             if len(dirs_to_delete) > 0:
-                remove_files(
+                await remove_files(
                     self.store,
                     dataset,
                     dirs_to_delete,
@@ -135,7 +134,7 @@ async def on_delete(self, req, resp, dataset):
                 )
                 resp.status = falcon.HTTP_INTERNAL_SERVER_ERROR
             if len(files_to_delete) > 0:
-                remove_files(
+                await remove_files(
                     self.store,
                     dataset,
                     files_to_delete,
diff --git a/services/datalad/datalad_service/handlers/git.py b/services/datalad/datalad_service/handlers/git.py
index cbbacbcab..705ba5fd0 100644
--- a/services/datalad/datalad_service/handlers/git.py
+++ b/services/datalad/datalad_service/handlers/git.py
@@ -6,6 +6,7 @@
 
 from datalad_service.common.events import log_git_event
 from datalad_service.common.const import CHUNK_SIZE_BYTES
+from datalad_service.common.onchange import on_head
 
 cache_control = ['no-cache', 'max-age=0', 'must-revalidate']
 
@@ -137,11 +138,12 @@ async def on_post(self, req, resp, worker, dataset):
             resp.status = falcon.HTTP_OK
 
             # After this request finishes successfully, log it to the OpenNeuro API
-            def schedule_git_event():
+            async def schedule_git_event():
+                await on_head(dataset_path)
                 for new_commit, new_ref in refs_updated:
                     log_git_event(dataset, new_commit, new_ref, req.context['token'])
 
-            resp.schedule_sync(schedule_git_event)
+            resp.schedule(schedule_git_event)
         else:
             resp.status = falcon.HTTP_UNPROCESSABLE_ENTITY
 
diff --git a/services/datalad/datalad_service/handlers/upload.py b/services/datalad/datalad_service/handlers/upload.py
index b6b51ad03..05807f072 100644
--- a/services/datalad/datalad_service/handlers/upload.py
+++ b/services/datalad/datalad_service/handlers/upload.py
@@ -34,9 +34,9 @@ async def move_files_into_repo(
     await move_files(upload_path, dataset_path)
     if name and email:
         author = pygit2.Signature(name, email)
-        hexsha = str(git_commit(repo, unlock_files, author))
+        hexsha = str(await git_commit(repo, unlock_files, author))
     else:
-        hexsha = str(git_commit(repo, unlock_files))
+        hexsha = str(await git_commit(repo, unlock_files))
 
 
 class UploadResource:
diff --git a/services/datalad/datalad_service/tasks/dataset.py b/services/datalad/datalad_service/tasks/dataset.py
index 6f2c7c50e..f2010c967 100644
--- a/services/datalad/datalad_service/tasks/dataset.py
+++ b/services/datalad/datalad_service/tasks/dataset.py
@@ -42,7 +42,7 @@ def create_datalad_config(dataset_path):
         configfile.write(config)
 
 
-def create_dataset(store, dataset, author=None, initial_head='main'):
+async def create_dataset(store, dataset, author=None, initial_head='main'):
     """Create a DataLad git-annex repo for a new dataset.
 
     initial_head is only meant for tests and is overridden by the implementation of git_commit
@@ -61,7 +61,7 @@ def create_dataset(store, dataset, author=None, initial_head='main'):
     # Set a datalad UUID
     create_datalad_config(dataset_path)
     repo.index.add('.datalad/config')
-    git_commit(
+    await git_commit(
         repo,
         ['.gitattributes', '.datalad/config'],
         author,
diff --git a/services/datalad/datalad_service/tasks/description.py b/services/datalad/datalad_service/tasks/description.py
index 92b10b1d5..07bb07d58 100644
--- a/services/datalad/datalad_service/tasks/description.py
+++ b/services/datalad/datalad_service/tasks/description.py
@@ -32,7 +32,7 @@ async def update_description(store, dataset, description_fields, name=None, emai
             path, description, json.dumps(updated, indent=4, ensure_ascii=False)
         )
         # Commit new content, run validator
-        commit_files(store, dataset, ['dataset_description.json'])
+        await commit_files(store, dataset, ['dataset_description.json'])
         return updated
     else:
         return description_json
diff --git a/services/datalad/datalad_service/tasks/files.py b/services/datalad/datalad_service/tasks/files.py
index 3acaa300a..d6bf8865c 100644
--- a/services/datalad/datalad_service/tasks/files.py
+++ b/services/datalad/datalad_service/tasks/files.py
@@ -21,7 +21,7 @@
 from datalad_service.config import AWS_S3_PUBLIC_BUCKET
 
 
-def commit_files(store, dataset, files, name=None, email=None, cookies=None):
+async def commit_files(store, dataset, files, name=None, email=None, cookies=None):
     """
     Commit a list of files with the email and name provided.
 
@@ -35,9 +35,9 @@ def commit_files(store, dataset, files, name=None, email=None, cookies=None):
         and pygit2.Signature(name, email)
         or pygit2.Signature(COMMITTER_NAME, COMMITTER_EMAIL)
     )
-    ref = git_commit(repo, files, author)
+    ref = await git_commit(repo, files, author)
     # Run the validator but don't block on the request
-    asyncio.create_task(validate_dataset.kiq(dataset, dataset_path, str(ref), cookies))
+    await validate_dataset.kiq(dataset, dataset_path, str(ref), cookies)
     return ref
 
 
@@ -47,7 +47,7 @@ def get_tree(store, dataset, tree):
     return get_repo_files(dataset, dataset_path, tree)
 
 
-def remove_files(store, dataset, paths, name=None, email=None, cookies=None):
+async def remove_files(store, dataset, paths, name=None, email=None, cookies=None):
     dataset_path = store.get_dataset_path(dataset)
     repo = pygit2.Repository(dataset_path)
     if name and email:
@@ -57,7 +57,9 @@ def remove_files(store, dataset, paths, name=None, email=None, cookies=None):
     repo.index.remove_all(paths)
     repo.index.write()
     repo.checkout_index()
-    hexsha = str(git_commit_index(repo, author, message='[OpenNeuro] Files removed'))
+    hexsha = str(
+        await git_commit_index(repo, author, message='[OpenNeuro] Files removed')
+    )
 
 
 def parse_s3_annex_url(url, bucket_name=AWS_S3_PUBLIC_BUCKET):
diff --git a/services/datalad/datalad_service/tasks/snapshots.py b/services/datalad/datalad_service/tasks/snapshots.py
index 54bd378cc..ee27687be 100644
--- a/services/datalad/datalad_service/tasks/snapshots.py
+++ b/services/datalad/datalad_service/tasks/snapshots.py
@@ -9,6 +9,7 @@
 from datalad_service.tasks.dataset import create_datalad_config
 from datalad_service.tasks.description import update_description
 from datalad_service.tasks.files import commit_files
+from datalad_service.common.onchange import on_tag
 
 
 class SnapshotExistsException(Exception):
@@ -119,7 +120,7 @@ async def update_changes(store, dataset, tag, new_changes):
         current_date = datetime.today().strftime('%Y-%m-%d')
         updated = await write_new_changes(dataset_path, tag, new_changes, current_date)
         # Commit new content, run validator
-        commit_files(store, dataset, ['CHANGES'])
+        await commit_files(store, dataset, ['CHANGES'])
         return updated
     else:
         return await get_head_changes(dataset_path)
@@ -133,7 +134,7 @@ def validate_snapshot_name(store, dataset, snapshot):
         raise SnapshotExistsException(f'Tag "{snapshot}" already exists, name conflict')
 
 
-def validate_datalad_config(store, dataset):
+async def validate_datalad_config(store, dataset):
     """Add a .datalad/config file if one does not exist."""
     dataset_path = store.get_dataset_path(dataset)
     repo = store.get_dataset_repo(dataset)
@@ -141,12 +142,14 @@ def validate_datalad_config(store, dataset):
         git_show(repo, 'HEAD', '.datalad/config')
     except KeyError:
         create_datalad_config(dataset_path)
-        commit_files(store, dataset, ['.datalad/config'])
+        await commit_files(store, dataset, ['.datalad/config'])
 
 
-def save_snapshot(store, dataset, snapshot):
-    repo = pygit2.Repository(store.get_dataset_path(dataset))
+async def save_snapshot(store, dataset, snapshot):
+    ds_path = store.get_dataset_path(dataset)
+    repo = pygit2.Repository(ds_path)
     repo.references.create(f'refs/tags/{snapshot}', str(repo.head.target))
+    await on_tag(ds_path, snapshot)
 
 
 async def create_snapshot(
@@ -158,8 +161,8 @@ async def create_snapshot(
     Raises an exception if the tag already exists.
""" validate_snapshot_name(store, dataset, snapshot) - validate_datalad_config(store, dataset) + await validate_datalad_config(store, dataset) await update_description(store, dataset, description_fields) await update_changes(store, dataset, snapshot, snapshot_changes) - save_snapshot(store, dataset, snapshot) + await save_snapshot(store, dataset, snapshot) return get_snapshot(store, dataset, snapshot) diff --git a/services/datalad/tests/test_bids.py b/services/datalad/tests/test_bids.py index 114307dcd..f3507c1f1 100644 --- a/services/datalad/tests/test_bids.py +++ b/services/datalad/tests/test_bids.py @@ -70,10 +70,10 @@ def test_read_dataset_description(new_dataset): assert description['Name'] == 'Test fixture new dataset' -def test_read_dataset_description_invalid_json(new_dataset): +async def test_read_dataset_description_invalid_json(new_dataset): repo = Repository(new_dataset.path) open(os.path.join(new_dataset.path, 'dataset_description.json'), 'w').close() - git_commit(repo, ['dataset_description.json']) + await git_commit(repo, ['dataset_description.json']) description = read_dataset_description(new_dataset.path, 'HEAD') assert description is None diff --git a/services/datalad/tests/test_datalad.py b/services/datalad/tests/test_datalad.py index 60ea69214..951ad5c46 100644 --- a/services/datalad/tests/test_datalad.py +++ b/services/datalad/tests/test_datalad.py @@ -1,7 +1,6 @@ """Test DataLad tasks.""" import os -from unittest.mock import patch import uuid import pytest @@ -10,14 +9,14 @@ from datalad_service.common.annex import init_annex from datalad_service.common.git import OpenNeuroGitError -from datalad_service.tasks.dataset import * +from datalad_service.tasks.dataset import create_dataset, delete_dataset from datalad_service.tasks.files import commit_files -def test_create_dataset(datalad_store): +async def test_create_dataset(datalad_store): ds_id = 'ds000002' author = pygit2.Signature('test author', 'test@example.com') - create_dataset(datalad_store, ds_id, author) + await create_dataset(datalad_store, ds_id, author) ds = Dataset(os.path.join(datalad_store.annex_path, ds_id)) assert ds.repo is not None # Verify the dataset is created with datalad config @@ -45,18 +44,18 @@ async def test_create_dataset_master(datalad_store): file_path = os.path.join(ds.path, 'LICENSE') with open(file_path, 'w') as fd: fd.write("""MIT""") - commit_files(datalad_store, ds_id, ['LICENSE']) + await commit_files(datalad_store, ds_id, ['LICENSE']) # Verify the branch is now set to main assert ds.repo.get_active_branch() == 'main' -def test_create_dataset_unusual_default_branch(datalad_store): +async def test_create_dataset_unusual_default_branch(datalad_store): ds_id = 'ds000026' author = pygit2.Signature('test author', 'test@example.com') # Create dataset will commit data and this should fail since HEAD is something 'unusual' # (such as the git-annex branch as a plausible example) with pytest.raises(OpenNeuroGitError) as e: - create_dataset(datalad_store, ds_id, author, 'unusual') + await create_dataset(datalad_store, ds_id, author, 'unusual') async def test_delete_dataset(datalad_store, new_dataset): @@ -70,6 +69,6 @@ async def test_commit_file(datalad_store, new_dataset): file_path = os.path.join(new_dataset.path, 'LICENSE') with open(file_path, 'w') as fd: fd.write("""GPL""") - commit_files(datalad_store, ds_id, ['LICENSE']) + await commit_files(datalad_store, ds_id, ['LICENSE']) dataset = Dataset(os.path.join(datalad_store.annex_path, ds_id)) assert not dataset.repo.dirty From 

From c5fb5aeb564b935852ccc150fb4be8b4a53b9c5a Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Mon, 11 Aug 2025 09:51:52 -0700
Subject: [PATCH 4/4] fix(worker): Remove time limit for annex fsck calls

We want the complete annex fsck results. These tasks are queued, and queue
size is a better thing to watch than limiting runtime.
---
 services/datalad/datalad_service/tasks/fsck.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/services/datalad/datalad_service/tasks/fsck.py b/services/datalad/datalad_service/tasks/fsck.py
index cc0369f7b..9a90b832e 100644
--- a/services/datalad/datalad_service/tasks/fsck.py
+++ b/services/datalad/datalad_service/tasks/fsck.py
@@ -42,7 +42,6 @@ def git_annex_fsck_local(dataset_path):
         '--json-error-messages',
         '--incremental-schedule',
         '7d',
-        '--time-limit=15m',
     )
     annex_process = subprocess.Popen(
         annex_command, cwd=dataset_path, stdout=subprocess.PIPE
@@ -103,7 +102,6 @@ def git_annex_fsck_remote(dataset_path, branch=None, remote='s3-PUBLIC'):
         '--fast',
         '--json',
         '--json-error-messages',
         '--incremental-schedule=7d',
-        '--time-limit=15m',
    )
    annex_process = subprocess.Popen(
        annex_command, cwd=dataset_path, stdout=subprocess.PIPE
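
A note on how the pieces fit together: the hooks added in PATCH 2 only enqueue taskiq tasks, so git_commit_index, the git receive handler, and save_snapshot are not blocked while fsck runs in a worker. Below is a minimal sketch of exercising the hooks directly; the dataset path and tag are hypothetical, and it assumes the broker exported by datalad_service.broker is configured with a worker consuming the queue.

import asyncio

from datalad_service.broker import broker
from datalad_service.common.onchange import on_head, on_tag


async def main():
    # Hypothetical dataset path; in a deployment this is the dataset's repo directory.
    dataset_path = '/datasets/ds000001'
    # Connect the taskiq broker before kicking any tasks.
    await broker.startup()
    # After a draft commit: queue a local fsck of annexed objects at HEAD.
    await on_head(dataset_path)
    # After creating a snapshot tag: queue a remote fsck against s3-PUBLIC.
    await on_tag(dataset_path, '1.0.0')
    await broker.shutdown()


if __name__ == '__main__':
    asyncio.run(main())

In the service itself these calls happen inside git_commit_index, the git push handler, and save_snapshot, so no extra wiring is needed beyond these patches.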