
Commit 70ff760

Merge pull request #3525 from OpenNeuroOrg/taskiq-queue

Add Redis queue using taskiq to worker

2 parents dc4a1d6 + 8b7fb13

13 files changed, +357 -42 lines

docker-compose.yml

Lines changed: 50 additions & 23 deletions
@@ -1,5 +1,5 @@
 # docker compose versions
-version: "2.3"
+version: '2.3'

 services:
   # This dummy service provides shared configuration for all Node deps
@@ -30,7 +30,7 @@ services:
       retries: 10
     ports:
       # HMR port
-      - "9992:9992"
+      - '9992:9992'
     environment:
       - NODE_ENV=development
     depends_on:
@@ -76,7 +76,7 @@ services:
   # mongodb
   mongo:
     image: docker.io/library/mongo:8
-    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
+    command: ['--replSet', 'rs0', '--bind_ip_all', '--port', '27017']
     ports:
       - 27017:27017
     healthcheck:
@@ -105,35 +105,62 @@ services:
     init: true
     command:
       [
-        "uvicorn",
-        "--host",
-        "0.0.0.0",
-        "--port",
-        "80",
-        "--reload",
-        "--factory",
-        "datalad_service.app:create_app",
-        "--workers",
-        "8",
-        "--timeout-keep-alive",
-        "30",
-        "--log-level",
-        "debug",
+        'uvicorn',
+        '--host',
+        '0.0.0.0',
+        '--port',
+        '80',
+        '--reload',
+        '--factory',
+        'datalad_service.app:create_app',
+        '--workers',
+        '8',
+        '--timeout-keep-alive',
+        '30',
+        '--log-level',
+        'debug',
       ]
     networks:
       default:
         aliases:
          - datalad-0
          - datalad-1

+  taskiq:
+    build:
+      context: services/datalad
+    volumes:
+      - ${PERSISTENT_DIR}/datalad:/datalad:z
+      - ./services/datalad/datalad_service:/srv/datalad_service
+      - ./datalad-key:/datalad-key
+    env_file: ./config.env
+    init: true
+    command: [
+        'taskiq',
+        'worker',
+        'datalad_service.broker:broker',
+        '--tasks-pattern',
+        'datalad_service/tasks/*.py',
+        '--fs-discover',
+        '--reload'
+      ]
+    depends_on:
+      redis:
+        condition: service_started
+    networks:
+      default:
+        aliases:
+          - datalad-0
+          - datalad-1
+
   # nginx + app
   web:
     image: docker.io/library/nginx:1.16.1
     volumes:
       - ./nginx/nginx.dev.conf:/etc/nginx/conf.d/default.conf:ro
     ports:
-      - "8110:8110"
-      - "9876:80"
+      - '8110:8110'
+      - '9876:80'
     depends_on:
       server:
         condition: service_healthy
@@ -147,7 +174,7 @@ services:
     platform: ${ES_PLATFORM}
     environment:
       discovery.type: single-node
-      cluster.routing.allocation.disk.threshold_enabled: "true"
+      cluster.routing.allocation.disk.threshold_enabled: 'true'
       cluster.routing.allocation.disk.watermark.flood_stage: 1gb
       cluster.routing.allocation.disk.watermark.low: 10gb
       cluster.routing.allocation.disk.watermark.high: 5gb
@@ -158,10 +185,10 @@ services:
     security_opt:
       - seccomp=${SECOMP}
     healthcheck:
-      test: "curl -s -f http://localhost:9200 || exit 1"
+      test: 'curl -s -f http://localhost:9200 || exit 1'
      interval: 10s
      timeout: 5s
      retries: 3
     ports:
-      - "9200:9200"
-      - "9300:9300"
+      - '9200:9200'
+      - '9300:9300'
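
The new taskiq service is the queue consumer: it is built from the same services/datalad context, mounts the same datalad volumes and config.env as the server, and waits for the redis service to start. Its command (taskiq worker datalad_service.broker:broker --tasks-pattern 'datalad_service/tasks/*.py' --fs-discover) discovers task modules on the filesystem; the extra --reload flag here restarts the worker on code changes and is a development convenience, omitted from the Kubernetes manifest below.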

helm/openneuro/Chart.yaml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 apiVersion: v1
 name: openneuro
-version: 1.4.0
+version: 1.5.0
 description: OpenNeuro production deployment chart
 home: https://openneuro.org
 sources:

helm/openneuro/templates/dataset-worker-stateful-set.yaml

Lines changed: 19 additions & 1 deletion
@@ -72,4 +72,22 @@ spec:
           periodSeconds: 30
           httpGet:
             path: '/heartbeat'
-            port: 80
+            port: 80
+      - name: {{ .Release.Name }}-dataset-taskiq-worker
+        image: 'openneuro/datalad-service:v{{ .Chart.AppVersion }}'
+        command: ['taskiq', 'worker', 'datalad_service.broker:broker', '--tasks-pattern', 'datalad_service/tasks/*.py', '--fs-discover']
+        resources:
+          requests:
+            cpu: {{ .Values.workerCpuRequests }}
+            memory: {{ .Values.workerMemoryRequests }}
+        envFrom:
+          - configMapRef:
+              name: {{ .Release.Name }}-configmap
+          - secretRef:
+              name: {{ .Release.Name }}-secret
+        volumeMounts:
+          - name: datasets
+            mountPath: /datasets
+          - name: ssh-key
+            mountPath: /datalad-key
+            subPath: datalad-key
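
In production the taskiq worker runs as a second container in the existing dataset-worker pod, sharing the pod's datasets and ssh-key volumes and the same ConfigMap and Secret as the uvicorn container; note that this command drops the --reload flag used in docker-compose.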

services/datalad/datalad_service/broker/__init__.py

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+import sys
+
+from taskiq import InMemoryBroker
+from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker
+
+from datalad_service import config
+from datalad_service.broker.get_docker_scale import get_docker_scale
+
+# Use InMemoryBroker to run during pytest
+if 'pytest' in sys.modules:
+    broker = InMemoryBroker()
+else:
+    redis_url = f'redis://{config.REDIS_HOST}:{config.REDIS_PORT}/{get_docker_scale()}'
+    result_backend = RedisAsyncResultBackend(
+        redis_url=redis_url,
+        result_ex_time=5000,
+    )
+    broker = RedisStreamBroker(
+        url=redis_url,
+    ).with_result_backend(result_backend)
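
This module embeds the get_docker_scale() offset as the Redis database number in redis_url, which appears intended to give each replica its own queue, and swaps in InMemoryBroker under pytest so tests run without Redis. A minimal sketch of declaring and enqueueing a task against this broker, using the public taskiq API; example_task, its argument, and the timeout are illustrative, not part of this commit:

from datalad_service.broker import broker


@broker.task
async def example_task(dataset_id):
    # Runs in the taskiq worker process, not in the web server
    return dataset_id.upper()


async def enqueue_example():
    # .kiq() serializes the call and pushes it onto the Redis stream;
    # awaiting it waits for the enqueue, not for task completion
    task = await example_task.kiq('ds000001')
    # The RedisAsyncResultBackend retains results (result_ex_time=5000 seconds),
    # so the outcome can be fetched after the worker finishes
    result = await task.wait_result(timeout=30)
    return result.return_value

In this commit the handlers only fire and forget (they never call wait_result), which is why awaiting .kiq() alone is enough to report success.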

services/datalad/datalad_service/broker/get_docker_scale.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+"""Helper script to get the docker-compose offset from within a container."""
+
+import re
+import socket
+import dns.resolver
+import dns.reversename
+
+
+def get_docker_scale():
+    hostname = socket.gethostname()
+    matches = re.match('.*?-([0-9]+)$', hostname)
+    if matches:
+        # Kubernetes
+        return int(matches[1])
+    else:
+        # docker-compose
+        ip_addr = socket.gethostbyname(hostname)
+        answer = dns.resolver.resolve(dns.reversename.from_address(ip_addr), 'PTR')
+        subdomain = str(answer[0]).split('.')[0]
+        # Subtract 1 to make this zero indexed (matching k8s)
+        docker_compose_offset = int(subdomain.split('_')[-1]) - 1
+        # Printed so this can be referenced as a script
+        return docker_compose_offset
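
The function distinguishes the two environments by hostname shape: Kubernetes StatefulSet pods end in an ordinal that the regex captures directly, while under docker-compose it falls back to a reverse-DNS (PTR) lookup of the container's own address and parses the replica number from the compose-generated name. A quick illustration of the Kubernetes branch (hostnames here are hypothetical):

import re

# StatefulSet pod names end in an ordinal: <statefulset-name>-0, -1, ...
for hostname in ['openneuro-dataset-worker-0', 'openneuro-dataset-worker-3']:
    matches = re.match('.*?-([0-9]+)$', hostname)
    print(int(matches[1]))  # prints 0, then 3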

services/datalad/datalad_service/config.py

Lines changed: 4 additions & 0 deletions
@@ -25,3 +25,7 @@

 # Site URL
 CRN_SERVER_URL = os.getenv('CRN_SERVER_URL')
+
+# Redis connection for task queue
+REDIS_HOST = os.getenv('REDIS_HOST')
+REDIS_PORT = os.getenv('REDIS_PORT')

services/datalad/datalad_service/handlers/validation.py

Lines changed: 7 additions & 5 deletions
@@ -1,4 +1,5 @@
 import asyncio
+import logging

 import falcon

@@ -18,13 +19,14 @@ async def on_post(self, req, resp, dataset, hexsha):
         try:
             dataset_path = self.store.get_dataset_path(dataset)
             # Run the validator but don't block on the request
-            asyncio.create_task(
-                validate_dataset(
-                    dataset, dataset_path, hexsha, req.cookies, user=name
-                )
+            await validate_dataset.kiq(
+                dataset, dataset_path, hexsha, req.cookies, user=name
             )
             resp.status = falcon.HTTP_OK
-        except:
+        except Exception:
+            logging.exception(
+                'Validation task enqueue failed for dataset %s', dataset
+            )
             resp.status = falcon.HTTP_INTERNAL_SERVER_ERROR
         else:
             resp.media = {'error': 'Missing or malformed dataset parameter in request.'}
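
Awaiting validate_dataset.kiq(...) replaces the previous asyncio.create_task(validate_dataset(...)): the handler now waits only for the message to be placed on the Redis stream, and the validation itself runs in the separate taskiq worker. Narrowing the bare except: to except Exception: with logging.exception also means enqueue failures return a 500 with a traceback in the logs instead of failing silently.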

services/datalad/datalad_service/tasks/files.py

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ def commit_files(store, dataset, files, name=None, email=None, cookies=None):
     )
     ref = git_commit(repo, files, author)
     # Run the validator but don't block on the request
-    asyncio.create_task(validate_dataset(dataset, dataset_path, str(ref), cookies))
+    asyncio.create_task(validate_dataset.kiq(dataset, dataset_path, str(ref), cookies))
     return ref

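Here validate_dataset.kiq(...) returns a coroutine that performs the enqueue, so wrapping it in asyncio.create_task schedules the enqueue on the running event loop and lets commit_files return immediately; only the target changed, from running validation in-process to handing it to the Redis queue.
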
services/datalad/datalad_service/tasks/validator.py

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,7 @@
 import requests

 from datalad_service.config import GRAPHQL_ENDPOINT
+from datalad_service.broker import broker

 logger = logging.getLogger('datalad_service.' + __name__)

@@ -106,6 +107,7 @@ def issues_mutation(dataset_id, ref, issues, validator_metadata):
     }


+@broker.task
 async def validate_dataset(dataset_id, dataset_path, ref, cookies=None, user=''):
     # New schema validator second in case of issues
     validator_output_deno = await validate_dataset_deno_call(dataset_path, ref)
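
The @broker.task decorator registers the coroutine with the broker and gives it the .kiq() method used by the handlers above; under pytest, the InMemoryBroker configured in datalad_service.broker should execute enqueued calls in-process, so the existing tests exercise the same path without a Redis server (this reading of taskiq's InMemoryBroker behavior is an assumption, not something stated in the commit).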
Lines changed: 12 additions & 11 deletions
@@ -1,21 +1,22 @@
 #!/usr/bin/env python
 """Helper script to get the docker-compose offset from within a container."""
+
 import re
 import socket
 import dns.resolver
 import dns.reversename

 hostname = socket.gethostname()
-matches = re.match(".*?-([0-9]+)$", hostname)
+matches = re.match('.*?-([0-9]+)$', hostname)
 if matches:
-  # Kubernetes
-  print(matches[1])
+    # Kubernetes
+    print(matches[1])
 else:
-  # docker-compose
-  ip_addr = socket.gethostbyname(hostname)
-  answer = dns.resolver.query(dns.reversename.from_address(ip_addr), "PTR")
-  subdomain = str(answer[0]).split('.')[0]
-  # Subtract 1 to make this zero indexed (matching k8s)
-  docker_compose_offset = int(subdomain.split('_')[-1]) - 1
-  # Printed so this can be referenced as a script
-  print(docker_compose_offset)
+    # docker-compose
+    ip_addr = socket.gethostbyname(hostname)
+    answer = dns.resolver.resolve(dns.reversename.from_address(ip_addr), 'PTR')
+    subdomain = str(answer[0]).split('.')[0]
+    # Subtract 1 to make this zero indexed (matching k8s)
+    docker_compose_offset = int(subdomain.split('_')[-1]) - 1
+    # Printed so this can be referenced as a script
+    print(docker_compose_offset)