
Commit 0b5ab4b (parent: e38abea)

add job submission/job reading endpoints

File tree: 8 files changed, +697 −2 lines

compose/.env.example

Lines changed: 4 additions & 0 deletions

```diff
@@ -13,5 +13,9 @@ V_HOST=localhost
 CELERY_BROKER_URL=redis://compose_redis:6379/0
 CELERY_RESULT_BACKEND=redis://compose_redis:6379/0
 NEUROVAULT_ACCESS_TOKEN=EXAMPLE_TOKEN
+COMPOSE_RUNNER_SUBMIT_URL=https://rruanembp2jcccmeeamv6xgl4q0kuqsp.lambda-url.us-east-1.on.aws/
+COMPOSE_RUNNER_STATUS_URL=https://asoqdxtcjngou6odcnvoaqtf7e0rldhf.lambda-url.us-east-1.on.aws/
+COMPOSE_RUNNER_LOGS_URL=https://oniyt5po2iorelr326zszptwcy0xbjzz.lambda-url.us-east-1.on.aws/
+COMPOSE_RUNNER_ARTIFACTS_URL=https://23llzhs3nz6o4e47ycofhluqqa0zjyko.lambda-url.us-east-1.on.aws/
 CELERY_LOG_DIR=/tmp
 DEBUG=True
```
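The new resource module below reads these values from `current_app.config`; the config module itself is not part of this diff, and `COMPOSE_RUNNER_ARTIFACTS_URL` is not referenced in the files shown here. A minimal, hypothetical sketch of how the new variables could be surfaced to Flask (only the variable names come from `.env.example`; the loading mechanism is an assumption):

```python
# Hypothetical config plumbing -- not part of this commit.
import os


class Config:
    COMPOSE_RUNNER_SUBMIT_URL = os.environ.get("COMPOSE_RUNNER_SUBMIT_URL")
    COMPOSE_RUNNER_STATUS_URL = os.environ.get("COMPOSE_RUNNER_STATUS_URL")
    COMPOSE_RUNNER_LOGS_URL = os.environ.get("COMPOSE_RUNNER_LOGS_URL")
    COMPOSE_RUNNER_ARTIFACTS_URL = os.environ.get("COMPOSE_RUNNER_ARTIFACTS_URL")
```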

compose/backend/neurosynth_compose/resources/__init__.py

Lines changed: 6 additions & 0 deletions

```diff
@@ -15,6 +15,10 @@
 )
 
 from .users import UsersView
+from .meta_analysis_jobs import (
+    MetaAnalysisJobsResource,
+    MetaAnalysisJobResource,
+)
 
 __all__ = [
     "ConditionsResource",
@@ -31,4 +35,6 @@
     "UsersView",
     "NeurostoreStudiesView",
     "ProjectsView",
+    "MetaAnalysisJobsResource",
+    "MetaAnalysisJobResource",
 ]
```
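The commit exports the two MethodView classes but does not show where URL rules are attached. A sketch of plausible wiring, with the paths inferred from the `status_url` the resource builds for itself (`/meta-analysis-jobs/<job_id>`); the actual blueprint or OpenAPI wiring may differ:

```python
# Hypothetical route registration -- inferred, not shown in this commit.
from flask import Flask

from neurosynth_compose.resources import (
    MetaAnalysisJobsResource,
    MetaAnalysisJobResource,
)


def register_job_routes(app: Flask) -> None:
    # POST submits a job; GET lists the caller's jobs.
    app.add_url_rule(
        "/meta-analysis-jobs",
        view_func=MetaAnalysisJobsResource.as_view("meta_analysis_jobs"),
    )
    # GET polls a single job's status and logs.
    app.add_url_rule(
        "/meta-analysis-jobs/<job_id>",
        view_func=MetaAnalysisJobResource.as_view("meta_analysis_job"),
    )
```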
compose/backend/neurosynth_compose/resources/meta_analysis_jobs.py (new file)

Lines changed: 297 additions & 0 deletions
```python
import json
import logging
from datetime import datetime, timezone
from typing import Optional

import requests
from flask import abort, current_app, request
from flask.views import MethodView
from marshmallow import ValidationError
from redis import Redis
from sqlalchemy import select

from ..database import db
from ..models import MetaAnalysis
from ..schemas import MetaAnalysisJobRequestSchema
from .analysis import _make_json_response, get_current_user

logger = logging.getLogger(__name__)

JOB_CACHE_PREFIX = "compose:jobs"
JOB_CACHE_TTL_SECONDS = 60 * 60 * 24 * 3  # 3 days

_job_store_client: Optional[Redis] = None


class JobStoreError(RuntimeError):
    """Raised when the job cache cannot be accessed."""


class ComposeRunnerError(RuntimeError):
    """Raised when the external compose runner call fails."""


def get_job_store() -> Redis:
    """Return a Redis client configured from the Celery result backend."""
    global _job_store_client
    if _job_store_client is not None:
        return _job_store_client

    redis_url = current_app.config.get("CELERY_RESULT_BACKEND")
    if not redis_url:
        raise JobStoreError("CELERY_RESULT_BACKEND is not configured.")

    try:
        client = Redis.from_url(redis_url)
        client.ping()
    except Exception as exc:  # noqa: BLE001
        raise JobStoreError("unable to reach redis job store") from exc

    _job_store_client = client
    return client


def _job_cache_key(job_id: str) -> str:
    return f"{JOB_CACHE_PREFIX}:{job_id}"


def _store_job(job_id: str, payload: dict) -> None:
    try:
        client = get_job_store()
        client.setex(_job_cache_key(job_id), JOB_CACHE_TTL_SECONDS, json.dumps(payload))
    except JobStoreError:
        raise
    except Exception as exc:  # noqa: BLE001
        raise JobStoreError("failed to cache job state") from exc


def _load_job(job_id: str) -> Optional[dict]:
    try:
        client = get_job_store()
        cached = client.get(_job_cache_key(job_id))
    except JobStoreError:
        raise
    except Exception as exc:  # noqa: BLE001
        raise JobStoreError("failed to read job state") from exc

    if not cached:
        return None
    if isinstance(cached, bytes):
        cached = cached.decode("utf-8")
    return json.loads(cached)
```
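Job state lives only in this Redis cache (key `compose:jobs:<job_id>`, 3-day TTL); nothing is persisted to the relational database. An illustrative round trip through the helpers above, which must run inside an application context because `get_job_store()` reads `CELERY_RESULT_BACKEND` from `current_app`:

```python
# Illustrative only; `app` is the Flask application object (assumed in scope).
with app.app_context():
    _store_job("job-123", {"job_id": "job-123", "status": "SUBMITTED"})
    assert _load_job("job-123")["status"] == "SUBMITTED"
    assert _load_job("no-such-job") is None  # unknown or expired ids return None
```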
```python
def call_lambda(url: Optional[str], payload: dict) -> dict:
    """Call an external AWS Lambda-style HTTPS endpoint."""
    if not url:
        raise ComposeRunnerError("compose runner url is not configured")
    try:
        response = requests.post(url, json=payload, timeout=30)
    except requests.RequestException as exc:  # noqa: PERF203
        raise ComposeRunnerError("compose runner request failed") from exc

    if response.status_code >= 400:
        raise ComposeRunnerError(f"compose runner returned HTTP {response.status_code}")

    try:
        return response.json()
    except ValueError as exc:  # noqa: BLE001
        raise ComposeRunnerError("invalid compose runner response") from exc


def _abort_with_runner_error(exc: Exception) -> None:
    logger.exception("Compose runner call failed", exc_info=exc)
    abort(502, description="compose runner unavailable")


def _abort_with_job_store_error(exc: Exception) -> None:
    logger.exception("Job store error", exc_info=exc)
    abort(503, description="job store unavailable")


def _ensure_authenticated_user():
    user = get_current_user()
    if not user:
        abort(401, description="authentication required")
    return user
```
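`call_lambda` normalizes every failure mode (missing config, network error, HTTP status >= 400, non-JSON body) into `ComposeRunnerError`, which the helpers above translate to HTTP 502; job-store failures become 503 and a missing user 401. A sketch of that contract (the URL is a placeholder, not one of the configured endpoints):

```python
# Sketch of the call_lambda contract; URL invented for illustration.
try:
    result = call_lambda("https://example.lambda-url.us-east-1.on.aws/", {"job_id": "job-123"})
except ComposeRunnerError:
    # In the view functions below this is caught and surfaced as HTTP 502.
    result = None
```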
```python
def submit_job():
    schema = MetaAnalysisJobRequestSchema()
    try:
        request_data = request.get_json(force=True)
    except Exception:  # noqa: BLE001
        abort(400, description="invalid JSON payload")

    try:
        data = schema.load(request_data or {})
    except ValidationError as exc:
        abort(422, description=f"input does not conform to specification: {exc}")

    current_user = _ensure_authenticated_user()

    meta_analysis = (
        db.session.execute(
            select(MetaAnalysis).where(MetaAnalysis.id == data["meta_analysis_id"])
        )
        .scalars()
        .first()
    )
    if meta_analysis is None:
        abort(404, description="meta-analysis not found")

    # Only the owner of the meta-analysis may launch runs for it.
    if meta_analysis.user_id != current_user.external_id:
        abort(
            403,
            description="user is not authorized to submit jobs for this meta-analysis",
        )

    submit_url = current_app.config.get("COMPOSE_RUNNER_SUBMIT_URL")
    environment = current_app.config.get("ENV", "production")

    submission_payload = {
        "meta_analysis_id": meta_analysis.id,
        "environment": environment,
        "no_upload": data.get("no_upload", False),
    }

    # ComposeRunnerError is a subclass of Exception, so a single Exception
    # handler covers both it and anything unexpected.
    try:
        submission_response = call_lambda(submit_url, submission_payload)
    except Exception as exc:  # noqa: BLE001
        _abort_with_runner_error(exc)

    job_id = submission_response.get("job_id")
    artifact_prefix = submission_response.get("artifact_prefix")
    status = submission_response.get("status", "SUBMITTED")

    if not job_id:
        abort(502, description="compose runner returned an invalid response")

    now = datetime.now(timezone.utc).isoformat()
    status_url = f"/meta-analysis-jobs/{job_id}"
    cached_payload = {
        "job_id": job_id,
        "meta_analysis_id": meta_analysis.id,
        "artifact_prefix": artifact_prefix,
        "status": status,
        "environment": environment,
        "no_upload": data.get("no_upload", False),
        "user_id": current_user.external_id,
        "status_url": status_url,
        "created_at": now,
        "updated_at": now,
        "output": submission_response.get("output"),
        "start_time": submission_response.get("start_time"),
        "logs": submission_response.get("logs", []),
    }

    try:
        _store_job(job_id, cached_payload)
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)

    return _make_json_response(cached_payload.copy(), status=202)
```
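A hypothetical client call against the submit endpoint; the host, mount path, and bearer-token auth scheme are assumptions, not shown in this commit:

```python
import requests

token = "<ACCESS_TOKEN>"  # placeholder credential; auth scheme assumed
resp = requests.post(
    "https://compose.example.org/meta-analysis-jobs",  # host/path assumed
    json={"meta_analysis_id": "abc123", "no_upload": True},
    headers={"Authorization": f"Bearer {token}"},
    timeout=30,
)
assert resp.status_code == 202  # job accepted
job = resp.json()
print(job["job_id"], job["status"], job["status_url"])
```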
```python
def get_job_status(job_id: str):
    try:
        cached_job = _load_job(job_id)
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)

    if cached_job is None:
        abort(404, description="job not found")

    status_url = current_app.config.get("COMPOSE_RUNNER_STATUS_URL")
    logs_url = current_app.config.get("COMPOSE_RUNNER_LOGS_URL")

    try:
        status_response = call_lambda(status_url, {"job_id": job_id})
        logs_payload = {"events": []}
        if cached_job.get("artifact_prefix"):
            logs_payload = call_lambda(
                logs_url, {"artifact_prefix": cached_job["artifact_prefix"]}
            )
    except Exception as exc:  # noqa: BLE001
        _abort_with_runner_error(exc)

    now = datetime.now(timezone.utc).isoformat()
    cached_job.update(
        {
            "status": status_response.get("status", cached_job.get("status")),
            "artifact_prefix": status_response.get(
                "artifact_prefix", cached_job.get("artifact_prefix")
            ),
            "start_time": status_response.get("start_time"),
            "output": status_response.get("output"),
            "updated_at": now,
        }
    )
    cached_job["logs"] = [
        {
            "timestamp": event.get("timestamp"),
            "message": event.get("message"),
        }
        for event in logs_payload.get("events", []) or []
    ]

    try:
        _store_job(job_id, cached_job)
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)

    return _make_json_response(cached_job)
```
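Each GET refreshes the cached record from the runner's status and logs endpoints and returns the merged result, so a client can simply poll. Continuing the sketch above (same assumed host; the terminal status names are illustrative, since the diff does not enumerate them):

```python
import time

import requests

job_url = f"https://compose.example.org/meta-analysis-jobs/{job['job_id']}"  # host assumed
while True:
    state = requests.get(job_url, timeout=30).json()
    if state["status"] in {"SUCCEEDED", "FAILED"}:  # terminal names assumed
        break
    time.sleep(10)
for event in state["logs"]:
    print(event["timestamp"], event["message"])
```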
```python
def list_jobs():
    current_user = _ensure_authenticated_user()
    try:
        client = get_job_store()
        keys = list(client.scan_iter(f"{JOB_CACHE_PREFIX}:*"))
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)
    except Exception as exc:  # noqa: BLE001
        error = JobStoreError("failed to list jobs")
        error.__cause__ = exc
        _abort_with_job_store_error(error)

    jobs = []
    for key in keys:
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        job_id = key[len(JOB_CACHE_PREFIX) + 1:]
        try:
            job = _load_job(job_id)
        except JobStoreError as exc:
            _abort_with_job_store_error(exc)
        if not job:
            continue
        # Keys are scanned globally; filter down to the requesting user's jobs.
        if job.get("user_id") != current_user.external_id:
            continue
        jobs.append(job)

    # Most recently touched jobs first.
    jobs.sort(
        key=lambda job: job.get("updated_at") or job.get("created_at") or "",
        reverse=True,
    )
    payload = {"results": jobs, "metadata": {"count": len(jobs)}}
    return _make_json_response(payload)
```
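Listing therefore returns only the caller's cached jobs, newest first; jobs older than the 3-day TTL silently drop out. Continuing the client sketch (same assumed host and token):

```python
resp = requests.get(
    "https://compose.example.org/meta-analysis-jobs",  # host assumed
    headers={"Authorization": f"Bearer {token}"},
    timeout=30,
)
listing = resp.json()
print(listing["metadata"]["count"])
for item in listing["results"]:
    print(item["job_id"], item["status"], item["updated_at"])
```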
```python
class MetaAnalysisJobsResource(MethodView):
    @classmethod
    def post(cls):
        return submit_job()

    @classmethod
    def get(cls):
        return list_jobs()


class MetaAnalysisJobResource(MethodView):
    @classmethod
    def get(cls, job_id: str):
        return get_job_status(job_id)
```

compose/backend/neurosynth_compose/schemas/__init__.py

Lines changed: 6 additions & 0 deletions

```diff
@@ -14,6 +14,9 @@
     ProjectSchema,
     ResultInitSchema,
     ResultUploadSchema,
+    MetaAnalysisJobRequestSchema,
+    MetaAnalysisJobResponseSchema,
+    MetaAnalysisJobLogSchema,
 )
 from .users import UserSchema
 
@@ -35,4 +38,7 @@
     "NeurostoreStudySchema",
     "ProjectSchema",
     "UserSchema",
+    "MetaAnalysisJobRequestSchema",
+    "MetaAnalysisJobResponseSchema",
+    "MetaAnalysisJobLogSchema",
 ]
```

compose/backend/neurosynth_compose/schemas/analysis.py

Lines changed: 25 additions & 0 deletions

```diff
@@ -399,6 +399,31 @@ def create_neurovault_url(self, data, **kwargs):
         return data
 
 
+class MetaAnalysisJobRequestSchema(Schema):
+    meta_analysis_id = fields.String(required=True)
+    no_upload = fields.Boolean(load_default=False)
+
+
+class MetaAnalysisJobLogSchema(Schema):
+    timestamp = fields.Float()
+    message = fields.String()
+
+
+class MetaAnalysisJobResponseSchema(Schema):
+    job_id = fields.String()
+    meta_analysis_id = fields.String()
+    artifact_prefix = fields.String(allow_none=True)
+    status = fields.String()
+    status_url = fields.String(allow_none=True)
+    environment = fields.String()
+    no_upload = fields.Boolean()
+    start_time = fields.String(allow_none=True)
+    output = fields.Dict(allow_none=True)
+    logs = fields.List(fields.Nested(MetaAnalysisJobLogSchema), allow_none=True)
+    created_at = fields.String(allow_none=True)
+    updated_at = fields.String(allow_none=True)
+
+
 class NeurovaultCollectionSchema(BaseSchema):
     collection_id = fields.String()
     url = fields.String(dump_only=True)
```
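Of these, only `MetaAnalysisJobRequestSchema` is used by the new resource module, which validates input with it and returns plain dicts; the response and log schemas are exported but not yet referenced in the files shown. A quick illustration of the request schema's behavior, which `submit_job` relies on for validation and the `no_upload` default:

```python
from marshmallow import ValidationError

from neurosynth_compose.schemas import MetaAnalysisJobRequestSchema

schema = MetaAnalysisJobRequestSchema()
print(schema.load({"meta_analysis_id": "abc123"}))
# {'meta_analysis_id': 'abc123', 'no_upload': False}

try:
    schema.load({})
except ValidationError as exc:
    print(exc.messages)  # {'meta_analysis_id': ['Missing data for required field.']}
```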
