Commit 27c35ba

Display user's processes info in task log when high memory consumption is detected, for debugging reasons (#594)
1 parent 3bfdfeb commit 27c35ba

3 files changed: 44 additions & 26 deletions
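
For orientation before the diffs: the mechanism being added is a ps snapshot of the current user's processes, taken with a 2-second timeout (CombinedOutput has been seen to hang) and emitted both to the worker log and, when a writer is supplied, to the task's stderr. The condensed sketch below illustrates that pattern as a standalone program; the name logProcSnapshot is illustrative only, and the real implementation is procWatcher.LogProcs in the runner/execer/os/process_watcher.go diff further down.

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "os/exec"
    "time"

    log "github.com/sirupsen/logrus"
)

// logProcSnapshot captures a ps listing of the current user's processes,
// sorted by RSS, and writes it to the logger and (optionally) a task writer.
// A 2-second timeout bounds CombinedOutput, which can otherwise fail to return.
func logProcSnapshot(pid int, level log.Level, w io.Writer) {
    if !log.IsLevelEnabled(level) {
        return
    }
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    ps, err := exec.CommandContext(ctx, "ps", "-u", os.Getenv("USER"),
        "-opid,sess,ppid,pgid,rss,args", "--sort=-rss").CombinedOutput()
    log.WithFields(log.Fields{"pid": pid, "ps": string(ps), "err": err, "errCtx": ctx.Err()}).
        Log(level, fmt.Sprintf("ps after increased memory utilization for pid %d", pid))
    if w != nil {
        // Mirror the snapshot into the task's own log stream when one is provided.
        fmt.Fprintf(w, "\nps after increased memory utilization for pid %d:\n\n", pid)
        w.Write(ps)
    }
}

func main() {
    // Example: log at error level and mirror the snapshot to stderr,
    // roughly what happens when the memory cap is breached.
    logProcSnapshot(os.Getpid(), log.ErrorLevel, os.Stderr)
}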

runner/execer/os/execer.go

Lines changed: 6 additions & 24 deletions
@@ -1,7 +1,6 @@
 package os
 
 import (
-    "context"
     "fmt"
     "io"
     "math"
@@ -118,15 +117,15 @@ func (e *execer) Exec(command scootexecer.Command) (scootexecer.Process, error)
 
     proc := &process{cmd: cmd, wg: &wg, ats: AbortTimeoutSec, LogTags: command.LogTags}
     if e.memCap > 0 {
-        go e.monitorMem(proc, command.MemCh)
+        go e.monitorMem(proc, command.MemCh, command.Stderr)
     }
 
     return proc, nil
 }
 
 // Periodically check to make sure memory constraints are respected,
 // and clean up after ourselves when the process has completed
-func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus) {
+func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus, stderr io.Writer) {
     pid := p.cmd.Process.Pid
     pgid, err := syscall.Getpgid(pid)
     if err != nil {
@@ -168,6 +167,8 @@ func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus) {
             Error:    msg,
             ExitCode: errors.HighInitialMemoryUtilizationExitCode,
         }
+        // log the process snapshot in worker log, as well as task stderr log
+        e.pw.LogProcs(p, log.ErrorLevel, stderr)
         p.mutex.Unlock()
         e.memCapKill(p, mem, memCh)
         return
@@ -212,6 +213,7 @@ func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus) {
             Error:    msg,
             ExitCode: 1,
         }
+        e.pw.LogProcs(p, log.ErrorLevel, stderr)
         p.mutex.Unlock()
         e.memCapKill(p, mem, memCh)
         return
@@ -230,7 +232,7 @@ func (e *execer) monitorMem(p *process, memCh chan scootexecer.ProcessStatus) {
             "jobID":  p.JobID,
             "taskID": p.TaskID,
         }).Infof("Memory utilization increased to %d%%, pid: %d", int(memUsagePct*100), pid)
-        debugProcesses(p)
+        e.pw.LogProcs(p, log.DebugLevel, nil)
         for memUsagePct > reportThresholds[thresholdsIdx] {
             thresholdsIdx++
         }
@@ -265,26 +267,6 @@ func (e *execer) memCapKill(p *process, mem scootexecer.Memory, memCh chan scoot
     e.stat.Gauge(stats.WorkerMemory).Update(int64(postKillMem))
 }
 
-// debugProcesses logs a snapshot of the current processes at debug level.
-func debugProcesses(p *process) {
-    // Debug log output with timeout since it seems CombinedOutput() sometimes fails to return.
-    if log.IsLevelEnabled(log.DebugLevel) {
-        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-        ps, err := exec.CommandContext(ctx, "ps", "-u", os.Getenv("USER"), "-opid,sess,ppid,pgid,rss,args").CombinedOutput()
-        log.WithFields(
-            log.Fields{
-                "pid":    p.cmd.Process.Pid,
-                "ps":     string(ps),
-                "err":    err,
-                "errCtx": ctx.Err(),
-                "tag":    p.Tag,
-                "jobID":  p.JobID,
-                "taskID": p.TaskID,
-            }).Debugf("ps after increased memory utilization for pid %d", p.cmd.Process.Pid)
-        cancel()
-    }
-}
-
 // Kill process along with all child processes, assuming no child processes called setpgid
 func cleanupProcs(pgid int) (err error) {
     log.WithFields(

runner/execer/os/process_watcher.go

Lines changed: 36 additions & 0 deletions
@@ -1,18 +1,24 @@
 package os
 
 import (
+    "context"
     "fmt"
+    "io"
+    "os"
+    "time"
 
     "os/exec"
     "strings"
 
+    log "github.com/sirupsen/logrus"
     scootexecer "github.com/twitter/scoot/runner/execer"
 )
 
 // Used for mocking memCap monitoring
 type ProcessWatcher interface {
     GetProcs() (map[int]ProcInfo, error)
     MemUsage(int) (scootexecer.Memory, error)
+    LogProcs(*process, log.Level, io.Writer)
 }
 
 type ProcInfo struct {
@@ -95,6 +101,36 @@ func (opw *procWatcher) MemUsage(pid int) (scootexecer.Memory, error) {
     return scootexecer.Memory(total * bytesToKB), nil
 }
 
+// LogProcs logs the process snapshot of the current process along with other running processes for the user in the worker log,
+// at the specified level. Also writes to the writer, if provided
+func (opw *procWatcher) LogProcs(p *process, level log.Level, w io.Writer) {
+    if !log.IsLevelEnabled(level) {
+        return
+    }
+
+    // log output with timeout since it seems CombinedOutput() sometimes fails to return.
+    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+    ps, err := exec.CommandContext(ctx, "ps", "-u", os.Getenv("USER"), "-opid,sess,ppid,pgid,rss,args", "--sort=-rss").CombinedOutput()
+
+    log.WithFields(
+        log.Fields{
+            "pid":    p.cmd.Process.Pid,
+            "ps":     string(ps),
+            "err":    err,
+            "errCtx": ctx.Err(),
+            "tag":    p.Tag,
+            "jobID":  p.JobID,
+            "taskID": p.TaskID,
+        }).Log(level, fmt.Sprintf("ps after increased memory utilization for pid %d", p.cmd.Process.Pid))
+
+    if w != nil {
+        w.Write([]byte(fmt.Sprintf("\nps after increased memory utilization for pid %d:\n\n", p.cmd.Process.Pid)))
+        w.Write(ps)
+    }
+
+    cancel()
+}
+
 // Format processes into pgid and ppid groups for summation of memory usage
 func parseProcs(procs []string) (allProcesses map[int]ProcInfo, processGroups map[int][]ProcInfo,
     parentProcesses map[int][]ProcInfo, err error) {
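
Note on the interface change above: ProcessWatcher exists so memCap monitoring can be mocked, which means any existing test double must now also provide LogProcs. A hypothetical no-op stub (the type noopProcWatcher is not part of this commit) that would satisfy the updated interface could look like this:

package os

import (
    "io"

    log "github.com/sirupsen/logrus"
    scootexecer "github.com/twitter/scoot/runner/execer"
)

// noopProcWatcher is a hypothetical stand-in used only to illustrate the
// enlarged interface; it reports no processes, zero memory, and logs nothing.
type noopProcWatcher struct{}

func (noopProcWatcher) GetProcs() (map[int]ProcInfo, error)      { return map[int]ProcInfo{}, nil }
func (noopProcWatcher) MemUsage(int) (scootexecer.Memory, error) { return 0, nil }
func (noopProcWatcher) LogProcs(*process, log.Level, io.Writer)  {}

// Compile-time check that the stub still implements ProcessWatcher.
var _ ProcessWatcher = noopProcWatcher{}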

runner/runners/invoke.go

Lines changed: 2 additions & 2 deletions
@@ -329,7 +329,7 @@ func (inv *Invoker) run(cmd *runner.Command, id runner.RunID, abortCh chan struc
             "tag":    cmd.Tag,
             "jobID":  cmd.JobID,
             "taskID": cmd.TaskID,
-        }).Info("Run timedout")
+        }).Error("Run timedout")
         runStatus = runner.TimeoutStatus(id,
             tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
     case st := <-memCh:
@@ -345,7 +345,7 @@ func (inv *Invoker) run(cmd *runner.Command, id runner.RunID, abortCh chan struc
             "taskID":   cmd.TaskID,
             "status":   st,
             "checkout": co.Path(),
-        }).Infof(st.Error)
+        }).Errorf(st.Error)
         inv.stat.Counter(stats.WorkerMemoryCapExceeded).Inc(1)
         runStatus = getPostExecRunStatus(st, id, cmd)
         runStatus.Error = st.Error