Skip to content

Conversation

@StacieClark-Elastic
Copy link
Member

@StacieClark-Elastic StacieClark-Elastic commented Oct 9, 2025

Proposed commit message

Added OTEL metrics to cel input to support collection of metrics per input periodic run in agentless environment.

Produces http and cel input metrics using the OTEL SDK and pushes the metrics to either a defined endpoint or the console at the end of each periodic run. No metrics are produced if no environment variables are set.
Produces a count for each defined metric for every periodic run. Each metric set is for a single periodic run.
Histograms are exported as Exponential Histograms.
If the environment variable OTEL_EXPORTER_OTLP_ENDPOINT is set, OTEL OTLP metrics will be exported after each periodic run using the to the endpoint defined in OTEL_EXPORTER_OTLP_ENDPOINT.

Each input has a unique resource attribute set. Any attributes set in the environment variable OTEL_RESOURCE_ATTRIBUTES are added to the input attribute set. Existing keys will not be overwritten

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Disruptive User Impact

The default is to produce no metrics.
The only place that I made changes that could possibly effect behavior outside of this change is: We are wrapping the http transport for http metrics. We have many nested transport wrappers. I do not expect the other transport wrappers
to be effected. But it's something to look out for.

Author's Checklist

How to test this PR locally

Reviewing this PR requires building beats, building elastic-agent, then running elastic-agent standalone against a cluster.
I used the serverless cluster on prod as this has a managed OTLP endpoint. You can also run this against a 9.3.0-SNAPSHOT using elastic-package. To use elastic-package you will need to enable the APM server. Create another profiel with apm enabled. Note, if you are MacOS, you will need to change the docker file to expose a different port than 8200 and rebuild elastic-package because MacOS uses that port for another service.

There are two ways to test this.

  • Replace the agentbeat in an elastic-agent distro with one built in beats.
  • Build elastic-agent so that it pulls from the beats repo. elastic-agent and beats need to be in the same directory
  1. To build agentbeat: checkout branch, cd ../beats/x-pack/agentbeat and run
    DEV=true SNAPSHOT=true PLATFORMS=darwin/arm64 mage build
    replace PLATFORMS with correct platform for builds on non MacOS machines.
    Not required unless you are overwriting the agentbeat in an existing elastic-agent installation.
    If so, overwrite the agentbeat at /data/elastic-agent-/components

  2. To build elastic-agent: cd into elastic-agent repo and Build elastic-agent
    DEV=true EXTERNAL=false SNAPSHOT=true PLATFORMS=darwin/arm64 PACKAGES=tar.gz mage -v package
    replace PLATFORMS with correct platform for builds on non MAC machines. Make sure that beats repo is in the same directory as the elastic-agent repo since the EXTERNAL=false will pull beats code from the co-located beats repo instead of from github.

  3. in elastic-agent repo
    cd ./build/distributions
    tar -xvzf <elastic-agent-.tar.gz>
    cd into untarred directory elastic-agent-
    .
    rm elastic-agent.yml (we will replace this before running the elastic-agent)

  4. The rest of the directions are for serverless. Create an observability serverless cluster

  5. Get environment variables for APM
    On Bottom left side click "Add Data"
    Choose "Application" from choices of "What do you want to monitor?"
    Choose "OpenTelemetry" from "Monitor your Application using:"
    Copy the 3 environment variables from section 2. Values from OTEL_RESOURCE_ATTRIBUTES are added to the resource object that each CEL input creates to identify itself. It's presence is required.
    In the OTEL_RESOURCE_ATTRIBUTES template, replace with elastic-agent, app-version with the version being used. You may choose to override deployment.environment.
    Each CEL input behaves like it's own application. All CEL applications require OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS values as well.

  6. Add an integration. I have a simple CEL integration package that requires no configuration if you want an easy one to use.
    a. On lower right chose "Install Elastic Agent"
    b. In the first paragraph in the next page click the link that says “standalone mode”
    c. This takes you to the configuration page for the integration.
    d. After filling out configuration, on lower right click“Save and Continue”
    e. ON configure Agent page: Create API Key
    f. Download policy
    e. Do not install agent. Leave page

  7. Copy the downloaded policy to elastic-agent.yml into ./build/distributions/elastic-agent--

  8. Start the agent in development mode. In ./build/distributions/elastic-agent*
    sudo OTEL_RESOURCE_ATTRIBUTES="<value>" OTEL_EXPORTER_OTLP_ENDPOINT="<value>" OTEL_EXPORTER_OTLP_HEADERS="<value>" ./elastic-agent run -e --develop &> output.txt

  9. Check for data in the cluster.
    On Left choose "Discover"
    in Date View, use dropdown to select 'metrics-'
    Filter by package and datastream name: package.datastream : "<package_name>.<datastream.name>"
    All the metrics for periodic run will have the same timestamp. For any timestamp there will be 19 metrics. 1 document per metrics, except for the http.cleint.
    metrics which may have multiple metrics.
    "input.cel.periodic.run",
    "input.cel.periodic.program.run.started",
    "input.cel.periodic.program.run.success",
    "input.cel.periodic.batch.received",
    "input.cel.periodic.batch.published",
    "input.cel.periodic.event.received",
    "input.cel.periodic.event.published",
    "input.cel.periodic.run.duration",
    "input.cel.periodic.cel.duration",
    "input.cel.periodic.event.publish.duration",
    "input.cel.program.batch.received",
    "input.cel.program.batch.published",
    "input.cel.program.event.received",
    "input.cel.program.event.published",
    "input.cel.program.run.duration",
    "input.cel.program.cel.duration",
    "input.cel.program.publish.duration",
    "http.client.request.body.size", (could be multiple documents for this depending upon integration used for test)
    "http.client.request.duration", (could be multiple documents for this depending upon integration used for test)
    Verify that metrics exist for each of these names.

    Look for metrics beginning with
    input.cel.periodic.* (cel processing metrics for each periodic run) (all are counters)
    input.cel.program.* (cel processing metrics for each program run. Most are histograms across all the program runs for the periodic run)
    CEL periodic and program metrics.input.cel.*
    To look at http metrics that are generated from the SDK

