Fix remote-device tool timing out on scheduled runs (Redis-backed broker) #2511
…he device
The remote-device tool worked interactively but timed out on every scheduled
run. DeviceBroker was an in-process, in-memory singleton, but scheduled runs
execute in the Celery worker — a different process from the gunicorn web tier
that holds the device's SSE session — so a worker-side dispatch never reached
the device and the tool always hit its deadline.
Make the broker Redis-backed so every hop crosses the process boundary:
- queued commands -> Redis list dev:cmd:{device_id}
- output chunks -> Redis stream dev:out:{invocation_id}
- invocation metadata -> Redis hash dev:inv:{invocation_id}
- SSE upgrade tickets -> Redis key dev:ticket:{device_id}
Per-connection SSE session state stays in the web process. Reuses the existing
get_redis_instance()/CACHE_REDIS_URL; no new infrastructure. Also makes the web
tier safe to scale past one worker.
Concurrency hardening (from adversarial review + real-Redis e2e):
- XADD the output/control chunk before flipping completed=1, and have
drain_output do a final non-blocking flush after observing completion, so a
reader can't see completion and stop before the control chunk lands (this had
reintroduced the false "device did not respond (timed out)" under a race).
- _collect_result builds the result from drained chunks, checks the deadline
only after capturing a chunk, and falls back to the authoritative snapshot
(before cleanup) when no control chunk was observed.
- Audit outcome is written from locally-known fields so it survives the worker
racing to delete the invocation; a denied command now records a terminal
"denied" outcome instead of staying "dispatched".
- cmd-queue TTL raised to 900s (>= max drain deadline); dispatch-failure and
reaped-invocation cleanup; UTF-8 byte counts.
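The write-order rule and the final flush can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the broker's code: `FakeInvocation`, `submit_final`, the `hook` parameter, `max_polls`, and the chunk fields are hypothetical names, and a plain Python list stands in for the Redis stream. The hook lets the writer run between the reader's poll and its completion check, which is the window in which the race occurs.

```python
class FakeInvocation:
    """Hypothetical in-memory stand-in for dev:out:{id} and dev:inv:{id}."""

    def __init__(self):
        self.stream = []        # plays the role of the Redis stream
        self.completed = False  # plays the role of completed=1 in the hash

    def read_after(self, cursor):
        """Non-blocking read of every chunk appended after `cursor`."""
        return self.stream[cursor:]


def submit_final(inv, control_chunk):
    """Writer: append the control chunk first, only then flip completion."""
    inv.stream.append(control_chunk)
    inv.completed = True


def drain_output(inv, hook=lambda: None, final_flush=True, max_polls=10):
    """Reader: poll for chunks, then sweep once more after seeing completion."""
    chunks, cursor = [], 0
    for _ in range(max_polls):
        new = inv.read_after(cursor)
        chunks.extend(new)
        cursor += len(new)
        hook()  # test hook: the writer may run between the read and the check
        if inv.completed:
            if final_flush:
                # Final non-blocking sweep picks up anything written in the gap.
                chunks.extend(inv.read_after(cursor))
            break
    return chunks


CONTROL = {"type": "control", "exit_code": 0}  # hypothetical chunk shape

inv_a = FakeInvocation()
with_flush = drain_output(inv_a, hook=lambda: submit_final(inv_a, CONTROL))

inv_b = FakeInvocation()
without_flush = drain_output(
    inv_b, hook=lambda: submit_final(inv_b, CONTROL), final_flush=False
)
```

With the final sweep the reader returns the control chunk; without it the reader sees completion and returns nothing, which is the shape of the false timeout described above.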
Tests: new tests/devices/{conftest (FakeRedis double), test_broker_cross_process,
test_broker_race, test_submit_output_audit}; drain/cleanup/ticket tests rewritten
for the Redis contract. The race tests fail against the pre-fix code. ruff clean;
device + tool-executor suites green.
Pull request overview
The DeviceBroker is rewritten from an in-process singleton to a Redis-backed broker so that remote-device invocations dispatched from a Celery worker (e.g., a scheduled run) can reach the device's SSE session held by the web process. The fix also hardens against a completion-vs-control-chunk race that could re-introduce the same false-timeout symptom, and rewrites the device test suite around an in-memory FakeRedis double.
Changes:
- Move queued commands, output chunks, invocation metadata, and SSE tickets into Redis keys (`dev:cmd:*`, `dev:out:*`, `dev:inv:*`, `dev:ticket:*`), keeping only per-connection `SessionState` local.
- Order writers so output XADDs land before `completed=1`, give `drain_output` a final non-blocking flush, and make `_collect_result` capture chunks before the deadline check with an authoritative-snapshot fallback; record audit (including a terminal `"denied"` outcome) from locally-captured control fields so it survives worker-side cleanup.
- Add `tests/devices/conftest.py` `FakeRedis` plus cross-process, race, and route-level audit suites; rewrite drain/cleanup/ticket tests against the Redis contract.
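The capture-before-deadline rule and the snapshot fallback summarized in the changes above can be sketched as follows. This is a hedged illustration, not the actual `_collect_result`: the function name `collect_result`, the chunk and snapshot field names, the result dictionary, and the injectable `now` clock are all assumptions made for the example.

```python
import time


def collect_result(chunks, deadline, snapshot, now=time.monotonic):
    """Build a result from drained chunks (hypothetical shapes throughout)."""
    output, control = [], None
    for chunk in chunks:
        # Capture the chunk first, so a control chunk that arrives right at
        # the deadline is still honoured.
        if chunk.get("type") == "control":
            control = chunk
            break
        output.append(chunk["data"])
        # Only now check the deadline.
        if now() >= deadline:
            break
    if control is None and snapshot and snapshot.get("completed"):
        # Fallback: the authoritative snapshot, assumed to be read before cleanup.
        control = {"type": "control", "exit_code": snapshot["exit_code"]}
    text = "".join(output)
    if control is None:
        return {"status": "timeout", "output": text}
    return {"status": "ok", "exit_code": control["exit_code"], "output": text}


def past_deadline():
    """Fake clock that is already beyond the deadline used below."""
    return 100.0


near_deadline = collect_result(
    [{"type": "control", "exit_code": 0}], deadline=50.0, snapshot=None,
    now=past_deadline,
)
fallback = collect_result(
    [{"type": "output", "data": "hi\n"}], deadline=50.0,
    snapshot={"completed": True, "exit_code": 3}, now=past_deadline,
)
timed_out = collect_result([], deadline=50.0, snapshot=None, now=past_deadline)
```

Checking the clock before capturing would discard the control chunk in the first call and report a timeout even though the device had answered.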
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| application/devices/broker.py | Redis-backed broker (commands/output/tickets/metadata) replacing the in-memory broker. |
| application/api/devices/session.py | SSE handler uses next_command; ack records denial audit; output handler writes audit from local control chunk. |
| application/agents/tools/remote_device.py | _collect_result drains via broker, capture-before-deadline, snapshot fallback when no control chunk. |
| application/core/settings.py | New REMOTE_DEVICE_CMD_QUEUE_TTL_SECONDS / INVOCATION_TTL_SECONDS / OUTPUT_STREAM_MAXLEN tunables. |
| tests/devices/conftest.py | New FakeRedis double + broker_env fixture for broker tests. |
| tests/devices/test_broker_cross_process.py | Regression that worker dispatch reaches a web-side session through shared Redis. |
| tests/devices/test_broker_race.py | Race regressions: final-flush, snapshot fallback, near-deadline capture, dispatch failure cleanup, reaped invocation, UTF-8 byte counts, queue TTL. |
| tests/devices/test_submit_output_audit.py | Route-level audit coverage incl. snapshot-deleted-by-race and denied-ack cases. |
| tests/devices/test_broker_drain.py | Rewritten drain tests against the Redis contract (control chunk, completion flag, unavailable Redis). |
| tests/devices/test_broker_cleanup.py | Cleanup tests asserting Redis hash/stream/list state. |
| tests/devices/test_session_ticket.py | Ticket tests use FakeRedis; expiry simulated by explicit delete; reuse-while-unexpired added. |
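One of the race regressions listed in the table concerns UTF-8 byte counts. The source does not say where the broker uses the count, so the following is only a generic illustration of why character length and byte length differ for non-ASCII output; the sample string is made up.

```python
text = "naïve ✓"  # made-up sample containing two non-ASCII characters

char_count = len(text)                  # counts Unicode code points
byte_count = len(text.encode("utf-8"))  # counts bytes as sent on the wire
```

Here `char_count` is 7 and `byte_count` is 10, because `ï` encodes to two bytes and `✓` to three.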
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## main #2511 +/- ##
==========================================
- Coverage 91.34% 89.55% -1.79%
==========================================
Files 248 288 +40
Lines 20709 25694 +4985
==========================================
+ Hits 18916 23010 +4094
- Misses 1793 2684 +891
Addresses the code-quality lint on the best-effort hash delete in dispatch_invocation's failure path: replace the bare `except: pass` with a logger.debug carrying the invocation_id. No behavior change — cleanup stays best-effort and still returns a failed Invocation.
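The pattern described in that commit might look roughly like the sketch below. It is an assumption-laden illustration rather than the code in `dispatch_invocation`: the helper name `best_effort_delete`, the logger name, the boolean return value, and the `BrokenRedis` stub are hypothetical (the real failure path is described as returning a failed Invocation).

```python
import logging

logger = logging.getLogger("devices.broker")  # hypothetical logger name


def best_effort_delete(redis_client, invocation_id):
    """Try to delete the invocation hash; cleanup must never raise."""
    try:
        redis_client.delete(f"dev:inv:{invocation_id}")
        return True
    except Exception:
        # Previously a bare `except: pass`; now the failure is at least visible
        # at debug level, tagged with the invocation id.
        logger.debug(
            "best-effort cleanup failed for invocation %s",
            invocation_id,
            exc_info=True,
        )
        return False


class BrokenRedis:
    """Hypothetical client whose delete always fails, to exercise the path."""

    def delete(self, key):
        raise ConnectionError("redis unavailable")


cleaned = best_effort_delete(BrokenRedis(), "inv-42")
```

The caller's behavior is unchanged by the logging: the exception is still swallowed, and `cleaned` is simply `False`.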
Problem
The remote-device tool (run shell commands on a paired machine) worked when invoked interactively but timed out on every scheduled run. `DeviceBroker` was an in-process, in-memory singleton; scheduled runs execute in the Celery worker, a separate process from the gunicorn web tier that holds the device's SSE session. A worker-side `dispatch_invocation` therefore never reached the device, and `_collect_result` always hit its deadline → `"device did not respond (timed out)"`. Interactive runs worked only because the web tier is a single process (`gunicorn -w 1`) shared with the SSE session.
Fix
Make the broker Redis-backed so every hop crosses the process boundary (reusing the existing `get_redis_instance()`/`CACHE_REDIS_URL`; no new infra, since both web and worker already point at the same Redis):
- queued commands: Redis list `dev:cmd:{device_id}` (`BLPOP`)
- output chunks: Redis stream `dev:out:{invocation_id}` (`XADD`/`XREAD`)
- invocation metadata: Redis hash `dev:inv:{invocation_id}`
- SSE upgrade tickets: Redis key `dev:ticket:{device_id}` (TTL)
Per-connection SSE state stays local to the web process; the public broker API is preserved. This also makes the web tier safe to scale past one worker.
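The key layout above can be sketched with a toy shared store. Everything here other than the key patterns is an assumption: `TinyStore` is a hypothetical in-memory stand-in for the shared Redis instance, the `dispatch` and `next_command` bodies are simplified guesses at the flow, and the queued payload format is invented for the example.

```python
from collections import defaultdict, deque


def cmd_key(device_id):
    return f"dev:cmd:{device_id}"


def inv_key(invocation_id):
    return f"dev:inv:{invocation_id}"


class TinyStore:
    """Hypothetical in-memory stand-in for the Redis shared by web and worker."""

    def __init__(self):
        self.lists = defaultdict(deque)
        self.hashes = defaultdict(dict)

    def rpush(self, key, value):
        self.lists[key].append(value)

    def lpop(self, key):
        queue = self.lists[key]
        return queue.popleft() if queue else None

    def hset(self, key, mapping):
        self.hashes[key].update(mapping)


def dispatch(store, device_id, invocation_id, command):
    """Worker side: record metadata, then queue the command for the device."""
    store.hset(inv_key(invocation_id), {"device_id": device_id, "completed": "0"})
    store.rpush(cmd_key(device_id), f"{invocation_id}:{command}")


def next_command(store, device_id):
    """Web side: pop the next command (the real broker is said to use BLPOP)."""
    return store.lpop(cmd_key(device_id))


store = TinyStore()
dispatch(store, "dev-1", "inv-42", "uptime")
popped = next_command(store, "dev-1")
```

Because both sides address the same store by key rather than sharing Python objects, the handoff works regardless of which process enqueues and which one pops.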
Concurrency hardening
The fix was reviewed adversarially and tested against real Redis (including a true cross-process subprocess reproduction). That caught — and this PR fixes — a race that had reintroduced the same false-timeout symptom:
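- Writers `XADD` the output/control chunk before flipping `completed=1`, and `drain_output` does a final non-blocking sweep after observing completion, so a reader can't see completion and stop before the control chunk is on the stream.
- `_collect_result` builds the result from drained chunks, checks the deadline after capturing each chunk (no dropped near-deadline control chunk), and falls back to the authoritative snapshot before cleanup when no control chunk was seen.
- The audit outcome is written from locally known fields so it survives the worker racing to delete the invocation; a denied command now records a terminal `"denied"` outcome instead of staying `"dispatched"`.
Tests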
New `tests/devices/`: `conftest.py` (in-memory `FakeRedis` double), `test_broker_cross_process.py` (regression for the worker→web gap), `test_broker_race.py` (8 race regressions that fail against the pre-fix code), `test_submit_output_audit.py` (route-level audit incl. the snapshot-deleted-by-race case + denial). Rewrote drain/cleanup/ticket tests for the Redis contract. `ruff` clean; device + tool-executor suites green (201 tests).
Deferred (low/nit, not regressions from this change)
- `BLPOP` orphan guard (narrowed; full close needs an atomic Lua pop).
- `BLPOP` holds one of the 32 WSGI threads → ~32 concurrent device sessions per web worker (operational).
- `xread` assumes RESP2 (safe at the redis-py protocol=2 default; commented).
Verification
Automated proof is the real-Redis cross-process e2e. A full end-to-end with a physically paired device + a live scheduled run is still worth doing manually before release — please attach a screenshot/recording of a scheduled remote-device run succeeding.