feat: listener should inject the logger into the context so that it keeps span/trace ids #561
Walkthrough

Listen shutdown ordering was adjusted to close the done channel after waiting and final logging. Message handling now injects a contextual logger into the message context and uses it for per-message logs. A new test verifies logger injection and OpenTelemetry W3C tracecontext propagation from message metadata.
Sequence Diagram(s): omitted — changes are localized to logger/context injection and a unit test; no new multi-component sequential flow requiring visualization.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Actionable comments posted: 1
🧹 Nitpick comments (1)
queue/listener_test.go (1)
60-62: Use bounded waits here to avoid hanging the test.

If the callback stops firing, this blocks until the package timeout. A `select` with `time.After` or `require.Eventually` will make this fail fast and localize the regression.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@queue/listener_test.go` around lines 60 - 62, Replace the unbounded blocking receives on the done channel and listener.Done() (the lines using "<-done" and "<-listener.Done()") with bounded waits so the test fails fast if the callback stops firing: use a select with a time.After timeout (or require.Eventually) to wait for done and for listener.Done(), and keep the cancel() call as-is between those waits; ensure the timeout duration is short and documented in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@queue/listener_test.go`:
- Around line 54-66: The test currently only verifies logger injection but not
trace/span propagation; update the test that calls listener.Listen(ctx, ch)
(using message.NewMessage) to seed the message's Metadata with a traceparent
header (e.g., via msg.Metadata["traceparent"] = "...") before sending it into
ch, then assert that the buffer (buf.String()) contains the expected trace/span
identifiers produced by the callback; ensure this happens after the Extract(...)
call path in the listener so the test fails if trace context is not propagated
and reference the message.Message, msg.Metadata, listener.Listen and
Extract(...) code paths when making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4602ffe9-14df-4d33-91b8-8830825a7acb
📒 Files selected for processing (2)
- queue/listener.go
- queue/listener_test.go
Actionable comments posted: 1
🧹 Nitpick comments (2)
queue/listener_test.go (2)
39-40: Consider restoring the global propagator in cleanup.

`otel.SetTextMapPropagator` modifies global state. If other tests in the suite expect a different propagator (or none), this could cause flaky behavior. Using `t.Cleanup` to restore the original propagator improves test isolation.

🧹 Suggested cleanup pattern
```diff
 func TestHandleMessageInjectsLoggerAndPropagatesTraceContext(t *testing.T) {
 	// Register W3C TraceContext propagator so Extract parses traceparent from metadata
+	originalPropagator := otel.GetTextMapPropagator()
 	otel.SetTextMapPropagator(propagation.TraceContext{})
+	t.Cleanup(func() { otel.SetTextMapPropagator(originalPropagator) })
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@queue/listener_test.go` around lines 39 - 40, The test sets a global propagator with otel.SetTextMapPropagator(propagation.TraceContext{}), which changes global state; capture the current propagator with otel.GetTextMapPropagator() before calling otel.SetTextMapPropagator, and register a t.Cleanup that restores the original propagator by calling otel.SetTextMapPropagator(original) so the test does not leave the global propagator modified; reference otel.SetTextMapPropagator, otel.GetTextMapPropagator, propagation.TraceContext{}, and t.Cleanup when making the change.
73-78: Consider adding a test timeout to prevent indefinite hangs.

If the callback panics or the listener fails to invoke the callback, the test will block forever on `<-done`. A context timeout or `select` with a timer provides a safety net.

⏱️ Example with timeout
```diff
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
 	listener.Listen(ctx, ch)
-	<-done
-	cancel()
+	select {
+	case <-done:
+	case <-ctx.Done():
+		t.Fatal("test timed out waiting for callback")
+	}
 	<-listener.Done()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@queue/listener_test.go` around lines 73 - 78, The test currently can hang indefinitely waiting on <-done; wrap the context or the wait with a timeout so the test fails fast instead of blocking: create the context with a timeout (e.g. context.WithTimeout) or use a select that waits on done and a time.After, then cancel and assert the timeout case if it fires; apply this change around the Listen call and the final waits that reference ctx, cancel, done and listener.Done() to ensure the test always returns.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5bbf54cd-5db4-4051-b3eb-fd6811a5e2f0
📒 Files selected for processing (2)
- queue/listener.go
- queue/listener_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- queue/listener.go
```go
// Verify trace propagation: Extract() should populate ctx with trace/span from metadata
sc := trace.SpanFromContext(ctx).SpanContext()
logging.FromContext(ctx).Infof(
	fmt.Sprintf("trace_id=%s span_id=%s", sc.TraceID().String(), sc.SpanID().String()),
)
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Check how the listener handles trace context extraction and span creation
ast-grep --pattern 'func ($RECV) handleMessage($$$) {
  $$$
}'

# Also search for Extract calls and tracer.Start calls in listener.go
rg -n -A5 'Extract\(' --glob '**/listener.go'
rg -n -A5 'tracer\.Start\(' --glob '**/listener.go'
rg -n -A3 'SpanFromContext|SpanContextFromContext' --glob '**/listener.go'
```

