Skip to content

Commit

Permalink
Make response messages consistent between modes
Browse files Browse the repository at this point in the history
Makes the response messages consistent between HTTP,
streaming and serializing modes.

Tested with various processes as a Go process on a Linux host.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Oct 14, 2022
1 parent 0b79385 commit aa898b7
Show file tree
Hide file tree
Showing 16 changed files with 742 additions and 54 deletions.
5 changes: 4 additions & 1 deletion executor/function_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ type FunctionRunner interface {

// FunctionRequest stores request for function execution
type FunctionRequest struct {
Path string
RequestURI string
Method string
UserAgent string

Process string
ProcessArgs []string
Environment []string
Expand Down
8 changes: 6 additions & 2 deletions executor/http_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"fmt"
"io"
"io/ioutil"

units "github.com/docker/go-units"

"log"
"net"
"net/http"
Expand Down Expand Up @@ -155,8 +158,9 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
}

copyHeaders(w.Header(), &res.Header)
done := time.Since(startedTime)

w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", done.Seconds()))

w.WriteHeader(res.StatusCode)
if res.Body != nil {
Expand All @@ -172,7 +176,7 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
// Exclude logging for health check probes from the kubelet which can spam
// log collection systems.
if !strings.HasPrefix(r.UserAgent(), "kube-probe") {
log.Printf("%s %s - %s - ContentLength: %d", r.Method, r.RequestURI, res.Status, res.ContentLength)
log.Printf("%s %s - %s - ContentLength: %s (%.4fs)", r.Method, r.RequestURI, res.Status, units.HumanSize(float64(res.ContentLength)), done.Seconds())
}

return nil
Expand Down
56 changes: 35 additions & 21 deletions executor/serializing_fork_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import (
"fmt"
"io"
"io/ioutil"

units "github.com/docker/go-units"

"log"
"net/http"
"os/exec"
"strings"
"sync"
"time"
)
Expand All @@ -27,30 +31,42 @@ func (f *SerializingForkFunctionRunner) Run(req FunctionRequest, w http.Response
body, err := serializeFunction(req, f)
if err != nil {
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(start).Seconds()))
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))

done := time.Since(start)

if !strings.HasPrefix(req.UserAgent, "kube-probe") {
log.Printf("%s %s - %d - ContentLength: %s (%.4fs)", req.Method, req.RequestURI, http.StatusOK, units.HumanSize(float64(len(err.Error()))), done.Seconds())
}

return err
}

w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(start).Seconds()))
w.WriteHeader(200)

bodyLen := 0
if body != nil {
_, err = w.Write(*body)
bodyLen = len(*body)
}

done := time.Since(start)

if !strings.HasPrefix(req.UserAgent, "kube-probe") {
log.Printf("%s %s - %d - ContentLength: %s (%.4fs)", req.Method, req.RequestURI, http.StatusOK, units.HumanSize(float64(bodyLen)), done.Seconds())
}

return err
}

