Skip to content

Commit b5253b3

Browse files
authored
Instrument gRPC dial calls with metrics (temporalio#8162)
## What changed? Instrument gRPC dial calls with metrics that monitor connection establishment. This PR adds the following metrics: 1. ServiceDialLatency 2. ServiceDialSuccessCount 3. ServiceDialErrorCount ## Why? Currently, visibility into gRPC dial calls is limited: there are no metrics for how long establishing a connection took or how many failures we encountered. Since `grpc.NewClient` is "lazy", in the sense that it doesn't establish a connection right away but rather on the first RPC, we need to instrument it via hooks. This PR uses `grpc.WithContextDialer` to achieve this. The gRPC library itself uses a platform-specific keepalive hack to apply OS defaults for TCP keepalive, but since we are on Go 1.23+ we can simply use the newly introduced KeepAliveConfig. ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [X] added new unit test(s) - [ ] added new functional test(s) ## Potential risks
1 parent 26f9caf commit b5253b3

File tree

10 files changed

+235
-18
lines changed

10 files changed

+235
-18
lines changed

common/metrics/metric_defs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,9 @@ var (
607607
"service_grpc_conn_active",
608608
WithDescription("Current number of gRPC's active TCP connections."),
609609
)
610+
ServiceDialLatency = NewTimerDef("service_dial_latency", WithDescription("The latency of establishing a new TCP connection."))
611+
ServiceDialSuccessCount = NewCounterDef("service_dial_success", WithDescription("Number of TCP dial attempts that successfully established a connection."))
612+
ServiceDialErrorCount = NewCounterDef("service_dial_error", WithDescription("Number of TCP dial attempts that failed to establish a connection."))
610613
ServiceLatency = NewTimerDef("service_latency")
611614
ServiceLatencyNoUserLatency = NewTimerDef("service_latency_nouserlatency")
612615
ServiceLatencyUserLatency = NewTimerDef("service_latency_userlatency")

common/resource/fx.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ func RPCFactoryProvider(
337337
cfg *config.Config,
338338
svcName primitives.ServiceName,
339339
logger log.Logger,
340+
metricsHandler metrics.Handler,
340341
tlsConfigProvider encryption.TLSConfigProvider,
341342
resolver *membership.GRPCResolver,
342343
tracingStatsHandler telemetry.ClientStatsHandler,
@@ -358,6 +359,7 @@ func RPCFactoryProvider(
358359
cfg,
359360
svcName,
360361
logger,
362+
metricsHandler,
361363
tlsConfigProvider,
362364
frontendURL,
363365
frontendHTTPURL,

common/rpc/dial_tracer.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package rpc
2+
3+
import (
	"context"
	"net/http/httptrace"
	"sync"
	"time"

	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"
	"go.temporal.io/server/common/metrics"
)
12+
13+
type dialTracer struct {
14+
address string
15+
metricsHandler metrics.Handler
16+
logger log.Logger
17+
}
18+
19+
// newDialTracer creates a dial tracer that logs errors during dial and produces metrics for different stages of the dial process (Connect)
20+
func newDialTracer(
21+
address string,
22+
mh metrics.Handler,
23+
logger log.Logger,
24+
) *dialTracer {
25+
l := log.With(
26+
logger,
27+
tag.NewStringTag("service", "client"),
28+
tag.NewStringTag("address", address),
29+
)
30+
31+
return &dialTracer{
32+
address: address,
33+
metricsHandler: mh,
34+
logger: l,
35+
}
36+
}
37+
38+
func (d *dialTracer) beginNetworkDial(ctx context.Context) (context.Context, *networkDialTrace) {
39+
ndt := &networkDialTrace{startedAt: time.Now()}
40+
41+
// Build a ClientTrace capturing TCP connections.
42+
trace := &httptrace.ClientTrace{
43+
ConnectStart: func(_, _ string) {
44+
ndt.connectStart = time.Now()
45+
},
46+
ConnectDone: func(_ string, addr string, err error) {
47+
// ConnectStart may not have been called if the address is reused
48+
if !ndt.connectStart.IsZero() {
49+
ndt.connectDuration = time.Since(ndt.connectStart)
50+
}
51+
ndt.connectAddr = addr
52+
ndt.connectErr = err
53+
},
54+
}
55+
56+
return httptrace.WithClientTrace(ctx, trace), ndt
57+
}
58+
59+
func (d *dialTracer) endNetworkDial(ndt *networkDialTrace, dialErr error) {
60+
total := time.Since(ndt.startedAt)
61+
62+
if dialErr != nil {
63+
fields := []tag.Tag{
64+
tag.NewDurationTag("totalDuration", total),
65+
tag.Error(dialErr),
66+
tag.ErrorType(dialErr),
67+
tag.NewDurationTag("connectDuration", ndt.connectDuration),
68+
tag.NewStringTag("connectAddr", ndt.connectAddr),
69+
}
70+
if ndt.connectErr != nil {
71+
fields = append(fields, tag.NewStringTag("connectErr", ndt.connectErr.Error()))
72+
}
73+
d.logger.Warn("network dial error", fields...)
74+
metrics.ServiceDialErrorCount.With(d.metricsHandler).Record(1)
75+
} else {
76+
metrics.ServiceDialSuccessCount.With(d.metricsHandler).Record(1)
77+
}
78+
79+
if ndt.connectDuration > 0 {
80+
metrics.ServiceDialLatency.With(d.metricsHandler).Record(ndt.connectDuration)
81+
}
82+
}
83+
84+
type networkDialTrace struct {
85+
startedAt time.Time
86+
87+
connectStart time.Time
88+
connectDuration time.Duration
89+
connectAddr string
90+
connectErr error
91+
}

common/rpc/dial_tracer_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package rpc
2+
3+
import (
4+
"context"
5+
"net/http/httptrace"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"go.temporal.io/server/common/log"
12+
"go.temporal.io/server/common/metrics"
13+
)
14+
15+
func TestDialTracer_DialSuccess(t *testing.T) {
16+
t.Parallel()
17+
18+
addr := "127.0.0.1:1234"
19+
dt := newDialTracer(addr, metrics.NoopMetricsHandler, log.NewNoopLogger())
20+
21+
ctx := context.Background()
22+
ctx, ndt := dt.beginNetworkDial(ctx)
23+
24+
trace := httptrace.ContextClientTrace(ctx)
25+
require.NotNil(t, trace)
26+
27+
trace.ConnectStart("tcp", addr)
28+
rewind(&ndt.connectStart)
29+
trace.ConnectDone("tcp", addr, nil)
30+
31+
dt.endNetworkDial(ndt, nil)
32+
33+
assert.Positive(t, ndt.connectDuration)
34+
assert.Equal(t, addr, ndt.connectAddr)
35+
assert.Nil(t, ndt.connectErr)
36+
}
37+
38+
func TestDialTracer_DialError(t *testing.T) {
39+
t.Parallel()
40+
41+
addr := "127.0.0.1:1234"
42+
dt := newDialTracer(addr, metrics.NoopMetricsHandler, log.NewNoopLogger())
43+
44+
ctx := context.Background()
45+
ctx, ndt := dt.beginNetworkDial(ctx)
46+
47+
trace := httptrace.ContextClientTrace(ctx)
48+
require.NotNil(t, trace)
49+
50+
connectErr := assert.AnError
51+
trace.ConnectStart("tcp", addr)
52+
rewind(&ndt.connectStart)
53+
trace.ConnectDone("tcp", addr, connectErr)
54+
55+
dt.endNetworkDial(ndt, nil)
56+
57+
assert.Positive(t, ndt.connectDuration)
58+
assert.Equal(t, addr, ndt.connectAddr)
59+
assert.Equal(t, connectErr, ndt.connectErr)
60+
}
61+
62+
func TestDialTracer_NoConnectStart(t *testing.T) {
63+
t.Parallel()
64+
65+
addr := "127.0.0.1:1234"
66+
dt := newDialTracer(addr, metrics.NoopMetricsHandler, log.NewNoopLogger())
67+
68+
ctx := context.Background()
69+
ctx, ndt := dt.beginNetworkDial(ctx)
70+
71+
trace := httptrace.ContextClientTrace(ctx)
72+
require.NotNil(t, trace)
73+
74+
trace.ConnectDone("tcp", addr, nil)
75+
76+
dt.endNetworkDial(ndt, nil)
77+
78+
assert.Zero(t, ndt.connectDuration)
79+
assert.Equal(t, addr, ndt.connectAddr)
80+
assert.Nil(t, ndt.connectErr)
81+
}
82+
83+
// rewind shifts the timestamp pointed to by ts one millisecond into the past,
// so that durations measured from it in tests are guaranteed to be positive.
func rewind(ts *time.Time) {
	earlier := (*ts).Add(-1 * time.Millisecond)
	*ts = earlier
}

common/rpc/grpc.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rpc
33
import (
44
"context"
55
"crypto/tls"
6+
"net"
67
"time"
78

89
"go.temporal.io/server/common/headers"
@@ -46,7 +47,13 @@ const (
4647
// The hostName syntax is defined in
4748
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
4849
// dns resolver is used by default
49-
func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
50+
func Dial(
51+
hostName string,
52+
tlsConfig *tls.Config,
53+
logger log.Logger,
54+
metricsHandler metrics.Handler,
55+
opts ...grpc.DialOption,
56+
) (*grpc.ClientConn, error) {
5057
var grpcSecureOpt grpc.DialOption
5158
if tlsConfig == nil {
5259
grpcSecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
@@ -65,8 +72,27 @@ func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger, opts ...grp
6572
}
6673
cp.Backoff.MaxDelay = MaxBackoffDelay
6774

75+
dtrace := newDialTracer(hostName, metricsHandler, logger)
76+
77+
contextDialer := func(ctx context.Context, s string) (net.Conn, error) {
78+
// Keep the existing gRPC behavior by using OS defaults for TCP keepalive settings.
79+
// We are on Go 1.23+ and can use KeepAliveConfig directly instead of the old KeepAlive/Control hacks.
80+
dialer := &net.Dialer{
81+
KeepAliveConfig: net.KeepAliveConfig{
82+
Enable: true,
83+
},
84+
}
85+
86+
var ndt *networkDialTrace
87+
ctx, ndt = dtrace.beginNetworkDial(ctx)
88+
conn, dialErr := dialer.DialContext(ctx, "tcp", s)
89+
dtrace.endNetworkDial(ndt, dialErr)
90+
return conn, dialErr
91+
}
92+
6893
dialOptions := []grpc.DialOption{
6994
grpcSecureOpt,
95+
grpc.WithContextDialer(contextDialer),
7096
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxInternodeRecvPayloadSize)),
7197
grpc.WithChainUnaryInterceptor(
7298
headersInterceptor,

common/rpc/rpc.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"go.temporal.io/server/common/log"
2020
"go.temporal.io/server/common/log/tag"
2121
"go.temporal.io/server/common/membership"
22+
"go.temporal.io/server/common/metrics"
2223
"go.temporal.io/server/common/primitives"
2324
"go.temporal.io/server/common/rpc/encryption"
2425
"go.temporal.io/server/temporal/environment"
@@ -31,9 +32,10 @@ var _ common.RPCFactory = (*RPCFactory)(nil)
3132

3233
// RPCFactory is an implementation of common.RPCFactory interface
3334
type RPCFactory struct {
34-
config *config.Config
35-
serviceName primitives.ServiceName
36-
logger log.Logger
35+
config *config.Config
36+
serviceName primitives.ServiceName
37+
logger log.Logger
38+
metricsHandler metrics.Handler
3739

3840
frontendURL string
3941
frontendHTTPURL string
@@ -59,6 +61,7 @@ func NewFactory(
5961
cfg *config.Config,
6062
sName primitives.ServiceName,
6163
logger log.Logger,
64+
metricsHandler metrics.Handler,
6265
tlsProvider encryption.TLSConfigProvider,
6366
frontendURL string,
6467
frontendHTTPURL string,
@@ -71,6 +74,7 @@ func NewFactory(
7174
config: cfg,
7275
serviceName: sName,
7376
logger: logger,
77+
metricsHandler: metricsHandler,
7478
frontendURL: frontendURL,
7579
frontendHTTPURL: frontendHTTPURL,
7680
frontendHTTPPort: frontendHTTPPort,
@@ -248,7 +252,7 @@ func (d *RPCFactory) CreateMatchingGRPCConnection(rpcAddress string) *grpc.Clien
248252

249253
func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config, dialOptions ...grpc.DialOption) *grpc.ClientConn {
250254
dialOptions = append(d.dialOptions, dialOptions...)
251-
connection, err := Dial(hostName, tlsClientConfig, d.logger, dialOptions...)
255+
connection, err := Dial(hostName, tlsClientConfig, d.logger, d.metricsHandler, dialOptions...)
252256
if err != nil {
253257
d.logger.Fatal("Failed to create gRPC connection", tag.Error(err))
254258
return nil

common/rpc/test/http_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) {
3434
nil,
3535
primitives.HistoryService,
3636
nil, // No logger
37+
nil, // No metrics handler
3738
nil,
3839
membership.GRPCResolverURLForTesting(monitor, primitives.FrontendService),
3940
membership.GRPCResolverURLForTesting(monitor, primitives.FrontendService),
@@ -64,6 +65,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort(t *testing.T) {
6465
nil, // unused
6566
primitives.HistoryService,
6667
nil, // No logger
68+
nil, // No metrics handler
6769
nil,
6870
membership.GRPCResolverURLForTesting(nil, primitives.FrontendService),
6971
addr.String(),
@@ -95,6 +97,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort_AndTLS(t *testing.T) {
9597
nil, // unused
9698
primitives.HistoryService,
9799
nil, // No logger
100+
nil, // No metrics handler
98101
nil,
99102
membership.GRPCResolverURLForTesting(nil, primitives.FrontendService),
100103
addr.String(),

common/rpc/test/rpc_common_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func dialTestServiceAndGetTLSInfo(
157157
s.NoError(err)
158158
}
159159

160-
clientConn, err := rpc.Dial(hostport, cfg, logger)
160+
clientConn, err := rpc.Dial(hostport, cfg, logger, nil)
161161
s.NoError(err)
162162

163163
client := testservice.NewTestServiceClient(clientConn)

0 commit comments

Comments
 (0)