Description
pipecat version
0.0.105
Python version
3.12.11
Operating System
MacOS 15.7.4
Issue description
Under adverse network conditions with high latency on the Internet connection, the following sequence of events results in ElevenLabsTTSService pausing frame processing of everything behind the TTSSpeakFrame. Several things have to happen under the right conditions (observed once, with a fully running pipeline).
Production pipeline
pipeline = Pipeline(
    [
        ws_transport.input(),  # fastapi websocket
        stt,
        user_aggregator,
        llm,  # LLM
        tts,  # TTS
        interruption_processor,  # custom processor that pushes all frames in all directions, just tracks interruptions between BotStartedSpeaking and BotStoppedSpeaking events
        ws_transport.output(),
        assistant_aggregator,
    ]
)
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        audio_in_sample_rate=16000,
        enable_metrics=True,
        enable_usage_metrics=True,
    ),
)
Log from production
{"timestamp": "2026-03-12T23:07:02.207720Z", "level": "debug", "message": "OpenAILLMService#0 Calling function [send_customer_request_or_update:call_nGE6SxeSt3dD0S7IaugB4ILC] with arguments {'customer_request_or_update': 'Customer wants a quote for insuring a motorcycle.'}", "location": "pipecat.services.llm_service:_run_function_call:841"}
{"timestamp": "2026-03-12T23:07:02.208852Z", "level": "debug", "message": "Sending customer request to Supervisor API: 'Customer wants a quote for insuring a motorcycle.'", "location": "my_agent.functions.manager:callback:106"}
{"timestamp": "2026-03-12T23:07:02.211566Z", "level": "debug", "message": "LLMAssistantAggregator#0 FunctionCallsStartedFrame: ['send_customer_request_or_update:call_nGE6SxeSt3dD0S7IaugB4ILC']", "location": "pipecat.processors.aggregators.llm_response_universal:_handle_function_calls_started:1015"}
{"timestamp": "2026-03-12T23:07:02.214738Z", "level": "debug", "message": "LLMAssistantAggregator#0 FunctionCallInProgressFrame: [send_customer_request_or_update:call_nGE6SxeSt3dD0S7IaugB4ILC]", "location": "pipecat.processors.aggregators.llm_response_universal:_handle_function_call_in_progress:1020"}
{"timestamp": "2026-03-12T23:07:02.459324Z", "level": "debug", "message": "getting time buying phrase from llm with prompt=Provide a short confirmation to the customer that you are working on their request and context messages=[{'role': 'assistant', 'content': 'Right now, I can only provide quotes for car'}, {'role': 'user', 'content': 'Okay.'}, {'role': 'user', 'content': \"Go ahead. Let's do, a motorcycle.\"}, {'role': 'assistant', 'tool_calls': [{'id': 'call_nGE6SxeSt3dD0S7IaugB4ILC', 'function': {'name': 'send_customer_request_or_update', 'arguments': '{\"customer_request_or_update\": \"Customer wants a quote for insuring a motorcycle.\"}'}, 'type': 'function'}]}, {'role': 'tool', 'content': 'IN_PROGRESS', 'tool_call_id': 'call_nGE6SxeSt3dD0S7IaugB4ILC'}]", "location": "my_agent.functions.manager:callback:127"}
{"timestamp": "2026-03-12T23:07:03.230536Z", "level": "debug", "message": "time buying phrase from llm: Thanks for letting me know you're interested in a motorcycle quote. I'm gathering the necessary information and will have some options for you shortly.", "location": "my_agent.functions.manager:callback:130"}
{"timestamp": "2026-03-12T23:07:03.232558Z", "level": "debug", "message": "ElevenLabsTTSService#0: Generating TTS [Thanks for letting me know you're interested in a motorcycle quote. I'm gathering the necessary information and will have some options for you shortly.]", "location": "pipecat.services.elevenlabs.tts:run_tts:862"}
{"timestamp": "2026-03-12T23:07:03.233130Z", "level": "debug", "message": "ElevenLabsTTSService#0 usage characters: 151", "location": "pipecat.processors.metrics.frame_processor_metrics:start_tts_usage_metrics:214"}
{"timestamp": "2026-03-12T23:07:03.233351Z", "level": "debug", "message": "ElevenLabsTTSService#0 processing time: 0.001s", "location": "pipecat.processors.metrics.frame_processor_metrics:stop_processing_metrics:175"}
{"timestamp": "2026-03-12T23:07:03.453750Z", "level": "debug", "message": "Ignoring message from unavailable context: None", "location": "pipecat.services.elevenlabs.tts:_receive_messages:781"}
{"timestamp": "2026-03-12T23:07:03.812552Z", "level": "info", "message": "received from Supervisor API: type=<EventType.PROCESSING_START: 'processingStart'> actionId=None", "location": "my_agent.supervisor_api.client:_create_event_generator:122"}
{"timestamp": "2026-03-12T23:07:04.378579Z", "level": "warning", "message": "ElevenLabsTTSService#0 connection closed, but with an error: received 1008 (policy violation) Maximum simultaneous contexts per WebSocket connection exceeded (5). Please close an existing context before opening a n; then sent 1008 (policy violation) Maximum simultaneous contexts per WebSocket connection exceeded (5). Please close an existing context before opening a n", "location": "pipecat.services.websocket_service:_maybe_try_reconnect:149"}
{"timestamp": "2026-03-12T23:07:04.379099Z", "level": "warning", "message": "ElevenLabsTTSService#0 reconnecting, attempt 1", "location": "pipecat.services.websocket_service:_try_reconnect:86"}
{"timestamp": "2026-03-12T23:07:04.379403Z", "level": "warning", "message": "ElevenLabsTTSService#0 reconnecting (attempt: 1)", "location": "pipecat.services.websocket_service:_reconnect_websocket:66"}
{"timestamp": "2026-03-12T23:07:04.379654Z", "level": "debug", "message": "ElevenLabsTTSService#0 TTFB: 1.147s", "location": "pipecat.processors.metrics.frame_processor_metrics:stop_ttfb_metrics:143"}
{"timestamp": "2026-03-12T23:07:04.379983Z", "level": "debug", "message": "Disconnecting from ElevenLabs", "location": "pipecat.services.elevenlabs.tts:_disconnect_websocket:706"}
{"timestamp": "2026-03-12T23:07:04.386515Z", "level": "error", "message": "ElevenLabsTTSService#0 exception (******/.venv/lib/python3.12/site-packages/websockets/asyncio/connection.py:957): Unknown error occurred: received 1008 (policy violation) Maximum simultaneous contexts per WebSocket connection exceeded (5). Please close an existing context before opening a n; then sent 1008 (policy violation) Maximum simultaneous contexts per WebSocket connection exceeded (5). Please close an existing context before opening a n", "location": "pipecat.processors.frame_processor:push_error_frame:744"}
{"timestamp": "2026-03-12T23:07:04.386886Z", "level": "debug", "message": "Connecting to ElevenLabs", "location": "pipecat.services.elevenlabs.tts:_connect_websocket:664"}
{"timestamp": "2026-03-12T23:07:04.389601Z", "level": "warning", "message": "PipelineTask#0: Something went wrong: ErrorFrame#0(error: Unknown error occurred: received 1008 (policy violation) Maximum simultaneous contexts per WebSocket connection exceeded (5). Please close an existing context before opening a n; then sent 1008 (policy violation) Maximum simultaneous contexts per WebSocket connection exceeded (5). Please close an existing context before opening a n, fatal: False)", "location": "pipecat.pipeline.task:_source_push_frame:903"}
{"timestamp": "2026-03-12T23:07:04.625320Z", "level": "info", "message": "ElevenLabsTTSService#0 reconnected successfully on attempt 1", "location": "pipecat.services.websocket_service:_try_reconnect:88"}
{"timestamp": "2026-03-12T23:07:05.235542Z", "level": "debug", "message": "ElevenLabsTTSService#0 cleaning up TTS context b1272756-8412-4b1d-a114-febfa217f6a3", "location": "pipecat.services.tts_service:push_frame:773"}
{"timestamp": "2026-03-12T23:07:07.312832Z", "level": "info", "message": "received from Supervisor API: type=<EventType.REPLY: 'reply'> actionId='cfeb8c27-d395-49f0-8cda-a09da791a38a' reply=Reply(messageId='01KKJ4VGP6QZCXWE6AZBW6ACHA', text='Ask the customer for the approximate value of their motorcycle so we can provide an insurance quote. Call me back with their answer so I can proceed.')", "location": "my_agent.supervisor_api.client:_create_event_generator:122"}
{"timestamp": "2026-03-12T23:07:07.313088Z", "level": "debug", "message": "Supervisor API reply received: 'Ask the customer for the approximate value of their motorcycle so we can provide an insurance quote. Call me back with their answer so I can proceed.'", "location": "my_agent.supervisor_api.session:_consume_events:73"}
{"timestamp": "2026-03-12T23:07:07.355189Z", "level": "info", "message": "received from Supervisor API: type=<EventType.PROCESSING_END: 'processingEnd'> actionId=None", "location": "my_agent.supervisor_api.client:_create_event_generator:122"}
{"timestamp": "2026-03-12T23:07:23.812971Z", "level": "debug", "message": "LLMUserAggregator#0: User started speaking (strategy: VADUserTurnStartStrategy#0)", "location": "pipecat.processors.aggregators.llm_response_universal:_on_user_turn_started:721"}
{"timestamp": "2026-03-12T23:07:23.813268Z", "level": "debug", "message": "LLMUserAggregator#0: broadcasting interruption", "location": "pipecat.processors.frame_processor:broadcast_interruption:765"}
Observations
23:07:03.232 TTSSpeakFrame processed → TTS PAUSES __process_frame_task_handler. See tts_service.py:717-739: await self._maybe_pause_frame_processing() pauses frame processing
23:07:03.453 Error JSON dropped (no contextId) - see the exact error in the reproduction code below
23:07:04.378 ConnectionClosedError → reconnect starts
23:07:04.625 Reconnected
23:07:05.235 _stop_frame_handler fires TTSStoppedFrame (2 s timeout)
→ reaches BaseOutputTransport
→ _tts_audio_received=False → NO BotStoppedSpeakingFrame - see tts_service.py:757-759
→ TTS STAYS PAUSED
23:07:07.415 result_callback pushes FunctionCallResultFrame
→ enters TTS __process_queue
→ __process_frame_task_handler is BLOCKED → frame trapped
→ Caller does NOT hear the LLM response based on the function result; the assistant just seems to be silent
23:07:23.812 Customer barge-in → InterruptionFrame → TTS unblocked (too late)
Contributing factors are:
- high latency on the websocket connection to ElevenLabs
- an error on the websocket - likely a result of the high latency: audio contexts don't get closed in time from the server's point of view
- unlucky timing, resulting in the error arriving before any audio came back from ElevenLabs
With the conditions above, ElevenLabsTTSService (and possibly other websocket TTS services that pause frame processing while waiting for audio rendering) blocks all frames in the pipeline, because it is waiting for a BotStoppedSpeakingFrame that never arrives (and the transport never sends it, because no audio has been received).
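Stripped of pipecat specifics, the deadlock reduces to a small self-contained asyncio sketch (all names here are hypothetical stand-ins for illustration, not pipecat's actual internals): a worker pauses on an event that only a bot-stopped-speaking signal would set, so when that signal is skipped, any queued frame is trapped.

```python
import asyncio

async def demo() -> list[str]:
    # Hypothetical stand-ins for the TTS __process_queue / __process_event pair.
    queue: asyncio.Queue = asyncio.Queue()
    resume = asyncio.Event()
    resume.set()  # frame processing initially enabled
    processed: list[str] = []

    async def process_frames() -> None:
        while True:
            frame = await queue.get()
            await resume.wait()  # blocks while processing is paused
            processed.append(frame)
            if frame == "TTSSpeakFrame#1":
                # Synthesis started: pause until a BotStoppedSpeakingFrame
                # would call resume.set() - which never happens in this
                # scenario, because no audio was ever delivered.
                resume.clear()

    worker = asyncio.create_task(process_frames())
    await queue.put("TTSSpeakFrame#1")
    await queue.put("FunctionCallResultFrame")  # trapped behind the pause
    await asyncio.sleep(0.1)

    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return processed

# Only the first frame is processed; the second is never processed.
print(asyncio.run(demo()))
```

This is only a model of the mechanism as I understand it from reading tts_service.py; the real code paths are the ones cited in the observations above.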
Reproduction steps
The following test should reproduce the issue. Note that the error JSON represents the real multi-context WebSocket ElevenLabs API response when more than 5 contexts are deliberately open. I expect other error messages to follow the same format, though I couldn't find definitive documentation on the websocket error format.
"""
Regression test: ElevenLabs error causes permanent TTS pipeline deadlock.
Production failure
------------------
23:07:03 — TTSSpeakFrame (time-buying phrase) processed by TTS.
ElevenLabsTTSService sets pause_frame_processing=True,
so __process_frame_task_handler blocks after synthesis.
23:07:03 — ElevenLabs error JSON silently dropped (_receive_messages).
No audio is ever delivered.
23:07:05 — _stop_frame_handler fires TTSStoppedFrame (2 s timeout).
It reaches BaseOutputTransport, but _tts_audio_received is False
(no audio arrived) → BotStoppedSpeakingFrame is NOT emitted.
23:07:07 — FunctionCallResultFrame pushed by result_callback.
It enters the TTS's __process_queue — but the process task
is blocked on __process_event.wait() → frame is trapped.
23:07:23 — Customer barge-in (InterruptionFrame) finally resets the
TTS process task and unblocks the queue.
Root cause chain
----------------
1. _receive_messages() silently drops the ElevenLabs error JSON
(no "contextId" → "Ignoring message from unavailable context: None").
2. No audio reaches BaseOutputTransport → _tts_audio_received stays False
→ BotStoppedSpeakingFrame is never emitted.
3. Without BotStoppedSpeakingFrame, the TTS never calls
_maybe_resume_frame_processing() (tts_service.py:755).
4. The TTS's __process_frame_task_handler stays blocked forever.
5. All subsequent non-system frames (FunctionCallResultFrame, TTSSpeakFrame)
are trapped in the paused __process_queue.
See also: test_elevenlabs_error_json_dropped.py — isolates bug #1 (the trigger).
Pipecat version: 0.0.105
"""
import asyncio
import json
import pytest
from unittest.mock import patch
from websockets.exceptions import ConnectionClosedError
from websockets.frames import Close
from websockets.protocol import State
from pipecat.frames.frames import (
    Frame,
    StartFrame,
    TTSStartedFrame,
    TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService, ElevenLabsTTSSettings
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import TransportParams
# ---------------------------------------------------------------------------
# Exact error payload sent by ElevenLabs when the simultaneous-context limit
# is reached (verified against the live API in test_elevenlabs_error_format.py).
# ---------------------------------------------------------------------------
_ELEVENLABS_MAX_CONTEXTS_ERROR = json.dumps(
    {
        "message": (
            "Maximum simultaneous contexts per WebSocket connection exceeded (5). "
            "Please close an existing context before opening a new one."
        ),
        "error": "max_active_conversations",
        "code": 1008,
    }
)
_ELEVENLABS_CLOSE = Close(code=1008, reason="Maximum simultaneous contexts per WebSocket connection exceeded (5).")
class _MockWebSocket:
    """
    Simulates the ElevenLabs WebSocket server.

    After the second ``send()`` (context-init + TTS text), yields the error
    JSON then closes the connection with code 1008. Reconnected instances
    block forever.
    """

    def __init__(self) -> None:
        self.state = State.OPEN
        self._queue: asyncio.Queue = asyncio.Queue()
        self._send_count = 0

    async def send(self, data: str) -> None:
        self._send_count += 1
        if self._send_count == 2:
            await self._queue.put(_ELEVENLABS_MAX_CONTEXTS_ERROR)
            await self._queue.put(None)

    async def ping(self) -> None:
        pass

    async def close(self) -> None:
        self.state = State.CLOSED

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is None:
            self.state = State.CLOSED
            raise ConnectionClosedError(rcvd=_ELEVENLABS_CLOSE, sent=None)
        return item
class _TestOutputTransport(BaseOutputTransport):
    """Minimal BaseOutputTransport that self-initialises on StartFrame."""

    async def start(self, frame: StartFrame):
        await super().start(frame)
        await self.set_transport_ready(frame)
class _FrameCapture(FrameProcessor):
    """Records every downstream frame that passes through it."""

    def __init__(self) -> None:
        super().__init__()
        self.frames: list[Frame] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        await super().process_frame(frame, direction)
        if direction == FrameDirection.DOWNSTREAM:
            self.frames.append(frame)
        await self.push_frame(frame, direction)
@pytest.mark.asyncio
async def test_tts_paused_frame_processing_traps_subsequent_frames():
    """
    BUG: TTS frame processing permanently paused — subsequent frames trapped.

    This is the full production deadlock chain:

    1. TTSSpeakFrame arrives at ElevenLabsTTSService.
       - TTS synthesises text, then calls ``_maybe_pause_frame_processing()``.
       - Because ``pause_frame_processing=True`` (ElevenLabs default) and
         ``_processing_text=True`` (set by ``_synthesize_text``),
         ``pause_processing_frames()`` is called.
       - The TTS's ``__process_frame_task_handler`` blocks on
         ``__process_event.wait()``.
    2. The mock WebSocket returns the error JSON (no audio). The error is
       silently dropped. The connection closes, pipecat reconnects.
    3. After 2 s the ``_stop_frame_handler`` fires a ``TTSStoppedFrame``.
       It reaches BaseOutputTransport, but ``_tts_audio_received`` is False
       (no audio ever arrived in 221 ms), so ``_bot_stopped_speaking()`` is
       NOT called and ``BotStoppedSpeakingFrame`` is never emitted.
    4. Without ``BotStoppedSpeakingFrame``, the TTS's ``process_frame``
       (tts_service.py:755) never calls ``_maybe_resume_frame_processing()``.
       The ``__process_frame_task_handler`` stays blocked forever.
    5. A second ``TTSSpeakFrame`` is queued (simulating the LLM function
       response arriving via ``result_callback``). It enters the TTS's
       ``__input_queue`` → ``__process_queue`` — but the process task is
       blocked, so the frame is **never processed**.

    This test is expected to FAIL until the bug is fixed.
    """
    capture = _FrameCapture()
    tts = ElevenLabsTTSService(
        api_key="fake-key-no-network-call-made",
        settings=ElevenLabsTTSSettings(
            voice="test-voice",
            model="eleven_turbo_v2",
        ),
    )
    output_transport = _TestOutputTransport(
        params=TransportParams(
            audio_out_enabled=True,
            audio_out_sample_rate=24000,
        ),
    )

    # Each call creates a fresh mock — the first connection triggers the
    # error, the reconnected connection blocks forever (no more errors).
    async def _fake_websocket_connect(url, **kwargs):
        return _MockWebSocket()

    with patch("pipecat.services.elevenlabs.tts.websocket_connect", _fake_websocket_connect):
        pipeline = Pipeline([tts, output_transport, capture])
        task = PipelineTask(pipeline, params=PipelineParams())
        runner = PipelineRunner(handle_sigint=False)

        async def _driver() -> None:
            # Phrase 1: the time-buying phrase.
            # This triggers TTS processing → pause_frame_processing → error.
            await task.queue_frame(
                TTSSpeakFrame(text="Thanks for letting me know, let me look into that.")
            )
            # Wait for:
            # - the error to be processed (~0.5 s)
            # - _stop_frame_handler to fire TTSStoppedFrame (2 s timeout)
            # - TTSStoppedFrame to reach BaseOutputTransport (instant)
            # - BaseOutputTransport to NOT emit BotStoppedSpeakingFrame
            #   (because _tts_audio_received is False)
            # Total: ~3 s is enough.
            await asyncio.sleep(3.0)
            # Phrase 2: the LLM function response (simulating result_callback).
            # In production this was a FunctionCallResultFrame, but any
            # non-system frame demonstrates the same deadlock.
            await task.queue_frame(
                TTSSpeakFrame(text="Could you tell me the approximate value of your motorcycle?")
            )
            # Give phrase 2 time to be processed (if TTS were unblocked,
            # it would produce a TTSStartedFrame almost instantly).
            await asyncio.sleep(2.0)
            await task.cancel()

        asyncio.create_task(_driver())
        await runner.run(task)

    tts_started = [f for f in capture.frames if isinstance(f, TTSStartedFrame)]

    # Phrase 1 produced a TTSStartedFrame (the context was opened before
    # the error). If the pipeline were healthy, phrase 2 would produce
    # a second TTSStartedFrame.
    assert len(tts_started) >= 1, (
        "Setup problem: no TTSStartedFrame at all."
    )

    # BUG: this assertion currently FAILS.
    #
    # Phrase 2's TTSSpeakFrame is stuck in the TTS's paused __process_queue.
    # The TTS called pause_processing_frames() after phrase 1, and the only
    # resume path (BotStoppedSpeakingFrame) never fires because no audio
    # was delivered before the error.
    #
    # In production this trapped the FunctionCallResultFrame for 16 seconds
    # until a customer barge-in sent an InterruptionFrame that reset the
    # TTS's process task.
    assert len(tts_started) >= 2, (
        f"Bug: only {len(tts_started)} TTSStartedFrame(s), expected 2. "
        "Phrase 2 is trapped in the TTS's paused __process_queue. "
        "ElevenLabsTTSService.pause_frame_processing=True paused the queue "
        "after phrase 1, and BotStoppedSpeakingFrame (the only resume signal) "
        "was never emitted because no audio reached BaseOutputTransport "
        "before the ElevenLabs error."
    )
Expected behavior
I expect the TTS service to NOT block the pipeline in case of underlying transport issues. Maybe reconnection should call _maybe_resume_frame_processing()?
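To illustrate what resume-on-reconnect could look like, here is a toy asyncio model of the pause/resume mechanism (purely a sketch under my assumptions; `on_reconnected` is a hypothetical hook, not pipecat's actual reconnect API): if the reconnect path also released the pause, the worker would be unblocked even though BotStoppedSpeakingFrame never arrives.

```python
import asyncio

async def demo() -> list[str]:
    # Hypothetical stand-ins for the TTS pause/resume mechanism.
    queue: asyncio.Queue = asyncio.Queue()
    resume = asyncio.Event()
    resume.set()
    processed: list[str] = []

    async def process_frames() -> None:
        while True:
            frame = await queue.get()
            await resume.wait()
            processed.append(frame)
            if frame == "TTSSpeakFrame#1":
                resume.clear()  # paused until some resume signal arrives

    async def on_reconnected() -> None:
        # Hypothetical hook: a reconnect means the old audio context is gone
        # and no BotStoppedSpeakingFrame will ever arrive for it, so resume
        # frame processing instead of waiting forever.
        resume.set()

    worker = asyncio.create_task(process_frames())
    await queue.put("TTSSpeakFrame#1")
    await asyncio.sleep(0.05)      # synthesis fails, websocket reconnects...
    await on_reconnected()         # ...and the reconnect resumes processing
    await queue.put("FunctionCallResultFrame")
    await asyncio.sleep(0.05)

    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return processed

# With resume-on-reconnect, both frames are processed.
print(asyncio.run(demo()))
```

An alternative might be to treat the dropped error JSON (bug #1 above) as a synthesis failure and resume there; either way, some error path needs to release the pause.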
Actual behavior
The pipeline remains blocked until a user interruption. But at that point the user experience is not great, and the LLMAssistantAggregator doesn't have the function result to give a proper answer, so yet another user request is needed to actually get the answer.