Skip to content

Commit c78724f

Browse files
[8.19] (backport #11466) Remove config parsing from monitoring manager (#11631)
* Remove config parsing from monitoring manager (#11466) * Remove config parsing from monitoring manager # Conflicts: # internal/pkg/agent/application/monitoring/component/v1_monitor.go # internal/pkg/agent/application/monitoring/component/v1_monitor_test.go * Apply suggestions from code review Co-authored-by: Shaunak Kashyap <[email protected]> * Drop unnecessarily committed testsignal binary --------- Co-authored-by: Shaunak Kashyap <[email protected]> (cherry picked from commit 786c3e5) # Conflicts: # internal/pkg/core/monitoring/config/config.go * Fix conflicts --------- Co-authored-by: Mikołaj Świątek <[email protected]>
1 parent 5158bea commit c78724f

File tree

6 files changed

+252
-584
lines changed

6 files changed

+252
-584
lines changed

internal/pkg/agent/application/coordinator/diagnostics_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func TestDiagnosticLocalConfig(t *testing.T) {
8484
ServerCA: "/path/to/server/ca",
8585
},
8686
},
87+
MetricsPeriod: monitoringCfg.DefaultMetricsCollectionInterval,
8788
},
8889
},
8990
}
@@ -106,11 +107,12 @@ agent:
106107
http: null
107108
logs: false
108109
metrics: false
109-
metrics_period: ""
110+
metrics_period: "1m0s"
110111
namespace: ""
111112
pprof: null
112113
failure_threshold: null
113114
traces: true
115+
use_output: ""
114116
apm:
115117
hosts:
116118
- host1

internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_otel.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
agent:
2-
monitoring:
3-
http:
4-
enabled: false
5-
metrics: true
61
inputs:
72
- _runtime_experimental: otel
83
id: filestream-monitoring-agent

internal/pkg/agent/application/monitoring/component/testdata/monitoring_config_full_process.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
agent:
2-
monitoring:
3-
http:
4-
enabled: false
5-
metrics: true
61
inputs:
72
- _runtime_experimental: process
83
id: filestream-monitoring-agent

internal/pkg/agent/application/monitoring/component/v1_monitor.go

