Skip to content

Commit 52f1d8e

Browse files
Add-metrics-CEL-609 unexported otelCELMetrics. Using sync.Once() instead of init() for global ExporterFactory initialization
1 parent f02c58d commit 52f1d8e

File tree

7 files changed

+179
-108
lines changed

7 files changed

+179
-108
lines changed

x-pack/filebeat/input/cel/cel_metrics.go

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
// +------------------------+-------------------------------------------------------+
3232
//
3333
// See [otel.ExportFactory] for environment settings to run console or http/protobuf output.
34-
// See [OTELCELMetrics] for the complete list of exported metrics.
34+
// See [otelCELMetrics] for the complete list of exported metrics.
3535

3636
// Open Telemetry Metrics for CEL Input
3737
//
@@ -126,7 +126,7 @@ func newInputMetrics(reg *monitoring.Registry, logger *logp.Logger) (*inputMetri
126126
return out, reg
127127
}
128128

129-
type OTELCELMetrics struct {
129+
type otelCELMetrics struct {
130130
log *logp.Logger
131131
shutdownFuncs []func(context.Context) error
132132
manualExportFunc func(context.Context) error
@@ -158,16 +158,16 @@ type OTELCELMetrics struct {
158158
// in the real world use due to the use of intervals for
159159
// running periodic runs. However, test environments with
160160
// small intervals could potentially cause this to happen.
161-
func (o *OTELCELMetrics) StartPeriodic(ctx context.Context) {
162-
o.exportLock.Lock() // Acquire the lock
163-
defer o.exportLock.Unlock()
161+
func (o *otelCELMetrics) StartPeriodic(ctx context.Context) {
162+
o.exportLock.Lock()
164163
o.export = true
164+
o.exportLock.Unlock()
165165
o.periodicRunCount.Add(ctx, 1)
166166
o.startRunTime = time.Now()
167167
}
168168

169169
// EndPeriodic ends the periodic metrics collection and manually exports metrics if a manual export function is set.
170-
func (o *OTELCELMetrics) EndPeriodic(ctx context.Context) {
170+
func (o *otelCELMetrics) EndPeriodic(ctx context.Context) {
171171
if o.export {
172172
o.periodicRunDuration.Add(ctx, time.Since(o.startRunTime).Seconds())
173173
}
@@ -176,61 +176,61 @@ func (o *OTELCELMetrics) EndPeriodic(ctx context.Context) {
176176
}
177177
o.exportLock.Lock() // Acquire the lock
178178
defer o.exportLock.Unlock()
179-
o.log.Debug("OTELCELMetrics EndPeriodic called")
179+
o.log.Debug("otelCELMetrics EndPeriodic called")
180180
o.export = false
181-
o.log.Debug("OTELCELMetrics manual export export")
181+
o.log.Debug("otelCELMetrics manual export export")
182182
err := o.manualExportFunc(ctx)
183183
if err != nil {
184184
o.log.Errorf("error exporting metrics: %v", err)
185185
}
186-
o.log.Debug("OTELCELMetrics manual export ended")
186+
o.log.Debug("otelCELMetrics manual export ended")
187187
}
188188

189-
func (o *OTELCELMetrics) AddProgramRunDuration(ctx context.Context, duration time.Duration) {
189+
func (o *otelCELMetrics) AddProgramRunDuration(ctx context.Context, duration time.Duration) {
190190
o.programRunDurationHistogram.Record(ctx, duration.Seconds())
191191
}
192192

193-
func (o *OTELCELMetrics) AddPublishDuration(ctx context.Context, duration time.Duration) {
193+
func (o *otelCELMetrics) AddPublishDuration(ctx context.Context, duration time.Duration) {
194194
o.periodicEventPublishDuration.Add(ctx, duration.Seconds())
195195
o.programEventPublishDurationHistogram.Record(ctx, duration.Seconds())
196196
}
197197

198-
func (o *OTELCELMetrics) AddCELDuration(ctx context.Context, duration time.Duration) {
198+
func (o *otelCELMetrics) AddCELDuration(ctx context.Context, duration time.Duration) {
199199
o.periodicCelDuration.Add(ctx, duration.Seconds())
200200
o.programCelDurationHistogram.Record(ctx, duration.Seconds())
201201
}
202202

203-
func (o *OTELCELMetrics) AddReceivedBatch(ctx context.Context, count int64) {
203+
func (o *otelCELMetrics) AddReceivedBatch(ctx context.Context, count int64) {
204204
o.periodicBatchProcessedCount.Add(ctx, count)
205205
o.programBatchProcessedHistogram.Record(ctx, count)
206206
}
207207

208-
func (o *OTELCELMetrics) AddPublishedBatch(ctx context.Context, count int64) {
208+
func (o *otelCELMetrics) AddPublishedBatch(ctx context.Context, count int64) {
209209
o.periodicBatchPublishedCount.Add(ctx, count)
210210
o.programBatchPublishedHistogram.Record(ctx, count)
211211
}
212212

213-
func (o *OTELCELMetrics) AddReceivedEvents(ctx context.Context, count int64) {
213+
func (o *otelCELMetrics) AddReceivedEvents(ctx context.Context, count int64) {
214214
o.periodicEventProcessedCount.Add(ctx, count)
215215
o.programEventProcessedHistogram.Record(ctx, count)
216216
}
217217

218-
func (o *OTELCELMetrics) AddPublishedEvents(ctx context.Context, count int64) {
218+
func (o *otelCELMetrics) AddPublishedEvents(ctx context.Context, count int64) {
219219
o.periodicEventPublishedCount.Add(ctx, count)
220220
o.programEventPublishedHistogram.Record(ctx, count)
221221
}
222222

223-
func (o *OTELCELMetrics) AddProgramExecutionStarted(ctx context.Context, count int64) {
223+
func (o *otelCELMetrics) AddProgramExecutionStarted(ctx context.Context, count int64) {
224224
o.periodicProgramRunStartedCount.Add(ctx, count)
225225
}
226226

227-
func (o *OTELCELMetrics) AddProgramExecutionSuccess(ctx context.Context, count int64) {
227+
func (o *otelCELMetrics) AddProgramExecutionSuccess(ctx context.Context, count int64) {
228228
o.periodicProgramRunSuccessCount.Add(ctx, count)
229229
}
230230

231231
// Shutdown(ctx context.Context) error
232232
// Flushes the meters to the exporters, then shutsdown the exporter
233-
func (o *OTELCELMetrics) Shutdown(ctx context.Context) {
233+
func (o *otelCELMetrics) Shutdown(ctx context.Context) {
234234
o.EndPeriodic(ctx)
235235
var err error
236236
for _, fn := range o.shutdownFuncs {
@@ -241,11 +241,11 @@ func (o *OTELCELMetrics) Shutdown(ctx context.Context) {
241241
}
242242
}
243243

244-
func NewOTELCELMetrics(log *logp.Logger,
244+
func newOTELCELMetrics(log *logp.Logger,
245245
resource resource.Resource,
246246
tripper http.RoundTripper,
247247
metricExporter sdkmetric.Exporter,
248-
) (*OTELCELMetrics, *otelhttp.Transport, error) {
248+
) (*otelCELMetrics, *otelhttp.Transport, error) {
249249
var manualExportFunc func(context.Context) error
250250
var meterProvider metric.MeterProvider
251251
var shutdownFuncs []func(context.Context) error
@@ -283,9 +283,9 @@ func NewOTELCELMetrics(log *logp.Logger,
283283
if log.IsDebug() {
284284
jsonData, err := json.Marshal(collectedMetrics)
285285
if err == nil {
286-
log.Debugf("OTELCELMetrics Collected metrics %s", jsonData)
286+
log.Debugf("otelCELMetrics Collected metrics %s", jsonData)
287287
} else {
288-
log.Debugf("OTELCELMetrics could not marshall Collected metrics into json %v", collectedMetrics)
288+
log.Debugf("otelCELMetrics could not marshall Collected metrics into json %v", collectedMetrics)
289289
}
290290
}
291291
go func(ctx context.Context, log *logp.Logger, metricExporter sdkmetric.Exporter, collectedMetrics *metricdata.ResourceMetrics) {
@@ -373,7 +373,7 @@ func NewOTELCELMetrics(log *logp.Logger,
373373
return nil, nil, fmt.Errorf("failed to create input.cel.program.publish.duration: %w", err)
374374
}
375375

376-
return &OTELCELMetrics{
376+
return &otelCELMetrics{
377377
log: log,
378378
shutdownFuncs: shutdownFuncs,
379379
manualExportFunc: manualExportFunc,
@@ -397,73 +397,73 @@ func NewOTELCELMetrics(log *logp.Logger,
397397
}, transport, nil
398398
}
399399

400-
type MetricsRecorder struct {
400+
type metricsRecorder struct {
401401
inputMetrics *inputMetrics
402-
otelMetrics *OTELCELMetrics
402+
otelMetrics *otelCELMetrics
403403
}
404404

405-
func NewMetricsRecorder(inputMetrics *inputMetrics, otelMetrics *OTELCELMetrics) (*MetricsRecorder, error) {
405+
func newMetricsRecorder(inputMetrics *inputMetrics, otelMetrics *otelCELMetrics) (*metricsRecorder, error) {
406406
if inputMetrics == nil || otelMetrics == nil {
407407
return nil, errors.New("input metrics and otel metrics cannot be nil")
408408
}
409-
return &MetricsRecorder{
409+
return &metricsRecorder{
410410
inputMetrics,
411411
otelMetrics,
412412
}, nil
413413
}
414414

415-
func (o *MetricsRecorder) StartPeriodic(ctx context.Context) {
415+
func (o *metricsRecorder) StartPeriodic(ctx context.Context) {
416416
o.otelMetrics.StartPeriodic(ctx)
417417
}
418418

419419
// EndPeriodic ends the periodic metrics collection and manually exports metrics if a manual export function is set.
420-
func (o *MetricsRecorder) EndPeriodic(ctx context.Context) {
420+
func (o *metricsRecorder) EndPeriodic(ctx context.Context) {
421421
o.otelMetrics.EndPeriodic(ctx)
422422
}
423423

424-
func (o *MetricsRecorder) AddCELDuration(ctx context.Context, duration time.Duration) {
424+
func (o *metricsRecorder) AddCELDuration(ctx context.Context, duration time.Duration) {
425425
o.otelMetrics.AddCELDuration(ctx, duration)
426426
o.inputMetrics.celProcessingTime.Update(duration.Nanoseconds())
427427
}
428428

429-
func (o *MetricsRecorder) AddProgramRunDuration(ctx context.Context, duration time.Duration) {
429+
func (o *metricsRecorder) AddProgramRunDuration(ctx context.Context, duration time.Duration) {
430430
o.otelMetrics.AddProgramRunDuration(ctx, duration)
431431
}
432432

433-
func (o *MetricsRecorder) AddPublishDuration(ctx context.Context, duration time.Duration) {
433+
func (o *metricsRecorder) AddPublishDuration(ctx context.Context, duration time.Duration) {
434434
o.otelMetrics.AddPublishDuration(ctx, duration)
435435
o.inputMetrics.batchProcessingTime.Update(duration.Nanoseconds())
436436
}
437437

438-
func (o *MetricsRecorder) AddReceivedBatch(ctx context.Context, count uint) {
438+
func (o *metricsRecorder) AddReceivedBatch(ctx context.Context, count uint) {
439439
o.inputMetrics.batchesReceived.Add(uint64(count))
440440
o.otelMetrics.AddReceivedBatch(ctx, int64(count)) //nolint:gosec // disable G115
441441
}
442442

443-
func (o *MetricsRecorder) AddPublishedBatch(ctx context.Context, count uint) {
443+
func (o *metricsRecorder) AddPublishedBatch(ctx context.Context, count uint) {
444444
o.inputMetrics.batchesPublished.Add(uint64(count))
445445
o.otelMetrics.AddPublishedBatch(ctx, int64(count)) //nolint:gosec // disable G115
446446
}
447447

448-
func (o *MetricsRecorder) AddReceivedEvents(ctx context.Context, count uint) {
448+
func (o *metricsRecorder) AddReceivedEvents(ctx context.Context, count uint) {
449449
o.inputMetrics.eventsReceived.Add(uint64(count))
450450
o.otelMetrics.AddReceivedEvents(ctx, int64(count)) //nolint:gosec // disable G115
451451
}
452452

453-
func (o *MetricsRecorder) AddPublishedEvents(ctx context.Context, count uint) {
453+
func (o *metricsRecorder) AddPublishedEvents(ctx context.Context, count uint) {
454454
o.inputMetrics.eventsPublished.Add(uint64(count))
455455
o.otelMetrics.AddPublishedEvents(ctx, int64(count)) //nolint:gosec // disable G115
456456
}
457457

458-
func (o *MetricsRecorder) AddProgramExecution(ctx context.Context) {
458+
func (o *metricsRecorder) AddProgramExecution(ctx context.Context) {
459459
o.inputMetrics.executions.Add(1)
460460
o.otelMetrics.AddProgramExecutionStarted(ctx, 1)
461461
}
462462

463-
func (o *MetricsRecorder) AddProgramSuccessExecution(ctx context.Context) {
463+
func (o *metricsRecorder) AddProgramSuccessExecution(ctx context.Context) {
464464
o.otelMetrics.AddProgramExecutionSuccess(ctx, 1)
465465
}
466466

467-
func (o *MetricsRecorder) SetResourceURL(url string) {
467+
func (o *metricsRecorder) SetResourceURL(url string) {
468468
o.inputMetrics.resource.Set(url)
469469
}

x-pack/filebeat/input/cel/cel_metrics_test.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/elastic/elastic-agent-libs/logp"
2020
"github.com/elastic/elastic-agent-libs/monitoring"
2121

22-
"github.com/stretchr/testify/assert"
2322
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
2423
"go.opentelemetry.io/otel/sdk/resource"
2524
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
@@ -34,7 +33,7 @@ func TestOTELCELMetrics(t *testing.T) {
3433

3534
client := testServer.Client()
3635

37-
// Set up the OTELCELMetrics
36+
// Set up the otelCELMetrics
3837
log := logp.NewLogger("cel_metrics_test")
3938
resource := resource.NewWithAttributes(
4039
semconv.SchemaURL,
@@ -59,14 +58,14 @@ func TestOTELCELMetrics(t *testing.T) {
5958

6059
metricExporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint(),
6160
stdoutmetric.WithTemporalitySelector(otel.DeltaSelector),
62-
stdoutmetric.WithEncoder(otel.NewConcurentEncoder(json.NewEncoder(w))))
61+
stdoutmetric.WithEncoder(&otel.ConcurrentEncoder{Encoder: json.NewEncoder(w)}))
6362
if err != nil {
6463
t.Fatalf("failed to create exporter: %v", err)
6564
}
6665

67-
otelCELMetrics, transport, err := NewOTELCELMetrics(log, *resource, client.Transport, metricExporter)
66+
otelCELMetrics, transport, err := newOTELCELMetrics(log, *resource, client.Transport, metricExporter)
6867
if err != nil {
69-
t.Fatalf("failed to create OTELCELMetrics: %v", err)
68+
t.Fatalf("failed to create otelCELMetrics: %v", err)
7069
}
7170
ctx := context.Background()
7271
defer otelCELMetrics.Shutdown(ctx)
@@ -75,11 +74,11 @@ func TestOTELCELMetrics(t *testing.T) {
7574

7675
inputMetrics, _ := newInputMetrics(reg, log)
7776

78-
mRecorder, err := NewMetricsRecorder(inputMetrics, otelCELMetrics)
77+
mRecorder, err := newMetricsRecorder(inputMetrics, otelCELMetrics)
7978
if err != nil {
8079
t.Fatalf("failed to create metrics recorder: %v", err)
8180
}
82-
// Create an HTTP client using the OTELCELMetrics transport
81+
// Create an HTTP client using the otelCELMetrics transport
8382
client.Transport = transport
8483

8584
var totalCelDuration time.Duration
@@ -155,14 +154,30 @@ func TestOTELCELMetrics(t *testing.T) {
155154
}
156155
}
157156

158-
assert.Equal(t, 0, len(notFound), notFound)
157+
if len(notFound) != 0 {
158+
t.Errorf("expected all metrics to be found, but missing: %v", notFound)
159+
}
159160

160161
// check that inputMetrics are incremented
161-
assert.Equal(t, uint64(5), inputMetrics.executions.Get())
162-
assert.Equal(t, uint64(15), inputMetrics.eventsReceived.Get())
163-
assert.Equal(t, uint64(5), inputMetrics.batchesReceived.Get())
164-
assert.Equal(t, uint64(15), inputMetrics.eventsPublished.Get())
165-
assert.Equal(t, uint64(5), inputMetrics.batchesPublished.Get())
166-
assert.Equal(t, int64(5), inputMetrics.celProcessingTime.Count())
167-
assert.Equal(t, totalCelDuration.Nanoseconds(), inputMetrics.celProcessingTime.Sum())
162+
if inputMetrics.executions.Get() != uint64(5) {
163+
t.Errorf("executions = %v, want %v", inputMetrics.executions.Get(), uint64(5))
164+
}
165+
if inputMetrics.eventsReceived.Get() != uint64(15) {
166+
t.Errorf("eventsReceived = %v, want %v", inputMetrics.eventsReceived.Get(), uint64(15))
167+
}
168+
if inputMetrics.batchesReceived.Get() != uint64(5) {
169+
t.Errorf("batchesReceived = %v, want %v", inputMetrics.batchesReceived.Get(), uint64(5))
170+
}
171+
if inputMetrics.eventsPublished.Get() != uint64(15) {
172+
t.Errorf("eventsPublished = %v, want %v", inputMetrics.eventsPublished.Get(), uint64(15))
173+
}
174+
if inputMetrics.batchesPublished.Get() != uint64(5) {
175+
t.Errorf("batchesPublished = %v, want %v", inputMetrics.batchesPublished.Get(), uint64(5))
176+
}
177+
if inputMetrics.celProcessingTime.Count() != int64(5) {
178+
t.Errorf("celProcessingTime.Count() = %v, want %v", inputMetrics.celProcessingTime.Count(), int64(5))
179+
}
180+
if inputMetrics.celProcessingTime.Sum() != totalCelDuration.Nanoseconds() {
181+
t.Errorf("celProcessingTime.Sum() = %v, want %v", inputMetrics.celProcessingTime.Sum(), totalCelDuration.Nanoseconds())
182+
}
168183
}

x-pack/filebeat/input/cel/input.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
// - OTEL_EXPORTER_OTLP_HEADERS: Required if endpoint is authenticated.
1818
// - OTEL_RESOURCE_ATTRIBUTES: Optional but recommended
1919
//
20-
// See [OTELCELMetrics] for more information about OTEL_RESOURCE_ATTRIBUTES and Open Telemetry ResourceAttributes
20+
// See [otelCELMetrics] for more information about OTEL_RESOURCE_ATTRIBUTES and Open Telemetry ResourceAttributes
2121
// See [otel.ExportFactory] for environment settings to run console or http/protobuf output.
22-
// See [OTELCELMetrics] for the complete list of exported metrics.
22+
// See [otelCELMetrics] for the complete list of exported metrics.
2323
package cel
2424

2525
import (
@@ -202,7 +202,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
202202
return err
203203
}
204204
defer otelMetrics.Shutdown(ctx)
205-
metricsRecorder, err := NewMetricsRecorder(metrics, otelMetrics)
205+
metricsRecorder, err := newMetricsRecorder(metrics, otelMetrics)
206206
if err != nil {
207207
return err
208208
}
@@ -885,7 +885,7 @@ func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger)
885885
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
886886
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"
887887

888-
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry, env v2.Context) (*http.Client, *httplog.LoggingRoundTripper, *OTELCELMetrics, error) {
888+
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry, env v2.Context) (*http.Client, *httplog.LoggingRoundTripper, *otelCELMetrics, error) {
889889
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings(), log)...)
890890
if err != nil {
891891
return nil, nil, nil, err
@@ -982,7 +982,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
982982
c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg, log)
983983
}
984984

985-
otelMetrics, otelTransport, err := CreateOTELMetrics(ctx, cfg, log, env, c.Transport)
985+
otelMetrics, otelTransport, err := createOTELMetrics(ctx, cfg, log, env, c.Transport)
986986
if err != nil {
987987
return nil, nil, nil, err
988988
}
@@ -1019,7 +1019,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
10191019
return c, trace, otelMetrics, nil
10201020
}
10211021

1022-
func CreateOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2.Context, tripper http.RoundTripper) (*OTELCELMetrics, *otelhttp.Transport, error) {
1022+
func createOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2.Context, tripper http.RoundTripper) (*otelCELMetrics, *otelhttp.Transport, error) {
10231023
resource := resource.NewWithAttributes(
10241024
semconv.SchemaURL, getResourceAttributes(env, cfg)...,
10251025
)
@@ -1030,7 +1030,7 @@ func CreateOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2
10301030
log.Errorw("failed to get exporter", "error", err)
10311031
}
10321032
log.Infof("created OTEL cel input exporter %s for input %s", exporterType, env.IDWithoutName)
1033-
return NewOTELCELMetrics(log, *resource, tripper, exporter)
1033+
return newOTELCELMetrics(log, *resource, tripper, exporter)
10341034
}
10351035

10361036
func getResourceAttributes(env v2.Context, cfg config) []attribute.KeyValue {
@@ -1049,7 +1049,6 @@ func getResourceAttributes(env v2.Context, cfg config) []attribute.KeyValue {
10491049
}
10501050

10511051
seen := make(map[attribute.Key]bool)
1052-
10531052
for _, attr := range attrs {
10541053
seen[attr.Key] = true
10551054
}

0 commit comments

Comments
 (0)