Skip to content

Commit 409c398

Browse files
Merge pull request #32 from agentevals-dev/fix/ordering
Fix conversation element ordering issue with batching
2 parents 8009635 + 860fdd6 commit 409c398

3 files changed

Lines changed: 52 additions & 14 deletions

File tree

src/agentevals/streaming/incremental_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,9 @@ def process_log(self, log_event: dict) -> list[dict]:
174174
event_name = log_event.get("event_name", "")
175175
body = log_event.get("body", {})
176176

177-
invocation_id = self.current_invocation_id
177+
invocation_id = log_event.get("span_id")
178178
if not invocation_id:
179-
invocation_id = log_event.get("span_id")
179+
invocation_id = self.current_invocation_id
180180
if not invocation_id:
181181
return updates
182182

src/agentevals/streaming/ws_server.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -261,15 +261,6 @@ async def get_or_create_otlp_session(
261261
self._active_session_for_name[session_name] = session_id
262262
self.incremental_extractors[session_id] = IncrementalInvocationExtractor()
263263

264-
replayed = self._replay_orphan_logs(session)
265-
extractor = self.incremental_extractors.get(session_id)
266-
if extractor and replayed:
267-
for log_event in replayed:
268-
updates = extractor.process_log(log_event)
269-
for update in updates:
270-
update["sessionId"] = session_id
271-
await self.broadcast_to_ui(update)
272-
273264
await self.broadcast_to_ui(WSSessionStartedEvent(
274265
session=SessionInfo(
275266
session_id=session_id,
@@ -282,6 +273,15 @@ async def get_or_create_otlp_session(
282273
),
283274
).model_dump(by_alias=True))
284275

276+
replayed = self._replay_orphan_logs(session)
277+
extractor = self.incremental_extractors.get(session_id)
278+
if extractor and replayed:
279+
for log_event in replayed:
280+
updates = extractor.process_log(log_event)
281+
for update in updates:
282+
update["sessionId"] = session_id
283+
await self.broadcast_to_ui(update)
284+
285285
logger.info("Auto-created OTLP session: %s (trace: %s)", session_id, trace_id)
286286
return session
287287

ui/src/components/streaming/LiveStreamingView.tsx

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,38 @@ import { useEffect, useRef, useState } from 'react';
22
import { useTraceContext } from '../../context/TraceContext';
33
import { SessionCard } from './SessionCard';
44
import { config } from '../../config';
5-
import type { LiveSession, StreamingInvocation } from '../../lib/types';
5+
import type { ConversationElement, LiveSession, StreamingInvocation } from '../../lib/types';
6+
7+
function invocationsToElements(invocations: StreamingInvocation[]): ConversationElement[] {
8+
return invocations.flatMap((inv, idx) => {
9+
const elements: ConversationElement[] = [];
10+
if (inv.userText) {
11+
elements.push({
12+
type: 'user_input',
13+
timestamp: idx * 3,
14+
invocationId: inv.invocationId,
15+
data: { text: inv.userText },
16+
});
17+
}
18+
for (const tc of inv.toolCalls || []) {
19+
elements.push({
20+
type: 'tool_call',
21+
timestamp: idx * 3 + 1,
22+
invocationId: inv.invocationId,
23+
data: { toolCall: tc },
24+
});
25+
}
26+
if (inv.agentText) {
27+
elements.push({
28+
type: 'agent_response',
29+
timestamp: idx * 3 + 2,
30+
invocationId: inv.invocationId,
31+
data: { text: inv.agentText },
32+
});
33+
}
34+
return elements;
35+
});
36+
}
637

738
export function LiveStreamingView() {
839
const { state, actions } = useTraceContext();
@@ -67,7 +98,9 @@ export function LiveStreamingView() {
6798
status: s.isComplete ? 'complete' : 'active',
6899
metadata: (s.metadata ?? {}) as Record<string, string>,
69100
invocations: s.invocations,
70-
liveElements: [],
101+
liveElements: s.invocations?.length
102+
? invocationsToElements(s.invocations)
103+
: [],
71104
liveStats: { totalInputTokens: 0, totalOutputTokens: 0 },
72105
startedAt: s.startedAt,
73106
});
@@ -272,7 +305,9 @@ export function LiveStreamingView() {
272305
status: 'complete',
273306
metadata: {},
274307
invocations: data.invocations,
275-
liveElements: [],
308+
liveElements: data.invocations?.length
309+
? invocationsToElements(data.invocations)
310+
: [],
276311
liveStats: {
277312
totalInputTokens: 0,
278313
totalOutputTokens: 0,
@@ -284,6 +319,9 @@ export function LiveStreamingView() {
284319
...session,
285320
status: 'complete',
286321
invocations: data.invocations,
322+
liveElements: data.invocations?.length
323+
? invocationsToElements(data.invocations)
324+
: session.liveElements,
287325
});
288326
}
289327

0 commit comments

Comments
 (0)