-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathoptions.go
More file actions
31 lines (27 loc) · 1.07 KB
/
options.go
File metadata and controls
31 lines (27 loc) · 1.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package kafka
// WithInstrumenting returns a ListenerOption that enables the instrumenting
// layer on a listener.
//
// When the option is applied it creates a consumer metrics service for the
// listener's group ID, stores it on the listener, and replaces every handler
// currently present in the listener's handler map with its instrumented
// wrapper. The wrapping therefore happens once, when the option runs, rather
// than per message; only handlers already in the map at that moment are wrapped.
func WithInstrumenting() ListenerOption {
	return func(lis *listener) {
		lis.instrumenting = NewConsumerMetricsService(lis.groupID)
		// Overwriting the value of an existing key while ranging is safe:
		// no entries are added or removed during the iteration.
		for name := range lis.handlers {
			wrapped := lis.instrumenting.Instrumentation(lis.handlers[name])
			lis.handlers[name] = wrapped
		}
	}
}
// ProducerOption configures a producer. Options are passed to the producer
// constructor, and each one mutates the producer it receives.
type ProducerOption func(*producer)
// WithProducerInstrumenting returns a ProducerOption that enables the
// instrumenting layer on a producer.
//
// When applied, the option installs a new producer metrics service on the
// producer and replaces the producer's handler with the instrumented wrapper
// around the handler that was set at that moment.
func WithProducerInstrumenting() ProducerOption {
	return func(prod *producer) {
		prod.instrumenting = NewProducerMetricsService()
		wrapped := prod.instrumenting.Instrumentation(prod.handler)
		prod.handler = wrapped
	}
}
// WithDeadletterProducerInstrumenting returns a ProducerOption that enables
// the instrumenting layer on a deadletter producer.
//
// When applied, the option installs a new deadletter producer metrics service
// on the producer and replaces the producer's handler with the
// deadletter-instrumented wrapper around the handler that was set at that
// moment.
func WithDeadletterProducerInstrumenting() ProducerOption {
	return func(prod *producer) {
		prod.instrumenting = NewDeadletterProducerMetricsService()
		wrapped := prod.instrumenting.DeadletterInstrumentation(prod.handler)
		prod.handler = wrapped
	}
}