Skip to content

Commit ca93267

Browse files
authored
Merge pull request #3529 from OpenNeuroOrg/maintenance-job-scheduler
Add worker task schedules and maintenance tasks
2 parents 0e11871 + 681464f commit ca93267

File tree

8 files changed

+168
-1
lines changed

8 files changed

+168
-1
lines changed

docker-compose.yml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,34 @@ services:
157157
- datalad-0
158158
- datalad-1
159159

160+
scheduler:
  # Dedicated taskiq scheduler container: dispatches cron-labelled tasks to the
  # shared broker (see datalad_service.broker.scheduler).
  platform: linux/amd64
  build:
    context: services/datalad
    platform: linux/amd64
  volumes:
    - ${PERSISTENT_DIR}/datalad:/datalad:z
    - ./services/datalad/datalad_service:/srv/datalad_service
    - ./datalad-key:/datalad-key
  env_file: ./config.env
  init: true
  # Runs the taskiq scheduler entry point; tasks with schedule labels are
  # discovered from the filesystem pattern below.
  command: [
    'taskiq',
    'scheduler',
    'datalad_service.broker.scheduler:scheduler',
    '--tasks-pattern',
    'datalad_service/tasks/*.py',
    '--fs-discover',
  ]
  depends_on:
    redis:
      condition: service_started
  networks:
    default:
      aliases:
        # NOTE(review): shares the datalad-0/datalad-1 aliases with the worker
        # services — presumably intentional so it resolves on the same network
        # names; confirm this does not shadow the workers.
        - datalad-0
        - datalad-1
187+
160188
# nginx + app
161189
web:
162190
image: docker.io/library/nginx:1.16.1

helm/openneuro/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
apiVersion: v1
22
name: openneuro
3-
version: 1.5.0
3+
version: 1.6.0
44
description: OpenNeuro production deployment chart
55
home: https://openneuro.org
66
sources:
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}-scheduler
  labels:
    app: {{ .Release.Name }}-scheduler
    chart: '{{ .Chart.Name }}-{{ .Chart.Version }}'
    release: '{{ .Release.Name }}'
    heritage: '{{ .Release.Service }}'
spec:
  # NOTE(review): keep at 1 — taskiq scheduler instances are not coordinated,
  # so multiple replicas would likely dispatch duplicate cron tasks; confirm
  # before scaling.
  replicas: 1
  selector:
    matchLabels:
      app: {{ .Release.Name }}-scheduler
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-scheduler
      annotations:
        # Roll the pod whenever config or secrets change.
        checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }}
        checksum/secret: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }}
    spec:
      containers:
        - name: {{ .Release.Name }}-scheduler
          image: 'openneuro/datalad-service:v{{ .Chart.AppVersion }}'
          # Fix: `command:` was left empty (renders null), so the container
          # fell back to the image entrypoint instead of running the
          # scheduler. Mirrors the scheduler service in docker-compose.yml.
          command:
            [
              'taskiq',
              'scheduler',
              'datalad_service.broker.scheduler:scheduler',
              '--tasks-pattern',
              'datalad_service/tasks/*.py',
              '--fs-discover',
            ]
          resources:
            requests:
              cpu: {{ .Values.workerCpuRequests }}
              memory: {{ .Values.workerMemoryRequests }}
          envFrom:
            - configMapRef:
                name: {{ .Release.Name }}-configmap
            - secretRef:
                name: {{ .Release.Name }}-secret

services/datalad/datalad_service/app.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ def before_send(event):
5353
release=f'openneuro-datalad-service@{datalad_service.version.get_version()}',
5454
server_name=socket.gethostname(),
5555
before_send=before_send,
56+
_experiments={
57+
'enable_logs': True,
58+
},
5659
)
5760

5861

services/datalad/datalad_service/broker/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from taskiq import InMemoryBroker
44
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
55

6+
67
from datalad_service import config
78
from datalad_service.broker.get_docker_scale import get_docker_scale
89

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import RedisScheduleSource

from datalad_service import config
from datalad_service.broker import broker