Other filtering options:
For instance if the id in the elastic-agent.yml is "- id: cel-cel_simple-d78ef7a8-0757-4606-902e-c6a7f9320013"
then you can filter by
resource.attributes.service.instance.id : "cel-cel_simple.fakedts-d78ef7a8-0757-4606-902e-c6a7f9320013"

Related issues

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Oct 9, 2025
@github-actions
Copy link
Contributor

github-actions bot commented Oct 9, 2025

🤖 GitHub comments

Expand to view the GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify
Copy link
Contributor

mergify bot commented Oct 9, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @StacieClark-Elastic? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@mergify
Copy link
Contributor

mergify bot commented Oct 12, 2025

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b Add-metrics-CEL-609 upstream/Add-metrics-CEL-609
git merge upstream/main
git push upstream Add-metrics-CEL-609

@narph narph added the Team:Security-Service Integrations Security Service Integrations Team label Nov 17, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Nov 17, 2025
@mergify
Copy link
Contributor

mergify bot commented Nov 19, 2025

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b Add-metrics-CEL-609 upstream/Add-metrics-CEL-609
git merge upstream/main
git push upstream Add-metrics-CEL-609

@StacieClark-Elastic StacieClark-Elastic marked this pull request as ready for review November 26, 2025 22:15
@StacieClark-Elastic StacieClark-Elastic requested review from a team as code owners November 26, 2025 22:15
@elasticmachine
Copy link
Contributor

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@StacieClark-Elastic StacieClark-Elastic marked this pull request as draft November 26, 2025 23:42
… to a Sum metricv so it can be visualized in APM
Added a check for an environment variable 'APM_OTLP'. if set, all metric histograms will be exported as Sum (Counter) type. This is support sending metrics to both the APM OTLP endpoint and the managed OTLP endpoint
Histogram defaults to exponential type. Can be changed to use regular histograms by setting environment variable USE_NON_EXPONENTIAL_HISTOGRAMS.
Removed flush and shutdown functions due to the exporter being shared.
Shortened metric names. Removed option to export as plain histograms. Cleaned up README. Added a PNG of where metrics are collected
@StacieClark-Elastic StacieClark-Elastic marked this pull request as ready for review December 2, 2025 22:01
// record and log execution coverage.
RecordCoverage bool `config:"record_coverage"`

