Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions javascript/src/voice/adapters/__tests__/elevenlabs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -631,13 +631,34 @@ describe("ElevenLabsAgentAdapter wire-protocol (onMessage branches)", () => {
const { adapter, socket } = await makeConnected();
emit(socket, { type: "interruption" });
emit(socket, { type: "vad_score", vad_event: { score: 0.5 } });
emit(socket, { type: "client_tool_call", payload: {} });
// No throw, no mutation visible to callers.
emit(socket, { type: "agent_response_metadata", metadata: {} });
// No throw, no mutation visible to callers. (`client_tool_call` is NOT here:
// it is a non-audio terminal, covered by its own test below — issue #648.)
expect(adapter.lastAgentTranscript).toBeNull();
expect(adapter.lastUserTranscript).toBeNull();
await adapter.disconnect();
});

it("client_tool_call (tool-only turn) resolves the receiver with an empty chunk (#648)", async () => {
const { adapter, socket } = await makeConnected();
const recv = adapter.receiveAudio(2);
emit(socket, {
type: "client_tool_call",
client_tool_call: {
tool_name: "lookup_order",
tool_call_id: "call_1",
parameters: { order_id: "42" },
},
});
const chunk = await recv;
// A tool-only turn yields no spoken audio (this adapter has no
// client_tool_result path). The drain must exit cleanly with an empty chunk
// rather than swallowing the event and hanging to the receiveAudio timeout.
expect(chunk).toBeInstanceOf(AudioChunk);
expect(chunk.data.length).toBe(0);
await adapter.disconnect();
});

