-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlistener.go
More file actions
715 lines (595 loc) · 23.1 KB
/
listener.go
File metadata and controls
715 lines (595 loc) · 23.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
package kafka
import (
"context"
"errors"
"fmt"
"log/slog"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
"github.com/IBM/sarama"
)
var errNoForwardTarget = errors.New("no forward target configured")
type processingResult struct {
commit bool
err error
}
type HandlerConfig struct {
ConsumerMaxRetries *int
DurationBeforeRetry *time.Duration
ExponentialBackoff bool
BackoffFunc BackoffFunc
RetryTopic string
DeadletterTopic string
}
// Handler Processor that handle received kafka messages
// Handler Config can be used to override global configuration for a specific handler
type Handler struct {
Processor func(ctx context.Context, msg *sarama.ConsumerMessage) error
Config HandlerConfig
}
// BackoffFunc is the function used to calculate backoff duration when ExponentialBackoff is true.
// If nil, the global ExponentialBackoffFunc (using sarama.NewExponentialBackoff) will be used.
// The function signature is: func(retries, maxRetries int) time.Duration
type BackoffFunc func(retries, maxRetries int) time.Duration
// Handlers defines a handler for a given topic
type Handlers map[string]Handler
// listener object represents kafka consumer.
// Listener implements both the Listener interface and ConsumerGroupHandler from sarama.
type listener struct {
consumerGroup sarama.ConsumerGroup
deadletterProducer Producer
topics []string
handlers Handlers
groupID string
instrumenting *ConsumerMetricsService
tracer TracingFunc
logContextStorer LogContextStorer
done chan struct{}
closeOnce sync.Once
}
// Listener is able to listen multiple topics with one handler by topic
type Listener interface {
Listen(ctx context.Context) error
Close()
GroupID() string
}
type LogContextStorer func(ctx context.Context, logger *slog.Logger) context.Context
// NewListener creates a new instance of Listener
func NewListener(groupID string, handlers Handlers, options ...ListenerOption) (Listener, error) {
if groupID == "" {
return nil, errors.New("cannot create new listener, group_id cannot be empty")
}
if len(handlers) == 0 {
return nil, errors.New("cannot create new listener, handlers cannot be empty")
}
// Initialize the topics to consume
var topics []string
for k := range handlers {
topics = append(topics, k)
}
c, err := getClient()
if err != nil {
return nil, err
}
producer, err := NewProducer(WithDeadletterProducerInstrumenting())
if err != nil {
return nil, err
}
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, c)
if err != nil {
producer.Close()
return nil, err
}
done := make(chan struct{})
go func() {
for {
select {
case err, ok := <-consumerGroup.Errors():
if !ok {
return
}
if err != nil {
slog.Error("sarama consumer error", "error", err, logFieldName("consumerGroup", "consumer_group"), groupID)
}
case <-done:
return
}
}
}()
// Fill handler config unset elements with global default values.
fillHandlerConfigWithDefault(handlers)
// Log configuration for each topic
logHandlersConfig(groupID, handlers)
// Sanity check for error topics, to avoid infinite loop
err = checkErrorTopicToAvoidInfiniteLoop(handlers)
if err != nil {
close(done)
_ = consumerGroup.Close()
producer.Close()
return nil, err
}
l := &listener{
groupID: groupID,
deadletterProducer: producer,
handlers: handlers,
consumerGroup: consumerGroup,
topics: topics,
done: done,
}
// execute all method passed as option
for _, o := range options {
o(l)
}
return l, nil
}
// GroupID returns the groupID of the listener
func (l *listener) GroupID() string {
return l.groupID
}
// checkErrorTopicToAvoidInfiniteLoop checks if the error topic is configured correctly.
func checkErrorTopicToAvoidInfiniteLoop(handlers Handlers) error {
for topic, handler := range handlers {
if handler.Config.RetryTopic == topic {
return fmt.Errorf("%w: %s", ErrRetryTopicCollision, topic)
}
if handler.Config.DeadletterTopic == topic {
return fmt.Errorf("%w: %s", ErrDeadletterTopicCollision, topic)
}
}
return nil
}
// fillHandlerConfigWithDefault fills the handler config with the default values.
func fillHandlerConfigWithDefault(handlers Handlers) {
for k, h := range handlers {
if h.Config.ConsumerMaxRetries == nil {
maxRetries := ConsumerMaxRetries
h.Config.ConsumerMaxRetries = &maxRetries
}
if h.Config.DurationBeforeRetry == nil {
duration := DurationBeforeRetry
h.Config.DurationBeforeRetry = &duration
}
handlers[k] = h
}
}
// groupIDAndTopicReplacer creates a replacer for the groupID and topic.
func groupIDAndTopicReplacer(groupID, topic string) *strings.Replacer {
return strings.NewReplacer("$$CG$$", groupID, "$$T$$", topic)
}
// logHandlersConfig logs the retry configuration for each topic handler.
func logHandlersConfig(groupID string, handlers Handlers) {
for topic, handler := range handlers {
retryMode := "finite"
maxRetries := *handler.Config.ConsumerMaxRetries
if maxRetries == InfiniteRetries {
retryMode = "infinite"
}
r := groupIDAndTopicReplacer(groupID, topic)
retryTopic := handler.Config.RetryTopic
if retryTopic == "" {
if PushConsumerErrorsToRetryTopic {
retryTopic = r.Replace(RetryTopicPattern)
} else {
retryTopic = "disabled"
}
}
deadletterTopic := handler.Config.DeadletterTopic
if deadletterTopic == "" {
if PushConsumerErrorsToDeadletterTopic {
deadletterTopic = r.Replace(DeadletterTopicPattern)
} else {
deadletterTopic = "disabled"
}
}
// Build backoff description
backoffDesc := handler.Config.DurationBeforeRetry.String()
if handler.Config.ExponentialBackoff {
backoffDesc = fmt.Sprintf("%s -> %s (exponential)", handler.Config.DurationBeforeRetry, MaxBackoffDuration)
}
slog.Info("topic handler configuration",
logFieldName("consumerGroup", "consumer_group"), groupID,
"topic", topic,
logFieldName("retryMode", "retry_mode"), retryMode,
logFieldName("maxRetries", "max_retries"), maxRetries,
"backoff", backoffDesc,
logFieldName("retryTopic", "retry_topic"), retryTopic,
logFieldName("deadletterTopic", "deadletter_topic"), deadletterTopic,
)
}
}
// ListenerOption add listener option
type ListenerOption func(l *listener)
// Listen process incoming kafka messages with handlers configured by the listener
func (l *listener) Listen(consumerContext context.Context) error {
if l.consumerGroup == nil {
return errors.New("consumerGroup is nil, cannot listen")
}
slog.Info("starting listener", logFieldName("consumerGroup", "consumer_group"), l.groupID, "topics", l.topics)
// When a session is over, make consumer join a new session, as long as the context is not cancelled
for {
// Consume make this consumer join the next session
// This block until the `session` is over. (basically until next rebalance)
err := l.consumerGroup.Consume(consumerContext, l.topics, l)
if err != nil {
slog.Error("consumer group consume error", "error", err, logFieldName("consumerGroup", "consumer_group"), l.groupID)
return err
}
err = consumerContext.Err()
if err != nil {
// Check if context is cancelled
slog.Info("listener stopping (context cancelled)", logFieldName("consumerGroup", "consumer_group"), l.groupID)
return err
}
slog.Debug("consumer group session ended, rejoining", logFieldName("consumerGroup", "consumer_group"), l.groupID)
}
}
// Close shuts down the listener, its consumer group, and the internal error-draining goroutine.
// Close must be called to avoid goroutine leaks.
func (l *listener) Close() {
l.closeOnce.Do(func() {
if l.done != nil {
close(l.done)
}
})
if l.deadletterProducer != nil {
l.deadletterProducer.Close()
}
if l.consumerGroup != nil {
err := l.consumerGroup.Close()
if err != nil {
slog.Error("failed to close consumer group", "error", err, logFieldName("consumerGroup", "consumer_group"), l.groupID)
} else {
slog.Debug("consumer group closed", logFieldName("consumerGroup", "consumer_group"), l.groupID)
}
}
}
// The `Setup`, `Cleanup` and `ConsumeClaim` are actually implementation of ConsumerGroupHandler from sarama
// Copied from From the sarama lib:
//
// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allow you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
// Setup is run at the beginning of a new session, before ConsumeClaim
func (l *listener) Setup(session sarama.ConsumerGroupSession) error {
slog.Debug("consumer group session started",
logFieldName("consumerGroup", "consumer_group"), l.groupID,
logFieldName("generationID", "generation_id"), session.GenerationID(),
logFieldName("memberID", "member_id"), session.MemberID(),
"claims", session.Claims(),
)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (l *listener) Cleanup(session sarama.ConsumerGroupSession) error {
slog.Debug("consumer group session ended",
logFieldName("consumerGroup", "consumer_group"), l.groupID,
logFieldName("generationID", "generation_id"), session.GenerationID(),
logFieldName("memberID", "member_id"), session.MemberID(),
)
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (l *listener) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
slog.Debug("starting to consume partition",
logFieldName("consumerGroup", "consumer_group"), l.groupID,
"topic", claim.Topic(),
"partition", claim.Partition(),
logFieldName("initialOffset", "initial_offset"), claim.InitialOffset(),
)
for msg := range claim.Messages() {
l.onNewMessage(msg, session)
}
slog.Debug("stopped consuming partition",
logFieldName("consumerGroup", "consumer_group"), l.groupID,
"topic", claim.Topic(),
"partition", claim.Partition(),
)
return nil
}
// onNewMessage processes a new message.
// it the entry point for the message processing lifecycle.
func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.ConsumerGroupSession) {
ctx := l.enrichContext(session.Context(), msg)
ctx, endSpan := l.startMessageSpan(ctx, msg)
defer endSpan()
result := l.processMessage(ctx, msg)
if level, ok := logLevelForProcessingOutcome(result.err); ok {
switch level {
case slog.LevelInfo:
loggerFromContext(ctx).Info("message processing completed with expected terminal state", "error", result.err, logFieldName("errorType", "error_type"), errorType(result.err))
case slog.LevelError:
loggerFromContext(ctx).Error(result.err.Error(), logFieldName("errorType", "error_type"), errorType(result.err))
}
}
if result.commit {
session.MarkMessage(msg, "")
loggerFromContext(ctx).Debug("message offset committed")
}
}
// enrichContext builds a context carrying the enriched logger with Kafka message metadata.
// If a LogContextStorer is configured, it is also called so the user's handler can retrieve the logger.
func (l *listener) enrichContext(ctx context.Context, msg *sarama.ConsumerMessage) context.Context {
info := kafkaMessageInfo{ConsumerGroup: l.groupID}
if msg != nil {
info.Topic = msg.Topic
info.Partition = msg.Partition
info.Offset = msg.Offset
info.Key = string(msg.Key)
}
kLogger := slog.With("kafka", info)
ctx = context.WithValue(ctx, loggerKey{}, kLogger)
if l.logContextStorer != nil {
ctx = l.logContextStorer(ctx, kLogger)
}
return ctx
}
// processMessage handles the full lifecycle of a single message: retry loop, error
// classification, metrics, and forwarding to retry/deadletter topics.
func (l *listener) processMessage(ctx context.Context, msg *sarama.ConsumerMessage) processingResult {
handler := l.handlers[msg.Topic]
err := l.handleMessageWithRetry(ctx, handler, msg, *handler.Config.ConsumerMaxRetries, handler.Config.ExponentialBackoff)
if err != nil {
return l.handleErrorMessage(ctx, err, handler, msg)
}
loggerFromContext(ctx).Debug("message processed successfully")
return processingResult{commit: true}
}
// handleErrorMessage handles the error of a message after the retry loop.
// forward to retry or deadletter topic following the error classification and strategy configured.
func (l *listener) handleErrorMessage(ctx context.Context, initialError error, handler Handler, msg *sarama.ConsumerMessage) processingResult {
l.incErrorCounter(msg, initialError)
if isOmittedError(initialError) {
l.incOmittedCounter(msg)
return processingResult{err: initialError, commit: true}
}
if errors.Is(initialError, context.Canceled) {
return processingResult{err: context.Canceled}
}
if isRetriableError(initialError) {
err := l.tryForwardToRetry(ctx, handler, msg, initialError)
if err == nil {
return processingResult{commit: true}
}
if !errors.Is(err, errNoForwardTarget) {
return processingResult{err: fmt.Errorf("forward to retry topic failed: %w", err)}
}
}
err := l.tryForwardToDeadletter(ctx, handler, msg, initialError)
if err == nil {
return processingResult{commit: true}
}
if !errors.Is(err, errNoForwardTarget) {
return processingResult{err: fmt.Errorf("forward to deadletter topic failed: %w", err)}
}
loggerFromContext(ctx).Warn("message dropped: no retry or deadletter topic configured", "error", initialError, logFieldName("errorType", "error_type"), errorType(initialError))
l.incDroppedCounter(msg)
return processingResult{err: initialError, commit: true}
}
// tryForwardToRetry resolves the retry topic and forwards the message to it.
// Returns errNoForwardTarget if no retry topic is configured. On producer failure, it retries
// with exponential backoff until the message is published or the context is cancelled.
func (l *listener) tryForwardToRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, initialError error) error {
topicName := handler.Config.RetryTopic
if topicName == "" {
if !PushConsumerErrorsToRetryTopic {
return errNoForwardTarget
}
topicName = l.deduceTopicNameFromPattern(msg.Topic, RetryTopicPattern)
}
if topicName == "" {
return errNoForwardTarget
}
loggerFromContext(ctx).Error("forwarding message to retry topic due to error", "error", initialError, logFieldName("retryTopic", "retry_topic"), topicName)
return l.forwardWithRetry(ctx, msg, topicName, "retry")
}
// tryForwardToDeadletter resolves the deadletter topic and forwards the message to it.
// Returns errNoForwardTarget if no deadletter topic is configured. On producer failure, it retries
// with exponential backoff until the message is published or the context is cancelled.
func (l *listener) tryForwardToDeadletter(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, initialError error) error {
topicName := handler.Config.DeadletterTopic
if topicName == "" && PushConsumerErrorsToDeadletterTopic {
topicName = l.deduceTopicNameFromPattern(msg.Topic, DeadletterTopicPattern)
}
if topicName == "" {
return errNoForwardTarget
}
loggerFromContext(ctx).Error("forwarding message to deadletter topic due to error", "error", initialError, logFieldName("deadletterTopic", "deadletter_topic"), topicName)
return l.forwardWithRetry(ctx, msg, topicName, "deadletter")
}
// forwardWithRetry forwards a message to the given topic, retrying with exponential
// backoff on failure. It blocks until the message is successfully produced or the
// context is cancelled. This guarantees the message is not lost on transient producer errors.
func (l *listener) forwardWithRetry(ctx context.Context, msg *sarama.ConsumerMessage, topicName, kind string) error {
backoff := min(DurationBeforeRetry, ForwardMaxBackoffDuration)
attempt := 0
for {
err := l.forwardToTopic(ctx, msg, topicName)
if err == nil {
if attempt > 0 {
loggerFromContext(ctx).Debug("message forwarded to "+kind+" topic after retry", logForwardTopicField(kind), topicName, "attempts", attempt+1)
}
return nil
}
attempt++
loggerFromContext(ctx).Error("failed to forward message to "+kind+" topic, will retry",
"error", err, logForwardTopicField(kind), topicName, "attempt", attempt, "backoff", backoff.String())
timer := time.NewTimer(backoff)
select {
case <-timer.C:
case <-ctx.Done():
timer.Stop()
loggerFromContext(ctx).Warn("context cancelled while retrying forward to "+kind+" topic",
logForwardTopicField(kind), topicName, "attempts", attempt)
return ctx.Err()
}
backoff = min(backoff*2, ForwardMaxBackoffDuration)
}
}
// incOmittedCounter increments the omitted counter if necessary.
func (l *listener) incOmittedCounter(msg *sarama.ConsumerMessage) {
if l.instrumenting != nil && l.instrumenting.recordOmittedCounter != nil {
l.instrumenting.recordOmittedCounter.With(map[string]string{"kafka_topic": msg.Topic, "consumer_group": l.groupID}).Inc()
}
}
// incErrorCounter increments the error counter if necessary.
func (l *listener) incErrorCounter(msg *sarama.ConsumerMessage, err error) {
if l.instrumenting != nil && l.instrumenting.recordErrorCounter != nil && !isOmittedError(err) {
l.instrumenting.recordErrorCounter.With(map[string]string{"kafka_topic": msg.Topic, "consumer_group": l.groupID}).Inc()
}
}
// incDroppedCounter increments the dropped counter if necessary.
func (l *listener) incDroppedCounter(msg *sarama.ConsumerMessage) {
if l.instrumenting != nil && l.instrumenting.recordDroppedCounter != nil {
l.instrumenting.recordDroppedCounter.With(map[string]string{"kafka_topic": msg.Topic, "consumer_group": l.groupID}).Inc()
}
}
// deduceTopicNameFromPattern deduces the topic name.
func (l *listener) deduceTopicNameFromPattern(topic, pattern string) string {
r := groupIDAndTopicReplacer(l.groupID, topic)
return r.Replace(pattern)
}
// forwardToTopic forwards a message to the given topic.
func (l *listener) forwardToTopic(ctx context.Context, msg *sarama.ConsumerMessage, topicName string) error {
// Inject current trace context so the forwarded message
// is linked to the processing span that failed.
traceHeaders := GetKafkaHeadersFromContext(ctx)
traceKeys := make(map[string]struct{}, len(traceHeaders))
for _, h := range traceHeaders {
traceKeys[string(h.Key)] = struct{}{}
}
headers := make([]sarama.RecordHeader, 0, len(msg.Headers)+len(traceHeaders))
for _, h := range msg.Headers {
if h != nil {
if _, isTrace := traceKeys[string(h.Key)]; !isTrace {
headers = append(headers, *h)
}
}
}
headers = append(headers, traceHeaders...)
return l.deadletterProducer.Produce(ctx, &sarama.ProducerMessage{
Key: sarama.ByteEncoder(msg.Key),
Value: sarama.ByteEncoder(msg.Value),
Topic: topicName,
Headers: headers,
})
}
// handleMessageWithRetry calls the handler function and retries on failure using a loop.
func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int, exponentialBackoff bool) error {
retryNumber := 0
for {
if ctx.Err() != nil {
return ctx.Err()
}
loggerFromContext(ctx).Debug("processing message")
err := l.safeProcess(ctx, handler, msg)
if err == nil {
return nil
}
if !shouldRetry(retries, err) {
return err
}
retryWaitDuration := retryDuration(handler, retryNumber, exponentialBackoff)
remainingRetries := "infinite"
if retries != InfiniteRetries {
retries--
remainingRetries = strconv.Itoa(retries)
}
retryNumber++
loggerFromContext(ctx).Error("message processing failed, will retry",
"error", err,
logFieldName("retryNumber", "retry_number"), retryNumber,
logFieldName("remainingRetries", "remaining_retries"), remainingRetries,
logFieldName("retryWaitDuration", "retry_wait_duration"), retryWaitDuration.Round(10*time.Millisecond).String(),
logFieldName("exponentialBackoff", "exponential_backoff"), exponentialBackoff,
)
// Use time.NewTimer instead of time.After to avoid leaking the timer
// when the context is cancelled during the backoff wait.
retryWaitTimer := time.NewTimer(retryWaitDuration)
select {
case <-retryWaitTimer.C:
case <-ctx.Done():
retryWaitTimer.Stop()
return ctx.Err()
}
}
}
// safeProcess wraps handler.Processor with panic recovery.
func (l *listener) safeProcess(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic happened during handle of message: %v", r)
loggerFromContext(ctx).Error("panic recovered during message processing", "panic", r, "stack", string(debug.Stack()))
}
}()
return handler.Processor(ctx, msg)
}
// shouldRetry returns whether another handler attempt is allowed given the
// remaining retry budget and the error classification.
func shouldRetry(retries int, err error) bool {
if retries == 0 {
return false
}
if isUnretriableError(err) || isOmittedError(err) {
return false
}
return true
}
// logLevelForProcessingOutcome classifies a processing error into a log level and indicates whether it should be logged.
func logLevelForProcessingOutcome(err error) (slog.Level, bool) {
if err == nil {
return 0, false
}
if isOmittedError(err) || errors.Is(err, errNoForwardTarget) {
return slog.LevelInfo, true
}
return slog.LevelError, true
}
// retryDuration returns the wait duration before the next retry attempt, capped
// by MaxBackoffDuration. When exponentialBackoff is true, it delegates to
// getBackoffDuration (which already respects the cap internally); otherwise it
// uses the handler's fixed DurationBeforeRetry with the same cap applied.
func retryDuration(handler Handler, retryNumber int, exponentialBackoff bool) time.Duration {
if exponentialBackoff {
consumerMaxRetries := ConsumerMaxRetries
if handler.Config.ConsumerMaxRetries != nil {
consumerMaxRetries = *handler.Config.ConsumerMaxRetries
}
return getBackoffDuration(handler, retryNumber, consumerMaxRetries)
}
durationBeforeRetry := DurationBeforeRetry
if handler.Config.DurationBeforeRetry != nil {
durationBeforeRetry = *handler.Config.DurationBeforeRetry
}
if durationBeforeRetry > MaxBackoffDuration {
return MaxBackoffDuration
}
return durationBeforeRetry
}
var defaultBackoffOnce sync.Once
var defaultBackoffFunc BackoffFunc
// getBackoffDuration returns the exponential backoff duration using (in priority order):
// 1. The handler's custom BackoffFunc
// 2. The global ExponentialBackoffFunc (if set by the client)
// 3. A lazily-created sarama.NewExponentialBackoff using the current DurationBeforeRetry/MaxBackoffDuration
func getBackoffDuration(handler Handler, retryNumber, maxRetries int) time.Duration {
if handler.Config.BackoffFunc != nil {
return handler.Config.BackoffFunc(retryNumber, maxRetries)
}
if ExponentialBackoffFunc != nil {
return ExponentialBackoffFunc(retryNumber, maxRetries)
}
durationBeforeRetry := DurationBeforeRetry
if handler.Config.DurationBeforeRetry != nil {
durationBeforeRetry = *handler.Config.DurationBeforeRetry
}
defaultBackoffOnce.Do(func() {
defaultBackoffFunc = sarama.NewExponentialBackoff(durationBeforeRetry, MaxBackoffDuration)
})
return defaultBackoffFunc(retryNumber, maxRetries)
}