Skip to content

Commit 82a88d8

Browse files
feat(eval): enqueue call trace ids onto evaluation redis queue
Push the bare OTEL trace_id to a Redis list at call end (atomic SETNX+RPUSH via Lua) for the evaluation worker to drain later. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 58d3fbc commit 82a88d8

4 files changed

Lines changed: 127 additions & 0 deletions

File tree

app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
from app.ai.voice.agents.breeze_buddy.callbacks import (
88
service_callback,
99
)
10+
from app.ai.voice.agents.breeze_buddy.observability.evaluations import (
11+
enqueue_trace_for_evaluation,
12+
)
1013
from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import (
14+
get_trace_id,
1115
update_span_with_evaluation_data,
1216
)
1317
from app.ai.voice.agents.breeze_buddy.template.context import TemplateContext
@@ -259,6 +263,12 @@ async def end_conversation(context: TemplateContext, args, transition_to=None):
259263
# so context.lead.outcome reflects the final persisted value.
260264
update_span_with_evaluation_data(context)
261265

266+
# Enqueue the trace for internal LLM-as-judge evaluation.
267+
# Best-effort + idempotent per trace_id; never breaks call teardown.
268+
evaluation_trace_id = get_trace_id(context.root_span)
269+
if evaluation_trace_id:
270+
await enqueue_trace_for_evaluation(evaluation_trace_id)
271+
262272
# Execute end_conversation_callbacks
263273
if context.end_conversation_callbacks:
264274
logger.info(
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""Internal LLM-as-judge evaluation subsystem.
2+
3+
Phase 1 ships only the producer: ``enqueue_trace_for_evaluation``, called
4+
from the call-end handler with a bare trace_id. The worker (runner,
5+
llm_client, actions), the DB three-layer, the Pydantic types, and the REST
6+
API land in later phases.
7+
"""
8+
9+
from .queue import (
10+
ENQUEUED_KEY_PREFIX,
11+
ENQUEUED_TTL_SECONDS,
12+
TRACE_QUEUE_KEY,
13+
enqueue_trace_for_evaluation,
14+
)
15+
16+
__all__ = [
17+
"ENQUEUED_KEY_PREFIX",
18+
"ENQUEUED_TTL_SECONDS",
19+
"TRACE_QUEUE_KEY",
20+
"enqueue_trace_for_evaluation",
21+
]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""Producer for the internal evaluation trace queue.
2+
3+
When a call ends, the OTEL/Langfuse trace_id is pushed onto a Redis list so
4+
the evaluation worker (later phase) can drain it and fetch the full trace
5+
from Langfuse. ONLY the trace_id is stored here — every other field (tags,
6+
call_sid, transcription, payload, ...) is read back from Langfuse by the
7+
worker, keeping the queue minimal.
8+
9+
This module owns only the producer half of the key layout. The consumer
10+
half (inflight ZSET + atomic pop + self-sweep) ships with the worker in a
11+
later phase and imports the constants below:
12+
13+
evaluation:trace_queue LIST trace_ids awaiting the worker
14+
evaluation:enqueued:{id} STRING (SETNX) dedup-at-enqueue marker
15+
16+
The dedup marker and the queue push run inside a single Redis Lua script so
17+
they are atomic: a failure between them can never leave a marker that
18+
suppresses retries for the TTL window. This mirrors how the dispatch system
19+
uses ``run_script`` for atomic multi-key ops, and is safe on the single-node
20+
Redis that prod runs.
21+
"""
22+
23+
from app.core.logger import logger
24+
from app.services.redis.client import get_redis_service
25+
26+
TRACE_QUEUE_KEY = "evaluation:trace_queue"
27+
ENQUEUED_KEY_PREFIX = "evaluation:enqueued:"
28+
# A trace is "done" once judged; cap the dedup/replay window at a week so a
29+
# retried call-end within that window won't requeue an already-queued trace.
30+
ENQUEUED_TTL_SECONDS = 7 * 24 * 3600
31+
32+
# Atomically set the dedup marker AND enqueue the trace_id, so a crash or
33+
# Redis failure between the two can never leave a marker that suppresses
34+
# retries. Returns 1 if newly enqueued, 0 if already enqueued (dedup).
35+
# KEYS[1] = evaluation:trace_queue (LIST)
36+
# KEYS[2] = evaluation:enqueued:{trace_id} (STRING marker)
37+
# ARGV[1] = trace_id ARGV[2] = marker TTL (seconds)
38+
_ENQUEUE_SCRIPT = """
39+
if redis.call('SET', KEYS[2], '1', 'NX', 'EX', ARGV[2]) then
40+
redis.call('RPUSH', KEYS[1], ARGV[1])
41+
return 1
42+
end
43+
return 0
44+
"""
45+
46+
47+
async def enqueue_trace_for_evaluation(trace_id: str) -> bool:
48+
"""Push a trace_id onto the evaluation queue, once per trace.
49+
50+
Dedups via a SETNX marker inside the same atomic Lua script that RPUSHes
51+
the trace_id, so the marker and the queue entry can never diverge.
52+
Best-effort: any failure (Redis down, script error) is logged and
53+
swallowed so it can never break call teardown. Returns True only when the
54+
trace was newly enqueued.
55+
"""
56+
if not trace_id:
57+
return False
58+
59+
try:
60+
redis = await get_redis_service()
61+
marker = f"{ENQUEUED_KEY_PREFIX}{trace_id}"
62+
result = await redis.run_script(
63+
_ENQUEUE_SCRIPT,
64+
keys=[TRACE_QUEUE_KEY, marker],
65+
args=[trace_id, ENQUEUED_TTL_SECONDS],
66+
)
67+
if result == 1:
68+
logger.info(f"Enqueued trace {trace_id} for evaluation")
69+
return True
70+
if result == 0:
71+
logger.debug(f"Trace {trace_id} already enqueued for evaluation; skipping")
72+
return False
73+
# run_script swallows Redis errors and returns None on failure.
74+
logger.error(f"Failed to enqueue trace {trace_id}: script returned {result!r}")
75+
return False
76+
except Exception as e: # noqa: BLE001
77+
logger.error(f"Failed to enqueue trace for evaluation: {e}")
78+
return False

app/ai/voice/agents/breeze_buddy/observability/tracing_setup.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
99
from opentelemetry.sdk.trace import TracerProvider
1010
from opentelemetry.sdk.trace.export import BatchSpanProcessor
11+
from opentelemetry.trace import format_trace_id
1112

1213
from app.ai.voice.agents.breeze_buddy.template.context import TemplateContext
1314
from app.core.config.static import (
@@ -240,3 +241,20 @@ def update_span_with_evaluation_data(context: TemplateContext) -> None:
240241

241242
except Exception as e:
242243
logger.error(f"Error updating span with evaluation data: {e}")
244+
245+
246+
def get_trace_id(span: Optional[trace.Span]) -> Optional[str]:
247+
"""Return the 32-char hex OTEL trace_id for a span, or None.
248+
249+
The OTEL trace_id is a 1:1 mapping to the Langfuse trace.id (the OTLP
250+
exporter ships this exact id). Returns None when tracing is disabled
251+
(mirrors update_span_with_evaluation_data's guard) or the span is
252+
missing, so callers can short-circuit safely.
253+
"""
254+
if not span or not ENABLE_BREEZE_BUDDY_TRACING:
255+
return None
256+
try:
257+
return format_trace_id(span.get_span_context().trace_id)
258+
except Exception as e: # noqa: BLE001
259+
logger.error(f"Error extracting trace_id from span: {e}")
260+
return None

0 commit comments

Comments
 (0)