Skip to content

Commit 1144501

Browse files
authored
Set correct types for otel components in Fleet status (#11233)
* Set correct types for otel components in Fleet status
* Log a warning if we get a malformed otel component status id
* Only sort checkin output in tests
* Fix new unit tests
1 parent a693a6e commit 1144501

File tree

9 files changed

+523
-241
lines changed

9 files changed

+523
-241
lines changed

internal/pkg/agent/application/coordinator/coordinator_state.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"github.com/elastic/elastic-agent-libs/logp"
1919

2020
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
21-
"github.com/elastic/elastic-agent/internal/pkg/otel/otelhelpers"
21+
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
2222
"github.com/elastic/elastic-agent/pkg/component/runtime"
2323
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
2424
)
@@ -245,10 +245,10 @@ func (c *Coordinator) generateReportableState() (s State) {
245245
} else if c.varsMgrErr != nil {
246246
s.State = agentclient.Failed
247247
s.Message = fmt.Sprintf("Vars manager: %s", c.varsMgrErr.Error())
248-
} else if hasState(s.Components, client.UnitStateFailed) || otelhelpers.HasStatus(s.Collector, componentstatus.StatusFatalError) || otelhelpers.HasStatus(s.Collector, componentstatus.StatusPermanentError) {
248+
} else if hasState(s.Components, client.UnitStateFailed) || translate.HasStatus(s.Collector, componentstatus.StatusFatalError) || translate.HasStatus(s.Collector, componentstatus.StatusPermanentError) {
249249
s.State = agentclient.Degraded
250250
s.Message = "1 or more components/units in a failed state"
251-
} else if hasState(s.Components, client.UnitStateDegraded) || otelhelpers.HasStatus(s.Collector, componentstatus.StatusRecoverableError) {
251+
} else if hasState(s.Components, client.UnitStateDegraded) || translate.HasStatus(s.Collector, componentstatus.StatusRecoverableError) {
252252
s.State = agentclient.Degraded
253253
s.Message = "1 or more components/units in a degraded state"
254254
} else {

internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212

1313
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
1414

15+
"github.com/elastic/elastic-agent-libs/logp"
16+
1517
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
1618

1719
eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
@@ -23,7 +25,7 @@ import (
2325
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
2426
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
2527
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
26-
"github.com/elastic/elastic-agent/internal/pkg/otel/otelhelpers"
28+
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
2729
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
2830
"github.com/elastic/elastic-agent/pkg/component/runtime"
2931
"github.com/elastic/elastic-agent/pkg/core/logger"
@@ -263,8 +265,8 @@ func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
263265
return nil, ctx.Err()
264266
}
265267

266-
func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState, collector *status.AggregateStatus) []fleetapi.CheckinComponent {
267-
if components == nil {
268+
func convertToCheckinComponents(logger *logp.Logger, components []runtime.ComponentComponentState, collector *status.AggregateStatus) []fleetapi.CheckinComponent {
269+
if components == nil && (collector == nil || len(collector.ComponentStatusMap) == 0) {
268270
return nil
269271
}
270272
stateString := func(s eaclient.UnitState) string {
@@ -281,6 +283,21 @@ func (f *FleetGateway) convertToCheckinComponents(components []runtime.Component
281283
return ""
282284
}
283285

286+
otelComponentTypeString := func(componentStatusId string) string {
287+
kind, _, err := translate.ParseEntityStatusId(componentStatusId)
288+
if err != nil {
289+
logger.Warnf("failed to parse component status id '%s': %v", componentStatusId, err)
290+
return ""
291+
}
292+
switch kind {
293+
case "receiver":
294+
return "input"
295+
case "exporter":
296+
return "output"
297+
}
298+
return ""
299+
}
300+
284301
size := len(components)
285302
if collector != nil {
286303
size += len(collector.ComponentStatusMap)
@@ -319,7 +336,7 @@ func (f *FleetGateway) convertToCheckinComponents(components []runtime.Component
319336
// and each subcomponent is a unit.
320337
if collector != nil {
321338
for id, item := range collector.ComponentStatusMap {
322-
state, msg := otelhelpers.StateWithMessage(item)
339+
state, msg := translate.StateWithMessage(item)
323340

324341
checkinComponent := fleetapi.CheckinComponent{
325342
ID: id,
@@ -331,11 +348,12 @@ func (f *FleetGateway) convertToCheckinComponents(components []runtime.Component
331348
if len(item.ComponentStatusMap) > 0 {
332349
units := make([]fleetapi.CheckinUnit, 0, len(item.ComponentStatusMap))
333350
for unitId, unitItem := range item.ComponentStatusMap {
334-
unitState, unitMsg := otelhelpers.StateWithMessage(unitItem)
351+
unitState, unitMsg := translate.StateWithMessage(unitItem)
335352
units = append(units, fleetapi.CheckinUnit{
336353
ID: unitId,
337354
Status: stateString(unitState),
338355
Message: unitMsg,
356+
Type: otelComponentTypeString(unitId),
339357
})
340358
}
341359
checkinComponent.Units = units
@@ -365,7 +383,7 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
365383
state, stateCtx := f.stateFetcher.FetchState(ctx)
366384

367385
// convert components into checkin components structure
368-
components := f.convertToCheckinComponents(state.Components, state.Collector)
386+
components := convertToCheckinComponents(f.log, state.Components, state.Collector)
369387

370388
f.log.Debugf("correcting agent loglevel from %s to %s using coordinator state", ecsMeta.Elastic.Agent.LogLevel, state.LogLevel.String())
371389
// Fix loglevel with the current log level used by coordinator

internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,21 @@ import (
1313
"net/http"
1414
"net/url"
1515
"path/filepath"
16+
"slices"
17+
"strings"
1618
"sync"
1719
"testing"
1820
"time"
1921

2022
"github.com/stretchr/testify/assert"
2123
"github.com/stretchr/testify/require"
2224

25+
"github.com/elastic/elastic-agent-libs/logp"
26+
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
28+
"go.opentelemetry.io/collector/component/componentstatus"
29+
30+
eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
2331
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
2432
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
2533
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
@@ -29,6 +37,8 @@ import (
2937
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
3038
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
3139
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
40+
"github.com/elastic/elastic-agent/pkg/component"
41+
"github.com/elastic/elastic-agent/pkg/component/runtime"
3242
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
3343
"github.com/elastic/elastic-agent/pkg/core/logger"
3444
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
@@ -874,3 +884,263 @@ func TestFastCheckinStateFetcher(t *testing.T) {
874884
s.mutex.Unlock()
875885
})
876886
}
887+
888+
func TestConvertToCheckingComponents(t *testing.T) {
889+
tests := []struct {
890+
name string
891+
components []runtime.ComponentComponentState
892+
collector *status.AggregateStatus
893+
expected []fleetapi.CheckinComponent
894+
}{
895+
{
896+
name: "Nil inputs",
897+
components: nil,
898+
collector: nil,
899+
expected: nil,
900+
},
901+
{
902+
name: "Empty inputs",
903+
components: []runtime.ComponentComponentState{},
904+
collector: &status.AggregateStatus{},
905+
expected: []fleetapi.CheckinComponent{},
906+
},
907+
{
908+
name: "Only agent components",
909+
components: []runtime.ComponentComponentState{
910+
{
911+
Component: component.Component{ID: "comp-1", InputSpec: &component.InputRuntimeSpec{InputType: "log"}},
912+
State: runtime.ComponentState{
913+
State: eaclient.UnitStateHealthy,
914+
Message: "Component is healthy",
915+
},
916+
},
917+
{
918+
Component: component.Component{ID: "comp-2", InputSpec: &component.InputRuntimeSpec{InputType: "log"}},
919+
State: runtime.ComponentState{
920+
State: eaclient.UnitStateDegraded,
921+
Message: "Component is degraded",
922+
Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{
923+
{UnitID: "unit-1", UnitType: eaclient.UnitTypeInput}: {
924+
State: eaclient.UnitStateFailed,
925+
Message: "Input unit failed",
926+
Payload: map[string]interface{}{"error": "some error"},
927+
},
928+
},
929+
},
930+
},
931+
},
932+
collector: nil,
933+
expected: []fleetapi.CheckinComponent{
934+
{
935+
ID: "comp-1",
936+
Type: "log",
937+
Status: "HEALTHY",
938+
Message: "Component is healthy",
939+
},
940+
{
941+
ID: "comp-2",
942+
Type: "log",
943+
Status: "DEGRADED",
944+
Message: "Component is degraded",
945+
Units: []fleetapi.CheckinUnit{
946+
{
947+
ID: "unit-1",
948+
Type: "input",
949+
Status: "FAILED",
950+
Message: "Input unit failed",
951+
Payload: map[string]interface{}{"error": "some error"},
952+
},
953+
},
954+
},
955+
},
956+
},
957+
{
958+
name: "Only OTel components",
959+
components: nil,
960+
collector: &status.AggregateStatus{
961+
ComponentStatusMap: map[string]*status.AggregateStatus{
962+
"extensions": {
963+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
964+
ComponentStatusMap: map[string]*status.AggregateStatus{
965+
"extensions:healthcheck": {
966+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
967+
},
968+
},
969+
},
970+
"pipeline:logs": {
971+
Event: componentstatus.NewRecoverableErrorEvent(fmt.Errorf("pipeline error")),
972+
ComponentStatusMap: map[string]*status.AggregateStatus{
973+
"receiver:filebeat": {
974+
Event: componentstatus.NewEvent(componentstatus.StatusStarting),
975+
},
976+
"exporter:elasticsearch": {
977+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
978+
},
979+
"processor:batch": {
980+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
981+
},
982+
},
983+
},
984+
},
985+
},
986+
expected: []fleetapi.CheckinComponent{
987+
{
988+
ID: "extensions",
989+
Type: "otel",
990+
Status: "HEALTHY",
991+
Message: "Healthy",
992+
Units: []fleetapi.CheckinUnit{
993+
{
994+
ID: "extensions:healthcheck",
995+
Type: "",
996+
Status: "HEALTHY",
997+
Message: "Healthy",
998+
},
999+
},
1000+
},
1001+
{
1002+
ID: "pipeline:logs",
1003+
Type: "otel",
1004+
Status: "DEGRADED",
1005+
Message: "Recoverable: pipeline error",
1006+
Units: []fleetapi.CheckinUnit{
1007+
{
1008+
ID: "exporter:elasticsearch",
1009+
Type: "output",
1010+
Status: "HEALTHY",
1011+
Message: "Healthy",
1012+
},
1013+
{
1014+
ID: "processor:batch",
1015+
Type: "",
1016+
Status: "HEALTHY",
1017+
Message: "Healthy",
1018+
},
1019+
{
1020+
ID: "receiver:filebeat",
1021+
Type: "input",
1022+
Status: "STARTING",
1023+
Message: "Starting",
1024+
},
1025+
},
1026+
},
1027+
},
1028+
},
1029+
{
1030+
name: "Both agent and OTel components",
1031+
components: []runtime.ComponentComponentState{
1032+
{
1033+
Component: component.Component{ID: "comp-1", InputSpec: &component.InputRuntimeSpec{InputType: "log"}},
1034+
State: runtime.ComponentState{
1035+
State: eaclient.UnitStateHealthy,
1036+
Message: "Component is healthy",
1037+
},
1038+
},
1039+
},
1040+
collector: &status.AggregateStatus{
1041+
ComponentStatusMap: map[string]*status.AggregateStatus{
1042+
"pipeline:logs": {
1043+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
1044+
},
1045+
},
1046+
},
1047+
expected: []fleetapi.CheckinComponent{
1048+
{
1049+
ID: "comp-1",
1050+
Type: "log",
1051+
Status: "HEALTHY",
1052+
Message: "Component is healthy",
1053+
},
1054+
{
1055+
ID: "pipeline:logs",
1056+
Type: "otel",
1057+
Status: "HEALTHY",
1058+
Message: "Healthy",
1059+
},
1060+
},
1061+
},
1062+
{
1063+
name: "Unknown states and types",
1064+
components: []runtime.ComponentComponentState{
1065+
{
1066+
Component: component.Component{ID: "comp-1", InputSpec: &component.InputRuntimeSpec{InputType: "log"}},
1067+
State: runtime.ComponentState{
1068+
State: eaclient.UnitState(99),
1069+
Message: "Unknown state",
1070+
Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{
1071+
{UnitID: "unit-1", UnitType: eaclient.UnitType(99)}: {
1072+
State: eaclient.UnitState(99),
1073+
Message: "Unknown unit state",
1074+
},
1075+
},
1076+
},
1077+
},
1078+
},
1079+
collector: nil,
1080+
expected: []fleetapi.CheckinComponent{
1081+
{
1082+
ID: "comp-1",
1083+
Type: "log",
1084+
Status: "",
1085+
Message: "Unknown state",
1086+
Units: []fleetapi.CheckinUnit{
1087+
{
1088+
ID: "unit-1",
1089+
Type: "",
1090+
Status: "",
1091+
Message: "Unknown unit state",
1092+
},
1093+
},
1094+
},
1095+
},
1096+
},
1097+
{
1098+
name: "OTel component with invalid ID",
1099+
components: []runtime.ComponentComponentState{},
1100+
collector: &status.AggregateStatus{
1101+
ComponentStatusMap: map[string]*status.AggregateStatus{
1102+
"invalid-id": {
1103+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
1104+
ComponentStatusMap: map[string]*status.AggregateStatus{
1105+
"invalid-unit-id": {
1106+
Event: componentstatus.NewEvent(componentstatus.StatusOK),
1107+
},
1108+
},
1109+
},
1110+
},
1111+
},
1112+
expected: []fleetapi.CheckinComponent{
1113+
{
1114+
ID: "invalid-id",
1115+
Type: "otel",
1116+
Status: "HEALTHY",
1117+
Message: "Healthy",
1118+
Units: []fleetapi.CheckinUnit{
1119+
{
1120+
ID: "invalid-unit-id",
1121+
Type: "",
1122+
Status: "HEALTHY",
1123+
Message: "Healthy",
1124+
},
1125+
},
1126+
},
1127+
},
1128+
},
1129+
}
1130+
1131+
for _, tt := range tests {
1132+
t.Run(tt.name, func(t *testing.T) {
1133+
result := convertToCheckinComponents(logp.NewNopLogger(), tt.components, tt.collector)
1134+
// Testify diffs are nicer if we sort and compare directly vs using ElementsMatch
1135+
slices.SortFunc(result, func(a, b fleetapi.CheckinComponent) int {
1136+
return strings.Compare(a.ID, b.ID)
1137+
})
1138+
for _, c := range result {
1139+
slices.SortFunc(c.Units, func(a, b fleetapi.CheckinUnit) int {
1140+
return strings.Compare(a.ID, b.ID)
1141+
})
1142+
}
1143+
assert.Equal(t, tt.expected, result)
1144+
})
1145+
}
1146+
}

0 commit comments

Comments
 (0)