Skip to content

Commit 17ab9c4

Browse files
authored
Merge pull request #3675 from pipecat-ai/mb/elevenlabs-realtime-send-silence
Add silence-based keepalive to WebsocketSTTService
2 parents 1128c5b + 2f5e61a commit 17ab9c4

File tree

6 files changed

+174
-70
lines changed

6 files changed

+174
-70
lines changed

changelog/3675.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed WebSocket STT services (ElevenLabs, Cartesia, Gladia, Soniox) disconnecting due to idle timeout when no audio is being sent (e.g. when inactive behind a `ServiceSwitcher`). `WebsocketSTTService` now provides opt-in silence-based keepalive via `keepalive_timeout` and `keepalive_interval` parameters.

src/pipecat/services/cartesia/stt.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ class CartesiaSTTService(WebsocketSTTService):
129129
Provides real-time speech transcription through WebSocket connection
130130
to Cartesia's Live transcription service. Supports both interim and
131131
final transcriptions with configurable models and languages.
132+
133+
Cartesia disconnects WebSocket connections after 3 minutes of inactivity.
134+
The timeout resets with each message (audio data or text command) sent to
135+
the server. Silence-based keepalive is enabled by default to prevent this.
136+
See: https://docs.cartesia.ai/api-reference/stt/stt
132137
"""
133138

134139
def __init__(
@@ -153,7 +158,13 @@ def __init__(
153158
**kwargs: Additional arguments passed to parent STTService.
154159
"""
155160
sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
156-
super().__init__(sample_rate=sample_rate, ttfs_p99_latency=ttfs_p99_latency, **kwargs)
161+
super().__init__(
162+
sample_rate=sample_rate,
163+
ttfs_p99_latency=ttfs_p99_latency,
164+
keepalive_timeout=120,
165+
keepalive_interval=30,
166+
**kwargs,
167+
)
157168

158169
default_options = CartesiaLiveOptions(
159170
model="ink-whisper",
@@ -248,10 +259,10 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
248259
yield None
249260

250261
async def _connect(self):
251-
await super()._connect()
252-
253262
await self._connect_websocket()
254263

264+
await super()._connect()
265+
255266
if self._websocket and not self._receive_task:
256267
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
257268

@@ -295,7 +306,7 @@ def _get_websocket(self):
295306
return self._websocket
296307
raise Exception("Websocket not connected")
297308

298-
async def _process_messages(self):
309+
async def _receive_messages(self):
299310
"""Process incoming WebSocket messages."""
300311
async for message in self._get_websocket():
301312
try:
@@ -306,14 +317,6 @@ async def _process_messages(self):
306317
except Exception as e:
307318
logger.error(f"Error processing message: {e}")
308319

309-
async def _receive_messages(self):
310-
while True:
311-
await self._process_messages()
312-
# Cartesia times out after 5 minutes of innactivity (no keepalive
313-
# mechanism is available). So, we try to reconnect.
314-
logger.debug(f"{self} Cartesia connection was disconnected (timeout?), reconnecting")
315-
await self._connect_websocket()
316-
317320
async def _process_response(self, data):
318321
if "type" in data:
319322
if data["type"] == "transcript":

src/pipecat/services/elevenlabs/stt.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,8 @@ def __init__(
459459
super().__init__(
460460
sample_rate=sample_rate,
461461
ttfs_p99_latency=ttfs_p99_latency,
462+
keepalive_timeout=10,
463+
keepalive_interval=5,
462464
**kwargs,
463465
)
464466

@@ -611,10 +613,10 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
611613

612614
async def _connect(self):
613615
"""Establish WebSocket connection to ElevenLabs Realtime STT."""
614-
await super()._connect()
615-
616616
await self._connect_websocket()
617617

618+
await super()._connect()
619+
618620
if self._websocket and not self._receive_task:
619621
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
620622

@@ -628,6 +630,21 @@ async def _disconnect(self):
628630

629631
await self._disconnect_websocket()
630632

633+
async def _send_keepalive(self, silence: bytes):
634+
"""Send silent audio wrapped in ElevenLabs' JSON protocol.
635+
636+
Args:
637+
silence: Silent 16-bit mono PCM audio bytes.
638+
"""
639+
audio_base64 = base64.b64encode(silence).decode("utf-8")
640+
message = {
641+
"message_type": "input_audio_chunk",
642+
"audio_base_64": audio_base64,
643+
"commit": False,
644+
"sample_rate": self.sample_rate,
645+
}
646+
await self._websocket.send(json.dumps(message))
647+
631648
async def _connect_websocket(self):
632649
"""Connect to ElevenLabs Realtime STT WebSocket endpoint."""
633650
try:

src/pipecat/services/gladia/stt.py

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,13 @@ def __init__(
231231
Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
232232
**kwargs: Additional arguments passed to the STTService parent class.
233233
"""
234-
super().__init__(sample_rate=sample_rate, ttfs_p99_latency=ttfs_p99_latency, **kwargs)
234+
super().__init__(
235+
sample_rate=sample_rate,
236+
ttfs_p99_latency=ttfs_p99_latency,
237+
keepalive_timeout=20,
238+
keepalive_interval=5,
239+
**kwargs,
240+
)
235241

236242
params = params or GladiaInputParams()
237243

@@ -261,7 +267,6 @@ def __init__(
261267
self.set_model_name(model)
262268
self._params = params
263269
self._receive_task = None
264-
self._keepalive_task = None
265270
self._settings = {}
266271

267272
# Session management
@@ -416,8 +421,6 @@ async def _connect(self):
416421
417422
Initializes the session if needed and establishes websocket connection.
418423
"""
419-
await super()._connect()
420-
421424
# Initialize session if needed
422425
if not self._session_url:
423426
settings = self._prepare_settings()
@@ -428,12 +431,11 @@ async def _connect(self):
428431

429432
await self._connect_websocket()
430433

434+
await super()._connect()
435+
431436
if self._websocket and not self._receive_task:
432437
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
433438

434-
if self._websocket and not self._keepalive_task:
435-
self._keepalive_task = self.create_task(self._keepalive_task_handler())
436-
437439
async def _disconnect(self):
438440
"""Disconnect from the Gladia service.
439441
@@ -443,10 +445,6 @@ async def _disconnect(self):
443445

444446
self._connection_active = False
445447

446-
if self._keepalive_task:
447-
await self.cancel_task(self._keepalive_task)
448-
self._keepalive_task = None
449-
450448
if self._receive_task:
451449
await self.cancel_task(self._receive_task)
452450
self._receive_task = None
@@ -644,21 +642,10 @@ async def _receive_messages(self):
644642
except json.JSONDecodeError:
645643
logger.warning(f"{self} Received non-JSON message: {message}")
646644

647-
async def _keepalive_task_handler(self):
648-
"""Send periodic empty audio chunks to keep the connection alive."""
649-
try:
650-
KEEPALIVE_SLEEP = 20
651-
while self._connection_active:
652-
# Send keepalive (Gladia times out after 30 seconds)
653-
await asyncio.sleep(KEEPALIVE_SLEEP)
654-
if self._websocket and self._websocket.state is State.OPEN:
655-
# Send an empty audio chunk as keepalive
656-
empty_audio = b""
657-
await self._send_audio(empty_audio)
658-
else:
659-
logger.debug(f"{self} Websocket closed, stopping keepalive")
660-
break
661-
except websockets.exceptions.ConnectionClosed:
662-
logger.debug(f"{self} Connection closed during keepalive")
663-
except Exception as e:
664-
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
645+
async def _send_keepalive(self, silence: bytes):
646+
"""Send an empty audio chunk to keep the Gladia connection alive.
647+
648+
Args:
649+
silence: Silent PCM audio bytes (unused, Gladia accepts empty chunks).
650+
"""
651+
await self._send_audio(b"")

src/pipecat/services/soniox/stt.py

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
"""Soniox speech-to-text service implementation."""
88

9-
import asyncio
109
import json
1110
import time
1211
from typing import AsyncGenerator, List, Optional
@@ -170,7 +169,13 @@ def __init__(
170169
Override for your deployment. See https://github.com/pipecat-ai/stt-benchmark
171170
**kwargs: Additional arguments passed to the STTService.
172171
"""
173-
super().__init__(sample_rate=sample_rate, ttfs_p99_latency=ttfs_p99_latency, **kwargs)
172+
super().__init__(
173+
sample_rate=sample_rate,
174+
ttfs_p99_latency=ttfs_p99_latency,
175+
keepalive_timeout=1,
176+
keepalive_interval=5,
177+
**kwargs,
178+
)
174179
params = params or SonioxInputParams()
175180

176181
self._api_key = api_key
@@ -183,7 +188,6 @@ def __init__(
183188
self._last_tokens_received: Optional[float] = None
184189

185190
self._receive_task = None
186-
self._keepalive_task = None
187191

188192
async def start(self, frame: StartFrame):
189193
"""Start the Soniox STT websocket connection.
@@ -269,27 +273,20 @@ async def _connect(self):
269273
270274
Establishes websocket connection and starts receive and keepalive tasks.
271275
"""
272-
await super()._connect()
273-
274276
await self._connect_websocket()
275277

278+
await super()._connect()
279+
276280
if self._websocket and not self._receive_task:
277281
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
278282

279-
if self._websocket and not self._keepalive_task:
280-
self._keepalive_task = self.create_task(self._keepalive_task_handler())
281-
282283
async def _disconnect(self):
283284
"""Disconnect from the Soniox service.
284285
285286
Cleans up tasks and closes websocket connection.
286287
"""
287288
await super()._disconnect()
288289

289-
if self._keepalive_task:
290-
await self.cancel_task(self._keepalive_task)
291-
self._keepalive_task = None
292-
293290
if self._receive_task:
294291
await self.cancel_task(self._receive_task)
295292
self._receive_task = None
@@ -462,17 +459,10 @@ async def send_endpoint_transcript():
462459
except Exception as e:
463460
logger.warning(f"Error processing message: {e}")
464461

465-
async def _keepalive_task_handler(self):
466-
"""Connection has to be open all the time."""
467-
try:
468-
while True:
469-
logger.trace("Sending keepalive message")
470-
if self._websocket and self._websocket.state is State.OPEN:
471-
await self._websocket.send(KEEPALIVE_MESSAGE)
472-
else:
473-
logger.debug("WebSocket connection closed.")
474-
break
475-
await asyncio.sleep(5)
462+
async def _send_keepalive(self, silence: bytes):
463+
"""Send a Soniox protocol-level keepalive message.
476464
477-
except Exception as e:
478-
logger.debug(f"Keepalive task stopped: {e}")
465+
Args:
466+
silence: Silent PCM audio bytes (unused, Soniox uses a protocol message).
467+
"""
468+
await self._websocket.send(KEEPALIVE_MESSAGE)

0 commit comments

Comments
 (0)