Skip to content

Commit a4ecf67

Browse files
Remove muxConnectionManager and replace with MuxManager (#149)
* Remove muxConnectionManager and replace with MuxManager * Fix unit tests, improve muxManager lifecycle * Fix shutdownch fake in mux_manager_test * Add missing channels to test * Clean up signaling logic, remove generics * Remove redundant IsShutdown check * Add metrics and logging for muxmanager * Ensure metricLabels is set, metrics are configured * Fix mux_manager_test * Refine connection metrics * have mux observer ping the session to get latency stats * Add more metrics * More metrics, wrap yamux logger * Move waiting for client metric close to include error case * Change to 30s stream timeout * Enable server log debugging * more logs * more logs * Include details about the replacement connection * Ensure grpc server restarts on error * fmt * Update metrics/prometheus_defs.go Co-authored-by: Paul Glass <pnglass@gmail.com> * Requested PR fixes * fmt --------- Co-authored-by: Paul Glass <pnglass@gmail.com>
1 parent a3f2d18 commit a4ecf67

17 files changed

Lines changed: 599 additions & 740 deletions

metrics/prometheus_defs.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ var (
1111

1212
// /proxy/adminservice.go
1313

14-
AdminServiceStreamsActive = DefaultGaugeVec("admin_service_streams_active", "Number of admin service streams open", "direction")
15-
AdminServiceStreamDuration = DefaultHistogramVec("admin_service_stream_duration", "The length of time each stream was open", "direction")
16-
AdminServiceStreamsOpenedCount = DefaultCounterVec("admin_service_streams_opened_count", "Number of streams opened", "direction")
17-
AdminServiceStreamsClosedCount = DefaultCounterVec("admin_service_streams_closed_count", "Number of streams closed", "direction")
18-
AdminServiceStreamReqCount = DefaultCounterVec("admin_service_stream_request_count", "Number of messages received", "direction")
19-
AdminServiceStreamRespCount = DefaultCounterVec("admin_service_stream_response_count", "Number of messages received", "direction")
14+
AdminServiceStreamsActive = DefaultGaugeVec("admin_service_streams_active", "Number of admin service streams open", "direction")
15+
AdminServiceStreamDuration = DefaultHistogramVec("admin_service_stream_duration", "The length of time each stream was open", "direction")
16+
AdminServiceWaitingForConnection = DefaultGaugeVec("admin_service_waiting_for_connection", "Indicates the number of requests waiting on a client", "direction")
17+
AdminServiceStreamsOpenedCount = DefaultCounterVec("admin_service_streams_opened_count", "Number of streams opened", "direction")
18+
AdminServiceStreamsClosedCount = DefaultCounterVec("admin_service_streams_closed_count", "Number of streams closed", "direction")
19+
AdminServiceStreamReqCount = DefaultCounterVec("admin_service_stream_request_count", "Number of messages received", "direction")
20+
AdminServiceStreamRespCount = DefaultCounterVec("admin_service_stream_response_count", "Number of messages received", "direction")
2021
// AdminServiceStreamTerminatedCount's labels are direction (inbound/outbound) and terminated_by (source/target)
2122
AdminServiceStreamTerminatedCount = DefaultCounterVec("admin_service_stream_terminated_count", "Stream was terminated by remote server", "direction", "terminated_by")
2223

@@ -35,14 +36,19 @@ var (
3536
ProxyServiceStopped = DefaultCounterVec("proxy_service_stopped", "Emitted on service shutdown", "direction")
3637
ProxyServiceRestarted = DefaultCounterVec("proxy_service_restarted", "Emitted on service shutdown", "direction")
3738

39+
// /proxy/temporal_api_server.go
40+
41+
GRPCServerStarted = DefaultCounterVec("grpc_server_started", "Emits when the grpc server is started", "service_name")
42+
GRPCServerStopped = DefaultCounterVec("grpc_server_stopped", "Emits when the grpc server is stopped", "service_name", "error")
43+
3844
// /transport/grpc.go
3945
// Gratuitous hack: Until https://github.com/grpc-ecosystem/go-grpc-middleware/issues/783 is addressed,
4046
// we need to register a dependent registry with constant labels applied.
4147

4248
GRPCOutboundClientMetrics = GetStandardGRPCClientInterceptor("outbound")
4349
GRPCInboundClientMetrics = GetStandardGRPCClientInterceptor("inbound")
4450

45-
// /transport/mux_connection_manager.go
51+
// Mux Session
4652

4753
// Every yamux session has these available, so let's use them in the prometheus tags so we can clearly see each connection
4854
muxSessionLabels = []string{"local_addr", "remote_addr", "mode", "config_name"}
@@ -52,9 +58,24 @@ var (
5258
muxSessionLabels...)
5359
MuxObserverReportCount = DefaultCounterVec("mux_observer_report_count", "Number of observer executions",
5460
muxSessionLabels...)
61+
MuxSessionPingError = DefaultCounterVec("mux_observer_session_ping_error", "Failed ping count",
62+
muxSessionLabels...)
63+
MuxSessionPingLatency = DefaultCounterVec("mux_observer_session_ping_latency", "Ping latency for the active session",
64+
muxSessionLabels...)
65+
MuxSessionPingSuccess = DefaultCounterVec("mux_observer_session_ping_success", "Ping successes for the active session",
66+
muxSessionLabels...)
67+
68+
// Mux Manager
69+
5570
muxManagerLabels = []string{"addr", "mode", "config_name"}
5671
MuxErrors = DefaultCounterVec("mux_errors", "Number of errors observed from mux", muxManagerLabels...)
5772
MuxConnectionEstablish = DefaultCounterVec("mux_connection_establish", "Number of times mux has established", muxManagerLabels...)
73+
MuxWaitingConnections = DefaultGaugeVec("mux_waiting_connections", "Number of goroutines waiting for a mux connection", muxManagerLabels...)
74+
MuxConnectionProvided = DefaultCounterVec("mux_connection_provided", "Number of times a connection was provided from WithConnection", muxManagerLabels...)
75+
MuxDialFailed = DefaultCounterVec("mux_dial_failed", "Mux failed when dialing", muxManagerLabels...)
76+
MuxDialSuccess = DefaultCounterVec("mux_dial_success", "Mux succeeded on dial", muxManagerLabels...)
77+
78+
// Translation interceptor
5879

5980
translationLabels = []string{"kind", "message_type"}
6081
TranslationCount = DefaultCounterVec("translation_success", "Count of message translations", translationLabels...)
@@ -74,6 +95,7 @@ func init() {
7495
prometheus.MustRegister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll),
7596
collectors.WithoutGoCollectorRuntimeMetrics(collectors.MetricsDebug.Matcher)))
7697
prometheus.MustRegister(AdminServiceStreamsActive)
98+
prometheus.MustRegister(AdminServiceWaitingForConnection)
7799
prometheus.MustRegister(AdminServiceStreamDuration)
78100
prometheus.MustRegister(AdminServiceStreamsOpenedCount)
79101
prometheus.MustRegister(AdminServiceStreamsClosedCount)
@@ -91,15 +113,27 @@ func init() {
91113
prometheus.MustRegister(ProxyServiceCreated)
92114
prometheus.MustRegister(ProxyServiceStopped)
93115
prometheus.MustRegister(ProxyServiceRestarted)
116+
prometheus.MustRegister(GRPCServerStarted)
117+
prometheus.MustRegister(GRPCServerStopped)
94118

95119
prometheus.MustRegister(GRPCOutboundClientMetrics)
96120
prometheus.MustRegister(GRPCInboundClientMetrics)
97121

122+
// Mux Session
98123
prometheus.MustRegister(MuxSessionOpen)
99124
prometheus.MustRegister(MuxStreamsActive)
100125
prometheus.MustRegister(MuxObserverReportCount)
126+
prometheus.MustRegister(MuxSessionPingError)
127+
prometheus.MustRegister(MuxSessionPingLatency)
128+
prometheus.MustRegister(MuxSessionPingSuccess)
129+
130+
// Mux Manager
101131
prometheus.MustRegister(MuxErrors)
102132
prometheus.MustRegister(MuxConnectionEstablish)
133+
prometheus.MustRegister(MuxWaitingConnections)
134+
prometheus.MustRegister(MuxConnectionProvided)
135+
prometheus.MustRegister(MuxDialFailed)
136+
prometheus.MustRegister(MuxDialSuccess)
103137

104138
prometheus.MustRegister(TranslationCount)
105139
prometheus.MustRegister(TranslationErrors)

proxy/adminservice.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,21 +261,26 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
261261
}
262262
logger.Info("AdminStreamReplicationMessages started.")
263263
streamsActiveGauge := metrics.AdminServiceStreamsActive.WithLabelValues(directionLabel)
264-
streamsActiveGauge.Inc()
265-
metrics.AdminServiceStreamsOpenedCount.WithLabelValues(directionLabel).Inc()
266-
defer streamsActiveGauge.Dec()
267-
defer logger.Info("AdminStreamReplicationMessages stopped.")
268264

269265
// simply forwarding target metadata
270266
outgoingContext := metadata.NewOutgoingContext(targetStreamServer.Context(), targetMetadata)
271267
outgoingContext, cancel := context.WithCancel(outgoingContext)
272268
defer cancel()
273269

270+
// The underlying adminClient will try to grab a connection when we call StreamWorkflowReplicationMessages.
271+
// The connection is separately managed, so we want to see how long it takes to establish that conn.
272+
metrics.AdminServiceWaitingForConnection.WithLabelValues(directionLabel).Inc()
274273
sourceStreamClient, err := s.adminClient.StreamWorkflowReplicationMessages(outgoingContext)
274+
metrics.AdminServiceWaitingForConnection.WithLabelValues(directionLabel).Dec()
275275
if err != nil {
276276
logger.Error("remoteAdminServiceClient.StreamWorkflowReplicationMessages encountered error", tag.Error(err))
277277
return err
278278
}
279+
// We succesfully got a stream connection, so mark the stream as active
280+
streamsActiveGauge.Inc()
281+
metrics.AdminServiceStreamsOpenedCount.WithLabelValues(directionLabel).Inc()
282+
defer streamsActiveGauge.Dec()
283+
defer logger.Info("AdminStreamReplicationMessages stopped.")
279284
streamStartTime := time.Now()
280285

281286
// When one side of the stream dies, we want to tell the other side to hang up

proxy/temporal_api_server.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package proxy
33
import (
44
"fmt"
55
"io"
6+
"time"
67

78
"go.temporal.io/api/workflowservice/v1"
89
"go.temporal.io/server/api/adminservice/v1"
@@ -11,6 +12,7 @@ import (
1112
"google.golang.org/grpc"
1213

1314
"github.com/temporalio/s2s-proxy/config"
15+
"github.com/temporalio/s2s-proxy/metrics"
1416
"github.com/temporalio/s2s-proxy/transport"
1517
)
1618

@@ -36,6 +38,9 @@ func NewTemporalAPIServer(
3638
logger log.Logger,
3739
) *TemporalAPIServer {
3840
server := grpc.NewServer(serverOptions...)
41+
adminservice.RegisterAdminServiceServer(server, adminHandler)
42+
workflowservice.RegisterWorkflowServiceServer(server, workflowserviceHandler)
43+
metrics.GRPCServerStarted.WithLabelValues(serviceName)
3944
return &TemporalAPIServer{
4045
serviceName: serviceName,
4146
serverConfig: serverConfig,
@@ -48,21 +53,25 @@ func NewTemporalAPIServer(
4853
}
4954

5055
func (s *TemporalAPIServer) Start() {
51-
adminservice.RegisterAdminServiceServer(s.server, s.adminHandler)
52-
workflowservice.RegisterWorkflowServiceServer(s.server, s.workflowserviceHandler)
53-
5456
go func() {
55-
if err := s.serverTransport.Serve(s.server); err != nil {
57+
for !s.serverTransport.IsClosed() {
58+
metrics.GRPCServerStarted.WithLabelValues(s.serviceName).Inc()
59+
err := s.serverTransport.Serve(s.server)
5660
if err == io.EOF {
5761
// grpc server can get EOF error if grpc server relies on client side of
5862
// mux connection. Given a mux connection from node A (mux client) to node B (mux server),
5963
// and start a grpc server on A using mux client. If node B (mux server) closed the connection,
6064
// grpc server on A can get an EOF client connection error from underlying mux connection.
6165
// It should not happen if grpc server is based on mux server or normal TCP connection.
62-
s.logger.Warn("grpc server received EOF error")
63-
} else {
66+
s.logger.Info("grpc server received EOF! Connection is closing")
67+
metrics.GRPCServerStopped.WithLabelValues(s.serviceName, "eof").Inc()
68+
} else if err != nil {
6469
s.logger.Error("grpc server fatal error ", tag.Error(err))
70+
metrics.GRPCServerStopped.WithLabelValues(s.serviceName, "unknown").Inc()
71+
} else {
72+
metrics.GRPCServerStopped.WithLabelValues(s.serviceName, "none").Inc()
6573
}
74+
time.Sleep(1 * time.Second)
6675
}
6776
}()
6877
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package transport
1+
package grpcutil
22

33
import (
44
"context"
@@ -36,7 +36,7 @@ const (
3636
// The hostName syntax is defined in
3737
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
3838
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
39-
func dial(hostName string, tlsConfig *tls.Config, clientMetrics *grpcprom.ClientMetrics, dialer func(ctx context.Context, addr string) (net.Conn, error)) (*grpc.ClientConn, error) {
39+
func Dial(hostName string, tlsConfig *tls.Config, clientMetrics *grpcprom.ClientMetrics, dialer func(ctx context.Context, addr string) (net.Conn, error)) (*grpc.ClientConn, error) {
4040
var grpcSecureOpt grpc.DialOption
4141
if tlsConfig == nil {
4242
grpcSecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials())

transport/mux/establisher.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package mux
33
import (
44
"crypto/tls"
55
"errors"
6-
"math/rand/v2"
76
"net"
87
"time"
98

@@ -35,14 +34,23 @@ type shutdownCheck interface {
3534
IsShutdown() bool
3635
}
3736

38-
var retryPolicy = backoff.NewExponentialRetryPolicy(time.Second).
39-
WithBackoffCoefficient(1.5).
40-
WithMaximumInterval(30 * time.Second)
37+
var (
38+
retryPolicy = backoff.NewExponentialRetryPolicy(time.Second).
39+
WithBackoffCoefficient(1.5).
40+
WithMaximumInterval(30 * time.Second)
41+
42+
// The establisher provider never has cleanup work, so we provide the same closed channel on CloseCh()
43+
alwaysClosedCh = make(chan struct{})
44+
)
45+
46+
func init() {
47+
close(alwaysClosedCh)
48+
}
4149

4250
// NewMuxEstablisherProvider makes an outbound call using the provided TCP settings. This constructor handles unpacking
4351
// the TLS config, configures the connection provider with retry and exponential backoff, and sets a disconnect
4452
// sleep time of 1-2 seconds.
45-
func NewMuxEstablisherProvider(name string, transportFn SetTransportCallback, setting config.TCPClientSetting, metricLabels []string, logger log.Logger, shutDown channel.ShutdownOnce) (*MuxProvider, error) {
53+
func NewMuxEstablisherProvider(name string, transportFn SetTransportCallback, setting config.TCPClientSetting, metricLabels []string, logger log.Logger, shutDown channel.ShutdownOnce) (MuxProvider, error) {
4654
tlsWrapper := func(conn net.Conn) net.Conn { return conn }
4755
if tlsCfg := setting.TLS; tlsCfg.IsEnabled() {
4856
tlsConfig, err := encryption.GetClientTLSConfig(tlsCfg)
@@ -62,12 +70,14 @@ func NewMuxEstablisherProvider(name string, transportFn SetTransportCallback, se
6270
shutdownCheck: shutDown,
6371
metricLabels: metricLabels,
6472
}
65-
sessionFn := func(conn net.Conn) (*yamux.Session, error) { return yamux.Client(conn, nil) }
66-
disconnectFn := func() {
67-
// If the server rapidly disconnects us, we don't want to get caught in a tight loop. Sleep 1-2 seconds before retry
68-
time.Sleep(time.Second + time.Duration(rand.IntN(1000))*time.Millisecond)
73+
sessionFn := func(conn net.Conn) (*yamux.Session, error) {
74+
cfg := yamux.DefaultConfig()
75+
cfg.Logger = wrapLoggerForYamux{logger: logger}
76+
cfg.LogOutput = nil
77+
cfg.StreamCloseTimeout = 30 * time.Second
78+
return yamux.Client(conn, cfg)
6979
}
70-
return NewMuxProvider(name, connPv, sessionFn, disconnectFn, transportFn, metricLabels, logger, shutDown), nil
80+
return NewMuxProvider(name, connPv, sessionFn, func() {}, transportFn, metricLabels, logger, shutDown), nil
7181
}
7282

7383
// NewConnection makes a TCP call to establish a connection, then returns it. Retries with backoff over 30 seconds
@@ -85,17 +95,23 @@ func (p *establishingConnProvider) NewConnection() (net.Conn, error) {
8595
}
8696

8797
onError := func(err error) bool {
88-
p.logger.Error("mux client failed to dial", tag.Error(err))
98+
if !p.shutdownCheck.IsShutdown() {
99+
p.logger.Info("mux client failed to dial", tag.Error(err))
100+
}
89101
return !p.shutdownCheck.IsShutdown()
90102
}
91103
if err := backoff.ThrottleRetry(dialFn, retryPolicy, onError); err != nil {
92-
p.logger.Error("mux client failed to dial with retry", tag.Error(err))
93-
metrics.MuxErrors.WithLabelValues(p.metricLabels...).Inc()
104+
if !p.shutdownCheck.IsShutdown() {
105+
p.logger.Error("mux client failed to dial with retry", tag.Error(err))
106+
metrics.MuxErrors.WithLabelValues(p.metricLabels...).Inc()
107+
}
94108
return nil, err
95109
}
110+
metrics.MuxConnectionEstablish.WithLabelValues(p.metricLabels...).Inc()
96111
return client, nil
97112
}
98113

99-
func (p *establishingConnProvider) CloseProvider() {
100-
// Nothing to close on the client side, we're done.
114+
// CloseCh for the establisher is a no-op, because only the Conn needs to be closed
115+
func (p *establishingConnProvider) CloseCh() <-chan struct{} {
116+
return alwaysClosedCh
101117
}

0 commit comments

Comments
 (0)