-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathbatch_emitter_service.go
More file actions
240 lines (210 loc) · 8.05 KB
/
Copy pathbatch_emitter_service.go
File metadata and controls
240 lines (210 loc) · 8.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package beholder
import (
"context"
"errors"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)
type (
	// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch.
	// It implements the Emitter interface.
	ChipIngressBatchEmitterService struct {
		// Service exposes the lifecycle methods produced by the engine that
		// NewChipIngressBatchEmitterService builds around start/stop.
		services.Service

		// eng is the engine backing Service; it supplies the lifecycle
		// context, the IfStarted guard and the logger.
		eng *services.Engine
		// batchClient buffers queued events and publishes them in batches.
		batchClient *batch.Client
		// metricAttrsCache memoizes per-(domain, entity) measurement options:
		// map[string]otelmetric.MeasurementOption keyed by domain + "\x00" + entity.
		metricAttrsCache sync.Map
		// metrics holds the sent/dropped event counters.
		metrics batchEmitterMetrics
	}

	// batchEmitterMetrics groups the OTel counters recorded for each emitted event.
	batchEmitterMetrics struct {
		// eventsSent counts events delivered successfully.
		eventsSent otelmetric.Int64Counter
		// eventsDropped counts events that could not be queued or failed to send.
		eventsDropped otelmetric.Int64Counter
	}
)
// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client.
//
// Every ChipIngress* setting in cfg that is left at its zero value is replaced
// by the corresponding value from DefaultConfig. The returned service only
// starts its batch client when the service itself is started.
//
// An error is returned if client is nil, or if the metrics or the underlying
// batch client cannot be created.
func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) {
	if client == nil {
		return nil, fmt.Errorf("chip ingress client is nil")
	}

	defaults := DefaultConfig()
	var (
		// Sizes are converted to int before the zero test, mirroring how the
		// batch options consume them.
		bufferSize         = orDefaultIfZero(int(cfg.ChipIngressBufferSize), int(defaults.ChipIngressBufferSize))
		maxBatchSize       = orDefaultIfZero(int(cfg.ChipIngressMaxBatchSize), int(defaults.ChipIngressMaxBatchSize))
		maxConcurrentSends = orDefaultIfZero(cfg.ChipIngressMaxConcurrentSends, defaults.ChipIngressMaxConcurrentSends)
		sendInterval       = orDefaultIfZero(cfg.ChipIngressSendInterval, defaults.ChipIngressSendInterval)
		sendTimeout        = orDefaultIfZero(cfg.ChipIngressSendTimeout, defaults.ChipIngressSendTimeout)
		drainTimeout       = orDefaultIfZero(cfg.ChipIngressDrainTimeout, defaults.ChipIngressDrainTimeout)
	)

	metrics, err := newBatchEmitterMetrics(otel.Meter("beholder/chip_ingress_batch_emitter"))
	if err != nil {
		return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err)
	}

	batchClient, err := batch.NewBatchClient(client,
		batch.WithBatchSize(maxBatchSize),
		batch.WithMessageBuffer(bufferSize),
		batch.WithBatchInterval(sendInterval),
		batch.WithMaxPublishTimeout(sendTimeout),
		batch.WithShutdownTimeout(drainTimeout),
		batch.WithMaxConcurrentSends(maxConcurrentSends),
		batch.WithEventClone(false),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create batch client: %w", err)
	}

	svc := &ChipIngressBatchEmitterService{batchClient: batchClient, metrics: metrics}
	svc.Service, svc.eng = services.Config{
		Name:  "ChipIngressBatchEmitterService",
		Start: svc.start,
		Close: svc.stop,
	}.NewServiceEngine(lggr)
	return svc, nil
}

// orDefaultIfZero returns v, or fallback when v is the zero value of T.
func orDefaultIfZero[T comparable](v, fallback T) T {
	var zero T
	if v == zero {
		return fallback
	}
	return v
}
// start is the engine's Start hook: it launches the batch client.
//
// The startup ctx is deliberately ignored because the services contract
// forbids retaining it after Start returns. The batch client instead runs on
// the engine's lifecycle context, which is cancelled when the service shuts
// down (StopChan closes before stop() runs).
func (e *ChipIngressBatchEmitterService) start(context.Context) error {
	// The cancel func is intentionally discarded: the engine cancels this
	// context itself on shutdown.
	lifecycleCtx, _ := e.eng.NewCtx()
	e.batchClient.Start(lifecycleCtx)
	return nil
}

