Skip to content

Commit 83880d3

Browse files
authored
[beatsreceivers] add option to mute exporter status (#10890)
* add option to skip output status * fix test * status reporting * test case fix * skip yaml * test * fix test * pipeline status and review comments * review comments and testing * comment * lint * rename channel parameter to ensure consitency * integration test * remove os arch hardcoded * review comments * test fix * move force fetch if config hasn't changed and fix test
1 parent 40bc68c commit 83880d3

File tree

8 files changed

+601
-29
lines changed

8 files changed

+601
-29
lines changed

internal/pkg/otel/manager/execution.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ type collectorExecution interface {
2020
// The collector will report status events in the statusCh channel and errors on errCh in a non-blocking fashion,
2121
// draining the channel before writing to it.
2222
// After the collector exits, it will emit an error describing the exit status (nil if successful) and a nil status.
23-
startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error)
23+
// Parameters:
24+
// - cfg: Configuration for the collector.
25+
// - errCh: Process exit errors are sent to the errCh channel
26+
// - statusCh: Collector's status updates are sent to statusCh channel.
27+
// - forceFetchStatusCh: Channel that is used to trigger a forced status update.
28+
startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error)
2429
}
2530

2631
type collectorHandle interface {

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type subprocessExecution struct {
7979

8080
// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the
8181
// processErrCh channel. Other run errors, such as not able to connect to the health endpoint, are sent to the runErrCh channel.
82-
func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, processErrCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) {
82+
func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, processErrCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) {
8383
if cfg == nil {
8484
// configuration is required
8585
return nil, errors.New("no configuration provided")
@@ -186,6 +186,8 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo
186186
// after the collector exits, we need to report a nil status
187187
r.reportSubprocessCollectorStatus(ctx, statusCh, nil)
188188
return
189+
case <-forceFetchStatusCh:
190+
r.reportSubprocessCollectorStatus(procCtx, statusCh, statuses)
189191
case <-healthCheckPollTimer.C:
190192
healthCheckPollTimer.Reset(healthCheckPollDuration)
191193
case <-maxFailuresTimer.C:

internal/pkg/otel/manager/manager.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
200200
// this channel is buffered because it's possible for the collector to send a status update while the manager is
201201
// waiting for the collector to exit
202202
collectorStatusCh := make(chan *status.AggregateStatus, 1)
203+
forceFetchStatusCh := make(chan struct{}, 1)
203204
for {
204205
select {
205206
case <-ctx.Done():
@@ -224,7 +225,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
224225

225226
newRetries := m.recoveryRetries.Add(1)
226227
m.logger.Infof("collector recovery restarting, total retries: %d", newRetries)
227-
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
228+
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh)
228229
if err != nil {
229230
reportErr(ctx, m.errCh, err)
230231
// reset the restart timer to the next backoff
@@ -256,7 +257,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
256257

257258
// in this rare case the collector stopped running but a configuration was
258259
// provided and the collector stopped with a clean exit
259-
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
260+
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh)
260261
if err != nil {
261262
// failed to create the collector (this is different then
262263
// it's failing to run). we do not retry creation on failure
@@ -317,7 +318,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
317318
m.logger.Debugf(
318319
"new config hash (%d) is different than the old config hash (%d), applying update",
319320
m.mergedCollectorCfgHash, previousConfigHash)
320-
applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
321+
applyErr := m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr, forceFetchStatusCh)
321322
// only report the error if we actually apply the update
322323
// otherwise, we could override an actual error with a nil in the channel when the collector
323324
// state doesn't actually change
@@ -326,6 +327,16 @@ func (m *OTelManager) Run(ctx context.Context) error {
326327
m.logger.Debugf(
327328
"new config hash (%d) is identical to the old config hash (%d), skipping update",
328329
m.mergedCollectorCfgHash, previousConfigHash)
330+
331+
// there was a config update, but the hash hasn't changed.
332+
// Force fetch the latest collector status in case the user modified the output.status_reporting flag.
333+
//
334+
// drain the channel first
335+
select {
336+
case <-forceFetchStatusCh:
337+
default:
338+
}
339+
forceFetchStatusCh <- struct{}{}
329340
}
330341

331342
case otelStatus := <-collectorStatusCh:
@@ -415,7 +426,7 @@ func injectDiagnosticsExtension(config *confmap.Conf) error {
415426
return config.Merge(confmap.NewFromStringMap(extensionCfg))
416427
}
417428

418-
func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error {
429+
func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error, forceFetchStatusCh chan struct{}) error {
419430
if m.proc != nil {
420431
m.proc.Stop(m.stopTimeout)
421432
m.proc = nil
@@ -443,7 +454,7 @@ func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh c
443454
} else {
444455
// either a new configuration or the first configuration
445456
// that results in the collector being started
446-
proc, err := m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
457+
proc, err := m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh, forceFetchStatusCh)
447458
if err != nil {
448459
// failed to create the collector (this is different then
449460
// it's failing to run). we do not retry creation on failure
@@ -523,6 +534,11 @@ func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus)
523534
}
524535
}
525536

537+
otelStatus, err := translate.MaybeMuteExporterStatus(otelStatus, m.components)
538+
if err != nil {
539+
return nil, fmt.Errorf("failed to mute exporter states from otel status: %w", err)
540+
}
541+
526542
// Extract component states from otel status
527543
componentStates, err := translate.GetAllComponentStates(otelStatus, m.components)
528544
if err != nil {

internal/pkg/otel/manager/manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,12 @@ type testExecution struct {
9191
handle collectorHandle
9292
}
9393

94-
func (e *testExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) {
94+
func (e *testExecution) startCollector(ctx context.Context, baseLogger *logger.Logger, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus, forceFetchStatusCh chan struct{}) (collectorHandle, error) {
9595
e.mtx.Lock()
9696
defer e.mtx.Unlock()
9797

9898
var err error
99-
e.handle, err = e.exec.startCollector(ctx, baseLogger, logger, cfg, errCh, statusCh)
99+
e.handle, err = e.exec.startCollector(ctx, baseLogger, logger, cfg, errCh, statusCh, forceFetchStatusCh)
100100
return e.handle, err
101101
}
102102

@@ -123,6 +123,7 @@ func (e *mockExecution) startCollector(
123123
cfg *confmap.Conf,
124124
errCh chan error,
125125
statusCh chan *status.AggregateStatus,
126+
_ chan struct{},
126127
) (collectorHandle, error) {
127128
e.errCh = errCh
128129
e.statusCh = statusCh

internal/pkg/otel/translate/status.go

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,110 @@ func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) (*stat
9090
return newStatus, nil
9191
}
9292

93+
// MaybeMuteExporterStatus modifies the given otel status by muting exporter statuses for muted components.
94+
// It also updates parent pipeline statuses based on child components.
95+
func MaybeMuteExporterStatus(
96+
otelStatus *status.AggregateStatus,
97+
components []component.Component,
98+
) (*status.AggregateStatus, error) {
99+
if otelStatus == nil {
100+
return nil, nil
101+
}
102+
103+
newStatus := deepCopyStatus(otelStatus)
104+
105+
// Mute exporters
106+
if err := muteExporters(newStatus, components); err != nil {
107+
return nil, err
108+
}
109+
110+
updateStatus(newStatus)
111+
112+
return newStatus, nil
113+
}
114+
115+
// updateStatus recursively updates each AggregateStatus.Event
116+
// based on the statuses of its child components.
117+
func updateStatus(status *status.AggregateStatus) {
118+
if status == nil {
119+
return
120+
}
121+
122+
ok := true
123+
124+
for _, child := range status.ComponentStatusMap {
125+
updateStatus(child)
126+
127+
if child.Status() != componentstatus.StatusOK {
128+
ok = false
129+
}
130+
}
131+
132+
if len(status.ComponentStatusMap) > 0 {
133+
if ok {
134+
status.Event = componentstatus.NewEvent(componentstatus.StatusOK)
135+
}
136+
}
137+
}
138+
139+
// muteExporters sets all exporter statuses to OK for muted pipelines/components.
140+
func muteExporters(agg *status.AggregateStatus, components []component.Component) error {
141+
for pipelineStatusID, pipelineStatus := range agg.ComponentStatusMap {
142+
if pipelineStatusID == "extensions" {
143+
// we do not want to report extension status
144+
continue
145+
}
146+
pipelineID, err := parsePipelineID(pipelineStatusID)
147+
if err != nil {
148+
return err
149+
}
150+
151+
componentID, found := strings.CutPrefix(pipelineID.Name(), OtelNamePrefix)
152+
if !found || !isOutputMuted(componentID, components) {
153+
continue
154+
}
155+
156+
// Mute exporters for the pipeline for this component
157+
for compID, compStatus := range pipelineStatus.ComponentStatusMap {
158+
kind, _, err := parseEntityStatusId(compID)
159+
if err != nil {
160+
return err
161+
}
162+
if kind == "exporter" {
163+
compStatus.Event = componentstatus.NewEvent(componentstatus.StatusOK)
164+
}
165+
}
166+
}
167+
return nil
168+
}
169+
170+
// parsePipelineID extracts a *pipeline.ID from a status ID string.
171+
func parsePipelineID(statusID string) (*pipeline.ID, error) {
172+
_, pipelineIDStr, err := parseEntityStatusId(statusID)
173+
if err != nil {
174+
return nil, err
175+
}
176+
pID := &pipeline.ID{}
177+
if err := pID.UnmarshalText([]byte(pipelineIDStr)); err != nil {
178+
return nil, err
179+
}
180+
return pID, nil
181+
}
182+
183+
// isOutputMuted checks whether output status reporting is disabled for the given componentID
184+
func isOutputMuted(componentID string, components []component.Component) bool {
185+
for _, comp := range components {
186+
if comp.ID != componentID {
187+
continue
188+
}
189+
if comp.OutputStatusReporting == nil {
190+
return false
191+
}
192+
return !comp.OutputStatusReporting.Enabled
193+
}
194+
return false
195+
}
196+
93197
// getOtelRuntimePipelineStatuses finds otel pipeline statuses belonging to runtime components and returns them as a map
94198
// from component id to pipeline status.
95199
func getOtelRuntimePipelineStatuses(otelStatus *status.AggregateStatus) (map[string]*status.AggregateStatus, error) {
@@ -142,7 +246,7 @@ func getComponentState(pipelineStatus *status.AggregateStatus, comp component.Co
142246
BuildHash: version.Commit(),
143247
},
144248
}
145-
receiverStatuses, exporterStatuses, err := getUnitOtelStatuses(pipelineStatus)
249+
receiverStatuses, exporterStatuses, err := getUnitOtelStatuses(pipelineStatus, comp)
146250
if err != nil {
147251
return runtime.ComponentComponentState{}, err
148252
}
@@ -187,7 +291,7 @@ func getComponentState(pipelineStatus *status.AggregateStatus, comp component.Co
187291
}
188292

189293
// getUnitOtelStatuses extracts the receiver and exporter status from otel pipeline status.
190-
func getUnitOtelStatuses(pipelineStatus *status.AggregateStatus) (
294+
func getUnitOtelStatuses(pipelineStatus *status.AggregateStatus, comp component.Component) (
191295
receiverStatuses map[otelcomponent.ID]*status.AggregateStatus,
192296
exporterStatuses map[otelcomponent.ID]*status.AggregateStatus,
193297
err error) {

0 commit comments

Comments
 (0)