@@ -23,6 +23,11 @@ import (
2323
2424 koanfmaps "github.com/knadh/koanf/maps"
2525
26+ "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/monitoringhelpers"
27+
28+ "github.com/elastic/elastic-agent-libs/logp"
29+ "github.com/elastic/elastic-agent/internal/pkg/otel/translate"
30+
2631 "github.com/elastic/elastic-agent/pkg/component"
2732 "github.com/elastic/elastic-agent/pkg/utils"
2833
@@ -71,8 +76,9 @@ const (
7176 fileBeatName = "filebeat"
7277 collectorName = "collector"
7378
74- monitoringMetricsUnitID = "metrics-monitoring"
75- monitoringFilesUnitsID = "filestream-monitoring"
79+ monitoringMetricsUnitID = "metrics-monitoring"
80+ monitoringFilesUnitsID = "filestream-monitoring"
81+ prometheusMonitoringComponentId = "prometheus/" + monitoringMetricsUnitID
7682
7783 windowsOS = "windows"
7884
@@ -228,15 +234,17 @@ func (b *BeatsMonitor) MonitoringConfig(
228234 }
229235 }
230236
231- componentInfos := b .getComponentInfos (components , componentIDPidMap )
232-
233- if err := b .injectMonitoringOutput (policy , cfg , monitoringOutputName ); err != nil && ! errors .Is (err , errNoOutputPresent ) {
237+ outputCfg , err := b .injectMonitoringOutput (policy , cfg , monitoringOutputName )
238+ if err != nil && ! errors .Is (err , errNoOutputPresent ) {
234239 return nil , errors .New (err , "failed to inject monitoring output" )
235240 } else if errors .Is (err , errNoOutputPresent ) {
236241 // nothing to inject, no monitoring output
237242 return nil , nil
238243 }
239244
245+ outputOtelSupported := isOutputOtelSupported (outputCfg )
246+ componentInfos := b .getComponentInfos (components , outputOtelSupported , componentIDPidMap )
247+
240248 // initializes inputs collection so injectors don't have to deal with it
241249 b .initInputs (cfg )
242250
@@ -287,7 +295,7 @@ func (b *BeatsMonitor) ComponentMonitoringConfig(unitID, binary string) map[stri
287295 }
288296
289297 configMap := make (map [string ]any )
290- endpoint := BeatsMonitoringEndpoint (unitID )
298+ endpoint := monitoringhelpers . BeatsMonitoringEndpoint (unitID )
291299 if endpoint != "" {
292300 httpConfigMap := map [string ]any {
293301 "enabled" : true ,
@@ -393,20 +401,27 @@ func (b *BeatsMonitor) initInputs(cfg map[string]interface{}) {
393401 cfg [inputsKey ] = inputsCollection
394402}
395403
396- func (b * BeatsMonitor ) injectMonitoringOutput (source , dest map [string ]interface {}, monitoringOutputName string ) error {
404+ // injectMonitoringOutput injects the monitoring output into the configuration. It takes an existing output named
405+ // `monitoringOutputName` and makes a copy of it named `monitoring`. It returns the output configuration.
406+ func (b * BeatsMonitor ) injectMonitoringOutput (source , dest map [string ]interface {}, monitoringOutputName string ) (map [string ]any , error ) {
397407 outputsNode , found := source [outputsKey ]
398408 if ! found {
399- return errNoOutputPresent
409+ return nil , errNoOutputPresent
400410 }
401411
402412 outputs , ok := outputsNode .(map [string ]interface {})
403413 if ! ok {
404- return fmt .Errorf ("outputs not a map" )
414+ return nil , fmt .Errorf ("outputs not a map" )
405415 }
406416
407417 outputNode , found := outputs [monitoringOutputName ]
408418 if ! found {
409- return fmt .Errorf ("output %q used for monitoring not found" , monitoringOutputName )
419+ return nil , fmt .Errorf ("output %q used for monitoring not found" , monitoringOutputName )
420+ }
421+
422+ outputMap , ok := outputNode .(map [string ]any )
423+ if ! ok {
424+ return nil , fmt .Errorf ("output %q used for monitoring not a map" , monitoringOutputName )
410425 }
411426
412427 monitoringOutputs := map [string ]interface {}{
@@ -415,13 +430,17 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{
415430
416431 dest [outputsKey ] = monitoringOutputs
417432
418- return nil
433+ return outputMap , nil
419434}
420435
421436// getComponentInfos returns a slice of componentInfo structs based on the provided components. This slice contains
422437// all the information needed to generate the monitoring configuration for these components, as well as configuration
423438// for new components which are going to be doing the monitoring.
424- func (b * BeatsMonitor ) getComponentInfos (components []component.Component , componentIDPidMap map [string ]uint64 ) []componentInfo {
439+ func (b * BeatsMonitor ) getComponentInfos (components []component.Component , outputOtelSupported bool , componentIDPidMap map [string ]uint64 ) []componentInfo {
440+ monitoringRuntime := component .RuntimeManager (b .config .C .RuntimeManager )
441+ if ! outputOtelSupported {
442+ monitoringRuntime = monitoringCfg .ProcessRuntimeManager
443+ }
425444 componentInfos := make ([]componentInfo , 0 , len (components ))
426445 for _ , comp := range components {
427446 compInfo := componentInfo {
@@ -440,29 +459,29 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
440459 componentInfo {
441460 ID : fmt .Sprintf ("beat/%s" , monitoringMetricsUnitID ),
442461 BinaryName : metricBeatName ,
443- RuntimeManager : component . RuntimeManager ( b . config . C . RuntimeManager ) ,
462+ RuntimeManager : monitoringRuntime ,
444463 },
445464 componentInfo {
446465 ID : fmt .Sprintf ("http/%s" , monitoringMetricsUnitID ),
447466 BinaryName : metricBeatName ,
448- RuntimeManager : component . RuntimeManager ( b . config . C . RuntimeManager ) ,
467+ RuntimeManager : monitoringRuntime ,
449468 })
450469 }
451470 if b .config .C .MonitorLogs {
452471 componentInfos = append (componentInfos , componentInfo {
453472 ID : monitoringFilesUnitsID ,
454473 BinaryName : fileBeatName ,
455- RuntimeManager : component . RuntimeManager ( b . config . C . RuntimeManager ) ,
474+ RuntimeManager : monitoringRuntime ,
456475 })
457476 }
458- // If any other component uses the Otel runtime, also add a component to monitor
459- // its telemetry .
460- if b .config .C .MonitorMetrics && usingOtelRuntime (componentInfos ) {
477+ // If any other component uses the Otel runtime, also add a component to monitor its telemetry.
478+ // This component only works in the Otel runtime, so we can't add it if the output doesn't support it .
479+ if b .config .C .MonitorMetrics && usingOtelRuntime (componentInfos ) && outputOtelSupported {
461480 componentInfos = append (componentInfos ,
462481 componentInfo {
463- ID : fmt . Sprintf ( "prometheus/%s" , monitoringMetricsUnitID ) ,
482+ ID : prometheusMonitoringComponentId ,
464483 BinaryName : metricBeatName ,
465- RuntimeManager : component .RuntimeManager ( b . config . C . RuntimeManager ) ,
484+ RuntimeManager : component .OtelRuntimeManager ,
466485 })
467486 }
468487 // sort the components to ensure a consistent order of inputs in the configuration
@@ -564,7 +583,12 @@ func (b *BeatsMonitor) injectMetricsInput(
564583 },
565584 }
566585
567- if usingOtelRuntime (componentInfos ) {
586+ // We only add this stream if the Otel manager is enabled and the respective component info exists. This is a
587+ // special case where this input shouldn't exists if the output doesn't support otel, which we check while
588+ // creating the component infos.
589+ if usingOtelRuntime (componentInfos ) && slices .ContainsFunc (componentInfos , func (ci componentInfo ) bool {
590+ return ci .ID == prometheusMonitoringComponentId
591+ }) {
568592 prometheusStream := b .getPrometheusStream (failureThreshold , metricsCollectionIntervalString )
569593 inputs = append (inputs , map [string ]interface {}{
570594 idKey : fmt .Sprintf ("%s-collector" , monitoringMetricsUnitID ),
@@ -750,7 +774,7 @@ func (b *BeatsMonitor) getHttpStreams(
750774 continue
751775 }
752776
753- endpoints := []interface {}{PrefixedEndpoint (BeatsMonitoringEndpoint (compInfo .ID ))}
777+ endpoints := []interface {}{PrefixedEndpoint (monitoringhelpers . BeatsMonitoringEndpoint (compInfo .ID ))}
754778 name := sanitizeName (binaryName )
755779
756780 // Do not create http streams if runtime-manager is otel and binary is of beat type
@@ -1440,10 +1464,6 @@ func AgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string {
14401464 return fmt .Sprintf (`unix:///tmp/elastic-agent/%x.sock` , sha256 .Sum256 ([]byte (path )))
14411465}
14421466
1443- func BeatsMonitoringEndpoint (componentID string ) string {
1444- return utils .SocketURLWithFallback (componentID , paths .TempDir ())
1445- }
1446-
14471467func httpCopyRules () []interface {} {
14481468 fromToMap := []interface {}{
14491469 // I should be able to see the CPU Usage on the running machine. Am using too much CPU?
@@ -1504,6 +1524,16 @@ func isSupportedBeatsBinary(binaryName string) bool {
15041524 return false
15051525}
15061526
1527+ func isOutputOtelSupported (outputCfg map [string ]any ) bool {
1528+ parsed , err := component .ParseOutput (monitoringOutput , outputCfg , logp .InfoLevel , nil )
1529+ if err != nil {
1530+ return false
1531+ }
1532+
1533+ err = translate .VerifyOutputIsOtelSupported (parsed .OutputType , outputCfg )
1534+ return err == nil
1535+ }
1536+
15071537func monitoringDrop (path string ) (drop string ) {
15081538 defer func () {
15091539 if drop != "" {
0 commit comments