
Commit 36f623f

Merge pull request #3648 from OpenNeuroOrg/drop-pipelines
fix(worker): Avoid Pipelines for taskiq drops
2 parents b3a6aeb + 760d6fd commit 36f623f

File tree

7 files changed (+25 −64 lines)


services/datalad/datalad_service/broker/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -2,7 +2,6 @@

 from taskiq import InMemoryBroker, SmartRetryMiddleware
 from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
-from taskiq_pipelines import PipelineMiddleware


 from datalad_service import config
@@ -38,6 +37,5 @@
             use_delay_exponent=True,
             max_delay_exponent=120,
         ),
-        PipelineMiddleware(),
     )
 )
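For context, a minimal sketch of a taskiq broker wired this way, with only the retry middleware registered once PipelineMiddleware is dropped. This is an illustration under assumptions (placeholder Redis URL, no project config lookup), not the project's actual broker module.

    from taskiq import SmartRetryMiddleware
    from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

    REDIS_URL = 'redis://localhost:6379'  # placeholder, not the project's configured URL

    # Stream broker with a Redis result backend; only SmartRetryMiddleware remains
    # after PipelineMiddleware is no longer registered.
    broker = (
        RedisStreamBroker(url=REDIS_URL)
        .with_result_backend(RedisAsyncResultBackend(redis_url=REDIS_URL))
        .with_middlewares(
            SmartRetryMiddleware(
                use_delay_exponent=True,
                max_delay_exponent=120,
            ),
        )
    )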

services/datalad/datalad_service/common/onchange.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-from datalad_service.tasks.fsck import git_annex_fsck_local, git_annex_fsck_remote
+from datalad_service.tasks.fsck import git_annex_fsck_local


 async def on_head(dataset_path):
Lines changed: 1 addition & 9 deletions
@@ -1,11 +1,7 @@
 import falcon

-from taskiq_pipelines import Pipeline
-
-from datalad_service.broker import broker
 from datalad_service.tasks.publish import (
     create_remotes_and_export,
-    set_s3_access_tag,
 )


@@ -18,10 +14,6 @@ def __init__(self, store):
     async def on_post(self, req, resp, dataset):
         dataset_path = self.store.get_dataset_path(dataset)
         # Pipeline create and export -> set access tag to public
-        await (
-            Pipeline(broker, create_remotes_and_export)
-            .call_after(set_s3_access_tag, dataset=dataset, value='public')
-            .kiq(dataset_path)  # create_remotes_and_export
-        )
+        await create_remotes_and_export.kiq(dataset_path, public=True)
         resp.media = {}
         resp.status = falcon.HTTP_OK
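The pattern this handler change adopts, sketched below with illustrative names (InMemoryBroker and the placeholder tasks are assumptions, not OpenNeuro's code): instead of chaining a follow-up task through taskiq-pipelines, the first task accepts a flag and awaits the follow-up step itself, so a plain .kiq() call is all the handler needs.

    import asyncio

    from taskiq import InMemoryBroker

    broker = InMemoryBroker()


    async def set_access_tag(dataset_id: str, value: str) -> None:
        # Placeholder for the real S3 access-tag call.
        print(f'tagging {dataset_id} as {value}')


    @broker.task
    async def create_and_export(dataset_path: str, public: bool = False) -> None:
        print(f'exporting {dataset_path}')
        if public:
            # Follow-up runs inside the task; no PipelineMiddleware needed.
            await set_access_tag(dataset_path.rsplit('/', 1)[-1], 'public')


    async def main() -> None:
        await broker.startup()
        # .kiq() enqueues the task; InMemoryBroker executes it immediately.
        task = await create_and_export.kiq('/datasets/ds000001', public=True)
        await task.wait_result()
        await broker.shutdown()


    if __name__ == '__main__':
        asyncio.run(main())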

services/datalad/datalad_service/tasks/fsck.py

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ def git_annex_fsck_local(dataset_path):


 @broker.task
-def git_annex_fsck_remote(dataset_path, branch, remote='s3-PUBLIC'):
+async def git_annex_fsck_remote(dataset_path, branch, remote='s3-PUBLIC'):
     """
     Run incremental fsck for one branch (tag) and remote.

services/datalad/datalad_service/tasks/publish.py

Lines changed: 22 additions & 34 deletions
@@ -8,7 +8,6 @@
 import pygit2
 import boto3
 from github import Github
-from taskiq_pipelines import Pipeline

 import datalad_service.common.s3
 import datalad_service.common.github
@@ -63,14 +62,16 @@ def s3_sibling(dataset_path):


 @broker.task
-async def create_remotes_and_export(dataset_path):
+async def create_remotes_and_export(dataset_path, public=False):
     """
     Create public S3 and GitHub remotes and export to them.

     Called by publish handler to make a dataset public initially.
     """
     create_remotes(dataset_path)
     await export_dataset(dataset_path)
+    if public:
+        await set_s3_access_tag(os.path.basename(dataset_path), 'public')


 def create_remotes(dataset_path):
@@ -79,6 +80,17 @@ def create_remotes(dataset_path):
     github_sibling(dataset_path, dataset)


+async def fsck_and_drop(dataset_path, branch):
+    # Check and clean local annexed files once export is complete
+    fsck_success = await git_annex_fsck_remote(dataset_path, branch, get_s3_remote())
+    if fsck_success:
+        logger.info(f'{dataset_path} remote fsck passed for {branch}')
+        await annex_drop(dataset_path, branch)
+        logger.info(f'{dataset_path} drop complete')
+    else:
+        logger.error(f'{dataset_path} remote fsck failed for {branch}')
+
+
 @broker.task
 async def export_backup_and_drop(dataset_path):
     """
@@ -90,18 +102,7 @@ async def export_backup_and_drop(dataset_path):
     if tags:
         await s3_backup_push(dataset_path)
         for tag in tags:
-            # Check and clean local annexed files once export is complete
-            pipeline = Pipeline(broker, git_annex_fsck_remote).call_next(
-                annex_drop,
-                dataset_path=dataset_path,
-                branch=tag.name,
-            )
-            # Call the pipeline (arguments for git_annex_fsck_remote)
-            await pipeline.kiq(
-                dataset_path=dataset_path,
-                branch=tag.name,
-                remote=get_s3_remote(),
-            )
+            await fsck_and_drop(dataset_path, tag.name)


 @broker.task
@@ -137,17 +138,7 @@ async def export_dataset(
         # Drop cache once all exports are complete
         clear_dataset_cache(dataset_id)
         # Check and clean local annexed files once export is complete
-        pipeline = Pipeline(broker, git_annex_fsck_remote).call_next(
-            annex_drop,
-            dataset_path=dataset_path,
-            branch=new_tag,
-        )
-        # Call the pipeline (arguments for git_annex_fsck_remote)
-        await pipeline.kiq(
-            dataset_path,
-            branch=new_tag,  # Check the history from the new tag just exported
-            remote=get_s3_remote(),
-        )
+        await fsck_and_drop(dataset_path, new_tag)
     else:
         # Clear the cache even if only sibling updates occurred
         clear_dataset_cache(dataset_id)
@@ -257,7 +248,7 @@ def monitor_remote_configs(dataset_path):


 @broker.task
-async def annex_drop(fsck_success, dataset_path, branch):
+async def annex_drop(dataset_path, branch):
     """Drop local contents from the annex."""
     # Ensure numcopies is set to 2 before running drop
     await run_check(['git-annex', 'numcopies', '2'], dataset_path)
@@ -269,14 +260,11 @@ async def annex_drop(fsck_success, dataset_path, branch):
         # Not an issue if this fails
         pass
     # Drop will only drop successfully exported files present on both remotes
-    if fsck_success:
-        env = os.environ.copy()
-        # Force git-annex to use cached credentials for this
-        del env['AWS_ACCESS_KEY_ID']
-        del env['AWS_SECRET_ACCESS_KEY']
-        await run_check(
-            ['git-annex', 'drop', '--branch', branch], dataset_path, env=env
-        )
+    env = os.environ.copy()
+    # Force git-annex to use cached credentials for this
+    del env['AWS_ACCESS_KEY_ID']
+    del env['AWS_SECRET_ACCESS_KEY']
+    await run_check(['git-annex', 'drop', '--branch', branch], dataset_path, env=env)


 async def set_remote_public(dataset):
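A standalone sketch of the gating pattern fsck_and_drop adopts, with assumed helper names and git-annex invocations (not the project's run_check utilities): the fsck step returns a boolean that directly gates the drop step, rather than being forwarded as the first argument of a downstream pipeline task.

    import asyncio
    import logging

    logger = logging.getLogger(__name__)


    async def run_annex(args, cwd):
        # Run a git-annex subcommand and report whether it exited cleanly.
        proc = await asyncio.create_subprocess_exec('git-annex', *args, cwd=cwd)
        return await proc.wait() == 0


    async def fsck_and_drop(dataset_path, branch, remote):
        # Verify the remote copies exist before dropping local annexed files.
        fsck_ok = await run_annex(
            ['fsck', '--fast', '--from', remote, '--branch', branch], dataset_path
        )
        if fsck_ok:
            logger.info('%s remote fsck passed for %s', dataset_path, branch)
            await run_annex(['drop', '--branch', branch], dataset_path)
            logger.info('%s drop complete', dataset_path)
        else:
            logger.error('%s remote fsck failed for %s', dataset_path, branch)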

services/datalad/pyproject.toml

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@ dependencies = [
     "taskiq-redis>=1.0.9",
     "uvicorn[standard]>=0.34.3",
     "httpx>=0.28.1",
-    "taskiq-pipelines>=0.1.4",
 ]
 dynamic = ["version"]

services/datalad/uv.lock

Lines changed: 0 additions & 16 deletions
Some generated files are not rendered by default.
