From 5ffed0061ab02bc334d3edb410f5731b832d393c Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Tue, 6 May 2025 18:02:18 -0400 Subject: [PATCH 01/13] Break deadlock when a module tries to restart while being reconfigured --- module/modmanager/manager.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 9004ffd9c4d..8ccce5db0f6 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -931,6 +931,14 @@ var oueRestartInterval = 5 * time.Second // for the passed-in module to include in the pexec.ProcessConfig. func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) bool { return func(exitCode int) bool { + // There is a circular dependency that causes a deadlock if a module dies + // while being reconfigured. Break it here by giving up on the restart if we + // cannot lock the mananger. + if locked := mgr.mu.TryLock(); !locked { + return false + } + defer mgr.mu.Unlock() + mod.inRecoveryLock.Lock() defer mod.inRecoveryLock.Unlock() if mod.inStartup.Load() { @@ -976,10 +984,7 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b // Finally, handle orphaned resources. var orphanedResourceNames []resource.Name for name, res := range mod.resources { - // The `addResource` method might still be executing for this resource with a - // read lock, so we execute it here with a write lock to make sure it doesn't - // run concurrently. 
- if _, err := mgr.addResourceWithWriteLock(mgr.restartCtx, res.conf, res.deps); err != nil { + if _, err := mgr.addResource(mgr.restartCtx, res.conf, res.deps); err != nil { mod.logger.Warnw("Error while re-adding resource to module", "resource", name, "module", mod.cfg.Name, "error", err) mgr.rMap.Delete(name) @@ -1003,9 +1008,6 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b // attemptRestart will attempt to restart the module up to three times and // return the names of now orphaned resources. func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.Name { - mgr.mu.Lock() - defer mgr.mu.Unlock() - // deregister crashed module's resources, and let later checkReady reset m.handles // before reregistering. mod.deregisterResources() @@ -1362,6 +1364,10 @@ func (m *module) stopProcess() error { if strings.Contains(err.Error(), errMessageExitStatus143) || strings.Contains(err.Error(), "no such process") { return nil } + // Even though we got an error the process is dead; task failed successfully. 
+ if statusErr := m.process.Status(); errors.Is(statusErr, os.ErrProcessDone) { + return nil + } return err } From 4917724dec835d729264757ea725684c8d56e036 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Wed, 7 May 2025 12:21:32 -0400 Subject: [PATCH 02/13] Removing redundant synchronization code --- module/modmanager/manager.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 8ccce5db0f6..314dccafcb8 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -104,15 +104,6 @@ type module struct { // pendingRemoval allows delaying module close until after resources within it are closed pendingRemoval bool - // inStartup stores whether or not the manager of the OnUnexpectedExit function - // is trying to start up this module; inRecoveryLock guards the execution of an - // OnUnexpectedExit function for this module. - // - // NOTE(benjirewis): Using just an atomic boolean is not sufficient, as OUE - // functions for the same module cannot overlap and should not continue after - // another OUE has finished. - inStartup atomic.Bool - inRecoveryLock sync.Mutex logger logging.Logger ftdc *ftdc.FTDC // port stores the listen port of this module when ViamTCPSockets() = true. @@ -385,13 +376,6 @@ func (mgr *Manager) startModuleProcess(mod *module) error { } func (mgr *Manager) startModule(ctx context.Context, mod *module) error { - // add calls startProcess, which can also be called by the OUE handler in the attemptRestart - // call. Both of these involve owning a lock, so in unhappy cases of malformed modules - // this can lead to a deadlock. To prevent this, we set inStartup here to indicate to - // the OUE handler that it shouldn't act while add is still processing. 
- mod.inStartup.Store(true) - defer mod.inStartup.Store(false) - var success bool defer func() { if !success { @@ -939,15 +923,6 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b } defer mgr.mu.Unlock() - mod.inRecoveryLock.Lock() - defer mod.inRecoveryLock.Unlock() - if mod.inStartup.Load() { - return false - } - - mod.inStartup.Store(true) - defer mod.inStartup.Store(false) - // Log error immediately, as this is unexpected behavior. mod.logger.Errorw( "Module has unexpectedly exited.", "module", mod.cfg.Name, "exit_code", exitCode, From 9092dcd50b33cd69ebc253e8411167124942977c Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Thu, 8 May 2025 14:07:14 -0400 Subject: [PATCH 03/13] Refactoring module struct into a separate file --- module/modmanager/manager.go | 386 --------------------------------- module/modmanager/module.go | 410 +++++++++++++++++++++++++++++++++++ 2 files changed, 410 insertions(+), 386 deletions(-) create mode 100644 module/modmanager/module.go diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 314dccafcb8..369276bd2df 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -4,41 +4,30 @@ package modmanager import ( "context" "fmt" - "io/fs" "os" "path/filepath" "regexp" "runtime" "slices" - "strconv" "strings" "sync" "sync/atomic" "time" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/pkg/errors" "go.uber.org/multierr" - "go.uber.org/zap/zapcore" pb "go.viam.com/api/module/v1" - robotpb "go.viam.com/api/robot/v1" "go.viam.com/utils" - "go.viam.com/utils/pexec" - "go.viam.com/utils/rpc" - "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "go.viam.com/rdk/config" "go.viam.com/rdk/ftdc" - "go.viam.com/rdk/ftdc/sys" rdkgrpc "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" modlib "go.viam.com/rdk/module" modmanageroptions 
"go.viam.com/rdk/module/modmanager/options" "go.viam.com/rdk/module/modmaninterface" - "go.viam.com/rdk/operation" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot/packages" rutils "go.viam.com/rdk/utils" @@ -86,30 +75,6 @@ func NewManager( return ret, nil } -type module struct { - cfg config.Module - dataDir string - process pexec.ManagedProcess - handles modlib.HandlerMap - sharedConn rdkgrpc.SharedConn - client pb.ModuleServiceClient - // robotClient supplements the ModuleServiceClient client to serve select robot level methods from the module server - robotClient robotpb.RobotServiceClient - addr string - resources map[resource.Name]*addedResource - // resourcesMu must be held if the `resources` field is accessed without - // write-locking the module manager. - resourcesMu sync.Mutex - - // pendingRemoval allows delaying module close until after resources within it are closed - pendingRemoval bool - - logger logging.Logger - ftdc *ftdc.FTDC - // port stores the listen port of this module when ViamTCPSockets() = true. - port int -} - type addedResource struct { conf resource.Config deps []string @@ -1069,96 +1034,6 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource. return nil } -// dial will Dial the module and replace the underlying connection (if it exists) in m.conn. 
-func (m *module) dial() error { - // TODO(PRODUCT-343): session support probably means interceptors here - var err error - addrToDial := m.addr - if !rutils.TCPRegex.MatchString(addrToDial) { - addrToDial = "unix://" + addrToDial - } - conn, err := grpc.Dial( //nolint:staticcheck - addrToDial, - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(rpc.MaxMessageSize)), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithChainUnaryInterceptor( - rdkgrpc.EnsureTimeoutUnaryClientInterceptor, - grpc_retry.UnaryClientInterceptor(), - operation.UnaryClientInterceptor, - ), - grpc.WithChainStreamInterceptor( - grpc_retry.StreamClientInterceptor(), - operation.StreamClientInterceptor, - ), - ) - if err != nil { - return errors.WithMessage(err, "module startup failed") - } - - // Take the grpc over unix socket connection and add it to this `module`s `SharedConn` - // object. This `m.sharedConn` object is referenced by all resources/components. `Client` - // objects communicating with the module. If we're re-dialing after a restart, there may be - // existing resource `Client`s objects. Rather than recreating clients with new information, we - // choose to "swap out" the underlying connection object for those existing `Client`s. - // - // Resetting the `SharedConn` will also create a new WebRTC PeerConnection object. `dial`ing to - // a module is followed by doing a `ReadyRequest` `ReadyResponse` exchange. If that exchange - // contains a working WebRTC offer and answer, the PeerConnection will succeed in connecting. If - // there is an error exchanging offers and answers, the PeerConnection object will be nil'ed - // out. 
- m.sharedConn.ResetConn(rpc.GrpcOverHTTPClientConn{ClientConn: conn}, m.logger) - m.client = pb.NewModuleServiceClient(m.sharedConn.GrpcConn()) - m.robotClient = robotpb.NewRobotServiceClient(m.sharedConn.GrpcConn()) - return nil -} - -// checkReady sends a `ReadyRequest` and waits for either a `ReadyResponse`, or a context -// cancelation. -func (m *module) checkReady(ctx context.Context, parentAddr string) error { - ctxTimeout, cancelFunc := context.WithTimeout(ctx, rutils.GetModuleStartupTimeout(m.logger)) - defer cancelFunc() - - m.logger.CInfow(ctx, "Waiting for module to respond to ready request", "module", m.cfg.Name) - - req := &pb.ReadyRequest{ParentAddress: parentAddr} - - // Wait for gathering to complete. Pass the entire SDP as an offer to the `ReadyRequest`. - var err error - req.WebrtcOffer, err = m.sharedConn.GenerateEncodedOffer() - if err != nil { - m.logger.CWarnw(ctx, "Unable to generate offer for module PeerConnection. Ignoring.", "err", err) - } - - for { - // 5000 is an arbitrarily high number of attempts (context timeout should hit long before) - resp, err := m.client.Ready(ctxTimeout, req, grpc_retry.WithMax(5000)) - if err != nil { - return err - } - - if !resp.Ready { - // Module's can express that they are in a state: - // - That is Capable of receiving and responding to gRPC commands - // - But is "not ready" to take full responsibility of being a module - // - // Our behavior is to busy-poll until a module declares it is ready. But we otherwise do - // not adjust timeouts based on this information. - continue - } - - err = m.sharedConn.ProcessEncodedAnswer(resp.WebrtcAnswer) - if err != nil { - m.logger.CWarnw(ctx, "Unable to create PeerConnection with module. Ignoring.", "err", err) - } - - // The `ReadyRespones` also includes the Viam `API`s and `Model`s the module provides. This - // will be used to construct "generic Client" objects that can execute gRPC commands for - // methods that are not part of the viam-server's API proto. 
- m.handles, err = modlib.NewHandlerMapFromProto(ctx, resp.Handlermap, m.sharedConn.GrpcConn()) - return err - } -} - // FirstRun is runs a module-specific setup script. func (mgr *Manager) FirstRun(ctx context.Context, conf config.Module) error { pkgsDir := packages.LocalPackagesDir(mgr.packagesDir) @@ -1196,267 +1071,6 @@ func cleanWindowsSocketPath(goos, orig string) (string, error) { return orig, nil } -func (m *module) startProcess( - ctx context.Context, - parentAddr string, - oue func(int) bool, - viamHomeDir string, - packagesDir string, -) error { - var err error - - if rutils.ViamTCPSockets() { - m.addr = "127.0.0.1:" + strconv.Itoa(m.port) - } else { - // append a random alpha string to the module name while creating a socket address to avoid conflicts - // with old versions of the module. - if m.addr, err = modlib.CreateSocketAddress( - filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil { - return err - } - m.addr, err = cleanWindowsSocketPath(runtime.GOOS, m.addr) - if err != nil { - return err - } - } - - // We evaluate the Module's ExePath absolutely in the viam-server process so that - // setting the CWD does not cause issues with relative process names - absoluteExePath, err := m.cfg.EvaluateExePath(packages.LocalPackagesDir(packagesDir)) - if err != nil { - return err - } - moduleEnvironment := m.getFullEnvironment(viamHomeDir) - // Prefer VIAM_MODULE_ROOT as the current working directory if present but fallback to the directory of the exepath - moduleWorkingDirectory, ok := moduleEnvironment["VIAM_MODULE_ROOT"] - if !ok { - moduleWorkingDirectory = filepath.Dir(absoluteExePath) - m.logger.CDebugw(ctx, "VIAM_MODULE_ROOT was not passed to module. 
Defaulting to module's working directory", - "module", m.cfg.Name, "dir", moduleWorkingDirectory) - } else { - m.logger.CInfow(ctx, "Starting module in working directory", "module", m.cfg.Name, "dir", moduleWorkingDirectory) - } - - // Create STDOUT and STDERR loggers for the module and turn off log deduplication for - // both. Module output through these loggers may contain data like stack traces, which - // are repetitive but are not actually "noisy." - stdoutLogger := m.logger.Sublogger("StdOut") - stderrLogger := m.logger.Sublogger("StdErr") - stdoutLogger.NeverDeduplicate() - stderrLogger.NeverDeduplicate() - - pconf := pexec.ProcessConfig{ - ID: m.cfg.Name, - Name: absoluteExePath, - Args: []string{m.addr}, - CWD: moduleWorkingDirectory, - Environment: moduleEnvironment, - Log: true, - OnUnexpectedExit: oue, - StdOutLogger: stdoutLogger, - StdErrLogger: stderrLogger, - } - // Start module process with supplied log level or "debug" if none is - // supplied and module manager has a DebugLevel logger. - if m.cfg.LogLevel != "" { - pconf.Args = append(pconf.Args, fmt.Sprintf(logLevelArgumentTemplate, m.cfg.LogLevel)) - } else if m.logger.Level().Enabled(zapcore.DebugLevel) { - pconf.Args = append(pconf.Args, fmt.Sprintf(logLevelArgumentTemplate, "debug")) - } - - m.process = pexec.NewManagedProcess(pconf, m.logger) - - if err := m.process.Start(context.Background()); err != nil { - return errors.WithMessage(err, "module startup failed") - } - - // Turn on process cpu/memory diagnostics for the module process. If there's an error, we - // continue normally, just without FTDC. 
- m.registerProcessWithFTDC() - - checkTicker := time.NewTicker(100 * time.Millisecond) - defer checkTicker.Stop() - - m.logger.CInfow(ctx, "Starting up module", "module", m.cfg.Name) - rutils.LogViamEnvVariables("Starting module with following Viam environment variables", moduleEnvironment, m.logger) - - ctxTimeout, cancel := context.WithTimeout(ctx, rutils.GetModuleStartupTimeout(m.logger)) - defer cancel() - for { - select { - case <-ctxTimeout.Done(): - if errors.Is(ctxTimeout.Err(), context.DeadlineExceeded) { - return rutils.NewModuleStartUpTimeoutError(m.cfg.Name) - } - return ctxTimeout.Err() - case <-checkTicker.C: - if errors.Is(m.process.Status(), os.ErrProcessDone) { - return fmt.Errorf( - "module %s exited too quickly after attempted startup; it might have a fatal runtime issue", - m.cfg.Name, - ) - } - } - if !rutils.TCPRegex.MatchString(m.addr) { - // note: we don't do this check in TCP mode because TCP addresses are not file paths and will fail check. - err = modlib.CheckSocketOwner(m.addr) - if errors.Is(err, fs.ErrNotExist) { - continue - } - if err != nil { - return errors.WithMessage(err, "module startup failed") - } - } - break - } - return nil -} - -func (m *module) stopProcess() error { - if m.process == nil { - return nil - } - - m.logger.Infof("Stopping module: %s process", m.cfg.Name) - - // Attempt to remove module's .sock file if module did not remove it - // already. - defer func() { - rutils.RemoveFileNoError(m.addr) - - // The system metrics "statser" is resilient to the process dying under the hood. An empty set - // of metrics will be reported. Therefore it is safe to continue monitoring the module process - // while it's in shutdown. - if m.ftdc != nil { - m.ftdc.Remove(m.getFTDCName()) - } - }() - - // TODO(RSDK-2551): stop ignoring exit status 143 once Python modules handle - // SIGTERM correctly. - // Also ignore if error is that the process no longer exists. 
- if err := m.process.Stop(); err != nil { - if strings.Contains(err.Error(), errMessageExitStatus143) || strings.Contains(err.Error(), "no such process") { - return nil - } - // Even though we got an error the process is dead; task failed successfully. - if statusErr := m.process.Status(); errors.Is(statusErr, os.ErrProcessDone) { - return nil - } - return err - } - - return nil -} - -func (m *module) killProcessGroup() { - if m.process == nil { - return - } - m.logger.Infof("Killing module: %s process", m.cfg.Name) - m.process.KillGroup() -} - -func (m *module) registerResources(mgr modmaninterface.ModuleManager) { - for api, models := range m.handles { - if _, ok := resource.LookupGenericAPIRegistration(api.API); !ok { - resource.RegisterAPI( - api.API, - resource.APIRegistration[resource.Resource]{ReflectRPCServiceDesc: api.Desc}, - ) - } - - switch { - case api.API.IsComponent(): - for _, model := range models { - m.logger.Infow("Registering component API and model from module", "module", m.cfg.Name, "API", api.API, "model", model) - resource.RegisterComponent(api.API, model, resource.Registration[resource.Resource, resource.NoNativeConfig]{ - Constructor: func( - ctx context.Context, - deps resource.Dependencies, - conf resource.Config, - logger logging.Logger, - ) (resource.Resource, error) { - return mgr.AddResource(ctx, conf, DepsToNames(deps)) - }, - }) - } - case api.API.IsService(): - for _, model := range models { - m.logger.Infow("Registering service API and model from module", "module", m.cfg.Name, "API", api.API, "model", model) - resource.RegisterService(api.API, model, resource.Registration[resource.Resource, resource.NoNativeConfig]{ - Constructor: func( - ctx context.Context, - deps resource.Dependencies, - conf resource.Config, - logger logging.Logger, - ) (resource.Resource, error) { - return mgr.AddResource(ctx, conf, DepsToNames(deps)) - }, - }) - } - default: - m.logger.Errorw("Invalid module type", "API type", api.API.Type) - } - } -} - 
-func (m *module) deregisterResources() { - for api, models := range m.handles { - for _, model := range models { - resource.Deregister(api.API, model) - } - } - m.handles = nil -} - -func (m *module) cleanupAfterStartupFailure() { - if err := m.stopProcess(); err != nil { - msg := "Error while stopping process of module that failed to start" - m.logger.Errorw(msg, "module", m.cfg.Name, "error", err) - } - utils.UncheckedError(m.sharedConn.Close()) -} - -func (m *module) cleanupAfterCrash(mgr *Manager) { - utils.UncheckedError(m.sharedConn.Close()) - mgr.rMap.Range(func(r resource.Name, mod *module) bool { - if mod == m { - mgr.rMap.Delete(r) - } - return true - }) - mgr.modules.Delete(m.cfg.Name) -} - -func (m *module) getFullEnvironment(viamHomeDir string) map[string]string { - return getFullEnvironment(m.cfg, m.dataDir, viamHomeDir) -} - -func (m *module) getFTDCName() string { - return fmt.Sprintf("proc.modules.%s", m.process.ID()) -} - -func (m *module) registerProcessWithFTDC() { - if m.ftdc == nil { - return - } - - pid, err := m.process.UnixPid() - if err != nil { - m.logger.Warnw("Module process has no pid. 
Cannot start ftdc.", "err", err) - return - } - - statser, err := sys.NewPidSysUsageStatser(pid) - if err != nil { - m.logger.Warnw("Cannot find /proc files", "err", err) - return - } - - m.ftdc.Add(m.getFTDCName(), statser) -} - func getFullEnvironment( cfg config.Module, dataDir string, diff --git a/module/modmanager/module.go b/module/modmanager/module.go new file mode 100644 index 00000000000..c9805918023 --- /dev/null +++ b/module/modmanager/module.go @@ -0,0 +1,410 @@ +package modmanager + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "time" + + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + "github.com/pkg/errors" + "go.uber.org/zap/zapcore" + pb "go.viam.com/api/module/v1" + robotpb "go.viam.com/api/robot/v1" + "go.viam.com/rdk/config" + "go.viam.com/rdk/ftdc" + "go.viam.com/rdk/ftdc/sys" + rdkgrpc "go.viam.com/rdk/grpc" + "go.viam.com/rdk/logging" + modlib "go.viam.com/rdk/module" + "go.viam.com/rdk/module/modmaninterface" + "go.viam.com/rdk/operation" + "go.viam.com/rdk/resource" + "go.viam.com/rdk/robot/packages" + rutils "go.viam.com/rdk/utils" + "go.viam.com/utils" + "go.viam.com/utils/pexec" + "go.viam.com/utils/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type module struct { + cfg config.Module + dataDir string + process pexec.ManagedProcess + handles modlib.HandlerMap + sharedConn rdkgrpc.SharedConn + client pb.ModuleServiceClient + // robotClient supplements the ModuleServiceClient client to serve select robot level methods from the module server + robotClient robotpb.RobotServiceClient + addr string + resources map[resource.Name]*addedResource + // resourcesMu must be held if the `resources` field is accessed without + // write-locking the module manager. 
+ resourcesMu sync.Mutex + + // pendingRemoval allows delaying module close until after resources within it are closed + pendingRemoval bool + logger logging.Logger + ftdc *ftdc.FTDC + // port stores the listen port of this module when ViamTCPSockets() = true. + port int +} + +// checkReady sends a `ReadyRequest` and waits for either a `ReadyResponse`, or a context +// cancelation. +func (m *module) checkReady(ctx context.Context, parentAddr string) error { + ctxTimeout, cancelFunc := context.WithTimeout(ctx, rutils.GetModuleStartupTimeout(m.logger)) + defer cancelFunc() + + m.logger.CInfow(ctx, "Waiting for module to respond to ready request", "module", m.cfg.Name) + + req := &pb.ReadyRequest{ParentAddress: parentAddr} + + // Wait for gathering to complete. Pass the entire SDP as an offer to the `ReadyRequest`. + var err error + req.WebrtcOffer, err = m.sharedConn.GenerateEncodedOffer() + if err != nil { + m.logger.CWarnw(ctx, "Unable to generate offer for module PeerConnection. Ignoring.", "err", err) + } + + for { + // 5000 is an arbitrarily high number of attempts (context timeout should hit long before) + resp, err := m.client.Ready(ctxTimeout, req, grpc_retry.WithMax(5000)) + if err != nil { + return err + } + + if !resp.Ready { + // Modules can express that they are in a state: + // - That is Capable of receiving and responding to gRPC commands + // - But is "not ready" to take full responsibility of being a module + // + // Our behavior is to busy-poll until a module declares it is ready. But we otherwise do + // not adjust timeouts based on this information. + continue + } + + err = m.sharedConn.ProcessEncodedAnswer(resp.WebrtcAnswer) + if err != nil { + m.logger.CWarnw(ctx, "Unable to create PeerConnection with module. Ignoring.", "err", err) + } + + // The `ReadyResponse` also includes the Viam `API`s and `Model`s the module provides. 
This + // will be used to construct "generic Client" objects that can execute gRPC commands for + // methods that are not part of the viam-server's API proto. + m.handles, err = modlib.NewHandlerMapFromProto(ctx, resp.Handlermap, m.sharedConn.GrpcConn()) + return err + } +} + +func (m *module) cleanupAfterCrash(mgr *Manager) { + utils.UncheckedError(m.sharedConn.Close()) + mgr.rMap.Range(func(r resource.Name, mod *module) bool { + if mod == m { + mgr.rMap.Delete(r) + } + return true + }) + mgr.modules.Delete(m.cfg.Name) +} + +func (m *module) getFullEnvironment(viamHomeDir string) map[string]string { + return getFullEnvironment(m.cfg, m.dataDir, viamHomeDir) +} + +func (m *module) getFTDCName() string { + return fmt.Sprintf("proc.modules.%s", m.process.ID()) +} + +func (m *module) registerProcessWithFTDC() { + if m.ftdc == nil { + return + } + + pid, err := m.process.UnixPid() + if err != nil { + m.logger.Warnw("Module process has no pid. Cannot start ftdc.", "err", err) + return + } + + statser, err := sys.NewPidSysUsageStatser(pid) + if err != nil { + m.logger.Warnw("Cannot find /proc files", "err", err) + return + } + + m.ftdc.Add(m.getFTDCName(), statser) +} + +func (m *module) killProcessGroup() { + if m.process == nil { + return + } + m.logger.Infof("Killing module: %s process", m.cfg.Name) + m.process.KillGroup() +} + +func (m *module) registerResources(mgr modmaninterface.ModuleManager) { + for api, models := range m.handles { + if _, ok := resource.LookupGenericAPIRegistration(api.API); !ok { + resource.RegisterAPI( + api.API, + resource.APIRegistration[resource.Resource]{ReflectRPCServiceDesc: api.Desc}, + ) + } + + switch { + case api.API.IsComponent(): + for _, model := range models { + m.logger.Infow("Registering component API and model from module", "module", m.cfg.Name, "API", api.API, "model", model) + resource.RegisterComponent(api.API, model, resource.Registration[resource.Resource, resource.NoNativeConfig]{ + Constructor: func( + ctx 
context.Context, + deps resource.Dependencies, + conf resource.Config, + logger logging.Logger, + ) (resource.Resource, error) { + return mgr.AddResource(ctx, conf, DepsToNames(deps)) + }, + }) + } + case api.API.IsService(): + for _, model := range models { + m.logger.Infow("Registering service API and model from module", "module", m.cfg.Name, "API", api.API, "model", model) + resource.RegisterService(api.API, model, resource.Registration[resource.Resource, resource.NoNativeConfig]{ + Constructor: func( + ctx context.Context, + deps resource.Dependencies, + conf resource.Config, + logger logging.Logger, + ) (resource.Resource, error) { + return mgr.AddResource(ctx, conf, DepsToNames(deps)) + }, + }) + } + default: + m.logger.Errorw("Invalid module type", "API type", api.API.Type) + } + } +} + +func (m *module) deregisterResources() { + for api, models := range m.handles { + for _, model := range models { + resource.Deregister(api.API, model) + } + } + m.handles = nil +} + +func (m *module) cleanupAfterStartupFailure() { + if err := m.stopProcess(); err != nil { + msg := "Error while stopping process of module that failed to start" + m.logger.Errorw(msg, "module", m.cfg.Name, "error", err) + } + utils.UncheckedError(m.sharedConn.Close()) +} + +// dial will Dial the module and replace the underlying connection (if it exists) in m.sharedConn. 
+func (m *module) dial() error { + // TODO(PRODUCT-343): session support probably means interceptors here + var err error + addrToDial := m.addr + if !rutils.TCPRegex.MatchString(addrToDial) { + addrToDial = "unix://" + addrToDial + } + conn, err := grpc.Dial( //nolint:staticcheck + addrToDial, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(rpc.MaxMessageSize)), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithChainUnaryInterceptor( + rdkgrpc.EnsureTimeoutUnaryClientInterceptor, + grpc_retry.UnaryClientInterceptor(), + operation.UnaryClientInterceptor, + ), + grpc.WithChainStreamInterceptor( + grpc_retry.StreamClientInterceptor(), + operation.StreamClientInterceptor, + ), + ) + if err != nil { + return errors.WithMessage(err, "module startup failed") + } + + // Take the grpc over unix socket connection and add it to this `module`s `SharedConn` + // object. This `m.sharedConn` object is referenced by all resources/components. `Client` + // objects communicating with the module. If we're re-dialing after a restart, there may be + // existing resource `Client`s objects. Rather than recreating clients with new information, we + // choose to "swap out" the underlying connection object for those existing `Client`s. + // + // Resetting the `SharedConn` will also create a new WebRTC PeerConnection object. `dial`ing to + // a module is followed by doing a `ReadyRequest` `ReadyResponse` exchange. If that exchange + // contains a working WebRTC offer and answer, the PeerConnection will succeed in connecting. If + // there is an error exchanging offers and answers, the PeerConnection object will be nil'ed + // out. 
+ m.sharedConn.ResetConn(rpc.GrpcOverHTTPClientConn{ClientConn: conn}, m.logger) + m.client = pb.NewModuleServiceClient(m.sharedConn.GrpcConn()) + m.robotClient = robotpb.NewRobotServiceClient(m.sharedConn.GrpcConn()) + return nil +} + +func (m *module) startProcess( + ctx context.Context, + parentAddr string, + oue func(int) bool, + viamHomeDir string, + packagesDir string, +) error { + var err error + + if rutils.ViamTCPSockets() { + m.addr = "127.0.0.1:" + strconv.Itoa(m.port) + } else { + // append a random alpha string to the module name while creating a socket address to avoid conflicts + // with old versions of the module. + if m.addr, err = modlib.CreateSocketAddress( + filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil { + return err + } + m.addr, err = cleanWindowsSocketPath(runtime.GOOS, m.addr) + if err != nil { + return err + } + } + + // We evaluate the Module's ExePath absolutely in the viam-server process so that + // setting the CWD does not cause issues with relative process names + absoluteExePath, err := m.cfg.EvaluateExePath(packages.LocalPackagesDir(packagesDir)) + if err != nil { + return err + } + moduleEnvironment := m.getFullEnvironment(viamHomeDir) + // Prefer VIAM_MODULE_ROOT as the current working directory if present but fallback to the directory of the exepath + moduleWorkingDirectory, ok := moduleEnvironment["VIAM_MODULE_ROOT"] + if !ok { + moduleWorkingDirectory = filepath.Dir(absoluteExePath) + m.logger.CDebugw(ctx, "VIAM_MODULE_ROOT was not passed to module. Defaulting to module's working directory", + "module", m.cfg.Name, "dir", moduleWorkingDirectory) + } else { + m.logger.CInfow(ctx, "Starting module in working directory", "module", m.cfg.Name, "dir", moduleWorkingDirectory) + } + + // Create STDOUT and STDERR loggers for the module and turn off log deduplication for + // both. 
Module output through these loggers may contain data like stack traces, which + // are repetitive but are not actually "noisy." + stdoutLogger := m.logger.Sublogger("StdOut") + stderrLogger := m.logger.Sublogger("StdErr") + stdoutLogger.NeverDeduplicate() + stderrLogger.NeverDeduplicate() + + pconf := pexec.ProcessConfig{ + ID: m.cfg.Name, + Name: absoluteExePath, + Args: []string{m.addr}, + CWD: moduleWorkingDirectory, + Environment: moduleEnvironment, + Log: true, + OnUnexpectedExit: oue, + StdOutLogger: stdoutLogger, + StdErrLogger: stderrLogger, + } + // Start module process with supplied log level or "debug" if none is + // supplied and module manager has a DebugLevel logger. + if m.cfg.LogLevel != "" { + pconf.Args = append(pconf.Args, fmt.Sprintf(logLevelArgumentTemplate, m.cfg.LogLevel)) + } else if m.logger.Level().Enabled(zapcore.DebugLevel) { + pconf.Args = append(pconf.Args, fmt.Sprintf(logLevelArgumentTemplate, "debug")) + } + + m.process = pexec.NewManagedProcess(pconf, m.logger) + + if err := m.process.Start(context.Background()); err != nil { + return errors.WithMessage(err, "module startup failed") + } + + // Turn on process cpu/memory diagnostics for the module process. If there's an error, we + // continue normally, just without FTDC. 
+ m.registerProcessWithFTDC() + + checkTicker := time.NewTicker(100 * time.Millisecond) + defer checkTicker.Stop() + + m.logger.CInfow(ctx, "Starting up module", "module", m.cfg.Name) + rutils.LogViamEnvVariables("Starting module with following Viam environment variables", moduleEnvironment, m.logger) + + ctxTimeout, cancel := context.WithTimeout(ctx, rutils.GetModuleStartupTimeout(m.logger)) + defer cancel() + for { + select { + case <-ctxTimeout.Done(): + if errors.Is(ctxTimeout.Err(), context.DeadlineExceeded) { + return rutils.NewModuleStartUpTimeoutError(m.cfg.Name) + } + return ctxTimeout.Err() + case <-checkTicker.C: + if errors.Is(m.process.Status(), os.ErrProcessDone) { + return fmt.Errorf( + "module %s exited too quickly after attempted startup; it might have a fatal runtime issue", + m.cfg.Name, + ) + } + } + if !rutils.TCPRegex.MatchString(m.addr) { + // note: we don't do this check in TCP mode because TCP addresses are not file paths and will fail check. + err = modlib.CheckSocketOwner(m.addr) + if errors.Is(err, fs.ErrNotExist) { + continue + } + if err != nil { + return errors.WithMessage(err, "module startup failed") + } + } + break + } + return nil +} + +func (m *module) stopProcess() error { + if m.process == nil { + return nil + } + + m.logger.Infof("Stopping module: %s process", m.cfg.Name) + + // Attempt to remove module's .sock file if module did not remove it + // already. + defer func() { + rutils.RemoveFileNoError(m.addr) + + // The system metrics "statser" is resilient to the process dying under the hood. An empty set + // of metrics will be reported. Therefore it is safe to continue monitoring the module process + // while it's in shutdown. + if m.ftdc != nil { + m.ftdc.Remove(m.getFTDCName()) + } + }() + + // TODO(RSDK-2551): stop ignoring exit status 143 once Python modules handle + // SIGTERM correctly. + // Also ignore if error is that the process no longer exists. 
+ if err := m.process.Stop(); err != nil { + if strings.Contains(err.Error(), errMessageExitStatus143) || strings.Contains(err.Error(), "no such process") { + return nil + } + // Even though we got an error the process is dead; task failed successfully. + if statusErr := m.process.Status(); errors.Is(statusErr, os.ErrProcessDone) { + return nil + } + return err + } + + return nil +} From 160193ca56ba51e37e22e853c1e5bb85c03b6a09 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Fri, 9 May 2025 10:03:19 -0400 Subject: [PATCH 04/13] Properly handle mutex handoff in unexpected restart callback --- module/modmanager/manager.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 369276bd2df..43b835800f9 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -18,6 +18,7 @@ import ( "go.uber.org/multierr" pb "go.viam.com/api/module/v1" "go.viam.com/utils" + "go.viam.com/utils/pexec" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -878,21 +879,25 @@ var oueRestartInterval = 5 * time.Second // newOnUnexpectedExitHandler returns the appropriate OnUnexpectedExit function // for the passed-in module to include in the pexec.ProcessConfig. -func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) bool { - return func(exitCode int) bool { - // There is a circular dependency that causes a deadlock if a module dies - // while being reconfigured. Break it here by giving up on the restart if we - // cannot lock the mananger. - if locked := mgr.mu.TryLock(); !locked { - return false - } - defer mgr.mu.Unlock() - +func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExitHandler { + return func(exitCode int) (continueAttemptingRestart bool) { // Log error immediately, as this is unexpected behavior. 
mod.logger.Errorw( "Module has unexpectedly exited.", "module", mod.cfg.Name, "exit_code", exitCode, ) + // Take the lock to avoid racing with calls like Reconfigure that may make + // their own changes to the module's process. + mgr.mu.Lock() + defer mgr.mu.Unlock() + + // Something else already started a new process while we were waiting on the + // lock, so no restart is needed. + if err := mod.process.Status(); err == nil { + mod.logger.Warnw("Module process already running, abandoning restart attempt") + return + } + if err := mod.sharedConn.Close(); err != nil { mod.logger.Warnw("Error closing connection to crashed module. Continuing restart attempt", "error", err) @@ -915,7 +920,7 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b "resources", resource.NamesToStrings(orphanedResourceNames), ) } - return false + return } mod.logger.Infow("Module successfully restarted, re-adding resources", "module", mod.cfg.Name) @@ -941,7 +946,7 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b } mod.logger.Infow("Module resources successfully re-added after module restart", "module", mod.cfg.Name) - return false + return } } From 0317869f2643393448f6f5b07c64cee986216ef0 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Fri, 9 May 2025 10:36:35 -0400 Subject: [PATCH 05/13] Fixing test that still referenced removed struct field --- module/modmanager/manager_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/module/modmanager/manager_test.go b/module/modmanager/manager_test.go index f61d334aba3..a8b22c1fbfc 100644 --- a/module/modmanager/manager_test.go +++ b/module/modmanager/manager_test.go @@ -80,15 +80,6 @@ func setupModManager( return true }) test.That(t, mgr.Close(ctx), test.ShouldBeNil) - for _, mod := range modules { - if mod != nil { - func() { - // Wait for module recovery processes to complete. 
- mod.inRecoveryLock.Lock() - defer mod.inRecoveryLock.Unlock() - }() - } - } }) return mgr } From b340efabcae1e6b4dc1dcbb083282e38ce23546f Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Fri, 9 May 2025 15:13:09 -0400 Subject: [PATCH 06/13] Removing use of new type alias from pexec so CI can run --- module/modmanager/manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 43b835800f9..5760a0adb0d 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -18,7 +18,6 @@ import ( "go.uber.org/multierr" pb "go.viam.com/api/module/v1" "go.viam.com/utils" - "go.viam.com/utils/pexec" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -879,7 +878,7 @@ var oueRestartInterval = 5 * time.Second // newOnUnexpectedExitHandler returns the appropriate OnUnexpectedExit function // for the passed-in module to include in the pexec.ProcessConfig. -func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExitHandler { +func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) bool { return func(exitCode int) (continueAttemptingRestart bool) { // Log error immediately, as this is unexpected behavior. 
mod.logger.Errorw( From 0c47dc1ecf05ef1cede32c84bff599b5d6c59159 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Fri, 9 May 2025 16:11:01 -0400 Subject: [PATCH 07/13] Wait on module processes to terminate in test cleanup --- module/modmanager/manager_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/module/modmanager/manager_test.go b/module/modmanager/manager_test.go index a8b22c1fbfc..2007e27058a 100644 --- a/module/modmanager/manager_test.go +++ b/module/modmanager/manager_test.go @@ -75,11 +75,15 @@ func setupModManager( mMgr, ok := mgr.(*Manager) test.That(t, ok, test.ShouldBeTrue) modules := []*module{} - mMgr.modules.Range(func(_ string, mod *module) bool { + for _, mod := range mMgr.modules.Range { modules = append(modules, mod) - return true - }) + } test.That(t, mgr.Close(ctx), test.ShouldBeNil) + for _, m := range modules { + // managedProcess.Stop waits on the process lock and for all logging to + // end before returning. + m.process.Stop() + } }) return mgr } From 7d598f8efa7a389c6c07cdf091ec9afa50d7aa96 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Fri, 9 May 2025 16:29:43 -0400 Subject: [PATCH 08/13] Removing unused method and formatting code to keep the linter happy --- module/modmanager/manager.go | 6 ------ module/modmanager/module.go | 11 ++++++----- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 5760a0adb0d..41298d9a525 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -517,12 +517,6 @@ func (mgr *Manager) AddResource(ctx context.Context, conf resource.Config, deps return mgr.addResource(ctx, conf, deps) } -func (mgr *Manager) addResourceWithWriteLock(ctx context.Context, conf resource.Config, deps []string) (resource.Resource, error) { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.addResource(ctx, conf, deps) -} - func (mgr *Manager) addResource(ctx context.Context, conf 
resource.Config, deps []string) (resource.Resource, error) { mod, ok := mgr.getModule(conf) if !ok { diff --git a/module/modmanager/module.go b/module/modmanager/module.go index c9805918023..3e6b55cc4c0 100644 --- a/module/modmanager/module.go +++ b/module/modmanager/module.go @@ -17,6 +17,12 @@ import ( "go.uber.org/zap/zapcore" pb "go.viam.com/api/module/v1" robotpb "go.viam.com/api/robot/v1" + "go.viam.com/utils" + "go.viam.com/utils/pexec" + "go.viam.com/utils/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "go.viam.com/rdk/config" "go.viam.com/rdk/ftdc" "go.viam.com/rdk/ftdc/sys" @@ -28,11 +34,6 @@ import ( "go.viam.com/rdk/resource" "go.viam.com/rdk/robot/packages" rutils "go.viam.com/rdk/utils" - "go.viam.com/utils" - "go.viam.com/utils/pexec" - "go.viam.com/utils/rpc" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type module struct { From 82f10c0b9b212e67772aa1b036e5dc8e6e536755 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Tue, 13 May 2025 14:07:01 -0400 Subject: [PATCH 09/13] Bump goutils --- go.mod | 2 +- go.sum | 2 ++ module/modmanager/manager.go | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 6da4709818f..035cd873bdb 100644 --- a/go.mod +++ b/go.mod @@ -81,7 +81,7 @@ require ( go.uber.org/zap v1.27.0 go.viam.com/api v0.1.432 go.viam.com/test v1.2.4 - go.viam.com/utils v0.1.142 + go.viam.com/utils v0.1.143 goji.io v2.0.2+incompatible golang.org/x/image v0.19.0 golang.org/x/mobile v0.0.0-20240112133503-c713f31d574b diff --git a/go.sum b/go.sum index a541f13c2e9..e463e0135f4 100644 --- a/go.sum +++ b/go.sum @@ -1538,6 +1538,8 @@ go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.142 h1:vk5ijKf5Q2eZ1AOOOttAhmLG1IHxGm+nrgz5j/CkvtA= go.viam.com/utils v0.1.142/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw= 
+go.viam.com/utils v0.1.143 h1:QyvniQw6+WDovfyhMzbDO6D2vPkiQpKyDkjDPu1OaE0= +go.viam.com/utils v0.1.143/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E= gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs= diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 41298d9a525..75845fd7e55 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -18,6 +18,7 @@ import ( "go.uber.org/multierr" pb "go.viam.com/api/module/v1" "go.viam.com/utils" + "go.viam.com/utils/pexec" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -872,7 +873,7 @@ var oueRestartInterval = 5 * time.Second // newOnUnexpectedExitHandler returns the appropriate OnUnexpectedExit function // for the passed-in module to include in the pexec.ProcessConfig. -func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) bool { +func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExitHandler { return func(exitCode int) (continueAttemptingRestart bool) { // Log error immediately, as this is unexpected behavior. 
mod.logger.Errorw( From e59d45e26add01381b192e96d603c7e08f072cfa Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Tue, 13 May 2025 18:31:01 -0400 Subject: [PATCH 10/13] Clean up duplicate goutils entry in go.mod --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index e463e0135f4..9de2bb1272c 100644 --- a/go.sum +++ b/go.sum @@ -1536,8 +1536,6 @@ go.viam.com/api v0.1.432 h1:XT2HfUC/nO/1otRbYat25E4dhk+9KiTU/yj2jmy/YMM= go.viam.com/api v0.1.432/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= -go.viam.com/utils v0.1.142 h1:vk5ijKf5Q2eZ1AOOOttAhmLG1IHxGm+nrgz5j/CkvtA= -go.viam.com/utils v0.1.142/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw= go.viam.com/utils v0.1.143 h1:QyvniQw6+WDovfyhMzbDO6D2vPkiQpKyDkjDPu1OaE0= go.viam.com/utils v0.1.143/go.mod h1:AjmP/wKHvMkBBW55HJecD3I3Ci10vZGFv7RzFfJ6+mw= go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg= From 3f1a858ef9c26a7e37105184c2f0c935db69ff4a Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Wed, 14 May 2025 13:55:08 -0400 Subject: [PATCH 11/13] Fixing race in server shutdown test --- testutils/robottestutils/robot_utils.go | 20 +++++++++++++++++--- web/server/entrypoint_test.go | 10 +++++++++- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/testutils/robottestutils/robot_utils.go b/testutils/robottestutils/robot_utils.go index e5b87d46da8..07ae3695291 100644 --- a/testutils/robottestutils/robot_utils.go +++ b/testutils/robottestutils/robot_utils.go @@ -95,21 +95,35 @@ func MakeTempConfig(t *testing.T, cfg *config.Config, logger logging.Logger) (st return file.Name(), file.Close() } +// ServerProcessOptions are used to modify the behavior of ServerAsSeparateProcess. 
+type ServerProcessOption func(*pexec.ProcessConfig) + +// WithoutRestart disables the automatic restart of the pexec library for the server process. +func WithoutRestart() ServerProcessOption { + return func(cfg *pexec.ProcessConfig) { + cfg.OnUnexpectedExit = func(int) bool { return false } + } +} + // ServerAsSeparateProcess builds the viam server and returns an unstarted ManagedProcess for // the built binary. -func ServerAsSeparateProcess(t *testing.T, cfgFileName string, logger logging.Logger) pexec.ManagedProcess { +func ServerAsSeparateProcess(t *testing.T, cfgFileName string, logger logging.Logger, opts ...ServerProcessOption) pexec.ManagedProcess { serverPath := rtestutils.BuildTempModule(t, "web/cmd/server/") // use a temporary home directory so that it doesn't collide with // the user's/other tests' viam home directory testTempHome := t.TempDir() - server := pexec.NewManagedProcess(pexec.ProcessConfig{ + cfg := pexec.ProcessConfig{ Name: serverPath, Args: []string{"-config", cfgFileName}, CWD: utils.ResolveFile("./"), Environment: map[string]string{"HOME": testTempHome}, Log: true, - }, logger) + } + for _, opt := range opts { + opt(&cfg) + } + server := pexec.NewManagedProcess(cfg, logger) return server } diff --git a/web/server/entrypoint_test.go b/web/server/entrypoint_test.go index 69ce8406b9b..f12da6ee0db 100644 --- a/web/server/entrypoint_test.go +++ b/web/server/entrypoint_test.go @@ -150,7 +150,15 @@ func TestShutdown(t *testing.T) { cfgFilename, err = robottestutils.MakeTempConfig(t, cfg, testLogger) test.That(t, err, test.ShouldBeNil) - server = robottestutils.ServerAsSeparateProcess(t, cfgFilename, serverLogger) + // Start the server w/ ManagedProcess auto-restart disabled, otherwise + // we'll be racing the process restart to check that the stop command + // actually worked. 
+ server = robottestutils.ServerAsSeparateProcess( + t, + cfgFilename, + serverLogger, + robottestutils.WithoutRestart(), + ) err = server.Start(context.Background()) test.That(t, err, test.ShouldBeNil) From 40f7452e32a3fa7a78b5809cd21acef9f0600a1b Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Wed, 14 May 2025 14:02:41 -0400 Subject: [PATCH 12/13] Make docstring linter happy --- testutils/robottestutils/robot_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testutils/robottestutils/robot_utils.go b/testutils/robottestutils/robot_utils.go index 07ae3695291..143e1b9cd3d 100644 --- a/testutils/robottestutils/robot_utils.go +++ b/testutils/robottestutils/robot_utils.go @@ -95,7 +95,7 @@ func MakeTempConfig(t *testing.T, cfg *config.Config, logger logging.Logger) (st return file.Name(), file.Close() } -// ServerProcessOptions are used to modify the behavior of ServerAsSeparateProcess. +// ServerProcessOption can be used to modify the behavior of ServerAsSeparateProcess. type ServerProcessOption func(*pexec.ProcessConfig) // WithoutRestart disables the automatic restart of the pexec library for the server process. From 1e799ed4163c0b8732573de40d72eba8c13c7209 Mon Sep 17 00:00:00 2001 From: Joshua Matthews Date: Thu, 15 May 2025 10:28:47 -0400 Subject: [PATCH 13/13] Preventing crashed module restart when module has been removed --- module/modmanager/manager.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index ec6cde0fde1..1f911497158 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -441,13 +441,17 @@ func (mgr *Manager) Remove(modName string) ([]resource.Name, error) { handledResources := mod.resources - // If module handles no resources, remove it now. Otherwise mark it - // pendingRemoval for eventual removal after last handled resource has been - // closed. 
+ // Always mark pendingRemoval even if there is no more work to do because the + // restart handler checks it to avoid restarting a removed module. + mod.pendingRemoval = true + + // If module handles no resources, remove it now. if len(handledResources) == 0 { return nil, mgr.closeModule(mod, false) } + // Otherwise return the list of resources that need to be closed before the + // module can be cleanly removed. var orphanedResourceNames []resource.Name var orphanedResourceNameStrings []string for name := range handledResources { @@ -456,7 +460,6 @@ func (mgr *Manager) Remove(modName string) ([]resource.Name, error) { } mgr.logger.Infow("Resources handled by removed module will be removed", "module", mod.cfg.Name, "resources", orphanedResourceNameStrings) - mod.pendingRemoval = true return orphanedResourceNames, nil } @@ -878,10 +881,15 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) pexec.UnexpectedExit mgr.mu.Lock() defer mgr.mu.Unlock() + if mod.pendingRemoval { + mod.logger.Infow("Module marked for removal, abandoning restart attempt") + return + } + // Something else already started a new process while we were waiting on the // lock, so no restart is needed. if err := mod.process.Status(); err == nil { - mod.logger.Warnw("Module process already running, abandoning restart attempt") + mod.logger.Infow("Module process already running, abandoning restart attempt") return }