Allow watchdog to exit earlier

This change is a mirror image of:

openfaas/of-watchdog#125

It has been tested in of-watchdog and allows the graceful
shutdown sequence to exit earlier than before, once all
active connections have completed. It also adds an
in-flight HTTP request metric.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
alexellis committed Nov 2, 2021
1 parent f294c84 commit 56bf6aa
Showing 11 changed files with 734 additions and 35 deletions.
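In outline, the new shutdown path marks the watchdog unhealthy, waits one `healthcheck_interval` so the orchestrator stops routing traffic, then drains in-flight requests for at most `write_timeout`. The sketch below is a simplified, self-contained rendering of that sequence; the `drainAndExit` helper and its stub `markUnhealthy` are illustrative names, not code from this commit.

```go
package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// markUnhealthy stands in for the watchdog's lock-file removal, which makes
// the /_/health endpoint start failing.
func markUnhealthy() error { return nil }

// drainAndExit sketches the sequence run on SIGTERM.
func drainAndExit(s *http.Server, healthcheckInterval, writeTimeout time.Duration, inFlight prometheus.Gauge) {
	if err := markUnhealthy(); err != nil {
		log.Printf("Unable to mark server as unhealthy: %s", err)
	}

	// Give the orchestrator one health-check interval to stop routing
	// traffic to this instance.
	<-time.After(healthcheckInterval)

	// Shutdown returns as soon as every in-flight request has completed,
	// or when the context expires - so an idle watchdog exits earlier.
	ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
	defer cancel()

	if err := s.Shutdown(ctx); err != nil {
		log.Printf("Error in Shutdown: %v", err)
	}

	log.Printf("Exiting. Active connections: %d", int64(testutil.ToFloat64(inFlight)))
}

func main() {
	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{Name: "http_requests_in_flight"})
	s := &http.Server{Addr: ":8080"}

	go s.ListenAndServe()
	drainAndExit(s, 5*time.Second, 10*time.Second, inFlight)
}
```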
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
@@ -12,7 +12,7 @@ jobs:
build:
strategy:
matrix:
go-version: [ 1.15.x ]
go-version: [ 1.16.x ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
steps:
2 changes: 1 addition & 1 deletion .github/workflows/publish.yaml
@@ -9,7 +9,7 @@ jobs:
publish:
strategy:
matrix:
go-version: [ 1.15.x ]
go-version: [ 1.16.x ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
steps:
9 changes: 9 additions & 0 deletions README.md
@@ -98,12 +98,21 @@ The watchdog can be configured through environmental variables. You must always
| `content_type` | Force a specific Content-Type response for all responses |
| `write_timeout` | HTTP timeout for writing a response body from your function (in seconds) |
| `read_timeout` | HTTP timeout for reading the payload from the client caller (in seconds) |
| `healthcheck_interval` | Interval (in seconds) between HTTP health checks made by the container orchestrator, e.g. the kubelet. Used for graceful shutdowns. |
| `suppress_lock` | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable this behaviour. |
| `exec_timeout` | Hard timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0 |
| `write_debug` | Write all output, error messages, and additional information to the logs. Default is false |
| `combine_output` | True by default - combines stdout/stderr in the function response; when set to false, `stderr` is written to the container logs and stdout is used for the function response |
| `max_inflight` | Limit the maximum number of requests in flight |

## Metrics

| Name | Description | Type |
|---------------------------------|-------------------------|------------------------|
| http_requests_total | Total number of requests | Counter |
| http_request_duration_seconds | Duration of requests | Histogram |
| http_requests_in_flight | Number of requests in-flight | Gauge |

## Advanced / tuning

### (New) of-watchdog and HTTP mode
55 changes: 31 additions & 24 deletions main.go
@@ -21,6 +21,7 @@ import (

"github.com/openfaas/classic-watchdog/metrics"
"github.com/openfaas/classic-watchdog/types"
"github.com/prometheus/client_golang/prometheus/testutil"
)

var (
@@ -67,6 +68,7 @@ func main() {

readTimeout := config.readTimeout
writeTimeout := config.writeTimeout
healthcheckInterval := config.healthcheckInterval

s := &http.Server{
Addr: fmt.Sprintf(":%d", config.port),
@@ -77,10 +79,11 @@

httpMetrics := metrics.NewHttp()

log.Printf("Timeouts: read: %s, write: %s hard: %s.\n",
log.Printf("Timeouts: read: %s write: %s hard: %s health: %s.\n",
readTimeout,
writeTimeout,
config.execTimeout)
config.execTimeout,
healthcheckInterval)
log.Printf("Listening on port: %d\n", config.port)

http.HandleFunc("/_/health", makeHealthHandler())
@@ -93,24 +96,14 @@

go metricsServer.Serve(cancel)

shutdownTimeout := config.writeTimeout
listenUntilShutdown(shutdownTimeout, s, config.suppressLock)
}

func markUnhealthy() error {
atomic.StoreInt32(&acceptingConnections, 0)

path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Removing lock-file : %s\n", path)
removeErr := os.Remove(path)
return removeErr
listenUntilShutdown(s, healthcheckInterval, writeTimeout, config.suppressLock, &httpMetrics)
}

// listenUntilShutdown will listen for HTTP requests until SIGTERM
// is received, at which point it marks the watchdog unhealthy, waits
// one healthcheckInterval for traffic to be diverted, then drains
// in-flight connections for up to writeTimeout before exiting
func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppressLock bool) {
func listenUntilShutdown(s *http.Server, healthcheckInterval time.Duration, writeTimeout time.Duration, suppressLock bool, httpMetrics *metrics.Http) {

idleConnsClosed := make(chan struct{})
go func() {
@@ -119,24 +112,29 @@ func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppress

<-sig

log.Printf("SIGTERM received.. shutting down server in %s\n", shutdownTimeout.String())
log.Printf("SIGTERM: no new connections in %s\n", healthcheckInterval.String())

healthErr := markUnhealthy()

if healthErr != nil {
log.Printf("Unable to mark unhealthy during shutdown: %s\n", healthErr.Error())
if err := markUnhealthy(); err != nil {
log.Printf("Unable to mark server as unhealthy: %s\n", err.Error())
}

<-time.Tick(shutdownTimeout)
<-time.Tick(healthcheckInterval)

connections := int64(testutil.ToFloat64(httpMetrics.InFlight))
log.Printf("No new connections allowed, draining: %d requests\n", connections)

if err := s.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
// The maximum time to wait for active connections whilst shutting down is
// equivalent to the maximum execution time i.e. writeTimeout.
ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
defer cancel()

if err := s.Shutdown(ctx); err != nil {
log.Printf("Error in Shutdown: %v", err)
}

log.Printf("No new connections allowed. Exiting in: %s\n", shutdownTimeout.String())
connections = int64(testutil.ToFloat64(httpMetrics.InFlight))

<-time.Tick(shutdownTimeout)
log.Printf("Exiting. Active connections: %d\n", connections)

close(idleConnsClosed)
}()
@@ -164,6 +162,15 @@ func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppress
<-idleConnsClosed
}

func markUnhealthy() error {
atomic.StoreInt32(&acceptingConnections, 0)

path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Removing lock-file : %s\n", path)
removeErr := os.Remove(path)
return removeErr
}

func printVersion() {
sha := "unknown"
if len(GitCommit) > 0 {
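One design choice worth noting in the diff above: rather than maintaining its own counter, `listenUntilShutdown` reads the current value of the in-flight gauge back out of Prometheus with `testutil.ToFloat64`, a helper normally used in tests that collects a single metric and returns its value. A small standalone demonstration follows; the gauge here is a free-standing illustration, not the watchdog's registered metric.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// A free-standing gauge mirroring http_requests_in_flight.
	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Subsystem: "http",
		Name:      "requests_in_flight",
		Help:      "Total HTTP requests in-flight.",
	})

	inFlight.Inc()
	inFlight.Inc()
	inFlight.Dec()

	// testutil.ToFloat64 collects the metric and returns its current value,
	// which is how the watchdog logs how many requests remain to be drained.
	fmt.Println(int64(testutil.ToFloat64(inFlight))) // prints 1
}
```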
12 changes: 11 additions & 1 deletion metrics/http.go
@@ -8,10 +8,11 @@ import (
type Http struct {
RequestsTotal *prometheus.CounterVec
RequestDurationHistogram *prometheus.HistogramVec
InFlight prometheus.Gauge
}

func NewHttp() Http {
return Http{
h := Http{
RequestsTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "http",
Name: "requests_total",
@@ -23,5 +24,14 @@ func NewHttp() {
Help: "Seconds spent serving HTTP requests.",
Buckets: prometheus.DefBuckets,
}, []string{"code", "method"}),
InFlight: promauto.NewGauge(prometheus.GaugeOpts{
Subsystem: "http",
Name: "requests_in_flight",
Help: "Total HTTP requests in-flight.",
}),
}

// Default to 0 for queries during graceful shutdown.
h.InFlight.Set(0)
return h
}
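Because the counter, histogram and gauge are created through `promauto`, they register themselves with the default Prometheus registry as a side effect of `NewHttp()`. Exposing them therefore only requires serving the default handler. The sketch below assumes port 8081 for the metrics endpoint, which may not match the watchdog's own metrics server configuration.

```go
package main

import (
	"log"
	"net/http"

	"github.com/openfaas/classic-watchdog/metrics"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Creating the metrics registers them with the default registry.
	_ = metrics.NewHttp()

	// promhttp.Handler serves everything in the default registry, including
	// http_requests_total, http_request_duration_seconds and
	// http_requests_in_flight.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8081", nil))
}
```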
20 changes: 12 additions & 8 deletions metrics/metrics.go
@@ -48,18 +48,22 @@ func (m *MetricsServer) Serve(cancel chan bool) {
}()

go func() {
select {
case <-cancel:
log.Printf("metrics server shutdown\n")

m.s.Shutdown(context.Background())
}
<-cancel
log.Printf("metrics server shutdown\n")
m.s.Shutdown(context.Background())
}()
}

// InstrumentHandler returns a handler which records HTTP requests
// as they are made, and tracks the number currently in-flight
func InstrumentHandler(next http.Handler, _http Http) http.HandlerFunc {
return promhttp.InstrumentHandlerCounter(_http.RequestsTotal,
promhttp.InstrumentHandlerDuration(_http.RequestDurationHistogram, next))
return func(w http.ResponseWriter, r *http.Request) {
then := promhttp.InstrumentHandlerCounter(_http.RequestsTotal,
promhttp.InstrumentHandlerDuration(_http.RequestDurationHistogram, next))

_http.InFlight.Inc()
defer _http.InFlight.Dec()

then(w, r)
}
}
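For context, here is roughly how the instrumented handler could be wired up: every request through `InstrumentHandler` increments the gauge on entry and decrements it when the wrapped handler returns, so the gauge reflects requests currently being served. This sketch uses a trivial echo handler in place of the watchdog's own request handler.

```go
package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/openfaas/classic-watchdog/metrics"
)

func main() {
	httpMetrics := metrics.NewHttp()

	// A stand-in for the watchdog's function-invoking handler.
	echo := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "hello")
	})

	// Counter, duration histogram and in-flight gauge are all recorded here.
	http.HandleFunc("/", metrics.InstrumentHandler(echo, httpMetrics))

	log.Fatal(http.ListenAndServe(":8080", nil))
}
```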
5 changes: 5 additions & 0 deletions readconfig.go
@@ -66,6 +66,7 @@ func (ReadConfig) Read(hasEnv HasEnv) WatchdogConfig {

cfg.readTimeout = parseIntOrDurationValue(hasEnv.Getenv("read_timeout"), time.Second*5)
cfg.writeTimeout = parseIntOrDurationValue(hasEnv.Getenv("write_timeout"), time.Second*5)
cfg.healthcheckInterval = parseIntOrDurationValue(hasEnv.Getenv("healthcheck_interval"), cfg.writeTimeout)

cfg.execTimeout = parseIntOrDurationValue(hasEnv.Getenv("exec_timeout"), time.Second*0)
cfg.port = parseIntValue(hasEnv.Getenv("port"), 8080)
@@ -106,6 +107,10 @@ type WatchdogConfig struct {
// HTTP write timeout
writeTimeout time.Duration

// healthcheckInterval is the interval at which an external service runs its
// health checks, used to detect when the watchdog should be removed from its
// pool of endpoints
healthcheckInterval time.Duration

// faasProcess is the process to exec
faasProcess string

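`parseIntOrDurationValue` is not part of this diff; judging by its call sites, it appears to accept either a bare integer (treated as seconds) or a Go duration string, falling back to the given default when the value is empty or unparsable. A hypothetical sketch of that behaviour, under which `healthcheck_interval=30` and `healthcheck_interval=1m30s` would both be valid:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseIntOrDurationValue is a guess at the helper used by readconfig.go:
// bare integers are read as seconds, otherwise the value is parsed as a
// Go duration, and anything else falls back to the supplied default.
func parseIntOrDurationValue(val string, fallback time.Duration) time.Duration {
	if len(val) > 0 {
		if parsedVal, err := strconv.Atoi(val); err == nil && parsedVal >= 0 {
			return time.Duration(parsedVal) * time.Second
		}
	}

	if duration, err := time.ParseDuration(val); err == nil {
		return duration
	}

	return fallback
}

func main() {
	fmt.Println(parseIntOrDurationValue("30", 5*time.Second))    // 30s
	fmt.Println(parseIntOrDurationValue("1m30s", 5*time.Second)) // 1m30s
	fmt.Println(parseIntOrDurationValue("", 5*time.Second))      // 5s (default)
}
```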

Some generated files are not rendered by default.
