Skip to content

Commit d45b52e

Browse files
Add-metrics-CEL-609 checkpoint
1 parent 415d74d commit d45b52e

File tree

8 files changed

+15073
-14435
lines changed

8 files changed

+15073
-14435
lines changed

NOTICE.txt

Lines changed: 14410 additions & 14410 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,16 @@ require (
249249
go.opentelemetry.io/collector/processor v1.41.0
250250
go.opentelemetry.io/collector/processor/processorhelper v0.135.0
251251
go.opentelemetry.io/collector/receiver/receivertest v0.135.0
252+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0
253+
go.opentelemetry.io/otel v1.38.0
254+
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0
255+
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.37.0
256+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0
257+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0
258+
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.37.0
259+
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0
260+
go.opentelemetry.io/otel/metric v1.38.0
261+
go.opentelemetry.io/otel/sdk v1.38.0
252262
go.opentelemetry.io/otel/sdk/metric v1.38.0
253263
go.uber.org/goleak v1.3.0
254264
sigs.k8s.io/kind v0.29.0
@@ -466,25 +476,15 @@ require (
466476
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
467477
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 // indirect
468478
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
469-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect
470479
go.opentelemetry.io/contrib/otelconf v0.17.0 // indirect
471480
go.opentelemetry.io/contrib/propagators/b3 v1.37.0 // indirect
472481
go.opentelemetry.io/ebpf-profiler v0.0.202531 // indirect
473-
go.opentelemetry.io/otel v1.38.0 // indirect
474482
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.13.0 // indirect
475483
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.13.0 // indirect
476-
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0 // indirect
477-
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.37.0 // indirect
478-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect
479484
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect
480-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 // indirect
481485
go.opentelemetry.io/otel/exporters/prometheus v0.59.0 // indirect
482486
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.13.0 // indirect
483-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.37.0 // indirect
484-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 // indirect
485487
go.opentelemetry.io/otel/log v0.14.0 // indirect
486-
go.opentelemetry.io/otel/metric v1.38.0 // indirect
487-
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
488488
go.opentelemetry.io/otel/sdk/log v0.13.0 // indirect
489489
go.opentelemetry.io/otel/trace v1.38.0 // indirect
490490
go.opentelemetry.io/proto/otlp v1.7.0 // indirect

x-pack/filebeat/input/cel/config.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,38 @@ type config struct {
6666
// RecordCoverage indicates whether a program should
6767
// record and log execution coverage.
6868
RecordCoverage bool `config:"record_coverage"`
69+
70+
Meta map[string]interface{} `config:"meta"`
71+
}
72+
73+
func (c config) GetPackageName() string {
74+
if c.Meta == nil {
75+
return "unknown"
76+
}
77+
packageinfo, ok := c.Meta["package"]
78+
if !ok {
79+
return "unknown"
80+
}
81+
name, ok := packageinfo.(map[string]interface{})["name"]
82+
if !ok {
83+
return "unknown"
84+
}
85+
return name.(string)
86+
}
87+
88+
func (c config) GetPackageVersion() string {
89+
if c.Meta == nil {
90+
return "unknown"
91+
}
92+
packageinfo, ok := c.Meta["package"]
93+
if !ok {
94+
return "unknown"
95+
}
96+
version, ok := packageinfo.(map[string]interface{})["version"]
97+
if !ok {
98+
return "unknown"
99+
}
100+
return version.(string)
69101
}
70102

71103
type redact struct {

x-pack/filebeat/input/cel/input.go

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313
"encoding/json"
1414
"errors"
1515
"fmt"
16+
"go.opentelemetry.io/otel/attribute"
17+
"go.opentelemetry.io/otel/sdk/resource"
18+
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
1619
"io"
1720
"io/fs"
1821
"maps"
@@ -51,6 +54,7 @@ import (
5154
"github.com/elastic/beats/v7/libbeat/version"
5255
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
5356
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
57+
"github.com/elastic/beats/v7/x-pack/filebeat/otel"
5458
"github.com/elastic/elastic-agent-libs/logp"
5559
"github.com/elastic/elastic-agent-libs/mapstr"
5660
"github.com/elastic/elastic-agent-libs/monitoring"
@@ -150,7 +154,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
150154
cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
151155
}
152156

153-
client, trace, err := newClient(ctx, cfg, log, reg)
157+
client, trace, otelMetrics, err := newClient(ctx, cfg, log, reg, env)
154158
if err != nil {
155159
return err
156160
}
@@ -238,6 +242,11 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
238242
// from mito/lib, a global, useragent, is available to use
239243
// in requests.
240244
err = periodically(ctx, cfg.Interval, func() error {
245+
log.Debug("Starting otel periodic")
246+
otelMetrics.AddPeriodicRun(ctx, 1)
247+
otelMetrics.StartPeriodic()
248+
defer otelMetrics.EndPeriodic(ctx)
249+
241250
log.Info("process periodic request")
242251
var (
243252
budget = *cfg.MaxExecutions
@@ -276,8 +285,10 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
276285
log.Debugw("previous transaction", "transaction.id", trace.TxID())
277286
}
278287
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
288+
otelMetrics.AddProgramExecution(ctx, 1)
279289
metrics.executions.Add(1)
280290
start := i.now().In(time.UTC)
291+
defer otelMetrics.AddTotalDuration(ctx, time.Since(start))
281292
state, err = evalWith(ctx, prg, ast, state, start, wantDump, budget-1)
282293
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
283294
if err != nil {
@@ -303,6 +314,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
303314
}
304315
isDegraded = err != nil
305316
metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
317+
otelMetrics.AddCELDuration(ctx, time.Since(start))
306318
if trace != nil {
307319
log.Debugw("final transaction", "transaction.id", trace.TxID())
308320
}
@@ -451,7 +463,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
451463
// We have a non-empty batch of events to process.
452464
metrics.batchesReceived.Add(1)
453465
metrics.eventsReceived.Add(uint64(len(events)))
454-
466+
otelMetrics.AddEvents(ctx, int64(len(events)))
467+
otelMetrics.AddBatch(ctx, 1)
455468
// Drop events from state. If we fail during the publication,
456469
// we will re-request these events.
457470
delete(state, "events")
@@ -528,8 +541,11 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
528541
}
529542
if i == 0 {
530543
metrics.batchesPublished.Add(1)
544+
otelMetrics.AddPublishedBatch(ctx, 1)
545+
531546
}
532547
metrics.eventsPublished.Add(1)
548+
otelMetrics.AddPublishedEvents(ctx, 1)
533549

534550
err = ctx.Err()
535551
if err != nil {
@@ -542,11 +558,12 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
542558
}
543559

544560
metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
545-
561+
otelMetrics.AddPublishDuration(ctx, time.Since(start))
546562
// Advance the cursor to the final state if there was no error during
547563
// publications. This is needed to transition to the next set of events.
548564
if !hadPublicationError {
549565
goodCursor = cursor
566+
otelMetrics.AddProgramSuccessExecution(ctx, 1)
550567
}
551568

552569
// Replace the last known good cursor.
@@ -573,6 +590,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
573590
log.Infof("input stopped because context was cancelled with: %v", err)
574591
err = nil
575592
}
593+
otelMetrics.Shutdown(ctx)
576594
return err
577595
}
578596

@@ -784,10 +802,10 @@ func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger)
784802
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
785803
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"
786804

787-
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
805+
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry, env v2.Context) (*http.Client, *httplog.LoggingRoundTripper, *otel.OTELCELMetrics, error) {
788806
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings(), log)...)
789807
if err != nil {
790-
return nil, nil, err
808+
return nil, nil, nil, err
791809
}
792810

