Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7fb585a
Add-metrics-CEL-609 checkpoint
StacieClark-Elastic Sep 10, 2025
8379a77
Add-metrics-CEL-609 added conversion of http method histogram metrics…
StacieClark-Elastic Oct 13, 2025
1ccd5c5
Add-metrics-CEL-609 updated README.md
StacieClark-Elastic Oct 14, 2025
8b36376
Add-metrics-CEL-609 removed interval export. Changed histograms to Ex…
StacieClark-Elastic Nov 12, 2025
5c03586
Add-metrics-CEL-609 refactored ExporterFactory
StacieClark-Elastic Nov 12, 2025
b4fdd67
Add-metrics-CEL-609
StacieClark-Elastic Nov 19, 2025
829cd7e
Add-metrics-CEL-609 exporter is now shared among all cel inputs.
StacieClark-Elastic Nov 25, 2025
eccf80b
Add-metrics-CEL-609
StacieClark-Elastic Nov 26, 2025
597cb44
Add-metrics-CEL-609 Added a unit test for cel_metrics.go
StacieClark-Elastic Nov 26, 2025
7455988
Add-metrics-CEL-609 fixed missing http attribute in readme
StacieClark-Elastic Nov 26, 2025
0ff941d
Add-metrics-CEL-609 added license headers
StacieClark-Elastic Nov 26, 2025
635fa5c
Add-metrics-CEL-609 fixed some linter errors
StacieClark-Elastic Nov 26, 2025
16234c2
Changed attribute package.datastream to package.data_stream
StacieClark-Elastic Nov 26, 2025
bd13d1e
Add-metrics-CEL-609 added changelog. Fix go.mod
StacieClark-Elastic Nov 26, 2025
ac013bb
fixed some linter errors
StacieClark-Elastic Dec 1, 2025
9b8f092
Add package data only to cel inputs
StacieClark-Elastic Dec 2, 2025
6bc8c38
data.json files are made identical to main
StacieClark-Elastic Dec 2, 2025
8e7319e
data.json files forced to be like those in main
StacieClark-Elastic Dec 2, 2025
0ab4fc3
updated NOTICE.txt
StacieClark-Elastic Dec 2, 2025
69afb7b
Fixed linter error for unchecked error in type assertion
StacieClark-Elastic Dec 2, 2025
ad0addd
minor fixes due to comments on PR
StacieClark-Elastic Dec 4, 2025
b3e4fd7
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 4, 2025
ec7452b
Add-metrics-CEL-609 OTEL metrics transport is at same level as the 'i…
StacieClark-Elastic Dec 4, 2025
962df13
Add-metrics-CEL-609 refactored ExporterFactory. Added a composite str…
StacieClark-Elastic Dec 10, 2025
045eab7
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 10, 2025
fe3d2a4
Add-metrics-CEL-609 Converted metrics collection diagram to mermaid. …
StacieClark-Elastic Dec 10, 2025
951f75f
Add-metrics-CEL-609 Fixed linter errors
StacieClark-Elastic Dec 10, 2025
f7731a0
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 10, 2025
beb7d7e
Add-metrics-CEL-609 fixed linter error
StacieClark-Elastic Dec 10, 2025
f02c58d
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 11, 2025
52f1d8e
Add-metrics-CEL-609 unexported otelCELMetrics. Using sync.Once() inst…
StacieClark-Elastic Dec 12, 2025
abeac96
Add-metrics-CEL-609 Moved godoc level documentation into a doc.go fil…
StacieClark-Elastic Dec 15, 2025
f869295
Add-metrics-CEL-609 added link to metrics collection points diagram
StacieClark-Elastic Dec 15, 2025
3c38516
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 15, 2025
0aee69e
Add-metrics-CEL-609 moved replacement of context to inside of manualE…
StacieClark-Elastic Dec 15, 2025
66bce44
Add-metrics-CEL-609 added missed cancel of a context
StacieClark-Elastic Dec 15, 2025
8a2afee
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 15, 2025
a55cb0e
Add-metrics-CEL-609 build updated doc.go file
StacieClark-Elastic Dec 15, 2025
9eaae6f
Merge branch 'main' into Add-metrics-CEL-609
StacieClark-Elastic Dec 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27,816 changes: 13,908 additions & 13,908 deletions NOTICE.txt

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions changelog/fragments/1764199697-add-cel-input-otel-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: add OTEL metrics to CEL inputs

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ require (
go.opentelemetry.io/collector/processor v1.45.0
go.opentelemetry.io/collector/processor/processorhelper v0.139.0
go.opentelemetry.io/collector/receiver/receivertest v0.139.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0
go.opentelemetry.io/otel/metric v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
go.uber.org/goleak v1.3.0
sigs.k8s.io/kind v0.29.0
Expand Down Expand Up @@ -468,25 +475,18 @@ require (
go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/contrib/otelconf v0.18.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.38.0 // indirect
go.opentelemetry.io/ebpf-profiler v0.0.202540 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.60.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0 // indirect
go.opentelemetry.io/otel/log v0.14.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.14.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
Expand Down
15 changes: 15 additions & 0 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@
// RecordCoverage indicates whether a program should
// record and log execution coverage.
RecordCoverage bool `config:"record_coverage"`

// Package contains pacakge level integration data.

Check failure on line 74 in x-pack/filebeat/input/cel/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

`pacakge` is a misspelling of `package` (misspell)
// name and version are expected.
Package map[string]string `config:"package"`
}

func (c config) GetPackageData(key string) string {
if c.Package == nil {
return "unknown"
}
value, ok := c.Package[key]
if !ok {
return "unknown"
}
return value
}

type redact struct {
Expand Down
113 changes: 99 additions & 14 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
"github.com/icholy/digest"
"github.com/rcrowley/go-metrics"
"go.elastic.co/ecszap"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
Expand All @@ -51,6 +55,7 @@
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
"github.com/elastic/beats/v7/x-pack/filebeat/otel"
"github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -100,7 +105,7 @@
func (input) Name() string { return inputName }

func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
cfg := src.(*source).cfg
cfg := src.(*source).cfg //nolint:errcheck

Check failure on line 108 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

directive `//nolint:errcheck` should provide explanation such as `//nolint:errcheck // this is why` (nolintlint)
if !wantClient(cfg) {
return nil
}
Expand All @@ -110,7 +115,7 @@
// Run starts the input and blocks until it ends completes. It will return on
// context cancellation or type invalidity errors, any other error will be retried.
func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
dataStreamName := src.(*source).cfg.DataStream // May be empty.
dataStreamName := src.(*source).cfg.DataStream //nolint:errcheck May be empty.

Check failure on line 118 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)

var cursor map[string]interface{}
env.UpdateStatus(status.Starting, dataStreamName)
Expand All @@ -128,7 +133,7 @@
parent: &env,
}
}
err := input{}.run(env, src.(*source), cursor, pub, health)

