feat: Metrics for queues and event writer#922
Conversation
Signed-off-by: jannfis <jann@mistrust.net>
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR adds comprehensive Prometheus metrics instrumentation for event writing and queue depth monitoring across principal and agent components. New collectors track event queue depths, EventWriter pending/resend states, and retry exhaustion events through optional callbacks and observer patterns integrated into the EventWriter. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~30 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
internal/queue/queue_test.go (1)
126-147: Minor: inconsistent callback guard style.The first
ObserveDepthscallback (Line 128) writes unconditionally while the second (Line 139) guards withif name == "agent1". Since only one pair exists, both work, but mixing styles makes it unclear whether the guard matters. Consider making both consistent — either always unconditional (since only one pair exists here) or always guarded — to keep intent unambiguous.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/queue/queue_test.go` around lines 126 - 147, Make the two ObserveDepths callbacks consistent by guarding assignments in the first callback the same way as the second: in the ObserveDepths call where you set gotName, gotSend, gotRecv, wrap the assignments with if name == "agent1" { ... } (keeping the second callback as-is that checks name) so both callbacks use the same intent; update the ObserveDepths lambda that currently assigns unconditionally to only assign when name == "agent1" and leave uses of sendQ.Get and sendQ.Done unchanged.internal/event/event_writer.go (1)
351-358: Callback invoked while holdingew.muwrite lock.
onRetryExhausted(resID)is called inside theew.mu.Lock()critical section. For the metric-increment callers added in this PR this is fine, but this puts a sharp edge on the publicWithOnRetryExhaustedAPI: any user-supplied callback that calls back intoEventWriter(e.g.,Get/Add/Remove) will deadlock, and a slow callback will stall all other writers/readers on thisEventWriter.Consider either (a) documenting on
WithOnRetryExhaustedthat the callback must be non-blocking and must not touch theEventWriter, or (b) invoking the hook after the lock is released:♻️ Proposed refactor to invoke hook outside the lock
ew.mu.Lock() + removed := false if cur, ok := ew.sentEvents[resID]; ok && cur == sentMsg { - if ew.onRetryExhausted != nil { - ew.onRetryExhausted(resID) - } delete(ew.sentEvents, resID) + removed = true } ew.mu.Unlock() + if removed && ew.onRetryExhausted != nil { + ew.onRetryExhausted(resID) + } returnNote:
ew.onRetryExhaustedis set once at construction and never mutated, so reading it outside the lock is safe.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/event_writer.go` around lines 351 - 358, The callback ew.onRetryExhausted is being invoked while holding ew.mu which can deadlock or block other operations; change the code to perform the map lookup, deletion, and capture whether the hook should be called (and a local copy of ew.onRetryExhausted) while holding ew.mu, then release ew.mu and invoke the captured callback outside the lock; specifically update the block around ew.sentEvents[resID] so you still check cur == sentMsg and delete(ew.sentEvents, resID) under the lock but assign a local variable like cb := ew.onRetryExhausted and call cb(resID) only after ew.mu.Unlock().internal/metrics/queue_depth_test.go (1)
63-81: Tighten the collector test against unexpected series.This test currently passes even if the collector emits extra metric series or an unexpected
directionlabel. Add an exact metric count and a default failure branch so regressions are caught.Proposed test hardening
var sendVal, recvVal float64 var seenSend, seenRecv bool + require.Len(t, mf.GetMetric(), 2) for _, m := range mf.GetMetric() { + require.Equal(t, "agent-a", labelValue(m.GetLabel(), queueDepthLabelQueue)) dir := labelValue(m.GetLabel(), queueDepthLabelDirection) switch dir { case "send": seenSend = true - require.Equal(t, "agent-a", labelValue(m.GetLabel(), queueDepthLabelQueue)) sendVal = m.GetGauge().GetValue() case "recv": seenRecv = true - require.Equal(t, "agent-a", labelValue(m.GetLabel(), queueDepthLabelQueue)) recvVal = m.GetGauge().GetValue() + default: + require.Failf(t, "unexpected direction label", "direction=%q", dir) } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/metrics/queue_depth_test.go` around lines 63 - 81, Tighten the test in queue_depth_test.go by asserting the exact number of series and failing on unexpected directions: check that len(mf.GetMetric()) == 2 before iterating, and add a default branch in the switch over labelValue(m.GetLabel(), queueDepthLabelDirection) that calls require.Failf or t.Fatalf (with a message including the unexpected direction and metric labels) so any extra or unknown series will make the test fail; keep the existing checks for "send" and "recv" (asserting queue label via queueDepthLabelQueue and the gauge values) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@agent/agent.go`:
- Around line 642-646: CurrentEventWriter currently reads a.eventWriter without
synchronization while handleStreamEvents assigns it from another goroutine,
causing a data race; change the Agent field backing eventWriter to a
synchronized holder (either use sync/atomic's atomic.Pointer[*event.EventWriter]
or protect with a sync.RWMutex), update CurrentEventWriter to return the safely
loaded value (use Load() or read under RLock), and update the code in
handleStreamEvents (where it calls event.NewEventWriter and ew.UpdateTarget) to
Store/CompareAndSwap the new writer or perform writes under Lock so reads from
the Prometheus scrape handler see a consistent value.
---
Nitpick comments:
In `@internal/event/event_writer.go`:
- Around line 351-358: The callback ew.onRetryExhausted is being invoked while
holding ew.mu which can deadlock or block other operations; change the code to
perform the map lookup, deletion, and capture whether the hook should be called
(and a local copy of ew.onRetryExhausted) while holding ew.mu, then release
ew.mu and invoke the captured callback outside the lock; specifically update the
block around ew.sentEvents[resID] so you still check cur == sentMsg and
delete(ew.sentEvents, resID) under the lock but assign a local variable like cb
:= ew.onRetryExhausted and call cb(resID) only after ew.mu.Unlock().
In `@internal/metrics/queue_depth_test.go`:
- Around line 63-81: Tighten the test in queue_depth_test.go by asserting the
exact number of series and failing on unexpected directions: check that
len(mf.GetMetric()) == 2 before iterating, and add a default branch in the
switch over labelValue(m.GetLabel(), queueDepthLabelDirection) that calls
require.Failf or t.Fatalf (with a message including the unexpected direction and
metric labels) so any extra or unknown series will make the test fail; keep the
existing checks for "send" and "recv" (asserting queue label via
queueDepthLabelQueue and the gauge values) unchanged.
In `@internal/queue/queue_test.go`:
- Around line 126-147: Make the two ObserveDepths callbacks consistent by
guarding assignments in the first callback the same way as the second: in the
ObserveDepths call where you set gotName, gotSend, gotRecv, wrap the assignments
with if name == "agent1" { ... } (keeping the second callback as-is that checks
name) so both callbacks use the same intent; update the ObserveDepths lambda
that currently assigns unconditionally to only assign when name == "agent1" and
leave uses of sendQ.Get and sendQ.Done unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5f468389-8199-4170-aa85-66945b73f136
📒 Files selected for processing (11)
agent/agent.goagent/connection.godocs/operations/metrics.mdinternal/event/event_writer.gointernal/event/event_writer_test.gointernal/metrics/queue_depth.gointernal/metrics/queue_depth_test.gointernal/queue/queue.gointernal/queue/queue_test.goprincipal/apis/eventstream/eventstream.goprincipal/server.go
| // CurrentEventWriter returns the outbound event writer when connected, or nil. | ||
| // It implements metrics.EventWriterLookup for Prometheus scraping. | ||
| func (a *Agent) CurrentEventWriter() *event.EventWriter { | ||
| return a.eventWriter | ||
| } |
There was a problem hiding this comment.
Data race on a.eventWriter.
CurrentEventWriter() reads a.eventWriter without synchronization, while handleStreamEvents in agent/connection.go (Line 188) assigns a.eventWriter = event.NewEventWriter(...) from a different goroutine. Metrics scrapes happen on the Prometheus HTTP handler goroutine, so this is a concurrent read/write of an interface/pointer value — a data race per the Go memory model, even though the pointer is never reassigned after the first write.
At best the scraper will see nil briefly and skip; at worst -race builds will flag it and on some architectures the read could observe a torn value.
🔒 Proposed fix using atomic.Pointer (or a mutex)
- eventWriter *event.EventWriter
+ eventWriter atomic.Pointer[event.EventWriter]Callers then use a.eventWriter.Load() / a.eventWriter.Store(ew) / a.eventWriter.CompareAndSwap(...). CurrentEventWriter() becomes:
func (a *Agent) CurrentEventWriter() *event.EventWriter {
return a.eventWriter.Load()
}And in agent/connection.go:
if ew := a.eventWriter.Load(); ew == nil {
// build opts...
a.eventWriter.Store(event.NewEventWriter("", stream, ewOpts...))
} else {
ew.UpdateTarget(stream)
}A dedicated sync.RWMutex guarding eventWriter would also work.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agent/agent.go` around lines 642 - 646, CurrentEventWriter currently reads
a.eventWriter without synchronization while handleStreamEvents assigns it from
another goroutine, causing a data race; change the Agent field backing
eventWriter to a synchronized holder (either use sync/atomic's
atomic.Pointer[*event.EventWriter] or protect with a sync.RWMutex), update
CurrentEventWriter to return the safely loaded value (use Load() or read under
RLock), and update the code in handleStreamEvents (where it calls
event.NewEventWriter and ew.UpdateTarget) to Store/CompareAndSwap the new writer
or perform writes under Lock so reads from the Prometheus scrape handler see a
consistent value.
Signed-off-by: jannfis <jann@mistrust.net>
Signed-off-by: jannfis <jann@mistrust.net>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #922 +/- ##
==========================================
+ Coverage 46.85% 47.08% +0.23%
==========================================
Files 122 124 +2
Lines 17524 22213 +4689
==========================================
+ Hits 8210 10458 +2248
- Misses 8555 10995 +2440
- Partials 759 760 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Superseded by #931 |
What does this PR do / why we need it:
Add metrics for queue depths and events in the event writer
Which issue(s) this PR fixes:
Fixes #?
How to test changes / Special notes to the reviewer:
Checklist
Summary by CodeRabbit
New Features
Documentation