Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
build:
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
Expand All @@ -31,5 +31,6 @@ jobs:
run: go test -v ./...

- name: Run golangci-lint
if: matrix.os == 'ubuntu-latest'
uses: golangci/golangci-lint-action@v9.0.0

29 changes: 20 additions & 9 deletions fixtures-code/process-compose-shutdown-inorder.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,50 @@ log_level: debug
log_length: 1000
ordered_shutdown: true

# The shutdown.command on each process appends an "X: exit" line to the file
# named by $PC_TEST_SHUTDOWN_LOG (set by the test) and then exits non-zero so
# process-compose SIGKILLs the original `sleep`. Using shutdown.command instead
# of a bash trap makes the ordering deterministic: shutdown.command runs
# synchronously from process-compose's shutDown() in the order shutDownInOrder
# imposes, with no dependency on stdout-pipe drain timing or bash signal
# handling — both of which race on slower runners (notably macOS CI).

processes:
procA:
command: |
trap 'echo "A: exit"' SIGTERM
echo "A: starting"
sleep 3
sleep 30
shutdown:
command: 'printf "A: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1'

procB:
command: |
trap 'echo "B: exit"' SIGTERM
echo "B: starting"
sleep 3
sleep 30
shutdown:
command: 'printf "B: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1'
depends_on:
procC:
condition: process_started
procD:
condition: process_started


procC:
command: |
trap 'echo "C: exit"' SIGTERM
echo "C: starting"
sleep 3
sleep 30
shutdown:
command: 'printf "C: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1'
depends_on:
procA:
condition: process_started

procD:
command: |
trap 'echo "D: exit"' SIGTERM
echo "D: starting"
sleep 3
sleep 30
shutdown:
command: 'printf "D: exit\n" >> "$PC_TEST_SHUTDOWN_LOG"; exit 1'
depends_on:
procA:
condition: process_started
31 changes: 17 additions & 14 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/f1bonacc1/process-compose/src/command"
Expand Down Expand Up @@ -65,7 +66,7 @@ type ProjectRunner struct {
withRecursiveMetrics bool
procCompleteChannel chan int
processTree *ProcessTree
processScheduler *scheduler.Scheduler
processScheduler atomic.Pointer[scheduler.Scheduler]
stateBroadcaster *ProcessStateBroadcaster
}

Expand Down Expand Up @@ -160,24 +161,25 @@ func (p *ProjectRunner) Run() error {
log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder)