func serializeFunction(req FunctionRequest, f *SerializingForkFunctionRunner) (*[]byte, error) {
log.Printf("Running: %s", req.Process)

if req.InputReader != nil {
defer req.InputReader.Close()
}

start := time.Now()

var cmd *exec.Cmd
ctx := context.Background()
if f.ExecTimeout.Nanoseconds() > 0 {
Expand All @@ -60,21 +76,25 @@ func serializeFunction(req FunctionRequest, f *SerializingForkFunctionRunner) (*
}

cmd = exec.CommandContext(ctx, req.Process, req.ProcessArgs...)
cmd.Env = req.Environment

var data []byte

reader := req.InputReader.(io.Reader)
if req.InputReader != nil {
reader := req.InputReader.(io.Reader)

// Limit read to the Content-Length header, if provided
if req.ContentLength != nil && *req.ContentLength > 0 {
reader = io.LimitReader(req.InputReader, *req.ContentLength)
}
// Limit read to the Content-Length header, if provided
if req.ContentLength != nil && *req.ContentLength > 0 {
reader = io.LimitReader(req.InputReader, *req.ContentLength)
}

var err error
data, err = ioutil.ReadAll(reader)
var err error
data, err = ioutil.ReadAll(reader)

if err != nil {
return nil, err
}

if err != nil {
return nil, err
}

stdout, _ := cmd.StdoutPipe()
Expand All @@ -89,15 +109,9 @@ func serializeFunction(req FunctionRequest, f *SerializingForkFunctionRunner) (*
return nil, errors[0]
}

err = cmd.Wait()
done := time.Since(start)
if err != nil {
return nil, fmt.Errorf("%s exited: after %.2fs, error: %s", req.Process, done.Seconds(), err)
}

log.Printf("%s done: %.2fs secs", req.Process, done.Seconds())
err := cmd.Wait()

return functionRes, nil
return functionRes, err
}

func pipeToProcess(stdin io.WriteCloser, stdout io.Reader, data *[]byte) (*[]byte, []error) {
Expand Down
14 changes: 1 addition & 13 deletions executor/forking_runner.go → executor/streaming_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package executor

import (
"context"
"fmt"
"log"
"os"
"os/exec"
"time"
Expand All @@ -21,8 +19,6 @@ type StreamingFunctionRunner struct {

// Run run a fork for each invocation
func (f *StreamingFunctionRunner) Run(req FunctionRequest) error {
log.Printf("Running: %s - %s", req.Process, req.Path)
start := time.Now()

var cmd *exec.Cmd
ctx := context.Background()
Expand Down Expand Up @@ -50,13 +46,5 @@ func (f *StreamingFunctionRunner) Run(req FunctionRequest) error {
return err
}

err := cmd.Wait()
done := time.Since(start)
if err != nil {
return fmt.Errorf("%s exited: after %.2fs, error: %s", req.Process, done.Seconds(), err)
}

log.Printf("%s done: %.2fs secs", req.Process, done.Seconds())

return nil
return cmd.Wait()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/openfaas/of-watchdog
go 1.18

require (
github.com/docker/go-units v0.5.0
github.com/openfaas/faas-middleware v1.2.2
github.com/prometheus/client_golang v1.13.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
64 changes: 47 additions & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
Expand All @@ -19,6 +20,8 @@ import (
"syscall"
"time"

units "github.com/docker/go-units"

limiter "github.com/openfaas/faas-middleware/concurrency-limiter"
"github.com/openfaas/of-watchdog/config"
"github.com/openfaas/of-watchdog/executor"
Expand Down Expand Up @@ -78,7 +81,7 @@ func main() {
limit = requestLimiter
}

log.Printf("Watchdog mode: %s\n", config.WatchdogMode(watchdogConfig.OperationalMode))
log.Printf("Watchdog mode: %s\tfprocess: %q\n", config.WatchdogMode(watchdogConfig.OperationalMode), watchdogConfig.FunctionProcess)

httpMetrics := metrics.NewHttp()
http.HandleFunc("/", metrics.InstrumentHandler(requestHandler, httpMetrics))
Expand Down Expand Up @@ -242,10 +245,6 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig, log
environment = getEnvironment(r)
}

path := "/"
if r.URL != nil {
path = r.URL.Path
}
commandName, arguments := watchdogConfig.Process()
req := executor.FunctionRequest{
Process: commandName,
Expand All @@ -254,7 +253,9 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig, log
ContentLength: &r.ContentLength,
OutputWriter: w,
Environment: environment,
Path: path,
RequestURI: r.RequestURI,
Method: r.Method,
UserAgent: r.UserAgent(),
}

w.Header().Set("Content-Type", watchdogConfig.ContentType)
Expand All @@ -280,18 +281,19 @@ func makeStreamingRequestHandler(watchdogConfig config.WatchdogConfig, prefixLog
environment = getEnvironment(r)
}

path := "/"
if r.URL != nil {
path = r.URL.Path
}
ww := WriterCounter{}
ww.setWriter(w)
start := time.Now()
commandName, arguments := watchdogConfig.Process()
req := executor.FunctionRequest{
Process: commandName,
ProcessArgs: arguments,
InputReader: r.Body,
OutputWriter: w,
OutputWriter: &ww,
Environment: environment,
Path: path,
RequestURI: r.RequestURI,
Method: r.Method,
UserAgent: r.UserAgent(),
}

w.Header().Set("Content-Type", watchdogConfig.ContentType)
Expand All @@ -301,6 +303,16 @@ func makeStreamingRequestHandler(watchdogConfig config.WatchdogConfig, prefixLog

// Cannot write a status code to the client because we
// already have written a header
done := time.Since(start)
if !strings.HasPrefix(req.UserAgent, "kube-probe") {
log.Printf("%s %s - %d - ContentLength: %s (%.4fs)", req.Method, req.RequestURI, http.StatusInternalServerError, units.HumanSize(float64(ww.Bytes())), done.Seconds())
return
}
}

done := time.Since(start)
if !strings.HasPrefix(req.UserAgent, "kube-probe") {
log.Printf("%s %s - %d - ContentLength: %s (%.4fs)", req.Method, req.RequestURI, http.StatusOK, units.HumanSize(float64(ww.Bytes())), done.Seconds())
}
}
}
Expand Down Expand Up @@ -357,16 +369,11 @@ func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs boo

return func(w http.ResponseWriter, r *http.Request) {

path := "/"
if r.URL != nil {
path = r.URL.Path
}
req := executor.FunctionRequest{
Process: commandName,
ProcessArgs: arguments,
InputReader: r.Body,
OutputWriter: w,
Path: path,
}

if r.Body != nil {
Expand Down Expand Up @@ -424,3 +431,26 @@ func printVersion() {

log.Printf("Version: %v\tSHA: %v\n", BuildVersion(), sha)
}

type WriterCounter struct {
w io.Writer
bytes int64
}

func (nc *WriterCounter) setWriter(w io.Writer) {
nc.w = w
}

func (nc *WriterCounter) Bytes() int64 {
return nc.bytes
}

func (nc *WriterCounter) Write(p []byte) (int, error) {
n, err := nc.w.Write(p)
if err != nil {
return n, err
}

nc.bytes += int64(n)
return n, err
}
Loading

0 comments on commit aa898b7

Please sign in to comment.