Skip to content

Commit

Permalink
Allow configuration of watchdog from env-vars
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Ellis <[email protected]>
  • Loading branch information
alexellis committed Nov 29, 2017
1 parent 5901ef3 commit 04bd0cd
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 41 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ This is a re-write of the OpenFaaS watchdog.

![](https://camo.githubusercontent.com/61c169ab5cd01346bc3dc7a11edc1d218f0be3b4/68747470733a2f2f7062732e7477696d672e636f6d2f6d656469612f4447536344626c554941416f34482d2e6a70673a6c61726765)

## Config

Environmental variables:

| Option | Implemented | Usage |
|------------------------|--------------|-------------------------------|
| `function_process` | Yes | The process to invoke for each function call function process (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. |
| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) |
| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) |
| `hard_timeout` | Yes | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
| `port` | Yes | Specify an alternative TCP port fo testing |
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. |
| `content_type` | No | Force a specific Content-Type response for all responses. |
| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |

> Note: the .lock file is implemented for health-checking, but cannot be disabled yet.
## Watchdog modes:

The original watchdog supported mode 3 Serializing fork and has support for mode 2 Afterburn in an open PR.
Expand All @@ -24,6 +41,8 @@ Forks a process per request and can deal with more data than is available memory

HTTP headers cannot be sent after function starts executing due to input/output being hooked-up directly to response for streaming efficiencies. Response code is always 200 unless there is an issue forking the process. An error mid-flight will have to be picked up on the client. Multi-threaded.

* Input is sent back to client as soon as it's printed to stdout by the executing process.

* A static Content-type can be set ahead of time.

* Hard timeout: supported.
Expand Down
78 changes: 45 additions & 33 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@ package config

import (
"fmt"
"os"
"strconv"
"strings"
"time"
)

// WatchdogConfig configuration for a watchdog.
type WatchdogConfig struct {
TCPPort int
HTTPReadTimeout time.Duration
HTTPWriteTimeout time.Duration
HardTimeout time.Duration

FunctionProcess string
ContentType string
InjectCGIHeaders bool
HardTimeout time.Duration
OperationalMode int
}