793811
if cfg.Auth.Digest.isEnabled() {
@@ -883,19 +901,46 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
883901
}
884902

885903
if cfg.Auth.OAuth2.isEnabled() {
886-
authClient, err := cfg.Auth.OAuth2.client(ctx, c)
904+
c, err = cfg.Auth.OAuth2.client(ctx, c)
887905
if err != nil {
888-
return nil, nil, err
906+
return nil, nil, nil, err
889907
}
890-
return authClient, trace, nil
891-
}
892908

893-
c.Transport = userAgentDecorator{
894-
UserAgent: userAgent,
895-
Transport: c.Transport,
909+
//return authClient, trace, nil, nil
910+
} else {
911+
912+
c.Transport = userAgentDecorator{
913+
UserAgent: userAgent,
914+
Transport: c.Transport,
915+
}
896916
}
897917

898-
return c, trace, nil
918+
resource := resource.NewWithAttributes(
919+
semconv.SchemaURL,
920+
semconv.ServiceInstanceID(env.IDWithoutName),
921+
semconv.ServiceNameKey.String(cfg.GetPackageName()),
922+
semconv.ServiceVersionKey.String(cfg.GetPackageVersion()),
923+
attribute.String("agent.version", env.Agent.Version),
924+
attribute.String("agent.componentID", env.Agent.ComponentID),
925+
attribute.String("agent.id", env.Agent.ID.String()),
926+
attribute.String("agent.version", env.Agent.Version),
927+
attribute.String("agent.componentID", env.Agent.ComponentID),
928+
)
929+
930+
log.Infof("created cel input resource", resource.String())
931+
exporterType := otel.GetExporterTypeFromEnv()
932+
exporter, err := otel.NewExporterFactory(log).NewExporter(ctx, exporterType)
933+
if err != nil {
934+
log.Errorw("failed to get exporter", "error", err)
935+
}
936+
exportInterval, err := otel.GetCollectionPeriodFromEnvironment(ctx, cfg.Interval)
937+
if err != nil {
938+
log.Errorw("failed to get collection period", "error", err)
939+
}
940+
log.Infof("created OTEL cel input exporter %s for input %s", exporterType, env.IDWithoutName)
941+
otelMetrics, otelTransport, err := otel.NewOTELCELMetrics(log, env.Agent.UserAgent, *resource, c.Transport, exporter, exportInterval)
942+
c.Transport = otelTransport
943+
return c, trace, otelMetrics, nil
899944
}
900945

