Skip to content

Commit 14c87c2

Browse files
authored
[otelmanager] emit starting state in the beginning (#11234)
* feat: emit starting state * test * rename method * emit starting state * fix test * comments * fix case * ut
1 parent 7d808fa commit 14c87c2

File tree

4 files changed

+168
-0
lines changed

4 files changed

+168
-0
lines changed

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo
171171
// after the collector exits, we need to report a nil status
172172
r.reportSubprocessCollectorStatus(ctx, statusCh, nil)
173173
return
174+
default:
175+
// if we face any other error (most likely, connection refused), log the error.
176+
logger.Debugf("Received an unexpected error while fetching component status: %v", err)
174177
}
175178
} else {
176179
maxFailuresTimer.Reset(maxFailuresDuration)

internal/pkg/otel/manager/manager_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,90 @@ func TestManagerAlwaysEmitsStoppedStatesForComponents(t *testing.T) {
16491649
}, time.Millisecond, time.Second*5)
16501650
}
16511651

1652+
func TestManagerEmitsStartingStatesWhenHealthcheckIsUnavailable(t *testing.T) {
1653+
testLogger, _ := loggertest.New("test")
1654+
agentInfo := &info.AgentInfo{}
1655+
beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter
1656+
collectorStarted := make(chan struct{})
1657+
1658+
execution := &mockExecution{
1659+
collectorStarted: collectorStarted,
1660+
}
1661+
1662+
// Create manager with test dependencies
1663+
mgr, err := NewOTelManager(
1664+
testLogger,
1665+
logp.DebugLevel,
1666+
testLogger,
1667+
config.SubprocessExecutionMode, // irrelevant, we'll override it
1668+
agentInfo,
1669+
nil,
1670+
beatMonitoringConfigGetter,
1671+
time.Second,
1672+
)
1673+
require.NoError(t, err)
1674+
mgr.recoveryTimer = newRestarterNoop()
1675+
mgr.execution = execution
1676+
1677+
// Start manager in a goroutine
1678+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
1679+
defer cancel()
1680+
1681+
go func() {
1682+
err := mgr.Run(ctx)
1683+
assert.ErrorIs(t, err, context.Canceled)
1684+
}()
1685+
1686+
testComp := testComponent("test")
1687+
components := []component.Component{testComp}
1688+
otelStatus := &status.AggregateStatus{
1689+
Event: componentstatus.NewEvent(componentstatus.StatusStarting),
1690+
}
1691+
// start the collector by giving it a mock config
1692+
mgr.Update(nil, components)
1693+
select {
1694+
case <-ctx.Done():
1695+
t.Fatal("timeout waiting for collector status update")
1696+
case <-execution.collectorStarted:
1697+
}
1698+
1699+
// send the status from the execution
1700+
select {
1701+
case <-ctx.Done():
1702+
t.Fatal("timeout waiting for collector status update")
1703+
case execution.statusCh <- otelStatus:
1704+
}
1705+
1706+
// verify we get the component Starting state from the manager
1707+
componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors())
1708+
require.NoError(t, err)
1709+
require.NotNil(t, componentStates)
1710+
require.Len(t, componentStates, 1)
1711+
componentState := componentStates[0]
1712+
assert.Equal(t, componentState.State.State, client.UnitStateStarting)
1713+
assert.Equal(t, componentState.State.Message, "STARTING")
1714+
1715+
// stop the component by sending a nil config
1716+
mgr.Update(nil, nil)
1717+
1718+
// then send a nil status, indicating the collector is not running the component anymore
1719+
// do this a few times to see if the STOPPED state isn't lost along the way
1720+
for range 3 {
1721+
reportCollectorStatus(ctx, execution.statusCh, nil)
1722+
time.Sleep(time.Millisecond * 100) // TODO: Replace this with synctest after we upgrade to Go 1.25
1723+
}
1724+
1725+
// verify that we get a STOPPED state for the component
1726+
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
1727+
componentStates, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors())
1728+
require.NoError(collect, err)
1729+
require.NotNil(collect, componentStates)
1730+
require.Len(collect, componentStates, 1)
1731+
componentState := componentStates[0]
1732+
assert.Equal(collect, componentState.State.State, client.UnitStateStopped)
1733+
}, time.Millisecond, time.Second*5)
1734+
}
1735+
16521736
func getFromChannelOrErrorWithContext[T any](t *testing.T, ctx context.Context, ch <-chan T, errCh <-chan error) (T, error) {
16531737
t.Helper()
16541738
var result T

internal/pkg/otel/translate/status.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ func GetAllComponentStates(otelStatus *status.AggregateStatus, components []comp
3939
if compState, statusErr = getComponentState(pipelineStatus, comp); statusErr != nil {
4040
return nil, statusErr
4141
}
42+
} else if otelStatus != nil && otelStatus.Event != nil && otelStatus.Status() == componentstatus.StatusStarting {
43+
compState = getComponentStartingState(comp)
4244
} else {
4345
// If the component is not found in the OTel status, we return a stopped state.
4446
compState = runtime.ComponentComponentState{
@@ -290,6 +292,38 @@ func getComponentState(pipelineStatus *status.AggregateStatus, comp component.Co
290292
return compStatus, nil
291293
}
292294

295+
// getComponentStartingState returns a ComponentComponentState with all units in the starting state,
296+
// including version info and initial status for each unit.
297+
func getComponentStartingState(comp component.Component) runtime.ComponentComponentState {
298+
compState := runtime.ComponentState{
299+
State: client.UnitStateStarting,
300+
Message: client.UnitStateStarting.String(),
301+
Units: make(map[runtime.ComponentUnitKey]runtime.ComponentUnitState),
302+
VersionInfo: runtime.ComponentVersionInfo{
303+
Name: OtelComponentName,
304+
Meta: map[string]string{ // mimic what beats return over the control protocol
305+
"build_time": version.BuildTime().String(),
306+
"commit": version.Commit(),
307+
},
308+
BuildHash: version.Commit(),
309+
},
310+
}
311+
for _, unit := range comp.Units {
312+
unitKey := runtime.ComponentUnitKey{
313+
UnitID: unit.ID,
314+
UnitType: unit.Type,
315+
}
316+
compState.Units[unitKey] = getComponentUnitState(&status.AggregateStatus{
317+
Event: componentstatus.NewEvent(componentstatus.StatusStarting),
318+
ComponentStatusMap: map[string]*status.AggregateStatus{},
319+
}, unit)
320+
}
321+
return runtime.ComponentComponentState{
322+
Component: comp,
323+
State: compState,
324+
}
325+
}
326+
293327
// getUnitOtelStatuses extracts the receiver and exporter status from otel pipeline status.
294328
func getUnitOtelStatuses(pipelineStatus *status.AggregateStatus, comp component.Component) (
295329
receiverStatuses map[otelcomponent.ID]*status.AggregateStatus,

internal/pkg/otel/translate/status_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,53 @@ func TestGetAllComponentState(t *testing.T) {
167167
},
168168
},
169169
},
170+
{
171+
name: "component state starting",
172+
components: []component.Component{fileStreamOtelComponent},
173+
otelStatus: &status.AggregateStatus{
174+
Event: componentstatus.NewEvent(componentstatus.StatusStarting),
175+
ComponentStatusMap: map[string]*status.AggregateStatus{},
176+
},
177+
expected: []runtime.ComponentComponentState{
178+
{
179+
Component: fileStreamOtelComponent,
180+
State: runtime.ComponentState{
181+
State: client.UnitStateStarting,
182+
Message: "STARTING",
183+
Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{
184+
runtime.ComponentUnitKey{UnitID: "filestream-unit", UnitType: client.UnitTypeInput}: {
185+
State: client.UnitStateStarting,
186+
Message: "STARTING",
187+
Payload: map[string]any{
188+
"streams": map[string]map[string]string{
189+
"test-1": {
190+
"error": "",
191+
"status": client.UnitStateStarting.String(),
192+
},
193+
"test-2": {
194+
"error": "",
195+
"status": client.UnitStateStarting.String(),
196+
},
197+
},
198+
},
199+
},
200+
runtime.ComponentUnitKey{UnitID: "filestream-default", UnitType: client.UnitTypeOutput}: {
201+
State: client.UnitStateStarting,
202+
Message: "STARTING",
203+
},
204+
},
205+
VersionInfo: runtime.ComponentVersionInfo{
206+
Name: OtelComponentName,
207+
BuildHash: version.Commit(),
208+
Meta: map[string]string{
209+
"build_time": version.BuildTime().String(),
210+
"commit": version.Commit(),
211+
},
212+
},
213+
},
214+
},
215+
},
216+
},
170217
}
171218

172219
for _, test := range tests {

0 commit comments

Comments
 (0)