Skip to content

Commit 5dafb3c

Browse files
Refactor stream connections for better readability, add stream metrics (#131)
* Refactor stream connections for better readability * remove breaks to conform to linter * Native histograms are behaving badly in Grafana, remove for now * Remove remaining native histogram
1 parent fb88289 commit 5dafb3c

4 files changed

Lines changed: 272 additions & 98 deletions

File tree

metrics/metrics.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,13 @@ func GetStandardGRPCInterceptor(labelNamesInContext ...string) *grpcprom.ServerM
4141
grpcprom.WithServerHandlingTimeHistogram(
4242
grpcprom.WithHistogramNamespace("temporal"),
4343
grpcprom.WithHistogramSubsystem("s2s_proxy"),
44-
// TODO: Use Native histograms or configure the buckets here?
44+
// TODO: Enable native histograms later
45+
//grpcprom.WithHistogramOpts(&prometheus.HistogramOpts{
46+
// // Only Buckets, NativeHistogramBucketFactor, and other NativeHistogram options are supported here.
47+
// // Other histogram options should be supplied with grpcprom.WithXXXX
48+
// NativeHistogramBucketFactor: 1.1,
49+
// NativeHistogramMaxBucketNumber: 10,
50+
//}),
4551
),
4652
grpcprom.WithServerCounterOptions(
4753
grpcprom.WithNamespace("temporal"),
@@ -57,6 +63,12 @@ func GetStandardGRPCClientInterceptor(direction string) *grpcprom.ClientMetrics
5763
grpcprom.WithHistogramNamespace("temporal"),
5864
// TODO: Gratuitous hack until https://github.com/grpc-ecosystem/go-grpc-middleware/issues/783
5965
grpcprom.WithHistogramSubsystem("s2s_proxy_"+direction),
66+
// TODO: Enable native histograms later
67+
//grpcprom.WithHistogramOpts(&prometheus.HistogramOpts{
68+
// // Only Buckets, NativeHistogramBucketFactor, and other NativeHistogram options are supported here.
69+
// // Other histogram options should be supplied with grpcprom.WithXXXX
70+
// NativeHistogramBucketFactor: 1.1,
71+
//}),
6072
),
6173
grpcprom.WithClientCounterOptions(
6274
grpcprom.WithNamespace("temporal"),
@@ -88,6 +100,19 @@ func DefaultGaugeVec(name string, help string, labels ...string) *prometheus.Gau
88100
}, labels)
89101
}
90102

103+
// DefaultHistogramVec provides a prometheus HistogramVec for the requested name. The name will be sanitized, and the recommended
104+
// namespace and subsystem will be set. Vector metrics allow the use of labels, so if you need labels on your metrics, then use this.
105+
func DefaultHistogramVec(name string, help string, labels ...string) *prometheus.HistogramVec {
106+
return prometheus.NewHistogramVec(prometheus.HistogramOpts{
107+
Namespace: "temporal",
108+
Subsystem: "s2s_proxy",
109+
Name: SanitizeForPrometheus(name),
110+
Help: help,
111+
// TODO: Native histograms aren't supported in our Grafana just yet
112+
//NativeHistogramBucketFactor: 1.1,
113+
}, labels)
114+
}
115+
91116
// DefaultCounter provides a prometheus Counter for the requested name. The name will be sanitized, and the recommended
92117
// namespace and subsystem will be set.
93118
func DefaultCounter(name string, help string) prometheus.Counter {

metrics/prometheus_defs.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,14 @@ var (
1111

1212
// /proxy/adminservice.go
1313

14-
AdminServiceStreamsActive = DefaultGaugeVec("admin_service_streams_active", "Number of admin service streams open",
15-
"direction")
14+
AdminServiceStreamsActive = DefaultGaugeVec("admin_service_streams_active", "Number of admin service streams open", "direction")
15+
AdminServiceStreamDuration = DefaultHistogramVec("admin_service_stream_duration", "The length of time each stream was open", "direction")
16+
AdminServiceStreamsOpenedCount = DefaultCounterVec("admin_service_streams_opened_count", "Number of streams opened", "direction")
17+
AdminServiceStreamsClosedCount = DefaultCounterVec("admin_service_streams_closed_count", "Number of streams closed", "direction")
18+
AdminServiceStreamReqCount = DefaultCounterVec("admin_service_stream_request_count", "Number of messages received", "direction")
19+
AdminServiceStreamRespCount = DefaultCounterVec("admin_service_stream_response_count", "Number of messages received", "direction")
20+
// AdminServiceStreamTerminatedCount's labels are direction (inbound/outbound) and terminated_by (source/target)
21+
AdminServiceStreamTerminatedCount = DefaultCounterVec("admin_service_stream_terminated_count", "Stream was terminated by remote server", "direction", "terminated_by")
1622

1723
// /proxy/health_check.go
1824

@@ -49,13 +55,23 @@ func init() {
4955
// Re-register the go collector with all non-debug metrics. See: https://pkg.go.dev/runtime/metrics
5056
prometheus.MustRegister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll),
5157
collectors.WithoutGoCollectorRuntimeMetrics(collectors.MetricsDebug.Matcher)))
52-
prometheus.MustRegister(ProxyStartCount)
58+
prometheus.MustRegister(AdminServiceStreamsActive)
59+
prometheus.MustRegister(AdminServiceStreamDuration)
60+
prometheus.MustRegister(AdminServiceStreamsOpenedCount)
61+
prometheus.MustRegister(AdminServiceStreamsClosedCount)
62+
prometheus.MustRegister(AdminServiceStreamReqCount)
63+
prometheus.MustRegister(AdminServiceStreamRespCount)
64+
prometheus.MustRegister(AdminServiceStreamTerminatedCount)
65+
66+
prometheus.MustRegister(HealthCheckIsHealthy)
67+
prometheus.MustRegister(HealthCheckHealthyCount)
68+
5369
prometheus.MustRegister(GRPCServerMetrics)
70+
prometheus.MustRegister(ProxyStartCount)
71+
5472
prometheus.MustRegister(GRPCOutboundClientMetrics)
5573
prometheus.MustRegister(GRPCInboundClientMetrics)
56-
prometheus.MustRegister(HealthCheckIsHealthy)
57-
prometheus.MustRegister(HealthCheckHealthyCount)
58-
prometheus.MustRegister(AdminServiceStreamsActive)
74+
5975
prometheus.MustRegister(MuxSessionOpen)
6076
prometheus.MustRegister(MuxStreamsActive)
6177
prometheus.MustRegister(MuxObserverReportCount)