// Process returns a string for the process and a slice for the arguments from the FunctionProcess.
func (w WatchdogConfig) Process() (string, []string) {
parts := strings.Split(w.FunctionProcess, " ")

Expand All @@ -27,18 +31,36 @@ func (w WatchdogConfig) Process() (string, []string) {
return parts[0], []string{}
}

// New create config based upon environmental variables.
func New(env []string) (WatchdogConfig, error) {

envMap := mapEnv(env)

var functionProcess string
if val, exists := envMap["fprocess"]; exists {
functionProcess = val
}

if val, exists := envMap["function_process"]; exists {
functionProcess = val
}

contentType := "application/octet-stream"
if val, exists := envMap["content_type"]; exists {
contentType = val
}

config := WatchdogConfig{
TCPPort: 8080,
HTTPReadTimeout: time.Second * 10,
HTTPWriteTimeout: time.Second * 10,
FunctionProcess: os.Getenv("fprocess"),
TCPPort: getInt(envMap, "port", 8080),
HTTPReadTimeout: getDuration(envMap, "read_timeout", time.Second*10),
HTTPWriteTimeout: getDuration(envMap, "write_timeout", time.Second*10),
FunctionProcess: functionProcess,
InjectCGIHeaders: true,
HardTimeout: 5 * time.Second,
HardTimeout: getDuration(envMap, "hard_timeout", time.Second*10),
OperationalMode: ModeStreaming,
ContentType: contentType,
}

envMap := mapEnv(env)
if val := envMap["mode"]; len(val) > 0 {
config.OperationalMode = WatchdogModeConst(val)
}
Expand All @@ -60,34 +82,24 @@ func mapEnv(env []string) map[string]string {
return mapped
}

const (
ModeStreaming = 1
ModeSerializing = 2
ModeAfterBurn = 3
)
func getDuration(env map[string]string, key string, defaultValue time.Duration) time.Duration {
result := defaultValue
if val, exists := env[key]; exists {
parsed, _ := time.ParseDuration(val)
result = parsed

func WatchdogModeConst(mode string) int {
switch mode {
case "streaming":
return ModeStreaming
case "afterburn":
return ModeAfterBurn
case "serializing":
return ModeSerializing
default:
return 0
}

return result
}

func WatchdogMode(mode int) string {
switch mode {
case ModeStreaming:
return "streaming"
case ModeAfterBurn:
return "afterburn"
case ModeSerializing:
return "serializing"
default:
return "unknown"
func getInt(env map[string]string, key string, defaultValue int) int {
result := defaultValue
if val, exists := env[key]; exists {
parsed, _ := strconv.Atoi(val)
result = parsed

}

return result
}
40 changes: 40 additions & 0 deletions config/config_modes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package config

const (
// ModeStreaming streams the values live to the caller as they are printed by the process.
ModeStreaming = 1

// ModeSerializing reads all the response and buffers before returning
ModeSerializing = 2

// ModeAfterBurn for performance tuning
ModeAfterBurn = 3
)

// WatchdogModeConst as a const int
func WatchdogModeConst(mode string) int {
switch mode {
case "streaming":
return ModeStreaming
case "afterburn":
return ModeAfterBurn
case "serializing":
return ModeSerializing
default:
return 0
}
}

// WatchdogMode as a string
func WatchdogMode(mode int) string {
switch mode {
case ModeStreaming:
return "streaming"
case ModeAfterBurn:
return "afterburn"
case ModeSerializing:
return "serializing"
default:
return "unknown"
}
}
132 changes: 132 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import "testing"
import "time"

func TestNew(t *testing.T) {
defaults, err := New([]string{})
Expand Down Expand Up @@ -35,4 +36,135 @@ func Test_OperationalMode_AfterBurn(t *testing.T) {
if actual.OperationalMode != ModeAfterBurn {
t.Errorf("Want %s. got: %s", WatchdogMode(ModeAfterBurn), WatchdogMode(actual.OperationalMode))
}

}

func Test_ContentType_Default(t *testing.T) {
env := []string{}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}

if actual.ContentType != "application/octet-stream" {
t.Errorf("Default (ContentType) Want %s. got: %s", actual.ContentType, "octet-stream")
}
}

func Test_ContentType_Override(t *testing.T) {
env := []string{
"content_type=application/json",
}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}

if actual.ContentType != "application/json" {
t.Errorf("(ContentType) Want %s. got: %s", actual.ContentType, "application/json")
}
}

func Test_FunctionProcessLegacyName(t *testing.T) {
env := []string{
"fprocess=env",
}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}

if actual.FunctionProcess != "env" {
t.Errorf("Want %s. got: %s", "env", actual.FunctionProcess)
}
}

func Test_FunctionProcessAlternativeName(t *testing.T) {
env := []string{
"function_process=env",
}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}

if actual.FunctionProcess != "env" {
t.Errorf("Want %s. got: %s", "env", actual.FunctionProcess)
}
}

func Test_PortOverride(t *testing.T) {
env := []string{
"port=8081",
}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}

if actual.TCPPort != 8081 {
t.Errorf("Want %s. got: %s", 8081, actual.TCPPort)
}
}

func Test_Timeouts(t *testing.T) {
cases := []struct {
readTimeout time.Duration
writeTimeout time.Duration
hardTimeout time.Duration
env []string
name string
}{
{
name: "Defaults",
readTimeout: time.Second * 10,
writeTimeout: time.Second * 10,
hardTimeout: time.Second * 10,
env: []string{},
},
{
name: "Custom read-timeout",
readTimeout: time.Second * 5,
writeTimeout: time.Second * 10,
hardTimeout: time.Second * 10,
env: []string{"read_timeout=5s"},
},
{
name: "Custom write-timeout",
readTimeout: time.Second * 10,
writeTimeout: time.Second * 5,
hardTimeout: time.Second * 10,
env: []string{"write_timeout=5s"},
},
{
name: "Custom hard-timeout",
readTimeout: time.Second * 10,
writeTimeout: time.Second * 10,
hardTimeout: time.Second * 5,
env: []string{"hard_timeout=5s"},
},
}

for _, testCase := range cases {
actual, err := New(testCase.env)
if err != nil {
t.Errorf("(%s) Expected no errors", testCase.name)
}
if testCase.readTimeout != actual.HTTPReadTimeout {
t.Errorf("(%s) HTTPReadTimeout want: %s, got: %s", testCase.name, actual.HTTPReadTimeout, testCase.readTimeout)
}
if testCase.writeTimeout != actual.HTTPWriteTimeout {
t.Errorf("(%s) HTTPWriteTimeout want: %s, got: %s", testCase.name, actual.HTTPWriteTimeout, testCase.writeTimeout)
}
if testCase.hardTimeout != actual.HardTimeout {
t.Errorf("(%s) HardTimeout want: %s, got: %s", testCase.name, actual.HardTimeout, testCase.hardTimeout)
}

}

}
12 changes: 8 additions & 4 deletions functions/serializing_fork_runner.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package functions

import (
"fmt"
"io"
"io/ioutil"
"log"
Expand All @@ -26,6 +25,7 @@ func (f *SerializingForkFunctionRunner) Run(req FunctionRequest, w http.Response
}

w.WriteHeader(200)

if functionBytes != nil {
_, err = w.Write(*functionBytes)
} else {
Expand All @@ -44,18 +44,22 @@ func serializeFunction(req FunctionRequest, f *SerializingForkFunctionRunner) (*

var timer *time.Timer
if f.HardTimeout > time.Millisecond*0 {
log.Println("Started a timer.")

timer = time.NewTimer(f.HardTimeout)
go func() {
<-timer.C

fmt.Printf("Function was killed by HardTimeout: %d\n", f.HardTimeout)
log.Printf("Function was killed by HardTimeout: %s\n", f.HardTimeout)
killErr := cmd.Process.Kill()
if killErr != nil {
fmt.Println("Error killing function due to HardTimeout", killErr)
log.Println("Error killing function due to HardTimeout", killErr)
}
}()
}
defer timer.Stop()
if timer != nil {
defer timer.Stop()
}

var data []byte

Expand Down
Loading

0 comments on commit 04bd0cd

Please sign in to comment.