// stop is the engine's Close hook: it stops the batch client, which was
// configured with the drain timeout (batch.WithShutdownTimeout) at construction.
func (e *ChipIngressBatchEmitterService) stop() error {
	e.batchClient.Stop()
	return nil
}
// Emit queues an event for batched delivery without blocking.
//
// It returns an error if the service is not started, if ctx is already done,
// or if a CloudEvent cannot be built from body and attrKVs. A nil return means
// only that the event was handed to the batcher. If it cannot be queued (for
// example because the buffer is full) or it later fails to send, the event is
// dropped, logged and counted, without any error reaching the caller. Use
// EmitWithCallback to observe the outcome.
func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
	return e.EmitWithCallback(ctx, body, nil, attrKVs...)
}

// EmitWithCallback works like Emit but reports the event's fate to callback.
// The callback receives nil on successful delivery, or the failure. The
// failure may be the queueing error, a whole-batch send error, or a per-event
// *batch.PublishError.
//
// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked.
// If it returns nil, the callback is guaranteed to fire exactly once.
//
// When the event cannot be queued, the callback runs synchronously, before
// EmitWithCallback returns; do not hold a lock the callback needs while calling
// this method. Otherwise it runs asynchronously after the batch is sent.
// NOTE(review): that asynchronous call presumably happens on a batch-client
// goroutine, so the callback should not block — confirm against batch.Client.
// A nil callback is allowed.
func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
	return e.emitInternal(ctx, body, callback, attrKVs...)
}
// emitInternal is the shared implementation of Emit (callback == nil) and
// EmitWithCallback. It builds a CloudEvent from body and attrKVs, converts it
// to proto, and queues it on the batch client.
//
// A non-nil error is returned, and callback is never invoked, in these cases:
//   - the service is not started;
//   - ctx is already done;
//   - the domain/entity cannot be extracted from attrKVs;
//   - the event cannot be built or converted.
//
// On a nil return, callback (if non-nil) receives the event's outcome. It is
// called synchronously with the queueing error when the event could not be
// queued; otherwise it is called from the batch client's send callback with
// the send result.
//
// Every outcome is also recorded on the eventsSent / eventsDropped counters.
// All drop paths carry a failure_type label: "queue_error", "rpc_error" or
// "partial_delivery".
func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
	return e.eng.IfStarted(func() error {
		// Check cancellation first so a caller that has already given up does
		// not pay for building and serializing an event that is then discarded.
		if err := ctx.Err(); err != nil {
			return err
		}

		domain, entity, err := ExtractSourceAndType(attrKVs...)
		if err != nil {
			return err
		}
		event, err := chipingress.NewEvent(domain, entity, body, newAttributes(attrKVs...))
		if err != nil {
			return fmt.Errorf("failed to create CloudEvent: %w", err)
		}
		eventPb, err := chipingress.EventToProto(event)
		if err != nil {
			return fmt.Errorf("failed to convert to proto: %w", err)
		}

		metricAttrs := e.metricAttrsFor(domain, entity)
		queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) {
			// The callback fires asynchronously after the batch is sent,
			// so the caller's ctx may already be cancelled. Use ctx directly
			// for metric recording — OTel Add is non-blocking and tolerates
			// cancelled contexts.
			var pubErr *batch.PublishError
			switch {
			case sendErr == nil:
				e.metrics.eventsSent.Add(ctx, 1, metricAttrs)
			case errors.As(sendErr, &pubErr):
				// Per-event partial delivery failure: server accepted the batch but
				// rejected this individual event (e.g. schema validation, encode error).
				e.metrics.eventsDropped.Add(ctx, 1, e.dropMetricAttrsFor(domain, entity, "partial_delivery"))
				e.eng.Errorw("failed to emit to chip ingress (partial delivery)",
					"error_code", pubErr.Code.String(),
					"reason", pubErr.Reason,
					"domain", domain,
					"entity", entity,
				)
			default:
				// Whole-batch RPC failure (network error, timeout, auth, etc.).
				e.metrics.eventsDropped.Add(ctx, 1, e.dropMetricAttrsFor(domain, entity, "rpc_error"))
				e.eng.Errorw("failed to emit to chip ingress", "error", sendErr, "domain", domain, "entity", entity)
			}
			if callback != nil {
				callback(sendErr)
			}
		})
		if queueErr != nil {
			// Label queue failures with failure_type just like the send-failure
			// paths, so that every eventsDropped series can be broken down by cause.
			e.metrics.eventsDropped.Add(ctx, 1, e.dropMetricAttrsFor(domain, entity, "queue_error"))
			e.eng.Errorw("failed to queue message for chip ingress", "error", queueErr, "domain", domain, "entity", entity)
			if callback != nil {
				callback(queueErr)
			}
		}
		return nil
	})
}
// metricAttrsFor returns the measurement option carrying the {domain, entity}
// attribute set used for the eventsSent counter (and for other per-event metrics).
//
// Options are memoized in metricAttrsCache, keyed by domain and entity joined
// with a NUL byte, so steady-state calls reuse the same attribute set instead
// of rebuilding it. Concurrent first calls for the same pair may each build a
// set, but all of them return the value that won LoadOrStore.
// NOTE(review): nothing in this file evicts entries, so the cache grows with
// the number of distinct (domain, entity) pairs seen.
func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption {
	cacheKey := domain + "\x00" + entity
	cached, found := e.metricAttrsCache.Load(cacheKey)
	if !found {
		fresh := otelmetric.WithAttributeSet(attribute.NewSet(
			attribute.String("domain", domain),
			attribute.String("entity", entity),
		))
		cached, _ = e.metricAttrsCache.LoadOrStore(cacheKey, fresh)
	}
	return cached.(otelmetric.MeasurementOption)
}
// dropMetricAttrsFor builds the measurement option for the eventsDropped
// counter. It carries the domain and entity labels plus a failure_type label
// naming the cause of the drop, for example "partial_delivery" for a per-event
// rejection or "rpc_error" for a whole-batch RPC failure.
//
// Unlike metricAttrsFor, the result is not cached: drop paths are not on the
// hot path.
func (e *ChipIngressBatchEmitterService) dropMetricAttrsFor(domain, entity, failureType string) otelmetric.MeasurementOption {
	labels := attribute.NewSet(
		attribute.String("domain", domain),
		attribute.String("entity", entity),
		attribute.String("failure_type", failureType),
	)
	return otelmetric.WithAttributeSet(labels)
}
// newBatchEmitterMetrics creates the counters recorded by
// ChipIngressBatchEmitterService on the given meter:
//
//   - chip_ingress.events_sent: events successfully sent via PublishBatch.
//   - chip_ingress.events_dropped: events dropped because they could not be
//     queued or failed to send.
//
// If an instrument cannot be created, the zero batchEmitterMetrics is returned
// together with the error. The error is wrapped (%w) with the name of the
// counter that failed, so the original error stays inspectable via errors.Is/As.
func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) {
	const (
		eventsSentName    = "chip_ingress.events_sent"
		eventsDroppedName = "chip_ingress.events_dropped"
	)

	eventsSent, err := meter.Int64Counter(eventsSentName,
		otelmetric.WithDescription("Total events successfully sent via PublishBatch"),
		otelmetric.WithUnit("{event}"))
	if err != nil {
		return batchEmitterMetrics{}, fmt.Errorf("creating %s counter: %w", eventsSentName, err)
	}

	eventsDropped, err := meter.Int64Counter(eventsDroppedName,
		otelmetric.WithDescription("Total events dropped (buffer full or send failure)"),
		otelmetric.WithUnit("{event}"))
	if err != nil {
		return batchEmitterMetrics{}, fmt.Errorf("creating %s counter: %w", eventsDroppedName, err)
	}

	return batchEmitterMetrics{
		eventsSent:    eventsSent,
		eventsDropped: eventsDropped,
	}, nil
}