From 170fd99e0234f5fc13fae5c336a528bdba684e90 Mon Sep 17 00:00:00 2001
From: Andrew Garde Joia
Date: Sun, 21 Jun 2026 10:15:45 +0000
Subject: [PATCH 1/4] fix(voice/#648): terminal drain on non-audio completion (EL py+ts, WebSocket py)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The ElevenLabs and generic WebSocket adapters shared an audio-gated
receive loop that returned ONLY on an audio frame, so a turn completing
without audio drained to the response_timeout deadline and raised — a
latent hang surfaced by /sweep during PR #647 (#646 fixed the same
anti-pattern in OpenAI Realtime).

Mirror the #646/PR647 reference pattern (and the Gemini Live / Pipecat
idiom): return an empty AudioChunk on a non-audio terminal so the base
_drain_agent_response loop exits cleanly instead of hanging to the
deadline.

- elevenlabs.py recv_audio: a socket close (ConnectionClosed) and a
  client_tool_call (tool-only turn, no client_tool_result path) each
  return an empty chunk.
- websocket.py recv_audio: a clean server close (end of stream) returns
  empty.
- elevenlabs.ts onMessage: client_tool_call resolves the active receiver
  with an empty chunk (socket close was already handled by
  onSocketClose).

Co-Authored-By: Claude Opus 4.8 --- javascript/src/voice/adapters/elevenlabs.ts | 22 ++++++++ python/scenario/voice/adapters/elevenlabs.py | 54 ++++++++++++++++++-- python/scenario/voice/adapters/websocket.py | 19 ++++++- 3 files changed, 89 insertions(+), 6 deletions(-) diff --git a/javascript/src/voice/adapters/elevenlabs.ts b/javascript/src/voice/adapters/elevenlabs.ts index 476ac008a..03cdaae7f 100644 --- a/javascript/src/voice/adapters/elevenlabs.ts +++ b/javascript/src/voice/adapters/elevenlabs.ts @@ -17,9 +17,15 @@ * `lastAgentTranscript` * - `audio` — decoded base64 PCM16 and returned from `receiveAudio` * - `ping` — replied with `{"type": "pong", "event_id": }` + * - `client_tool_call` — tool-only / non-audio terminal turn: resolves the + * drain with an empty `AudioChunk` (issue #648) instead of hanging to the + * `receiveAudio` timeout (no `client_tool_result` path → no follow-up audio) * - `interruption` — swallowed * - Anything else — silently skipped * + * A socket close mid-receive is likewise terminal: `onSocketClose` resolves + * pending waiters with an empty `AudioChunk` so the drain exits cleanly (#648). + * * {@link ElevenLabsVoiceAgent} — the typed *local* composable preset (distinct * responsibility, same vendor): you compose {@link ElevenLabsSTTProvider} + any * LLM + ElevenLabs TTS yourself, keeping control over prompts, model choice, @@ -481,6 +487,22 @@ export class ElevenLabsAgentAdapter extends VoiceAgentAdapter { return; } + if (etype === "client_tool_call") { + // Issue #648: EL ConvAI emits `client_tool_call` when the agent invokes a + // CLIENT-side tool. This adapter has no `client_tool_result` path, so the + // agent will never produce spoken audio for this turn — it is a tool-only + // / non-audio terminal turn. Resolve the active `receiveAudio` waiter with + // an empty chunk so the base drain (`drainAgentResponse`) exits cleanly + // instead of hanging to the `receiveAudio` timeout. 
Mirrors the #646/PR647 + // reference fix and the Python parity in `elevenlabs.py`. If no receive is + // in flight there is nothing to unblock — drop it (the close/error + // handlers use the same active-waiters-only semantics via + // `drainPendingWaiters`). + const waiter = this.waiters.shift(); + if (waiter) waiter(new AudioChunk({ data: new Uint8Array(0) })); + return; + } + // `interruption` and any unknown events are swallowed — Python parity. } } diff --git a/python/scenario/voice/adapters/elevenlabs.py b/python/scenario/voice/adapters/elevenlabs.py index 6022b400c..72552cb7b 100644 --- a/python/scenario/voice/adapters/elevenlabs.py +++ b/python/scenario/voice/adapters/elevenlabs.py @@ -28,10 +28,16 @@ ``last_agent_transcript`` (post-barge-in update) - ``audio`` — decoded and returned from ``recv_audio`` - ``ping`` — replied to with ``{"type": "pong", "event_id": }`` + - ``client_tool_call`` — tool-only / non-audio terminal turn: ends the + drain with an empty ``AudioChunk`` (issue #648) instead of hanging to + the ``response_timeout`` deadline. The adapter has no + ``client_tool_result`` path, so the agent cannot follow up with audio. - ``interruption`` — swallowed - - Other documented events (``vad_score``, ``client_tool_call``, - ``agent_response_metadata``, etc.) — silently skipped; the - provisioned test agent doesn't trigger them. + - Other documented events (``vad_score``, ``agent_response_metadata``, + etc.) — silently skipped; the provisioned test agent doesn't trigger them. + +A socket close mid-receive is also treated as a terminal: ``recv_audio`` +returns an empty ``AudioChunk`` so the drain exits cleanly (issue #648). """ from __future__ import annotations @@ -297,9 +303,15 @@ async def recv_audio(self, timeout: float) -> AudioChunk: deadline, so this returns when an ``audio`` event arrives and raises :class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse with **no message of any kind**. 
Pings are replied to inline; - transcript events update instance attributes for observability; all + transcript events update instance attributes for observability; most other event types are swallowed without error. + Terminal (non-audio) completions return an **empty** ``AudioChunk`` + rather than hanging to the deadline (issue #648): a ``client_tool_call`` + (tool-only turn — this adapter has no ``client_tool_result`` path) and a + socket close mid-receive both end the drain cleanly, mirroring the + #646/PR647 reference fix and the Gemini Live / Pipecat idiom. + Design decision (issue #493 — intentional, not an oversight): because a received ping is treated as proof of liveness, a hosted agent that keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will @@ -311,6 +323,8 @@ async def recv_audio(self, timeout: float) -> AudioChunk: (An absolute caller-side backstop for the wedged-agent case is tracked as a separate follow-up; it is intentionally not implemented here.) """ + import websockets # for the ConnectionClosed terminal (issue #648) + if self._ws is None: raise RuntimeError("ElevenLabsAgentAdapter: not connected") @@ -320,7 +334,21 @@ async def recv_audio(self, timeout: float) -> AudioChunk: if remaining <= 0: raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out") - raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining) + try: + raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining) + except websockets.exceptions.ConnectionClosed: + # Issue #648: the hosted agent finished its turn and the server + # closed the socket WITHOUT a trailing audio frame (a silent / + # tool-only turn). Mirror the #646/PR647 reference pattern (and + # the Gemini Live / Pipecat idiom): return an empty AudioChunk so + # the base ``_drain_agent_response`` loop exits cleanly, instead + # of letting ConnectionClosed propagate — the drain only catches + # asyncio.TimeoutError, so an unhandled close would crash the turn. 
+ logger.debug( + "ElevenLabsAgentAdapter: socket closed during recv; " + "ending turn with empty chunk" + ) + return AudioChunk(data=b"") # A received message (ping included) proves the socket is alive, so # re-arm the idle deadline. Placed BEFORE json.loads so ANY frame — # even a non-JSON/malformed one — counts as a liveness signal. @@ -414,6 +442,22 @@ async def recv_audio(self, timeout: float) -> AudioChunk: in_fmt, ) + elif etype == "client_tool_call": + # Issue #648: EL ConvAI emits ``client_tool_call`` when the agent + # invokes a CLIENT-side tool. This adapter is a black-box test + # harness and does NOT send ``client_tool_result`` back, so the + # hosted agent will never produce spoken audio for this turn — it + # is a tool-only / non-audio terminal turn. Mirror the #646/PR647 + # reference pattern: return an empty AudioChunk so the drain exits + # cleanly instead of looping to the ``response_timeout`` deadline + # and raising. The tool call is observable on the wire; we surface + # the turn's completion, not its payload. + logger.debug( + "ElevenLabsAgentAdapter: client_tool_call (tool-only turn); " + "ending turn with empty chunk" + ) + return AudioChunk(data=b"") + elif etype == "interruption": pass # documented non-audio event, no action needed diff --git a/python/scenario/voice/adapters/websocket.py b/python/scenario/voice/adapters/websocket.py index b8106c5b5..79d92256b 100644 --- a/python/scenario/voice/adapters/websocket.py +++ b/python/scenario/voice/adapters/websocket.py @@ -68,13 +68,30 @@ async def send_audio(self, chunk: AudioChunk) -> None: await self._ws.send(payload) async def recv_audio(self, timeout: float) -> AudioChunk: + """Loop inbound frames until the protocol decodes an audio chunk. 
+ + A clean server close (end of stream) with no final audio frame is a + terminal, not an error: ``recv_audio`` returns an empty ``AudioChunk`` + so the base ``_drain_agent_response`` loop exits cleanly (issue #648), + mirroring the #646/PR647 reference pattern and the Gemini Live / Pipecat + idiom. ``asyncio.TimeoutError`` is still raised on inter-message silence. + """ + import websockets # for the ConnectionClosed terminal (issue #648) + if self._ws is None: raise RuntimeError(f"{type(self).__name__}: not connected") loop = asyncio.get_running_loop() deadline = loop.time() + timeout while True: remaining = max(0.0, deadline - loop.time()) - message = await asyncio.wait_for(self._ws.recv(), timeout=remaining) + try: + message = await asyncio.wait_for(self._ws.recv(), timeout=remaining) + except websockets.exceptions.ConnectionClosed: + # End of stream: the server closed without a trailing audio + # frame. Surface a clean terminal rather than letting + # ConnectionClosed propagate — the drain only catches + # asyncio.TimeoutError, so an unhandled close crashes the turn. + return AudioChunk(data=b"") chunk = self.protocol.decode_response(message) if chunk is not None: return chunk From 2423a923a05395751c36e412f3e746ba718e140e Mon Sep 17 00:00:00 2001 From: Andrew Garde Joia Date: Sun, 21 Jun 2026 10:16:00 +0000 Subject: [PATCH 2/4] test(voice/#648): non-audio terminal drains cleanly (silent, socket-close, tool-only) Prove the audio-gated drain returns an empty AudioChunk (not a timeout/raise) on a non-audio completion, and that the normal audio path still drains (no regression). - python/tests/voice/test_audio_gated_drain.py (new): EL client_tool_call, EL socket close, and WebSocket socket close each return an empty chunk; normal audio for both adapters still returns the decoded payload. Terminal-case calls use a long recv budget under a short outer asyncio.wait_for ceiling, so an un-fixed adapter fails fast instead of stalling the suite. 
- elevenlabs.test.ts: add a client_tool_call terminal test; drop client_tool_call from the "swallows unknown events" case (it is now a terminal, not swallowed). Co-Authored-By: Claude Opus 4.8 --- .../adapters/__tests__/elevenlabs.test.ts | 25 ++- python/tests/voice/test_audio_gated_drain.py | 207 ++++++++++++++++++ 2 files changed, 230 insertions(+), 2 deletions(-) create mode 100644 python/tests/voice/test_audio_gated_drain.py diff --git a/javascript/src/voice/adapters/__tests__/elevenlabs.test.ts b/javascript/src/voice/adapters/__tests__/elevenlabs.test.ts index 2aeb4cae6..32fe3982a 100644 --- a/javascript/src/voice/adapters/__tests__/elevenlabs.test.ts +++ b/javascript/src/voice/adapters/__tests__/elevenlabs.test.ts @@ -631,13 +631,34 @@ describe("ElevenLabsAgentAdapter wire-protocol (onMessage branches)", () => { const { adapter, socket } = await makeConnected(); emit(socket, { type: "interruption" }); emit(socket, { type: "vad_score", vad_event: { score: 0.5 } }); - emit(socket, { type: "client_tool_call", payload: {} }); - // No throw, no mutation visible to callers. + emit(socket, { type: "agent_response_metadata", metadata: {} }); + // No throw, no mutation visible to callers. (`client_tool_call` is NOT here: + // it is a non-audio terminal, covered by its own test below — issue #648.) expect(adapter.lastAgentTranscript).toBeNull(); expect(adapter.lastUserTranscript).toBeNull(); await adapter.disconnect(); }); + it("client_tool_call (tool-only turn) resolves the receiver with an empty chunk (#648)", async () => { + const { adapter, socket } = await makeConnected(); + const recv = adapter.receiveAudio(2); + emit(socket, { + type: "client_tool_call", + client_tool_call: { + tool_name: "lookup_order", + tool_call_id: "call_1", + parameters: { order_id: "42" }, + }, + }); + const chunk = await recv; + // A tool-only turn yields no spoken audio (this adapter has no + // client_tool_result path). 
The drain must exit cleanly with an empty chunk + // rather than swallowing the event and hanging to the receiveAudio timeout. + expect(chunk).toBeInstanceOf(AudioChunk); + expect(chunk.data.length).toBe(0); + await adapter.disconnect(); + }); + it("ignores non-JSON frames cleanly", async () => { const { adapter, socket } = await makeConnected(); socket.emit("message", Buffer.from("not-json", "utf-8")); diff --git a/python/tests/voice/test_audio_gated_drain.py b/python/tests/voice/test_audio_gated_drain.py new file mode 100644 index 000000000..98b1cbf3e --- /dev/null +++ b/python/tests/voice/test_audio_gated_drain.py @@ -0,0 +1,207 @@ +""" +Issue #648 — audio-gated drain must terminate cleanly on a non-audio completion. + +The ElevenLabs (hosted ConvAI) and generic WebSocket adapters share an +audio-gated receive loop: historically each returned ONLY on an audio frame, so +a turn that completed WITHOUT producing audio drained to the ``response_timeout`` +deadline and raised (a latent hang surfaced by ``/sweep`` during PR #647, which +fixed the same anti-pattern in the OpenAI Realtime adapter for issue #646). + +The fix mirrors the #646/PR647 reference pattern (and the Gemini Live / +Pipecat idiom): on a non-audio terminal — a socket close, or an ElevenLabs +``client_tool_call`` (a tool-only turn that never yields spoken audio because +this adapter has no ``client_tool_result`` path) — ``recv_audio`` returns an +**empty** ``AudioChunk`` so the base ``_drain_agent_response`` loop exits +cleanly instead of hanging. + +These tests pin that behaviour and guard against regressing the normal +audio path. No real network: ``websockets.connect`` is patched to a mock whose +``recv()`` serves programmed frames (or raises ``ConnectionClosedOK`` to model a +clean server close). 
+ +Each terminal-case assertion gives ``recv_audio`` a generous ``timeout`` (the +budget it would otherwise hang for) and wraps the call in a short outer +``asyncio.wait_for`` ceiling, so an un-fixed adapter that loops to its deadline +fails fast instead of stalling the suite — the empty-chunk fix returns +immediately and stays well under the ceiling. +""" + +import asyncio +import base64 +import json +from unittest.mock import AsyncMock, patch + +import pytest +from websockets.exceptions import ConnectionClosedOK + +from scenario.voice import AudioChunk, ElevenLabsAgentAdapter +from scenario.voice.adapters.websocket import ( + WebSocketAgentAdapter, + WebSocketProtocol, +) + + +# recv_audio is handed a long nominal budget (what an un-fixed adapter would +# hang for); the outer ceiling fails the test fast if the empty-chunk terminal +# is missing. The fix returns instantly, far under the ceiling. +RECV_TIMEOUT = 30.0 +OUTER_CEILING = 2.0 + + +def _scripted_ws(frames: list, *, then_close: bool = False) -> AsyncMock: + """A mock WS whose ``recv()`` serves ``frames`` in order. + + After the programmed frames are exhausted it either raises + ``ConnectionClosedOK`` (``then_close=True``, modelling a clean server close) + or blocks indefinitely (modelling a silent-but-open socket). The + block-forever tail is what makes the terminal-case tests RED on an un-fixed + adapter: without the empty-chunk return, ``recv_audio`` loops past the + swallowed non-audio frame into the blocking ``recv()`` and only the outer + ceiling unwinds it. 
+ """ + idx = 0 + + async def fake_recv(): + nonlocal idx + if idx < len(frames): + msg = frames[idx] + idx += 1 + return msg + if then_close: + raise ConnectionClosedOK(None, None) + await asyncio.sleep(3600) # silent-but-open socket + raise AssertionError("unreachable") # pragma: no cover + + ws = AsyncMock() + ws.recv = fake_recv + ws.send = AsyncMock() + ws.close = AsyncMock() + return ws + + +# --------------------------------------------------------------------- ElevenLabs + + +@pytest.mark.asyncio +async def test_elevenlabs_client_tool_call_terminates_drain(): + """A tool-only turn (``client_tool_call``, no audio) returns an empty chunk. + + EL ConvAI emits ``client_tool_call`` when the agent invokes a client-side + tool. This adapter never sends ``client_tool_result``, so the agent produces + no spoken audio for the turn — pre-fix, ``recv_audio`` swallowed the event + and looped to the deadline. The fix surfaces the completion as an empty + ``AudioChunk``. + """ + adapter = ElevenLabsAgentAdapter(agent_id="a", api_key="k") + tool_call = json.dumps( + { + "type": "client_tool_call", + "client_tool_call": { + "tool_name": "lookup_order", + "tool_call_id": "call_1", + "parameters": {"order_id": "42"}, + }, + } + ) + mock_ws = _scripted_ws([tool_call]) # then blocks forever + + with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): + await adapter.connect() + result = await asyncio.wait_for( + adapter.recv_audio(timeout=RECV_TIMEOUT), timeout=OUTER_CEILING + ) + + assert isinstance(result, AudioChunk) + assert result.data == b"" # empty terminal, not a hang + + +@pytest.mark.asyncio +async def test_elevenlabs_socket_close_terminates_drain(): + """A clean server close mid-receive returns an empty chunk, not an error. + + Pre-fix, the unhandled ``ConnectionClosed`` propagated out of ``recv_audio`` + (the drain only catches ``asyncio.TimeoutError``) and crashed the turn. 
+ """ + adapter = ElevenLabsAgentAdapter(agent_id="a", api_key="k") + mock_ws = _scripted_ws([], then_close=True) + + with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): + await adapter.connect() + result = await asyncio.wait_for( + adapter.recv_audio(timeout=RECV_TIMEOUT), timeout=OUTER_CEILING + ) + + assert isinstance(result, AudioChunk) + assert result.data == b"" + + +@pytest.mark.asyncio +async def test_elevenlabs_normal_audio_still_returned(): + """No regression: a normal ``audio`` frame is still decoded and returned.""" + adapter = ElevenLabsAgentAdapter(agent_id="a", api_key="k") + pcm_payload = b"\x12\x34" * 8 # 16 bytes of dummy PCM16 + b64 = base64.b64encode(pcm_payload).decode() + audio_frame = json.dumps({"type": "audio", "audio_event": {"audio_base_64": b64}}) + mock_ws = _scripted_ws([audio_frame]) + + with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): + await adapter.connect() + result = await asyncio.wait_for( + adapter.recv_audio(timeout=RECV_TIMEOUT), timeout=OUTER_CEILING + ) + + assert isinstance(result, AudioChunk) + assert result.data == pcm_payload # real audio, non-empty + + +# ----------------------------------------------------------------- WebSocket (generic) + + +class _BytesAudioProtocol(WebSocketProtocol): + """Minimal protocol: binary frames are PCM16 audio; everything else is non-audio.""" + + def encode_audio(self, audio: bytes): + return audio + + def decode_response(self, message): + if isinstance(message, (bytes, bytearray)): + return AudioChunk(data=bytes(message)) + return None + + +@pytest.mark.asyncio +async def test_websocket_socket_close_terminates_drain(): + """Generic WebSocket: a clean server close (end of stream) returns empty. + + Pre-fix, the ``while True`` loop returned only on a decoded audio chunk and + had no end-of-stream path, so a clean close raised an unhandled + ``ConnectionClosed``. 
+ """ + adapter = WebSocketAgentAdapter(url="ws://x", protocol=_BytesAudioProtocol()) + mock_ws = _scripted_ws([], then_close=True) + + with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): + await adapter.connect() + result = await asyncio.wait_for( + adapter.recv_audio(timeout=RECV_TIMEOUT), timeout=OUTER_CEILING + ) + + assert isinstance(result, AudioChunk) + assert result.data == b"" + + +@pytest.mark.asyncio +async def test_websocket_normal_audio_still_returned(): + """No regression: a decoded audio frame is still returned from the loop.""" + adapter = WebSocketAgentAdapter(url="ws://x", protocol=_BytesAudioProtocol()) + pcm_payload = b"\xab\xcd" * 8 + mock_ws = _scripted_ws([pcm_payload]) + + with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): + await adapter.connect() + result = await asyncio.wait_for( + adapter.recv_audio(timeout=RECV_TIMEOUT), timeout=OUTER_CEILING + ) + + assert isinstance(result, AudioChunk) + assert result.data == pcm_payload From 5827647a3daf77832afc0564cbdbfc491c7f2e4d Mon Sep 17 00:00:00 2001 From: Andrew Garde Joia Date: Sun, 21 Jun 2026 10:36:49 +0000 Subject: [PATCH 3/4] test(voice/#648): cover abnormal socket close + clarify TS terminal-drop rationale MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address non-blocking review feedback on PR #693 (no production-logic change): - test_audio_gated_drain.py: parametrize the EL + WebSocket socket-close tests over both ConnectionClosedOK (clean) and ConnectionClosedError (abnormal), since production catches the base ConnectionClosed — both subclasses must terminate the drain cleanly. - elevenlabs.ts: expand the client_tool_call comment to explain why dropping the terminal when no receiver is in flight is safe (a terminal carries no payload; the drain always parks a waiter first) and why queuing an empty sentinel would be worse (a spurious empty turn on the next receiveAudio). 
Co-Authored-By: Claude Opus 4.8 --- javascript/src/voice/adapters/elevenlabs.ts | 16 +++-- python/tests/voice/test_audio_gated_drain.py | 61 ++++++++++++-------- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/javascript/src/voice/adapters/elevenlabs.ts b/javascript/src/voice/adapters/elevenlabs.ts index 03cdaae7f..3529241f5 100644 --- a/javascript/src/voice/adapters/elevenlabs.ts +++ b/javascript/src/voice/adapters/elevenlabs.ts @@ -494,10 +494,18 @@ export class ElevenLabsAgentAdapter extends VoiceAgentAdapter { // / non-audio terminal turn. Resolve the active `receiveAudio` waiter with // an empty chunk so the base drain (`drainAgentResponse`) exits cleanly // instead of hanging to the `receiveAudio` timeout. Mirrors the #646/PR647 - // reference fix and the Python parity in `elevenlabs.py`. If no receive is - // in flight there is nothing to unblock — drop it (the close/error - // handlers use the same active-waiters-only semantics via - // `drainPendingWaiters`). + // reference fix and the Python parity in `elevenlabs.py`. + // + // If no receive is in flight we DROP the terminal rather than queue it + // (unlike the `audio` branch above, which buffers onto `audioQueue`). Safe + // because: (1) a terminal carries no payload to preserve, so nothing is + // lost; (2) the drain always parks a waiter before the agent acts + // (call -> drain -> receiveAudio awaits), so a mid-turn tool call always + // finds one. Queuing an empty sentinel would be WORSE — it would surface as + // the NEXT turn's first `receiveAudio` result, a spurious empty turn. This + // matches the active-waiters-only semantics of onSocketClose / onSocketError + // (`drainPendingWaiters`). Python differs only because its pull-loop + // `recv_audio` hands the terminal to whichever call asks next. 
const waiter = this.waiters.shift(); if (waiter) waiter(new AudioChunk({ data: new Uint8Array(0) })); return; diff --git a/python/tests/voice/test_audio_gated_drain.py b/python/tests/voice/test_audio_gated_drain.py index 98b1cbf3e..065aab6ef 100644 --- a/python/tests/voice/test_audio_gated_drain.py +++ b/python/tests/voice/test_audio_gated_drain.py @@ -29,10 +29,11 @@ import asyncio import base64 import json +from typing import Optional from unittest.mock import AsyncMock, patch import pytest -from websockets.exceptions import ConnectionClosedOK +from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK from scenario.voice import AudioChunk, ElevenLabsAgentAdapter from scenario.voice.adapters.websocket import ( @@ -48,16 +49,16 @@ OUTER_CEILING = 2.0 -def _scripted_ws(frames: list, *, then_close: bool = False) -> AsyncMock: +def _scripted_ws(frames: list, *, close_with: Optional[Exception] = None) -> AsyncMock: """A mock WS whose ``recv()`` serves ``frames`` in order. - After the programmed frames are exhausted it either raises - ``ConnectionClosedOK`` (``then_close=True``, modelling a clean server close) - or blocks indefinitely (modelling a silent-but-open socket). The - block-forever tail is what makes the terminal-case tests RED on an un-fixed - adapter: without the empty-chunk return, ``recv_audio`` loops past the - swallowed non-audio frame into the blocking ``recv()`` and only the outer - ceiling unwinds it. + After the programmed frames are exhausted it either raises ``close_with`` (a + ``ConnectionClosed*`` instance, modelling a server close) or — when + ``close_with is None`` — blocks indefinitely (modelling a silent-but-open + socket). The block-forever tail is what makes the terminal-case tests RED on + an un-fixed adapter: without the empty-chunk return, ``recv_audio`` loops + past the swallowed non-audio frame into the blocking ``recv()`` and only the + outer ceiling unwinds it. 
""" idx = 0 @@ -67,8 +68,8 @@ async def fake_recv(): msg = frames[idx] idx += 1 return msg - if then_close: - raise ConnectionClosedOK(None, None) + if close_with is not None: + raise close_with await asyncio.sleep(3600) # silent-but-open socket raise AssertionError("unreachable") # pragma: no cover @@ -79,6 +80,12 @@ async def fake_recv(): return ws +# Production catches the base ``ConnectionClosed``; both a clean close +# (``ConnectionClosedOK``) and an abnormal one (``ConnectionClosedError``) must +# terminate the drain cleanly, so the socket-close tests run against both. +_CLOSE_CLASSES = [ConnectionClosedOK, ConnectionClosedError] + + # --------------------------------------------------------------------- ElevenLabs @@ -116,14 +123,18 @@ async def test_elevenlabs_client_tool_call_terminates_drain(): @pytest.mark.asyncio -async def test_elevenlabs_socket_close_terminates_drain(): - """A clean server close mid-receive returns an empty chunk, not an error. - - Pre-fix, the unhandled ``ConnectionClosed`` propagated out of ``recv_audio`` - (the drain only catches ``asyncio.TimeoutError``) and crashed the turn. +@pytest.mark.parametrize("close_cls", _CLOSE_CLASSES) +async def test_elevenlabs_socket_close_terminates_drain(close_cls): + """A server close mid-receive returns an empty chunk, not an error. + + Runs against both a clean close (``ConnectionClosedOK``) and an abnormal one + (``ConnectionClosedError``): production catches the base ``ConnectionClosed``, + so both subclasses must terminate the drain cleanly. Pre-fix, the unhandled + ``ConnectionClosed`` propagated out of ``recv_audio`` (the drain only catches + ``asyncio.TimeoutError``) and crashed the turn. 
""" adapter = ElevenLabsAgentAdapter(agent_id="a", api_key="k") - mock_ws = _scripted_ws([], then_close=True) + mock_ws = _scripted_ws([], close_with=close_cls(None, None)) with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): await adapter.connect() @@ -170,15 +181,17 @@ def decode_response(self, message): @pytest.mark.asyncio -async def test_websocket_socket_close_terminates_drain(): - """Generic WebSocket: a clean server close (end of stream) returns empty. - - Pre-fix, the ``while True`` loop returned only on a decoded audio chunk and - had no end-of-stream path, so a clean close raised an unhandled - ``ConnectionClosed``. +@pytest.mark.parametrize("close_cls", _CLOSE_CLASSES) +async def test_websocket_socket_close_terminates_drain(close_cls): + """Generic WebSocket: a server close (end of stream) returns empty. + + Runs against both clean (``ConnectionClosedOK``) and abnormal + (``ConnectionClosedError``) closes. Pre-fix, the ``while True`` loop returned + only on a decoded audio chunk and had no end-of-stream path, so a close + raised an unhandled ``ConnectionClosed``. """ adapter = WebSocketAgentAdapter(url="ws://x", protocol=_BytesAudioProtocol()) - mock_ws = _scripted_ws([], then_close=True) + mock_ws = _scripted_ws([], close_with=close_cls(None, None)) with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): await adapter.connect() From 3f9b4ac9c96a03e15104543eb99fa94fe41096dd Mon Sep 17 00:00:00 2001 From: Andrew Garde Joia Date: Sun, 21 Jun 2026 10:49:57 +0000 Subject: [PATCH 4/4] test(voice/#648): drain-level regression test + honest unit-test names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback (Metz/Beck, Fowler) on PR #693 — test-only: - Add test_elevenlabs_tool_only_turn_drain_exits_cleanly: drives _drain_agent_response end-to-end on a tool-only turn and asserts it returns an empty merged turn instead of raising FirstChunkTimeoutError. 
Guards the bug at the level it was reported (a drain-level hang), above the recv_audio unit tests. Red without the fix (drain hangs to the outer ceiling). - Rename the recv_audio unit tests *_terminates_drain -> *_returns_empty_chunk so the names match what they assert (the recv contract, not the drain loop). Co-Authored-By: Claude Opus 4.8 --- python/tests/voice/test_audio_gated_drain.py | 51 +++++++++++++++++--- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/python/tests/voice/test_audio_gated_drain.py b/python/tests/voice/test_audio_gated_drain.py index 065aab6ef..31e4e07ab 100644 --- a/python/tests/voice/test_audio_gated_drain.py +++ b/python/tests/voice/test_audio_gated_drain.py @@ -90,8 +90,8 @@ async def fake_recv(): @pytest.mark.asyncio -async def test_elevenlabs_client_tool_call_terminates_drain(): - """A tool-only turn (``client_tool_call``, no audio) returns an empty chunk. +async def test_elevenlabs_client_tool_call_returns_empty_chunk(): + """Unit: a tool-only turn (``client_tool_call``, no audio) returns empty. EL ConvAI emits ``client_tool_call`` when the agent invokes a client-side tool. This adapter never sends ``client_tool_result``, so the agent produces @@ -124,8 +124,8 @@ async def test_elevenlabs_client_tool_call_terminates_drain(): @pytest.mark.asyncio @pytest.mark.parametrize("close_cls", _CLOSE_CLASSES) -async def test_elevenlabs_socket_close_terminates_drain(close_cls): - """A server close mid-receive returns an empty chunk, not an error. +async def test_elevenlabs_socket_close_returns_empty_chunk(close_cls): + """Unit: a server close mid-receive returns an empty chunk, not an error. 
Runs against both a clean close (``ConnectionClosedOK``) and an abnormal one (``ConnectionClosedError``): production catches the base ``ConnectionClosed``, @@ -165,6 +165,45 @@ async def test_elevenlabs_normal_audio_still_returned(): assert result.data == pcm_payload # real audio, non-empty +@pytest.mark.asyncio +async def test_elevenlabs_tool_only_turn_drain_exits_cleanly(): + """Drain-level: a tool-only turn makes ``_drain_agent_response`` exit cleanly. + + This is the end-to-end guard for the bug as reported — a *drain*-level hang — + above the unit tests that pin ``recv_audio`` alone. With a ``client_tool_call`` + and no audio: the first ``recv_audio`` returns an empty chunk; the drain marks + no first-chunk, enters its tail loop, and breaks on the next recv (the now- + silent socket times out at ``response_tail_silence``), returning an empty + merged turn. Pre-fix, the first ``recv_audio`` looped to ``response_timeout`` + and the drain raised :class:`FirstChunkTimeoutError`. + """ + adapter = ElevenLabsAgentAdapter(agent_id="a", api_key="k") + # After the empty first chunk the drain does one more recv that must time out + # fast against the now-silent socket — keep the tail-silence wait tiny. + adapter.response_tail_silence = 0.1 + tool_call = json.dumps( + { + "type": "client_tool_call", + "client_tool_call": { + "tool_name": "lookup_order", + "tool_call_id": "call_1", + "parameters": {}, + }, + } + ) + mock_ws = _scripted_ws([tool_call]) # then blocks (silent-but-open) socket + + with patch("websockets.connect", new=AsyncMock(return_value=mock_ws)): + await adapter.connect() + merged = await asyncio.wait_for( + adapter._drain_agent_response(), timeout=OUTER_CEILING + ) + + # Drain returned an empty merged turn instead of raising FirstChunkTimeoutError. 
+ assert isinstance(merged, AudioChunk) + assert merged.data == b"" + + # ----------------------------------------------------------------- WebSocket (generic) @@ -182,8 +221,8 @@ def decode_response(self, message): @pytest.mark.asyncio @pytest.mark.parametrize("close_cls", _CLOSE_CLASSES) -async def test_websocket_socket_close_terminates_drain(close_cls): - """Generic WebSocket: a server close (end of stream) returns empty. +async def test_websocket_socket_close_returns_empty_chunk(close_cls): + """Unit: generic WebSocket server close (end of stream) returns empty. Runs against both clean (``ConnectionClosedOK``) and abnormal (``ConnectionClosedError``) closes. Pre-fix, the ``while True`` loop returned