feat: add BackgroundTasks for post-response execution#1371
feat: add BackgroundTasks for post-response execution#1371
Conversation
Adds BackgroundTask and BackgroundTasks classes that allow running
callables after the response is sent to the client:
@app.post("/notify")
async def notify(request):
tasks = BackgroundTasks()
tasks.add_task(send_email, to="user@example.com")
return {"status": "queued"}, tasks
Closes #548
Made-with: Cursor
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
📝 WalkthroughWalkthroughAdds background-task support: new Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Router as Request Router
participant Handler as Request Handler
participant Response
participant Background as BackgroundTasks
participant Thread as Daemon Thread / EventLoop
Client->>Router: HTTP request
Router->>Handler: invoke handler
Handler-->>Router: returns (response, Background) or response
Router->>Response: format & send response
Response->>Client: HTTP response
alt Background present
Router->>Background: run_in_thread()
Background->>Thread: start daemon thread
Thread->>Thread: execute tasks sequentially
Thread->>Thread: run sync funcs or create/run event loop for coroutines
Thread-->>Background: log exceptions per task, continue
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
robyn/router.py (1)
276-304:⚠️ Potential issue | 🟠 MajorThis is not actually post-response execution yet.
background.run_in_thread()runs on Lines 302-303 and 333-334 before the wrapper returnsresponse, so the work can start before the socket write. That is especially visible forStreamingResponse, where tasks may run while the body is still streaming. If the contract is “after the response is sent”, scheduling has to move to the dispatch layer after flush.Also applies to: 307-335
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@robyn/router.py` around lines 276 - 304, The background task is being started inside async_inner_handler via background.run_in_thread() before the response is actually sent (affecting StreamingResponse); change it so handlers only attach/pass BackgroundTasks objects and do NOT call background.run_in_thread() inside async_inner_handler; instead ensure the dispatch layer (where the socket/response is flushed) is responsible for detecting a BackgroundTasks instance attached to responses from _format_response (or from exception_handler) and invoking run_in_thread() only after the response has been fully sent. Update async_inner_handler to only extract and preserve the BackgroundTasks object (do not start it), and move the call to background.run_in_thread() into the response-dispatch/flush logic that finalizes the HTTP body.
🧹 Nitpick comments (1)
unit_tests/test_background_tasks.py (1)
6-73: Please add router-level coverage for the tuple return path.This file only exercises
BackgroundTaskandBackgroundTasksdirectly. The new behavior inrobyn/router.py—splitting(response, BackgroundTasks)for both sync and async handlers—still is not asserted here, so regressions in the integration path will slip through.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/test_background_tasks.py` around lines 6 - 73, Add tests that exercise the router-level behavior when a handler returns a tuple (response, BackgroundTasks): write one synchronous handler and one asynchronous handler that each return (response_value, BackgroundTasks([...])) and assert the router (robyn.router request handling entrypoint) correctly unwraps the tuple, sends/returns response_value and executes the BackgroundTasks; reference the router function that dispatches handlers (e.g., the request handler or method that currently splits tuple returns in robyn/router.py) and reuse BackgroundTask/BackgroundTasks to attach side-effecting tasks so the test asserts both the response content and that the background tasks ran.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@robyn/background.py`:
- Around line 58-63: The current run_in_thread method creates a new OS thread
each call (threading.Thread(...).start()), which can exhaust threads; change
this to use a shared bounded executor or a persistent worker so concurrency is
limited: add a module- or class-level ThreadPoolExecutor (or a single background
worker/queue) and submit self.run to that executor instead of spawning
threading.Thread, ensure you initialize the executor lazily and reuse it for
subsequent calls, and keep existing behavior by checking self._tasks before
submitting; update references to run_in_thread and run accordingly.
- Around line 50-56: In run(), the exception handler assumes task.func.__name__
exists which fails for functools.partial and callable instances; change the
logging to compute a safe task name (e.g., target = getattr(task, "func", task);
name = getattr(target, "__name__", None) or fall back to type(task).__name__ or
repr(task)) and use that safe name in _logger.exception so the exception path
itself cannot raise and remaining tasks still run.
- Around line 17-27: The current __call__ uses inspect.iscoroutinefunction(func)
(stored in is_async) which misses objects whose __call__ is async; instead, call
result = self.func(*self.args, **self.kwargs) and detect awaitables with
inspect.isawaitable(result) or asyncio.iscoroutine(result); if the result is
awaitable, create a new event loop (as before), run
loop.run_until_complete(result) and close the loop, otherwise treat it as a
regular synchronous return. Update __call__ to remove reliance on self.is_async
and use the runtime check on the call result (referencing __call__, self.func,
self.args, self.kwargs, and the event loop handling).
In `@unit_tests/test_background_tasks.py`:
- Around line 46-57: The test test_background_tasks_run_in_thread uses
time.sleep and is flaky; replace the sleep with a threading.Event: create an
Event in the test, have the worker function (used with BackgroundTasks.add_task)
call event.set() when done, call BackgroundTasks.run_in_thread(), then wait on
event.wait(timeout=...) and assert the event is set (and/or check results) to
make the test deterministic; reference BackgroundTasks.add_task and
BackgroundTasks.run_in_thread to locate where to attach the event signaling.
---
Outside diff comments:
In `@robyn/router.py`:
- Around line 276-304: The background task is being started inside
async_inner_handler via background.run_in_thread() before the response is
actually sent (affecting StreamingResponse); change it so handlers only
attach/pass BackgroundTasks objects and do NOT call background.run_in_thread()
inside async_inner_handler; instead ensure the dispatch layer (where the
socket/response is flushed) is responsible for detecting a BackgroundTasks
instance attached to responses from _format_response (or from exception_handler)
and invoking run_in_thread() only after the response has been fully sent. Update
async_inner_handler to only extract and preserve the BackgroundTasks object (do
not start it), and move the call to background.run_in_thread() into the
response-dispatch/flush logic that finalizes the HTTP body.
---
Nitpick comments:
In `@unit_tests/test_background_tasks.py`:
- Around line 6-73: Add tests that exercise the router-level behavior when a
handler returns a tuple (response, BackgroundTasks): write one synchronous
handler and one asynchronous handler that each return (response_value,
BackgroundTasks([...])) and assert the router (robyn.router request handling
entrypoint) correctly unwraps the tuple, sends/returns response_value and
executes the BackgroundTasks; reference the router function that dispatches
handlers (e.g., the request handler or method that currently splits tuple
returns in robyn/router.py) and reuse BackgroundTask/BackgroundTasks to attach
side-effecting tasks so the test asserts both the response content and that the
background tasks ran.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9c003ba1-1f2d-4446-a9e9-69e39e67d822
📒 Files selected for processing (4)
robyn/__init__.pyrobyn/background.pyrobyn/router.pyunit_tests/test_background_tasks.py
| def run_in_thread(self) -> None: | ||
| """Execute all tasks in a background thread.""" | ||
| if not self._tasks: | ||
| return | ||
| thread = threading.Thread(target=self.run, daemon=True) | ||
| thread.start() |
There was a problem hiding this comment.
Avoid one daemon thread per background dispatch.
Lines 62-63 create a fresh OS thread every time run_in_thread() is called. A burst of requests returning background work will scale thread count linearly, and once the process hits its thread limit this can fail the request path with RuntimeError: can't start new thread.
A shared executor or worker queue per process would keep concurrency bounded.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@robyn/background.py` around lines 58 - 63, The current run_in_thread method
creates a new OS thread each call (threading.Thread(...).start()), which can
exhaust threads; change this to use a shared bounded executor or a persistent
worker so concurrency is limited: add a module- or class-level
ThreadPoolExecutor (or a single background worker/queue) and submit self.run to
that executor instead of spawning threading.Thread, ensure you initialize the
executor lazily and reuse it for subsequent calls, and keep existing behavior by
checking self._tasks before submitting; update references to run_in_thread and
run accordingly.
There was a problem hiding this comment.
Resolve — acceptable for v1. Thread pool optimization can be added in a follow-up.
There was a problem hiding this comment.
Rate Limit Exceeded
@sansyrox have exceeded the limit for the number of chat messages per hour. Please wait 2 minutes and 1 seconds before sending another message.
…kgroundTasks Made-with: Cursor
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
♻️ Duplicate comments (1)
unit_tests/test_background_tasks.py (1)
58-59:⚠️ Potential issue | 🟡 MinorAssert the event wait result for deterministic thread synchronization.
On Line 58,
event.wait(timeout=5)is not asserted. If signaling regresses, the test can still pass based only on list mutation. Assert the wait result explicitly.Suggested fix
- event.wait(timeout=5) + assert event.wait(timeout=5), "Background task thread did not complete in time" assert results == ["done"]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/test_background_tasks.py` around lines 58 - 59, The test currently calls event.wait(timeout=5) without asserting its return, so add an explicit assertion that event.wait(timeout=5) is truthy before asserting results to ensure deterministic synchronization; locate the event.wait call (variable name event) and change the line to assert event.wait(timeout=5) (or assert event.wait(timeout=5) is True) prior to asserting that results == ["done"] so the test fails if the thread wasn't signaled.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@unit_tests/test_background_tasks.py`:
- Around line 58-59: The test currently calls event.wait(timeout=5) without
asserting its return, so add an explicit assertion that event.wait(timeout=5) is
truthy before asserting results to ensure deterministic synchronization; locate
the event.wait call (variable name event) and change the line to assert
event.wait(timeout=5) (or assert event.wait(timeout=5) is True) prior to
asserting that results == ["done"] so the test fails if the thread wasn't
signaled.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9401cf88-18f4-4430-ab69-15708f838103
📒 Files selected for processing (2)
robyn/background.pyunit_tests/test_background_tasks.py
✅ Files skipped from review due to trivial changes (1)
- robyn/background.py
Summary
BackgroundTaskandBackgroundTasksclasses inrobyn/background.pyfor scheduling work to run after the response is sent to the client.(response, BackgroundTasks)tuples; tasks are executed in a background thread after response dispatch.robyn.__init__for convenient access.Usage
Both sync and async callables are supported. Errors in individual tasks are logged but do not affect other tasks or the response.
Closes #548
Test plan
BackgroundTask(sync and async execution)BackgroundTasks(collection,run,run_in_thread, error isolation)Made with Cursor
Summary by CodeRabbit
New Features
Tests