Skip to content

Commit

Permalink
Add HTTP as a watchdog mode
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Ellis <[email protected]>
  • Loading branch information
alexellis committed Jan 24, 2018
1 parent 4b0ee8f commit 6910e94
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 33 deletions.
80 changes: 56 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,13 @@ This is a re-write of the OpenFaaS watchdog.

![](https://camo.githubusercontent.com/61c169ab5cd01346bc3dc7a11edc1d218f0be3b4/68747470733a2f2f7062732e7477696d672e636f6d2f6d656469612f4447536344626c554941416f34482d2e6a70673a6c61726765)

## Config

Environmental variables:

| Option | Implemented | Usage |
|------------------------|--------------|-------------------------------|
| `function_process` | Yes | The process to invoke for each function call function process (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. |
| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) |
| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) |
| `hard_timeout` | Yes | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
| `port` | Yes | Specify an alternative TCP port fo testing |
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. |
| `content_type` | No | Force a specific Content-Type response for all responses. |
| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |

> Note: the .lock file is implemented for health-checking, but cannot be disabled yet.
## Watchdog modes:

The original watchdog supported mode 3 Serializing fork and has support for mode 2 Afterburn in an open PR.
History/context: the original watchdog supported mode the Serializing fork mode only and Afterburn was available for testing via a pull request.

When complete this work will support all three modes and additional stretch goal of:
When the of-watchdog is complete this version will support four modes as listed below. We may consolidate or remove some of these modes before going to 1.0 so please consider modes 2-4 experimental.

* Handling of multi-part forms

### 1. Streaming fork (implemented) - default.
### 1. Streaming fork (mode=streaming) - default.

Forks a process per request and can deal with more data than is available memory capacity - i.e. 512mb VM can process multiple GB of video.

Expand All @@ -47,7 +28,7 @@ HTTP headers cannot be sent after function starts executing due to input/output

* Hard timeout: supported.

### 2. Afterburn (implemented)
### 2. Afterburn (mode=afterburn)

Uses a single process for all requests, if that request dies the container dies.

Expand All @@ -69,7 +50,41 @@ https://github.com/alexellis/python-afterburn

https://github.com/alexellis/java-afterburn

### 3. Serializing fork (implemented in dev-branch)
### 3. HTTP (mode=http)

The HTTP mode is similar to AfterBurn.

A process is forked when the watchdog starts, we then forward any request incoming to the watchdog to a HTTP port within the container.

Pros:

* Fastest option - high concurrency and throughput

* Does not require new/custom client libraries like afterburn but makes use of a long-running daemon such as Express.js for Node or Flask for Python

Example usage for testing:

* Forward to an NGinx container:

```
$ go build ; mode=http port=8081 fprocess="docker run -p 80:80 --name nginx -t nginx" upstream_url=http://127.0.0.1:80 ./of-watchdog
```

* Forward to a Node.js / Express.js hello-world app:

```
$ go build ; mode=http port=8081 fprocess="node expressjs-hello-world.js" upstream_url=http://127.0.0.1:3000 ./of-watchdog
```

Cons:

* Questionable as to whether this is actually "serverless"

* Daemons such as express/flask/sinatra could be hard to configure or potentially unpredictable when used in this way

* One more HTTP hop in the chain between the client and the function

### 4. Serializing fork (mode=serializing)

Forks one process per request. Multi-threaded. Ideal for retro-fitting a CGI application handler i.e. for Flask.

Expand All @@ -85,3 +100,20 @@ Reads entire request into memory from the HTTP request. At this point we seriali

* Hard timeout: supported.

## Configuration

Environmental variables:

| Option | Implemented | Usage |
|------------------------|--------------|-------------------------------|
| `function_process` | Yes | The process to invoke for each function call function process (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. |
| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) |
| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) |
| `hard_timeout` | Yes | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
| `port` | Yes | Specify an alternative TCP port fo testing |
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. |
| `content_type` | Yes | Force a specific Content-Type response for all responses - only in forking/serializing modes. |
| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |
| `upstream_url` | Yes | `http` mode only - where to forward requests i.e. 127.0.0.1:5000 |

> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/.
7 changes: 7 additions & 0 deletions config/config_modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const (

// ModeAfterBurn for performance tuning
ModeAfterBurn = 3

//ModeHTTP for routing requests over HTTP
ModeHTTP = 4
)

// WatchdogModeConst as a const int
Expand All @@ -20,6 +23,8 @@ func WatchdogModeConst(mode string) int {
return ModeAfterBurn
case "serializing":
return ModeSerializing
case "http":
return ModeHTTP
default:
return 0
}
Expand All @@ -34,6 +39,8 @@ func WatchdogMode(mode int) string {
return "afterburn"
case ModeSerializing:
return "serializing"
case ModeHTTP:
return "http"
default:
return "unknown"
}
Expand Down
139 changes: 139 additions & 0 deletions functions/http_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package functions

import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"sync"
"time"
)

// HTTPFunctionRunner creates and maintains one process responsible for handling all calls
type HTTPFunctionRunner struct {
Process string
ProcessArgs []string
Command *exec.Cmd
StdinPipe io.WriteCloser
StdoutPipe io.ReadCloser
Stderr io.Writer
Mutex sync.Mutex
Client *http.Client
UpstreamURL *url.URL
}

