diff --git a/README.md b/README.md index 7ceb97e8..9e70fabf 100644 --- a/README.md +++ b/README.md @@ -10,32 +10,13 @@ This is a re-write of the OpenFaaS watchdog. ![](https://camo.githubusercontent.com/61c169ab5cd01346bc3dc7a11edc1d218f0be3b4/68747470733a2f2f7062732e7477696d672e636f6d2f6d656469612f4447536344626c554941416f34482d2e6a70673a6c61726765) -## Config - -Environmental variables: - -| Option | Implemented | Usage | -|------------------------|--------------|-------------------------------| -| `function_process` | Yes | The process to invoke for each function call function process (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. | -| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) | -| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) | -| `hard_timeout` | Yes | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. | -| `port` | Yes | Specify an alternative TCP port fo testing | -| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. | -| `content_type` | No | Force a specific Content-Type response for all responses. | -| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. | - -> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. - ## Watchdog modes: -The original watchdog supported mode 3 Serializing fork and has support for mode 2 Afterburn in an open PR. +History/context: the original watchdog supported only the Serializing fork mode, and Afterburn was available for testing via a pull request. -When complete this work will support all three modes and additional stretch goal of: +When the of-watchdog is complete this version will support four modes as listed below. 
We may consolidate or remove some of these modes before going to 1.0 so please consider modes 2-4 experimental. -* Handling of multi-part forms - -### 1. Streaming fork (implemented) - default. +### 1. Streaming fork (mode=streaming) - default. Forks a process per request and can deal with more data than is available memory capacity - i.e. 512mb VM can process multiple GB of video. @@ -47,7 +28,7 @@ HTTP headers cannot be sent after function starts executing due to input/output * Hard timeout: supported. -### 2. Afterburn (implemented) +### 2. Afterburn (mode=afterburn) Uses a single process for all requests, if that request dies the container dies. @@ -69,7 +50,41 @@ https://github.com/alexellis/python-afterburn https://github.com/alexellis/java-afterburn -### 3. Serializing fork (implemented in dev-branch) +### 3. HTTP (mode=http) + +The HTTP mode is similar to AfterBurn. + +A process is forked when the watchdog starts, we then forward any request incoming to the watchdog to a HTTP port within the container. + +Pros: + +* Fastest option - high concurrency and throughput + +* Does not require new/custom client libraries like afterburn but makes use of a long-running daemon such as Express.js for Node or Flask for Python + +Example usage for testing: + +* Forward to an NGinx container: + +``` +$ go build ; mode=http port=8081 fprocess="docker run -p 80:80 --name nginx -t nginx" upstream_url=http://127.0.0.1:80 ./of-watchdog +``` + +* Forward to a Node.js / Express.js hello-world app: + +``` +$ go build ; mode=http port=8081 fprocess="node expressjs-hello-world.js" upstream_url=http://127.0.0.1:3000 ./of-watchdog +``` + +Cons: + +* Questionable as to whether this is actually "serverless" + +* Daemons such as express/flask/sinatra could be hard to configure or potentially unpredictable when used in this way + +* One more HTTP hop in the chain between the client and the function + +### 4. Serializing fork (mode=serializing) Forks one process per request. 
Multi-threaded. Ideal for retro-fitting a CGI application handler i.e. for Flask. @@ -85,3 +100,20 @@ Reads entire request into memory from the HTTP request. At this point we seriali * Hard timeout: supported. +## Configuration + +Environmental variables: + +| Option | Implemented | Usage | +|------------------------|--------------|-------------------------------| +| `function_process` | Yes | The process to invoke for each function call (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. | +| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) | +| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) | +| `hard_timeout` | Yes | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. | +| `port` | Yes | Specify an alternative TCP port for testing | +| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. | +| `content_type` | Yes | Force a specific Content-Type response for all responses - only in forking/serializing modes. | +| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. | +| `upstream_url` | Yes | `http` mode only - the full URL to forward requests to, including the scheme, e.g. http://127.0.0.1:5000 | + +> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/. 
diff --git a/config/config_modes.go b/config/config_modes.go
index 38fe9311..4fa3cb87 100644
--- a/config/config_modes.go
+++ b/config/config_modes.go
@@ -9,6 +9,9 @@ const (
 
 	// ModeAfterBurn for performance tuning
 	ModeAfterBurn = 3
+
+	// ModeHTTP for routing requests over HTTP
+	ModeHTTP = 4
 )
 
 // WatchdogModeConst as a const int
@@ -20,6 +23,8 @@ func WatchdogModeConst(mode string) int {
 		return ModeAfterBurn
 	case "serializing":
 		return ModeSerializing
+	case "http":
+		return ModeHTTP
 	default:
 		return 0
 	}
@@ -34,6 +39,8 @@ func WatchdogMode(mode int) string {
 		return "afterburn"
 	case ModeSerializing:
 		return "serializing"
+	case ModeHTTP:
+		return "http"
 	default:
 		return "unknown"
 	}
diff --git a/functions/http_runner.go b/functions/http_runner.go
new file mode 100644
index 00000000..73fdb448
--- /dev/null
+++ b/functions/http_runner.go
@@ -0,0 +1,180 @@
+package functions
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"os/exec"
+	"sync"
+	"time"
+)
+
+// hopByHopHeaders lists the connection-scoped headers (RFC 7230 section 6.1)
+// that must not be relayed between the caller and the upstream.
+var hopByHopHeaders = map[string]bool{
+	"Connection":          true,
+	"Keep-Alive":          true,
+	"Proxy-Authenticate":  true,
+	"Proxy-Authorization": true,
+	"Te":                  true,
+	"Trailer":             true,
+	"Transfer-Encoding":   true,
+	"Upgrade":             true,
+}
+
+// HTTPFunctionRunner creates and maintains one process responsible for handling all calls
+type HTTPFunctionRunner struct {
+	Process     string
+	ProcessArgs []string
+	Command     *exec.Cmd
+	StdinPipe   io.WriteCloser
+	StdoutPipe  io.ReadCloser
+	Stderr      io.Writer
+	Mutex       sync.Mutex
+	Client      *http.Client
+	UpstreamURL *url.URL
+}
+
+// Start forks the process used for processing incoming requests.
+//
+// The upstream_url environment variable is validated before anything is
+// forked, and the output loggers are only started once the process is
+// running, so every failure is reported to the caller as an error.
+func (f *HTTPFunctionRunner) Start() error {
+	upstreamURL, err := parseUpstreamURL(os.Getenv("upstream_url"))
+	if err != nil {
+		return err
+	}
+	f.UpstreamURL = upstreamURL
+	f.Client = makeProxyClient(3 * time.Second)
+
+	cmd := exec.Command(f.Process, f.ProcessArgs...)
+	f.Command = cmd
+
+	if f.StdinPipe, err = cmd.StdinPipe(); err != nil {
+		return err
+	}
+	if f.StdoutPipe, err = cmd.StdoutPipe(); err != nil {
+		return err
+	}
+	errPipe, err := cmd.StderrPipe()
+	if err != nil {
+		return err
+	}
+
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+
+	// Output is printed to the console and picked up by the container logging driver.
+	go logPipe("stderr", errPipe)
+	go logPipe("stdout", f.StdoutPipe)
+
+	return nil
+}
+
+// parseUpstreamURL validates the address that requests are forwarded to.
+func parseUpstreamURL(raw string) (*url.URL, error) {
+	if raw == "" {
+		return nil, errors.New("upstream_url must be set when mode=http")
+	}
+
+	upstreamURL, err := url.Parse(raw)
+	if err != nil {
+		return nil, fmt.Errorf("parsing upstream_url %q: %v", raw, err)
+	}
+	if upstreamURL.Scheme == "" || upstreamURL.Host == "" {
+		return nil, fmt.Errorf("upstream_url %q needs a scheme and host, e.g. http://127.0.0.1:5000", raw)
+	}
+
+	return upstreamURL, nil
+}
+
+// logPipe copies the function's output to the watchdog log. The watchdog
+// exits when the pipe fails or closes, because the function has gone away.
+func logPipe(name string, pipe io.Reader) {
+	log.Printf("Started logging %s from function.", name)
+
+	buf := make([]byte, 256)
+	for {
+		n, err := pipe.Read(buf)
+		if n > 0 {
+			// Only log the bytes read, not the unused tail of the buffer.
+			log.Printf("%s: %s", name, buf[:n])
+		}
+		if err != nil {
+			log.Fatalf("Error reading %s: %s", name, err)
+		}
+	}
+}
+
+// Run a function with a long-running process with a HTTP protocol for communication.
+//
+// An error is only returned before any part of the response has been
+// written, so the caller can still send its own error status.
+func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *http.Request, w http.ResponseWriter) error {
+	request, err := http.NewRequest(r.Method, f.UpstreamURL.String(), r.Body)
+	if err != nil {
+		return err
+	}
+	copyHeaders(request.Header, r.Header)
+	if contentLength > 0 {
+		// Avoids chunked encoding when the size of the body is known.
+		request.ContentLength = contentLength
+	}
+
+	res, err := f.Client.Do(request)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	copyHeaders(w.Header(), res.Header)
+	w.WriteHeader(res.StatusCode)
+
+	// Stream the body rather than buffering all of it in memory.
+	if _, err := io.Copy(w, res.Body); err != nil {
+		log.Println("copy body err", err)
+	}
+
+	log.Printf("%s %s - %s - ContentLength: %d", r.Method, r.RequestURI, res.Status, res.ContentLength)
+
+	return nil
+}
+
+// copyHeaders adds every value of every end-to-end header in src to dst.
+func copyHeaders(dst, src http.Header) {
+	for name, values := range src {
+		if hopByHopHeaders[name] {
+			continue
+		}
+		for _, value := range values {
+			dst.Add(name, value)
+		}
+	}
+}
+
+// makeProxyClient builds the client used to call the upstream; keep-alives
+// are disabled so each request uses a fresh connection.
+func makeProxyClient(dialTimeout time.Duration) *http.Client {
+	proxyClient := http.Client{
+		Transport: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   dialTimeout,
+				KeepAlive: 1 * time.Second,
+			}).DialContext,
+			MaxIdleConns:          1,
+			DisableKeepAlives:     true,
+			IdleConnTimeout:       120 * time.Millisecond,
+			ExpectContinueTimeout: 1500 * time.Millisecond,
+		},
+	}
+
+	return &proxyClient
+}
diff --git a/main.go b/main.go
index f58b0f87..017c8500 100644
--- a/main.go
+++ b/main.go
@@ -32,29 +32,40 @@ func main() {
 		MaxHeaderBytes: 1 << 20, // Max header of 1MB
 	}
 
