Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3715](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3715))
- Add wrappers for OpenAI Responses API streams and response stream managers
([#4280](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4280))
- Add async wrappers for OpenAI Responses API streams and response stream managers
([#4325](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4325))

## Version 2.3b0 (2025-12-24)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import logging
from contextlib import ExitStack, contextmanager
from contextlib import AsyncExitStack, ExitStack, contextmanager
from types import TracebackType
from typing import TYPE_CHECKING, Callable, Generator, Generic, TypeVar

Expand Down Expand Up @@ -53,6 +53,8 @@
ResponseStreamEvent,
)
from openai.lib.streaming.responses._responses import (
AsyncResponseStream,
AsyncResponseStreamManager,
ResponseStream,
ResponseStreamManager,
) # pylint: disable=no-name-in-module
Expand All @@ -76,6 +78,16 @@ def _set_response_attributes(
_set_invocation_response_attributes(invocation, result, capture_content)


def _get_stream_response(stream):
try:
return stream._response
except AttributeError:
try:
return stream.response
except AttributeError:
return None


class _ResponseProxy(Generic[ResponseT]):
def __init__(self, response: ResponseT, finalize: Callable[[], None]):
self._response = response
Expand All @@ -91,6 +103,21 @@ def __getattr__(self, name: str):
return getattr(self._response, name)


class _AsyncResponseProxy(Generic[ResponseT]):
def __init__(self, response: ResponseT, finalize: Callable[[], None]):
self._response = response
self._finalize = finalize

async def aclose(self) -> None:
try:
await self._response.aclose()
finally:
self._finalize()

def __getattr__(self, name: str):
return getattr(self._response, name)


class ResponseStreamWrapper(Generic[TextFormatT]):
"""Wrapper for OpenAI Responses API stream objects.

Expand Down Expand Up @@ -172,7 +199,7 @@ def __getattr__(self, name: str):

@property
def response(self):
response = self.stream.response
response = _get_stream_response(self.stream)
if response is None:
return None
return _ResponseProxy(response, lambda: self._stop(None))
Expand Down Expand Up @@ -303,3 +330,135 @@ def parse(self) -> "ResponseStreamManagerWrapper[TextFormatT]":
# cleanup once wrapt 2 typing support is available (wrapt PR #3903).
def __getattr__(self, name: str):
return getattr(self._manager, name)


class AsyncResponseStreamWrapper(ResponseStreamWrapper[TextFormatT]):
"""Wrapper for async OpenAI Responses API stream objects."""

stream: "AsyncResponseStream[TextFormatT]"

async def __aenter__(self) -> "AsyncResponseStreamWrapper[TextFormatT]":
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
try:
if exc_type is not None:
self._fail(
str(exc_val), type(exc_val) if exc_val else Exception
)
finally:
await self.close()
return False

async def close(self) -> None:
try:
await self.stream.close()
finally:
self._stop(None)

def __aiter__(self) -> "AsyncResponseStreamWrapper[TextFormatT]":
return self

async def __anext__(self) -> "ResponseStreamEvent[TextFormatT]":
try:
event = await self.stream.__anext__()
except StopAsyncIteration:
self._stop(None)
raise
except Exception as error:
self._fail(str(error), type(error))
raise
with self._safe_instrumentation("event processing"):
self.process_event(event)
return event

async def get_final_response(self) -> "ParsedResponse[TextFormatT]":
await self.until_done()
return await self.stream.get_final_response()

async def until_done(self) -> "AsyncResponseStreamWrapper[TextFormatT]":
async for _ in self:
pass
return self

def parse(self) -> "AsyncResponseStreamWrapper[TextFormatT]":
raise NotImplementedError(
"AsyncResponseStreamWrapper.parse() is not implemented"
)

@property
def response(self):
response = _get_stream_response(self.stream)
if response is None:
return None
return _AsyncResponseProxy(response, lambda: self._stop(None))


class AsyncResponseStreamManagerWrapper(Generic[TextFormatT]):
"""Wrapper for async OpenAI Responses API stream managers."""

def __init__(
self,
manager: "AsyncResponseStreamManager[TextFormatT]",
handler: TelemetryHandler,
invocation: "LLMInvocation",
capture_content: bool,
):
self._manager = manager
self._handler = handler
self._invocation = invocation
self._capture_content = capture_content
self._stream_wrapper: (
AsyncResponseStreamWrapper[TextFormatT] | None
) = None

async def __aenter__(self) -> AsyncResponseStreamWrapper[TextFormatT]:
stream = await self._manager.__aenter__()
self._stream_wrapper = AsyncResponseStreamWrapper(
stream,
self._handler,
self._invocation,
self._capture_content,
)
return self._stream_wrapper

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
suppressed = False
stream_wrapper = self._stream_wrapper
self._stream_wrapper = None
async with AsyncExitStack() as cleanup:
if stream_wrapper is not None:

async def finalize_stream_wrapper() -> None:
if suppressed:
await stream_wrapper.__aexit__(None, None, None)
else:
await stream_wrapper.__aexit__(
exc_type, exc_val, exc_tb
)

cleanup.push_async_callback(finalize_stream_wrapper)
suppressed = await self._manager.__aexit__(
exc_type, exc_val, exc_tb
)
return suppressed

def parse(self) -> "AsyncResponseStreamManagerWrapper[TextFormatT]":
raise NotImplementedError(
"AsyncResponseStreamManagerWrapper.parse() is not implemented"
)

# TODO: Replace __getattr__ passthrough with wrapt.ObjectProxy in a future
# cleanup once wrapt 2 typing support is available (wrapt PR #3903).
def __getattr__(self, name: str):
return getattr(self._manager, name)
Loading