# Plan: Resolve Production Worker Stability & SSE Streaming Issues

## Context

After 19+ hours of uptime, the production worker accumulates orphaned asyncio tasks and Pydantic serializer warnings. In addition, both worker processes crash simultaneously due to an unhandled Redis `ConnectionError` in the TaskIQ broker's `listen()` loop. Together, these issues prevent SSE streaming responses from being delivered to users in production, even though backend health checks report healthy. Four GitHub issues are needed.

---

## Issue 1: Redis Connection Drops Kill Worker Processes (CRITICAL)

**Problem:** `RedisStreamBroker.listen()` in taskiq-redis has **no error handling** for `ConnectionError`. When Redis closes the connection (server restart, network blip, idle timeout), the exception propagates up through TaskIQ's receiver, killing the worker process. Both worker-0 and worker-1 crash simultaneously. TaskIQ's process manager restarts them, but any in-flight tasks are lost and streams are never completed.

**Evidence:** Logs show `redis.exceptions.ConnectionError: Connection closed by server.` causing `ExceptionGroup: unhandled errors in a TaskGroup` in both workers.

**Note:** `ListQueueBroker.listen()` (line 135 of `redis_broker.py`) already handles this with `except ConnectionError: continue`, but `RedisStreamBroker` does not.

**Fix:** Configure the Redis connection pool with `retry_on_error` and `health_check_interval`, and add a custom broker subclass with connection error handling.

### Files to modify

1. **`backend/src/workers/broker.py`** — Subclass `RedisStreamBroker` to add `ConnectionError` handling in `listen()`, and pass `retry_on_error` + `health_check_interval` kwargs to the connection pool

### Implementation details

```python
import asyncio
import logging

from redis.exceptions import ConnectionError
from taskiq_redis import RedisStreamBroker

logger = logging.getLogger(__name__)


class ResilientRedisStreamBroker(RedisStreamBroker):
    """RedisStreamBroker with ConnectionError retry handling."""

    async def listen(self):
        """Listen with automatic reconnect on ConnectionError."""
        while True:
            try:
                async for message in super().listen():
                    yield message
            except ConnectionError as exc:
                logger.warning(
                    "Redis connection error in broker listen: %s. Reconnecting...", exc
                )
                await asyncio.sleep(1)  # Brief backoff before reconnect
                continue


# REDIS_URL and result_backend are defined elsewhere in broker.py
broker = ResilientRedisStreamBroker(
    url=REDIS_URL,
    queue_name="orchestra_tasks",
    health_check_interval=30,  # Ping Redis every 30s to detect dead connections
    retry_on_error=[ConnectionError],
).with_result_backend(result_backend)
```

---

## Issue 2: Orphaned Asyncio Tasks in Worker (LangGraph Store Batch Loop)

**Problem:** Each worker task creates a new `AsyncPostgresStore` via `get_store_db()`. The store's internal batch-loop task (`asyncio.create_task(_run())`) is not properly awaited on context manager exit, leaving orphaned tasks that accumulate over hours.

**Fix:** Add a singleton store to `WorkerState`, mirroring the existing checkpointer pattern in `state.py`.

### Files to modify

1. **`backend/src/workers/state.py`** — Add `_store` and `_store_exit_stack` fields, initialize them in `initialize()`, clean them up in `shutdown()`, and add a `get_store()` accessor plus a `get_worker_store()` convenience function
2. **`backend/src/workers/tasks.py`** — Replace `async with get_store_db() as store:` with `store = await get_worker_store()`
3. **`backend/src/workers/broker.py`** — Ensure lifecycle hooks call store init/shutdown (already structured for this)

### Implementation details

- Use `contextlib.AsyncExitStack` to manage the store context manager lifecycle in `WorkerState` (sketched below)
- On shutdown, exit the stack (which calls `__aexit__`), then cancel the store's internal `_task` and await it with a timeout as defense-in-depth
- Keep `get_store_db()` in `db.py` unchanged for non-worker callers (API routes)
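
A minimal sketch of the `WorkerState` changes, assuming `get_store_db()` is an async context manager and that the worker exposes `initialize()`/`shutdown()` hooks; the class layout and import path below are illustrative, not the project's actual `state.py`:

```python
import asyncio
import contextlib

from src.db import get_store_db  # assumed import path for the existing helper


class WorkerState:
    """Process-wide singletons for the worker (checkpointer, store, ...)."""

    _store = None
    _store_exit_stack: contextlib.AsyncExitStack | None = None

    @classmethod
    async def initialize(cls) -> None:
        # Enter the store context manager once per worker process and keep it
        # open for the process lifetime instead of once per task.
        cls._store_exit_stack = contextlib.AsyncExitStack()
        cls._store = await cls._store_exit_stack.enter_async_context(get_store_db())

    @classmethod
    def get_store(cls):
        if cls._store is None:
            raise RuntimeError("WorkerState.initialize() was not called")
        return cls._store

    @classmethod
    async def shutdown(cls) -> None:
        # Grab the store's internal batch-loop task before teardown, in case the
        # library leaves it running (private attribute, may differ by version).
        task = getattr(cls._store, "_task", None)
        if cls._store_exit_stack is not None:
            await cls._store_exit_stack.aclose()  # runs the store's __aexit__
        # Defense-in-depth: cancel the batch-loop task if it is still alive.
        if task is not None and not task.done():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError, asyncio.TimeoutError):
                await asyncio.wait_for(task, timeout=5)
        cls._store = None
        cls._store_exit_stack = None


async def get_worker_store():
    """Convenience accessor used by tasks.py instead of `async with get_store_db()`."""
    return WorkerState.get_store()
```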

---

## Issue 3: Pydantic Serializer Warnings

**Problem:** The worker produces repeated Pydantic serializer warnings from legacy `.dict()` calls and from `model_dump()` on LangChain `BaseMessage` objects (which extend `Serializable`, not Pydantic `BaseModel`).

**Fix:** Update serialization calls to use the correct method for each object type.

### Files to modify

1. **`backend/src/agents/__init__.py`** (lines 64, 104) — Replace `memory.dict()` with `memory.model_dump()` or access `.value` directly
2. **`backend/src/repos/thread_repo.py`** (line 67) — Replace `.dict()` with `.model_dump()`
3. **`backend/src/utils/messages.py`** (line 20) — Use LangChain's `message_to_dict()` for `BaseMessage` instead of `.model_dump()`
4. **`backend/src/utils/stream.py`** (line 62) — Same fix in `_to_dict()`: type-check for `BaseMessage` before serializing (see the sketch below)
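
A hedged sketch of the type-aware serialization described in items 3 and 4; the real helper lives in `backend/src/utils/stream.py`, so the name and exact signature here are illustrative:

```python
from typing import Any

from langchain_core.messages import BaseMessage, message_to_dict
from pydantic import BaseModel


def _to_dict(obj: Any) -> Any:
    """Serialize LangChain messages and Pydantic models without serializer warnings."""
    # Check BaseMessage first: LangChain ships its own message serializer, and
    # calling model_dump() on messages is what triggers the warnings.
    if isinstance(obj, BaseMessage):
        return message_to_dict(obj)
    if isinstance(obj, BaseModel):
        return obj.model_dump()
    return obj
```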

---

## Issue 4: Production SSE Streaming Not Returning Responses

**Problem:** Production uses distributed mode (nginx + Redis streams). Responses are not reaching users despite a healthy backend. The root cause is a combination of Issues 1-3 plus missing nginx buffering headers and stream race conditions.

**Fix:** Multiple targeted changes to eliminate buffering and race conditions.

### Files to modify

1. **`backend/src/routes/v0/llm.py`** (line 150) — Add the `"X-Accel-Buffering": "no"` header to the sync-mode `StreamingResponse` (already present on the distributed endpoint in `thread.py:285`)
2. **`backend/src/workers/tasks.py`** (~line 105) — Write an "initializing" event to the Redis stream immediately on task start, before heavy init work, to eliminate the race where the client polls before the stream exists
3. **`backend/src/utils/stream.py`** — In `stream_from_redis()`, add a wait-loop when the stream doesn't exist yet (up to 30s with keep-alive comments) instead of immediately raising `FileNotFoundError` (sketched below)
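
A rough sketch of the wait-loop from item 3, assuming a `redis.asyncio` client and a stream addressed by `stream_key`; the function name, constants, and how it plugs into `stream_from_redis()` are assumptions, not the existing code:

```python
import asyncio
from collections.abc import AsyncIterator

from redis.asyncio import Redis

STREAM_WAIT_TIMEOUT = 30.0  # seconds to wait for the worker to create the stream
STREAM_POLL_INTERVAL = 1.0


async def wait_for_stream(redis: Redis, stream_key: str) -> AsyncIterator[str]:
    """Yield SSE keep-alive comments until the Redis stream exists.

    Replaces the old behavior of raising FileNotFoundError immediately, which
    lost the response whenever the client connected before the worker's first write.
    """
    waited = 0.0
    while not await redis.exists(stream_key):
        if waited >= STREAM_WAIT_TIMEOUT:
            raise TimeoutError(f"stream {stream_key} was never created")
        # SSE comment lines (leading colon) keep the connection alive through
        # nginx without surfacing a visible event to the client.
        yield ": waiting for worker\n\n"
        await asyncio.sleep(STREAM_POLL_INTERVAL)
        waited += STREAM_POLL_INTERVAL
```

`stream_from_redis()` would iterate this generator first (forwarding the keep-alive chunks to the client) and then fall through to its existing read loop once the stream exists.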

---

## Deployment Order

1. **Issue 1 (Redis resilience)** — CRITICAL. Workers crashing is the most likely cause of missing responses. Lowest-risk fix (subclass + config).
2. **Issue 4 (SSE headers + race conditions)** — High user impact, low risk. The `X-Accel-Buffering` header is a one-liner.
3. **Issue 3 (Pydantic warnings)** — Low risk, reduces log noise, aids debugging.
4. **Issue 2 (Singleton store)** — Medium risk, fixes long-running stability. Deploy after validating the Issue 1 fix.

## Verification

- **Issue 1:** Simulate a Redis restart (`docker restart orchestra-dev-redis-1`) and verify workers reconnect without crashing (a unit-level sketch follows this list)
- **Issue 2:** Run a worker for an extended period and monitor for "Task was destroyed" warnings via `docker logs`
- **Issue 3:** Check worker logs for the absence of Pydantic serializer warnings
- **Issue 4:** Send a chat message in production and verify the SSE stream delivers the response. Test with `curl -N` behind nginx.
- All: `make test` passes, `make format` clean
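
A hypothetical unit-level check for the Issue 1 fix, complementing the manual Redis-restart test above. It assumes the `ResilientRedisStreamBroker` from the Issue 1 sketch, pytest-asyncio, and an illustrative import path:

```python
import asyncio
from unittest.mock import patch

import pytest
from redis.exceptions import ConnectionError as RedisConnectionError

from src.workers.broker import ResilientRedisStreamBroker  # assumed module path


@pytest.mark.asyncio
async def test_listen_survives_connection_error():
    attempts = {"count": 0}

    async def flaky_listen(self):
        # First iteration simulates Redis dropping the connection; the retry succeeds.
        attempts["count"] += 1
        if attempts["count"] == 1:
            raise RedisConnectionError("Connection closed by server.")
        yield b"task-message"

    broker = ResilientRedisStreamBroker(url="redis://localhost:6379")
    with patch("taskiq_redis.RedisStreamBroker.listen", flaky_listen):
        listener = broker.listen()
        message = await asyncio.wait_for(listener.__anext__(), timeout=5)
        await listener.aclose()

    assert message == b"task-message"
    assert attempts["count"] == 2  # first attempt failed, retry delivered the message
```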

## GitHub Issues

Create four separate issues in the repo, each referencing the relevant files and fix approach above. Label each with `bug` and `backend`.