From c06f57769b4a53a9d6fc32e00a1f665ca61f33d9 Mon Sep 17 00:00:00 2001 From: Eli Treuherz Date: Tue, 19 Dec 2023 17:36:19 +0000 Subject: [PATCH] Add context to ProducerInterceptor This change adds a context.Context argument to the ProducerInterceptor interface, and passes it between the pre- and post-Send interceptor methods. Having this makes it much easier to write useful interceptors that can integrate with common tracing SDKs like OpenTelemetry, as the context is the conventional method for propagating metadata vertically through a call stack. For an example of another library using a similar convention, see: https://github.com/jackc/pgx/blob/9ab9e3c40bbb33c6f37359c87508cbc6a9830ed6/tracer.go#L10 Fixes #443 --- .../pulsartracing/producer_interceptor.go | 15 ++++-- pulsar/producer_interceptor.go | 15 +++--- pulsar/producer_partition.go | 25 +++++----- pulsar/producer_test.go | 48 ++++++++++++------- 4 files changed, 66 insertions(+), 37 deletions(-) diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go index 6c7728cf0a..465ee3db2d 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -29,13 +29,20 @@ const toPrefix = "To__" type ProducerInterceptor struct { } -func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) { +func (t *ProducerInterceptor) BeforeSend( + ctx context.Context, + producer pulsar.Producer, + message *pulsar.ProducerMessage, +) context.Context { buildAndInjectSpan(message, producer).Finish() + return ctx } -func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, - message *pulsar.ProducerMessage, - msgID pulsar.MessageID) { +func (t *ProducerInterceptor) OnSendAcknowledgement( + _ context.Context, + _ pulsar.Producer, + _ *pulsar.ProducerMessage, + _ pulsar.MessageID) { } func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span { diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index e18994cfcd..7358349ef5 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -17,27 +17,30 @@ package pulsar +import "context" + type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the // message. - BeforeSend(producer Producer, message *ProducerMessage) + BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor -func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { +func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context { for i := range x { - x[i].BeforeSend(producer, message) + ctx = x[i].BeforeSend(ctx, producer, message) } + return ctx } -func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { for i := range x { - x[i].OnSendAcknowledgement(producer, message, msgID) + x[i].OnSendAcknowledgement(ctx, producer, message, msgID) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 46167d0cf1..c99a669360 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -994,7 +994,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes isDone := uAtomic.NewBool(false) doneCh := make(chan struct{}) - p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { + ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { if isDone.CAS(false, true) { err = e msgID = ID @@ -1194,11 +1194,11 @@ func (p *partitionProducer) internalSendAsync( msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool, -) { +) context.Context { if err := p.validateMsg(msg); err != nil { p.log.Error(err) runCallback(callback, nil, msg, err) - return + return ctx } sr := sendRequestPool.Get().(*sendRequest) @@ -1216,26 +1216,27 @@ func (p *partitionProducer) internalSendAsync( if err := p.prepareTransaction(sr); err != nil { sr.done(nil, err) - return + return ctx } if p.getProducerState() != producerReady { sr.done(nil, errProducerClosed) - return + return ctx } - p.options.Interceptors.BeforeSend(p, msg) + ctx = p.options.Interceptors.BeforeSend(ctx, p, msg) + sr.ctx = ctx if err := p.updateSchema(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.updateUncompressedPayload(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.updateMetaData(sr) @@ -1243,16 +1244,18 @@ func (p *partitionProducer) internalSendAsync( if err := p.updateChunkInfo(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.reserveResources(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.dataChan <- sr + + return ctx } func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { @@ -1497,7 +1500,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) { if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { if sr.producer.options.Interceptors != nil { - sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID) } } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0f89069243..c115faa55e 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1480,23 +1480,38 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context { + return ctx +} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { } -// copyPropertyIntercepotr copy all keys in message properties map and add a suffix -type metricProduceInterceptor struct { - sendn int - ackn int +type trackingProduceInterceptor struct { + sendn int + ackn int + maxDuration time.Duration } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { - x.sendn++ +type beforeSendCtxKey struct{} + +func (i *trackingProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, msg *ProducerMessage) context.Context { + i.sendn++ + ctx = context.WithValue(ctx, beforeSendCtxKey{}, time.Now()) + return ctx } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { - x.ackn++ +func (i *trackingProduceInterceptor) OnSendAcknowledgement(ctx context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { + var dur time.Duration + if v := ctx.Value(beforeSendCtxKey{}); v != nil { + dur = time.Since(v.(time.Time)) + } + + if dur > i.maxDuration { + i.maxDuration = dur + } + + i.ackn++ } func TestProducerWithInterceptors(t *testing.T) { @@ -1519,14 +1534,14 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Nil(t, err) defer consumer.Close() - metric := &metricProduceInterceptor{} + interceptor := &trackingProduceInterceptor{} // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: false, Interceptors: ProducerInterceptors{ noopProduceInterceptor{}, - metric, + interceptor, }, }) assert.Nil(t, err) @@ -1576,8 +1591,9 @@ func TestProducerWithInterceptors(t *testing.T) { consumer.Ack(msg) } - assert.Equal(t, 10, metric.sendn) - assert.Equal(t, 10, metric.ackn) + assert.Equal(t, 10, interceptor.sendn) + assert.Equal(t, 10, interceptor.ackn) + assert.NotZero(t, interceptor.maxDuration) } func TestProducerSendAfterClose(t *testing.T) { @@ -1720,7 +1736,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2", @@ -1811,7 +1827,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2",