Skip to content

Commit bcb9ae1

Browse files
Add liveness reporting for the yamux observer (#101)
* Add liveness reporting for the yamux observer * Add missing vectors to registry * IF the health server doesn't start immediately, wait up to 50ms
1 parent d2e238d commit bcb9ae1

4 files changed

Lines changed: 32 additions & 7 deletions

File tree

metrics/metrics.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func DefaultGauge(name string, help string) prometheus.Gauge {
6262
}
6363

6464
// DefaultGaugeVec provides a prometheus GaugeVec for the requested name. The name will be sanitized, and the recommended
65-
// namespace and subsystem will be set. Vector metrics allow the use of labels, so if you need a label, then use this.
65+
// namespace and subsystem will be set. Vector metrics allow the use of labels, so if you need labels on your metrics, then use this.
6666
func DefaultGaugeVec(name string, help string, labels ...string) *prometheus.GaugeVec {
6767
return prometheus.NewGaugeVec(prometheus.GaugeOpts{
6868
Namespace: "temporal",
@@ -83,6 +83,17 @@ func DefaultCounter(name string, help string) prometheus.Counter {
8383
})
8484
}
8585

86+
// DefaultCounterVec provides a prometheus CounterVec for the requested name. The name will be sanitized, and the recommended
87+
// namespace and subsystem will be set. Vector metrics allow the use of labels, so if you need labels on your metrics, then use this.
88+
func DefaultCounterVec(name string, help string, labels ...string) *prometheus.CounterVec {
89+
return prometheus.NewCounterVec(prometheus.CounterOpts{
90+
Namespace: "temporal",
91+
Subsystem: "s2s_proxy",
92+
Name: SanitizeForPrometheus(name),
93+
Help: help,
94+
}, labels)
95+
}
96+
8697
// wrapLoggerForPrometheus is necessary to adapt our temporal Logger to Prometheus's Println interface
8798
type wrapLoggerForPrometheus struct {
8899
log.Logger

metrics/prometheus_defs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ var (
2828
muxSessionLabels...)
2929
MuxStreamsActive = DefaultGaugeVec("mux_streams_active", "Immediate count of the current streams open",
3030
muxSessionLabels...)
31+
MuxObserverReportCount = DefaultCounterVec("mux_observer_report_count", "Number of observer executions",
32+
muxSessionLabels...)
3133
)
3234

3335
func init() {
@@ -36,4 +38,7 @@ func init() {
3638
prometheus.MustRegister(HealthCheckIsHealthy)
3739
prometheus.MustRegister(HealthCheckHealthyCount)
3840
prometheus.MustRegister(AdminServiceStreamsActive)
41+
prometheus.MustRegister(MuxSessionOpen)
42+
prometheus.MustRegister(MuxStreamsActive)
43+
prometheus.MustRegister(MuxObserverReportCount)
3944
}

proxy/test/wiring_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66
"net/http"
77
"testing"
8+
"time"
89

910
"github.com/stretchr/testify/assert"
1011
"go.temporal.io/api/workflowservice/v1"
@@ -61,8 +62,14 @@ func TestWiringWithEchoService(t *testing.T) {
6162
echoServer.stop()
6263
}()
6364
// Test s2s-proxy health check
64-
_, err := http.Get(fmt.Sprintf("http://%s/health", echoServerInfo.s2sProxyConfig.HealthCheck.ListenAddress))
65-
assert.NoError(t, err)
65+
66+
// The server may take a few 10s of ms to start
67+
var healthErr = fmt.Errorf("Not started")
68+
for attempts := 0; healthErr != nil && attempts < 5; attempts++ {
69+
_, healthErr = http.Get(fmt.Sprintf("http://%s/health", echoServerInfo.s2sProxyConfig.HealthCheck.ListenAddress))
70+
time.Sleep(10 * time.Millisecond)
71+
}
72+
assert.NoError(t, healthErr)
6673

6774
// Confirm that Prometheus initialized and is reporting. We should see proxy_start_count
6875
serverMetrics := scrapePrometheus(t, echoServerInfo.s2sProxyConfig.Metrics.Prometheus.ListenAddress)

transport/mux_connection_manager.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (m *muxConnectMananger) serverLoop(setting config.TCPServerSetting) error {
156156
m.logger.Info("Accept new connection", tag.NewStringTag("remoteAddr", conn.RemoteAddr().String()))
157157

158158
session, err := yamux.Server(conn, nil)
159-
go observeYamuxSession(session, m.config)
159+
go observeYamuxSession(session, m.config, m.logger)
160160
if err != nil {
161161
m.logger.Fatal("yamux.Server failed", tag.Error(err))
162162
}
@@ -218,7 +218,7 @@ func (m *muxConnectMananger) clientLoop(setting config.TCPClientSetting) error {
218218
}
219219

220220
session, err := yamux.Client(conn, nil)
221-
go observeYamuxSession(session, m.config)
221+
go observeYamuxSession(session, m.config, m.logger)
222222
if err != nil {
223223
m.logger.Fatal("yamux.Client failed", tag.Error(err))
224224
}
@@ -234,14 +234,15 @@ func (m *muxConnectMananger) clientLoop(setting config.TCPClientSetting) error {
234234

235235
// observeYamuxSession creates a goroutine that pings the provided yamux session repeatedly and gathers its two
236236
// metrics: Whether the server is alive and how many streams it has open.
237-
func observeYamuxSession(session *yamux.Session, config config.MuxTransportConfig) {
237+
func observeYamuxSession(session *yamux.Session, config config.MuxTransportConfig, logger log.Logger) {
238238
if session == nil {
239239
// If we got a null session, we can't even generate tags to report
240240
return
241241
}
242242
defer func() {
243243
// This is an async monitor. Don't let it crash the rest of the program if there's a problem
244-
recover()
244+
err := recover()
245+
logger.Warn("Yamux observer died!", tag.NewStringTag("muxConfigName", config.Name), tag.NewAnyTag("err", err))
245246
}()
246247
labels := []string{session.LocalAddr().String(),
247248
session.RemoteAddr().String(),
@@ -261,6 +262,7 @@ func observeYamuxSession(session *yamux.Session, config config.MuxTransportConfi
261262
}
262263
metrics.MuxSessionOpen.WithLabelValues(labels...).Set(float64(sessionActive))
263264
metrics.MuxStreamsActive.WithLabelValues(labels...).Set(float64(session.NumStreams()))
265+
metrics.MuxObserverReportCount.WithLabelValues(labels...).Inc()
264266
}
265267
}
266268

0 commit comments

Comments
 (0)