+	requestHandler := buildRequestHandler(watchdogConfig)
+
+	log.Printf("OperationalMode: %s\n", config.WatchdogMode(watchdogConfig.OperationalMode))
+
+	if err := lock(); err != nil {
+		log.Panic(err.Error())
+	}
+
+	http.HandleFunc("/", requestHandler)
+	log.Fatal(s.ListenAndServe())
+}
+
+func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.HandlerFunc {
 	var requestHandler http.HandlerFunc
 
 	switch watchdogConfig.OperationalMode {
 	case config.ModeStreaming:
-		log.Println("OperationalMode: Streaming")
 		requestHandler = makeForkRequestHandler(watchdogConfig)
 		break
 	case config.ModeSerializing:
-		log.Println("OperationalMode: Serializing")
 		requestHandler = makeSerializingForkRequestHandler(watchdogConfig)
 		break
 	case config.ModeAfterBurn:
-		log.Println("OperationalMode: AfterBurn")
 		requestHandler = makeAfterBurnRequestHandler(watchdogConfig)
 		break
+	case config.ModeHTTP:
+		requestHandler = makeHTTPRequestHandler(watchdogConfig)
+		break
+	default:
+		log.Panicf("unknown watchdog mode: %d", watchdogConfig.OperationalMode)
+		break
 	}
 
-	if err := lock(); err != nil {
-		log.Panic(err.Error())
-	}
-
-	http.HandleFunc("/", requestHandler)
-	log.Fatal(s.ListenAndServe())
+	return requestHandler
 }
 
 func lock() error {
@@ -182,3 +193,33 @@ func getEnvironment(r *http.Request) []string {
 
 	return envs
 }
+
+// makeHTTPRequestHandler forks the function process once and returns a handler
+// that forwards every request to it over HTTP.
+func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig) func(http.ResponseWriter, *http.Request) {
+	commandName, arguments := watchdogConfig.Process()
+	functionInvoker := functions.HTTPFunctionRunner{
+		Process:     commandName,
+		ProcessArgs: arguments,
+	}
+
+	fmt.Printf("Forking - %s %s\n", commandName, arguments)
+	if err := functionInvoker.Start(); err != nil {
+		log.Fatalf("unable to start %s: %s", commandName, err)
+	}
+
+	return func(w http.ResponseWriter, r *http.Request) {
+		req := functions.FunctionRequest{
+			Process:      commandName,
+			ProcessArgs:  arguments,
+			InputReader:  r.Body,
+			OutputWriter: w,
+		}
+
+		if err := functionInvoker.Run(req, r.ContentLength, r, w); err != nil {
+			log.Println(err)
+			w.WriteHeader(http.StatusInternalServerError)
+			w.Write([]byte(err.Error()))
+		}
+	}
+}