Skip to content

Commit 19a3f21

Browse files
committed
Differentiate high and low-velocity loggers. Add error logging for unary APIs
1 parent 13fa665 commit 19a3f21

1 file changed

Lines changed: 45 additions & 20 deletions

File tree

proxy/adminservice.go

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ type (
2424
adminServiceProxyServer struct {
2525
adminservice.UnimplementedAdminServiceServer
2626
adminClient adminservice.AdminServiceClient
27-
logger log.Logger
27+
verboseLogger log.Logger
28+
replicationLogger log.Logger
2829
outboundAddressOverride string
2930
apiOverrides *config.APIOverridesConfig
3031
metricLabelValues []string
@@ -43,13 +44,17 @@ func NewAdminServiceProxyServer(
4344
reportStreamValue func(idx int32, value int32),
4445
logger log.Logger,
4546
) adminservice.AdminServiceServer {
46-
// The AdminServiceStreams will duplicate the same output for an underlying connection issue hundreds of times.
47-
// Limit their output to three times per minute
48-
logger = log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)),
47+
// Replication streams / APIs will run many hundreds of times per second. Throttle their output
48+
// to 3 / min
49+
replicationLogger := log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)),
4950
func() float64 { return 3.0 / 60.0 })
51+
// For config operations, allow most logs so that we can see all the info without putting disk at risk
52+
verboseLogger := log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)),
53+
func() float64 { return 3.0 })
5054
return &adminServiceProxyServer{
5155
adminClient: adminClient,
52-
logger: logger,
56+
verboseLogger: verboseLogger,
57+
replicationLogger: replicationLogger,
5358
outboundAddressOverride: outboundAddressOverride,
5459
apiOverrides: apiOverrides,
5560
metricLabelValues: metricLabelValues,
@@ -58,14 +63,14 @@ func NewAdminServiceProxyServer(
5863
}
5964

6065
func (s *adminServiceProxyServer) AddOrUpdateRemoteCluster(ctx context.Context, in0 *adminservice.AddOrUpdateRemoteClusterRequest) (*adminservice.AddOrUpdateRemoteClusterResponse, error) {
61-
s.logger.Info("Received AddOrUpdateRemoteCluster", tag.Address(in0.FrontendAddress), tag.NewBoolTag("Enabled", in0.GetEnableRemoteClusterConnection()), tag.NewStringsTag("configTags", s.metricLabelValues))
66+
s.verboseLogger.Info("Received AddOrUpdateRemoteCluster", tag.Address(in0.FrontendAddress), tag.NewBoolTag("Enabled", in0.GetEnableRemoteClusterConnection()), tag.NewStringsTag("configTags", s.metricLabelValues))
6267
if !common.IsRequestTranslationDisabled(ctx) {
6368
if len(s.outboundAddressOverride) > 0 {
6469
// Override this address so that cross-cluster connections flow through the proxy.
6570
// Use a separate "external address" config option because the outbound.listenerAddress may not be routable
6671
// from the local temporal server, or the proxy may be deployed behind a load balancer.
6772
in0.FrontendAddress = s.outboundAddressOverride
68-
s.logger.Info("Overwrote outbound address", tag.Address(in0.FrontendAddress), tag.NewStringsTag("configTags", s.metricLabelValues))
73+
s.verboseLogger.Info("Overwrote outbound address", tag.Address(in0.FrontendAddress), tag.NewStringsTag("configTags", s.metricLabelValues))
6974
}
7075
}
7176
return s.adminClient.AddOrUpdateRemoteCluster(ctx, in0)
@@ -92,13 +97,13 @@ func (s *adminServiceProxyServer) DeleteWorkflowExecution(ctx context.Context, i
9297
}
9398

9499
func (s *adminServiceProxyServer) DescribeCluster(ctx context.Context, in0 *adminservice.DescribeClusterRequest) (*adminservice.DescribeClusterResponse, error) {
95-
s.logger.Info("Received DescribeClusterRequest")
100+
s.verboseLogger.Info("Received DescribeClusterRequest")
96101
resp, err := s.adminClient.DescribeCluster(ctx, in0)
97102
if common.IsRequestTranslationDisabled(ctx) {
98103
return resp, err
99104
}
100105
if resp != nil {
101-
s.logger.Info("Raw DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
106+
s.verboseLogger.Info("Raw DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
102107
tag.NewStringTag("clusterName", resp.ClusterName), tag.NewStringTag("version", resp.ServerVersion),
103108
tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewInt64("initialFailoverVersion", resp.InitialFailoverVersion),
104109
tag.NewBoolTag("isGlobalNamespaceEnabled", resp.IsGlobalNamespaceEnabled), tag.NewStringsTag("configTags", s.metricLabelValues))
@@ -107,19 +112,19 @@ func (s *adminServiceProxyServer) DescribeCluster(ctx context.Context, in0 *admi
107112
responseOverride := s.apiOverrides.AdminSerivce.DescribeCluster.Response
108113
if resp != nil && responseOverride.FailoverVersionIncrement != nil {
109114
resp.FailoverVersionIncrement = *responseOverride.FailoverVersionIncrement
110-
s.logger.Info("Overwrite FailoverVersionIncrement", tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement),
115+
s.verboseLogger.Info("Overwrite FailoverVersionIncrement", tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement),
111116
tag.NewStringsTag("configTags", s.metricLabelValues))
112117
}
113118
}
114119

115120
if resp != nil {
116-
s.logger.Info("Translated DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
121+
s.verboseLogger.Info("Translated DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
117122
tag.NewStringTag("clusterName", resp.ClusterName), tag.NewStringTag("version", resp.ServerVersion),
118123
tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewInt64("initialFailoverVersion", resp.InitialFailoverVersion),
119124
tag.NewBoolTag("isGlobalNamespaceEnabled", resp.IsGlobalNamespaceEnabled), tag.NewStringsTag("configTags", s.metricLabelValues))
120125
}
121126
if err != nil {
122-
s.logger.Info("Error from remote server!", tag.Error(err), tag.NewStringsTag("configTags", s.metricLabelValues))
127+
s.verboseLogger.Info("Error from remote server!", tag.Error(err), tag.NewStringsTag("configTags", s.metricLabelValues))
123128
}
124129
return resp, err
125130
}
@@ -132,8 +137,17 @@ func (s *adminServiceProxyServer) DescribeHistoryHost(ctx context.Context, in0 *
132137
return s.adminClient.DescribeHistoryHost(ctx, in0)
133138
}
134139

135-
func (s *adminServiceProxyServer) DescribeMutableState(ctx context.Context, in0 *adminservice.DescribeMutableStateRequest) (*adminservice.DescribeMutableStateResponse, error) {
136-
return s.adminClient.DescribeMutableState(ctx, in0)
140+
func (s *adminServiceProxyServer) DescribeMutableState(ctx context.Context, in0 *adminservice.DescribeMutableStateRequest) (resp *adminservice.DescribeMutableStateResponse, err error) {
141+
resp, err = s.adminClient.DescribeMutableState(ctx, in0)
142+
if err != nil {
143+
// This is a duplicate of the grpc client metrics, but not everyone has metrics set up
144+
s.replicationLogger.Error("Failed to describe workflow",
145+
tag.NewStringTag("WorkflowId", in0.GetExecution().GetWorkflowId()),
146+
tag.NewStringTag("RunId", in0.GetExecution().GetRunId()),
147+
tag.NewStringTag("Namespace", in0.GetNamespace()),
148+
tag.Error(err), tag.Operation("DescribeMutableState"))
149+
}
150+
return
137151
}
138152

139153
func (s *adminServiceProxyServer) GetDLQMessages(ctx context.Context, in0 *adminservice.GetDLQMessagesRequest) (*adminservice.GetDLQMessagesResponse, error) {
@@ -152,12 +166,23 @@ func (s *adminServiceProxyServer) GetNamespace(ctx context.Context, in0 *adminse
152166
return s.adminClient.GetNamespace(ctx, in0)
153167
}
154168

155-
func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Context, in0 *adminservice.GetNamespaceReplicationMessagesRequest) (*adminservice.GetNamespaceReplicationMessagesResponse, error) {
156-
return s.adminClient.GetNamespaceReplicationMessages(ctx, in0)
169+
func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Context, in0 *adminservice.GetNamespaceReplicationMessagesRequest) (resp *adminservice.GetNamespaceReplicationMessagesResponse, err error) {
170+
resp, err = s.adminClient.GetNamespaceReplicationMessages(ctx, in0)
171+
if err != nil {
172+
// This is a duplicate of the grpc client metrics, but not everyone has metrics set up
173+
s.replicationLogger.Error("Failed to get namespace replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()),
174+
tag.Error(err), tag.Operation("GetNamespaceReplicationMessages"))
175+
}
176+
return
157177
}
158178

159-
func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (*adminservice.GetReplicationMessagesResponse, error) {
160-
return s.adminClient.GetReplicationMessages(ctx, in0)
179+
func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (resp *adminservice.GetReplicationMessagesResponse, err error) {
180+
resp, err = s.adminClient.GetReplicationMessages(ctx, in0)
181+
if err != nil {
182+
s.replicationLogger.Error("Failed to get replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()),
183+
tag.Error(err), tag.Operation("GetReplicationMessages"))
184+
}
185+
return
161186
}
162187

163188
func (s *adminServiceProxyServer) GetSearchAttributes(ctx context.Context, in0 *adminservice.GetSearchAttributesRequest) (*adminservice.GetSearchAttributesResponse, error) {
@@ -255,7 +280,7 @@ func ClusterShardIDtoString(sd history.ClusterShardID) string {
255280
func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
256281
targetStreamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
257282
) (retError error) {
258-
defer log.CapturePanic(s.logger, &retError)
283+
defer log.CapturePanic(s.replicationLogger, &retError)
259284

260285
targetMetadata, ok := metadata.FromIncomingContext(targetStreamServer.Context())
261286
if !ok {
@@ -268,7 +293,7 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
268293
return err
269294
}
270295

271-
logger := log.With(s.logger,
296+
logger := log.With(s.replicationLogger,
272297
tag.NewStringTag("source", ClusterShardIDtoString(sourceClusterShardID)),
273298
tag.NewStringTag("target", ClusterShardIDtoString(targetClusterShardID)))
274299

0 commit comments

Comments
 (0)