Skip to content

Commit

Permalink
feat: expose metrics from the kubenurse httpclient (#31)
Browse files Browse the repository at this point in the history
The following new metrics were added:
* kubenurse_httpclient_requests_total - Total issued requests by kubenurse, partitioned by http code/method.
* kubenurse_httpclient_trace_request_duration_seconds - Latency histogram for requests from the kubenurse httpclient, partitioned by event.
* kubenurse_httpclient_request_duration_seconds - Latency histogram of request latencies from the kubenurse httpclient.
  • Loading branch information
ghouscht authored Jan 17, 2022
1 parent 9f02d56 commit ebb0764
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 87 deletions.
25 changes: 2 additions & 23 deletions internal/kubenurse/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package kubenurse
import (
"context"
"fmt"
"log"
"net/http"
"os"
"sync"
Expand All @@ -29,8 +28,6 @@ type Server struct {
useTLS bool
// If we want to consider kubenurses on unschedulable nodes
allowUnschedulable bool
extraCA string
insecure bool

// Mutex to protect ready flag
mu *sync.Mutex
Expand All @@ -39,15 +36,13 @@ type Server struct {

// New creates a new kubenurse server. The server can be configured with the following environment variables:
// * KUBENURSE_USE_TLS
// * KUBENURSE_ALLOW_UNSCHEDULABL
// * KUBENURSE_ALLOW_UNSCHEDULABLE
// * KUBENURSE_INGRESS_URL
// * KUBENURSE_SERVICE_URL
// * KUBERNETES_SERVICE_HOST
// * KUBERNETES_SERVICE_PORT
// * KUBENURSE_NAMESPACE
// * KUBENURSE_NEIGHBOUR_FILTER
// * KUBENURSE_EXTRA_CA
// * KUBENURSE_INSECURE
func New(ctx context.Context, k8s kubernetes.Interface) (*Server, error) {
mux := http.NewServeMux()

Expand All @@ -64,8 +59,6 @@ func New(ctx context.Context, k8s kubernetes.Interface) (*Server, error) {
//nolint:goconst // No need to make "true" a constant in my opinion, readability is better like this.
useTLS: os.Getenv("KUBENURSE_USE_TLS") == "true",
allowUnschedulable: os.Getenv("KUBENURSE_ALLOW_UNSCHEDULABLE") == "true",
extraCA: os.Getenv("KUBENURSE_EXTRA_CA"),
insecure: os.Getenv("KUBENURSE_INSECURE") == "true",

mu: new(sync.Mutex),
ready: true,
Expand All @@ -77,26 +70,13 @@ func New(ctx context.Context, k8s kubernetes.Interface) (*Server, error) {
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

// setup http transport
transport, err := server.generateRoundTripper()
if err != nil {
log.Printf("using default transport: %s", err)

transport = http.DefaultTransport
}

httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: transport,
}

discovery, err := kubediscovery.New(ctx, k8s, server.allowUnschedulable)
if err != nil {
return nil, fmt.Errorf("create k8s discovery client: %w", err)
}

// setup checker
chk, err := servicecheck.New(ctx, httpClient, discovery, promRegistry, server.allowUnschedulable, 3*time.Second)
chk, err := servicecheck.New(ctx, discovery, promRegistry, server.allowUnschedulable, 3*time.Second)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,7 +114,6 @@ func (s *Server) Run() error {
defer wg.Done()

s.checker.RunScheduled(5 * time.Second)
log.Printf("checker exited")
}()

wg.Add(1)
Expand Down
55 changes: 0 additions & 55 deletions internal/kubenurse/transport.go

This file was deleted.

83 changes: 83 additions & 0 deletions internal/servicecheck/httptrace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package servicecheck

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// withRequestTracing decorates transport with Prometheus instrumentation and
// returns the decorated http.RoundTripper. Three collectors are created in the
// kubenurse metrics namespace and registered on registry:
//   - httpclient_requests_total: request counter labelled by code and method.
//   - httpclient_trace_request_duration_seconds: histogram labelled by the
//     httptrace event that was reached.
//   - httpclient_request_duration_seconds: label-free histogram of request
//     latencies.
//
// Registration uses MustRegister, so it panics if registration fails.
func withRequestTracing(registry *prometheus.Registry, transport http.RoundTripper) http.RoundTripper {
	requestsTotal := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: metricsNamespace,
			Name:      "httpclient_requests_total",
			Help:      "A counter for requests from the kubenurse http client.",
		},
		[]string{"code", "method"},
	)

	traceDuration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: metricsNamespace,
			Name:      "httpclient_trace_request_duration_seconds",
			Help:      "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
			Buckets:   []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1},
		},
		[]string{"event"},
	)

	// An empty label set turns this vector into a single histogram that still
	// satisfies the ObserverVec interface expected by promhttp.
	requestDuration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: metricsNamespace,
			Name:      "httpclient_request_duration_seconds",
			Help:      "A latency histogram of request latencies from the kubenurse http client.",
			Buckets:   prometheus.DefBuckets,
		},
		[]string{},
	)

	// Make the collectors visible on the registry supplied by the caller.
	registry.MustRegister(requestsTotal, traceDuration, requestDuration)

	// observe builds a trace hook that records the reported time under the
	// given event label. The label lookup happens when the hook fires.
	observe := func(event string) func(float64) {
		return func(seconds float64) {
			traceDuration.WithLabelValues(event).Observe(seconds)
		}
	}

	// One hook per httptrace.ClientTrace event we want to instrument.
	hooks := &promhttp.InstrumentTrace{
		DNSStart:             observe("dns_start"),
		DNSDone:              observe("dns_done"),
		ConnectStart:         observe("connect_start"),
		ConnectDone:          observe("connect_done"),
		TLSHandshakeStart:    observe("tls_handshake_start"),
		TLSHandshakeDone:     observe("tls_handshake_done"),
		WroteRequest:         observe("wrote_request"),
		GotFirstResponseByte: observe("got_first_resp_byte"),
	}

	// Layer the middleware from the inside out: duration, then trace, then
	// the request counter as the outermost wrapper.
	timed := promhttp.InstrumentRoundTripperDuration(requestDuration, transport)
	traced := promhttp.InstrumentRoundTripperTrace(hooks, timed)

	return promhttp.InstrumentRoundTripperCounter(requestsTotal, traced)
}
31 changes: 24 additions & 7 deletions internal/servicecheck/servicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,36 @@ import (
"fmt"
"log"
"net/http"
"os"
"time"

"github.com/postfinance/kubenurse/internal/kubediscovery"
"github.com/prometheus/client_golang/prometheus"
)

