Skip to content

Commit baa6768

Browse files
authored
Merge pull request #3565 from OpenNeuroOrg/feat/taskiq-dashboard
Log task execution for workers to MongoDB
2 parents 219ddb2 + 45673d4 commit baa6768

File tree

8 files changed

+249
-2
lines changed

8 files changed

+249
-2
lines changed

packages/openneuro-server/src/graphql/permissions.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,12 @@ export const checkAdmin = (userId, userInfo) =>
169169
userId && userInfo.admin
170170
? Promise.resolve(true)
171171
: Promise.reject(states.ADMIN.errorMessage)
172+
173+
/**
174+
* Check if the user is a worker
175+
* @param userInfo User context
176+
*/
177+
export const checkWorker = (userInfo) => {
178+
if (userInfo?.worker) return true
179+
else throw new Error("You must be a worker to make this request.")
180+
}

packages/openneuro-server/src/graphql/resolvers/mutation.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import {
4646
import { saveAdminNote } from "./datasetEvents"
4747
import { createGitEvent } from "./gitEvents"
4848
import { updateFileCheck } from "./fileCheck"
49+
import { updateWorkerTask } from "./worker"
4950

5051
const Mutation = {
5152
createDataset,
@@ -95,6 +96,7 @@ const Mutation = {
9596
saveAdminNote,
9697
createGitEvent,
9798
updateFileCheck,
99+
updateWorkerTask,
98100
}
99101

100102
export default Mutation
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import WorkerTask from "../../models/worker-task"
2+
import { checkWorker } from "../permissions"
3+
4+
/**
5+
* Update a worker task record
6+
*
7+
* This can be called for new tasks, or to update existing tasks.
8+
*/
9+
export const updateWorkerTask = async (obj, args, { userInfo }) => {
10+
checkWorker(userInfo)
11+
const { id, ...updateData } = args
12+
13+
// Don't allow null values to unset fields
14+
const update = Object.fromEntries(
15+
Object.entries(updateData).filter(([, value]) => value != null),
16+
)
17+
18+
const task = await WorkerTask.findOneAndUpdate({ id }, { $set: update }, {
19+
new: true,
20+
upsert: true,
21+
}).exec()
22+
return task
23+
}

packages/openneuro-server/src/graphql/schema.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,19 @@ export const typeDefs = `
214214
annexFsck: [AnnexFsckInput!]!
215215
remote: String
216216
): FileCheck
217+
# Update worker task queue status
218+
updateWorkerTask(
219+
id: ID!,
220+
args: JSON,
221+
kwargs: JSON,
222+
taskName: String,
223+
worker: String,
224+
queuedAt: DateTime,
225+
startedAt: DateTime,
226+
finishedAt: DateTime,
227+
error: String,
228+
executionTime: Int,
229+
): WorkerTask
217230
}
218231
219232
# Anonymous dataset reviewer
@@ -969,6 +982,18 @@ export const typeDefs = `
969982
input: [String]
970983
}
971984
985+
type WorkerTask {
986+
id: ID!
987+
args: JSON
988+
kwargs: JSON
989+
taskName: String
990+
worker: String
991+
queuedAt: DateTime
992+
startedAt: DateTime
993+
finishedAt: DateTime
994+
error: String
995+
executionTime: Int
996+
}
972997
`
973998

974999
schemaComposer.addTypeDefs(typeDefs)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import mongoose from "mongoose"
2+
import type { Document } from "mongoose"
3+
const { Schema, model } = mongoose
4+
5+
export interface WorkerTaskDocument extends Document {
6+
id: string
7+
args?: Record<string, unknown>
8+
kwargs?: Record<string, unknown>
9+
taskName?: string
10+
worker?: string
11+
queuedAt?: Date
12+
startedAt?: Date
13+
finishedAt?: Date
14+
error?: string
15+
executionTime?: number
16+
}
17+
18+
const workerTaskSchema = new Schema({
19+
id: { type: String, required: true, unique: true },
20+
args: { type: Object },
21+
kwargs: { type: Object },
22+
taskName: { type: String },
23+
worker: { type: String },
24+
queuedAt: { type: Date },
25+
startedAt: { type: Date },
26+
finishedAt: { type: Date },
27+
error: { type: String },
28+
executionTime: { type: Number },
29+
})
30+
31+
workerTaskSchema.index({ id: 1 })
32+
33+
const WorkerTaskModel = model<WorkerTaskDocument>(
34+
"WorkerTask",
35+
workerTaskSchema,
36+
)
37+
38+
export default WorkerTaskModel
Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,83 @@
11
import random
2+
from datetime import datetime, timedelta, timezone, UTC
3+
from typing import Any
4+
import httpx
5+
import logging
6+
import jwt
7+
8+
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
29

3-
from taskiq import TaskiqMessage, TaskiqMiddleware
410

511
from datalad_service import config
612

13+
logger = logging.getLogger('datalad_service.' + __name__)
14+
15+
_UPDATE_WORKER_TASK_MUTATION = """
16+
mutation UpdateWorkerTask(
17+
$id: ID!
18+
$args: JSON
19+
$kwargs: JSON
20+
$taskName: String
21+
$worker: String
22+
$queuedAt: DateTime
23+
$startedAt: DateTime
24+
$finishedAt: DateTime
25+
$error: String
26+
$executionTime: Int
27+
) {
28+
updateWorkerTask(
29+
id: $id
30+
args: $args
31+
kwargs: $kwargs
32+
taskName: $taskName
33+
worker: $worker
34+
queuedAt: $queuedAt
35+
startedAt: $startedAt
36+
finishedAt: $finishedAt
37+
error: $error
38+
executionTime: $executionTime
39+
) {
40+
id
41+
}
42+
}
43+
"""
44+
45+
46+
def _update_worker_task_body(**kwargs):
47+
"""Create a GraphQL mutation body for updateWorkerTask."""
48+
return {
49+
'query': _UPDATE_WORKER_TASK_MUTATION,
50+
'variables': kwargs,
51+
'operationName': 'UpdateWorkerTask',
52+
}
53+
54+
55+
def generate_worker_token():
56+
utc_now = datetime.now(timezone.utc)
57+
one_day_ahead = utc_now + timedelta(hours=24)
58+
return jwt.encode(
59+
{
60+
'sub': 'dataset-worker',
61+
'iat': int(utc_now.timestamp()),
62+
'exp': int(one_day_ahead.timestamp()),
63+
'scopes': ['dataset:worker'],
64+
},
65+
config.JWT_SECRET,
66+
algorithm='HS256',
67+
)
68+
769

870
class WorkerMiddleware(TaskiqMiddleware):
971
"""
10-
This middleware adds a custom worker label to outgoing tasks scheduled by workers.
72+
This middleware adds a custom worker label to outgoing tasks scheduled by workers
73+
and reports task status back to the OpenNeuro API.
1174
"""
1275

1376
def __init__(self, worker_id=None):
1477
self.worker_id = worker_id
1578

1679
async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
80+
"""Assign new tasks to the correct worker."""
1781
if self.worker_id:
1882
message.labels['queue_name'] = f'worker-{self.worker_id}'
1983
else:
@@ -22,3 +86,58 @@ async def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
2286
f'worker-{random.randint(0, config.DATALAD_WORKERS - 1)}'
2387
)
2488
return message
89+
90+
async def _update_task_status(self, **kwargs):
91+
"""Helper to send updates to the GraphQL API."""
92+
api_token = generate_worker_token()
93+
body = _update_worker_task_body(**kwargs)
94+
try:
95+
async with httpx.AsyncClient() as client:
96+
response = await client.post(
97+
url=config.GRAPHQL_ENDPOINT,
98+
json=body,
99+
headers={'Authorization': f'Bearer {api_token}'},
100+
)
101+
response.raise_for_status()
102+
response_json = response.json()
103+
if 'errors' in response_json:
104+
logger.error(
105+
f'GraphQL error updating task status: {response_json["errors"]}',
106+
)
107+
except httpx.HTTPError as e:
108+
logger.error(f'HTTP error updating task status: {e}')
109+
logger.error(f'Response: {e.response.text}')
110+
except Exception as e:
111+
logger.error(f'Unexpected error updating task status: {e}')
112+
113+
async def post_send(self, message: TaskiqMessage):
114+
"""Called after a task is sent to the broker."""
115+
now = datetime.now(UTC).isoformat()
116+
await self._update_task_status(
117+
id=message.task_id,
118+
args=message.args,
119+
kwargs=message.kwargs,
120+
taskName=message.task_name,
121+
worker=message.labels.get('queue_name'),
122+
queuedAt=now,
123+
)
124+
125+
async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
126+
"""Called before a worker executes a task."""
127+
now = datetime.now(UTC).isoformat()
128+
await self._update_task_status(id=message.task_id, startedAt=now)
129+
return message
130+
131+
async def post_execute(
132+
self,
133+
message: TaskiqMessage,
134+
result: TaskiqResult[Any],
135+
):
136+
"""Called after a worker executes a task."""
137+
now = datetime.now(UTC).isoformat()
138+
await self._update_task_status(
139+
id=message.task_id,
140+
finishedAt=now,
141+
error=None if result.error is None else repr(result.error),
142+
executionTime=round(result.execution_time * 1000),
143+
)

services/datalad/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies = [
1919
"taskiq[reload]>=0.11.18",
2020
"taskiq-redis>=1.0.9",
2121
"uvicorn[standard]>=0.34.3",
22+
"httpx>=0.28.1",
2223
]
2324
dynamic = ["version"]
2425

services/datalad/uv.lock

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)