Skip to content

Commit 4289c12

Browse files
Improve metrics and logs. Have MuxManager.Start wait 1m so that muxes are connected before we start outbound server. (#164)
1 parent 8eba132 commit 4289c12

13 files changed

Lines changed: 109 additions & 16 deletions

metrics/prometheus_defs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var (
5959
// Mux Manager
6060

6161
muxManagerLabels = []string{"addr", "mode", "config_name"}
62-
MuxErrors = DefaultCounterVec("mux_errors", "Number of errors observed from mux", muxManagerLabels...)
62+
MuxErrors = DefaultCounterVec("mux_errors", "Number of errors observed from mux", append(muxManagerLabels, "error")...)
6363
MuxConnectionEstablish = DefaultCounterVec("mux_connection_establish", "Number of times mux has established", muxManagerLabels...)
6464
MuxDialFailed = DefaultCounterVec("mux_dial_failed", "Mux failed when dialing", muxManagerLabels...)
6565
MuxDialSuccess = DefaultCounterVec("mux_dial_success", "Mux succeeded on dial", muxManagerLabels...)

proxy/adminservice.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ func NewAdminServiceProxyServer(
4343
reportStreamValue func(idx int32, value int32),
4444
logger log.Logger,
4545
) adminservice.AdminServiceServer {
46-
logger = log.With(logger, common.ServiceTag(serviceName))
46+
// The AdminServiceStreams will duplicate the same output for an underlying connection issue hundreds of times.
47+
// Limit their output to three times per minute
48+
logger = log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)),
49+
func() float64 { return 3.0 / 60.0 })
4750
return &adminServiceProxyServer{
4851
adminClient: adminClient,
4952
logger: logger,

proxy/cluster_connection_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"github.com/temporalio/s2s-proxy/endtoendtest/testservices"
1919
"github.com/temporalio/s2s-proxy/metrics"
2020
"github.com/temporalio/s2s-proxy/transport/grpcutil"
21+
"github.com/temporalio/s2s-proxy/transport/mux"
2122
)
2223

2324
// init configures package-wide test settings before any test in this package runs.
func init() {
2425
_ = os.Setenv("TEMPORAL_TEST_LOG_LEVEL", "error")
26+
// Zero the mux manager start delay (one minute by default) so Start does not block the tests.
mux.MuxManagerStartDelay = 0
2527
}
2628

