From 04bd0cd7f1731ca7afca6af9220a626e6c975b50 Mon Sep 17 00:00:00 2001 From: Alex Ellis Date: Wed, 29 Nov 2017 12:52:41 +0000 Subject: [PATCH] Allow configuration of watchdog from env-vars Signed-off-by: Alex Ellis --- README.md | 19 ++++ config/config.go | 78 +++++++++------- config/config_modes.go | 40 ++++++++ config/config_test.go | 132 +++++++++++++++++++++++++++ functions/serializing_fork_runner.go | 12 ++- functions/streaming_runner.go | 6 +- main.go | 12 ++- 7 files changed, 258 insertions(+), 41 deletions(-) create mode 100644 config/config_modes.go diff --git a/README.md b/README.md index aa0aa6d6..7ceb97e8 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,23 @@ This is a re-write of the OpenFaaS watchdog. ![](https://camo.githubusercontent.com/61c169ab5cd01346bc3dc7a11edc1d218f0be3b4/68747470733a2f2f7062732e7477696d672e636f6d2f6d656469612f4447536344626c554941416f34482d2e6a70673a6c61726765) +## Config + +Environmental variables: + +| Option | Implemented | Usage | +|------------------------|--------------|-------------------------------| +| `function_process` | Yes | The process to invoke for each function call function process (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. | +| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) | +| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) | +| `hard_timeout` | Yes | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. | +| `port` | Yes | Specify an alternative TCP port fo testing | +| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. | +| `content_type` | No | Force a specific Content-Type response for all responses. | +| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. | + +> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. + ## Watchdog modes: The original watchdog supported mode 3 Serializing fork and has support for mode 2 Afterburn in an open PR. @@ -24,6 +41,8 @@ Forks a process per request and can deal with more data than is available memory HTTP headers cannot be sent after function starts executing due to input/output being hooked-up directly to response for streaming efficiencies. Response code is always 200 unless there is an issue forking the process. An error mid-flight will have to be picked up on the client. Multi-threaded. +* Input is sent back to client as soon as it's printed to stdout by the executing process. + * A static Content-type can be set ahead of time. * Hard timeout: supported. diff --git a/config/config.go b/config/config.go index 56d08575..ac06e6f7 100644 --- a/config/config.go +++ b/config/config.go @@ -2,21 +2,25 @@ package config import ( "fmt" - "os" + "strconv" "strings" "time" ) +// WatchdogConfig configuration for a watchdog. type WatchdogConfig struct { TCPPort int HTTPReadTimeout time.Duration HTTPWriteTimeout time.Duration + HardTimeout time.Duration + FunctionProcess string + ContentType string InjectCGIHeaders bool - HardTimeout time.Duration OperationalMode int } +// Process returns a string for the process and a slice for the arguments from the FunctionProcess. func (w WatchdogConfig) Process() (string, []string) { parts := strings.Split(w.FunctionProcess, " ") @@ -27,18 +31,36 @@ func (w WatchdogConfig) Process() (string, []string) { return parts[0], []string{} } +// New create config based upon environmental variables. func New(env []string) (WatchdogConfig, error) { + + envMap := mapEnv(env) + + var functionProcess string + if val, exists := envMap["fprocess"]; exists { + functionProcess = val + } + + if val, exists := envMap["function_process"]; exists { + functionProcess = val + } + + contentType := "application/octet-stream" + if val, exists := envMap["content_type"]; exists { + contentType = val + } + config := WatchdogConfig{ - TCPPort: 8080, - HTTPReadTimeout: time.Second * 10, - HTTPWriteTimeout: time.Second * 10, - FunctionProcess: os.Getenv("fprocess"), + TCPPort: getInt(envMap, "port", 8080), + HTTPReadTimeout: getDuration(envMap, "read_timeout", time.Second*10), + HTTPWriteTimeout: getDuration(envMap, "write_timeout", time.Second*10), + FunctionProcess: functionProcess, InjectCGIHeaders: true, - HardTimeout: 5 * time.Second, + HardTimeout: getDuration(envMap, "hard_timeout", time.Second*10), OperationalMode: ModeStreaming, + ContentType: contentType, } - envMap := mapEnv(env) if val := envMap["mode"]; len(val) > 0 { config.OperationalMode = WatchdogModeConst(val) } @@ -60,34 +82,24 @@ func mapEnv(env []string) map[string]string { return mapped } -const ( - ModeStreaming = 1 - ModeSerializing = 2 - ModeAfterBurn = 3 -) +func getDuration(env map[string]string, key string, defaultValue time.Duration) time.Duration { + result := defaultValue + if val, exists := env[key]; exists { + parsed, _ := time.ParseDuration(val) + result = parsed -func WatchdogModeConst(mode string) int { - switch mode { - case "streaming": - return ModeStreaming - case "afterburn": - return ModeAfterBurn - case "serializing": - return ModeSerializing - default: - return 0 } + + return result } -func WatchdogMode(mode int) string { - switch mode { - case ModeStreaming: - return "streaming" - case ModeAfterBurn: - return "afterburn" - case ModeSerializing: - return "serializing" - default: - return "unknown" +func getInt(env map[string]string, key string, defaultValue int) int { + result := defaultValue + if val, exists := env[key]; exists { + parsed, _ := strconv.Atoi(val) + result = parsed + } + + return result } diff --git a/config/config_modes.go b/config/config_modes.go new file mode 100644 index 00000000..38fe9311 --- /dev/null +++ b/config/config_modes.go @@ -0,0 +1,40 @@ +package config + +const ( + // ModeStreaming streams the values live to the caller as they are printed by the process. + ModeStreaming = 1 + + // ModeSerializing reads all the response and buffers before returning + ModeSerializing = 2 + + // ModeAfterBurn for performance tuning + ModeAfterBurn = 3 +) + +// WatchdogModeConst as a const int +func WatchdogModeConst(mode string) int { + switch mode { + case "streaming": + return ModeStreaming + case "afterburn": + return ModeAfterBurn + case "serializing": + return ModeSerializing + default: + return 0 + } +} + +// WatchdogMode as a string +func WatchdogMode(mode int) string { + switch mode { + case ModeStreaming: + return "streaming" + case ModeAfterBurn: + return "afterburn" + case ModeSerializing: + return "serializing" + default: + return "unknown" + } +} diff --git a/config/config_test.go b/config/config_test.go index 7dc15382..97114ad9 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1,6 +1,7 @@ package config import "testing" +import "time" func TestNew(t *testing.T) { defaults, err := New([]string{}) @@ -35,4 +36,135 @@ func Test_OperationalMode_AfterBurn(t *testing.T) { if actual.OperationalMode != ModeAfterBurn { t.Errorf("Want %s. got: %s", WatchdogMode(ModeAfterBurn), WatchdogMode(actual.OperationalMode)) } + +} + +func Test_ContentType_Default(t *testing.T) { + env := []string{} + + actual, err := New(env) + if err != nil { + t.Errorf("Expected no errors") + } + + if actual.ContentType != "application/octet-stream" { + t.Errorf("Default (ContentType) Want %s. got: %s", actual.ContentType, "octet-stream") + } +} + +func Test_ContentType_Override(t *testing.T) { + env := []string{ + "content_type=application/json", + } + + actual, err := New(env) + if err != nil { + t.Errorf("Expected no errors") + } + + if actual.ContentType != "application/json" { + t.Errorf("(ContentType) Want %s. got: %s", actual.ContentType, "application/json") + } +} + +func Test_FunctionProcessLegacyName(t *testing.T) { + env := []string{ + "fprocess=env", + } + + actual, err := New(env) + if err != nil { + t.Errorf("Expected no errors") + } + + if actual.FunctionProcess != "env" { + t.Errorf("Want %s. got: %s", "env", actual.FunctionProcess) + } +} + +func Test_FunctionProcessAlternativeName(t *testing.T) { + env := []string{ + "function_process=env", + } + + actual, err := New(env) + if err != nil { + t.Errorf("Expected no errors") + } + + if actual.FunctionProcess != "env" { + t.Errorf("Want %s. got: %s", "env", actual.FunctionProcess) + } +} + +func Test_PortOverride(t *testing.T) { + env := []string{ + "port=8081", + } + + actual, err := New(env) + if err != nil { + t.Errorf("Expected no errors") + } + + if actual.TCPPort != 8081 { + t.Errorf("Want %s. got: %s", 8081, actual.TCPPort) + } +} + +func Test_Timeouts(t *testing.T) { + cases := []struct { + readTimeout time.Duration + writeTimeout time.Duration + hardTimeout time.Duration + env []string + name string + }{ + { + name: "Defaults", + readTimeout: time.Second * 10, + writeTimeout: time.Second * 10, + hardTimeout: time.Second * 10, + env: []string{}, + }, + { + name: "Custom read-timeout", + readTimeout: time.Second * 5, + writeTimeout: time.Second * 10, + hardTimeout: time.Second * 10, + env: []string{"read_timeout=5s"}, + }, + { + name: "Custom write-timeout", + readTimeout: time.Second * 10, + writeTimeout: time.Second * 5, + hardTimeout: time.Second * 10, + env: []string{"write_timeout=5s"}, + }, + { + name: "Custom hard-timeout", + readTimeout: time.Second * 10, + writeTimeout: time.Second * 10, + hardTimeout: time.Second * 5, + env: []string{"hard_timeout=5s"}, + }, + } + + for _, testCase := range cases { + actual, err := New(testCase.env) + if err != nil { + t.Errorf("(%s) Expected no errors", testCase.name) + } + if testCase.readTimeout != actual.HTTPReadTimeout { + t.Errorf("(%s) HTTPReadTimeout want: %s, got: %s", testCase.name, actual.HTTPReadTimeout, testCase.readTimeout) + } + if testCase.writeTimeout != actual.HTTPWriteTimeout { + t.Errorf("(%s) HTTPWriteTimeout want: %s, got: %s", testCase.name, actual.HTTPWriteTimeout, testCase.writeTimeout) + } + if testCase.hardTimeout != actual.HardTimeout { + t.Errorf("(%s) HardTimeout want: %s, got: %s", testCase.name, actual.HardTimeout, testCase.hardTimeout) + } + + } + } diff --git a/functions/serializing_fork_runner.go b/functions/serializing_fork_runner.go index 14bc9594..2c40b697 100644 --- a/functions/serializing_fork_runner.go +++ b/functions/serializing_fork_runner.go @@ -1,7 +1,6 @@ package functions import ( - "fmt" "io" "io/ioutil" "log" @@ -26,6 +25,7 @@ func (f *SerializingForkFunctionRunner) Run(req FunctionRequest, w http.Response } w.WriteHeader(200) + if functionBytes != nil { _, err = w.Write(*functionBytes) } else { @@ -44,18 +44,22 @@ func serializeFunction(req FunctionRequest, f *SerializingForkFunctionRunner) (* var timer *time.Timer if f.HardTimeout > time.Millisecond*0 { + log.Println("Started a timer.") + timer = time.NewTimer(f.HardTimeout) go func() { <-timer.C - fmt.Printf("Function was killed by HardTimeout: %d\n", f.HardTimeout) + log.Printf("Function was killed by HardTimeout: %s\n", f.HardTimeout) killErr := cmd.Process.Kill() if killErr != nil { - fmt.Println("Error killing function due to HardTimeout", killErr) + log.Println("Error killing function due to HardTimeout", killErr) } }() } - defer timer.Stop() + if timer != nil { + defer timer.Stop() + } var data []byte diff --git a/functions/streaming_runner.go b/functions/streaming_runner.go index 7b9e218e..b0cf1fb6 100644 --- a/functions/streaming_runner.go +++ b/functions/streaming_runner.go @@ -39,6 +39,7 @@ func (f *ForkFunctionRunner) Run(req FunctionRequest) error { var timer *time.Timer if f.HardTimeout > time.Millisecond*0 { timer = time.NewTimer(f.HardTimeout) + go func() { <-timer.C @@ -88,12 +89,15 @@ func (f *ForkFunctionRunner) Run(req FunctionRequest) error { waitErr := cmd.Wait() done := time.Since(start) log.Printf("Took %f secs", done.Seconds()) - timer.Stop() + if timer != nil { + timer.Stop() + } req.InputReader.Close() if waitErr != nil { return waitErr } + return nil } diff --git a/main.go b/main.go index 70bfd8d8..f58b0f87 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,7 @@ func main() { } if len(watchdogConfig.FunctionProcess) == 0 { - fmt.Fprintf(os.Stderr, "Provide a fprocess environmental variable for your function.\n") + fmt.Fprintf(os.Stderr, "Provide a \"function_process\" or \"fprocess\" environmental variable for your function.\n") os.Exit(-1) } @@ -71,6 +71,7 @@ func makeAfterBurnRequestHandler(watchdogConfig config.WatchdogConfig) func(http Process: commandName, ProcessArgs: arguments, } + fmt.Printf("Forking - %s %s\n", commandName, arguments) functionInvoker.Start() @@ -119,6 +120,7 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig) fun Environment: environment, } + w.Header().Set("Content-Type", watchdogConfig.ContentType) err := functionInvoker.Run(req, w) if err != nil { log.Println(err) @@ -148,10 +150,14 @@ func makeForkRequestHandler(watchdogConfig config.WatchdogConfig) func(http.Resp Environment: environment, } + w.Header().Set("Content-Type", watchdogConfig.ContentType) err := functionInvoker.Run(req) if err != nil { - w.WriteHeader(500) - w.Write([]byte(err.Error())) + log.Println(err.Error()) + + // Probably cannot write to client if we already have written a header + // w.WriteHeader(500) + // w.Write([]byte(err.Error())) } } }