Description
In #1262 and #1289 we mentioned that a middleware feature would be helpful in some scenarios. In the middleware
branch, I have already started to implement it, but I would like to discuss the implementation (@ewjoachim). I use a similar implementation to Django for its middleware. I would like to discuss two API alternatives.
- The way I implemented it currently:
ProcessTask: TypeAlias = Callable[[tasks.Task, job_context.JobContext], Awaitable[Any]]
Middleware: TypeAlias = Callable[
[ProcessTask],
Callable[[tasks.Task, job_context.JobContext], Awaitable[Any]],
]
def simple_middleware(process_task: ProcessTask):
async def middleware(task: tasks.Task, context: job_context.JobContext):
# Do something before the task is processed
result = await process_task(task, context)
# Do something after the task is processed
return result
return middleware
- An alternative (maybe better) way:
ProcessJob: TypeAlias = Callable[[job_context.JobContext], Awaitable[Any]]
Middleware: TypeAlias = Callable[
[ProcessJob],
Callable[[job_context.JobContext], Awaitable[Any]],
]
def simple_middleware(process_job: ProcessTask):
async def middleware(context: job_context.JobContext):
# Do something before the job is processed
result = await process_job(context)
# Do something after the job is processed
return result
return middleware
After some thought, 2) is better because process_job
also wraps the part where the job status is saved back to the db. This is especially important if we allow the user to raise a StopWorker
exception (#1262 (comment)).
One thing that bothers me a bit is that in both scenarios, the context
was already created, and thereby a start_timestamp
is already set. So when the user adds some pause in the middleware (e.g. for #1289) before the job is processed, this pause will be included in the overall job duration. I'm not sure if this is what we want or not.