// Start forks the process used for processing incoming requests
func (f *HTTPFunctionRunner) Start() error {
cmd := exec.Command(f.Process, f.ProcessArgs...)

var stdinErr error
var stdoutErr error

f.Command = cmd
f.StdinPipe, stdinErr = cmd.StdinPipe()
if stdinErr != nil {
return stdinErr
}

f.StdoutPipe, stdoutErr = cmd.StdoutPipe()
if stdoutErr != nil {
return stdoutErr
}

errPipe, _ := cmd.StderrPipe()

// Prints stderr to console and is picked up by container logging driver.
go func() {
log.Println("Started logging stderr from function.")
for {
errBuff := make([]byte, 256)

_, err := errPipe.Read(errBuff)
if err != nil {
log.Fatalf("Error reading stderr: %s", err)

} else {
log.Printf("stderr: %s", errBuff)
}
}
}()

go func() {
log.Println("Started logging stdout from function.")
for {
errBuff := make([]byte, 256)

_, err := f.StdoutPipe.Read(errBuff)
if err != nil {
log.Fatalf("Error reading stdout: %s", err)

} else {
log.Printf("stdout: %s", errBuff)
}
}
}()

dialTimeout := 3 * time.Second
f.Client = makeProxyClient(dialTimeout)

urlValue, upstreamURLErr := url.Parse(os.Getenv("upstream_url"))
if upstreamURLErr != nil {
log.Fatal(upstreamURLErr)
}

f.UpstreamURL = urlValue

return cmd.Start()
}

// Run a function with a long-running process with a HTTP protocol for communication
func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *http.Request, w http.ResponseWriter) error {

request, _ := http.NewRequest(r.Method, f.UpstreamURL.String(), r.Body)

res, err := f.Client.Do(request)

if err != nil {
log.Println(err)
}

for h := range res.Header {
w.Header().Set(h, res.Header.Get(h))
}

w.WriteHeader(res.StatusCode)
if res.Body != nil {
defer res.Body.Close()
bodyBytes, bodyErr := ioutil.ReadAll(res.Body)
if bodyErr != nil {
log.Println("read body err", bodyErr)
}
w.Write(bodyBytes)
}

log.Printf("%s %s - %s - ContentLength: %d", r.Method, r.RequestURI, res.Status, res.ContentLength)

return nil
}

func makeProxyClient(dialTimeout time.Duration) *http.Client {
proxyClient := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: dialTimeout,
KeepAlive: 1,
}).DialContext,
MaxIdleConns: 1,
DisableKeepAlives: true,
IdleConnTimeout: 120 * time.Millisecond,
ExpectContinueTimeout: 1500 * time.Millisecond,
},
}

return &proxyClient
}
58 changes: 49 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,40 @@ func main() {
MaxHeaderBytes: 1 << 20, // Max header of 1MB
}

requestHandler := buildRequestHandler(watchdogConfig)

log.Printf("OperationalMode: %s\n", config.WatchdogMode(watchdogConfig.OperationalMode))

if err := lock(); err != nil {
log.Panic(err.Error())
}

http.HandleFunc("/", requestHandler)
log.Fatal(s.ListenAndServe())
}

func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.HandlerFunc {
var requestHandler http.HandlerFunc

switch watchdogConfig.OperationalMode {
case config.ModeStreaming:
log.Println("OperationalMode: Streaming")
requestHandler = makeForkRequestHandler(watchdogConfig)
break
case config.ModeSerializing:
log.Println("OperationalMode: Serializing")
requestHandler = makeSerializingForkRequestHandler(watchdogConfig)
break
case config.ModeAfterBurn:
log.Println("OperationalMode: AfterBurn")
requestHandler = makeAfterBurnRequestHandler(watchdogConfig)
break
case config.ModeHTTP:
requestHandler = makeHTTPRequestHandler(watchdogConfig)
break
default:
log.Panicf("unknown watchdog mode: %d", watchdogConfig.OperationalMode)
break
}

if err := lock(); err != nil {
log.Panic(err.Error())
}

http.HandleFunc("/", requestHandler)
log.Fatal(s.ListenAndServe())
return requestHandler
}

func lock() error {
Expand Down Expand Up @@ -182,3 +193,32 @@ func getEnvironment(r *http.Request) []string {

return envs
}

func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig) func(http.ResponseWriter, *http.Request) {
commandName, arguments := watchdogConfig.Process()
functionInvoker := functions.HTTPFunctionRunner{
Process: commandName,
ProcessArgs: arguments,
}

fmt.Printf("Forking - %s %s\n", commandName, arguments)
functionInvoker.Start()

return func(w http.ResponseWriter, r *http.Request) {

req := functions.FunctionRequest{
Process: commandName,
ProcessArgs: arguments,
InputReader: r.Body,
OutputWriter: w,
}

err := functionInvoker.Run(req, r.ContentLength, r, w)

if err != nil {
w.WriteHeader(500)
w.Write([]byte(err.Error()))
}

}
}

0 comments on commit 6910e94

Please sign in to comment.