From 9d31297d5835333408fd49b3fcdc0f4517182f3c Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Tue, 11 Nov 2025 11:40:56 -0800
Subject: [PATCH 1/5] feat(worker): Allow run_check to override the environment

---
 .../datalad/datalad_service/common/asyncio.py | 23 +++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/services/datalad/datalad_service/common/asyncio.py b/services/datalad/datalad_service/common/asyncio.py
index af70b871d..91bdc292a 100644
--- a/services/datalad/datalad_service/common/asyncio.py
+++ b/services/datalad/datalad_service/common/asyncio.py
@@ -2,14 +2,23 @@
 import subprocess
 
 
-async def run_check(command, dataset_path):
+async def run_check(command, dataset_path, env=None):
     """Helper to run an async command and check for failure"""
-    process = await asyncio.create_subprocess_exec(
-        *command,
-        cwd=dataset_path,
-        stdout=asyncio.subprocess.PIPE,
-        stderr=asyncio.subprocess.PIPE,
-    )
+    if env:
+        process = await asyncio.create_subprocess_exec(
+            *command,
+            cwd=dataset_path,
+            env=env,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
+    else:
+        process = await asyncio.create_subprocess_exec(
+            *command,
+            cwd=dataset_path,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
     stdout, stderr = await process.communicate()
     if process.returncode != 0:
         raise subprocess.CalledProcessError(process.returncode, command, stdout, stderr)
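
Note on PATCH 1/5: asyncio.create_subprocess_exec() treats env=None as "inherit the
parent environment" (the same semantics as subprocess.Popen), so the duplicated call
above could likely be collapsed by always passing env through. A minimal sketch of that
variant, not part of the patch series:

import asyncio
import subprocess


async def run_check(command, dataset_path, env=None):
    """Run a command asynchronously and raise if it exits non-zero (sketch)."""
    process = await asyncio.create_subprocess_exec(
        *command,
        cwd=dataset_path,
        env=env,  # None inherits os.environ; a dict replaces the environment entirely
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command, stdout, stderr)

One behavioral difference: the patch's "if env:" check treats an empty dict as inherit,
while this sketch would pass an empty dict through as an empty environment.
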
From c2040147db77e2dcfb960d13f020e914e08c8a54 Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Tue, 11 Nov 2025 11:42:16 -0800
Subject: [PATCH 2/5] feat(worker): Run exports as coroutines (avoid blocking the worker entirely)

---
 services/datalad/datalad_service/common/s3.py | 14 ++++++++------
 .../datalad/datalad_service/handlers/exports.py | 17 +++++++++++++++++
 .../datalad/datalad_service/tasks/publish.py | 6 +++---
 services/datalad/tests/test_publish.py | 4 ++--
 4 files changed, 30 insertions(+), 11 deletions(-)
 create mode 100644 services/datalad/datalad_service/handlers/exports.py

diff --git a/services/datalad/datalad_service/common/s3.py b/services/datalad/datalad_service/common/s3.py
index 3c5f66ee1..cde8ebcee 100644
--- a/services/datalad/datalad_service/common/s3.py
+++ b/services/datalad/datalad_service/common/s3.py
@@ -6,6 +6,7 @@
 import datalad_service.config
 from datalad_service.common.annex import annex_initremote, is_git_annex_remote
+from datalad_service.common.asyncio import run_check
 
 
 class S3ConfigException(Exception):
@@ -179,17 +180,18 @@ def validate_s3_config(dataset_path):
     return True
 
 
-def s3_export(dataset_path, target, treeish):
+async def s3_export(dataset_path, target, treeish):
     """Perform an S3 export on a git-annex repo."""
-    subprocess.check_call(
-        ['git-annex', 'export', treeish, '--to', target], cwd=dataset_path
+    await run_check(
+        ['git-annex', 'export', treeish, '--to', target],
+        dataset_path,
     )
 
 
-def s3_backup_push(dataset_path):
+async def s3_backup_push(dataset_path):
     """Perform an S3 push to the backup remote on a git-annex repo."""
-    subprocess.check_call(
+    await run_check(
         ['git-annex', 'push', get_s3_backup_remote()],
-        cwd=dataset_path,
+        dataset_path,
         env=backup_remote_env(),
     )
diff --git a/services/datalad/datalad_service/handlers/exports.py b/services/datalad/datalad_service/handlers/exports.py
new file mode 100644
index 000000000..0f61eaf7e
--- /dev/null
+++ b/services/datalad/datalad_service/handlers/exports.py
@@ -0,0 +1,17 @@
+import logging
+
+import falcon
+
+from datalad_service.tasks.files import remove_annex_object
+
+
+class ExportsResource:
+    """Handler to report status for exports."""
+
+    def __init__(self, store):
+        self.store = store
+        self.logger = logging.getLogger('datalad_service.' + __name__)
+
+    async def on_get(self, req, resp, dataset, remote):
+        """Report status for exports"""
+        dataset_path = self.store.get_dataset_path(dataset)
diff --git a/services/datalad/datalad_service/tasks/publish.py b/services/datalad/datalad_service/tasks/publish.py
index aae7320ed..c45fcd7fd 100644
--- a/services/datalad/datalad_service/tasks/publish.py
+++ b/services/datalad/datalad_service/tasks/publish.py
@@ -86,7 +86,7 @@ async def export_backup_and_drop(dataset_path):
     repo = pygit2.Repository(dataset_path)
     tags = sorted(git_tag(repo), key=lambda tag: tag.name)
     if tags:
-        s3_backup_push(dataset_path)
+        await s3_backup_push(dataset_path)
         for tag in tags:
             # Check and clean local annexed files once export is complete
             pipeline = Pipeline(broker, git_annex_fsck_remote).call_next(
@@ -125,8 +125,8 @@ async def export_dataset(
     # Push the most recent tag
     if tags:
         new_tag = tags[-1].name
-        s3_export(dataset_path, get_s3_remote(), new_tag)
-        s3_backup_push(dataset_path)
+        await s3_export(dataset_path, get_s3_remote(), new_tag)
+        await s3_backup_push(dataset_path)
         # Once all S3 tags are exported, update GitHub
         if github_enabled:
             # Perform all GitHub export steps
diff --git a/services/datalad/tests/test_publish.py b/services/datalad/tests/test_publish.py
index 0f21d71c3..43c67d3d1 100644
--- a/services/datalad/tests/test_publish.py
+++ b/services/datalad/tests/test_publish.py
@@ -1,6 +1,6 @@
 import json
 import os
-from unittest.mock import Mock, call
+from unittest.mock import AsyncMock, Mock, call
 
 import falcon
 
@@ -50,7 +50,7 @@ async def test_export_snapshots(no_init_remote, client, new_dataset):
     # Make it public
     create_remotes(new_dataset.path)
     # Export
-    s3_export_mock = Mock()
+    s3_export_mock = AsyncMock()
     github_export_mock = Mock()
     update_s3_sibling_mock = Mock()
     await export_dataset(

From 858c1d93a06faabd17bbb22765ade679d821e388 Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Tue, 11 Nov 2025 13:10:16 -0800
Subject: [PATCH 3/5] fix(worker): Set up credentials for the workaround GCP object remote

---
 services/datalad/datalad_service/common/s3.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/services/datalad/datalad_service/common/s3.py b/services/datalad/datalad_service/common/s3.py
index cde8ebcee..89278f41c 100644
--- a/services/datalad/datalad_service/common/s3.py
+++ b/services/datalad/datalad_service/common/s3.py
@@ -119,6 +119,9 @@ def setup_s3_backup_sibling_workaround(dataset_path):
         Key=f'{dataset_id}/annex-uuid',
         Body=uuid.encode('utf-8'),
     )
+    # Create the creds file
+    with open(os.path.join(dataset_path, '.git', 'annex', 'creds', uuid), 'w') as f:
+        f.write(f'{aws_access_key_id}\n{aws_secret_access_key}\n')
     # Enableremote after
     subprocess.run(
         ['git-annex', 'enableremote', get_s3_backup_remote()],
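
Note on PATCH 3/5: the open() call in the hunk above assumes .git/annex/creds/ already
exists inside the dataset repository. If git-annex has not created that directory yet at
this point, the write fails with FileNotFoundError. A defensive variant (sketch only,
using a hypothetical helper name and the same variables as the hunk) would create it
first:

import os


def write_backup_remote_creds(dataset_path, uuid, aws_access_key_id, aws_secret_access_key):
    """Write the cached credentials file for the backup remote UUID (sketch)."""
    creds_dir = os.path.join(dataset_path, '.git', 'annex', 'creds')
    # The creds directory is not guaranteed to exist yet
    os.makedirs(creds_dir, exist_ok=True)
    with open(os.path.join(creds_dir, uuid), 'w') as f:
        f.write(f'{aws_access_key_id}\n{aws_secret_access_key}\n')
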
From 6ec3aff74174a90cccf8d75083d2a8f3478aaeee Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Tue, 11 Nov 2025 13:10:41 -0800
Subject: [PATCH 4/5] fix(worker): Unset AWS credentials when calling drop (allows access to both remotes)

---
 services/datalad/datalad_service/tasks/publish.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/services/datalad/datalad_service/tasks/publish.py b/services/datalad/datalad_service/tasks/publish.py
index c45fcd7fd..90fb18650 100644
--- a/services/datalad/datalad_service/tasks/publish.py
+++ b/services/datalad/datalad_service/tasks/publish.py
@@ -268,4 +268,10 @@ async def annex_drop(fsck_success, dataset_path, branch):
         pass
     # Drop will only drop successfully exported files present on both remotes
     if fsck_success:
-        await run_check(['git-annex', 'drop', '--branch', branch], dataset_path)
+        env = os.environ.copy()
+        # Force git-annex to use cached credentials for this
+        del env['AWS_ACCESS_KEY_ID']
+        del env['AWS_SECRET_ACCESS_KEY']
+        await run_check(
+            ['git-annex', 'drop', '--branch', branch], dataset_path, env=env
+        )

From 66ba43c7a43a2d5c67410098542e228be9cf30df Mon Sep 17 00:00:00 2001
From: Nell Hardcastle
Date: Tue, 11 Nov 2025 13:11:15 -0800
Subject: [PATCH 5/5] fix(worker): Remove extra argument from fsck -> drop pipeline

---
 services/datalad/datalad_service/tasks/publish.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/services/datalad/datalad_service/tasks/publish.py b/services/datalad/datalad_service/tasks/publish.py
index 90fb18650..5d8d52dd9 100644
--- a/services/datalad/datalad_service/tasks/publish.py
+++ b/services/datalad/datalad_service/tasks/publish.py
@@ -138,7 +138,6 @@ async def export_dataset(
             annex_drop,
             dataset_path=dataset_path,
             branch=new_tag,
-            remote=get_s3_remote(),
         )
         # Call the pipeline (arguments for git_annex_fsck_remote)
         await pipeline.kiq(
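
Note on PATCH 4/5: del env['AWS_ACCESS_KEY_ID'] raises KeyError if the variable is not
set in the worker's environment. If the worker can legitimately start without those
variables, env.pop() with a default is the tolerant equivalent. A sketch of that drop
branch, using a hypothetical helper name:

import os

from datalad_service.common.asyncio import run_check


async def drop_exported_branch(dataset_path, branch):
    """Drop annexed files for a branch without passing AWS credentials (sketch)."""
    env = os.environ.copy()
    # pop() with a default tolerates the variables being absent, unlike del
    env.pop('AWS_ACCESS_KEY_ID', None)
    env.pop('AWS_SECRET_ACCESS_KEY', None)
    await run_check(
        ['git-annex', 'drop', '--branch', branch], dataset_path, env=env
    )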