Package map[string]string `config:"package"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs godoc.

Comment on lines 60 to 70
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
"github.com/elastic/beats/v7/x-pack/filebeat/otel"
"github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason that this was moved?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad IDE settings. I fixed that.

Comment on lines 109 to 112
srcp, ok := src.(*source)
if !ok {
return fmt.Errorf("input type %T is not a source", src)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't do this. If this panics something has gone terribly wrong and the program should fail fatally. If absolutely necessary, you can add a //nolint:errcheck // If this assertion fails, the program is incorrect and should panic..

Comment on lines 124 to 128
srcP, ok := src.(*source)
if !ok {
return errors.New("inputcursor.Source is not a *source type")
}
dataStreamName := srcP.cfg.DataStream // May be empty.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Comment on lines 329 to 330
otelMetrics.AddProgramExecution(ctx, 1)
metrics.executions.Add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general pattern, it seems to me that if metrics held otelMetrics and maybe the context, the metrics notes could be rolled into methods on metrics that do both the current metrics publication and the OTel metrics publication. This would either be with an explicit context being passed in, or with the held context. I don't really mind which.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming that metrics are going to be deprecated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an even better argument for doing it that way then since it reduces diff churn in the body of the CEL input an places it in the metrics code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OTEL metrics are local to the input. The exports happen locally. The design of the beats-wide metrics is global. the metrics are pulled for the entire beats executable. It makes sense to have that go back to a global registry. This design is per input. We made a conscious decision to keep the design on a per input basis.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid I don't see how that impacts on what I'm suggesting. I'm only suggesting that the calling be made less invasive here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some other OTEL based things that we need to get in as part of this work. I want to see where they hook in before moving anything into another part of the system.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not suggesting moving it into another package, just into a helper in this package.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consolidate the metrics calls now. The existing metrics exposed via the HTTP monitoring endpoint are effectively a public interface and users depend on them. They are not going away. So we will have both systems indefinitely.

My concern with keeping them separate is that it will never get cleaned up later. I have seen this pattern many times where the intent is to consolidate later, but that work never gets prioritized.

Dan's suggestion to have the metrics struct hold the otelMetrics and provide methods that update both sounds ideal to me. It will help the two from drifting apart (at least unintentionally). Refactoring should be straightforward now while everything is fresh. It only gets harder with time as more code builds on the current structure.

Copy link
Member Author

@StacieClark-Elastic StacieClark-Elastic Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I'd like to do this is the keep the systems separate but wrap them in another struct that has references to each system and makes the calls out to each system:

type CELMetricsRecorder struct {
inputMetrics *inputMetrics
otelMetrics *otel.OTELCELMetrics
}

I would also consider making an interface and changing the function calls in otel.OTELCELMetrics to match what is called in inputMetrics. I don't know if that is worth the effort. One incompatibility is OTELCELMetrics stores seconds as this is the standard for OTEL. inputMetrics stores nanoseconds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d.Seconds()gets you the value if you pass in a d time.Duration.

Comment on lines 1009 to 1046
func GetResourceAttributes(env v2.Context, cfg config) []attribute.KeyValue {
attrs := []attribute.KeyValue{semconv.ServiceInstanceID(env.IDWithoutName),
attribute.String("package.name", cfg.GetPackageData("name")),
attribute.String("package.version", cfg.GetPackageData("version")),
attribute.String("package.data_stream", cfg.DataStream),
attribute.String("agent.version", env.Agent.Version),
attribute.String("agent.id", env.Agent.ID.String())}

usedKeys := make(map[string]struct{})

for _, attr := range attrs {
// Access the Key field of the KeyValue struct
usedKeys[string(attr.Key)] = struct{}{}
}
attributesStr, ok := os.LookupEnv("OTEL_RESOURCE_ATTRIBUTES")
if ok && len(attributesStr) > 0 {
attributes := make([]attribute.KeyValue, 0)
pairs := strings.Split(attributesStr, ",")
for _, pair := range pairs {
kv := strings.SplitN(pair, "=", 2)
if len(kv) == 2 {
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
if key != "" {
// don't overwrite existing keys
_, used := usedKeys[key]
if !used {
attributes = append(attributes, attribute.String(key, value))
}
}
}
}
attrs = append(attrs, attributes...)
}

return attrs

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func GetResourceAttributes(env v2.Context, cfg config) []attribute.KeyValue {
attrs := []attribute.KeyValue{semconv.ServiceInstanceID(env.IDWithoutName),
attribute.String("package.name", cfg.GetPackageData("name")),
attribute.String("package.version", cfg.GetPackageData("version")),
attribute.String("package.data_stream", cfg.DataStream),
attribute.String("agent.version", env.Agent.Version),
attribute.String("agent.id", env.Agent.ID.String())}
usedKeys := make(map[string]struct{})
for _, attr := range attrs {
// Access the Key field of the KeyValue struct
usedKeys[string(attr.Key)] = struct{}{}
}
attributesStr, ok := os.LookupEnv("OTEL_RESOURCE_ATTRIBUTES")
if ok && len(attributesStr) > 0 {
attributes := make([]attribute.KeyValue, 0)
pairs := strings.Split(attributesStr, ",")
for _, pair := range pairs {
kv := strings.SplitN(pair, "=", 2)
if len(kv) == 2 {
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
if key != "" {
// don't overwrite existing keys
_, used := usedKeys[key]
if !used {
attributes = append(attributes, attribute.String(key, value))
}
}
}
}
attrs = append(attrs, attributes...)
}
return attrs
}
func getResourceAttributes(env v2.Context, cfg config) []attribute.KeyValue {
attrs := []attribute.KeyValue{
semconv.ServiceInstanceID(env.IDWithoutName),
attribute.String("package.name", cfg.GetPackageData("name")),
attribute.String("package.version", cfg.GetPackageData("version")),
attribute.String("package.data_stream", cfg.DataStream),
attribute.String("agent.version", env.Agent.Version),
attribute.String("agent.id", env.Agent.ID.String()),
}
attributes := os.Getenv("OTEL_RESOURCE_ATTRIBUTES")
if attributes == "" {
return attrs
}
seen := make(map[attribute.Key]bool)
for _, attr := range attrs {
seen[attr.Key] = true
}
pairs := strings.Split(attributes, ",")
for _, pair := range pairs {
key, val, ok := strings.Cut(pair, "=")
if !ok || key == "" || seen[attribute.Key(key)] {
continue
}
attrs = append(attrs, attribute.String(key, val))
}
return attrs
}

Comment on lines 988 to 1005
resource := resource.NewWithAttributes(
semconv.SchemaURL, GetResourceAttributes(env, cfg)...,
)

log.Infof("created cel input resource %s", resource.String())
exporter, exporterType, err := otel.GetGlobalExporterFactory(log).GetExporter(ctx)
if err != nil {
log.Errorw("failed to get exporter", "error", err)
}
if err != nil {
log.Errorw("failed to get collection period", "error", err)
}
log.Infof("created OTEL cel input exporter %s for input %s", exporterType, env.IDWithoutName)
otelMetrics, otelTransport, err := otel.NewOTELCELMetrics(log, env.IDWithoutName, *resource, c.Transport, exporter)
if err != nil {
return nil, nil, nil, err
}
c.Transport = otelTransport
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done in a helper outside newClient on its return? Then we don't need to return the otelMetrics value here.

func addOtelMetrics(ctx context.Context, cli *http.Client, cfg config, env v2.Context, log *logp.Logger) (*otel.OTELCELMetrics, error) {
	resource := resource.NewWithAttributes(semconv.SchemaURL, getResourceAttributes(env, cfg)...)

	log.Infow("created cel input resource", "resource", resource)
	exporter, typ, err := otel.GetGlobalExporterFactory(log).GetExporter(ctx)
	if err != nil {
		log.Errorw("failed to get exporter", "error", err)
	}
	if err != nil {
		log.Errorw("failed to get collection period", "error", err)
	}
	log.Infow("created OTEL cel input exporter", "type", typ, "id", env.IDWithoutName)
	metrics, transport, err := otel.NewOTELCELMetrics(log, env.IDWithoutName, *resource, cli.Transport, exporter)
	if err != nil {
		return nil, err
	}
	cli.Transport = transport
	return metrics, nil
}

with the call site looking like

	client, trace, err := newClient(ctx, cfg, log, reg)
	if err != nil {
		return err
	}
	otelMetrics, err := addOtelMetrics(ctx, client, cfg, env, log)
	if err != nil {
		return err
	}

This may not be completely possible; the helper is, but where it happens may need to still be in newClient. The existing metrics is added to the round-tripper chain before the retries, but my suggestion and the existing proposal add the OTel metrics after the retries. This means they are measuring different things.

Copy link
Member Author

@StacieClark-Elastic StacieClark-Elastic Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping the transport before the retryable client wrapper is better design so we get accurate metrics. that means that we can't separate the create of OTEL metrics out the newClient

Comment on lines 56 to 58
o.exportLock.Lock() // Acquire the lock
defer o.exportLock.Unlock()
o.started = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
o.exportLock.Lock() // Acquire the lock
defer o.exportLock.Unlock()
o.started = true
o.exportLock.Lock()
o.started = true
o.exportLock.Unlock()

programCelDurationHistogram: programCELDuration,
programEventPublishDurationHistogram: programPublishDuration,
}, transport, nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Comment on lines 28 to 29
exportLock sync.Mutex
started bool
Copy link
Contributor

@efd6 efd6 Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutex is protecting only the boolean, correct? If that's the case, the mutex can go away and we can use an atomic.Bool. Like

diff --git a/x-pack/filebeat/otel/cel_metrics.go b/x-pack/filebeat/otel/cel_metrics.go
index dbb5b66847..f23abcff61 100644
--- a/x-pack/filebeat/otel/cel_metrics.go
+++ b/x-pack/filebeat/otel/cel_metrics.go
@@ -9,7 +9,7 @@ import (
        "encoding/json"
        "fmt"
        "net/http"
-       "sync"
+       "sync/atomic"
        "time"
 
        "github.com/elastic/elastic-agent-libs/logp"
@@ -25,8 +25,7 @@ import (
 type OTELCELMetrics struct {
        log                                  *logp.Logger
        manualExportFunc                     func(context.Context) error
-       exportLock                           sync.Mutex
-       started                              bool
+       started                              atomic.Bool
        periodicRunCount                     metric.Int64Counter
        periodicBatchGeneratedCount          metric.Int64Counter
        periodicBatchPublishedCount          metric.Int64Counter
@@ -53,27 +52,24 @@ type OTELCELMetrics struct {
 // running periodic runs. However, test environments with
 // small intervals could potentially cause this to happen.
 func (o *OTELCELMetrics) StartPeriodic() {
-       o.exportLock.Lock() // Acquire the lock
-       defer o.exportLock.Unlock()
-       o.started = true
+       o.started.Store(true)
 }
 
 // EndPeriodic ends the periodic metrics collection and manually exports metrics if a manual export function is set.
 func (o *OTELCELMetrics) EndPeriodic(ctx context.Context) {
-       o.exportLock.Lock() // Acquire the lock
-       defer o.exportLock.Unlock()
-       if o.started {
-               o.log.Debug("OTELCELMetrics EndPeriodic called")
-               o.started = false
-               if o.manualExportFunc != nil {
-                       o.log.Debug("OTELCELMetrics manual export started")
-                       err := o.manualExportFunc(ctx)
-                       if err != nil {
-                               o.log.Errorf("error exporting metrics: %v", err)
-                       }
-                       o.log.Debug("OTELCELMetrics manual export ended")
-               }
+       o.log.Debug("OTELCELMetrics EndPeriodic called")
+       if o.manualExportFunc == nil {
+               return
+       }
+       if !o.started.CompareAndSwap(true, false) {
+               return
+       }
+       o.log.Debug("OTELCELMetrics manual export started")
+       err := o.manualExportFunc(ctx)
+       if err != nil {
+               o.log.Errorf("error exporting metrics: %v", err)
        }
+       o.log.Debug("OTELCELMetrics manual export ended")
 }
 
 func (o *OTELCELMetrics) AddPeriodicRun(ctx context.Context, count int64) {

(note that the manualExportFunc field is never written to after construction, so it does not need protection and it can be used as a non-fenced check before the atomic operation — even if the atomic.Bool approach cannot be used, this movement can be done to reduce the locking costs in the case that there is no export function)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm forcing serialization of access to the manualExportFunc due to the collect() function not being concurrent. The exporter can be run concurrently, but collect() cannot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the o.manualExportFunc is constructed by NewOTELCELMetrics, it could close over a mutex and do the locking itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you are saying. However, the need to synchronize start is not a separate concern from synchronizing manualExportFunc. Changes to start needs to be blocked during the o.manualExportFunc because the collect() call is not concurrency safe. Starting a new set of metrics recording needs to be blocked until the current metrics are collected.

There are 2 parts to export. The first is the collection of metrics. This is the part that needs to be serialized. The second is the actual export of that collection. The export() call is in a go routine and async. And the export() call is concurrent safe.

It's highly unlikely that this will ever happen to be begin with due to the temporal aspects of interval processing. But it's not impossible during testing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm suggesting is that instead of

lock
call manualExportFunc
  do work in manualExportFunc
unlock

we have

call manualExportFunc
  lock
  do work in manualExportFunc
  unlock

These are functionally identical, but allow the client code to not have to consider concurrency safety and allow checking whether manualExportFunc is nil before trying to take the lock pointlessly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

started and manualExportFunc need to be locked at the same time. If that was not the case, I would just lock access to the collect() function. The other option is pass the started bool to the manualExportFunc and handle checking and locking of the started bool there. Which would be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the model I'm suggesting I don't think that is the case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 things that have to be controlled here:

  1. Started cannot be set to true while the collect() function is being called. This blocks the metrics being added to as part of another period before they can be collected.
  2. collect() must be synchronized.
    If I set Start to false in an atomic, then call manualExportFunc, I don't see anything in your design What is the stop StartPeriodic from succeeding and start being to set to true and the metrics being modified while manualExportFunc is running?

Also what we have now is
Lock. ->blocks changing Start to True. Synchronizes access to ManualExportFunc
Set start to false
run ManualExportFunc.
Unlock.

The locking of start, is a requirement of the logic at the OTELCelMetrics level. i.e. Start must be locked until metrics are exported, regardless of whatever is defined as the manual export function. The locking of the collect() function is a concern of the ManualExportFunc. The manualExportFuncand the function was overwritten, and the function now did not handle locking start correctly we would have a bug on OTELCelMetrics caused by a function that was defined elsewhere.

The manualExportFunc is there because as originally written it supported multiple types of readers which required different export functions. I took out the multiple types of reader and left just the one but left the overall design. If I was only going to support 1 type of reader for now and forever, I would have written export as a function on the OTELCelMetrics struct rather than a functor. A member function allows the function to mutate fields in OTELCelMetrics .

@pierrehilbert pierrehilbert added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Dec 3, 2025
@elasticmachine
Copy link
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

Comment on lines 329 to 330
otelMetrics.AddProgramExecution(ctx, 1)
metrics.executions.Add(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consolidate the metrics calls now. The existing metrics exposed via the HTTP monitoring endpoint are effectively a public interface and users depend on them. They are not going away. So we will have both systems indefinitely.

My concern with keeping them separate is that it will never get cleaned up later. I have seen this pattern many times where the intent is to consolidate later, but that work never gets prioritized.

Dan's suggestion to have the metrics struct hold the otelMetrics and provide methods that update both sounds ideal to me. It will help the two from drifting apart (at least unintentionally). Refactoring should be straightforward now while everything is fresh. It only gets harder with time as more code builds on the current structure.

otelMetrics.AddTotalDuration(ctx, time.Since(start))
}()
state, err = evalWith(ctx, prg, ast, state, start, wantDump, budget-1)
otelMetrics.AddCELDuration(ctx, time.Since(start))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I calculate CEL duration right after the CEL program finishes. The current metrics adds the time after processing errors and potentially dumping them. Including writing to output hide the correct CEL data. That adds time to the CEL metric that is not actually about the CEL program. What is the intent here? They are going to have to be in the same place if we merge both types of metrics being processed in a single call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the existing approach is incorrect we can change it.


func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
cfg := src.(*source).cfg
cfg := src.(*source).cfg //nolint:errcheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be something like //nolint:errcheck // If this assertion fails, the program is incorrect and should panic.; golangci-lint requires an explanatory note and the syntax for that is defined, requiring a // between the linter directive and the reason. Also below.

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/stretchr/testify/assert"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use testify in the cel package for the reasons outlined in this.

return out, reg
}

type OTELCELMetrics struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be exported yet? Also, its methods. Is this going to be used elsewhere? If not, make it all unexported.

Comment on lines 244 to 248
func NewOTELCELMetrics(log *logp.Logger,
resource resource.Resource,
tripper http.RoundTripper,
metricExporter sdkmetric.Exporter,
) (*OTELCELMetrics, *otelhttp.Transport, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewOTELCELMetrics(log *logp.Logger,
resource resource.Resource,
tripper http.RoundTripper,
metricExporter sdkmetric.Exporter,
) (*OTELCELMetrics, *otelhttp.Transport, error) {
func newOTELCELMetrics(log *logp.Logger, res resource.Resource, rt http.RoundTripper, exp sdkmetric.Exporter) (*OTELCELMetrics, *otelhttp.Transport, error) {

}, transport, nil
}

type MetricsRecorder struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be exported?

otelMetrics *OTELCELMetrics
}

func NewMetricsRecorder(inputMetrics *inputMetrics, otelMetrics *OTELCELMetrics) (*MetricsRecorder, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewMetricsRecorder(inputMetrics *inputMetrics, otelMetrics *OTELCELMetrics) (*MetricsRecorder, error) {
func newMetricsRecorder(inputMetrics *inputMetrics, otelMetrics *OTELCELMetrics) (*metricsRecorder, error) {

return c, trace, otelMetrics, nil
}

func CreateOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2.Context, tripper http.RoundTripper) (*OTELCELMetrics, *otelhttp.Transport, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func CreateOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2.Context, tripper http.RoundTripper) (*OTELCELMetrics, *otelhttp.Transport, error) {
func createOTELMetrics(ctx context.Context, cfg config, log *logp.Logger, env v2.Context, tripper http.RoundTripper) (*OTELCELMetrics, *otelhttp.Transport, error) {

}

seen := make(map[attribute.Key]bool)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

possibly also in the same stanza as the pairs range since this is entirely related to the use there.

Comment on lines 21 to 24
type ConcurrentEncoder struct {
encoder stdoutmetric.Encoder
lock sync.Mutex
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type ConcurrentEncoder struct {
encoder stdoutmetric.Encoder
lock sync.Mutex
}
type concurrentEncoder struct {
mu sync.Mutex
encoder stdoutmetric.Encoder
}

convention is to place the lock above the fields it protects.


// NewConcurentEncoder creates a ConcurrentEncoder that wraps that
// stdoutmetric.Encoder that is passed in.
func NewConcurentEncoder(encoder stdoutmetric.Encoder) *ConcurrentEncoder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a constructor function?

"github.com/elastic/beats/v7/libbeat/common/reload"
)

var celInputPrefix = "cel-"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var celInputPrefix = "cel-"
const celInputPrefix = "cel-"

// ctx context.Context: A context object created using the Go standard library.
// global bool: if true, returns the global Exporter. If false, it generates a new Exporter.
//

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

// global exporters.
func init() {
exporterFactory = NewMetricsExporterFactory(GetDefaultMetricExporterOptions())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather use a sync.Once in GetGlobalMetricsExporterFactory instead of an init function. We had lots of issues bitting us because of init functions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this init() needs removed. There is already code on line 99 that will initialize exporterFactory lazily.

…ead of init() for global ExporterFactory initialization
// - OTEL_EXPORTER_OTLP_ENDPOINT: Required. The OTLP endpoint URL.
// - OTEL_EXPORTER_OTLP_HEADERS: Required if endpoint is authenticated.
//
// To export using the httt/protobuf protocol add this environment variable with the above environment variables:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s still a typo in the doc comment: httt/protobuf -> http/protobuf.

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Package comment should be of the form 'Package otel ...'

There should be only one package level comment in the package.

There should be newline break between the copyright header and the package comment.

This reads like it is comment that belongs entirely on MetricsExporterFactory.

Bullets should be indented by two spaces to get proper rendering in godoc (look at go tool doc -http . in the browser).

// global exporters.
func init() {
exporterFactory = NewMetricsExporterFactory(GetDefaultMetricExporterOptions())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this init() needs removed. There is already code on line 99 that will initialize exporterFactory lazily.

httpOptions []otlpmetrichttp.Option
}

// GetDefaultMetricExporterOptions() returns the default set of MetricExporterOptions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// GetDefaultMetricExporterOptions() returns the default set of MetricExporterOptions
// GetDefaultMetricExporterOptions returns the default set of [MetricExporterOptions].


// GetExporter returns an Exporter
// ctx context.Context: A context object created using the Go standard library.
// global bool: if true, returns the global Exporter. If false, it generates a new Exporter.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs some updates to remove the previous "Args:" style.

// GetExporter returns a metrics exporter based on the current environment
// configuration.
//
// If global is true, GetExporter returns the cached global exporter, creating it
// on first use. If global is false, GetExporter always creates and returns a new
// exporter.

if err != nil {
t.Fatalf("failed to make request: %v", err)
}
defer resp.Body.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s a defer resp.Body.Close() inside the loop. It’s only 5 iterations so it’s not a huge deal, but please close per-iteration instead of deferring in the loop.


"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test uses semconv/v1.4.0 while the input.go code uses semconv/v1.17.0. Can we align these so we’re not mixing versions?

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//

Copyright headers should be separated by a newline.

Comment on lines +5 to +71
// Resource attributes for Open Telemetry CEL Input Metrics
// Each CEL input has an associated Open Telemetry Resource associated with it. The Resource Attribute Set identifies a
// unique metric set in Elastic. Changing the Resource Attribute Set for input will identify the input's metrics as
// different metric set.
// These Resource Attributes are included for every CEL input instance:
// +--------------------+------------------------------------------------+
// | name | description |
// +--------------------+------------------------------------------------+
// |agent.version | version of agent |
// |agent.id | the id of the agent |
// |service.instance.id | id of the cel input instance |
// |package.name | name of the integration package |
// |package.version | version of the integration package |
// |package.data_stream | the datastream name in the integration package |
// +--------------------+------------------------------------------------+

// Resource Attributes that are defined in an OTEL_RESOURCE_ATTRIBUTES environment variable will be added to the CEL
// input instance Resource Attributes set. An example of an OTEL_RESOURCE_ATTRIBUTES:
// service.name=elastic-agent,service.version=9.1.2,deployment.environment=production
//
// These attributes are expected in the OTEL_RESOURCE_ATTRIBUTES but not required:
// +------------------------+-------------------------------------------------------+
// | name | description |
// +------------------------+-------------------------------------------------------+
// | service.name, | service that is running the program, ex. elastic-agent|
// | deployment.environment | deployment environment, ex. production |
// +------------------------+-------------------------------------------------------+
//
// See [otel.ExportFactory] for environment settings to run console or http/protobuf output.
// See [otelCELMetrics] for the complete list of exported metrics.

// Open Telemetry Metrics for CEL Input
//
// CEL Metrics are sent as Delta metrics for each Periodic Run (Interval).
// Export occurs as a push to an OTLP endpoint at the end of the Periodic Run. Metrics are reset between Periodic Runs.
// +-------------------------------------------+----------------------------------------------------------------------------------------+------------------+
// | name | description | metric type |
// |-------------------------------------------|----------------------------------------------------------------------------------------|------------------|
// These metrics are generated by the OTEL SDK through wrapping the transport and are scoped in the OTEL metrics as
// ' go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp'.
// A metric is exported for each unique set of attributes:
// - http.request.method
// - http.response.status_code
// - network.protocol.name
// - network.protocol.version
// - server.address
// - server.port
// - url.scheme
//
// +-------------------------------+----------------------------------------------+------------------+
// | name | description | metric type |
// |-------------------------------|----------------------------------------------|------------------|
// | http.client.request.duration | The duration in seconds for an http request | Float64Histogram |
// | http.client.request.body.size | The size of the request in bytes | Int64Histogram |
// +------------------------------+-----------------------------------------------+------------------+
//
// See cel_metric_collection.png for a diagram of when each OTEL CEL metric is collected.
//
// inputMetrics
//
// inputMetrics are part of the agent monitoring framework. They are cumulative metrics for each input for the entire
// run of a CEL input. They are packaged together with other agent data for monitoring.
// Durations are collected as histograms and are in nanoseconds.
// Cumulative HTTP metrics are collected through a wrapper on the HTTP transport.
// See https://www.elastic.co/guide/en/beats/filebeat/current/http-endpoint.html
// and https://www.elastic.co/docs/reference/fleet/monitor-elastic-agent for details
// on agent monitoring.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This big comment block at the top of cel_metrics.go won’t show up in godoc because it’s not the package comment (and we can only have one package-level comment; currently that’s in input.go).

The most appropriate place for this content is a doc.go file in x-pack/filebeat/input/cel with a // Package cel ... comment. Move/merge the existing package comment from input.go plus the OTEL/resource attributes/metrics documentation into a package level doc.go comment.

}
}
go func(ctx context.Context, log *logp.Logger, metricExporter sdkmetric.Exporter, collectedMetrics *metricdata.ResourceMetrics) {
err := metricExporter.Export(ctx, collectedMetrics)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is asynchronously using the context passed to EndPeriodic. If that context is canceled (shutdown/timeout), the export can be dropped. If the intent is “best effort”, can you add a comment, otherwise consider using a detached context with a short timeout for the export.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team Team:Security-Service Integrations Security Service Integrations Team

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants