Skip to content

Commit

Permalink
Add http_buffer to "http" mode
Browse files Browse the repository at this point in the history
Allows servers such as Swoole which do not implement the whole
HTTP spec to be used with of-watchdog. Apply this setting when
Transfer-Encoding: chunked is not compatible with your http
server in http mode.

Tested with netcat "nc" outside of a Docker container to see
the output either sent with a content-length when turned on
or without one when in chunked mode.

Signed-off-by: Alex Ellis (VMware) <[email protected]>
  • Loading branch information
alexellis committed Nov 18, 2018
1 parent 8df2b25 commit 85505a7
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 23 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,16 @@ Environmental variables:

| Option | Implemented | Usage |
|------------------------|--------------|-------------------------------|
| `function_process` | Yes | The process to invoke for each function call function process (alias - fprocess). This must be a UNIX binary and accept input via STDIN and output via STDOUT. |
| `function_process` | Yes | Process to execute a server in `http` mode or to be executed for each request in the other modes. For non `http` mode the process must accept input via STDIN and print output via STDOUT. Alias: `fprocess` |
| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) |
| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) |
| `exec_timeout` | Yes | Exec timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
| `port` | Yes | Specify an alternative TCP port for testing |
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. |
| `port` | Yes | Specify an alternative TCP port for testing. Default: `8080` |
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is `false`. |
| `content_type` | Yes | Force a specific Content-Type response for all responses - only in forking/serializing modes. |
| `suppress_lock` | Yes | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |
| `upstream_url` | Yes | `http` mode only - where to forward requests i.e. 127.0.0.1:5000 |
| `suppress_lock` | Yes | When set to `false` the watchdog will attempt to write a lockfile to /tmp/ for healthchecks. Default `false` |
| `upstream_url` | Yes | `http` mode only - where to forward requests i.e. `127.0.0.1:5000` |
| `buffer_http` | Yes | `http` mode only - buffers request body to memory before fowarding. Use if your upstream HTTP server does not accept `Transfer-Encoding: chunked` Default: `false` |


> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/.
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type WatchdogConfig struct {
OperationalMode int
SuppressLock bool
UpstreamURL string

// BufferHTTPBody buffers the HTTP body in memory
// to prevent transfer type of chunked encoding
// which some servers do not support.
BufferHTTPBody bool
}

// Process returns a string for the process and a slice for the arguments from the FunctionProcess.
Expand Down Expand Up @@ -71,6 +76,7 @@ func New(env []string) (WatchdogConfig, error) {
ContentType: contentType,
SuppressLock: getBool(envMap, "suppress_lock"),
UpstreamURL: upstreamURL,
BufferHTTPBody: getBool(envMap, "buffer_http"),
}

if val := envMap["mode"]; len(val) > 0 {
Expand Down
28 changes: 27 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@ func Test_OperationalMode_Default(t *testing.T) {
t.Errorf("Want %s. got: %s", WatchdogMode(ModeStreaming), WatchdogMode(defaults.OperationalMode))
}
}
func Test_BufferHttpModeDefaultsToFalse(t *testing.T) {
env := []string{}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}
want := false
if actual.BufferHTTPBody != want {
t.Errorf("Want %v. got: %v", want, actual.BufferHTTPBody)
}
}

func Test_BufferHttpMode_CanBeSetToTrue(t *testing.T) {
env := []string{
"buffer_http=true",
}

actual, err := New(env)
if err != nil {
t.Errorf("Expected no errors")
}
want := true
if actual.BufferHTTPBody != want {
t.Errorf("Want %v. got: %v", want, actual.BufferHTTPBody)
}
}

func Test_OperationalMode_AfterBurn(t *testing.T) {
env := []string{
Expand All @@ -36,7 +63,6 @@ func Test_OperationalMode_AfterBurn(t *testing.T) {
if actual.OperationalMode != ModeAfterBurn {
t.Errorf("Want %s. got: %s", WatchdogMode(ModeAfterBurn), WatchdogMode(actual.OperationalMode))
}

}

func Test_ContentType_Default(t *testing.T) {
Expand Down
36 changes: 22 additions & 14 deletions executor/http_runner.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package executor

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -12,25 +13,24 @@ import (
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"
)

// HTTPFunctionRunner creates and maintains one process responsible for handling all calls
type HTTPFunctionRunner struct {
ExecTimeout time.Duration // ExecTimeout the maxmium duration or an upstream function call
ReadTimeout time.Duration
WriteTimeout time.Duration
Process string
ProcessArgs []string
Command *exec.Cmd
StdinPipe io.WriteCloser
StdoutPipe io.ReadCloser
Stderr io.Writer
Mutex sync.Mutex
Client *http.Client
UpstreamURL *url.URL
ExecTimeout time.Duration // ExecTimeout the maxmium duration or an upstream function call
ReadTimeout time.Duration
WriteTimeout time.Duration
Process string
ProcessArgs []string
Command *exec.Cmd
StdinPipe io.WriteCloser
StdoutPipe io.ReadCloser
Stderr io.Writer
Client *http.Client
UpstreamURL *url.URL
BufferHTTPBody bool
}

// Start forks the process used for processing incoming requests
Expand Down Expand Up @@ -108,7 +108,15 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
upstreamURL += r.RequestURI
}

request, _ := http.NewRequest(r.Method, upstreamURL, r.Body)
var body io.Reader
if f.BufferHTTPBody {
reqBody, _ := ioutil.ReadAll(r.Body)
body = bytes.NewReader(reqBody)
} else {
body = r.Body
}

request, _ := http.NewRequest(r.Method, upstreamURL, body)
for h := range r.Header {
request.Header.Set(h, r.Header.Get(h))
}
Expand Down
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@ func getEnvironment(r *http.Request) []string {
func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig) func(http.ResponseWriter, *http.Request) {
commandName, arguments := watchdogConfig.Process()
functionInvoker := executor.HTTPFunctionRunner{
ExecTimeout: watchdogConfig.ExecTimeout,
Process: commandName,
ProcessArgs: arguments,
ExecTimeout: watchdogConfig.ExecTimeout,
Process: commandName,
ProcessArgs: arguments,
BufferHTTPBody: watchdogConfig.BufferHTTPBody,
}

if len(watchdogConfig.UpstreamURL) == 0 {
Expand Down

0 comments on commit 85505a7

Please sign in to comment.