99 _ "embed"
1010 "fmt"
1111 "maps"
12- "math"
1312 "net"
1413 "net/url"
1514 "os"
@@ -18,7 +17,6 @@ import (
1817 "slices"
1918 "strconv"
2019 "strings"
21- "time"
2220 "unicode"
2321
2422 koanfmaps "github.com/knadh/koanf/maps"
@@ -59,15 +57,11 @@ const (
5957 fileSchemePrefix = "file"
6058 unixSchemePrefix = "unix"
6159
62- defaultOutputName = "default"
6360 outputsKey = "outputs"
6461 inputsKey = "inputs"
6562 idKey = "id"
6663 agentKey = "agent"
67- monitoringKey = "monitoring"
68- serviceKey = "service"
6964 useOutputKey = "use_output"
70- monitoringMetricsPeriodKey = "metrics_period"
7165 failureThresholdKey = "failure_threshold"
7266 monitoringOutput = "monitoring"
7367 defaultMonitoringNamespace = "default"
@@ -81,14 +75,6 @@ const (
8175 prometheusMonitoringComponentId = "prometheus/" + monitoringMetricsUnitID
8276
8377 windowsOS = "windows"
84-
85- // metricset execution period used for the monitoring metrics inputs
86- // we set this to 60s to reduce the load/data volume on the monitoring cluster
87- defaultMetricsCollectionInterval = 60 * time .Second
88-
89- // metricset stream failure threshold before the stream is marked as DEGRADED
90- // to avoid marking the agent degraded for transient errors, we set the default threshold to 5
91- defaultMetricsStreamFailureThreshold = uint (5 )
9278)
9379
9480var (
@@ -182,67 +168,12 @@ func (b *BeatsMonitor) MonitoringConfig(
182168
183169 cfg := make (map [string ]interface {})
184170
185- monitoringOutputName := defaultOutputName
186- metricsCollectionIntervalString := b .config .C .MetricsPeriod
187- failureThreshold := b .config .C .FailureThreshold
188171 if agentCfg , found := policy [agentKey ]; found {
189172 // The agent section is required for feature flags
190173 cfg [agentKey ] = agentCfg
191-
192- agentCfgMap , ok := agentCfg .(map [string ]interface {})
193- if ok {
194- if monitoringCfg , found := agentCfgMap [monitoringKey ]; found {
195- monitoringMap , ok := monitoringCfg .(map [string ]interface {})
196- if ok {
197- if use , found := monitoringMap [useOutputKey ]; found {
198- if useStr , ok := use .(string ); ok {
199- monitoringOutputName = useStr
200- }
201- }
202-
203- if metricsPeriod , found := monitoringMap [monitoringMetricsPeriodKey ]; found {
204- if metricsPeriodStr , ok := metricsPeriod .(string ); ok {
205- metricsCollectionIntervalString = metricsPeriodStr
206- }
207- }
208-
209- if policyFailureThresholdRaw , found := monitoringMap [failureThresholdKey ]; found {
210- switch policyValue := policyFailureThresholdRaw .(type ) {
211- case uint :
212- failureThreshold = & policyValue
213- case int :
214- if policyValue < 0 {
215- return nil , fmt .Errorf ("converting policy failure threshold int to uint, value must be non-negative: %v" , policyValue )
216- }
217- unsignedValue := uint (policyValue )
218- failureThreshold = & unsignedValue
219- case float64 :
220- if policyValue < 0 || policyValue > math .MaxUint {
221- return nil , fmt .Errorf ("converting policy failure threshold float64 to uint, value out of range: %v" , policyValue )
222- }
223- truncatedUnsignedValue := uint (policyValue )
224- failureThreshold = & truncatedUnsignedValue
225- case string :
226- parsedPolicyValue , err := strconv .ParseUint (policyValue , 10 , 64 )
227- if err != nil {
228- return nil , fmt .Errorf ("converting policy failure threshold string to uint: %w" , err )
229- }
230- if parsedPolicyValue > math .MaxUint {
231- // this is to catch possible overflow in 32-bit envs, should not happen that often
232- return nil , fmt .Errorf ("converting policy failure threshold from string to uint, value out of range: %v" , policyValue )
233- }
234- uintPolicyValue := uint (parsedPolicyValue )
235- failureThreshold = & uintPolicyValue
236- default :
237- return nil , fmt .Errorf ("unsupported type for policy failure threshold: %T" , policyFailureThresholdRaw )
238- }
239- }
240- }
241- }
242- }
243174 }
244175
245- outputCfg , err := b .injectMonitoringOutput (policy , cfg , monitoringOutputName )
176+ outputCfg , err := b .injectMonitoringOutput (policy , cfg )
246177 if err != nil && ! errors .Is (err , errNoOutputPresent ) {
247178 return nil , errors .New (err , "failed to inject monitoring output" )
248179 } else if errors .Is (err , errNoOutputPresent ) {
@@ -269,8 +200,7 @@ func (b *BeatsMonitor) MonitoringConfig(
269200 }
270201
271202 if b .config .C .MonitorMetrics {
272- if err := b .injectMetricsInput (
273- cfg , componentInfos , metricsCollectionIntervalString , failureThreshold , monitoringRuntime ); err != nil {
203+ if err := b .injectMetricsInput (cfg , componentInfos , monitoringRuntime ); err != nil {
274204 return nil , errors .New (err , "failed to inject monitoring output" )
275205 }
276206 }
@@ -418,7 +348,8 @@ func (b *BeatsMonitor) initInputs(cfg map[string]interface{}) {
418348
419349// injectMonitoringOutput injects the monitoring output into the configuration. It takes the existing output whose
420350// name is configured in `b.config.C.UseOutput` and makes a copy of it named `monitoring`. It returns the output configuration.
421- func (b * BeatsMonitor ) injectMonitoringOutput (source , dest map [string ]interface {}, monitoringOutputName string ) (map [string ]any , error ) {
351+ func (b * BeatsMonitor ) injectMonitoringOutput (source , dest map [string ]interface {}) (map [string ]any , error ) {
352+ monitoringOutputName := b .config .C .UseOutput
422353 outputsNode , found := source [outputsKey ]
423354 if ! found {
424355 return nil , errNoOutputPresent
@@ -567,22 +498,12 @@ func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
567498func (b * BeatsMonitor ) injectMetricsInput (
568499 cfg map [string ]interface {},
569500 componentInfos []componentInfo ,
570- metricsCollectionIntervalString string ,
571- failureThreshold * uint ,
572501 monitoringRuntime component.RuntimeManager ,
573502) error {
574- if metricsCollectionIntervalString == "" {
575- metricsCollectionIntervalString = defaultMetricsCollectionInterval .String ()
576- }
577-
578- if failureThreshold == nil {
579- defaultValue := defaultMetricsStreamFailureThreshold
580- failureThreshold = & defaultValue
581- }
582503 monitoringNamespace := b .monitoringNamespace ()
583504
584- beatsStreams := b .getBeatsStreams (componentInfos , failureThreshold , metricsCollectionIntervalString )
585- httpStreams := b .getHttpStreams (componentInfos , failureThreshold , metricsCollectionIntervalString )
505+ beatsStreams := b .getBeatsStreams (componentInfos )
506+ httpStreams := b .getHttpStreams (componentInfos )
586507
587508 inputs := []interface {}{
588509 map [string ]interface {}{
@@ -615,7 +536,7 @@ func (b *BeatsMonitor) injectMetricsInput(
615536 if usingOtelRuntime (componentInfos ) && slices .ContainsFunc (componentInfos , func (ci componentInfo ) bool {
616537 return ci .ID == prometheusMonitoringComponentId
617538 }) {
618- prometheusStream := b .getPrometheusStream (failureThreshold , metricsCollectionIntervalString )
539+ prometheusStream := b .getPrometheusStream ()
619540 inputs = append (inputs , map [string ]interface {}{
620541 idKey : fmt .Sprintf ("%s-collector" , monitoringMetricsUnitID ),
621542 "name" : fmt .Sprintf ("%s-collector" , monitoringMetricsUnitID ),
@@ -633,7 +554,7 @@ func (b *BeatsMonitor) injectMetricsInput(
633554
634555 // add system/process metrics for services that can't be monitored via json/beats metrics
635556 inputs = append (inputs , b .getServiceComponentProcessMetricInputs (
636- componentInfos , metricsCollectionIntervalString )... )
557+ componentInfos )... )
637558
638559 inputsNode , found := cfg [inputsKey ]
639560 if ! found {
@@ -733,13 +654,13 @@ func (b *BeatsMonitor) getServiceComponentFilestreamStreams(componentInfos []com
733654// Note: The return type must be []any due to protobuf serialization quirks.
734655func (b * BeatsMonitor ) getHttpStreams (
735656 componentInfos []componentInfo ,
736- failureThreshold * uint ,
737- metricsCollectionIntervalString string ,
738657) []any {
739658 monitoringNamespace := b .monitoringNamespace ()
740659 sanitizedAgentName := sanitizeName (agentName )
741660 indexName := fmt .Sprintf ("metrics-elastic_agent.%s-%s" , sanitizedAgentName , monitoringNamespace )
742661 dataset := fmt .Sprintf ("elastic_agent.%s" , sanitizedAgentName )
662+ metricsCollectionIntervalString := b .config .C .MetricsPeriod .String ()
663+ failureThreshold := b .config .C .FailureThreshold
743664 httpStreams := make ([]any , 0 , len (componentInfos ))
744665
745666 agentStream := map [string ]any {
@@ -848,10 +769,7 @@ func (b *BeatsMonitor) getHttpStreams(
848769
849770// getPrometheusStream returns the stream definition for prometheus/metrics input.
850771// Note: The return type must be any due to protobuf serialization quirks.
851- func (b * BeatsMonitor ) getPrometheusStream (
852- failureThreshold * uint ,
853- metricsCollectionIntervalString string ,
854- ) any {
772+ func (b * BeatsMonitor ) getPrometheusStream () any {
855773 monitoringNamespace := b .monitoringNamespace ()
856774
857775 // Send these metrics through the metricbeat monitoring datastream, since
@@ -875,12 +793,12 @@ func (b *BeatsMonitor) getPrometheusStream(
875793 "metrics_path" : "/metrics" ,
876794 "hosts" : []interface {}{prometheusHost },
877795 "namespace" : monitoringNamespace ,
878- "period" : metricsCollectionIntervalString ,
796+ "period" : b . config . C . MetricsPeriod . String () ,
879797 "index" : indexName ,
880798 "processors" : processorsForCollectorPrometheusStream (monitoringNamespace , dataset , b .agentInfo ),
881799 }
882- if failureThreshold != nil {
883- otelStream [failureThresholdKey ] = * failureThreshold
800+ if b . config . C . FailureThreshold != nil {
801+ otelStream [failureThresholdKey ] = * b . config . C . FailureThreshold
884802 }
885803 return otelStream
886804}
@@ -889,8 +807,6 @@ func (b *BeatsMonitor) getPrometheusStream(
889807// Note: The return type must be []any due to protobuf serialization quirks.
890808func (b * BeatsMonitor ) getBeatsStreams (
891809 componentInfos []componentInfo ,
892- failureThreshold * uint ,
893- metricsCollectionIntervalString string ,
894810) []any {
895811 monitoringNamespace := b .monitoringNamespace ()
896812 beatsStreams := make ([]any , 0 , len (componentInfos ))
@@ -915,13 +831,13 @@ func (b *BeatsMonitor) getBeatsStreams(
915831 },
916832 "metricsets" : []interface {}{"stats" },
917833 "hosts" : endpoints ,
918- "period" : metricsCollectionIntervalString ,
834+ "period" : b . config . C . MetricsPeriod . String () ,
919835 "index" : indexName ,
920836 "processors" : processorsForBeatsStream (binaryName , compInfo .ID , monitoringNamespace , dataset , b .agentInfo , compInfo .RuntimeManager ),
921837 }
922838
923- if failureThreshold != nil {
924- beatsStream [failureThresholdKey ] = * failureThreshold
839+ if b . config . C . FailureThreshold != nil {
840+ beatsStream [failureThresholdKey ] = * b . config . C . FailureThreshold
925841 }
926842
927843 beatsStreams = append (beatsStreams , beatsStream )
@@ -935,7 +851,6 @@ func (b *BeatsMonitor) getBeatsStreams(
935851// Note: The return type must be []any due to protobuf serialization quirks.
936852func (b * BeatsMonitor ) getServiceComponentProcessMetricInputs (
937853 componentInfos []componentInfo ,
938- metricsCollectionIntervalString string ,
939854) []any {
940855 monitoringNamespace := b .monitoringNamespace ()
941856 inputs := []any {}
@@ -963,7 +878,7 @@ func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
963878 "namespace" : monitoringNamespace ,
964879 },
965880 "metricsets" : []interface {}{"process" },
966- "period" : metricsCollectionIntervalString ,
881+ "period" : b . config . C . MetricsPeriod . String () ,
967882 "index" : fmt .Sprintf ("metrics-elastic_agent.%s-%s" , name , monitoringNamespace ),
968883 "process.pid" : compInfo .Pid ,
969884 "process.cgroups.enabled" : false ,
0 commit comments