track user bot turn metrics#817
Conversation
WalkthroughA new ChangesPipecat Metrics Collection and Persistence
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py (1)
76-110:⚠️ Potential issue | 🟠 Major | ⚡ Quick winFlush the speech gate before snapshotting
history.
TranscriptionGateProcessoronly copies buffereduser_*/tts_*values intocontext.messageswhen a later frame triggers_flush_timestamps(). If the call ends right after the lastUserStoppedSpeakingFrameorBotStoppedSpeakingFrame,end_conversation()serializeshistorybefore that buffered state is materialized, so the last-turn metrics are missing frommetaData["transcription"]. TheEndFramequeued infinallyis too late to rescue it.TemplateContextalready exposesspeech_gate, so flush it immediately before iteratingcontext.context.messages.🧩 Minimal fix shape
if context.context: + if context.speech_gate is not None: + context.speech_gate.flush_timestamps() history = context.context.messagesAdd a small public
flush_timestamps()wrapper onTranscriptionGateProcessorrather than calling the private helper directly.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py` around lines 76 - 110, The history snapshot misses the last-turn user/tts timestamps because the TranscriptionGateProcessor only materializes buffered timestamps when its private flush runs; add a public flush_timestamps() method on TranscriptionGateProcessor that invokes the existing private _flush_timestamps(), then call that new method on the TemplateContext's speech_gate (e.g., context.context.speech_gate.flush_timestamps()) immediately before reading context.context.messages in end_conversation (the loop that builds transcription/filtered_transcript) so buffered user_start/user_end/tts_start/tts_end values are materialized into context.messages before serialization into context.lead.metaData["transcription"].
🧹 Nitpick comments (1)
app/ai/voice/agents/breeze_buddy/processors/transcription_gate.py (1)
184-216: ⚡ Quick winDon't swallow timestamp flush failures.
If
context.messages[-1]ever stops being a mutable dict or a malformed message slips through, thisexcept Exception: passturns the new turn-metrics path into a silent no-op. Catch only the expected "nothing to flush" cases and log unexpected failures so missing metrics are diagnosable.♻️ Suggested tightening
- except Exception: - # If context is empty or uninitialized, do nothing safely - pass + except (AttributeError, IndexError, TypeError): + logger.debug( + "TranscriptionGate: skipping timestamp flush; no writable message" + ) + except Exception: + logger.exception( + "TranscriptionGate: unexpected failure while flushing timestamps" + )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/processors/transcription_gate.py` around lines 184 - 216, The _flush_timestamps method currently swallows all exceptions, hiding real errors; change the try/except to only guard the expected "nothing to flush" conditions (e.g., check self.context and getattr(self.context, "messages", None) before accessing messages and bail early) and remove the broad except Exception: pass; instead catch and handle specific exceptions (like IndexError or AttributeError when self.context.messages is empty or not a list/dict) and log unexpected failures via the module logger (include details of self.context, msg and exception) so failures in manipulating msg (the last message) or missing attributes (user_start/user_end/tts_start/tts_end) are visible for debugging.Source: Linters/SAST tools
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py`:
- Around line 76-110: The history snapshot misses the last-turn user/tts
timestamps because the TranscriptionGateProcessor only materializes buffered
timestamps when its private flush runs; add a public flush_timestamps() method
on TranscriptionGateProcessor that invokes the existing private
_flush_timestamps(), then call that new method on the TemplateContext's
speech_gate (e.g., context.context.speech_gate.flush_timestamps()) immediately
before reading context.context.messages in end_conversation (the loop that
builds transcription/filtered_transcript) so buffered
user_start/user_end/tts_start/tts_end values are materialized into
context.messages before serialization into
context.lead.metaData["transcription"].
---
Nitpick comments:
In `@app/ai/voice/agents/breeze_buddy/processors/transcription_gate.py`:
- Around line 184-216: The _flush_timestamps method currently swallows all
exceptions, hiding real errors; change the try/except to only guard the expected
"nothing to flush" conditions (e.g., check self.context and
getattr(self.context, "messages", None) before accessing messages and bail
early) and remove the broad except Exception: pass; instead catch and handle
specific exceptions (like IndexError or AttributeError when
self.context.messages is empty or not a list/dict) and log unexpected failures
via the module logger (include details of self.context, msg and exception) so
failures in manipulating msg (the last message) or missing attributes
(user_start/user_end/tts_start/tts_end) are visible for debugging.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2fbc1d30-a9f5-45e2-895a-fc6f11b485d7
📒 Files selected for processing (3)
app/ai/voice/agents/breeze_buddy/agent/pipeline.pyapp/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.pyapp/ai/voice/agents/breeze_buddy/processors/transcription_gate.py
PR #817 — track user/bot turn metricsVerdict: no blocking issues — looks good. Gates all green: What it actually doesDespite the title, this PR does not count turns or write to a metrics table — it injects four ISO timestamps ( Checked and cleared
🟨 Minor (not blocking)
If the intent really is turn counts (the title suggests so), flag that this PR delivers timestamps only. |
702ddfa to
4db51e6
Compare
|
@coderabbitai review |
✅ Action performedReview finished.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py`:
- Around line 221-225: The metrics collection block starting with the hasattr
check on context.bot.metrics_collector is not isolated from the outer try block,
so if get_metrics() throws an exception, it will abort the entire DB update and
callback execution in the finalization path. Wrap the metrics collection logic
(the entire if block containing the hasattr check and the assignment to
context.lead.metaData) in its own try-except handler that catches any exceptions
from get_metrics(), logs the error, and continues execution. This makes the
metrics persistence best-effort and prevents it from disrupting the critical
call finalization steps.
In `@app/ai/voice/agents/breeze_buddy/processors/metrics_collector_processor.py`:
- Around line 18-19: The self._frames_seen set in the
metrics_collector_processor initialization is storing frame IDs indefinitely
without any bounds, causing unbounded memory growth on long-running calls.
Replace the unbounded set with a bounded data structure that enforces a maximum
size limit (such as an LRU cache, a deque with maxlen, or a fixed-size circular
buffer). Ensure this bounded structure is used consistently wherever frame
deduplication occurs in the processor to prevent memory degradation over time
during extended voice agent calls.
- Around line 14-21: Add missing type annotations to three methods in the
metrics_collector_processor class. In the `__init__` method, annotate the
`**kwargs` parameter as `**kwargs: Any` and add `-> None` return type
annotation. In the `process_frame` method, add the `-> None` return type
annotation. In the `_commit_turn` method, add the `-> None` return type
annotation. These changes align with the coding guidelines requiring type hints
on all function signatures.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ffa358b0-363c-4211-a5a9-d261a68eec42
📒 Files selected for processing (4)
app/ai/voice/agents/breeze_buddy/agent/__init__.pyapp/ai/voice/agents/breeze_buddy/agent/pipeline.pyapp/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.pyapp/ai/voice/agents/breeze_buddy/processors/metrics_collector_processor.py
| # Store Pipecat metrics collected during the call | ||
| if hasattr(context.bot, "metrics_collector") and context.bot.metrics_collector: | ||
| context.lead.metaData["pipecat_metrics"] = ( | ||
| context.bot.metrics_collector.get_metrics() | ||
| ) |
There was a problem hiding this comment.
Isolate metrics write failures from call finalization path.
If get_metrics() throws here, the outer try aborts DB update and callback execution for end-of-call finalization. This new persistence step should be best-effort.
Proposed fix
# Store Pipecat metrics collected during the call
if hasattr(context.bot, "metrics_collector") and context.bot.metrics_collector:
- context.lead.metaData["pipecat_metrics"] = (
- context.bot.metrics_collector.get_metrics()
- )
+ try:
+ context.lead.metaData["pipecat_metrics"] = (
+ context.bot.metrics_collector.get_metrics()
+ )
+ except Exception as metrics_error:
+ logger.warning(
+ f"Failed to collect pipecat metrics for call {context.call_sid}: {metrics_error}"
+ )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py`
around lines 221 - 225, The metrics collection block starting with the hasattr
check on context.bot.metrics_collector is not isolated from the outer try block,
so if get_metrics() throws an exception, it will abort the entire DB update and
callback execution in the finalization path. Wrap the metrics collection logic
(the entire if block containing the hasattr check and the assignment to
context.lead.metaData) in its own try-except handler that catches any exceptions
from get_metrics(), logs the error, and continues execution. This makes the
metrics persistence best-effort and prevents it from disrupting the critical
call finalization steps.
| def __init__(self, **kwargs): | ||
| super().__init__(**kwargs) | ||
| self._turns: list[Dict[str, Any]] = [] | ||
| self._current_turn_metrics = defaultdict(dict) | ||
| self._frames_seen = set() | ||
| self._turn_count = 1 | ||
|
|
||
| async def process_frame(self, frame: Frame, direction: FrameDirection): |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
python - <<'PY'
import ast
from pathlib import Path
p = Path("app/ai/voice/agents/breeze_buddy/processors/metrics_collector_processor.py")
tree = ast.parse(p.read_text())
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if node.returns is None:
print(f"Missing return annotation: Line {node.lineno} -> {node.name}")
if node.name == "__init__":
for arg in node.args.kwarg,:
if arg is not None and arg.annotation is None:
print(f"Missing annotation for **{arg.arg}: Line {arg.lineno}")
PYRepository: juspay/clairvoyance
Length of output: 254
🏁 Script executed:
cat -n app/ai/voice/agents/breeze_buddy/processors/metrics_collector_processor.pyRepository: juspay/clairvoyance
Length of output: 2907
Add missing type annotations to method signatures.
Methods in this processor are missing return type annotations. Add -> None to __init__ (line 14), process_frame (line 21), and _commit_turn (line 46). Also annotate **kwargs on line 14 as **kwargs: Any.
Per coding guidelines: "Add type hints on all function signatures."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/processors/metrics_collector_processor.py`
around lines 14 - 21, Add missing type annotations to three methods in the
metrics_collector_processor class. In the `__init__` method, annotate the
`**kwargs` parameter as `**kwargs: Any` and add `-> None` return type
annotation. In the `process_frame` method, add the `-> None` return type
annotation. In the `_commit_turn` method, add the `-> None` return type
annotation. These changes align with the coding guidelines requiring type hints
on all function signatures.
Source: Coding guidelines
| self._frames_seen = set() | ||
| self._turn_count = 1 |
There was a problem hiding this comment.
Bound dedupe state to avoid unbounded memory growth.
Line 18 + Lines 27-29 keep every processed frame id forever. On long-running calls this grows without limit and can degrade worker memory.
Proposed fix
-from collections import defaultdict
+from collections import defaultdict, deque
@@
class MetricsCollectorProcessor(FrameProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._turns: list[Dict[str, Any]] = []
self._current_turn_metrics = defaultdict(dict)
- self._frames_seen = set()
+ self._frames_seen: set[Any] = set()
+ self._frames_seen_order: deque[Any] = deque()
+ self._max_seen_frames = 5000
self._turn_count = 1
@@
elif isinstance(frame, MetricsFrame):
- if frame.id not in self._frames_seen:
- self._frames_seen.add(frame.id)
+ if frame.id not in self._frames_seen:
+ self._frames_seen.add(frame.id)
+ self._frames_seen_order.append(frame.id)
+ if len(self._frames_seen_order) > self._max_seen_frames:
+ oldest = self._frames_seen_order.popleft()
+ self._frames_seen.discard(oldest)
for data in frame.data:Also applies to: 27-29
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/processors/metrics_collector_processor.py`
around lines 18 - 19, The self._frames_seen set in the
metrics_collector_processor initialization is storing frame IDs indefinitely
without any bounds, causing unbounded memory growth on long-running calls.
Replace the unbounded set with a bounded data structure that enforces a maximum
size limit (such as an LRU cache, a deque with maxlen, or a fixed-size circular
buffer). Ensure this bounded structure is used consistently wherever frame
deduplication occurs in the processor to prevent memory degradation over time
during extended voice agent calls.
Summary by CodeRabbit
Release Notes