Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prometheus: Add ip labels to server metrics #730

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ Additional great feature of interceptors is the fact we can chain those. For exa

```go mdox-exec="sed -n '136,151p' examples/server/main.go"
grpcSrv := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(
// Order matters e.g. tracing interceptor have to create span first for the later exemplars to work.
otelgrpc.UnaryServerInterceptor(),
srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
logging.UnaryServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
selector.UnaryServerInterceptor(auth.UnaryServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc.ChainStreamInterceptor(
otelgrpc.StreamServerInterceptor(),
srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
logging.StreamServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
selector.StreamServerInterceptor(auth.StreamServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)
t := &testpb.TestPingService{}
```

This pattern offers clean and explicit shared functionality for all your gRPC methods. Full, buildable examples can be found in [examples](examples) directory.
Expand Down
81 changes: 62 additions & 19 deletions providers/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ package prometheus

import (
"context"
"net/netip"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/prometheus/client_golang/prometheus"
grpcpeer "google.golang.org/grpc/peer"
)

type reporter struct {
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics

grpcServerIP string
grpcClientIP string
typ interceptors.GRPCType
service, method string
kind Kind
Expand All @@ -28,9 +33,17 @@ func (r *reporter) PostCall(err error, rpcDuration time.Duration) {
// perform handling of metrics from code
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, string(r.typ), r.service, r.method, code.String())
lvals := []string{string(r.typ), r.service, r.method, code.String()}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, lvals...)
if r.serverMetrics.serverHandledHistogram != nil {
r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), string(r.typ), r.service, r.method)
lvals = []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), lvals...)
}

case KindClient:
Expand All @@ -44,7 +57,11 @@ func (r *reporter) PostCall(err error, rpcDuration time.Duration) {
func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) {
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, string(r.typ), r.service, r.method)
lvals := []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, lvals...)
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStreamMsgSent, string(r.typ), r.service, r.method)
if r.clientMetrics.clientStreamSendHistogram != nil {
Expand All @@ -56,7 +73,11 @@ func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) {
func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) {
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, string(r.typ), r.service, r.method)
lvals := []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, lvals...)
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStreamMsgReceived, string(r.typ), r.service, r.method)
if r.clientMetrics.clientStreamRecvHistogram != nil {
Expand All @@ -65,6 +86,14 @@ func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) {
}
}

func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) {
c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar)
}

func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) {
h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar)
}

type reportable struct {
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics
Expand All @@ -86,10 +115,11 @@ func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *Clie
r := &reporter{
clientMetrics: cm,
serverMetrics: sm,
typ: meta.Typ,
service: meta.Service,
method: meta.Method,
kind: kind,

typ: meta.Typ,
service: meta.Service,
method: meta.Method,
kind: kind,
}
if c.exemplarFn != nil {
r.exemplar = c.exemplarFn(ctx)
Expand All @@ -99,15 +129,28 @@ func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *Clie
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStartedCounter, string(r.typ), r.service, r.method)
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, string(r.typ), r.service, r.method)

lvals := []string{string(r.typ), r.service, r.method}
if r.serverMetrics.ipLabelsEnabled {
if peer, ok := grpcpeer.FromContext(ctx); ok {
// Fallback to net.Addr.String() when ParseAddrPort failed, because it already contains
// necessary information to be added to the label and we

// This is server side, so LocalAddr is server's address.
if addrPort, e := netip.ParseAddrPort(peer.LocalAddr.String()); e != nil {
r.grpcServerIP = peer.LocalAddr.String()
} else {
r.grpcServerIP = addrPort.Addr().String()
}
if addrPort, e := netip.ParseAddrPort(peer.Addr.String()); e != nil {
r.grpcClientIP = peer.Addr.String()
} else {
r.grpcClientIP = addrPort.Addr().String()
}
}
lvals = append(lvals, r.grpcServerIP, r.grpcClientIP)
}
r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, lvals...)
}
return r, ctx
}

func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) {
c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar)
}

