Skip to content
242 changes: 144 additions & 98 deletions pexec/managed_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ import (

var errAlreadyStopped = errors.New("already stopped")

// UnexpectedExitHandler is the signature for functions that can optionally be
// provided to run when a managed process unexpectedly exits. The handler is
// passed the exit code of the process that exited. The return value
// indicates whether pexec should continue with its own attempt to restart the
// process: true means pexec will attempt its own restart, false means
// no restart will be attempted and the process will remain dead.
type UnexpectedExitHandler = func(exitCode int) bool

// A ManagedProcess controls the lifecycle of a single system process. Based on
// its configuration, it will ensure the process is revived if it ever unexpectedly
// perishes.
Expand Down Expand Up @@ -101,8 +108,7 @@ type managedProcess struct {
shouldLog bool
cmd *exec.Cmd

stopped bool
onUnexpectedExit func(int) bool
onUnexpectedExit UnexpectedExitHandler
managingCh chan struct{}
killCh chan struct{}
stopSig syscall.Signal
Expand Down Expand Up @@ -159,7 +165,11 @@ func (p *managedProcess) validateCWD() error {
func (p *managedProcess) Start(ctx context.Context) error {
// Hold the process lock for the whole call: the unexported start does the
// actual work and must be called with the lock held.
p.mu.Lock()
defer p.mu.Unlock()
return p.start(ctx)
}

// This internal version of start must be called with the process lock held.
func (p *managedProcess) start(ctx context.Context) error {
// In the event this Start happened from a restart but a
// stop happened while we were acquiring the lock, we may
// need to return early.
Expand Down Expand Up @@ -268,85 +278,24 @@ func (p *managedProcess) manage(stdOut, stdErr io.ReadCloser) {
}
}()

// This block here logs as much as possible if it's requested until the
// pipes are closed.
stopLogging := make(chan struct{})
var activeLoggers sync.WaitGroup
if p.shouldLog || p.logWriter != nil {
logPipe := func(name string, pipe io.ReadCloser, isErr bool, logger utils.ZapCompatibleLogger) {
defer activeLoggers.Done()
pipeR := bufio.NewReader(pipe)
logWriterError := false
for {
select {
case <-stopLogging:
return
default:
}
line, _, err := pipeR.ReadLine()
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, os.ErrClosed) {
p.logger.Errorw("error reading output", "name", name, "error", err)
}
return
}
if p.shouldLog {
if isErr {
logger.Error("\n\\_ " + string(line))
} else {
logger.Info("\n\\_ " + string(line))
}
}
if p.logWriter != nil && !logWriterError {
_, err := p.logWriter.Write(line)
if err == nil {
_, err = p.logWriter.Write([]byte("\n"))
}
if err != nil {
if !errors.Is(err, io.ErrClosedPipe) {
p.logger.Debugw("error writing process output to log writer", "name", name, "error", err)
}
if !p.shouldLog {
return
}
logWriterError = true
}
}
}
}
activeLoggers.Add(2)
utils.PanicCapturingGo(func() {
name := "StdOut"
var logger utils.ZapCompatibleLogger
if p.stdoutLogger != nil {
logger = p.stdoutLogger
} else {
logger = utils.Sublogger(p.logger, name)
}
logPipe(name, stdOut, false, logger)
})
utils.PanicCapturingGo(func() {
name := "StdErr"
var logger utils.ZapCompatibleLogger
if p.stderrLogger != nil {
logger = p.stderrLogger
} else {
logger = utils.Sublogger(p.logger, name)
}
logPipe(name, stdErr, true, logger)
})
}
stopAndDrainLoggers := p.startLoggers(stdOut, stdErr)

err := p.cmd.Wait()
// This is safe to write to because it is only read in Stop which
// is waiting for us to stop managing.
if err == nil {
p.lastWaitErr = nil
} else {
p.lastWaitErr = err
}
close(stopLogging)
activeLoggers.Wait()
p.lastWaitErr = err

stopAndDrainLoggers()

// Take the lock to prevent a race where a crashed process restarts even
// though Stop is called.
p.mu.Lock()
locked := true
defer func() {
if locked {
p.mu.Unlock()
}
}()

// It's possible that Stop was called and is the reason why Wait returned.
select {
Expand All @@ -356,10 +305,23 @@ func (p *managedProcess) manage(stdOut, stdErr io.ReadCloser) {
}

// Run onUnexpectedExit if it exists. Do not attempt restart if
// onUnexpectedExit returns false.
if p.onUnexpectedExit != nil &&
!p.onUnexpectedExit(p.cmd.ProcessState.ExitCode()) {
return
// onUnexpectedExit returns false. Drop the lock to avoid deadlocking other
// goroutines that may try to call Stop while we're handling a crash.
if p.onUnexpectedExit != nil {
p.mu.Unlock()
locked = false
if !p.onUnexpectedExit(p.cmd.ProcessState.ExitCode()) {
return
}
p.mu.Lock()
locked = true
// Dropped the lock to call the onUnexpectedExit handler, check if we were
// stopped during that time.
select {
case <-p.killCh:
return
default:
}
}

// Otherwise, let's try restarting the process.
Expand Down Expand Up @@ -387,40 +349,123 @@ func (p *managedProcess) manage(stdOut, stdErr io.ReadCloser) {
return
}

err = p.Start(context.Background())
// Use the internal version of start since we already hold the lock.
err = p.start(context.Background())
if err != nil {
if !errors.Is(err, errAlreadyStopped) {
// MAYBE(erd): add retry
p.logger.Errorw("error restarting process", "error", err)
}
return
}
restarted = true
}

// startLoggers is a helper that is only meant to be called from manage. If
// logging is enabled (p.shouldLog is set or p.logWriter is non-nil) it starts
// one goroutine per pipe that reads the pipe line by line until a read fails
// (EOF and closed-pipe errors are treated as a normal end) or logging is
// stopped. Every chunk read is logged through the stream's logger when
// p.shouldLog is set and copied to p.logWriter when one is configured.
//
// It returns a function that signals the goroutines to stop and blocks until
// both have returned. When logging is disabled the returned function is a
// no-op so the caller can invoke it unconditionally.
func (p *managedProcess) startLoggers(stdOut, stdErr io.ReadCloser) func() {
	if !p.shouldLog && p.logWriter == nil {
		// No logging enabled, return a noop func so the caller can unconditionally
		// invoke it.
		return func() {}
	}

	stopLogging := make(chan struct{})
	var activeLoggers sync.WaitGroup
	activeLoggers.Add(2)
	logPipe := func(name string, pipe io.ReadCloser, isErr bool, logger utils.ZapCompatibleLogger) {
		defer activeLoggers.Done()
		pipeR := bufio.NewReader(pipe)
		// Set once a write to p.logWriter fails so we stop writing to it but can
		// keep feeding the logger.
		logWriterError := false
		for {
			// The stop signal is only checked between reads.
			select {
			case <-stopLogging:
				return
			default:
			}
			// isPrefix is true when the line did not fit in the reader's buffer
			// and the remainder will be returned by subsequent calls.
			line, isPrefix, err := pipeR.ReadLine()
			if err != nil {
				if !errors.Is(err, io.EOF) && !errors.Is(err, os.ErrClosed) {
					p.logger.Errorw("error reading output", "name", name, "error", err)
				}
				return
			}
			if p.shouldLog {
				if isErr {
					logger.Error("\n\\_ " + string(line))
				} else {
					logger.Info("\n\\_ " + string(line))
				}
			}
			if p.logWriter != nil && !logWriterError {
				_, err := p.logWriter.Write(line)
				// Only terminate the line once its final chunk has been written;
				// otherwise an over-long line would gain newlines the process
				// never emitted.
				if err == nil && !isPrefix {
					_, err = p.logWriter.Write([]byte("\n"))
				}
				if err != nil {
					if !errors.Is(err, io.ErrClosedPipe) {
						p.logger.Debugw("error writing process output to log writer", "name", name, "error", err)
					}
					// With no logger to feed either, there is nothing left to do.
					if !p.shouldLog {
						return
					}
					logWriterError = true
				}
			}
		}
	}

	// NOTE(review): both goroutines write to the same p.logWriter without
	// coordination, so output from the two streams may interleave — confirm
	// the writer tolerates concurrent use.
	utils.PanicCapturingGo(func() {
		name := "StdOut"
		var logger utils.ZapCompatibleLogger
		if p.stdoutLogger != nil {
			logger = p.stdoutLogger
		} else {
			logger = utils.Sublogger(p.logger, name)
		}
		logPipe(name, stdOut, false, logger)
	})
	utils.PanicCapturingGo(func() {
		name := "StdErr"
		var logger utils.ZapCompatibleLogger
		if p.stderrLogger != nil {
			logger = p.stderrLogger
		} else {
			logger = utils.Sublogger(p.logger, name)
		}
		logPipe(name, stdErr, true, logger)
	})

	return func() {
		close(stopLogging)
		activeLoggers.Wait()
	}
}

func (p *managedProcess) Stop() error {
// Minimally hold a lock here so that we can signal the
// management goroutine to stop. If we were to hold the
// lock for the duration of the function, we would possibly
// deadlock with manage trying to restart.
p.mu.Lock()
if p.stopped {

// Return early if the process has already been killed.
select {
case <-p.killCh:
p.mu.Unlock()
<-p.managingCh
return nil
default:
}

close(p.killCh)
p.stopped = true

// All relevant methods wait on the lock we hold and will not attempt to
// (re)start the process now that we closed killCh, so we can safely drop the
// lock to unblock other calls while we proceed with shutdown.
p.mu.Unlock()

// Nothing to do.
if p.cmd == nil {
p.mu.Unlock()
return nil
}
p.mu.Unlock()

// Since p.cmd is mutex guarded and we just signaled the manage
// goroutine to stop, no new Start can happen and therefore
// p.cmd can no longer be modified rendering it safe to read
// without a lock held.

forceKilled, err := p.kill()
if err != nil {
Expand Down Expand Up @@ -460,9 +505,10 @@ func (p *managedProcess) KillGroup() {
// management goroutine to stop. We will attempt to kill the
// process even if killCh has already been closed.
p.mu.Lock()
if !p.stopped {
select {
case <-p.killCh:
default:
close(p.killCh)
p.stopped = true
}

if p.cmd == nil {
Expand Down
2 changes: 1 addition & 1 deletion pexec/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ProcessConfig struct {
//
// NOTE(benjirewis): use `jsonschema:"-"` struct tag to avoid issues with
// jsonschema reflection (go functions cannot be encoded to JSON).
OnUnexpectedExit func(int) bool `jsonschema:"-"`
OnUnexpectedExit UnexpectedExitHandler `jsonschema:"-"`
// The logger to use for STDOUT of this process. If not specified, will use
// a sublogger of the `logger` parameter given to `NewManagedProcess`.
StdOutLogger utils.ZapCompatibleLogger
Expand Down
13 changes: 4 additions & 9 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func PanicCapturingGoWithCallback(f func(), callback func(err interface{})) {
go func() {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
golog.Global().Errorw("panic while running function", "error", err)
debug.PrintStack()
if callback == nil {
return
}
Expand All @@ -161,15 +161,10 @@ func PanicCapturingGoWithCallback(f func(), callback func(err interface{})) {
// it terminates normally.
func ManagedGo(f, onComplete func()) {
PanicCapturingGoWithCallback(func() {
defer func() {
if err := recover(); err == nil && onComplete != nil {
onComplete()
} else if err != nil {
// re-panic
panic(err)
}
}()
f()
if onComplete != nil {
onComplete()
}
}, func(_ interface{}) {
ManagedGo(f, onComplete)
})
Expand Down
Loading