Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lmnr/opentelemetry_lib/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def wrap(*args, **kwargs):
except Exception as e:
_process_exception(span, e)
_cleanup_span(span, wrapper)
raise e
raise
finally:
# Always restore global context
context_api.detach(ctx_token)
Expand Down
132 changes: 107 additions & 25 deletions src/lmnr/opentelemetry_lib/litellm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import json
from datetime import datetime

from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_PROMPT
from opentelemetry.trace import SpanKind, Status, StatusCode, Tracer
from lmnr.opentelemetry_lib.decorators import json_dumps
from lmnr.opentelemetry_lib.litellm.utils import (
get_tool_definition,
is_validator_iterator,
model_as_dict,
set_span_attribute,
)
Expand Down Expand Up @@ -245,35 +247,107 @@ def _process_input_messages(self, span, messages):
if not isinstance(messages, list):
return

for i, message in enumerate(messages):
message_dict = model_as_dict(message)
role = message_dict.get("role", "unknown")
set_span_attribute(span, f"gen_ai.prompt.{i}.role", role)

tool_calls = message_dict.get("tool_calls", [])
self._process_tool_calls(span, tool_calls, i, is_response=False)
prompt_index = 0
for item in messages:
block_dict = model_as_dict(item)
if block_dict.get("type", "message") == "message":
tool_calls = block_dict.get("tool_calls", [])
self._process_tool_calls(
span, tool_calls, prompt_index, is_response=False
)
content = block_dict.get("content")
if is_validator_iterator(content):
# Have not been able to catch this in the wild, but keeping
# just in case, as raw OpenAI responses do that
content = [self._process_content_part(part) for part in content]
try:
stringified_content = (
content if isinstance(content, str) else json_dumps(content)
)
except Exception:
stringified_content = (
str(content) if content is not None else ""
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.content",
stringified_content,
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.role",
block_dict.get("role"),
)
prompt_index += 1

content = message_dict.get("content", "")
if content is None:
continue
if isinstance(content, str):
set_span_attribute(span, f"gen_ai.prompt.{i}.content", content)
elif isinstance(content, list):
elif block_dict.get("type") == "computer_call_output":
set_span_attribute(
span, f"gen_ai.prompt.{i}.content", json.dumps(content)
span,
f"{GEN_AI_PROMPT}.{prompt_index}.role",
"computer_call_output",
)
else:
output_image_url = block_dict.get("output", {}).get("image_url")
if output_image_url:
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.content",
json.dumps(
[
{
"type": "image_url",
"image_url": {"url": output_image_url},
}
]
),
)
prompt_index += 1
elif block_dict.get("type") == "computer_call":
set_span_attribute(
span, f"{GEN_AI_PROMPT}.{prompt_index}.role", "assistant"
)
call_content = {}
if block_dict.get("id"):
call_content["id"] = block_dict.get("id")
if block_dict.get("action"):
call_content["action"] = block_dict.get("action")
set_span_attribute(
span,
f"gen_ai.prompt.{i}.content",
json.dumps(model_as_dict(content)),
f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.arguments",
json.dumps(call_content),
)
if role == "tool":
set_span_attribute(
span,
f"gen_ai.prompt.{i}.tool_call_id",
message_dict.get("tool_call_id"),
f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.id",
block_dict.get("call_id"),
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.tool_calls.0.name",
"computer_call",
)
prompt_index += 1
elif block_dict.get("type") == "reasoning":
reasoning_summary = block_dict.get("summary")
if reasoning_summary and isinstance(reasoning_summary, list):
processed_chunks = [
{"type": "text", "text": chunk.get("text")}
for chunk in reasoning_summary
if isinstance(chunk, dict)
and chunk.get("type") == "summary_text"
]
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.reasoning",
json_dumps(processed_chunks),
)
set_span_attribute(
span,
f"{GEN_AI_PROMPT}.{prompt_index}.role",
"assistant",
)
                    # reasoning is followed by other content parts in the same message,
                    # so we don't increment the prompt index
# TODO: handle other block types

def _process_request_tool_definitions(self, span, tools):
"""Process and set tool definitions attributes on the span"""
Expand Down Expand Up @@ -493,11 +567,19 @@ def _process_response_output(self, span, output):
)
tool_call_index += 1
elif block_dict.get("type") == "reasoning":
set_span_attribute(
span,
"gen_ai.completion.0.reasoning",
block_dict.get("summary"),
)
reasoning_summary = block_dict.get("summary")
if reasoning_summary and isinstance(reasoning_summary, list):
processed_chunks = [
{"type": "text", "text": chunk.get("text")}
for chunk in reasoning_summary
if isinstance(chunk, dict)
and chunk.get("type") == "summary_text"
]
set_span_attribute(
span,
"gen_ai.completion.0.reasoning",
json_dumps(processed_chunks),
)
# TODO: handle other block types, in particular other calls

def _process_success_response(self, span, response_obj):
Expand Down
12 changes: 12 additions & 0 deletions src/lmnr/opentelemetry_lib/litellm/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from pydantic import BaseModel
from opentelemetry.sdk.trace import Span
from opentelemetry.util.types import AttributeValue
Expand Down Expand Up @@ -80,3 +81,14 @@ def get_tool_definition(tool: dict) -> ToolDefinition:
description=description,
parameters=parameters,
)