func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) {
h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar)
}
49 changes: 38 additions & 11 deletions providers/prometheus/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
// ServerMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC server.
type ServerMetrics struct {
ipLabelsEnabled bool
serverStartedCounter *prometheus.CounterVec
serverHandledCounter *prometheus.CounterVec
serverStreamMsgReceived *prometheus.CounterVec
Expand All @@ -27,29 +28,43 @@ type ServerMetrics struct {
func NewServerMetrics(opts ...ServerMetricsOption) *ServerMetrics {
var config serverMetricsConfig
config.apply(opts)
return &ServerMetrics{

addIPLables := func(orig []string) []string {
if config.ipLabelsEnabled {
return serverMetricAddIPLabel(orig)
}
return orig
}

sm := &ServerMetrics{
ipLabelsEnabled: config.ipLabelsEnabled,
serverStartedCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})),
serverHandledCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"})),
serverStreamMsgReceived: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})),
serverStreamMsgSent: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledHistogram: config.serverHandledHistogram,
}), addIPLables([]string{"grpc_type", "grpc_service", "grpc_method"})),
}
if config.serverHandledHistogramEnabled {
sm.serverHandledHistogram = newServerHandlingTimeHistogram(
config.ipLabelsEnabled, config.serverHandledHistogramOptions)
}

return sm
}

// Describe sends the super-set of all possible descriptors of metrics
Expand Down Expand Up @@ -95,15 +110,27 @@ func (m *ServerMetrics) InitializeMetrics(server reflection.ServiceInfoProvider)
func (m *ServerMetrics) preRegisterMethod(serviceName string, mInfo *grpc.MethodInfo) {
methodName := mInfo.Name
methodType := string(typeFromMethodInfo(mInfo))

lvals := []string{methodType, serviceName, methodName}
if m.ipLabelsEnabled {
// Because netip.Addr.String() returns "invalid IP" for zero Addr,
// we use this value with grpc_server and grpc_client.
lvals = append(lvals, "invalid IP", "invalid IP")
}
// These are just references (no increments), as just referencing will create the labels but not set values.
_, _ = m.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStartedCounter.GetMetricWithLabelValues(lvals...)
_, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(lvals...)
_, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(lvals...)
if m.serverHandledHistogram != nil {
_, _ = m.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverHandledHistogram.GetMetricWithLabelValues(lvals...)
}

for _, code := range interceptors.AllCodes {
_, _ = m.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
lvals = []string{methodType, serviceName, methodName, code.String()}
if m.ipLabelsEnabled {
lvals = append(lvals, "invalid IP", "invalid IP")
}
_, _ = m.serverHandledCounter.GetMetricWithLabelValues(lvals...)
}
}

Expand Down
45 changes: 35 additions & 10 deletions providers/prometheus/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,23 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func serverMetricAddIPLabel(orig []string) []string {
return append(orig, "grpc_server_ip", "grpc_client_ip")
}

type exemplarFromCtxFn func(ctx context.Context) prometheus.Labels

type serverMetricsConfig struct {
// ipLabelsEnabled control whether to add grpc_server and grpc_client labels to metrics.
ipLabelsEnabled bool

counterOpts counterOptions
// serverHandledHistogram can be nil.
serverHandledHistogram *prometheus.HistogramVec

serverHandledHistogramEnabled bool
serverHandledHistogramOptions []HistogramOption
}

// ServerMetricsOption configures how we set up the server metrics.
type ServerMetricsOption func(*serverMetricsConfig)

func (c *serverMetricsConfig) apply(opts []ServerMetricsOption) {
Expand All @@ -32,17 +41,33 @@ func WithServerCounterOptions(opts ...CounterOption) ServerMetricsOption {
}
}

func newServerHandlingTimeHistogram(ipLabelsEnabled bool, opts []HistogramOption) *prometheus.HistogramVec {
labels := []string{"grpc_type", "grpc_service", "grpc_method"}
if ipLabelsEnabled {
labels = serverMetricAddIPLabel(labels)
}
return prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
}),
labels,
)
}

// WithServerHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func WithServerHandlingTimeHistogram(opts ...HistogramOption) ServerMetricsOption {
return func(o *serverMetricsConfig) {
o.serverHandledHistogram = prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
}),
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
o.serverHandledHistogramEnabled = true
o.serverHandledHistogramOptions = opts
}
}

// WithServerIPLabelsEnabled enables adding grpc_server and grpc_client labels to metrics.
func WithServerIPLabelsEnabled() ServerMetricsOption {
return func(o *serverMetricsConfig) {
o.ipLabelsEnabled = true
}
}
Loading
Loading