
Commit 740ea0f

pkoutsovasilis authored and hayotbisonai committed
fix: zombie processes during restart (elastic#10650)
* fix: zombie processes during restart by extending shutdown timeout to 35s
* fix: linter QF1003 could use tagged switch on state
* fix: linter QF1012
* doc: add changelog
* doc: reword test code comments
* fix: make otel manager process stop timeout way shorter
* doc: add more documentation
* doc: remove changelog fragment
* doc: reword managerShutdownTimeout comment
1 parent 18e54a9 commit 740ea0f

File tree

9 files changed: +82 -12 lines changed


internal/pkg/agent/application/application.go

Lines changed: 1 addition & 1 deletion
@@ -266,7 +266,7 @@ func New(
         agentInfo,
         cfg.Settings.Collector,
         monitor.ComponentMonitoringConfig,
-        cfg.Settings.ProcessConfig.StopTimeout,
+        otelmanager.CollectorStopTimeout,
     )
     if err != nil {
         return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 5 additions & 0 deletions
@@ -230,6 +230,11 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa

 // managerShutdownTimeout is how long the coordinator will wait during shutdown
 // to receive termination states from its managers.
+// Note: The current timeout (5s) is shorter than the default stop timeout for
+// subprocess components (30s from process.DefaultConfig()). This means the
+// coordinator may not wait for the subprocesses to finish terminating, preventing
+// Wait() from being called on them. This will result in zombie processes
+// during restart on Unix systems.
 const managerShutdownTimeout = time.Second * 5

 type configReloader interface {
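The zombie behaviour this comment warns about is standard Unix semantics: once a child process exits, its entry stays in the process table as <defunct> until the parent reaps it with Wait(). A minimal, self-contained sketch (hypothetical, not part of this change) showing the effect in Go:

// zombie_sketch.go - hypothetical illustration of why a child that is never
// reaped with Wait() shows up as <defunct>; not code from this commit.
package main

import (
    "os/exec"
    "time"
)

func main() {
    cmd := exec.Command("sleep", "1")
    _ = cmd.Start()

    // The child exits after ~1s, but until Wait() runs the kernel keeps its
    // process-table entry around; `ps -ef | grep defunct` shows a zombie here.
    time.Sleep(5 * time.Second)

    _ = cmd.Wait() // reaping the child removes the zombie entry
}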

internal/pkg/agent/application/reexec/manager.go

Lines changed: 2 additions & 0 deletions
@@ -48,8 +48,10 @@ func NewManager(log *logger.Logger, exec string) ExecManager {

 func (m *manager) ReExec(shutdownCallback ShutdownCallbackFn, argOverrides ...string) {
     go func() {
+        m.logger.Debug("Triggering manager shutdown")
         close(m.trigger)
         <-m.shutdown
+        m.logger.Debug("Manager shutdown complete")

         if shutdownCallback != nil {
             if err := shutdownCallback(); err != nil {

internal/pkg/otel/manager/execution.go

Lines changed: 3 additions & 0 deletions
@@ -24,5 +24,8 @@ type collectorExecution interface {
 }

 type collectorHandle interface {
+    // Stop stops and waits for collector to exit gracefully within the given duration. Note that if the collector
+    // doesn't exit within that time, it will be killed and then it will wait an extra second for it to ensure it's
+    // really stopped.
     Stop(waitTime time.Duration)
 }

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 15 additions & 7 deletions
@@ -203,6 +203,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger

     go func() {
         procState, procErr := processInfo.Process.Wait()
+        logger.Debugf("wait for pid %d returned", processInfo.PID)
         procCtxCancel()
         <-healthCheckDone
         close(ctl.processDoneCh)

@@ -309,19 +310,26 @@ func (s *procHandle) Stop(waitTime time.Duration) {
     default:
     }

+    s.log.Debugf("gracefully stopping pid %d", s.processInfo.PID)
     if err := s.processInfo.Stop(); err != nil {
         s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
         // we failed to stop the process just kill it and return
-        _ = s.processInfo.Kill()
-        return
+    } else {
+        select {
+        case <-time.After(waitTime):
+            s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
+        case <-s.processDoneCh:
+            // process has already exited
+            return
+        }
     }

+    // since we are here this means that the process either got an error at stop or did not stop within the timeout,
+    // kill it and give one more mere second for the process wait to be called
+    _ = s.processInfo.Kill()
     select {
-    case <-time.After(waitTime):
-        s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
-        // our caller ctx is Done; kill the process just in case
-        _ = s.processInfo.Kill()
+    case <-time.After(1 * time.Second):
+        s.log.Warnf("supervised collector subprocess didn't exit in time after killing it")
     case <-s.processDoneCh:
-        // process has already exited
     }
 }
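Taken together, the patched Stop now follows a three-step sequence: request a graceful stop, wait up to waitTime for the process-done channel, and only then kill and allow one extra second for the watcher goroutine's Wait() to return. A condensed restatement of that pattern, with stopFn, killFn and done as stand-ins for the real handle (illustrative only, not the actual API):

package sketch

import "time"

// stopThenKill mirrors the shutdown sequence used by procHandle.Stop above;
// the function values and channel stand in for the real process handle.
func stopThenKill(stopFn func() error, killFn func(), done <-chan struct{}, waitTime time.Duration) {
    if err := stopFn(); err == nil {
        select {
        case <-done:
            return // exited gracefully and was already reaped by the waiting goroutine
        case <-time.After(waitTime):
            // graceful stop timed out; fall through to kill
        }
    }
    killFn()
    select {
    case <-done:
        // the waiting goroutine reaped the killed process
    case <-time.After(1 * time.Second):
        // still not reaped; give up rather than block shutdown indefinitely
    }
}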

internal/pkg/otel/manager/manager.go

Lines changed: 4 additions & 0 deletions
@@ -44,6 +44,10 @@ type ExecutionMode string
 const (
     SubprocessExecutionMode ExecutionMode = "subprocess"
     EmbeddedExecutionMode   ExecutionMode = "embedded"
+
+    // CollectorStopTimeout is the duration to wait for the collector to stop. Note: this needs to be shorter
+    // than 5 * time.Second (coordinator.managerShutdownTimeout) otherwise we might end up with a defunct process.
+    CollectorStopTimeout = 3 * time.Second
 )

 type collectorRecoveryTimer interface {
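Since the 3s-versus-5s relationship only lives in comments, a small guard test could keep the two constants from drifting apart. A hypothetical sketch (not part of this commit; the import path is assumed from the repository layout, and coordinator.managerShutdownTimeout is unexported, so it is mirrored locally):

package manager_test

import (
    "testing"
    "time"

    otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
)

// TestCollectorStopTimeoutFitsShutdown asserts the documented invariant that the
// collector stop timeout stays below the coordinator's 5s manager shutdown timeout.
func TestCollectorStopTimeoutFitsShutdown(t *testing.T) {
    const managerShutdownTimeout = 5 * time.Second // mirrors coordinator.managerShutdownTimeout (unexported)
    if otelmanager.CollectorStopTimeout >= managerShutdownTimeout {
        t.Fatalf("CollectorStopTimeout (%s) must be shorter than the coordinator shutdown timeout (%s)",
            otelmanager.CollectorStopTimeout, managerShutdownTimeout)
    }
}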

pkg/component/runtime/command.go

Lines changed: 8 additions & 3 deletions
@@ -337,9 +337,10 @@ func (c *commandRuntime) forceCompState(state client.UnitState, msg string) {
 // compState updates just the component state not all the units.
 func (c *commandRuntime) compState(state client.UnitState) {
     msg := stateUnknownMessage
-    if state == client.UnitStateHealthy {
+    switch state {
+    case client.UnitStateHealthy:
         msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID)
-    } else if state == client.UnitStateDegraded {
+    case client.UnitStateDegraded:
         if c.missedCheckins == 1 {
             msg = fmt.Sprintf("Degraded: pid '%d' missed 1 check-in", c.proc.PID)
         } else {

@@ -433,17 +434,20 @@ func (c *commandRuntime) stop(ctx context.Context) error {
             return
         case <-t.C:
             // kill no matter what (might already be stopped)
+            c.log.Debugf("timeout waiting for pid %d, killing it", c.proc.PID)
             _ = info.Kill()
         }
     }(c.proc, cmdSpec.Timeouts.Stop)
+
+    c.log.Debugf("gracefully stopping pid %d", c.proc.PID)
     return c.proc.Stop()
 }

 func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
     go func() {
         err := comm.WriteStartUpInfo(info.Stdin)
         if err != nil {
-            _, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)))
+            _, _ = fmt.Fprintf(c.logErr, "Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)
             // kill instantly
             _ = info.Kill()
         } else {

@@ -452,6 +456,7 @@ func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {

         ch := info.Wait()
         s := <-ch
+        c.log.Debugf("wait for pid %d returned", info.PID)
         c.procCh <- procState{
             proc:  info,
             state: s,

pkg/testing/fixture.go

Lines changed: 6 additions & 0 deletions
@@ -849,6 +849,12 @@ func (f *Fixture) ExecInspect(ctx context.Context, opts ...process.CmdOption) (A
     return inspect, err
 }

+// ExecRestart executes the restart subcommand on the prepared Elastic Agent binary.
+func (f *Fixture) ExecRestart(ctx context.Context, opts ...process.CmdOption) error {
+    _, err := f.Exec(ctx, []string{"restart"}, opts...)
+    return err
+}
+
 // ExecVersion executes the version subcommand on the prepared Elastic Agent binary
 // with '--binary-only'. It returns the parsed YAML output.
 func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (AgentVersionOutput, error) {
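For context, a test would use the new helper roughly as in the restart step added to the metrics monitoring test below. A hypothetical wrapper (package name and placement are illustrative, not from the commit):

package integration

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"

    atesting "github.com/elastic/elastic-agent/pkg/testing"
)

// restartAndWaitHealthy shows how ExecRestart is meant to be used:
// trigger a restart, then poll until the agent reports healthy again.
func restartAndWaitHealthy(ctx context.Context, t *testing.T, f *atesting.Fixture) {
    err := f.ExecRestart(ctx)
    require.NoError(t, err, "could not restart agent")

    require.Eventually(t, func() bool {
        return f.IsHealthy(ctx) == nil
    }, time.Minute, time.Second, "agent did not report healthy after restart")
}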

testing/integration/ess/metrics_monitoring_test.go

Lines changed: 38 additions & 1 deletion
@@ -13,13 +13,17 @@ import (
     "fmt"
     "net/http"
     "net/http/httputil"
+    "runtime"
     "testing"
     "time"

     "github.com/gofrs/uuid/v5"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"

+    "github.com/elastic/elastic-agent-system-metrics/metric/system/process"
+
     "github.com/elastic/elastic-agent-libs/kibana"
     "github.com/elastic/elastic-agent-libs/testing/estools"
     otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"

@@ -207,7 +211,40 @@ func (runner *MetricsRunner) TestBeatsMetrics() {
             return false
         }
         return true
-    }, time.Minute*10, time.Second*10, "could not fetch metrics for all known components in default install: %v", componentIds)
+    }, time.Minute*10, time.Second*10, "could not fetch metrics for edot collector")
+
+    if runtime.GOOS == "windows" {
+        return
+    }
+
+    // restart the agent to validate that this does not result in any agent-spawned subprocess
+    // becoming defunct
+    err = runner.agentFixture.ExecRestart(ctx)
+    require.NoError(t, err, "could not restart agent")
+
+    require.Eventually(t, func() bool {
+        err = runner.agentFixture.IsHealthy(ctx)
+        if err != nil {
+            t.Logf("waiting for agent healthy: %s", err.Error())
+            return false
+        }
+        return true
+    }, 1*time.Minute, 1*time.Second)
+
+    procStats := process.Stats{
+        // filtering with '.*elastic-agent' or '^.*elastic-agent$' doesn't
+        // seem to work as expected
+        Procs: []string{".*"},
+    }
+    err = procStats.Init()
+    require.NoError(t, err, "could not initialize process.Stats")
+
+    pidMap, _, err := procStats.FetchPids()
+    require.NoError(t, err, "could not fetch pids")
+
+    for _, state := range pidMap {
+        assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0))
+    }
 }

 func genESQuery(agentID string, requiredFields [][]string) map[string]interface{} {
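The zombie check at the end of the test is self-contained enough to extract. A hypothetical helper (not part of the commit; package name assumed) reusing the same process.Stats calls so other Unix integration tests could run it after a restart:

package ess

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "github.com/elastic/elastic-agent-system-metrics/metric/system/process"
)

// assertNoZombieProcesses fails the test if any process on the host is in the
// zombie state, mirroring the check added to TestBeatsMetrics above.
func assertNoZombieProcesses(t *testing.T) {
    t.Helper()

    // Match every process; see the comment in the test above about regex
    // filtering not behaving as expected.
    procStats := process.Stats{Procs: []string{".*"}}
    require.NoError(t, procStats.Init(), "could not initialize process.Stats")

    pidMap, _, err := procStats.FetchPids()
    require.NoError(t, err, "could not fetch pids")

    for _, state := range pidMap {
        assert.NotEqualValuesf(t, process.Zombie, state.State, "process %d is in zombie state", state.Pid.ValueOr(0))
    }
}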

0 commit comments