901946
func wantClient(cfg config) bool {
@@ -918,7 +963,6 @@ func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings, log *
918963
case !ok:
919964
return []httpcommon.TransportOption{
920965
httpcommon.WithLogger(log),
921-
httpcommon.WithAPMHTTPInstrumentation(),
922966
keepalive,
923967
}
924968

x-pack/filebeat/otel/README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Opentelemetry metrics
2+
3+
The CEL input is currently able to export Open Telemetry metrics.
4+
The export of OTEL metrics is off by default. The control of OTEL
5+
exporting is through environment variables.
6+
7+
## Configuration
8+
OTEL metrics can be sent to an otlp endpoint or to console for testing.
9+
OTEL metrics endpoints include the [Open Telemetry Collector](https://www.elastic.co/docs/reference/opentelemetry), the
10+
[Elastic Cloud Managed OTLP Endpoint](https://www.elastic.co/docs/reference/opentelemetry/motlp) in Elastic Cloud
11+
or the [otlp endpoint in Elastic APM](https://www.elastic.co/docs/solutions/observability/apm/use-opentelemetry-with-apm).
12+
13+
To export OTEL metrics to console set these environment variables.
14+
```
15+
unset OTEL_EXPORTER_OTLP_ENDPOINT
16+
unset OTEL_EXPORTER_OTLP_HEADERS
17+
export OTEL_METRICS_EXPORTER=console
18+
```
19+
To export OTEL metrics to an OTLP endpoint set these environment variables.
20+
```
21+
export OTEL_EXPORTER_OTLP_ENDPOINT=<endpoint URL>
22+
export OTEL_METRICS_EXPORTER=otlp
23+
24+
Authorization headers will depend upon the endpoint being used.
25+
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=ApiKey <key>"
26+
```
27+
28+
The console exports in JSON. The default protocol for OTLP is gRPC.
29+
Filebeat also supports "http/protobuf". It does not support "http/json".
30+
To use an http/protobuf protocol:
31+
32+
```
33+
export OTEL_EXPORTER_OTLP_METRICS_PROTOCOL="http/protobuf"
34+
```
35+
## Exported metrics
36+
37+
Each CEL input has an Open Telemetry Resource associated with it.
38+
39+
|name|description|
40+
|----|---|
41+
|resource.attributes.service.name | the package name of the integration|
42+
|resource.attributes.service.version | version of the integration|
43+
|resource.attributes.agent.id | id of the agent|
44+
|resource.attributes.service.instance.id | ID of the input instance|
45+
46+
Exported metrics:
47+
A program run is a single run of the CEL program. A periodic run is all the
48+
program runs for that periodic run.
49+
50+
|name|description|metric type|
51+
|---|---|---|
52+
|input.cel.periodic.run.count | the number of times a periodic run was started.|Int64Counter|
53+
| input.cel.periodic.program.started | a histogram of times a program was started in a periodic run.|Int64Histogram|
54+
| input.cel.periodic.program.success | a histogram of times a program terminated without an error in a periodic run.|Int64Histogram|
55+
| input.cel.periodic.batch.generated | a histogram of the number of batches generated in a periodic run.|Int64Histogram|
56+
| input.cel.periodic.batch.published | a histogram of the number of batches successfully published in a periodic run.|Int64Histogram|
57+
| input.cel.periodic.event.generated | a histogram of the number of events generated in a periodic run.|Int64Histogram|
58+
| input.cel.periodic.event.published | a histogram of the number of events published in a periodic run.|Int64Histogram|
59+
| input.cel.periodic.run.duration | a histogram of the total duration of time in seconds spent in a periodic run.|Int64Histogram|
60+
| input.cel.periodic.cel.duration | a histogram of the total duration of time in seconds spent processing CEL programs in a periodic run.|Int64Histogram|
61+
|input.cel.periodic.event.publish.duration | a histogram of the total duration of time in seconds publishing events in a periodic run.|Int64Histogram|
62+
|input.cel.program.run.started.count | a count of the number of times a program was started.|Int64Counter|
63+
| input.cel.program.run.success.count | a count of the number of times a program terminated without error.|Int64Counter|
64+
| input.cel.program.batch.count | a count of the number of batches the program has generated.|Int64Counter|
65+
| input.cel.program.event.count | a count of the number of events the program has generated.|Int64Counter|
66+
| input.cel.program.event.published.count | a count of the number of events the program has published.|Int64Counter|
67+
| input.cel.program.batch.published.count | a count of the number of batches the program has published.|Int64Counter|
68+
| input.cel.program.run.duration | a histogram of the total time in seconds spent executing the program.|Int64Histogram|
69+
| input.cel.program.cel.duration | a histogram of the time in seconds spent processing the CEL program.|Int64Histogram|
70+
| input.cel.program.publish.duration | a histogram of the time in seconds spent publishing in the program.|Int64Histogram|
71+

0 commit comments

Comments
 (0)