Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions queue/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,14 @@ func (l *listener) startWorker(ctx context.Context, messages <-chan *message.Mes
}

func (l *listener) handleMessage(ctx context.Context, msg *message.Message) {
l.logger.WithField("message_uuid", msg.UUID).Debugf("queue listener handling message")
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.Metadata))
ctx = logging.ContextWithLogger(ctx, l.logger)
logger := logging.FromContext(ctx)

logger.WithField("message_uuid", msg.UUID).Debugf("queue listener handling message")
err := l.callbackFn(ctx, msg.Metadata, msg.Payload)
if err != nil {
l.logger.WithField("message_uuid", msg.UUID).WithField("err", err.Error()).Errorf("queue listener failed to process message")
logger.WithField("message_uuid", msg.UUID).WithField("err", err.Error()).Errorf("queue listener failed to process message")
msg.Nack()
return
}
Expand Down
38 changes: 38 additions & 0 deletions queue/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package queue_test

import (
"bytes"
"context" //nolint: gosec
"os"
"testing"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -27,3 +30,38 @@ func TestNewListenerInvalidCallback(t *testing.T) {
assert.ErrorContains(t, err, "callback")
assert.Nil(t, listener)
}

func TestHandleMessageInjectsLoggerInContext(t *testing.T) {
// Create a logger that writes to a buffer so we can verify it's the one in context
var buf bytes.Buffer
l := logrus.New()
l.SetOutput(&buf)
l.SetLevel(logrus.DebugLevel)
logger := logging.NewLogrus(l)

done := make(chan struct{})
callback := func(ctx context.Context, meta map[string]string, msg []byte) error {
// Log using the logger from context — if ContextWithLogger was called,
// this writes to our buffer; if not, it writes to stderr (default fallback)
logging.FromContext(ctx).Infof("hello from callback")
close(done)
return nil
}

listener, err := queue.NewListener(logger, callback, 1)
require.NoError(t, err)

ch := make(chan *message.Message, 1)
ch <- message.NewMessage("test-uuid", []byte("test-payload"))

ctx, cancel := context.WithCancel(context.Background())
listener.Listen(ctx, ch)

<-done
cancel()
<-listener.Done()

// The injected logger writes to buf; the default fallback writes to stderr.
// If our buffer contains the callback message, the logger was properly injected.
assert.Contains(t, buf.String(), "hello from callback")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
Loading