4 changes: 4 additions & 0 deletions compose/.env.example
@@ -13,5 +13,9 @@ V_HOST=localhost
CELERY_BROKER_URL=redis://compose_redis:6379/0
CELERY_RESULT_BACKEND=redis://compose_redis:6379/0
NEUROVAULT_ACCESS_TOKEN=EXAMPLE_TOKEN
COMPOSE_RUNNER_SUBMIT_URL=https://rruanembp2jcccmeeamv6xgl4q0kuqsp.lambda-url.us-east-1.on.aws/
COMPOSE_RUNNER_STATUS_URL=https://asoqdxtcjngou6odcnvoaqtf7e0rldhf.lambda-url.us-east-1.on.aws/
COMPOSE_RUNNER_LOGS_URL=https://oniyt5po2iorelr326zszptwcy0xbjzz.lambda-url.us-east-1.on.aws/
COMPOSE_RUNNER_ARTIFACTS_URL=https://23llzhs3nz6o4e47ycofhluqqa0zjyko.lambda-url.us-east-1.on.aws/
CELERY_LOG_DIR=/tmp
DEBUG=True
2 changes: 1 addition & 1 deletion compose/backend/neurosynth_compose/openapi
6 changes: 6 additions & 0 deletions compose/backend/neurosynth_compose/resources/__init__.py
@@ -15,6 +15,10 @@
)

from .users import UsersView
from .meta_analysis_jobs import (
    MetaAnalysisJobsResource,
    MetaAnalysisJobResource,
)

__all__ = [
"ConditionsResource",
Expand All @@ -31,4 +35,6 @@
"UsersView",
"NeurostoreStudiesView",
"ProjectsView",
"MetaAnalysisJobsResource",
"MetaAnalysisJobResource",
]
297 changes: 297 additions & 0 deletions compose/backend/neurosynth_compose/resources/meta_analysis_jobs.py
@@ -0,0 +1,297 @@
import json
import logging
from datetime import datetime, timezone
from typing import Optional

import requests
from flask import abort, current_app, request
from flask.views import MethodView
from marshmallow import ValidationError
from redis import Redis
from sqlalchemy import select

from ..database import db
from ..models import MetaAnalysis
from ..schemas import MetaAnalysisJobRequestSchema
from .analysis import _make_json_response, get_current_user

logger = logging.getLogger(__name__)

JOB_CACHE_PREFIX = "compose:jobs"
JOB_CACHE_TTL_SECONDS = 60 * 60 * 24 * 3 # 3 days

_job_store_client: Optional[Redis] = None


class JobStoreError(RuntimeError):
    """Raised when the job cache cannot be accessed."""


class ComposeRunnerError(RuntimeError):
    """Raised when the external compose runner call fails."""


def get_job_store() -> Redis:
    """Return a Redis client configured from the Celery result backend."""
    global _job_store_client
    if _job_store_client is not None:
        return _job_store_client

    redis_url = current_app.config.get("CELERY_RESULT_BACKEND")
    if not redis_url:
        raise JobStoreError("CELERY_RESULT_BACKEND is not configured.")

    try:
        client = Redis.from_url(redis_url)
        client.ping()
    except Exception as exc:  # noqa: BLE001
        raise JobStoreError("unable to reach redis job store") from exc

    _job_store_client = client
    return client


def _job_cache_key(job_id: str) -> str:
    return f"{JOB_CACHE_PREFIX}:{job_id}"


def _store_job(job_id: str, payload: dict) -> None:
    try:
        client = get_job_store()
        client.setex(_job_cache_key(job_id), JOB_CACHE_TTL_SECONDS, json.dumps(payload))
    except JobStoreError:
        raise
    except Exception as exc:  # noqa: BLE001
        raise JobStoreError("failed to cache job state") from exc


def _load_job(job_id: str) -> Optional[dict]:
    try:
        client = get_job_store()
        cached = client.get(_job_cache_key(job_id))
    except JobStoreError:
        raise
    except Exception as exc:  # noqa: BLE001
        raise JobStoreError("failed to read job state") from exc

    if not cached:
        return None
    if isinstance(cached, bytes):
        cached = cached.decode("utf-8")
    return json.loads(cached)


def call_lambda(url: Optional[str], payload: dict) -> dict:
    """Call an external AWS Lambda-style HTTPS endpoint."""
    if not url:
        raise ComposeRunnerError("compose runner url is not configured")
    try:
        response = requests.post(url, json=payload, timeout=30)
    except requests.RequestException as exc:  # noqa: PERF203
        raise ComposeRunnerError("compose runner request failed") from exc

    if response.status_code >= 400:
        raise ComposeRunnerError(f"compose runner returned HTTP {response.status_code}")

    try:
        return response.json()
    except ValueError as exc:  # noqa: BLE001
        raise ComposeRunnerError("invalid compose runner response") from exc


def _abort_with_runner_error(exc: Exception) -> None:
    logger.exception("Compose runner call failed", exc_info=exc)
    abort(502, description="compose runner unavailable")


def _abort_with_job_store_error(exc: Exception) -> None:
    logger.exception("Job store error", exc_info=exc)
    abort(503, description="job store unavailable")


def _ensure_authenticated_user():
    user = get_current_user()
    if not user:
        abort(401, description="authentication required")
    return user


def submit_job():
    schema = MetaAnalysisJobRequestSchema()
    try:
        request_data = request.get_json(force=True)
    except Exception:  # noqa: BLE001
        abort(400, description="invalid JSON payload")

    try:
        data = schema.load(request_data or {})
    except ValidationError as exc:
        abort(422, description=f"input does not conform to specification: {exc}")

    current_user = _ensure_authenticated_user()

    meta_analysis = (
        db.session.execute(
            select(MetaAnalysis).where(MetaAnalysis.id == data["meta_analysis_id"])
        )
        .scalars()
        .first()
    )
    if meta_analysis is None:
        abort(404, description="meta-analysis not found")

    if meta_analysis.user_id != current_user.external_id:
        abort(
            403,
            description="user is not authorized to submit jobs for this meta-analysis",
        )

    submit_url = current_app.config.get("COMPOSE_RUNNER_SUBMIT_URL")
    environment = current_app.config.get("ENV", "production")

    submission_payload = {
        "meta_analysis_id": meta_analysis.id,
        "environment": environment,
        "no_upload": data.get("no_upload", False),
    }

    try:
        submission_response = call_lambda(submit_url, submission_payload)
    except (ComposeRunnerError, Exception) as exc:  # noqa: BLE001
        _abort_with_runner_error(exc)

    job_id = submission_response.get("job_id")
    artifact_prefix = submission_response.get("artifact_prefix")
    status = submission_response.get("status", "SUBMITTED")

    if not job_id:
        abort(502, description="compose runner returned an invalid response")

    now = datetime.now(timezone.utc).isoformat()
    status_url = f"/meta-analysis-jobs/{job_id}"
    cached_payload = {
        "job_id": job_id,
        "meta_analysis_id": meta_analysis.id,
        "artifact_prefix": artifact_prefix,
        "status": status,
        "environment": environment,
        "no_upload": data.get("no_upload", False),
        "user_id": current_user.external_id,
        "status_url": status_url,
        "created_at": now,
        "updated_at": now,
        "output": submission_response.get("output"),
        "start_time": submission_response.get("start_time"),
        "logs": submission_response.get("logs", []),
    }

    try:
        _store_job(job_id, cached_payload)
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)

    response_payload = cached_payload.copy()
    response_payload["status"] = status

    return _make_json_response(response_payload, status=202)


def get_job_status(job_id: str):
    try:
        cached_job = _load_job(job_id)
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)

    if cached_job is None:
        abort(404, description="job not found")
Comment on lines +199 to +206

P1: Enforce authentication/ownership before returning job status

The get_job_status handler reads job data from Redis and returns status/logs without ever calling _ensure_authenticated_user() or verifying that the requester matches cached_job["user_id"]. This means any unauthenticated client, or an authenticated user with a different account, can fetch another user's job status and logs as long as they know the job ID. Both submit_job and list_jobs enforce ownership, so this route is an inconsistent and likely accidental gap that leaks potentially sensitive job data. Require authentication here and reject requests where the job's user_id does not match the current user before calling the compose runner or returning data.
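A minimal sketch of the suggested guard, reusing helpers already defined in this module; returning 404 on an ownership mismatch (rather than 403) is an assumption here, chosen so the response does not confirm that a job ID exists:

    # Sketch only: authenticate and enforce ownership before any runner calls.
    current_user = _ensure_authenticated_user()
    if cached_job.get("user_id") != current_user.external_id:
        # 404 instead of 403 to avoid leaking the existence of the job.
        abort(404, description="job not found")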


    status_url = current_app.config.get("COMPOSE_RUNNER_STATUS_URL")
    logs_url = current_app.config.get("COMPOSE_RUNNER_LOGS_URL")

    try:
        status_response = call_lambda(status_url, {"job_id": job_id})
        logs_payload = {"events": []}
        if cached_job.get("artifact_prefix"):
            logs_payload = call_lambda(
                logs_url, {"artifact_prefix": cached_job["artifact_prefix"]}
            )
    except (ComposeRunnerError, Exception) as exc:  # noqa: BLE001
        _abort_with_runner_error(exc)

    now = datetime.now(timezone.utc).isoformat()
    cached_job.update(
        {
            "status": status_response.get("status", cached_job.get("status")),
            "artifact_prefix": status_response.get(
                "artifact_prefix", cached_job.get("artifact_prefix")
            ),
            "start_time": status_response.get("start_time"),
            "output": status_response.get("output"),
            "updated_at": now,
        }
    )
    cached_job["logs"] = [
        {
            "timestamp": event.get("timestamp"),
            "message": event.get("message"),
        }
        for event in logs_payload.get("events", []) or []
    ]

    try:
        _store_job(job_id, cached_job)
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)

    return _make_json_response(cached_job)


def list_jobs():
    current_user = _ensure_authenticated_user()
    try:
        client = get_job_store()
        keys = list(client.scan_iter(f"{JOB_CACHE_PREFIX}:*"))
    except JobStoreError as exc:
        _abort_with_job_store_error(exc)
    except Exception as exc:  # noqa: BLE001
        error = JobStoreError("failed to list jobs")
        error.__cause__ = exc
        _abort_with_job_store_error(error)

    jobs = []
    for key in keys:
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        job_id = key[len(JOB_CACHE_PREFIX) + 1:]
        try:
            job = _load_job(job_id)
        except JobStoreError as exc:
            _abort_with_job_store_error(exc)
        if not job:
            continue
        if job.get("user_id") != current_user.external_id:
            continue
        jobs.append(job)

    jobs.sort(
        key=lambda job: job.get("updated_at") or job.get("created_at") or "",
        reverse=True,
    )
    payload = {"results": jobs, "metadata": {"count": len(jobs)}}
    return _make_json_response(payload)


class MetaAnalysisJobsResource(MethodView):
    @classmethod
    def post(cls):
        return submit_job()

    @classmethod
    def get(cls):
        return list_jobs()


class MetaAnalysisJobResource(MethodView):
    @classmethod
    def get(cls, job_id: str):
        return get_job_status(job_id)
6 changes: 6 additions & 0 deletions compose/backend/neurosynth_compose/schemas/__init__.py
@@ -14,6 +14,9 @@
    ProjectSchema,
    ResultInitSchema,
    ResultUploadSchema,
    MetaAnalysisJobRequestSchema,
    MetaAnalysisJobResponseSchema,
    MetaAnalysisJobLogSchema,
)
from .users import UserSchema

@@ -35,4 +38,7 @@
    "NeurostoreStudySchema",
    "ProjectSchema",
    "UserSchema",
    "MetaAnalysisJobRequestSchema",
    "MetaAnalysisJobResponseSchema",
    "MetaAnalysisJobLogSchema",
]
25 changes: 25 additions & 0 deletions compose/backend/neurosynth_compose/schemas/analysis.py
@@ -399,6 +399,31 @@ def create_neurovault_url(self, data, **kwargs):
        return data


class MetaAnalysisJobRequestSchema(Schema):
    meta_analysis_id = fields.String(required=True)
    no_upload = fields.Boolean(load_default=False)


class MetaAnalysisJobLogSchema(Schema):
    timestamp = fields.Float()
    message = fields.String()


class MetaAnalysisJobResponseSchema(Schema):
    job_id = fields.String()
    meta_analysis_id = fields.String()
    artifact_prefix = fields.String(allow_none=True)
    status = fields.String()
    status_url = fields.String(allow_none=True)
    environment = fields.String()
    no_upload = fields.Boolean()
    start_time = fields.String(allow_none=True)
    output = fields.Dict(allow_none=True)
    logs = fields.List(fields.Nested(MetaAnalysisJobLogSchema), allow_none=True)
    created_at = fields.String(allow_none=True)
    updated_at = fields.String(allow_none=True)


class NeurovaultCollectionSchema(BaseSchema):
    collection_id = fields.String()
    url = fields.String(dump_only=True)