def is_validator_iterator(content) -> bool:
    """
    Return True if *content* is a pydantic ValidatorIterator.

    Some OpenAI objects contain fields typed as Iterable, which pydantic
    internally converts to a ValidatorIterator; such iterators cannot be
    trivially serialized without consuming them (e.g. into a list).

    See: https://github.com/pydantic/pydantic/issues/9541#issuecomment-2189045051
    """
    # Match on the repr of the type rather than importing the class: it lives
    # in a private pydantic module whose path may change between versions.
    # bool(...) so callers get a clean predicate instead of Optional[Match].
    return bool(re.search(r"pydantic.*ValidatorIterator'>$", str(type(content))))
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""OpenTelemetry Groq instrumentation"""

import logging
from typing import Any, AsyncGenerator, Collection

from lmnr.opentelemetry_lib.decorators import json_dumps
from lmnr import Laminar
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

from opentelemetry.trace import Span
from opentelemetry.trace.status import Status, StatusCode
from wrapt import wrap_function_wrapper

logger = logging.getLogger(__name__)

_instruments = ("cua-agent >= 0.4.0",)


def _wrap_run(wrapped, instance, args, kwargs):
    """Wrap ``ComputerAgent.run`` in an ``agent.run`` span.

    The span is stashed on the agent instance and handed to the streaming
    wrapper, which takes over ending it when iteration finishes.
    """
    span = Laminar.start_span("agent.run")
    # Expose the span so other hooks on the same instance can attach to it.
    instance._lmnr_parent_span = span

    try:
        stream: AsyncGenerator[dict[str, Any], None] = wrapped(*args, **kwargs)
    except Exception as exc:
        # The wrapped call itself failed, so the streaming wrapper will never
        # run — record the failure and close the span here.
        if span.is_recording():
            span.set_status(Status(StatusCode.ERROR))
            span.record_exception(exc)
        span.end()
        raise

    # Creating the async generator never raises; errors surface during
    # iteration and are handled inside the wrapper.
    return _abuild_from_streaming_response(span, stream)


async def _abuild_from_streaming_response(
    parent_span: Span, response: AsyncGenerator[dict[str, Any], None]
) -> AsyncGenerator[dict[str, Any], None]:
    """Re-yield each step of the agent's streaming response, tracing it.

    Wraps the whole iteration in *parent_span* (ended automatically by
    ``end_on_exit=True`` when the generator is exhausted or closed) and
    opens one ``agent.step`` span per item pulled from *response*.
    """
    with Laminar.use_span(parent_span, end_on_exit=True):
        response_iter = aiter(response)
        while True:
            step = None
            # Start the step span before awaiting so work done inside the
            # wrapped generator is attributed to this step.
            step_span = Laminar.start_span("agent.step")
            with Laminar.use_span(step_span):
                try:
                    step = await anext(response_iter)
                    step_span.set_attribute("lmnr.span.output", json_dumps(step))
                    if step_span.is_recording():
                        step_span.end()
                except StopAsyncIteration:
                    # don't end on purpose, there is no iteration step here.
                    # NOTE(review): the span started above is abandoned
                    # unended when the stream is exhausted — presumably so an
                    # empty trailing step is never exported; confirm Laminar
                    # tolerates never-ended spans. Exceptions other than
                    # StopAsyncIteration also propagate with step_span still
                    # open — verify that is intended.
                    break

            # Yield outside the step span so consumer code between steps is
            # not attributed to this step.
            if step is not None:
                yield step


class CuaAgentInstrumentor(BaseInstrumentor):
    """Instrument ``agent.agent.ComputerAgent.run`` with Laminar spans."""

    # Wrap target, shared by _instrument/_uninstrument so the two methods
    # can never drift out of sync (previously each repeated the strings).
    _WRAP_PACKAGE = "agent.agent"
    _WRAP_OBJECT = "ComputerAgent"
    _WRAP_METHOD = "run"

    def __init__(self):
        super().__init__()

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package specs this instrumentor supports."""
        return _instruments

    def _instrument(self, **kwargs):
        """Patch ``ComputerAgent.run``; a missing package is not an error."""
        try:
            wrap_function_wrapper(
                self._WRAP_PACKAGE,
                f"{self._WRAP_OBJECT}.{self._WRAP_METHOD}",
                _wrap_run,
            )
        except ModuleNotFoundError:
            pass  # that's ok, we don't want to fail if some methods do not exist

    def _uninstrument(self, **kwargs):
        """Best-effort removal of the patch applied by ``_instrument``."""
        try:
            unwrap(
                f"{self._WRAP_PACKAGE}.{self._WRAP_OBJECT}",
                self._WRAP_METHOD,
            )
        except (ModuleNotFoundError, AttributeError):
            # AttributeError: the method was never wrapped (e.g. _instrument
            # hit ModuleNotFoundError above) — uninstrument stays a no-op.
            pass
Loading