3 changes: 3 additions & 0 deletions pyproject.toml
@@ -144,3 +144,6 @@ build-backend = "uv_build"

[tool.uv.workspace]
members = ["examples/fastapi-app"]

[tool.ruff]
target-version = "py310"
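
A note on the new `[tool.ruff]` block: `target-version = "py310"` tells Ruff to assume Python 3.10 syntax when linting, so rules such as pyupgrade can suggest 3.10-only constructs. A minimal sketch of the kind of rewrite this enables (rule code UP007, assuming Ruff's pyupgrade rules are enabled):

```python
# With target-version = "py310", Ruff's UP007 can flag the typing.Optional
# form and suggest the PEP 604 union syntax, which requires Python >= 3.10.
from typing import Optional

def before(user_id: Optional[int]) -> Optional[str]: ...

def after(user_id: int | None) -> str | None: ...
```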
2 changes: 1 addition & 1 deletion src/lmnr/opentelemetry_lib/decorators/__init__.py
@@ -218,7 +218,7 @@ def wrap(*args, **kwargs):
except Exception as e:
_process_exception(span, e)
_cleanup_span(span, wrapper)
raise e
raise
finally:
# Always restore global context
context_api.detach(ctx_token)
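
The `raise e` → `raise` change is behavioral as well as stylistic: a bare `raise` re-raises the active exception with its traceback untouched, while `raise e` appends the re-raise site to the traceback. A minimal sketch:

```python
def fail():
    raise ValueError("boom")

try:
    fail()
except ValueError:
    # Bare `raise` re-raises the active exception as-is; `raise e` would
    # add this frame to the traceback a second time.
    raise
```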
132 changes: 107 additions & 25 deletions src/lmnr/opentelemetry_lib/litellm/__init__.py
@@ -3,10 +3,12 @@
import json
from datetime import datetime

from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_PROMPT
from opentelemetry.trace import SpanKind, Status, StatusCode, Tracer
from lmnr.opentelemetry_lib.decorators import json_dumps
from lmnr.opentelemetry_lib.litellm.utils import (
get_tool_definition,
is_validator_iterator,
model_as_dict,
set_span_attribute,
)
@@ -245,35 +247,107 @@ def _process_input_messages(self, span, messages):
if not isinstance(messages, list):
return

for i, message in enumerate(messages):
message_dict = model_as_dict(message)
role = message_dict.get("role", "unknown")
set_span_attribute(span, f"gen_ai.prompt.{i}.role", role)

tool_calls = message_dict.get("tool_calls", [])
self._process_tool_calls(span, tool_calls, i, is_response=False)
prompt_index = 0
for item in messages:
block_dict = model_as_dict(item)
if block_dict.get("type", "message") == "message":
tool_calls = block_dict.get("tool_calls", [])
self._process_tool_calls(
span, tool_calls, prompt_index, is_response=False
)
content = block_dict.get("content")
if is_validator_iterator(content):
# We have not been able to catch this in the wild, but keep the
# handling just in case, as raw OpenAI responses can return these iterators
content = [self._process_content_part(part) for part in content]
try:
stringified_content = (
content if isinstance(content, str) else json_dumps(content)
)
except Exception:
stringified_content = (
str(content) if content is not None else ""
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.content",
stringified_content,
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.role",
block_dict.get("role"),
)
prompt_index += 1

content = message_dict.get("content", "")
if content is None:
continue
if isinstance(content, str):
set_span_attribute(span, f"gen_ai.prompt.{i}.content", content)
elif isinstance(content, list):
elif block_dict.get("type") == "computer_call_output":
set_span_attribute(
span, f"gen_ai.prompt.{i}.content", json.dumps(content)
span,
f"{GEN_AI_PROMPT}.{prompt_index}.role",
"computer_call_output",
)
else:
output_image_url = block_dict.get("output", {}).get("image_url")
if output_image_url:
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.content",
json.dumps(
[
{
"type": "image_url",
"image_url": {"url": output_image_url},
}
]
),
)
prompt_index += 1
elif block_dict.get("type") == "computer_call":
set_span_attribute(
span, f"{GEN_AI_PROMPT}.{prompt_index}.role", "assistant"
)
call_content = {}
if block_dict.get("id"):
call_content["id"] = block_dict.get("id")
if block_dict.get("action"):
call_content["action"] = block_dict.get("action")
set_span_attribute(
span,
f"gen_ai.prompt.{i}.content",
json.dumps(model_as_dict(content)),
f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.arguments",
json.dumps(call_content),
)
if role == "tool":
set_span_attribute(
span,
f"gen_ai.prompt.{i}.tool_call_id",
message_dict.get("tool_call_id"),
f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.id",
block_dict.get("call_id"),
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.name",
"computer_call",
)
prompt_index += 1
elif block_dict.get("type") == "reasoning":
reasoning_summary = block_dict.get("summary")
if reasoning_summary and isinstance(reasoning_summary, list):
processed_chunks = [
{"type": "text", "text": chunk.get("text")}
for chunk in reasoning_summary
if isinstance(chunk, dict)
and chunk.get("type") == "summary_text"
]
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.reasoning",
json_dumps(processed_chunks),
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.role",
"assistant",
)
# reasoning is followed by other content parts in the same message,
# so we don't increment the prompt index
# TODO: handle other block types

def _process_request_tool_definitions(self, span, tools):
"""Process and set tool definitions attributes on the span"""
@@ -493,11 +567,19 @@ def _process_response_output(self, span, output):
)
tool_call_index += 1
elif block_dict.get("type") == "reasoning":
set_span_attribute(
span,
"gen_ai.completion.0.reasoning",
block_dict.get("summary"),
)
reasoning_summary = block_dict.get("summary")
if reasoning_summary and isinstance(reasoning_summary, list):
processed_chunks = [
{"type": "text", "text": chunk.get("text")}
for chunk in reasoning_summary
if isinstance(chunk, dict)
and chunk.get("type") == "summary_text"
]
set_span_attribute(
span,
"gen_ai.completion.0.reasoning",
json_dumps(processed_chunks),
)
# TODO: handle other block types, in particular other calls

def _process_success_response(self, span, response_obj):
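For reference, a sketch (with hypothetical values) of the flattened `gen_ai.prompt.*` attributes the new `_process_input_messages` writes for Responses-style input blocks:

```python
# Hypothetical span attributes for a message, a computer_call, and a
# reasoning block; keys follow the indexing scheme in the diff above.
span_attributes = {
    # plain message block
    "gen_ai.prompt.0.role": "user",
    "gen_ai.prompt.0.content": "Click the submit button",
    # computer_call block -> one tool call at its own prompt index
    "gen_ai.prompt.1.role": "assistant",
    "gen_ai.prompt.1.tool_calls.0.name": "computer_call",
    "gen_ai.prompt.1.tool_calls.0.id": "call_abc123",
    "gen_ai.prompt.1.tool_calls.0.arguments": '{"id": "cu_1", "action": {"type": "click"}}',
    # reasoning block shares its index with the content that follows it
    "gen_ai.prompt.2.reasoning": '[{"type": "text", "text": "The button is visible."}]',
    "gen_ai.prompt.2.role": "assistant",
}
```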
12 changes: 12 additions & 0 deletions src/lmnr/opentelemetry_lib/litellm/utils.py
@@ -1,3 +1,4 @@
import re
from pydantic import BaseModel
from opentelemetry.sdk.trace import Span
from opentelemetry.util.types import AttributeValue
@@ -80,3 +81,14 @@ def get_tool_definition(tool: dict) -> ToolDefinition:
description=description,
parameters=parameters,
)


def is_validator_iterator(content):
"""
Some OpenAI objects contain fields typed as Iterable, which pydantic
internally converts to a ValidatorIterator; these cannot be trivially
serialized without first consuming the iterator into, for example, a list.

See: https://github.com/pydantic/pydantic/issues/9541#issuecomment-2189045051
"""
return re.search(r"pydantic.*ValidatorIterator'>$", str(type(content)))
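
A minimal sketch of the case this guards against, assuming pydantic v2 (see the linked issue): fields typed as `Iterable` are validated lazily, so accessing them yields a `ValidatorIterator` rather than a list:

```python
from typing import Iterable
from pydantic import BaseModel

class Chunked(BaseModel):
    parts: Iterable[str]  # pydantic v2 validates Iterable fields lazily

m = Chunked(parts=iter(["a", "b"]))
print(type(m.parts))
# <class 'pydantic_core._pydantic_core.ValidatorIterator'>
print(bool(is_validator_iterator(m.parts)))  # True, using the helper above
print(list(m.parts))  # consuming the iterator yields the items: ['a', 'b']
```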
@@ -0,0 +1,100 @@
"""OpenTelemetry CUA instrumentation"""

import logging
from typing import Any, AsyncGenerator, Collection

from lmnr.opentelemetry_lib.decorators import json_dumps
from lmnr import Laminar
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

from opentelemetry.trace import Span
from opentelemetry.trace.status import Status, StatusCode
from wrapt import wrap_function_wrapper

logger = logging.getLogger(__name__)

_instruments = ("cua-agent >= 0.4.0",)


def _wrap_run(
wrapped,
instance,
args,
kwargs,
):
parent_span = Laminar.start_span("ComputerAgent.run")
instance._lmnr_parent_span = parent_span

try:
result: AsyncGenerator[dict[str, Any], None] = wrapped(*args, **kwargs)
return _abuild_from_streaming_response(parent_span, result)
except Exception as e:
if parent_span.is_recording():
parent_span.set_status(Status(StatusCode.ERROR))
parent_span.record_exception(e)
parent_span.end()
raise


async def _abuild_from_streaming_response(
parent_span: Span, response: AsyncGenerator[dict[str, Any], None]
) -> AsyncGenerator[dict[str, Any], None]:
with Laminar.use_span(parent_span, end_on_exit=True):
response_iter = aiter(response)
while True:
step = None
step_span = Laminar.start_span("ComputerAgent.step")
with Laminar.use_span(step_span):
try:
step = await anext(response_iter)
step_span.set_attribute("lmnr.span.output", json_dumps(step))
try:
# When processing tool calls, each output item is processed separately;
# if the output is a message, agent.step returns an empty array
# https://github.com/trycua/cua/blob/17d670962970a1d1774daaec029ebf92f1f9235e/libs/python/agent/agent/agent.py#L459
if len(step.get("output", [])) == 0:
continue
except Exception:
pass
if step_span.is_recording():
step_span.end()
except StopAsyncIteration:
# intentionally don't end the step span: there is no iteration step here.
break

if step is not None:
yield step


class CuaAgentInstrumentor(BaseInstrumentor):
def __init__(self):
super().__init__()

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
wrap_package = "agent.agent"
wrap_object = "ComputerAgent"
wrap_method = "run"
try:
wrap_function_wrapper(
wrap_package,
f"{wrap_object}.{wrap_method}",
_wrap_run,
)
except ModuleNotFoundError:
pass # that's ok, we don't want to fail if some methods do not exist

def _uninstrument(self, **kwargs):
wrap_package = "agent.agent"
wrap_object = "ComputerAgent"
wrap_method = "run"
try:
unwrap(
f"{wrap_package}.{wrap_object}",
wrap_method,
)
except ModuleNotFoundError:
pass # that's ok, we don't want to fail if some methods do not exist
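
For completeness, a sketch of enabling the new instrumentor manually; `instrument()` and `is_instrumented_by_opentelemetry` come from OpenTelemetry's `BaseInstrumentor`, and the import path here is a guess, since the diff does not show the new file's path:

```python
# Hypothetical import path; adjust to wherever CuaAgentInstrumentor
# actually lives in the package.
from lmnr.opentelemetry_lib.cua import CuaAgentInstrumentor

instrumentor = CuaAgentInstrumentor()
if not instrumentor.is_instrumented_by_opentelemetry:
    instrumentor.instrument()  # wraps agent.agent.ComputerAgent.run
```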