Skip to content

Commit 97a8e04

Browse files
authored
Merge pull request #3662 from OpenNeuroOrg/hotfix-fsck-async-block
fix: Avoid blocking taskiq worker on fsck tasks
2 parents 62dd6aa + 33aa1e9 commit 97a8e04

File tree

2 files changed: 27 additions and 11 deletions

services/datalad/datalad_service/tasks/fsck.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
 import json
-import io
+import asyncio
 import logging
-import subprocess
 
 import pygit2
 

@@ -25,7 +24,7 @@ def get_head_commit_and_references(repo):
 
 
 @broker.task
-def git_annex_fsck_local(dataset_path):
+async def git_annex_fsck_local(dataset_path):
     """Run git-annex fsck for local annexed objects in the draft. Runs on commits, verifies checksums."""
     try:
         commit, references = get_head_commit_and_references(
@@ -34,24 +33,25 @@ def git_annex_fsck_local(dataset_path):
     except pygit2.GitError:
         logging.error(f'Could not open git repository for {dataset_path}')
         return
-    annex_command = (
+    annex_command = [
         'git-annex',
         'fsck',
         '-J4',
         '--json',
         '--json-error-messages',
         '--incremental-schedule=45d',
-    )
-    annex_process = subprocess.Popen(
-        annex_command, cwd=dataset_path, stdout=subprocess.PIPE
+    ]
+    annex_process = await asyncio.create_subprocess_exec(
+        *annex_command, cwd=dataset_path, stdout=asyncio.subprocess.PIPE
     )
     bad_files = []
-    for annexed_file_json in io.TextIOWrapper(annex_process.stdout, encoding='utf-8'):
+    async for annexed_file_json in annex_process.stdout:
         annexed_file = json.loads(annexed_file_json)
         if not annexed_file['success']:
             # Rename for GraphQL consistency
             annexed_file['errorMessages'] = annexed_file.pop('error-messages')
             bad_files.append(annexed_file)
+    await annex_process.wait()
     if len(bad_files) > 0:
         logging.error(f'missing or corrupt annexed objects found in {dataset_path}')
     update_file_check(dataset_path, commit, references, bad_files)
@@ -81,14 +81,15 @@ async def git_annex_fsck_remote(dataset_path, branches, remote='s3-PUBLIC'):
         '--incremental-schedule=365d',
     ]
     annex_command += [f'--branch={branch}' for branch in branches]
-    annex_process = subprocess.Popen(
-        annex_command, cwd=dataset_path, stdout=subprocess.PIPE
+    annex_process = await asyncio.create_subprocess_exec(
+        *annex_command, cwd=dataset_path, stdout=asyncio.subprocess.PIPE
     )
     bad_files = []
-    for annexed_file_json in io.TextIOWrapper(annex_process.stdout, encoding='utf-8'):
+    async for annexed_file_json in annex_process.stdout:
         annexed_file = json.loads(annexed_file_json)
         if not annexed_file['success']:
             bad_files.append(annexed_file)
+    await annex_process.wait()
     update_file_check(dataset_path, commit, references, bad_files, remote)
     if len(bad_files) > 0:
         logging.error(

services/datalad/tests/conftest.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,18 @@ def _mock(dataset_id):
         return True
 
     monkeypatch.setattr('datalad_service.tasks.publish.is_public_dataset', _mock)
+
+
+@pytest.fixture(autouse=True)
+def mock_git_annex_fsck_local(monkeypatch):
+    """
+    Auto-used fixture to mock git_annex_fsck_local.kiq for all tests.
+
+    This prevents tests from hanging due to event loop issues with the
+    async fsck task and avoids running slow fsck operations during tests.
+    """
+    mock_kiq = mock.AsyncMock()
+    # The kiq method is what's called to enqueue the task.
+    # We mock it on the task object itself.
+    monkeypatch.setattr('datalad_service.tasks.fsck.git_annex_fsck_local.kiq', mock_kiq)
+    yield mock_kiq

0 commit comments

Comments (0)