const (
okStr = "ok"
errStr = "error"
okStr = "ok"
errStr = "error"
metricsNamespace = "kubenurse"
)

// New configures the checker with a cache timeout for check results and an
// instrumented HTTP client built from the KUBENURSE_EXTRA_CA and KUBENURSE_INSECURE
// environment variables. Other parameters of the Checker struct need to be configured separately.
func New(ctx context.Context, httpClient *http.Client, discovery *kubediscovery.Client,
promRegistry *prometheus.Registry, allowUnschedulable bool, cacheTTL time.Duration) (*Checker, error) {
func New(ctx context.Context, discovery *kubediscovery.Client, promRegistry *prometheus.Registry,
allowUnschedulable bool, cacheTTL time.Duration) (*Checker, error) {
errorCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kubenurse_errors_total",
Help: "Kubenurse error counter partitioned by error type",
Namespace: metricsNamespace,
Name: "errors_total",
Help: "Kubenurse error counter partitioned by error type",
},
[]string{"type"},
)

durationSummary := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "kubenurse_request_duration",
Namespace: metricsNamespace,
Name: "request_duration",
Help: "Kubenurse request duration partitioned by error type",
MaxAge: 1 * time.Minute,
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
Expand All @@ -41,6 +45,19 @@ func New(ctx context.Context, httpClient *http.Client, discovery *kubediscovery.

promRegistry.MustRegister(errorCounter, durationSummary)

// setup http transport
transport, err := generateRoundTripper(os.Getenv("KUBENURSE_EXTRA_CA"), os.Getenv("KUBENURSE_INSECURE") == "true")
if err != nil {
log.Printf("using default transport: %s", err)

transport = http.DefaultTransport
}

httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: withRequestTracing(promRegistry, transport),
}

return &Checker{
allowUnschedulable: allowUnschedulable,
discovery: discovery,
Expand Down
3 changes: 1 addition & 2 deletions internal/servicecheck/servicecheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package servicecheck

import (
"context"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -41,7 +40,7 @@ func TestCombined(t *testing.T) {
discovery, err := kubediscovery.New(context.Background(), fakeClient, false)
r.NoError(err)

checker, err := New(context.Background(), http.DefaultClient, discovery, prometheus.NewRegistry(), false, 3*time.Second)
checker, err := New(context.Background(), discovery, prometheus.NewRegistry(), false, 3*time.Second)
r.NoError(err)
r.NotNil(checker)

Expand Down
44 changes: 44 additions & 0 deletions internal/servicecheck/transport.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package servicecheck

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
Expand All @@ -11,6 +13,7 @@ import (
const (
//nolint:gosec // This is the well-known path to Kubernetes serviceaccount tokens.
tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
caFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)

// doRequest does an http request only to get the http status code
Expand Down Expand Up @@ -42,3 +45,44 @@ func (c *Checker) doRequest(url string) (string, error) {

return resp.Status, errors.New(resp.Status)
}

// generateRoundTripper builds an *http.Transport whose TLS configuration
// trusts the system certificate pool plus the Kubernetes serviceaccount CA
// read from caFile and, when extraCA is non-empty, the PEM bundle at that path.
//
// insecure is copied into tls.Config.InsecureSkipVerify.
//
// An error is returned when a certificate file cannot be read or contains no
// certificate that could be added to the pool; the returned RoundTripper is
// nil in that case.
func generateRoundTripper(extraCA string, insecure bool) (http.RoundTripper, error) {
	// Start from the system trust store; use an empty pool if none is available.
	pool, _ := x509.SystemCertPool()
	if pool == nil {
		pool = x509.NewCertPool()
	}

	// addPEM loads the PEM file at path into pool. appendFailure is the error
	// text used when the file was read but no certificate could be appended.
	addPEM := func(path, appendFailure string) error {
		pemBytes, err := os.ReadFile(path) //nolint:gosec // Paths are the well-known CA file or intentionally supplied by the user.
		if err != nil {
			return fmt.Errorf("could not load certificate %s: %w", path, err)
		}

		if !pool.AppendCertsFromPEM(pemBytes) {
			return errors.New(appendFailure)
		}

		return nil
	}

	// The serviceaccount CA is mandatory.
	if err := addPEM(caFile, "could not append ca cert to system certpool"); err != nil {
		return nil, err
	}

	// The extra CA is optional and only loaded when a path was given.
	if extraCA != "" {
		if err := addPEM(extraCA, "could not append extra ca cert to system certpool"); err != nil {
			return nil, err
		}
	}

	return &http.Transport{
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: insecure, //nolint:gosec // Can be true if the user requested this.
			RootCAs:            pool,
		},
	}, nil
}

0 comments on commit ebb0764

Please sign in to comment.