Skip to content

Commit 8009635

Browse files
Merge pull request #31 from agentevals-dev/fix/live-session-groupping
Fix incorrect span groupping on UI
2 parents b7c262a + 23ad2da commit 8009635

6 files changed

Lines changed: 349 additions & 9 deletions

File tree

src/agentevals/streaming/session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TraceSession:
2020
logs: list[dict] = field(default_factory=list)
2121
started_at: datetime = field(default_factory=lambda: datetime.now(UTC))
2222
is_complete: bool = False
23+
completed_at: datetime | None = None
2324
metadata: dict = field(default_factory=dict)
2425
source: str = "websocket"
2526
has_root_span: bool = False

src/agentevals/streaming/ws_server.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,11 @@ async def get_or_create_otlp_session(
233233
active.trace_ids.add(trace_id)
234234
return active
235235

236+
existing = self.find_session_by_trace_id(trace_id)
237+
if existing and existing.is_complete:
238+
self._reopen_session(existing, trace_id, session_name)
239+
return existing
240+
236241
session_id = session_name
237242
if session_id in self.sessions:
238243
counter = 2
@@ -323,6 +328,29 @@ def schedule_log_reextraction(self, session_id: str) -> None:
323328
self._delayed_reextract(session_id, self.reextraction_delay_seconds)
324329
)
325330

331+
def _reopen_session(
332+
self, session: TraceSession, trace_id: str, session_name: str
333+
) -> None:
334+
"""Reopen a completed session when a trace_id already in the session
335+
receives more spans after completion (split-batch scenario).
336+
337+
The OTLP BatchSpanProcessor may flush one turn's spans across the
338+
completion boundary: some child spans arrive before the grace period
339+
fires, and the root span (plus remaining children) arrives after.
340+
Because the trace_id was already registered in the session, we know
341+
these late spans belong here rather than to a new agent run.
342+
"""
343+
session.is_complete = False
344+
session.completed_at = None
345+
session.trace_ids.add(trace_id)
346+
self._active_session_for_name[session_name] = session.session_id
347+
self.incremental_extractors[session.session_id] = IncrementalInvocationExtractor()
348+
self.reset_idle_timer(session.session_id)
349+
logger.info(
350+
"Reopened session %s for trace %s (%d spans so far)",
351+
session.session_id, trace_id, len(session.spans),
352+
)
353+
326354
async def _delayed_complete(self, session_id: str, delay: float) -> None:
327355
await asyncio.sleep(delay)
328356
await self._complete_otlp_session(session_id)
@@ -376,6 +404,7 @@ async def _complete_otlp_session(self, session_id: str) -> None:
376404
return
377405

378406
session.is_complete = True
407+
session.completed_at = datetime.now(UTC)
379408

380409
for name, sid in list(self._active_session_for_name.items()):
381410
if sid == session_id:

tests/integration/test_live_agents.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,107 @@ def test_session_visible_via_api(self, live_servers):
187187
assert resp.status_code == 200
188188
session_ids = [s["sessionId"] for s in resp.json()["data"]]
189189
assert session_name in session_ids
190+
191+
192+
@_skip_no_openai
193+
class TestAgentRerun:
194+
"""Verify that re-running an agent with the same session_name creates
195+
separate sessions, not merging them into one.
196+
197+
Each subprocess is a new OS process with new trace_ids. The OTLP
198+
receiver must recognize these as distinct runs and assign unique
199+
session IDs (session_name, session_name-2, etc.)."""
200+
201+
def test_strands_rerun_creates_separate_sessions(self, live_servers):
202+
"""Run the Strands agent twice with the same session_name.
203+
Each run must produce its own session."""
204+
main_port, otlp_port, mgr = live_servers
205+
session_name = "e2e-strands-rerun"
206+
strands_env = {
207+
"OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental",
208+
}
209+
210+
result1 = _run_agent(
211+
"examples/zero-code-examples/strands/run.py",
212+
otlp_port,
213+
session_name,
214+
extra_env=strands_env,
215+
)
216+
assert result1.returncode == 0, f"Run 1 failed:\n{result1.stderr}"
217+
wait_for_session_complete_sync(mgr, session_name, timeout=30)
218+
219+
result2 = _run_agent(
220+
"examples/zero-code-examples/strands/run.py",
221+
otlp_port,
222+
session_name,
223+
extra_env=strands_env,
224+
)
225+
assert result2.returncode == 0, f"Run 2 failed:\n{result2.stderr}"
226+
wait_for_session_complete_sync(mgr, f"{session_name}-2", timeout=30)
227+
228+
s1 = mgr.sessions[session_name]
229+
s2 = mgr.sessions[f"{session_name}-2"]
230+
231+
assert s1.is_complete and s2.is_complete
232+
assert len(s1.spans) > 0
233+
assert len(s2.spans) > 0
234+
assert s1.trace_ids.isdisjoint(s2.trace_ids), (
235+
f"Sessions share trace_ids: {s1.trace_ids & s2.trace_ids}"
236+
)
237+
238+
def test_langchain_rerun_creates_separate_sessions(self, live_servers):
239+
"""Run the LangChain agent twice with the same session_name.
240+
Each run must produce its own session."""
241+
main_port, otlp_port, mgr = live_servers
242+
session_name = "e2e-langchain-rerun"
243+
244+
result1 = _run_agent(
245+
"examples/zero-code-examples/langchain/run.py",
246+
otlp_port,
247+
session_name,
248+
)
249+
assert result1.returncode == 0, f"Run 1 failed:\n{result1.stderr}"
250+
wait_for_session_complete_sync(mgr, session_name, timeout=30)
251+
252+
result2 = _run_agent(
253+
"examples/zero-code-examples/langchain/run.py",
254+
otlp_port,
255+
session_name,
256+
)
257+
assert result2.returncode == 0, f"Run 2 failed:\n{result2.stderr}"
258+
wait_for_session_complete_sync(mgr, f"{session_name}-2", timeout=30)
259+
260+
s1 = mgr.sessions[session_name]
261+
s2 = mgr.sessions[f"{session_name}-2"]
262+
263+
assert s1.is_complete and s2.is_complete
264+
assert len(s1.spans) > 0
265+
assert len(s2.spans) > 0
266+
assert s1.trace_ids.isdisjoint(s2.trace_ids), (
267+
f"Sessions share trace_ids: {s1.trace_ids & s2.trace_ids}"
268+
)
269+
270+
def test_rerun_sessions_visible_via_api(self, live_servers):
271+
"""Both rerun sessions are visible in the API response."""
272+
main_port, otlp_port, mgr = live_servers
273+
session_name = "e2e-strands-rerun-api"
274+
strands_env = {
275+
"OTEL_SEMCONV_STABILITY_OPT_IN": "gen_ai_latest_experimental",
276+
}
277+
278+
for run_idx in range(2):
279+
result = _run_agent(
280+
"examples/zero-code-examples/strands/run.py",
281+
otlp_port,
282+
session_name,
283+
extra_env=strands_env,
284+
)
285+
assert result.returncode == 0, f"Run {run_idx + 1} failed:\n{result.stderr}"
286+
expected_id = session_name if run_idx == 0 else f"{session_name}-2"
287+
wait_for_session_complete_sync(mgr, expected_id, timeout=30)
288+
289+
resp = httpx.get(f"http://127.0.0.1:{main_port}/api/streaming/sessions")
290+
assert resp.status_code == 200
291+
session_ids = [s["sessionId"] for s in resp.json()["data"]]
292+
assert session_name in session_ids
293+
assert f"{session_name}-2" in session_ids

tests/integration/test_session_grouping.py

Lines changed: 168 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ async def test_sequential_runs_unique_ids(self, trace_manager, otlp_client):
166166
await send_traces(otlp_client, body)
167167
await wait_for_session_complete(trace_manager, "repeated")
168168

