Skip to content

Commit 921e650

Browse files
fix(rotation): delegate to original receive to prevent event loop starvation
Starlette 0.52+ with ASGI spec <2.4 spawns a listen_for_disconnect task inside StreamingResponse that calls receive() in a loop. The old _make_receive returned immediately on every call after the first, causing an infinite busy-loop that starved the event loop and prevented streaming responses from ever sending data. Now delegates to the original ASGI receive after the buffered body is consumed, which properly blocks until the client disconnects. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 24f8e7f commit 921e650

1 file changed

Lines changed: 20 additions & 8 deletions

File tree

src/claude_code_proxy/rotation/middleware.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Rotation middleware for injecting account into requests with retry support."""
22

3+
from typing import Any
4+
35
from fastapi.responses import JSONResponse
46
from starlette.types import ASGIApp, Message, Receive, Scope, Send
57
from structlog import get_logger
@@ -40,9 +42,10 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
4042
ACCOUNT_NAME_HEADER.lower().encode(),
4143
headers.get(ACCOUNT_NAME_HEADER.encode()),
4244
)
45+
make_recv = lambda b: self._make_receive(b, receive) # noqa: E731
4346
if manual:
4447
name = manual.decode() if isinstance(manual, bytes) else manual
45-
await self._handle_manual(scope, body, send, name)
48+
await self._handle_manual(scope, body, send, name, make_recv)
4649
else:
4750
await handle_auto(
4851
self.app,
@@ -51,7 +54,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
5154
body,
5255
send,
5356
self.max_retries,
54-
self._make_receive,
57+
make_recv,
5558
)
5659

5760
async def _read_body(self, receive: Receive) -> bytes:
@@ -64,20 +67,29 @@ async def _read_body(self, receive: Receive) -> bytes:
6467
break
6568
return b"".join(parts)
6669

67-
def _make_receive(self, body: bytes) -> Receive:
70+
def _make_receive(self, body: bytes, original_receive: Receive) -> Receive:
6871
sent = False
6972

7073
async def receive() -> Message:
7174
nonlocal sent
7275
if not sent:
7376
sent = True
7477
return {"type": "http.request", "body": body, "more_body": False}
75-
return {"type": "http.request", "body": b"", "more_body": False}
78+
# Delegate to original receive for disconnect detection.
79+
# Returning immediately here would busy-loop Starlette's
80+
# listen_for_disconnect task and starve the event loop,
81+
# blocking StreamingResponse from ever sending data.
82+
return await original_receive()
7683

7784
return receive
7885

7986
async def _handle_manual(
80-
self, scope: Scope, body: bytes, send: Send, name: str
87+
self,
88+
scope: Scope,
89+
body: bytes,
90+
send: Send,
91+
name: str,
92+
make_recv: Any,
8193
) -> None:
8294
account = self.pool.get_account(name)
8395
if account is None:
@@ -90,7 +102,7 @@ async def _handle_manual(
90102
}
91103
},
92104
)
93-
await r(scope, self._make_receive(body), send)
105+
await r(scope, make_recv(body), send)
94106
return
95107
if not account.is_available:
96108
r = JSONResponse(
@@ -103,7 +115,7 @@ async def _handle_manual(
103115
}
104116
},
105117
)
106-
await r(scope, self._make_receive(body), send)
118+
await r(scope, make_recv(body), send)
107119
return
108120
scope.setdefault("state", {}).update(
109121
{
@@ -126,4 +138,4 @@ async def send_wrapper(message: Message) -> None:
126138
)
127139
await send(message)
128140

129-
await self.app(scope, self._make_receive(body), send_wrapper)
141+
await self.app(scope, make_recv(body), send_wrapper)

0 commit comments

Comments
 (0)