Skip to content

Commit ed882c7

Browse files
committed
fix: command execution control fixes
1 parent f4e8bb1 commit ed882c7

File tree

2 files changed

+132
-48
lines changed

2 files changed

+132
-48
lines changed

pkg/executor/cmd-executor.go

+125-46
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ import (
99
"syscall"
1010
)
1111

12+
// type CommandGroup struct {
13+
// Commands []func(context.Context) *exec.Cmd
14+
// Parallel bool
15+
// Sequential bool
16+
// }
17+
1218
type CmdExecutor struct {
1319
logger *slog.Logger
1420
parentCtx context.Context
@@ -19,12 +25,20 @@ type CmdExecutor struct {
1925
mu sync.Mutex
2026

2127
kill func() error
28+
29+
Parallel []ParallelCommands
30+
}
31+
32+
type ParallelCommands struct {
33+
Index int
34+
Len int
2235
}
2336

2437
type CmdExecutorArgs struct {
2538
Logger *slog.Logger
2639
Commands []func(context.Context) *exec.Cmd
2740
Interactive bool
41+
Parallel []ParallelCommands
2842
}
2943

3044
func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor {
@@ -38,6 +52,7 @@ func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor {
3852
commands: args.Commands,
3953
mu: sync.Mutex{},
4054
interactive: args.Interactive,
55+
Parallel: args.Parallel,
4156
}
4257
}
4358

@@ -67,72 +82,136 @@ func killPID(pid int, logger ...*slog.Logger) error {
6782
return nil
6883
}
6984

70-
// Start implements Executor.
71-
func (ex *CmdExecutor) Start() error {
72-
ex.mu.Lock()
73-
defer ex.mu.Unlock()
74-
for i := range ex.commands {
75-
if err := ex.parentCtx.Err(); err != nil {
76-
return err
77-
}
85+
func (ex *CmdExecutor) exec(newCmd func(context.Context) *exec.Cmd) error {
86+
if err := ex.parentCtx.Err(); err != nil {
87+
return err
88+
}
7889

79-
ctx, cf := context.WithCancel(ex.parentCtx)
80-
defer cf()
90+
ctx, cf := context.WithCancel(ex.parentCtx)
91+
defer cf()
8192

82-
cmd := ex.commands[i](ctx)
93+
cmd := newCmd(ctx)
8394

84-
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
85-
if ex.interactive {
86-
cmd.Stdin = os.Stdin
87-
cmd.SysProcAttr.Foreground = true
88-
}
95+
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
96+
if ex.interactive {
97+
cmd.Stdin = os.Stdin
98+
cmd.SysProcAttr.Foreground = true
99+
}
89100

90-
if err := cmd.Start(); err != nil {
91-
return err
92-
}
101+
if err := cmd.Start(); err != nil {
102+
return err
103+
}
93104

94-
logger := ex.logger.With("pid", cmd.Process.Pid, "command", i+1)
105+
logger := ex.logger.With("pid", cmd.Process.Pid, "command", cmd.String())
95106

96-
ex.kill = func() error {
97-
return killPID(cmd.Process.Pid, logger)
98-
}
107+
ex.kill = func() error {
108+
return killPID(cmd.Process.Pid, logger)
109+
}
110+
111+
exitErr := make(chan error, 1)
99112

100-
go func() {
101-
if err := cmd.Wait(); err != nil {
102-
logger.Debug("process finished (wait completed), got", "err", err)
113+
go func() {
114+
if err := cmd.Wait(); err != nil {
115+
exitErr <- err
116+
logger.Debug("process finished (wait completed), got", "err", err)
117+
}
118+
cf()
119+
}()
120+
121+
select {
122+
case <-ctx.Done():
123+
logger.Debug("process finished (context cancelled)")
124+
case err := <-exitErr:
125+
if exitErr, ok := err.(*exec.ExitError); ok {
126+
logger.Debug("process finished", "exit.code", exitErr.ExitCode())
127+
if exitErr.ExitCode() != 0 {
128+
return err
103129
}
104-
cf()
105-
}()
106-
107-
select {
108-
case <-ctx.Done():
109-
logger.Debug("process finished (context cancelled)")
110-
case <-ex.parentCtx.Done():
111-
logger.Debug("process finished (parent context cancelled)")
130+
}
131+
case <-ex.parentCtx.Done():
132+
logger.Debug("process finished (parent context cancelled)")
133+
}
134+
135+
if ex.interactive {
136+
// Send SIGTERM to the interactive process, as user will see it on his screen
137+
proc, err := os.FindProcess(os.Getpid())
138+
if err != nil {
139+
return err
112140
}
113141

114-
if ex.interactive {
115-
// Send SIGTERM to the interactive process, as user will see it on his screen
116-
proc, err := os.FindProcess(os.Getpid())
117-
if err != nil {
142+
err = proc.Signal(syscall.SIGTERM)
143+
if err != nil {
144+
if err != syscall.ESRCH {
145+
logger.Error("failed to kill, got", "err", err)
118146
return err
119147
}
148+
return err
149+
}
150+
}
151+
152+
if err := ex.kill(); err != nil {
153+
return err
154+
}
120155

121-
err = proc.Signal(syscall.SIGTERM)
122-
if err != nil {
123-
if err != syscall.ESRCH {
124-
logger.Error("failed to kill, got", "err", err)
125-
return err
156+
logger.Debug("command fully executed and processed")
157+
return nil
158+
}
159+
160+
// Start implements Executor.
161+
func (ex *CmdExecutor) Start() error {
162+
ex.mu.Lock()
163+
defer ex.mu.Unlock()
164+
165+
var wg sync.WaitGroup
166+
167+
for i := 0; i < len(ex.commands); i++ {
168+
newCmd := ex.commands[i]
169+
170+
ex.logger.Info("HELLO", "idx", i, "ex.parallel", ex.Parallel)
171+
isParallel := false
172+
173+
for _, p := range ex.Parallel {
174+
if p.Index == i {
175+
isParallel = true
176+
for k := i; k <= i+p.Len; k++ {
177+
wg.Add(1)
178+
go func() {
179+
defer wg.Done()
180+
if err := ex.exec(newCmd); err != nil {
181+
ex.logger.Info("executing, got", "err", err)
182+
// handle error
183+
}
184+
}()
126185
}
127-
return err
186+
187+
i = i + p.Len - 1
128188
}
189+
break
129190
}
130191

131-
if err := ex.kill(); err != nil {
192+
if isParallel {
193+
continue
194+
}
195+
196+
// if ex.Parallel {
197+
// wg.Add(1)
198+
// go func() {
199+
// defer wg.Add(1)
200+
// if err := ex.exec(newCmd); err != nil {
201+
// // handle error
202+
// }
203+
// }()
204+
// continue
205+
// }
206+
207+
if err := ex.exec(newCmd); err != nil {
208+
ex.logger.Error("cmd failed with", "err", err)
132209
return err
133210
}
211+
}
134212

135-
logger.Debug("command fully executed and processed")
213+
if len(ex.Parallel) > 0 {
214+
wg.Wait()
136215
}
137216

138217
return nil

pkg/watcher/watcher.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) {
274274
}
275275

276276
excludeDirs := map[string]struct{}{}
277+
277278
for _, dir := range args.IgnoreDirs {
278279
if args.ShouldLogWatchEvents {
279280
args.Logger.Debug("EXCLUDED from watching", "dir", dir)
@@ -282,8 +283,12 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) {
282283
}
283284

284285
for _, dir := range args.WatchDirs {
285-
if strings.HasPrefix(dir, "-") {
286-
excludeDirs[dir[1:]] = struct{}{}
286+
if args.ShouldLogWatchEvents {
287+
args.Logger.Debug("watch-dirs", "dir", dir)
288+
}
289+
d := filepath.Base(dir)
290+
if strings.HasPrefix(d, "-") {
291+
excludeDirs[d[1:]] = struct{}{}
287292
}
288293
}
289294

0 commit comments

Comments
 (0)