Check failure on line 136 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)
if err != nil {
msg := "failed to run: " + err.Error()
if dataStreamName != "" {
Expand Down Expand Up @@ -179,7 +184,7 @@
cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
}

client, trace, err := newClient(ctx, cfg, log, reg)
client, trace, otelMetrics, err := newClient(ctx, cfg, log, reg, env)
if err != nil {
return err
}
Expand Down Expand Up @@ -268,7 +273,12 @@
// from mito/lib, a global, useragent, is available to use
// in requests.
err = periodically(ctx, cfg.Interval, func() error {
log.Info("process periodic request")
log.Debug("Starting otel periodic")
otelMetrics.AddPeriodicRun(ctx, 1)
otelMetrics.StartPeriodic()
defer otelMetrics.EndPeriodic(ctx)

log.Debug("process periodic request")
var (
budget = *cfg.MaxExecutions
waitUntil time.Time
Expand Down Expand Up @@ -306,9 +316,14 @@
log.Debugw("previous transaction", "transaction.id", trace.TxID())
}
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
otelMetrics.AddProgramExecution(ctx, 1)
metrics.executions.Add(1)
start := i.now().In(time.UTC)
defer func() {
otelMetrics.AddTotalDuration(ctx, time.Since(start))
}()
state, err = evalWith(ctx, prg, ast, state, start, wantDump, budget-1)
otelMetrics.AddCELDuration(ctx, time.Since(start))
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
if err != nil {
var dump dumpError
Expand Down Expand Up @@ -443,11 +458,13 @@
switch e := e.(type) {
case []interface{}:
if len(e) == 0 {
otelMetrics.AddProgramSuccessExecution(ctx, 1)
return nil
}
events = e
case map[string]interface{}:
if e == nil {
otelMetrics.AddProgramSuccessExecution(ctx, 1)
return nil
}
if _, ok := e["error"]; ok {
Expand Down Expand Up @@ -481,7 +498,8 @@
// We have a non-empty batch of events to process.
metrics.batchesReceived.Add(1)
metrics.eventsReceived.Add(uint64(len(events)))

otelMetrics.AddGeneratedBatch(ctx, 1)
otelMetrics.AddEvents(ctx, int64(len(events)))
// Drop events from state. If we fail during the publication,
// we will re-request these events.
delete(state, "events")
Expand Down Expand Up @@ -578,8 +596,11 @@
}
if i == 0 {
metrics.batchesPublished.Add(1)
otelMetrics.AddPublishedBatch(ctx, 1)

}
metrics.eventsPublished.Add(1)
otelMetrics.AddPublishedEvents(ctx, 1)

err = ctx.Err()
if err != nil {
Expand All @@ -592,11 +613,12 @@
}

metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())

otelMetrics.AddPublishDuration(ctx, time.Since(start))
// Advance the cursor to the final state if there was no error during
// publications. This is needed to transition to the next set of events.
if !hadPublicationError {
goodCursor = cursor
otelMetrics.AddProgramSuccessExecution(ctx, 1)
}

// Replace the last known good cursor.
Expand All @@ -623,6 +645,7 @@
log.Infof("input stopped because context was cancelled with: %v", err)
err = nil
}
otelMetrics.Shutdown(ctx)
return err
}

Expand Down Expand Up @@ -847,10 +870,10 @@
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"

func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry, env v2.Context) (*http.Client, *httplog.LoggingRoundTripper, *otel.OTELCELMetrics, error) {
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings(), log)...)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

