Skip to content

Commit 62aa02e

Browse files
Merge pull request #83 from agentevals-dev/fix/conversation_id-handling
Fix otlp session fragmentation
2 parents 4e52b7f + 3b38c16 commit 62aa02e

4 files changed

Lines changed: 94 additions & 2 deletions

File tree

src/agentevals/api/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ class WSSessionCompleteEvent(CamelModel):
180180
invocations: list[dict[str, Any]]
181181

182182

183+
class WSSessionRemovedEvent(CamelModel):
    """WebSocket event announcing that a session was removed.

    Sent to UI clients when one session's data has been merged into
    another session and the original session entry has been deleted.
    """

    # Discriminator identifying this event kind to consumers.
    type: str = "session_removed"
    # Identifier of the session that was removed.
    session_id: str
    # Identifier of the session that took over the removed session's data.
    absorbed_by: str
187+
188+
183189
class WSSpanReceivedEvent(CamelModel):
184190
type: str = "span_received"
185191
session_id: str

src/agentevals/api/otlp_routes.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ async def _process_traces(body: dict, manager: StreamingTraceManager) -> None:
9696
resource_attrs = resource_span.get("resource", {}).get("attributes", [])
9797
metadata = _extract_agentevals_metadata(resource_attrs)
9898

99+
if not metadata.get("conversation_id"):
100+
metadata["conversation_id"] = _prescan_conversation_id(resource_span)
101+
99102
for scope_span in resource_span.get("scopeSpans", []):
100103
scope = scope_span.get("scope", {})
101104
scope_name = scope.get("name", "")
@@ -258,6 +261,21 @@ def _extract_agentevals_metadata(resource_attrs: list[dict]) -> dict:
258261
}
259262

260263

264+
def _prescan_conversation_id(resource_span: dict) -> str | None:
    """Return the first gen_ai.conversation.id found in a resourceSpan batch.

    A single OTLP batch can mix scopes that carry a conversation id (agent
    instrumentation) with scopes that do not (e.g. A2A server
    instrumentation). Looking the id up once, before any span is processed,
    lets every span in the batch be routed to the same session.

    Args:
        resource_span: One ``resourceSpans`` entry of an OTLP payload.

    Returns:
        The first truthy conversation id in scope/span order, or ``None``
        when no span in the batch carries one.
    """
    # Lazy generator: extraction stops at the first span that yields an id.
    found_ids = (
        _extract_conversation_id(span.get("attributes", []))
        for scope_entry in resource_span.get("scopeSpans", [])
        for span in scope_entry.get("spans", [])
    )
    return next((cid for cid in found_ids if cid), None)
277+
278+
261279
def _extract_conversation_id(attrs_list: list[dict]) -> str | None:
262280
"""Extract gen_ai.conversation.id from OTLP span attributes."""
263281
for attr in attrs_list:

src/agentevals/streaming/ws_server.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from ..api.models import (
1616
SessionInfo,
1717
WSSessionCompleteEvent,
18+
WSSessionRemovedEvent,
1819
WSSessionStartedEvent,
1920
WSSpanReceivedEvent,
2021
)
@@ -227,14 +228,21 @@ async def get_or_create_otlp_session(self, trace_id: str, metadata: dict) -> Tra
227228
active = self.sessions.get(active_id)
228229
if active and not active.is_complete:
229230
active.trace_ids.add(trace_id)
231+
if conversation_id:
232+
await self._absorb_orphan_for_trace(trace_id, active)
230233
return active
231234
if active and active.is_complete and conversation_id:
232235
self._reopen_session(active, trace_id, session_name)
236+
await self._absorb_orphan_for_trace(trace_id, active)
233237
return active
234238

235239
existing = self.find_session_by_trace_id(trace_id)
236-
if existing and existing.is_complete:
237-
self._reopen_session(existing, trace_id, session_name)
240+
if existing:
241+
if existing.is_complete:
242+
self._reopen_session(existing, trace_id, session_name)
243+
else:
244+
existing.trace_ids.add(trace_id)
245+
self._active_session_for_name[session_name] = existing.session_id
238246
return existing
239247

240248
session_id = session_name
@@ -349,6 +357,55 @@ def _reopen_session(self, session: TraceSession, trace_id: str, session_name: st
349357
len(session.spans),
350358
)
351359

360+
async def _absorb_orphan_for_trace(self, trace_id: str, target: TraceSession) -> None:
    """Fold an orphan session that shares ``trace_id`` into ``target``.

    Infrastructure spans without a conversation_id can arrive before the
    agent spans and end up in their own session keyed by trace_id. Once the
    conversation_id is known and resolves to ``target``, the orphan's spans,
    logs and trace ids are moved over, the orphan's bookkeeping is dropped,
    and UI clients are told that the orphan session no longer exists.

    Args:
        trace_id: Trace id whose stray session should be merged.
        target: Session that receives the orphan's data.
    """
    # First session other than the target that already tracks this trace.
    found = next(
        (
            (sid, candidate)
            for sid, candidate in self.sessions.items()
            if sid != target.session_id and trace_id in candidate.trace_ids
        ),
        None,
    )
    if found is None:
        return
    orphan_id, orphan = found
    if not orphan:
        return

    # Move the orphan's collected data into the target session.
    target.spans.extend(orphan.spans)
    target.logs.extend(orphan.logs)
    target.trace_ids.update(orphan.trace_ids)
    if orphan.has_root_span:
        target.has_root_span = True

    # Drop every piece of manager state that still refers to the orphan.
    del self.sessions[orphan_id]
    stale_names = [
        name
        for name, mapped_id in self._active_session_for_name.items()
        if mapped_id == orphan_id
    ]
    for name in stale_names:
        del self._active_session_for_name[name]
    for timers in (self._completion_timers, self._idle_timers):
        if orphan_id in timers:
            pending = timers.pop(orphan_id)
            pending.cancel()
    self.incremental_extractors.pop(orphan_id, None)

    removal_event = WSSessionRemovedEvent(
        session_id=orphan_id,
        absorbed_by=target.session_id,
    )
    await self.broadcast_to_ui(removal_event.model_dump(by_alias=True))

    merged_span_count = len(orphan.spans)
    logger.info(
        "Absorbed orphan session %s (%d spans) into %s",
        orphan_id,
        merged_span_count,
        target.session_id,
    )
408+
352409
async def _delayed_complete(self, session_id: str, delay: float) -> None:
353410
await asyncio.sleep(delay)
354411
await self._complete_otlp_session(session_id)

ui/src/components/streaming/LiveStreamingView.tsx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,17 @@ export function LiveStreamingView() {
318318
});
319319
break;
320320

321+
case 'session_removed':
322+
if (import.meta.env.DEV) {
323+
console.log('[Streaming] Session removed:', data.sessionId, 'absorbed by:', data.absorbedBy);
324+
}
325+
setActiveSessions(prev => {
326+
const newMap = new Map(prev);
327+
newMap.delete(data.sessionId);
328+
return newMap;
329+
});
330+
break;
331+
321332
case 'session_complete':
322333
if (import.meta.env.DEV) {
323334
console.log('[Streaming] Session complete with invocations:', data.invocations?.length);

0 commit comments

Comments
 (0)