it("ignores non-JSON frames cleanly", async () => {
const { adapter, socket } = await makeConnected();
socket.emit("message", Buffer.from("not-json", "utf-8"));
Expand Down
30 changes: 30 additions & 0 deletions javascript/src/voice/adapters/elevenlabs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@
* `lastAgentTranscript`
* - `audio` — decoded base64 PCM16 and returned from `receiveAudio`
* - `ping` — replied with `{"type": "pong", "event_id": <id>}`
* - `client_tool_call` — tool-only / non-audio terminal turn: resolves the
* drain with an empty `AudioChunk` (issue #648) instead of hanging to the
* `receiveAudio` timeout (no `client_tool_result` path → no follow-up audio)
* - `interruption` — swallowed
* - Anything else — silently skipped
*
* A socket close mid-receive is likewise terminal: `onSocketClose` resolves
* pending waiters with an empty `AudioChunk` so the drain exits cleanly (#648).
*
* {@link ElevenLabsVoiceAgent} — the typed *local* composable preset (distinct
* responsibility, same vendor): you compose {@link ElevenLabsSTTProvider} + any
* LLM + ElevenLabs TTS yourself, keeping control over prompts, model choice,
Expand Down Expand Up @@ -481,6 +487,30 @@ export class ElevenLabsAgentAdapter extends VoiceAgentAdapter {
return;
}

if (etype === "client_tool_call") {
// Issue #648: EL ConvAI emits `client_tool_call` when the agent invokes a
// CLIENT-side tool. This adapter has no `client_tool_result` path, so the
// agent will never produce spoken audio for this turn — it is a tool-only
// / non-audio terminal turn. Resolve the active `receiveAudio` waiter with
// an empty chunk so the base drain (`drainAgentResponse`) exits cleanly
// instead of hanging to the `receiveAudio` timeout. Mirrors the #646/PR647
// reference fix and the Python parity in `elevenlabs.py`.
//
// If no receive is in flight we DROP the terminal rather than queue it
// (unlike the `audio` branch above, which buffers onto `audioQueue`). Safe
// because: (1) a terminal carries no payload to preserve, so nothing is
// lost; (2) the drain always parks a waiter before the agent acts
// (call -> drain -> receiveAudio awaits), so a mid-turn tool call always
// finds one. Queuing an empty sentinel would be WORSE — it would surface as
// the NEXT turn's first `receiveAudio` result, a spurious empty turn. This
// matches the active-waiters-only semantics of onSocketClose / onSocketError
// (`drainPendingWaiters`). Python differs only because its pull-loop
// `recv_audio` hands the terminal to whichever call asks next.
const waiter = this.waiters.shift();
if (waiter) waiter(new AudioChunk({ data: new Uint8Array(0) }));
return;
}

// `interruption` and any unknown events are swallowed — Python parity.
}
}
Expand Down
54 changes: 49 additions & 5 deletions python/scenario/voice/adapters/elevenlabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@
``last_agent_transcript`` (post-barge-in update)
- ``audio`` — decoded and returned from ``recv_audio``
- ``ping`` — replied to with ``{"type": "pong", "event_id": <id>}``
- ``client_tool_call`` — tool-only / non-audio terminal turn: ends the
drain with an empty ``AudioChunk`` (issue #648) instead of hanging to
the ``response_timeout`` deadline. The adapter has no
``client_tool_result`` path, so the agent cannot follow up with audio.
- ``interruption`` — swallowed
- Other documented events (``vad_score``, ``client_tool_call``,
``agent_response_metadata``, etc.) — silently skipped; the
provisioned test agent doesn't trigger them.
- Other documented events (``vad_score``, ``agent_response_metadata``,
etc.) — silently skipped; the provisioned test agent doesn't trigger them.

A socket close mid-receive is also treated as a terminal: ``recv_audio``
returns an empty ``AudioChunk`` so the drain exits cleanly (issue #648).
"""

from __future__ import annotations
Expand Down Expand Up @@ -297,9 +303,15 @@ async def recv_audio(self, timeout: float) -> AudioChunk:
deadline, so this returns when an ``audio`` event arrives and raises
:class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse
with **no message of any kind**. Pings are replied to inline;
transcript events update instance attributes for observability; all
transcript events update instance attributes for observability; most
other event types are swallowed without error.

Terminal (non-audio) completions return an **empty** ``AudioChunk``
rather than hanging to the deadline (issue #648): a ``client_tool_call``
(tool-only turn — this adapter has no ``client_tool_result`` path) and a
socket close mid-receive both end the drain cleanly, mirroring the
#646/PR647 reference fix and the Gemini Live / Pipecat idiom.

Design decision (issue #493 — intentional, not an oversight): because
a received ping is treated as proof of liveness, a hosted agent that
keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will
Expand All @@ -311,6 +323,8 @@ async def recv_audio(self, timeout: float) -> AudioChunk:
(An absolute caller-side backstop for the wedged-agent case is tracked
as a separate follow-up; it is intentionally not implemented here.)
"""
import websockets # for the ConnectionClosed terminal (issue #648)

if self._ws is None:
raise RuntimeError("ElevenLabsAgentAdapter: not connected")

Expand All @@ -320,7 +334,21 @@ async def recv_audio(self, timeout: float) -> AudioChunk:
if remaining <= 0:
raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out")

raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
try:
raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
except websockets.exceptions.ConnectionClosed:
# Issue #648: the hosted agent finished its turn and the server
# closed the socket WITHOUT a trailing audio frame (a silent /
# tool-only turn). Mirror the #646/PR647 reference pattern (and
# the Gemini Live / Pipecat idiom): return an empty AudioChunk so
# the base ``_drain_agent_response`` loop exits cleanly, instead
# of letting ConnectionClosed propagate — the drain only catches
# asyncio.TimeoutError, so an unhandled close would crash the turn.
logger.debug(
"ElevenLabsAgentAdapter: socket closed during recv; "
"ending turn with empty chunk"
)
return AudioChunk(data=b"")
# A received message (ping included) proves the socket is alive, so
# re-arm the idle deadline. Placed BEFORE json.loads so ANY frame —
# even a non-JSON/malformed one — counts as a liveness signal.
Expand Down Expand Up @@ -414,6 +442,22 @@ async def recv_audio(self, timeout: float) -> AudioChunk:
in_fmt,
)

elif etype == "client_tool_call":
# Issue #648: EL ConvAI emits ``client_tool_call`` when the agent
# invokes a CLIENT-side tool. This adapter is a black-box test
# harness and does NOT send ``client_tool_result`` back, so the
# hosted agent will never produce spoken audio for this turn — it
# is a tool-only / non-audio terminal turn. Mirror the #646/PR647
# reference pattern: return an empty AudioChunk so the drain exits
# cleanly instead of looping to the ``response_timeout`` deadline
# and raising. The tool call is observable on the wire; we surface
# the turn's completion, not its payload.
logger.debug(
"ElevenLabsAgentAdapter: client_tool_call (tool-only turn); "
"ending turn with empty chunk"
)
return AudioChunk(data=b"")

elif etype == "interruption":
pass # documented non-audio event, no action needed

Expand Down
19 changes: 18 additions & 1 deletion python/scenario/voice/adapters/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,30 @@ async def send_audio(self, chunk: AudioChunk) -> None:
await self._ws.send(payload)

async def recv_audio(self, timeout: float) -> AudioChunk:
"""Loop inbound frames until the protocol decodes an audio chunk.
A clean server close (end of stream) with no final audio frame is a
terminal, not an error: ``recv_audio`` returns an empty ``AudioChunk``
so the base ``_drain_agent_response`` loop exits cleanly (issue #648),
mirroring the #646/PR647 reference pattern and the Gemini Live / Pipecat
idiom. ``asyncio.TimeoutError`` is still raised on inter-message silence.
"""
import websockets # for the ConnectionClosed terminal (issue #648)

if self._ws is None:
raise RuntimeError(f"{type(self).__name__}: not connected")
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while True:
remaining = max(0.0, deadline - loop.time())
message = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
try:
message = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
except websockets.exceptions.ConnectionClosed:
# End of stream: the server closed without a trailing audio
# frame. Surface a clean terminal rather than letting
# ConnectionClosed propagate — the drain only catches
# asyncio.TimeoutError, so an unhandled close crashes the turn.
return AudioChunk(data=b"")
chunk = self.protocol.decode_response(message)
if chunk is not None:
return chunk
Loading
Loading