if cfg.Auth.Digest.isEnabled() {
Expand All @@ -870,13 +893,13 @@
tr, err := aws.InitializeSignerTransport(*cfg.Auth.AWS, log, c.Transport)
if err != nil {
log.Errorw("failed to initialize aws config failed for signer", "error", err)
return nil, nil, err
return nil, nil, nil, err
}
c.Transport = tr
} else if cfg.Auth.File.isEnabled() {
tr, err := newFileAuthTransport(cfg.Auth.File, c.Transport)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
c.Transport = tr
}
Expand Down Expand Up @@ -944,8 +967,15 @@
c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg, log)
}

otelMetrics, otelTransport, err := CreateOTELMetrics(ctx, cfg, log, env, c.Transport)
if err != nil {
return nil, nil, nil, err
}

c.Transport = otelTransport
c.CheckRedirect = checkRedirect(cfg.Resource, log)


Check failure on line 978 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

File is not properly formatted (goimports)
if cfg.Resource.Retry.getMaxAttempts() > 1 {
maxAttempts := cfg.Resource.Retry.getMaxAttempts()
c = (&retryablehttp.Client{
Expand All @@ -961,19 +991,74 @@
}

if cfg.Auth.OAuth2.isEnabled() {
authClient, err := cfg.Auth.OAuth2.client(ctx, c)
c, err = cfg.Auth.OAuth2.client(ctx, c)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
return authClient, trace, nil
}

c.Transport = userAgentDecorator{
UserAgent: userAgent,
Transport: c.Transport,
}

return c, trace, nil
return c, trace, otelMetrics, nil
}

func CreateOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2.Context, tripper http.RoundTripper) (*otel.OTELCELMetrics, *otelhttp.Transport, error) {
resource := resource.NewWithAttributes(
semconv.SchemaURL, GetResourceAttributes(env, cfg)...,
)

log.Infof("created cel input resource %s", resource.String())
exporter, exporterType, err := otel.GetGlobalExporterFactory(log).GetExporter(ctx)
if err != nil {
log.Errorw("failed to get exporter", "error", err)
}
if err != nil {
log.Errorw("failed to get collection period", "error", err)
}
log.Infof("created OTEL cel input exporter %s for input %s", exporterType, env.IDWithoutName)
return otel.NewOTELCELMetrics(log, env.IDWithoutName, *resource, tripper, exporter)
}

func GetResourceAttributes(env v2.Context, cfg config) []attribute.KeyValue {
attrs := []attribute.KeyValue{semconv.ServiceInstanceID(env.IDWithoutName),
attribute.String("package.name", cfg.GetPackageData("name")),
attribute.String("package.version", cfg.GetPackageData("version")),
attribute.String("package.data_stream", cfg.DataStream),
attribute.String("agent.version", env.Agent.Version),
attribute.String("agent.id", env.Agent.ID.String())}

usedKeys := make(map[string]struct{})

for _, attr := range attrs {
// Access the Key field of the KeyValue struct
usedKeys[string(attr.Key)] = struct{}{}
}
attributesStr, ok := os.LookupEnv("OTEL_RESOURCE_ATTRIBUTES")
if ok && len(attributesStr) > 0 {
attributes := make([]attribute.KeyValue, 0)
pairs := strings.Split(attributesStr, ",")
for _, pair := range pairs {
kv := strings.SplitN(pair, "=", 2)
if len(kv) == 2 {
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
if key != "" {
// don't overwrite existing keys
_, used := usedKeys[key]
if !used {
attributes = append(attributes, attribute.String(key, value))
}
}
}
}
attrs = append(attrs, attributes...)
}

return attrs

}

func wantClient(cfg config) bool {
Expand Down
Loading
Loading