2729
func getDynamicPorts(t *testing.T, num int) []string {

proxy/test/echo_proxy_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ import (
1717
"github.com/temporalio/s2s-proxy/config"
1818
"github.com/temporalio/s2s-proxy/encryption"
1919
"github.com/temporalio/s2s-proxy/endtoendtest"
20+
"github.com/temporalio/s2s-proxy/transport/mux"
2021
)
2122

2223
// init configures package-wide test settings before any test in this package runs.
func init() {
2324
// silence info log spam
2425
_ = os.Setenv("TEMPORAL_TEST_LOG_LEVEL", "error")
26+
// Zero the mux manager start delay (one minute by default) so Start does not block the tests.
mux.MuxManagerStartDelay = 0
2527
}
2628

2729
const (

transport/mux/establisher.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ func (p *establishingConnProvider) NewConnection() (net.Conn, error) {
110110
return nil, err
111111
}
112112
metrics.MuxDialSuccess.WithLabelValues(p.metricLabels...).Inc()
113-
metrics.MuxConnectionEstablish.WithLabelValues(p.metricLabels...).Inc()
114113
return client, nil
115114
}
116115

transport/mux/grpc_mux_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ import (
1919
"github.com/temporalio/s2s-proxy/transport/grpcutil"
2020
)
2121

22+
// init zeroes the mux manager start delay (one minute by default) so that
// Start does not block the tests in this package.
func init() {
23+
MuxManagerStartDelay = 0
24+
}
25+
2226
func TestGRPCMux(t *testing.T) {
2327
logger := log.NewTestLogger()
2428

transport/mux/multi_mux_manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strconv"
88
"strings"
99
"sync"
10+
"time"
1011

1112
"github.com/hashicorp/yamux"
1213
"go.temporal.io/server/common/channel"
@@ -17,6 +18,8 @@ import (
1718
"github.com/temporalio/s2s-proxy/transport/mux/session"
1819
)
1920

21+
// MuxManagerStartDelay is how long multiMuxManager.Start blocks after starting
// the mux provider, giving the provider time to supply connections before
// Start returns. It is a variable rather than a constant so that tests can set
// it to 0 and avoid the wait.
var MuxManagerStartDelay = time.Minute
22+
2023
type (
2124
multiMuxManager struct {
2225
lifetime context.Context
@@ -150,6 +153,25 @@ func (m *multiMuxManager) Start() {
150153
m.init.Do(func() {
151154
// Start the mux provider
152155
m.muxProvider.Start()
156+
go func() {
157+
ticker := time.NewTicker(time.Minute)
158+
for m.lifetime.Err() == nil {
159+
select {
160+
case <-ticker.C:
161+
case <-m.lifetime.Done():
162+
}
163+
sb := strings.Builder{}
164+
m.muxesLock.RLock()
165+
for _, v := range m.muxes {
166+
sb.WriteString(v.Describe())
167+
}
168+
m.muxesLock.RUnlock()
169+
m.logger.Info("MuxManager status", tag.NewBoolTag("shutdown", m.hasShutDown.IsShutdown()),
170+
tag.Name(m.name), tag.NewStringTag("sessions", sb.String()))
171+
}
172+
}()
173+
// Allow the mux provider some time to provide connections
174+
<-time.After(MuxManagerStartDelay)
153175
})
154176
}
155177
func (m *multiMuxManager) Describe() string {

transport/mux/multi_mux_manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
// init configures package-wide test settings before any test in this package runs.
func init() {
1919
_ = os.Setenv("TEMPORAL_TEST_LOG_LEVEL", "error")
20+
// Zero the mux manager start delay (one minute by default) so Start does not block the tests.
MuxManagerStartDelay = 0
2021
}
2122

2223
func TestMultiMuxManager(t *testing.T) {

transport/mux/multimux_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,30 @@ func (s *muxServer) Run(t *testing.T) {
4444
}
4545
}
4646

47+
func TestListenerHandoff(t *testing.T) {
48+
listener, err := net.Listen("tcp", "127.0.0.1:0")
49+
require.NoError(t, err)
50+
serverConnCh := make(chan net.Conn)
51+
listenerClosedCh := make(chan struct{})
52+
go func() {
53+
serverSide, err := listener.Accept()
54+
require.NoError(t, err)
55+
serverConnCh <- serverSide
56+
err = listener.Close()
57+
require.NoError(t, err)
58+
close(listenerClosedCh)
59+
}()
60+
clientConn, err := net.Dial("tcp", listener.Addr().String())
61+
require.NoError(t, err)
62+
serverConn := <-serverConnCh
63+
<-listenerClosedCh
64+
_, _ = clientConn.Write([]byte("Hi"))
65+
buf := make([]byte, 1024)
66+
n, _ := serverConn.Read(buf)
67+
require.Equal(t, []byte("Hi"), buf[:n])
68+
close(serverConnCh)
69+
}
70+
4771
func TestMultiMux(t *testing.T) {
4872
wg := &sync.WaitGroup{}
4973
serverCh, shutDownMux, muxListenerAddrCh := make(chan *muxServer), make(chan struct{}), make(chan string)

transport/mux/observer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func emitYamuxMetrics(lifetime context.Context, muxCategory string, id string, s
5959
} else {
6060
// Clean up the label so we don't report it forever
6161
metrics.MuxStreamsActive.DeleteLabelValues(metricLabels...)
62+
metrics.MuxSessionOpen.DeleteLabelValues(metricLabels...)
6263
}
6364
metrics.MuxObserverReportCount.WithLabelValues(metricLabels...).Inc()
6465
}

0 commit comments

Comments
 (0)