proxy/admin_stream_transfer.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package proxy
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"go.temporal.io/api/serviceerror"
11+
"go.temporal.io/server/api/adminservice/v1"
12+
"go.temporal.io/server/common/channel"
13+
"go.temporal.io/server/common/log"
14+
"go.temporal.io/server/common/log/tag"
15+
16+
"github.com/temporalio/s2s-proxy/metrics"
17+
)
18+
19+
type StreamRequestOrResponse interface {
20+
adminservice.StreamWorkflowReplicationMessagesRequest | adminservice.StreamWorkflowReplicationMessagesResponse
21+
}
22+
type ValueWithError[T StreamRequestOrResponse] struct {
23+
val *T
24+
err error
25+
}
26+
type recvable[T StreamRequestOrResponse] interface {
27+
Recv() (*T, error)
28+
}
29+
30+
// startListener creates a channel of Recv() from the provided source. It is the job of the caller to cancel the context
31+
// that will stop Recv(), or the goroutine created by this will block forever
32+
func startListener[T StreamRequestOrResponse](
33+
receiver recvable[T],
34+
shutdownChan channel.ShutdownOnce,
35+
) chan ValueWithError[T] {
36+
targetStreamServerData := make(chan ValueWithError[T])
37+
go func() {
38+
for !shutdownChan.IsShutdown() {
39+
req, err := receiver.Recv()
40+
targetStreamServerData <- ValueWithError[T]{val: req, err: err}
41+
}
42+
}()
43+
return targetStreamServerData
44+
}
45+
46+
func transferSourceToTarget(
47+
sourceStreamClient adminservice.AdminService_StreamWorkflowReplicationMessagesClient,
48+
targetStreamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
49+
wg *sync.WaitGroup,
50+
shutdownChan channel.ShutdownOnce,
51+
sendEOFToServer *atomic.Bool,
52+
directionLabel string,
53+
logger log.Logger,
54+
) {
55+
defer func() {
56+
logger.Debug("Shutdown sourceStreamClient.Recv loop.")
57+
shutdownChan.Shutdown()
58+
wg.Done()
59+
}()
60+
61+
dataChan := startListener(sourceStreamClient, shutdownChan)
62+
63+
for {
64+
var resp *adminservice.StreamWorkflowReplicationMessagesResponse
65+
var err error
66+
select {
67+
case <-shutdownChan.Channel():
68+
return
69+
case dataWithError := <-dataChan:
70+
resp = dataWithError.val
71+
err = dataWithError.err
72+
}
73+
if err == io.EOF {
74+
logger.Debug("sourceStreamClient.Recv encountered EOF", tag.Error(err))
75+
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "source").Inc()
76+
sendEOFToServer.Store(true)
77+
return
78+
}
79+
80+
if err != nil {
81+
logger.Error("sourceStreamClient.Recv encountered error", tag.Error(err))
82+
sendEOFToServer.Store(true)
83+
return
84+
}
85+
switch attr := resp.GetAttributes().(type) {
86+
case *adminservice.StreamWorkflowReplicationMessagesResponse_Messages:
87+
logger.Debug(fmt.Sprintf("forwarding ReplicationMessages: exclusive %v", attr.Messages.ExclusiveHighWatermark))
88+
if err = targetStreamServer.Send(resp); err != nil {
89+
if err != io.EOF {
90+
logger.Error("targetStreamServer.Send encountered error", tag.Error(err))
91+
} else {
92+
logger.Debug("targetStreamServer.Send encountered EOF", tag.Error(err))
93+
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "target").Inc()
94+
}
95+
sendEOFToServer.Store(true)
96+
return
97+
}
98+
metrics.AdminServiceStreamReqCount.WithLabelValues(directionLabel).Inc()
99+
default:
100+
logger.Error("sourceStreamClient.Recv encountered error", tag.Error(serviceerror.NewInternal(fmt.Sprintf(
101+
"StreamWorkflowReplicationMessages encountered unknown type: %T %v", attr, attr,
102+
))))
103+
sendEOFToServer.Store(true)
104+
return
105+
}
106+
}
107+
}
108+
109+
func transferTargetToSource(
110+
sourceStreamClient adminservice.AdminService_StreamWorkflowReplicationMessagesClient,
111+
targetStreamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
112+
wg *sync.WaitGroup,
113+
shutdownChan channel.ShutdownOnce,
114+
sendEOFToServer *atomic.Bool,
115+
directionLabel string,
116+
logger log.Logger,
117+
) {
118+
defer func() {
119+
logger.Debug("Shutdown targetStreamServer.Recv loop.")
120+
shutdownChan.Shutdown()
121+
var err error
122+
closeSent := make(chan struct{})
123+
go func() {
124+
err = sourceStreamClient.CloseSend()
125+
closeSent <- struct{}{}
126+
}()
127+
timeout := time.After(time.Second)
128+
select {
129+
case <-closeSent:
130+
break
131+
case <-timeout:
132+
err = fmt.Errorf("timed out waiting for source stream to close")
133+
}
134+
135+
if err != nil {
136+
logger.Error("Failed to close sourceStreamClient", tag.Error(err))
137+
}
138+
wg.Done()
139+
}()
140+
141+
dataChan := startListener(targetStreamServer, shutdownChan)
142+
143+
for {
144+
var req *adminservice.StreamWorkflowReplicationMessagesRequest
145+
var err error
146+
select {
147+
case <-shutdownChan.Channel():
148+
return
149+
case valueWithError := <-dataChan:
150+
req = valueWithError.val
151+
err = valueWithError.err
152+
}
153+
if err == io.EOF {
154+
logger.Debug("targetStreamServer.Recv encountered EOF", tag.Error(err))
155+
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "target").Inc()
156+
sendEOFToServer.Store(true)
157+
return
158+
}
159+
160+
if err != nil {
161+
logger.Error("targetStreamServer.Recv encountered error", tag.Error(err))
162+
return
163+
}
164+
165+
switch attr := req.GetAttributes().(type) {
166+
case *adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState:
167+
logger.Debug(fmt.Sprintf("forwarding SyncReplicationState: inclusive %v", attr.SyncReplicationState.InclusiveLowWatermark))
168+
if err = sourceStreamClient.Send(req); err != nil {
169+
if err != io.EOF {
170+
logger.Error("sourceStreamClient.Send encountered error", tag.Error(err))
171+
} else {
172+
logger.Debug("sourceStreamClient.Send encountered EOF", tag.Error(err))
173+
metrics.AdminServiceStreamTerminatedCount.WithLabelValues(directionLabel, "source").Inc()
174+
}
175+
sendEOFToServer.Store(true)
176+
return
177+
}
178+
metrics.AdminServiceStreamRespCount.WithLabelValues(directionLabel).Inc()
179+
default:
180+
logger.Error("targetStreamServer.Recv encountered error", tag.Error(serviceerror.NewInternal(fmt.Sprintf(
181+
"StreamWorkflowReplicationMessages encountered unknown type: %T %v", attr, attr,
182+
))))
183+
sendEOFToServer.Store(true)
184+
return
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)