169-
# Second run — same name but first is complete
169+
# Second run — same name but new trace_id → new session
170170
body = make_trace_request(
171171
trace_id="run2",
172172
session_name="repeated",
@@ -317,3 +317,170 @@ async def test_span_limit_enforcement(self, trace_manager, otlp_client):
317317
await send_traces(otlp_client, body2)
318318

319319
assert len(session.spans) == MAX_SPANS_PER_SESSION
320+
321+
322+
class TestSplitBatchReopen:
323+
"""Regression tests for the split-batch scenario where the OTLP
324+
BatchSpanProcessor flushes one trace's spans across the session
325+
completion boundary. Some child spans arrive before the grace period
326+
fires, the root span arrives after. Because the trace_id was already
327+
registered in the session, the session is reopened.
328+
329+
Bug report: strands-zero-code vs strands-zero-code-2 — a 3-turn
330+
conversation was broken into 2 sessions because turn 3's spans
331+
were split across the completion boundary."""
332+
333+
async def test_split_trace_reopens_completed_session(
334+
self, trace_manager, otlp_client
335+
):
336+
"""When child spans arrive before completion and the root span
337+
arrives after, the session reopens because the trace_id already
338+
exists in the session."""
339+
session_name = "split-reopen"
340+
341+
# Batch 1: turn 1 root + turn 2 child spans in same flush
342+
await send_traces(otlp_client, make_trace_request(
343+
trace_id="sr-t1", session_name=session_name,
344+
spans=[
345+
make_genai_span(trace_id="sr-t1", span_id="t1-root", parent_span_id=None),
346+
make_genai_span(trace_id="sr-t2", span_id="t2-child",
347+
parent_span_id="t2-root"),
348+
],
349+
))
350+
await wait_for_session_complete(trace_manager, session_name)
351+
352+
session = trace_manager.sessions[session_name]
353+
assert session.is_complete
354+
assert "sr-t2" in session.trace_ids
355+
356+
# Batch 2: turn 2 root span arrives after completion
357+
await send_traces(otlp_client, make_trace_request(
358+
trace_id="sr-t2", session_name=session_name,
359+
spans=[
360+
make_genai_span(trace_id="sr-t2", span_id="t2-root", parent_span_id=None),
361+
],
362+
))
363+
364+
assert not session.is_complete
365+
assert len(trace_manager.sessions) == 1
366+
assert session.trace_ids == {"sr-t1", "sr-t2"}
367+
368+
await wait_for_session_complete(trace_manager, session_name)
369+
assert session.is_complete
370+
assert len(session.spans) == 3
371+
372+
async def test_strands_three_turn_bug_repro(
373+
self, trace_manager, otlp_client
374+
):
375+
"""Reproduces the exact bug from the Strands SDK report: the
376+
BatchSpanProcessor flushes turns 1-2 and partial turn 3 in one
377+
batch, then the rest of turn 3 in a second batch after the
378+
session completes. All spans must end up in a single session."""
379+
session_name = "strands-repro"
380+
381+
# Batch 1: turns 1 & 2 fully, plus turn 3 child spans
382+
await send_traces(otlp_client, make_trace_request(
383+
trace_id="t1", session_name=session_name,
384+
spans=[
385+
make_genai_span(trace_id="t1", span_id="t1-llm", parent_span_id="t1-root"),
386+
make_genai_span(trace_id="t1", span_id="t1-root", parent_span_id=None,
387+
name="invoke_agent"),
388+
make_genai_span(trace_id="t2", span_id="t2-llm", parent_span_id="t2-root"),
389+
make_genai_span(trace_id="t2", span_id="t2-tool",
390+
parent_span_id="t2-root", name="execute_tool roll_die"),
391+
make_genai_span(trace_id="t2", span_id="t2-root", parent_span_id=None,
392+
name="invoke_agent"),
393+
# Turn 3 child spans — flushed in same batch
394+
make_genai_span(trace_id="t3", span_id="t3-llm", parent_span_id="t3-root"),
395+
make_genai_span(trace_id="t3", span_id="t3-tool",
396+
parent_span_id="t3-root", name="execute_tool check_prime"),
397+
],
398+
))
399+
await wait_for_session_complete(trace_manager, session_name)
400+
401+
session = trace_manager.sessions[session_name]
402+
assert session.is_complete
403+
assert "t3" in session.trace_ids
404+
405+
# Batch 2: turn 3 root span + remaining spans (after completion)
406+
await send_traces(otlp_client, make_trace_request(
407+
trace_id="t3", session_name=session_name,
408+
spans=[
409+
make_genai_span(trace_id="t3", span_id="t3-loop",
410+
parent_span_id="t3-root", name="execute_event_loop_cycle"),
411+
make_genai_span(trace_id="t3", span_id="t3-root", parent_span_id=None,
412+
name="invoke_agent"),
413+
],
414+
))
415+
416+
assert not session.is_complete
417+
await wait_for_session_complete(trace_manager, session_name)
418+
419+
assert len(trace_manager.sessions) == 1
420+
assert session.trace_ids == {"t1", "t2", "t3"}
421+
assert len(session.spans) == 9
422+
423+
async def test_new_trace_after_completion_creates_new_session(
424+
self, trace_manager, otlp_client
425+
):
426+
"""A completely new trace_id after session completion creates a
427+
new session (not a reopen). This is the re-run case."""
428+
session_name = "no-reopen"
429+
430+
await send_traces(otlp_client, make_trace_request(
431+
trace_id="run-1", session_name=session_name,
432+
spans=[make_genai_span(trace_id="run-1", parent_span_id=None)],
433+
))
434+
await wait_for_session_complete(trace_manager, session_name)
435+
436+
# New trace_id (not seen before) → new session
437+
await send_traces(otlp_client, make_trace_request(
438+
trace_id="run-2", session_name=session_name,
439+
spans=[make_genai_span(trace_id="run-2", parent_span_id=None)],
440+
))
441+
await wait_for_session_complete(trace_manager, f"{session_name}-2")
442+
443+
assert len(trace_manager.sessions) == 2
444+
assert trace_manager.sessions[session_name].trace_ids == {"run-1"}
445+
assert trace_manager.sessions[f"{session_name}-2"].trace_ids == {"run-2"}
446+
447+
async def test_reopen_preserves_existing_spans_and_logs(
448+
self, trace_manager, otlp_client
449+
):
450+
"""Reopening a session preserves all previously collected spans and logs."""
451+
session_name = "preserve"
452+
453+
# Send spans and logs with trace_id "pres-t1", PLUS a child span
454+
# from "pres-t2" so its trace_id is registered for reopen
455+
await send_traces(otlp_client, make_trace_request(
456+
trace_id="pres-t1", session_name=session_name,
457+
spans=[
458+
make_genai_span(trace_id="pres-t1", parent_span_id=None),
459+
make_genai_span(trace_id="pres-t2", span_id="t2-child",
460+
parent_span_id="t2-root"),
461+
],
462+
))
463+
await send_logs(otlp_client, make_log_request(
464+
trace_id="pres-t1", session_name=session_name,
465+
log_records=[
466+
make_genai_log("gen_ai.user.message", "Turn 1", trace_id="pres-t1"),
467+
],
468+
))
469+
await wait_for_session_complete(trace_manager, session_name)
470+
471+
session = trace_manager.sessions[session_name]
472+
spans_before = len(session.spans)
473+
logs_before = len(session.logs)
474+
assert spans_before >= 2
475+
assert logs_before >= 1
476+
477+
# Reopen via split-batch trace_id match
478+
await send_traces(otlp_client, make_trace_request(
479+
trace_id="pres-t2", session_name=session_name,
480+
spans=[make_genai_span(trace_id="pres-t2", span_id="t2-root",
481+
parent_span_id=None)],
482+
))
483+
await wait_for_session_complete(trace_manager, session_name)
484+
485+
assert len(session.spans) == spans_before + 1
486+
assert len(session.logs) == logs_before

0 commit comments

Comments
 (0)