Skip to content

Backpressure and body streaming for system_interface#165

Draft
gnguralnick wants to merge 7 commits into
mainfrom
gabriel/backpressure-body-streaming-v3
Draft

Backpressure and body streaming for system_interface#165
gnguralnick wants to merge 7 commits into
mainfrom
gabriel/backpressure-body-streaming-v3

Conversation

@gnguralnick

@gnguralnick gnguralnick commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

Addresses the system_interface audit's backpressure & body-streaming issues so the app does not leak memory under load or OOM on large payloads. Re-applied on current main after the original branch fell far behind.

  • Bound per-agent SSE event queues and evict slow subscribers (event_queues.py): register() now allocates a bounded queue; a subscriber that stops draining is evicted after a run of consecutive queue-full broadcasts (drain + None sentinel), the replay buffer is capped, and a new evict() supports destroy-time teardown. Mirrors WebSocketBroadcaster.
  • Stream proxy request/response bodies with caps (service_dispatcher.py): request bodies stream upstream with a hard byte cap (413 on exceed); responses stream unless HTML (buffered under a smaller cap so it can be rewritten, else 502); streaming is chosen by the backend content-type rather than the Accept header (fixing fetch() NDJSON/chunked streams that send Accept: */*). The service worker is left unchanged (duplex: 'half' is not portable). Preserves upstream's WebSocket survivor-cancel behavior.
  • Make the shared SSE generator async (server.py): _stream_filtered_events now polls the thread-safe queue via run_in_threadpool(get, timeout=1) instead of blocking a threadpool worker for each connection's lifetime.
  • Evict per-agent resources on agent destroy (agent_manager.py, server.py, frontend models): an agent-removed listener fires on every removal path; the lifespan listener stops the session watcher and evicts the event queues, and _destroy_agent runs off the event loop. The frontend drops a destroyed agent's cached transcript store and tears down its SSE stream.

Tests added across event_queues_test.py, service_dispatcher_test.py, server_test.py, agent_manager_test.py, and the frontend Response.test.ts / StreamingMessage.test.ts.

uv.lock is also updated to match the current vendored dependency definitions (the vendor/mngr 0.3.1 refresh: imbue-mngr 0.2.12->0.2.14, imbue-common/concurrency-group 0.1.19->0.1.20, and the added anthropic/requests deps); uv lock --check passes.

Gabriel Guralnick added 7 commits June 15, 2026 10:24
A slow or disconnected SSE consumer was an unbounded memory leak: register()
created an unbounded queue.Queue and broadcast() fanned out with put_nowait,
while the STORE replay buffer grew without limit. Mirror WebSocketBroadcaster:
bounded queue (maxsize), per-subscriber consecutive-queue.Full counter,
eviction (drain + None sentinel) once a subscriber stalls past the threshold,
a capped replay buffer, and a new evict(agent_id) for destroy-time teardown.
broadcast_all_ignored continues to delegate to broadcast, so it inherits the
bounding automatically.
…ontent-type

The /service/<name>/ proxy fully buffered request and response bodies
(await request.body(), backend_response.content/.text), so a large
upload/download could OOM the server, and streaming was gated on the
Accept: text/event-stream header (fetch() defaults to */*, falling into the
buffering branch). Stream the request body upstream with a hard byte cap
(413 on exceed), stream the response unless it is HTML (HTML is buffered
under a smaller cap so it can be rewritten, else 502), and choose streaming
by the backend response content-type instead of the request Accept header.
The service worker is left unchanged (streaming request bodies need
duplex:'half', which is not portable across browsers).

Preserves the upstream WebSocket survivor-cancel improvement in
_handle_service_websocket.
… workers

_stream_filtered_events was a sync generator handed to StreamingResponse, so
Starlette iterated it in the threadpool for the lifetime of each SSE
connection -- N subscribers permanently held N of the ~40 default workers,
starving every other run_in_threadpool caller. Convert it to an async
generator that polls the thread-safe queue via run_in_threadpool(get,
timeout=1), releasing the worker on every poll (the same pattern as
_run_ws_broadcast_loop). The shared should_forward predicate and keepalive
behavior are unchanged. The filtered-events unit test now drives the async
generator via anyio.
Destroying an agent left its server-side resources running and its client
caches populated for the lifetime of the process/page. AgentManager gains an
agent-removed listener that fires on every removal path (REST destroy plus
observe-driven destroy / host-destroy); the FastAPI lifespan registers a
listener that stops the session watcher and evicts the SSE event queues.
_destroy_agent runs remove_agent off the event loop so the bounded-but-blocking
watcher join never parks the loop. Watcher stop() errors (OSError/RuntimeError
from watchdog teardown) are contained so a teardown hiccup can't kill the
observe thread or skip eviction, while genuine programming errors still surface.

Frontend: when an agent disappears from an agents_updated snapshot (or via
removeAgentLocally), drop its cached transcript store and not-found marker and
tear down its SSE stream (closing the EventSource, recording the tombstone so
a pending reconnect timer stays down, and dropping the snapshot buffer).
The add_agent_removed_listener docstring still referenced the tickets watcher,
which was removed upstream; only the session watcher remains.
The merge of origin/main pulled in the vendor/mngr refresh (minds 0.3.1),
which bumps imbue-mngr 0.2.12->0.2.14, imbue-common/concurrency-group
0.1.19->0.1.20, adds the anthropic dependency to imbue-mngr-claude, and
adds requests to imbue-mngr. Update the lockfile to match. (uv lock --check passes.)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant