Skip to content

Commit 12dc6c0

Browse files
authored
Merge pull request #3707 from pipecat-ai/aleix/fix-openai-stream-close-compat
fix(openai): use compatible stream closing for non-OpenAI providers
2 parents 17ab9c4 + 93f4402 commit 12dc6c0

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

changelog/3707.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed stream closing compatibility for OpenAI-compatible providers (e.g. OpenPipe) that return async generators instead of `AsyncStream`.

src/pipecat/services/openai/base_llm.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import asyncio
1010
import base64
1111
import json
12+
from contextlib import asynccontextmanager
1213
from typing import Any, Dict, List, Mapping, Optional
1314

1415
import httpx
@@ -374,9 +375,19 @@ async def _process_context(self, context: OpenAILLMContext | LLMContext):
374375
else self._stream_chat_completions_universal_context(context)
375376
)
376377

377-
# Use context manager to ensure stream is closed on cancellation/exception.
378-
# Without this, CancelledError during iteration leaves the underlying socket open.
379-
async with chunk_stream:
378+
# Ensure stream is closed on cancellation/exception to prevent socket
379+
# leaks. OpenAI's AsyncStream uses close(), async generators use aclose().
380+
@asynccontextmanager
381+
async def _closing(stream):
382+
try:
383+
yield stream
384+
finally:
385+
if hasattr(stream, "aclose"):
386+
await stream.aclose()
387+
elif hasattr(stream, "close"):
388+
await stream.close()
389+
390+
async with _closing(chunk_stream):
380391
async for chunk in chunk_stream:
381392
if chunk.usage:
382393
cached_tokens = (

tests/test_openai_llm_timeout.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,9 @@ class MockAsyncStream:
149149
def __init__(self):
150150
self.iteration_count = 0
151151

152-
async def __aenter__(self):
153-
return self
154-
155-
async def __aexit__(self, exc_type, exc_val, exc_tb):
152+
async def close(self):
156153
nonlocal stream_closed
157154
stream_closed = True
158-
return False
159155

160156
def __aiter__(self):
161157
return self

0 commit comments

Comments
 (0)