diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 6b8cb582..3abfd274 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,7 +14,7 @@ jobs: build: strategy: matrix: - os: [ubuntu-latest, windows-latest] + os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0 @@ -31,5 +31,6 @@ jobs: run: go test -v ./... - name: Run golangci-lint + if: matrix.os == 'ubuntu-latest' uses: golangci/golangci-lint-action@v9.0.0 diff --git a/fixtures-code/process-compose-shutdown-inorder.yaml b/fixtures-code/process-compose-shutdown-inorder.yaml index 60aaf390..663cd0a4 100644 --- a/fixtures-code/process-compose-shutdown-inorder.yaml +++ b/fixtures-code/process-compose-shutdown-inorder.yaml @@ -4,39 +4,50 @@ log_level: debug log_length: 1000 ordered_shutdown: true +# The shutdown.command on each process appends an "X: exit" line to the file +# named by $PC_TEST_SHUTDOWN_LOG (set by the test) and then exits non-zero so +# process-compose SIGKILLs the original `sleep`. Using shutdown.command instead +# of a bash trap makes the ordering deterministic: shutdown.command runs +# synchronously from process-compose's shutDown() in the order shutDownInOrder +# imposes, with no dependency on stdout-pipe drain timing or bash signal +# handling — both of which race on slower runners (notably macOS CI). + processes: procA: command: | - trap 'echo "A: exit"' SIGTERM echo "A: starting" - sleep 3 + sleep 30 + shutdown: + command: 'printf "A: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1' procB: command: | - trap 'echo "B: exit"' SIGTERM echo "B: starting" - sleep 3 + sleep 30 + shutdown: + command: 'printf "B: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1' depends_on: procC: condition: process_started procD: condition: process_started - procC: command: | - trap 'echo "C: exit"' SIGTERM echo "C: starting" - sleep 3 + sleep 30 + shutdown: + command: 'printf "C: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1' depends_on: procA: condition: process_started procD: command: | - trap 'echo "D: exit"' SIGTERM echo "D: starting" - sleep 3 + sleep 30 + shutdown: + command: 'printf "D: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1' depends_on: procA: condition: process_started diff --git a/src/app/project_runner.go b/src/app/project_runner.go index 328b2d89..485adadf 100644 --- a/src/app/project_runner.go +++ b/src/app/project_runner.go @@ -12,6 +12,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "github.com/f1bonacc1/process-compose/src/command" @@ -65,7 +66,7 @@ type ProjectRunner struct { withRecursiveMetrics bool procCompleteChannel chan int processTree *ProcessTree - processScheduler *scheduler.Scheduler + processScheduler atomic.Pointer[scheduler.Scheduler] stateBroadcaster *ProcessStateBroadcaster } @@ -160,24 +161,25 @@ func (p *ProjectRunner) Run() error { log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder) // Initialize and start scheduler for scheduled processes - p.processScheduler, err = scheduler.New(p) + sched, err := scheduler.New(p) if err != nil { log.Error().Err(err).Msg("Failed to create scheduler") } else { + p.processScheduler.Store(sched) for name, proc := range p.project.Processes { if proc.Schedule != nil && proc.Schedule.IsScheduled() { - if err := p.processScheduler.AddProcess(name, proc.Schedule); err != nil { + if err := sched.AddProcess(name, proc.Schedule); err != nil { log.Error().Err(err).Msgf("Failed to schedule process %s", name) } else if proc.Disabled { - if err := p.processScheduler.PauseProcess(name); err != nil { + if err := sched.PauseProcess(name); err != nil { log.Error().Err(err).Msgf("Failed to pause schedule for disabled process %s", name) } } } } - p.processScheduler.Start() + sched.Start() defer func() { - if err := p.processScheduler.Stop(); err != nil { + if err := sched.Stop(); err != nil { log.Error().Err(err).Msg("Failed to stop scheduler gracefully") } }() @@ -210,7 +212,7 @@ func (p *ProjectRunner) Run() error { log.Debug().Msgf("Skipping project completion: %d restart(s) in progress", pendingRestarts) continue } - if p.processScheduler == nil || len(p.processScheduler.GetScheduledProcesses()) == 0 { + if s := p.processScheduler.Load(); s == nil || len(s.GetScheduledProcesses()) == 0 { log.Info().Msg("Project completed") p.exitCodeMutex.Lock() exitCode := p.exitCode @@ -385,8 +387,8 @@ func (p *ProjectRunner) GetProcessState(name string) (*types.ProcessState, error } // Add next run time for scheduled processes - if p.processScheduler != nil { - nextRun := p.processScheduler.GetNextRunTime(name) + if s := p.processScheduler.Load(); s != nil { + nextRun := s.GetNextRunTime(name) state.NextRunTime = nextRun if nextRun != nil { if !state.IsRunning { @@ -500,8 +502,8 @@ func (p *ProjectRunner) StartProcess(name string) error { if processConfig, ok := p.project.Processes[name]; ok { p.runProcess(&processConfig) // Resume schedule if it was paused (e.g. initially disabled) - if p.processScheduler != nil && p.processScheduler.IsScheduled(name) { - if err := p.processScheduler.ResumeProcess(name); err != nil { + if s := p.processScheduler.Load(); s != nil && s.IsScheduled(name) { + if err := s.ResumeProcess(name); err != nil { log.Error().Err(err).Msgf("Failed to resume schedule for process %s", name) } } @@ -524,7 +526,8 @@ func (p *ProjectRunner) StopProcess(name string) error { } } else { // If not running, check if it's scheduled. If so, we'll just pause the schedule. - if p.processScheduler == nil || !p.processScheduler.IsScheduled(name) { + sched := p.processScheduler.Load() + if sched == nil || !sched.IsScheduled(name) { if _, ok := p.project.Processes[name]; !ok { log.Error().Msgf("Process %s does not exist", name) return fmt.Errorf("process %s does not exist", name) @@ -535,8 +538,8 @@ func (p *ProjectRunner) StopProcess(name string) error { } // Pause schedule if it was running or scheduled - if p.processScheduler != nil && p.processScheduler.IsScheduled(name) { - if pauseErr := p.processScheduler.PauseProcess(name); pauseErr != nil { + if sched := p.processScheduler.Load(); sched != nil && sched.IsScheduled(name) { + if pauseErr := sched.PauseProcess(name); pauseErr != nil { log.Error().Err(pauseErr).Msgf("Failed to pause schedule for process %s", name) if err == nil { err = pauseErr diff --git a/src/app/system_test.go b/src/app/system_test.go index d25d1996..307af081 100644 --- a/src/app/system_test.go +++ b/src/app/system_test.go @@ -1,7 +1,6 @@ package app import ( - "bufio" "errors" "fmt" "os" @@ -471,10 +470,15 @@ func TestSystem_TestProcListToRun(t *testing.T) { func TestSystem_TestProcListShutsDownInOrder(t *testing.T) { if runtime.GOOS == "windows" { - t.Skip("Skipping on Windows due to bash-specific fixture (trap/sleep)") + t.Skip("Skipping on Windows: fixture uses bash and POSIX shell redirection") } fixture1 := filepath.Join("..", "..", "fixtures-code", "process-compose-shutdown-inorder.yaml") t.Run("Single Proc with deps", func(t *testing.T) { + // The fixture's shutdown.command appends to this file. Using a fresh + // path per run avoids cross-test contamination; t.Setenv restores the + // previous value on cleanup. + shutdownLog := filepath.Join(t.TempDir(), "shutdown_order.log") + t.Setenv("PC_TEST_SHUTDOWN_LOG", shutdownLog) project, err := loader.Load(&loader.LoaderOptions{ FileNames: []string{fixture1}, @@ -502,28 +506,19 @@ func TestSystem_TestProcListShutsDownInOrder(t *testing.T) { t.Errorf("process %s is disabled", name) } } - file, err := os.CreateTemp(t.TempDir(), "pc_log.*.log") - if err != nil { - t.Error(err.Error()) - return - } - defer os.Remove(file.Name()) - project.LogLocation = file.Name() - project.LoggerConfig = &types.LoggerConfig{ - FieldsOrder: []string{"message"}, - DisableJSON: true, - TimestampFormat: "", - NoMetadata: true, - FlushEachLine: true, - NoColor: true, - } go func() { err := runner.Run() if err != nil { t.Error(err.Error()) } }() - time.Sleep(10 * time.Millisecond) + // Wait for all processes to reach Running before triggering shutdown; + // shutdown.command relies only on shutDown() being invoked in order, not + // on signal-delivery races, but we still need the runner to have started + // all processes before ShutDownProject is meaningful. + for _, name := range []string{"procA", "procB", "procC", "procD"} { + waitForProcessState(t, runner, name, types.ProcessStateRunning, 5*time.Second) + } states, err := runner.GetProcessesState() if err != nil { t.Error(err.Error()) @@ -534,7 +529,6 @@ func TestSystem_TestProcListShutsDownInOrder(t *testing.T) { t.Errorf("len(states.States) = %d, want %d", len(states.States), want) } - time.Sleep(10 * time.Millisecond) err = runner.ShutDownProject() if err != nil { t.Error(err.Error()) @@ -554,18 +548,33 @@ func TestSystem_TestProcListShutsDownInOrder(t *testing.T) { if runningProcesses != want { t.Errorf("runningProcesses = %d, want %d", runningProcesses, want) } - //read file and validate the shutdown order - scanner := bufio.NewScanner(file) - order := make([]string, 0) - for scanner.Scan() { - line := scanner.Text() - if strings.Contains(line, "exit") { - order = append(order, line) - } - } - //the order if first D or C exits is not defined + // Read the shutdown-order file written by each process's + // shutdown.command. C and D shut down in parallel, so either order + // between them is acceptable. wantOrder1 := []string{"B: exit", "D: exit", "C: exit", "A: exit"} wantOrder2 := []string{"B: exit", "C: exit", "D: exit", "A: exit"} + + // Poll briefly: shutdown.command writes happen during ShutDownProject + // and should be on disk by now, but tolerate small fs lag on slow CI. + var order []string + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + data, readErr := os.ReadFile(shutdownLog) + if readErr != nil && !errors.Is(readErr, os.ErrNotExist) { + t.Error(readErr.Error()) + return + } + order = order[:0] + for line := range strings.SplitSeq(strings.TrimRight(string(data), "\n"), "\n") { + if line != "" { + order = append(order, line) + } + } + if slices.Equal(order, wantOrder1) || slices.Equal(order, wantOrder2) { + break + } + time.Sleep(50 * time.Millisecond) + } if !slices.Equal(order, wantOrder1) && !slices.Equal(order, wantOrder2) { t.Errorf("content = %v, want %v or %v", order, wantOrder1, wantOrder2) return @@ -1412,6 +1421,12 @@ func TestSystem_ConcurrentRestartRaceCondition(t *testing.T) { return } + // The last restart may still be transitioning (Terminating → Launching → + // Running). Wait briefly for the post-restart state to stabilize instead of + // racing it — on slower runners (e.g. macOS) the underlying `sleep 2` of an + // earlier launch can complete before we sample the state. + waitForProcessState(t, runner, testProcess, types.ProcessStateRunning, 5*time.Second) + // Verify exactly one process is running after all concurrent restarts state, err := runner.GetProcessState(testProcess) if err != nil {