Skip to content

Commit 0eba9a3

Browse files
committed
Merge branch 'pr/450'
2 parents 39916bc + 18f0f06 commit 0eba9a3

2 files changed

Lines changed: 177 additions & 1 deletion

File tree

src/app/process.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,18 @@ func NewProcess(opts ...ProcOpts) *Process {
110110

111111
func (p *Process) run() int {
112112
if p.isState(types.ProcessStateTerminating) {
113-
return 0
113+
select {
114+
case <-p.procRunCtx.Done():
115+
// Concurrent stop in progress — honour it.
116+
return 0
117+
default:
118+
// Stale "Terminating" state from a prior broken shutdown (e.g. zombie
119+
// that was reaped without completing the normal lifecycle). Reset so
120+
// the process can start fresh.
121+
log.Warn().Str("process", p.getName()).
122+
Msg("Resetting stale Terminating state before start")
123+
p.setState(types.ProcessStatePending)
124+
}
114125
}
115126

116127
if err := p.validateProcess(); err != nil {
@@ -187,13 +198,24 @@ func (p *Process) waitForStdOutErr() {
187198
ctx, cancel := context.WithCancel(context.Background())
188199
if p.procConf.IsDaemon {
189200
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(p.procConf.LaunchTimeout)*time.Second)
201+
} else if p.isState(types.ProcessStateTerminating) {
202+
// In terminating state, never block forever on stdout/stderr drain.
203+
// Reaching command.Wait() is required to reap child processes.
204+
timeoutSec := p.procConf.ShutDownParams.ShutDownTimeout
205+
if timeoutSec == UndefinedShutdownTimeoutSec {
206+
timeoutSec = DefaultShutdownTimeoutSec
207+
}
208+
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(timeoutSec)*time.Second)
190209
}
191210
defer cancel()
192211
if p.stdOutDone != nil {
193212
select {
194213
case <-ctx.Done():
195214
log.Debug().Msgf("%s stdout done with timeout", p.getName())
196215
return
216+
case <-p.procRunCtx.Done():
217+
log.Debug().Msgf("%s stdout abandoned: process stopped", p.getName())
218+
return
197219
case <-p.stdOutDone:
198220
log.Debug().Msgf("%s stdout done", p.getName())
199221
}
@@ -204,6 +226,9 @@ func (p *Process) waitForStdOutErr() {
204226
case <-ctx.Done():
205227
log.Debug().Msgf("%s stderr done with timeout", p.getName())
206228
return
229+
case <-p.procRunCtx.Done():
230+
log.Debug().Msgf("%s stderr abandoned: process stopped", p.getName())
231+
return
207232
case <-p.stdErrDone:
208233
log.Debug().Msgf("%s stderr done", p.getName())
209234
}

src/app/system_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,6 +1431,157 @@ func TestSystem_ConcurrentRestartRaceCondition(t *testing.T) {
14311431
}
14321432
}
14331433

1434+
func TestSystem_StartProcessResetsStaleTerminatingState(t *testing.T) {
1435+
testProcess := "stale_terminating"
1436+
shell := command.DefaultShellConfig()
1437+
1438+
project := &types.Project{
1439+
Processes: map[string]types.ProcessConfig{
1440+
testProcess: {
1441+
Name: testProcess,
1442+
ReplicaName: testProcess,
1443+
Executable: shell.ShellCommand,
1444+
Args: []string{shell.ShellArgument, getSleepCommand(1.0)},
1445+
RestartPolicy: types.RestartPolicyConfig{
1446+
Restart: types.RestartPolicyNo,
1447+
},
1448+
},
1449+
},
1450+
ShellConfig: shell,
1451+
}
1452+
1453+
runner, err := NewProjectRunner(&ProjectOpts{
1454+
project: project,
1455+
processesToRun: []string{},
1456+
noDeps: false,
1457+
mainProcess: "",
1458+
mainProcessArgs: []string{},
1459+
isTuiOn: false,
1460+
})
1461+
if err != nil {
1462+
t.Error(err.Error())
1463+
return
1464+
}
1465+
1466+
// Simulate stale state from a prior broken termination.
1467+
runner.statesMutex.Lock()
1468+
runner.processStates[testProcess].Status = types.ProcessStateTerminating
1469+
runner.processStates[testProcess].IsRunning = false
1470+
runner.statesMutex.Unlock()
1471+
runner.runProcMutex.Lock()
1472+
runner.runningProcesses = make(map[string]*Process)
1473+
runner.runProcMutex.Unlock()
1474+
runner.doneProcMutex.Lock()
1475+
runner.doneProcesses = make(map[string]*Process)
1476+
runner.doneProcMutex.Unlock()
1477+
runner.logger = pclog.NewNilLogger()
1478+
1479+
if err := runner.StartProcess(testProcess); err != nil {
1480+
t.Fatalf("failed to start process: %v", err)
1481+
}
1482+
1483+
var lastStatus string
1484+
for attempts := range 200 {
1485+
state, stateErr := runner.GetProcessState(testProcess)
1486+
if stateErr != nil {
1487+
t.Fatalf("failed to get process state: %v", stateErr)
1488+
}
1489+
lastStatus = state.Status
1490+
if state.Status == types.ProcessStateRunning || state.Status == types.ProcessStateCompleted {
1491+
break
1492+
}
1493+
time.Sleep(10 * time.Millisecond)
1494+
if attempts == 199 {
1495+
t.Fatalf("process failed to leave stale Terminating state, last status=%s", state.Status)
1496+
}
1497+
}
1498+
1499+
if lastStatus == types.ProcessStateTerminating {
1500+
t.Fatalf("expected process to recover from stale Terminating state, got %s", lastStatus)
1501+
}
1502+
1503+
// Cleanup if still running.
1504+
if runner.getRunningProcess(testProcess) != nil {
1505+
if err := runner.StopProcess(testProcess); err != nil {
1506+
t.Fatalf("failed to stop process: %v", err)
1507+
}
1508+
}
1509+
}
1510+
1511+
func TestSystem_TestTerminatingWithOrphanedChildHoldingPipe(t *testing.T) {
1512+
if runtime.GOOS == "windows" {
1513+
t.Skip("test requires Unix process semantics")
1514+
}
1515+
// Reproduce the zombie/stale-Terminating scenario:
1516+
// A parent process spawns a child that inherits stdout and keeps it open.
1517+
// When we stop the parent, the stdout pipe never gets an EOF because the
1518+
// child still holds it. Without a timeout in waitForStdOutErr, the
1519+
// process would hang in Terminating forever and never reach Completed.
1520+
1521+
testProcess := "orphan_pipe"
1522+
shell := command.DefaultShellConfig()
1523+
shutdownTimeout := 3
1524+
1525+
project := &types.Project{
1526+
Processes: map[string]types.ProcessConfig{
1527+
testProcess: {
1528+
Name: testProcess,
1529+
ReplicaName: testProcess,
1530+
Executable: shell.ShellCommand,
1531+
// Parent spawns a child in a new session (setsid) so it won't
1532+
// receive the SIGTERM sent to the parent's process group.
1533+
// The child writes to stdout (holding the pipe open) and sleeps.
1534+
// When the parent is killed, the child survives and keeps the
1535+
// pipe open, blocking waitForStdOutErr until the timeout fires.
1536+
Args: []string{shell.ShellArgument, "setsid bash -c 'while true; do echo holding_pipe; sleep 1; done' & sleep 60"},
1537+
RestartPolicy: types.RestartPolicyConfig{
1538+
Restart: types.RestartPolicyNo,
1539+
},
1540+
ShutDownParams: types.ShutDownParams{
1541+
ShutDownTimeout: shutdownTimeout,
1542+
Signal: int(syscall.SIGTERM),
1543+
},
1544+
},
1545+
},
1546+
ShellConfig: shell,
1547+
}
1548+
1549+
runner, err := NewProjectRunner(&ProjectOpts{project: project})
1550+
if err != nil {
1551+
t.Fatal(err)
1552+
}
1553+
go func() {
1554+
_ = runner.Run()
1555+
}()
1556+
1557+
// Wait for the process to be running (poll via mutex-protected method to avoid race).
1558+
var proc *Process
1559+
for i := 0; i < 50; i++ {
1560+
time.Sleep(100 * time.Millisecond)
1561+
proc = runner.getRunningProcess(testProcess)
1562+
if proc != nil {
1563+
break
1564+
}
1565+
}
1566+
if proc == nil {
1567+
t.Fatal("process never started")
1568+
}
1569+
1570+
// Stop the parent — it will enter Terminating, but the child keeps stdout open.
1571+
stopDone := make(chan struct{})
1572+
go func() {
1573+
defer close(stopDone)
1574+
_ = runner.StopProcess(testProcess)
1575+
}()
1576+
1577+
// The process must reach Completed within the shutdown timeout + margin,
1578+
// NOT stay stuck in Terminating.
1579+
waitForProcessState(t, runner, testProcess, types.ProcessStateCompleted,
1580+
time.Duration(shutdownTimeout+5)*time.Second)
1581+
1582+
<-stopDone
1583+
}
1584+
14341585
func TestReadinessProbeRestart(t *testing.T) {
14351586
proc := &types.ProcessConfig{
14361587
Name: "test",

0 commit comments

Comments
 (0)