Lines changed: 18 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
_ "embed"
1010
"fmt"
1111
"maps"
12-
"math"
1312
"net"
1413
"net/url"
1514
"os"
@@ -18,7 +17,6 @@ import (
1817
"slices"
1918
"strconv"
2019
"strings"
21-
"time"
2220
"unicode"
2321

2422
koanfmaps "github.com/knadh/koanf/maps"
@@ -59,15 +57,11 @@ const (
5957
fileSchemePrefix = "file"
6058
unixSchemePrefix = "unix"
6159

62-
defaultOutputName = "default"
6360
outputsKey = "outputs"
6461
inputsKey = "inputs"
6562
idKey = "id"
6663
agentKey = "agent"
67-
monitoringKey = "monitoring"
68-
serviceKey = "service"
6964
useOutputKey = "use_output"
70-
monitoringMetricsPeriodKey = "metrics_period"
7165
failureThresholdKey = "failure_threshold"
7266
monitoringOutput = "monitoring"
7367
defaultMonitoringNamespace = "default"
@@ -81,14 +75,6 @@ const (
8175
prometheusMonitoringComponentId = "prometheus/" + monitoringMetricsUnitID
8276

8377
windowsOS = "windows"
84-
85-
// metricset execution period used for the monitoring metrics inputs
86-
// we set this to 60s to reduce the load/data volume on the monitoring cluster
87-
defaultMetricsCollectionInterval = 60 * time.Second
88-
89-
// metricset stream failure threshold before the stream is marked as DEGRADED
90-
// to avoid marking the agent degraded for transient errors, we set the default threshold to 5
91-
defaultMetricsStreamFailureThreshold = uint(5)
9278
)
9379

9480
var (
@@ -182,67 +168,12 @@ func (b *BeatsMonitor) MonitoringConfig(
182168

183169
cfg := make(map[string]interface{})
184170

185-
monitoringOutputName := defaultOutputName
186-
metricsCollectionIntervalString := b.config.C.MetricsPeriod
187-
failureThreshold := b.config.C.FailureThreshold
188171
if agentCfg, found := policy[agentKey]; found {
189172
// The agent section is required for feature flags
190173
cfg[agentKey] = agentCfg
191-
192-
agentCfgMap, ok := agentCfg.(map[string]interface{})
193-
if ok {
194-
if monitoringCfg, found := agentCfgMap[monitoringKey]; found {
195-
monitoringMap, ok := monitoringCfg.(map[string]interface{})
196-
if ok {
197-
if use, found := monitoringMap[useOutputKey]; found {
198-
if useStr, ok := use.(string); ok {
199-
monitoringOutputName = useStr
200-
}
201-
}
202-
203-
if metricsPeriod, found := monitoringMap[monitoringMetricsPeriodKey]; found {
204-
if metricsPeriodStr, ok := metricsPeriod.(string); ok {
205-
metricsCollectionIntervalString = metricsPeriodStr
206-
}
207-
}
208-
209-
if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
210-
switch policyValue := policyFailureThresholdRaw.(type) {
211-
case uint:
212-
failureThreshold = &policyValue
213-
case int:
214-
if policyValue < 0 {
215-
return nil, fmt.Errorf("converting policy failure threshold int to uint, value must be non-negative: %v", policyValue)
216-
}
217-
unsignedValue := uint(policyValue)
218-
failureThreshold = &unsignedValue
219-
case float64:
220-
if policyValue < 0 || policyValue > math.MaxUint {
221-
return nil, fmt.Errorf("converting policy failure threshold float64 to uint, value out of range: %v", policyValue)
222-
}
223-
truncatedUnsignedValue := uint(policyValue)
224-
failureThreshold = &truncatedUnsignedValue
225-
case string:
226-
parsedPolicyValue, err := strconv.ParseUint(policyValue, 10, 64)
227-
if err != nil {
228-
return nil, fmt.Errorf("converting policy failure threshold string to uint: %w", err)
229-
}
230-
if parsedPolicyValue > math.MaxUint {
231-
// this is to catch possible overflow in 32-bit envs, should not happen that often
232-
return nil, fmt.Errorf("converting policy failure threshold from string to uint, value out of range: %v", policyValue)
233-
}
234-
uintPolicyValue := uint(parsedPolicyValue)
235-
failureThreshold = &uintPolicyValue
236-
default:
237-
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
238-
}
239-
}
240-
}
241-
}
242-
}
243174
}
244175

245-
outputCfg, err := b.injectMonitoringOutput(policy, cfg, monitoringOutputName)
176+
outputCfg, err := b.injectMonitoringOutput(policy, cfg)
246177
if err != nil && !errors.Is(err, errNoOutputPresent) {
247178
return nil, errors.New(err, "failed to inject monitoring output")
248179
} else if errors.Is(err, errNoOutputPresent) {
@@ -269,8 +200,7 @@ func (b *BeatsMonitor) MonitoringConfig(
269200
}
270201

271202
if b.config.C.MonitorMetrics {
272-
if err := b.injectMetricsInput(
273-
cfg, componentInfos, metricsCollectionIntervalString, failureThreshold, monitoringRuntime); err != nil {
203+
if err := b.injectMetricsInput(cfg, componentInfos, monitoringRuntime); err != nil {
274204
return nil, errors.New(err, "failed to inject monitoring output")
275205
}
276206
}
@@ -418,7 +348,8 @@ func (b *BeatsMonitor) initInputs(cfg map[string]interface{}) {
418348

419349
// injectMonitoringOutput injects the monitoring output into the configuration. It takes an existing output named
420350
// in the monitoring configuration's `use_output` setting (b.config.C.UseOutput) and makes a copy of it named `monitoring`. It returns the output configuration.
421-
func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}, monitoringOutputName string) (map[string]any, error) {
351+
func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}) (map[string]any, error) {
352+
monitoringOutputName := b.config.C.UseOutput
422353
outputsNode, found := source[outputsKey]
423354
if !found {
424355
return nil, errNoOutputPresent
@@ -567,22 +498,12 @@ func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
567498
func (b *BeatsMonitor) injectMetricsInput(
568499
cfg map[string]interface{},
569500
componentInfos []componentInfo,
570-
metricsCollectionIntervalString string,
571-
failureThreshold *uint,
572501
monitoringRuntime component.RuntimeManager,
573502
) error {
574-
if metricsCollectionIntervalString == "" {
575-
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
576-
}
577-
578-
if failureThreshold == nil {
579-
defaultValue := defaultMetricsStreamFailureThreshold
580-
failureThreshold = &defaultValue
581-
}
582503
monitoringNamespace := b.monitoringNamespace()
583504

584-
beatsStreams := b.getBeatsStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
585-
httpStreams := b.getHttpStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
505+
beatsStreams := b.getBeatsStreams(componentInfos)
506+
httpStreams := b.getHttpStreams(componentInfos)
586507

587508
inputs := []interface{}{
588509
map[string]interface{}{
@@ -615,7 +536,7 @@ func (b *BeatsMonitor) injectMetricsInput(
615536
if usingOtelRuntime(componentInfos) && slices.ContainsFunc(componentInfos, func(ci componentInfo) bool {
616537
return ci.ID == prometheusMonitoringComponentId
617538
}) {
618-
prometheusStream := b.getPrometheusStream(failureThreshold, metricsCollectionIntervalString)
539+
prometheusStream := b.getPrometheusStream()
619540
inputs = append(inputs, map[string]interface{}{
620541
idKey: fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
621542
"name": fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
@@ -633,7 +554,7 @@ func (b *BeatsMonitor) injectMetricsInput(
633554

634555
// add system/process metrics for services that can't be monitored via json/beats metrics
635556
inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
636-
componentInfos, metricsCollectionIntervalString)...)
557+
componentInfos)...)
637558

638559
inputsNode, found := cfg[inputsKey]
639560
if !found {
@@ -733,13 +654,13 @@ func (b *BeatsMonitor) getServiceComponentFilestreamStreams(componentInfos []com
733654
// Note: The return type must be []any due to protobuf serialization quirks.
734655
func (b *BeatsMonitor) getHttpStreams(
735656
componentInfos []componentInfo,
736-
failureThreshold *uint,
737-
metricsCollectionIntervalString string,
738657
) []any {
739658
monitoringNamespace := b.monitoringNamespace()
740659
sanitizedAgentName := sanitizeName(agentName)
741660
indexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", sanitizedAgentName, monitoringNamespace)
742661
dataset := fmt.Sprintf("elastic_agent.%s", sanitizedAgentName)
662+
metricsCollectionIntervalString := b.config.C.MetricsPeriod.String()
663+
failureThreshold := b.config.C.FailureThreshold
743664
httpStreams := make([]any, 0, len(componentInfos))
744665

745666
agentStream := map[string]any{
@@ -848,10 +769,7 @@ func (b *BeatsMonitor) getHttpStreams(
848769

849770
// getPrometheusStream returns the stream definition for prometheus/metrics input.
850771
// Note: The return type must be any due to protobuf serialization quirks.
851-
func (b *BeatsMonitor) getPrometheusStream(
852-
failureThreshold *uint,
853-
metricsCollectionIntervalString string,
854-
) any {
772+
func (b *BeatsMonitor) getPrometheusStream() any {
855773
monitoringNamespace := b.monitoringNamespace()
856774

857775
// Send these metrics through the metricbeat monitoring datastream, since
@@ -875,12 +793,12 @@ func (b *BeatsMonitor) getPrometheusStream(
875793
"metrics_path": "/metrics",
876794
"hosts": []interface{}{prometheusHost},
877795
"namespace": monitoringNamespace,
878-
"period": metricsCollectionIntervalString,
796+
"period": b.config.C.MetricsPeriod.String(),
879797
"index": indexName,
880798
"processors": processorsForCollectorPrometheusStream(monitoringNamespace, dataset, b.agentInfo),
881799
}
882-
if failureThreshold != nil {
883-
otelStream[failureThresholdKey] = *failureThreshold
800+
if b.config.C.FailureThreshold != nil {
801+
otelStream[failureThresholdKey] = *b.config.C.FailureThreshold
884802
}
885803
return otelStream
886804
}
@@ -889,8 +807,6 @@ func (b *BeatsMonitor) getPrometheusStream(
889807
// Note: The return type must be []any due to protobuf serialization quirks.
890808
func (b *BeatsMonitor) getBeatsStreams(
891809
componentInfos []componentInfo,
892-
failureThreshold *uint,
893-
metricsCollectionIntervalString string,
894810
) []any {
895811
monitoringNamespace := b.monitoringNamespace()
896812
beatsStreams := make([]any, 0, len(componentInfos))
@@ -915,13 +831,13 @@ func (b *BeatsMonitor) getBeatsStreams(
915831
},
916832
"metricsets": []interface{}{"stats"},
917833
"hosts": endpoints,
918-
"period": metricsCollectionIntervalString,
834+
"period": b.config.C.MetricsPeriod.String(),
919835
"index": indexName,
920836
"processors": processorsForBeatsStream(binaryName, compInfo.ID, monitoringNamespace, dataset, b.agentInfo, compInfo.RuntimeManager),
921837
}
922838

923-
if failureThreshold != nil {
924-
beatsStream[failureThresholdKey] = *failureThreshold
839+
if b.config.C.FailureThreshold != nil {
840+
beatsStream[failureThresholdKey] = *b.config.C.FailureThreshold
925841
}
926842

927843
beatsStreams = append(beatsStreams, beatsStream)
@@ -935,7 +851,6 @@ func (b *BeatsMonitor) getBeatsStreams(
935851
// Note: The return type must be []any due to protobuf serialization quirks.
936852
func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
937853
componentInfos []componentInfo,
938-
metricsCollectionIntervalString string,
939854
) []any {
940855
monitoringNamespace := b.monitoringNamespace()
941856
inputs := []any{}
@@ -963,7 +878,7 @@ func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
963878
"namespace": monitoringNamespace,
964879
},
965880
"metricsets": []interface{}{"process"},
966-
"period": metricsCollectionIntervalString,
881+
"period": b.config.C.MetricsPeriod.String(),
967882
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace),
968883
"process.pid": compInfo.Pid,
969884
"process.cgroups.enabled": false,

0 commit comments

Comments
 (0)