# Dynamic schedules stored in Redis; entries can be added or removed at runtime.
redis_source = RedisScheduleSource(f'redis://{config.REDIS_HOST}:{config.REDIS_PORT}/0')
# Static schedules declared on tasks via the `schedule=` label
# (e.g. the cron entries on the maintenance tasks).
label_source = LabelScheduleSource(broker)
# Scheduler entry point referenced by the `taskiq scheduler` command
# (see the scheduler service in docker-compose.yml).
scheduler = TaskiqScheduler(broker, sources=[redis_source, label_source])
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import os
2+
import logging
3+
import subprocess
4+
import random
5+
6+
7+
from datalad_service.config import DATALAD_DATASET_PATH
8+
from datalad_service.broker import broker
9+
10+
11+
def dataset_factory():
    """
    Generator of dataset paths under DATALAD_DATASET_PATH.

    Yields each `ds*` dataset directory once, in random order. After the
    whole set has been handed out, the directory is re-read and reshuffled,
    so iteration continues indefinitely. Exits early (ending the generator)
    when the root directory is missing or holds no datasets.
    """
    pending = []
    while True:
        if not pending:
            # List is exhausted (or first call) — rebuild and reshuffle it.
            try:
                entries = os.listdir(DATALAD_DATASET_PATH)
            except FileNotFoundError:
                logging.error(f'DATALAD_DATASET_PATH not found: {DATALAD_DATASET_PATH}')
                return
            pending = [
                os.path.join(DATALAD_DATASET_PATH, entry)
                for entry in entries
                if entry.startswith('ds')
                and os.path.isdir(os.path.join(DATALAD_DATASET_PATH, entry))
            ]
            if not pending:
                logging.warning(
                    f'No datasets found in {DATALAD_DATASET_PATH} for maintenance tasks.'
                )
                return
            random.shuffle(pending)
        yield pending.pop()
38+
39+
40+
# Independent shuffled-path generators so the gc and fsck tasks each cycle
# through every dataset without interfering with one another's ordering.
gc_dataset_generator = dataset_factory()
fsck_dataset_generator = dataset_factory()
42+
43+
44+
@broker.task(schedule=[{'cron': '*/15 * * * *'}])
def gc_dataset(dataset_path=None):
    """Run git gc on a random dataset periodically.

    Every 15 minutes the scheduler invokes this with no argument, so the next
    path is drawn from the shared shuffled generator; tests may pass a path
    explicitly. A failed gc is logged as an error, not raised.
    """
    if not dataset_path:
        try:
            dataset_path = next(gc_dataset_generator)
        except StopIteration:
            # Generator ended: the dataset root is missing or empty.
            logging.info('No datasets available for git gc.')
            return

    logging.info(f'Running git gc on dataset: {dataset_path}')

    completed = subprocess.run(
        ['git', 'gc', '--cruft', '--prune=1.day.ago'],
        cwd=dataset_path,
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        logging.error(f'`git gc` failed for `{dataset_path}`: {completed.stderr}')
64+
65+
66+
@broker.task(schedule=[{'cron': '7 * * * *'}])
def git_fsck_dataset(dataset_path=None):
    """Routinely verify git repository integrity and produce an error if any issues are reported."""
    if not dataset_path:
        # Hourly scheduled runs draw the next path from the shared generator.
        try:
            dataset_path = next(fsck_dataset_generator)
        except StopIteration:
            logging.info('No datasets available for git fsck.')
            return
    # A nonzero exit means fsck reported corruption or broken objects.
    fsck_result = subprocess.run(
        ['git', 'fsck', '--full'], cwd=dataset_path, capture_output=True, text=True
    )
    if fsck_result.returncode != 0:
        logging.error(
            f'`git fsck --full` failed for `{dataset_path}`: {fsck_result.stderr}'
        )
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from datalad_service.tasks.maintenance import gc_dataset, git_fsck_dataset
2+
3+
4+
def test_gc_dataset(new_dataset):
    # Smoke test: gc_dataset must complete without raising on a fresh dataset
    # (failures are logged, not raised). `new_dataset` is a pytest fixture.
    gc_dataset(new_dataset.path)
6+
7+
8+
def test_git_fsck_dataset(new_dataset):
    # Smoke test: git_fsck_dataset must complete without raising on a fresh
    # dataset (integrity failures are logged, not raised).
    git_fsck_dataset(new_dataset.path)

0 commit comments

Comments
 (0)