From f246a716b6acb5b47556c5b75bbf6bb2e23e925a Mon Sep 17 00:00:00 2001 From: Eli Treuherz Date: Tue, 19 Dec 2023 17:36:19 +0000 Subject: [PATCH] Add context to ProducerInterceptor This change adds a context.Context argument to the ProducerInterceptor interface, and passes it between the pre- and post-Send interceptor methods. Having this makes it much easier to write useful interceptors that can integrate with common tracing SDKs like OpenTelemetry, as the context is the conventional method for propagating metadata vertically through a call stack. For an example of another library using a similar convention, see: https://github.com/jackc/pgx/blob/9ab9e3c40bbb33c6f37359c87508cbc6a9830ed6/tracer.go#L10 Fixes #443 --- .../pulsartracing/producer_interceptor.go | 15 ++++-- pulsar/producer_interceptor.go | 24 ++++++--- pulsar/producer_partition.go | 25 +++++---- pulsar/producer_test.go | 53 +++++++++++++------ 4 files changed, 80 insertions(+), 37 deletions(-) diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go index 6c7728cf0a..465ee3db2d 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -29,13 +29,20 @@ const toPrefix = "To__" type ProducerInterceptor struct { } -func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) { +func (t *ProducerInterceptor) BeforeSend( + ctx context.Context, + producer pulsar.Producer, + message *pulsar.ProducerMessage, +) context.Context { buildAndInjectSpan(message, producer).Finish() + return ctx } -func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, - message *pulsar.ProducerMessage, - msgID pulsar.MessageID) { +func (t *ProducerInterceptor) OnSendAcknowledgement( + _ context.Context, + _ pulsar.Producer, + _ *pulsar.ProducerMessage, + _ pulsar.MessageID) { } func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span { diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index e18994cfcd..ae9297e198 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -17,27 +17,39 @@ package pulsar +import "context" + type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the // message. - BeforeSend(producer Producer, message *ProducerMessage) + BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor -func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { +func (x ProducerInterceptors) BeforeSend( + ctx context.Context, + producer Producer, + message *ProducerMessage, +) context.Context { for i := range x { - x[i].BeforeSend(producer, message) + ctx = x[i].BeforeSend(ctx, producer, message) } + return ctx } -func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement( + ctx context.Context, + producer Producer, + message *ProducerMessage, + msgID MessageID, +) { for i := range x { - x[i].OnSendAcknowledgement(producer, message, msgID) + x[i].OnSendAcknowledgement(ctx, producer, message, msgID) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e38..d7820f38d0 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1001,7 +1001,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes isDone := uAtomic.NewBool(false) doneCh := make(chan struct{}) - p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { + ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { if isDone.CAS(false, true) { err = e msgID = ID @@ -1202,11 +1202,11 @@ func (p *partitionProducer) internalSendAsync( msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool, -) { +) context.Context { if err := p.validateMsg(msg); err != nil { p.log.Error(err) runCallback(callback, nil, msg, err) - return + return ctx } sr := sendRequestPool.Get().(*sendRequest) @@ -1224,26 +1224,27 @@ func (p *partitionProducer) internalSendAsync( if err := p.prepareTransaction(sr); err != nil { sr.done(nil, err) - return + return ctx } if p.getProducerState() != producerReady { sr.done(nil, ErrProducerClosed) - return + return ctx } - p.options.Interceptors.BeforeSend(p, msg) + ctx = p.options.Interceptors.BeforeSend(ctx, p, msg) + sr.ctx = ctx if err := p.updateSchema(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.updateUncompressedPayload(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.updateMetaData(sr) @@ -1251,16 +1252,18 @@ func (p *partitionProducer) internalSendAsync( if err := p.updateChunkInfo(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.reserveResources(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.dataChan <- sr + + return ctx } func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { @@ -1505,7 +1508,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) { if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { if sr.producer.options.Interceptors != nil { - sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID) } } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0d74cdeefe..e5a5a5540f 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1479,23 +1479,43 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context { + return ctx +} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { } -// copyPropertyIntercepotr copy all keys in message properties map and add a suffix -type metricProduceInterceptor struct { - sendn int - ackn int +type trackingProduceInterceptor struct { + sendn int + ackn int + maxDuration time.Duration } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { - x.sendn++ +type beforeSendCtxKey struct{} + +func (i *trackingProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, msg *ProducerMessage) context.Context { + i.sendn++ + ctx = context.WithValue(ctx, beforeSendCtxKey{}, time.Now()) + return ctx } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { - x.ackn++ +func (i *trackingProduceInterceptor) OnSendAcknowledgement( + ctx context.Context, + _ Producer, + _ *ProducerMessage, + _ MessageID, +) { + var dur time.Duration + if v := ctx.Value(beforeSendCtxKey{}); v != nil { + dur = time.Since(v.(time.Time)) + } + + if dur > i.maxDuration { + i.maxDuration = dur + } + + i.ackn++ } func TestProducerWithInterceptors(t *testing.T) { @@ -1518,14 +1538,14 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Nil(t, err) defer consumer.Close() - metric := &metricProduceInterceptor{} + interceptor := &trackingProduceInterceptor{} // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: false, Interceptors: ProducerInterceptors{ noopProduceInterceptor{}, - metric, + interceptor, }, }) assert.Nil(t, err) @@ -1575,8 +1595,9 @@ func TestProducerWithInterceptors(t *testing.T) { consumer.Ack(msg) } - assert.Equal(t, 10, metric.sendn) - assert.Equal(t, 10, metric.ackn) + assert.Equal(t, 10, interceptor.sendn) + assert.Equal(t, 10, interceptor.ackn) + assert.NotZero(t, interceptor.maxDuration) } func TestProducerSendAfterClose(t *testing.T) { @@ -1719,7 +1740,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2", @@ -1810,7 +1831,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2",