// Initialize and start scheduler for scheduled processes
p.processScheduler, err = scheduler.New(p)
sched, err := scheduler.New(p)
if err != nil {
log.Error().Err(err).Msg("Failed to create scheduler")
} else {
p.processScheduler.Store(sched)
for name, proc := range p.project.Processes {
if proc.Schedule != nil && proc.Schedule.IsScheduled() {
if err := p.processScheduler.AddProcess(name, proc.Schedule); err != nil {
if err := sched.AddProcess(name, proc.Schedule); err != nil {
log.Error().Err(err).Msgf("Failed to schedule process %s", name)
} else if proc.Disabled {
if err := p.processScheduler.PauseProcess(name); err != nil {
if err := sched.PauseProcess(name); err != nil {
log.Error().Err(err).Msgf("Failed to pause schedule for disabled process %s", name)
}
}
}
}
p.processScheduler.Start()
sched.Start()
defer func() {
if err := p.processScheduler.Stop(); err != nil {
if err := sched.Stop(); err != nil {
log.Error().Err(err).Msg("Failed to stop scheduler gracefully")
}
}()
Expand Down Expand Up @@ -210,7 +212,7 @@ func (p *ProjectRunner) Run() error {
log.Debug().Msgf("Skipping project completion: %d restart(s) in progress", pendingRestarts)
continue
}
if p.processScheduler == nil || len(p.processScheduler.GetScheduledProcesses()) == 0 {
if s := p.processScheduler.Load(); s == nil || len(s.GetScheduledProcesses()) == 0 {
log.Info().Msg("Project completed")
p.exitCodeMutex.Lock()
exitCode := p.exitCode
Expand Down Expand Up @@ -385,8 +387,8 @@ func (p *ProjectRunner) GetProcessState(name string) (*types.ProcessState, error
}

// Add next run time for scheduled processes
if p.processScheduler != nil {
nextRun := p.processScheduler.GetNextRunTime(name)
if s := p.processScheduler.Load(); s != nil {
nextRun := s.GetNextRunTime(name)
state.NextRunTime = nextRun
if nextRun != nil {
if !state.IsRunning {
Expand Down Expand Up @@ -500,8 +502,8 @@ func (p *ProjectRunner) StartProcess(name string) error {
if processConfig, ok := p.project.Processes[name]; ok {
p.runProcess(&processConfig)
// Resume schedule if it was paused (e.g. initially disabled)
if p.processScheduler != nil && p.processScheduler.IsScheduled(name) {
if err := p.processScheduler.ResumeProcess(name); err != nil {
if s := p.processScheduler.Load(); s != nil && s.IsScheduled(name) {
if err := s.ResumeProcess(name); err != nil {
log.Error().Err(err).Msgf("Failed to resume schedule for process %s", name)
}
}
Expand All @@ -524,7 +526,8 @@ func (p *ProjectRunner) StopProcess(name string) error {
}
} else {
// If not running, check if it's scheduled. If so, we'll just pause the schedule.
if p.processScheduler == nil || !p.processScheduler.IsScheduled(name) {
sched := p.processScheduler.Load()
if sched == nil || !sched.IsScheduled(name) {
if _, ok := p.project.Processes[name]; !ok {
log.Error().Msgf("Process %s does not exist", name)
return fmt.Errorf("process %s does not exist", name)
Expand All @@ -535,8 +538,8 @@ func (p *ProjectRunner) StopProcess(name string) error {
}

// Pause schedule if it was running or scheduled
if p.processScheduler != nil && p.processScheduler.IsScheduled(name) {
if pauseErr := p.processScheduler.PauseProcess(name); pauseErr != nil {
if sched := p.processScheduler.Load(); sched != nil && sched.IsScheduled(name) {
if pauseErr := sched.PauseProcess(name); pauseErr != nil {
log.Error().Err(pauseErr).Msgf("Failed to pause schedule for process %s", name)
if err == nil {
err = pauseErr
Expand Down
73 changes: 44 additions & 29 deletions src/app/system_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package app

import (
"bufio"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -471,10 +470,15 @@ func TestSystem_TestProcListToRun(t *testing.T) {

func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping on Windows due to bash-specific fixture (trap/sleep)")
t.Skip("Skipping on Windows: fixture uses bash and POSIX shell redirection")
}
fixture1 := filepath.Join("..", "..", "fixtures-code", "process-compose-shutdown-inorder.yaml")
t.Run("Single Proc with deps", func(t *testing.T) {
// The fixture's shutdown.command appends to this file. Using a fresh
// path per run avoids cross-test contamination; t.Setenv restores the
// previous value on cleanup.
shutdownLog := filepath.Join(t.TempDir(), "shutdown_order.log")
t.Setenv("PC_TEST_SHUTDOWN_LOG", shutdownLog)

project, err := loader.Load(&loader.LoaderOptions{
FileNames: []string{fixture1},
Expand Down Expand Up @@ -502,28 +506,19 @@ func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
t.Errorf("process %s is disabled", name)
}
}
file, err := os.CreateTemp(t.TempDir(), "pc_log.*.log")
if err != nil {
t.Error(err.Error())
return
}
defer os.Remove(file.Name())
project.LogLocation = file.Name()
project.LoggerConfig = &types.LoggerConfig{
FieldsOrder: []string{"message"},
DisableJSON: true,
TimestampFormat: "",
NoMetadata: true,
FlushEachLine: true,
NoColor: true,
}
go func() {
err := runner.Run()
if err != nil {
t.Error(err.Error())
}
}()
time.Sleep(10 * time.Millisecond)
// Wait for all processes to reach Running before triggering shutdown;
// shutdown.command relies only on shutDown() being invoked in order, not
// on signal-delivery races, but we still need the runner to have started
// all processes before ShutDownProject is meaningful.
for _, name := range []string{"procA", "procB", "procC", "procD"} {
waitForProcessState(t, runner, name, types.ProcessStateRunning, 5*time.Second)
}
states, err := runner.GetProcessesState()
if err != nil {
t.Error(err.Error())
Expand All @@ -534,7 +529,6 @@ func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
t.Errorf("len(states.States) = %d, want %d", len(states.States), want)
}

time.Sleep(10 * time.Millisecond)
err = runner.ShutDownProject()
if err != nil {
t.Error(err.Error())
Expand All @@ -554,18 +548,33 @@ func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
if runningProcesses != want {
t.Errorf("runningProcesses = %d, want %d", runningProcesses, want)
}
//read file and validate the shutdown order
scanner := bufio.NewScanner(file)
order := make([]string, 0)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "exit") {
order = append(order, line)
}
}
//the order if first D or C exits is not defined
// Read the shutdown-order file written by each process's
// shutdown.command. C and D shut down in parallel, so either order
// between them is acceptable.
wantOrder1 := []string{"B: exit", "D: exit", "C: exit", "A: exit"}
wantOrder2 := []string{"B: exit", "C: exit", "D: exit", "A: exit"}

// Poll briefly: shutdown.command writes happen during ShutDownProject
// and should be on disk by now, but tolerate small fs lag on slow CI.
var order []string
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
data, readErr := os.ReadFile(shutdownLog)
if readErr != nil && !errors.Is(readErr, os.ErrNotExist) {
t.Error(readErr.Error())
return
}
order = order[:0]
for line := range strings.SplitSeq(strings.TrimRight(string(data), "\n"), "\n") {
if line != "" {
order = append(order, line)
}
}
if slices.Equal(order, wantOrder1) || slices.Equal(order, wantOrder2) {
break
}
time.Sleep(50 * time.Millisecond)
}
if !slices.Equal(order, wantOrder1) && !slices.Equal(order, wantOrder2) {
t.Errorf("content = %v, want %v or %v", order, wantOrder1, wantOrder2)
return
Expand Down Expand Up @@ -1412,6 +1421,12 @@ func TestSystem_ConcurrentRestartRaceCondition(t *testing.T) {
return
}

// The last restart may still be transitioning (Terminating → Launching →
// Running). Wait briefly for the post-restart state to stabilize instead of
// racing it — on slower runners (e.g. macOS) the underlying `sleep 2` of an
// earlier launch can complete before we sample the state.
waitForProcessState(t, runner, testProcess, types.ProcessStateRunning, 5*time.Second)

// Verify exactly one process is running after all concurrent restarts
state, err := runner.GetProcessState(testProcess)
if err != nil {
Expand Down
Loading