@@ -81,6 +81,8 @@ type Appender struct {
8181 // tracer is an OTel tracer, and should not be confused with `a.config.Tracer`
8282 // which is an Elastic APM Tracer.
8383 tracer trace.Tracer
84+
85+ metricAttributeSet metric.MeasurementOption
8486}
8587
8688// New returns a new Appender that indexes documents into Elasticsearch.
@@ -121,12 +123,12 @@ func New(client elastictransport.Interface, cfg Config) (*Appender, error) {
121123 bulkItems : make (chan BulkIndexerItem , cfg .DocumentBufferSize ),
122124 metrics : ms ,
123125 }
126+ indexer .metricAttributeSet = metric .WithAttributeSet (indexer .config .MetricAttributes )
124127 // Use the Appender's pointer as the unique ID for the BulkIndexerPool.
125128 // Register the Appender ID in the pool.
126129 indexer .id = fmt .Sprintf ("%p" , indexer )
127130 indexer .pool .Register (indexer .id )
128- attrs := metric .WithAttributeSet (indexer .config .MetricAttributes )
129- indexer .metrics .availableBulkRequests .Add (context .Background (), int64 (cfg .MaxRequests ), attrs )
131+ indexer .metrics .availableBulkRequests .Add (context .Background (), int64 (cfg .MaxRequests ), indexer .metricAttributeSet )
130132 // We create a cancellable context for the errgroup.Group for unblocking
131133 // flushes when Close returns. We intentionally do not use errgroup.WithContext,
132134 // because one flush failure should not cause the context to be cancelled.
@@ -169,7 +171,7 @@ func (a *Appender) Close(ctx context.Context) error {
169171 <- ctx .Done ()
170172 }()
171173
172- defer a .metrics .availableBulkRequests .Add (context .Background (), - int64 (a .config .MaxRequests ), metric . WithAttributeSet ( a . config . MetricAttributes ) )
174+ defer a .metrics .availableBulkRequests .Add (context .Background (), - int64 (a .config .MaxRequests ), a . metricAttributeSet )
173175
174176 if err := a .errgroup .Wait (); err != nil {
175177 return err
@@ -210,8 +212,7 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo)
210212 Body : document ,
211213 }
212214 if len (a .bulkItems ) == cap (a .bulkItems ) {
213- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
214- a .metrics .blockedAdd .Add (context .Background (), 1 , attrs )
215+ a .metrics .blockedAdd .Add (context .Background (), 1 , a .metricAttributeSet )
215216 }
216217
217218 select {
@@ -223,9 +224,8 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo)
223224 }
224225
225226 a .docsAdded .Add (1 )
226- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
227- a .metrics .docsAdded .Add (context .Background (), 1 , attrs )
228- a .metrics .docsActive .Add (context .Background (), 1 , attrs )
227+ a .metrics .docsAdded .Add (context .Background (), 1 , a .metricAttributeSet )
228+ a .metrics .docsActive .Add (context .Background (), 1 , a .metricAttributeSet )
229229
230230 return nil
231231}
@@ -240,8 +240,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
240240 return nil
241241 }
242242 defer func () {
243- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
244- a .metrics .bulkRequests .Add (context .Background (), 1 , attrs )
243+ a .metrics .bulkRequests .Add (context .Background (), 1 , a .metricAttributeSet )
245244 }()
246245
247246 logger := a .config .Logger
@@ -275,18 +274,15 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
275274 // Record the BulkIndexer buffer's length as the bytesTotal metric after
276275 // the request has been flushed.
277276 if flushed := bulkIndexer .BytesFlushed (); flushed > 0 {
278- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
279- a .metrics .bytesTotal .Add (context .Background (), int64 (flushed ), attrs )
277+ a .metrics .bytesTotal .Add (context .Background (), int64 (flushed ), a .metricAttributeSet )
280278 }
281279 // Record the BulkIndexer uncompressed bytes written to the buffer
282280 // as the bytesUncompressedTotal metric after the request has been flushed.
283281 if flushed := bulkIndexer .BytesUncompressedFlushed (); flushed > 0 {
284- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
285- a .metrics .bytesUncompressedTotal .Add (context .Background (), int64 (flushed ), attrs )
282+ a .metrics .bytesUncompressedTotal .Add (context .Background (), int64 (flushed ), a .metricAttributeSet )
286283 }
287284 if err != nil {
288- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
289- a .metrics .docsActive .Add (context .Background (), - int64 (n ), attrs )
285+ a .metrics .docsActive .Add (context .Background (), - int64 (n ), a .metricAttributeSet )
290286 logger .Error ("bulk indexing request failed" , zap .Error (err ))
291287 if a .otelTracingEnabled () && span .IsRecording () {
292288 span .RecordError (err )
@@ -298,7 +294,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
298294 context .Background (),
299295 int64 (n ),
300296 metric .WithAttributes (attribute .String ("status" , "Timeout" )),
301- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
297+ a . metricAttributeSet ,
302298 )
303299 }
304300
@@ -323,7 +319,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
323319 attribute .String ("status" , status ),
324320 semconv .HTTPResponseStatusCode (errFailed .statusCode ),
325321 ),
326- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
322+ a . metricAttributeSet ,
327323 )
328324 }
329325 }
@@ -343,8 +339,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
343339 }
344340 docsFailed = int64 (len (resp .FailedDocs ))
345341 totalFlushed := docsFailed + docsIndexed
346- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
347- a .metrics .docsActive .Add (context .Background (), - totalFlushed , attrs )
342+ a .metrics .docsActive .Add (context .Background (), - totalFlushed , a .metricAttributeSet )
348343 for _ , info := range resp .FailedDocs {
349344 if info .Status >= 400 && info .Status < 500 {
350345 if info .Status == http .StatusTooManyRequests {
@@ -374,15 +369,15 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
374369 a .metrics .docsRetried .Add (
375370 context .Background (),
376371 resp .RetriedDocs ,
377- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
372+ a . metricAttributeSet ,
378373 metric .WithAttributes (attribute .Int ("greatest_retry" , resp .GreatestRetry )),
379374 )
380375 }
381376 if docsIndexed > 0 {
382377 a .metrics .docsIndexed .Add (
383378 context .Background (),
384379 docsIndexed ,
385- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
380+ a . metricAttributeSet ,
386381 metric .WithAttributes (attribute .String ("status" , "Success" )),
387382 )
388383 }
@@ -392,23 +387,23 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
392387 context .Background (),
393388 tooManyRequests ,
394389 metric .WithAttributes (attribute .String ("status" , "TooMany" )),
395- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
390+ a . metricAttributeSet ,
396391 )
397392 }
398393 if clientFailed > 0 {
399394 a .metrics .docsIndexed .Add (
400395 context .Background (),
401396 clientFailed ,
402397 metric .WithAttributes (attribute .String ("status" , "FailedClient" )),
403- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
398+ a . metricAttributeSet ,
404399 )
405400 }
406401 if serverFailed > 0 {
407402 a .metrics .docsIndexed .Add (
408403 context .Background (),
409404 serverFailed ,
410405 metric .WithAttributes (attribute .String ("status" , "FailedServer" )),
411- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
406+ a . metricAttributeSet ,
412407 )
413408 }
414409 if failureStoreDocs .Used > 0 {
@@ -419,7 +414,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
419414 attribute .String ("status" , "FailureStore" ),
420415 attribute .String ("failure_store" , string (FailureStoreStatusUsed )),
421416 ),
422- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
417+ a . metricAttributeSet ,
423418 )
424419 }
425420 if failureStoreDocs .Failed > 0 {
@@ -430,7 +425,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
430425 attribute .String ("status" , "FailureStore" ),
431426 attribute .String ("failure_store" , string (FailureStoreStatusFailed )),
432427 ),
433- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
428+ a . metricAttributeSet ,
434429 )
435430 }
436431 if failureStoreDocs .NotEnabled > 0 {
@@ -441,7 +436,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
441436 attribute .String ("status" , "FailureStore" ),
442437 attribute .String ("failure_store" , string (FailureStoreStatusNotEnabled )),
443438 ),
444- metric . WithAttributeSet ( a . config . MetricAttributes ) ,
439+ a . metricAttributeSet ,
445440 )
446441 }
447442 logger .Debug (
@@ -492,9 +487,8 @@ func (a *Appender) runActiveIndexer() {
492487 // to reset it to ensure we're using the right client.
493488 active .SetClient (a .client )
494489
495- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
496- a .metrics .availableBulkRequests .Add (context .Background (), - 1 , attrs )
497- a .metrics .inflightBulkrequests .Add (context .Background (), 1 , attrs )
490+ a .metrics .availableBulkRequests .Add (context .Background (), - 1 , a .metricAttributeSet )
491+ a .metrics .inflightBulkrequests .Add (context .Background (), 1 , a .metricAttributeSet )
498492 flushTimer .Reset (a .config .FlushInterval )
499493 }
500494 if err := active .Add (item ); err != nil {
@@ -551,24 +545,22 @@ func (a *Appender) runActiveIndexer() {
551545 if active != nil {
552546 indexer := active
553547 active = nil
554- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
555548 a .errgroup .Go (func () error {
556549 var err error
557550 took := timeFunc (func () {
558551 err = a .flush (a .errgroupContext , indexer )
559552 })
560553 indexer .Reset ()
561554 a .pool .Put (a .id , indexer )
562- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
563- a .metrics .availableBulkRequests .Add (context .Background (), 1 , attrs )
564- a .metrics .inflightBulkrequests .Add (context .Background (), - 1 , attrs )
555+ a .metrics .availableBulkRequests .Add (context .Background (), 1 , a .metricAttributeSet )
556+ a .metrics .inflightBulkrequests .Add (context .Background (), - 1 , a .metricAttributeSet )
565557 a .metrics .flushDuration .Record (context .Background (), took .Seconds (),
566- attrs ,
558+ a . metricAttributeSet ,
567559 )
568560 return err
569561 })
570562 a .metrics .bufferDuration .Record (context .Background (),
571- time .Since (firstDocTS ).Seconds (), attrs ,
563+ time .Since (firstDocTS ).Seconds (), a . metricAttributeSet ,
572564 )
573565 }
574566 if a .config .Scaling .Disabled {
@@ -577,13 +569,11 @@ func (a *Appender) runActiveIndexer() {
577569 now := time .Now ()
578570 info := a .scalingInformation ()
579571 if a .maybeScaleDown (now , info , & timedFlush ) {
580- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
581- a .metrics .activeDestroyed .Add (context .Background (), 1 , attrs )
572+ a .metrics .activeDestroyed .Add (context .Background (), 1 , a .metricAttributeSet )
582573 return
583574 }
584575 if a .maybeScaleUp (now , info , & fullFlush ) {
585- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
586- a .metrics .activeCreated .Add (context .Background (), 1 , attrs )
576+ a .metrics .activeCreated .Add (context .Background (), 1 , a .metricAttributeSet )
587577 a .errgroup .Go (func () error {
588578 a .runActiveIndexer ()
589579 return nil
0 commit comments