From e3fb00c24c8c4df4d7ceef88495138243410f10c Mon Sep 17 00:00:00 2001 From: Andrey Golev Date: Mon, 11 May 2020 14:15:00 +0300 Subject: [PATCH] Prometheus metrics. Configurable timeout and memory limit --- builtin/bins/dkron-executor-shell/main.go | 27 ++++++ .../bins/dkron-executor-shell/prometheus.go | 50 +++++++++++ builtin/bins/dkron-executor-shell/shell.go | 88 +++++++++++++++++-- .../bins/dkron-executor-shell/shell_unix.go | 31 +++++++ 4 files changed, 187 insertions(+), 9 deletions(-) create mode 100644 builtin/bins/dkron-executor-shell/prometheus.go diff --git a/builtin/bins/dkron-executor-shell/main.go b/builtin/bins/dkron-executor-shell/main.go index fb22bcc78..be5a095ef 100644 --- a/builtin/bins/dkron-executor-shell/main.go +++ b/builtin/bins/dkron-executor-shell/main.go @@ -1,11 +1,24 @@ package main import ( + "net/http" + "os" + dkplugin "github.com/distribworks/dkron/v3/plugin" "github.com/hashicorp/go-plugin" + "github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" ) func main() { + finish := make(chan bool) + promServer := http.NewServeMux() + promServer.Handle("/metrics", promhttp.Handler()) + + go func() { + http.ListenAndServe(":"+getEnv("PROMETHEUS_PORT"), promServer) + }() + plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: dkplugin.Handshake, Plugins: map[string]plugin.Plugin{ @@ -15,4 +28,18 @@ func main() { // A non-nil value here enables gRPC serving for this plugin... GRPCServer: plugin.DefaultGRPCServer, }) + <-finish +} + +func getEnv(key string) string { + v, ok := os.LookupEnv(key) + if v == "" { + log.Warningf("empty value for environment variable %s", key) + return "set_my_env_var" + } + if !ok { + log.Warningf("environment variable %s is not set", key) + return "var_is_empty" + } + return v } diff --git a/builtin/bins/dkron-executor-shell/prometheus.go b/builtin/bins/dkron-executor-shell/prometheus.go new file mode 100644 index 000000000..68062497b --- /dev/null +++ b/builtin/bins/dkron-executor-shell/prometheus.go @@ -0,0 +1,50 @@ +package main + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + namespace = "dkron_job" +) + +var ( + cpuUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "cpu_usage", + Help: "CPU usage by job", + }, + []string{"job_name"}) + + memUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "mem_usage_kb", + Help: "Current memory consumed by job", + }, + []string{"job_name"}) + + executionTime = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "execution_time_seconds", + Help: "Duration of job execution", + }) + + exitCode = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "exit_code", + Help: "Exit code of a job", + }) + + + lastExecutionTimestamp = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "last_execution_unixtimestamp", + }) + +) + +func updateMetric(jobName string, metricName *prometheus.GaugeVec, value float64) { + metricName.WithLabelValues(jobName).Set(value) +} + diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 84d4f47b8..d2d11be9e 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -2,7 +2,7 @@ package main import ( "encoding/base64" - "log" + "fmt" "os" "os/exec" "runtime" @@ -14,6 +14,9 @@ import ( dkplugin "github.com/distribworks/dkron/v3/plugin" dktypes "github.com/distribworks/dkron/v3/plugin/types" "github.com/mattn/go-shellwords" + "github.com/prometheus/client_golang/prometheus/push" + log "github.com/sirupsen/logrus" + "github.com/struCoder/pidusage" ) const ( @@ -59,6 +62,10 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp shell = false } command := args.Config["command"] + if command == "" { + return nil, err + } + env := strings.Split(args.Config["env"], ",") cwd := args.Config["cwd"] @@ -74,12 +81,6 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp cmd.Stderr = reportingWriter{buffer: output, cb: cb, isError: true} cmd.Stdout = reportingWriter{buffer: output, cb: cb} - // Start a timer to warn about slow handlers - slowTimer := time.AfterFunc(2*time.Hour, func() { - log.Printf("shell: Script '%s' slow, execution exceeding %v", command, 2*time.Hour) - }) - defer slowTimer.Stop() - stdin, err := cmd.StdinPipe() if err != nil { return nil, err @@ -96,20 +97,86 @@ func (s *Shell) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelp stdin.Close() log.Printf("shell: going to run %s", command) + + jobTimeout := args.Config["timeout"] + jobMemLimit := args.Config["mem_limit_kb"] + + if jobTimeout == "" { + log.Infof("shell: Job '%v' doesn't have configured timeout. Defaulting to 24h", args.JobName) + jobTimeout = "24h" + } + + if args.Config["mem_limit_kb"] == "" { + log.Infof("shell: Job '%v' doesn't have configured mem_limit_kb", args.JobName) + jobMemLimit = "inf" + } + + t, err := time.ParseDuration(jobTimeout) + if err != nil { + log.Infof("shell: Job '%v' can't parse job timeout. Defaulting to 24h", args.JobName) + jobTimeout = "24h" + } + + startTime := time.Now() err = cmd.Start() if err != nil { return nil, err } + slowTimer := time.AfterFunc(t, func() { + j := fmt.Sprintf("shell: Job '%s' execution time exceeding timeout %v. Killing job.", command, t) + output.Write([]byte(j)) + cmd.Process.Kill() + + }) + defer slowTimer.Stop() + // Warn if buffer is overritten if output.TotalWritten() > output.Size() { log.Printf("shell: Script '%s' generated %d bytes of output, truncated to %d", command, output.TotalWritten(), output.Size()) } + pid := cmd.Process.Pid + quit := make(chan struct{}) + + go func() { + for { + select { + case <-quit: + return + default: + stat, _ := pidusage.GetStat(pid) + mem, _ := calculateMemory(pid) + cpu := stat.CPU + updateMetric(args.JobName, memUsage, float64(mem)) + updateMetric(args.JobName, cpuUsage, cpu) + if jobMemLimit != "inf" { + i, _ := strconv.ParseUint(jobMemLimit, 0, 64) + if mem > i { + j := fmt.Sprintf("shell: Job '%s' memory limit exceeded %vkb. Killing job.", command, i) + output.Write([]byte(j)) + cmd.Process.Kill() + return + } + } + time.Sleep(1 * time.Second) + } + } + }() + err = cmd.Wait() + close(quit) + + executionTime.Set(time.Since(startTime).Seconds()) + exitCode.Set(float64(cmd.ProcessState.ExitCode())) + lastExecutionTimestamp.Set(float64(time.Now().Unix())) - // Always log output - log.Printf("shell: Command output %s", output) + push.New(getEnv("PUSHGATEWAY_URL"), "dkron_job_push").Collector(executionTime).Grouping("job_name", args.JobName).Add() + push.New(getEnv("PUSHGATEWAY_URL"), "dkron_job_push").Collector(exitCode).Grouping("job_name", args.JobName).Add() + push.New(getEnv("PUSHGATEWAY_URL"), "dkron_job_push").Collector(lastExecutionTimestamp).Grouping("job_name", args.JobName).Add() + + updateMetric(args.JobName, memUsage, 0) + updateMetric(args.JobName, cpuUsage, 0) return output.Bytes(), err } @@ -132,6 +199,9 @@ func buildCmd(command string, useShell bool, env []string, cwd string) (cmd *exe if err != nil { return nil, err } + if len(args) == 0 { + return nil, err + } cmd = exec.Command(args[0], args[1:]...) } if env != nil { diff --git a/builtin/bins/dkron-executor-shell/shell_unix.go b/builtin/bins/dkron-executor-shell/shell_unix.go index f1d671598..f55980ffc 100644 --- a/builtin/bins/dkron-executor-shell/shell_unix.go +++ b/builtin/bins/dkron-executor-shell/shell_unix.go @@ -3,6 +3,10 @@ package main import ( + "bufio" + "bytes" + "fmt" + "os" "os/exec" "os/user" "strconv" @@ -37,3 +41,30 @@ func setCmdAttr(cmd *exec.Cmd, config map[string]string) error { } return nil } + +func calculateMemory(pid int) (uint64, error) { + f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid)) + if err != nil { + return 0, err + } + defer f.Close() + + res := uint64(0) + rfx := []byte("Rss:") + r := bufio.NewScanner(f) + for r.Scan() { + line := r.Bytes() + if bytes.HasPrefix(line, rfx) { + var size uint64 + _, err := fmt.Sscanf(string(line[4:]), "%d", &size) + if err != nil { + return 0, err + } + res += size + } + } + if err := r.Err(); err != nil { + return 0, err + } + return res, nil +}