Repository: formancehq/go-libs
Length of output: 1434
The test uses an incorrect approach to verify trace propagation; fix the assertion method.
The listener calls Extract() (line 118, queue/listener.go) but does not start a span with the extracted context. When SpanFromContext(ctx) is called after Extract(), it returns a no-op span because there is no active span in the context. Calling SpanContext() on a no-op span returns an invalid (zero) SpanContext, not the extracted trace/span IDs from the traceparent metadata.
The assertions on lines 82-83 expecting the actual expectedTraceID and expectedSpanID values will fail. To verify trace propagation, either:
- Start an active span with the extracted context in the listener: `ctx, span := tracer.Start(ctx, "handleMessage")` after `Extract()`, or
- Update the test to verify the extracted context directly via the propagation API rather than through `SpanFromContext()`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@queue/listener_test.go` around lines 56 - 60, The test currently uses
trace.SpanFromContext(ctx).SpanContext() which returns a no-op span's
SpanContext when no active span exists; instead use
trace.SpanContextFromContext(ctx) to get the SpanContext that Extract()
populated, then assert its TraceID/SpanID equal expectedTraceID/expectedSpanID;
alternatively, if you prefer the listener to create an active span, call
tracer.Start(ctx, "handleMessage") inside the listener after Extract() so
SpanFromContext will return the active span—update either the test (replace
SpanFromContext usage with trace.SpanContextFromContext) or the listener (call
tracer.Start after Extract()) accordingly.
It's already correct, even the no-op span context has the values.
@fguery, you're absolutely right — my review comment was incorrect. Looking at the OTel Go documentation more carefully:
ContextWithRemoteSpanContext sets rsc explicitly as a remote SpanContext and as the current Span, and the Span implementation that wraps rsc "is non-recording and performs no operations other than to return rsc as the SpanContext from the SpanContext method."
So when the W3C TraceContext propagator's Extract() is called with the traceparent metadata, it invokes ContextWithRemoteSpanContext internally, which wraps the parsed SpanContext (with the real TraceID and SpanID) as the current span in the context. Consequently, trace.SpanFromContext(ctx).SpanContext() correctly returns the extracted SpanContext with the actual trace and span IDs — not a zero/invalid one.
The test is correct as-is. Apologies for the noise!
✏️ Learnings added
Learnt from: fguery
Repo: formancehq/go-libs PR: 561
File: queue/listener_test.go:56-60
Timestamp: 2026-03-09T09:16:24.464Z
Learning: In OpenTelemetry Go, `ContextWithRemoteSpanContext` (called internally by the W3C TraceContext propagator's `Extract`) sets the extracted `SpanContext` as the current Span in the context via a non-recording wrapper. Therefore, `trace.SpanFromContext(ctx).SpanContext()` correctly returns the extracted TraceID and SpanID after `Extract()` — even without calling `tracer.Start()` to create an active span.
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main     #561      +/-   ##
==========================================
- Coverage   28.81%   28.80%   -0.01%
==========================================
  Files         175      175
  Lines        7063     7065       +2
==========================================
  Hits         2035     2035
- Misses       4911     4912       +1
- Partials      117      118       +1
```

☔ View full report in Codecov by Sentry.
🧹 Nitpick comments (1)
queue/listener.go (1)
119-122: Consider simplifying to avoid redundant `WithContext` call.

`ContextWithLogger` already calls `logger.WithContext(ctx)` internally (see `logging/context.go:22-24`), so calling `l.logger.WithContext(ctx)` on line 119 creates an intermediate logger that's then wrapped again. You could simplify to:
♻️ Suggested simplification
```diff
 func (l *listener) handleMessage(ctx context.Context, msg *message.Message) {
 	ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.Metadata))
-	logger := l.logger.WithContext(ctx)
-	ctx = logging.ContextWithLogger(ctx, logger)
-
-	logger.WithField("message_uuid", msg.UUID).Debugf("queue listener handling message")
+	ctx = logging.ContextWithLogger(ctx, l.logger)
+	logging.FromContext(ctx).WithField("message_uuid", msg.UUID).Debugf("queue listener handling message")

 	err := l.callbackFn(ctx, msg.Metadata, msg.Payload)
 	if err != nil {
-		logger.WithField("message_uuid", msg.UUID).WithField("err", err.Error()).Errorf("queue listener failed to process message")
+		logging.FromContext(ctx).WithField("message_uuid", msg.UUID).WithField("err", err.Error()).Errorf("queue listener failed to process message")
 		msg.Nack()
 		return
 	}
```
loggervariable for convenience but derive it from the context after injection:♻️ Alternative: keep local variable
```diff
 func (l *listener) handleMessage(ctx context.Context, msg *message.Message) {
 	ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.Metadata))
-	logger := l.logger.WithContext(ctx)
-	ctx = logging.ContextWithLogger(ctx, logger)
+	ctx = logging.ContextWithLogger(ctx, l.logger)
+	logger := logging.FromContext(ctx)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@queue/listener.go` around lines 119 - 122, The current code redundantly calls l.logger.WithContext(ctx) and then logging.ContextWithLogger(ctx, logger); remove the intermediate logger := l.logger.WithContext(ctx) call and instead inject l.logger into the context with ctx = logging.ContextWithLogger(ctx, l.logger), then derive the local logger from the context (e.g., logger := logging.FromContext(ctx) or LoggerFromContext) before using logger.WithField(...). This eliminates the duplicated WithContext invocation while keeping a convenient local logger variable and preserves the existing logging.ContextWithLogger and WithField usage.