Skip to content

Commit 75c1911

Browse files
hehaifengcnpglass
andauthored
Merge from scaleai/patch branch (#105)
* Fix nil pointer during SearchAttribute translation (#102) * Continue clientLoop even client failed to dial with retry and add more logs (#103) * Add more logging and enable debug * disable debug log * dial fail log * Continue clientLoop even client failed to dial with retry --------- Co-authored-by: Paul Glass <pnglass@gmail.com>
1 parent 728518d commit 75c1911

4 files changed

Lines changed: 135 additions & 14 deletions

File tree

interceptor/reflection.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func visitNamespace(obj any, match stringMatcher) (bool, error) {
5757

5858
// The visitor function can return Skip, Stop, or Continue to control recursion.
5959
err := visit.Values(obj, func(vwp visit.ValueWithParent) (visit.Action, error) {
60+
if vwp.Kind() == reflect.Ptr && vwp.IsNil() {
61+
return visit.Skip, nil
62+
}
63+
6064
// Grab name of this struct field from the parent.
6165
fieldType, action := getParentFieldType(vwp)
6266
if action != "" {
@@ -109,12 +113,15 @@ func visitSearchAttributes(obj any, match stringMatcher) (bool, error) {
109113

110114
// The visitor function can return Skip, Stop, or Continue to control recursion.
111115
err := visit.Values(obj, func(vwp visit.ValueWithParent) (visit.Action, error) {
116+
if vwp.Kind() == reflect.Ptr && vwp.IsNil() {
117+
return visit.Skip, nil
118+
}
119+
112120
// Grab name of this struct field from the parent.
113121
fieldType, action := getParentFieldType(vwp)
114122
if action != "" {
115123
return action, nil
116124
}
117-
118125
if dataBlobFieldNames[fieldType.Name] {
119126
changed, err := visitDataBlobs(vwp, match, visitSearchAttributes)
120127
matched = matched || changed

interceptor/search_attribute_translator_test.go

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,41 @@ import (
77
"go.temporal.io/api/common/v1"
88
"go.temporal.io/api/enums/v1"
99
"go.temporal.io/api/history/v1"
10-
"go.temporal.io/api/sdk/v1"
1110
"go.temporal.io/server/api/adminservice/v1"
1211
"go.temporal.io/server/api/persistence/v1"
1312
replicationspb "go.temporal.io/server/api/replication/v1"
1413
"go.temporal.io/server/common/persistence/serialization"
15-
"google.golang.org/protobuf/types/known/timestamppb"
14+
)
15+
16+
type (
17+
Other struct{}
18+
SAWithTwoFields struct {
19+
Other *Other
20+
SearchAttributes *common.SearchAttributes
21+
}
22+
23+
SAWithTwoFieldsSwapped struct {
24+
SearchAttributes *common.SearchAttributes
25+
Other *Other
26+
}
27+
28+
SAWithOneField struct {
29+
SearchAttributes *common.SearchAttributes
30+
}
31+
32+
SAWithOneMap struct {
33+
SearchAttributes map[string]*common.Payload
34+
}
35+
36+
SAWithOneMapAndOneField struct {
37+
Other *Other
38+
SearchAttributes map[string]*common.Payload
39+
}
40+
41+
SAWithOneMapAndOneFieldSwapped struct {
42+
SearchAttributes map[string]*common.Payload
43+
Other *Other
44+
}
1645
)
1746

1847
func TestTranslateSearchAttribute(t *testing.T) {
@@ -21,6 +50,58 @@ func TestTranslateSearchAttribute(t *testing.T) {
2150

2251
func generateSearchAttributeObjs() []objCase {
2352
return []objCase{
53+
{
54+
objName: "nil",
55+
containsObj: false,
56+
makeType: func(name string) any {
57+
return nil
58+
},
59+
},
60+
{
61+
objName: "nil SearchAttributes",
62+
containsObj: false,
63+
makeType: func(name string) any {
64+
return &persistence.WorkflowExecutionInfo{
65+
NamespaceId: name,
66+
SearchAttributes: map[string]*common.Payload(nil),
67+
}
68+
},
69+
},
70+
{
71+
objName: "nil two fields",
72+
makeType: func(name string) any { return &SAWithTwoFields{} },
73+
},
74+
{
75+
objName: "nil two fields different order",
76+
makeType: func(name string) any { return &SAWithTwoFieldsSwapped{} },
77+
},
78+
{
79+
objName: "nil one field",
80+
makeType: func(name string) any { return &SAWithOneField{} },
81+
},
82+
{
83+
objName: "nil map",
84+
makeType: func(name string) any { return &SAWithOneMap{} },
85+
},
86+
{
87+
objName: "nil map and field",
88+
makeType: func(name string) any { return &SAWithOneMapAndOneField{} },
89+
},
90+
{
91+
objName: "nil map and field different order",
92+
makeType: func(name string) any { return &SAWithOneMapAndOneFieldSwapped{} },
93+
},
94+
{
95+
objName: "nil value in SearchAttributes",
96+
containsObj: true,
97+
makeType: func(name string) any {
98+
return &persistence.WorkflowExecutionInfo{
99+
SearchAttributes: map[string]*common.Payload{
100+
name: nil,
101+
},
102+
}
103+
},
104+
},
24105
{
25106
objName: "HistoryTaskAttributes",
26107
containsObj: true,
@@ -97,18 +178,42 @@ func makeHistoryEventsBlobWithSearchAttribute(name string) *common.DataBlob {
97178
TaskId: 100,
98179
Attributes: &history.HistoryEvent_WorkflowExecutionStartedEventAttributes{
99180
WorkflowExecutionStartedEventAttributes: &history.WorkflowExecutionStartedEventAttributes{
100-
WorkflowType: &common.WorkflowType{
101-
Name: "some-wf-type",
102-
},
181+
WorkflowType: &common.WorkflowType{Name: "some-wf-type-1"},
103182
SearchAttributes: &common.SearchAttributes{
104183
IndexedFields: makeTestIndexedFieldMap(name),
105184
},
106185
},
107186
},
108-
EventTime: &timestamppb.Timestamp{},
109-
WorkerMayIgnore: false,
110-
UserMetadata: &sdk.UserMetadata{},
111-
Links: []*common.Link{},
187+
},
188+
{
189+
Attributes: &history.HistoryEvent_WorkflowExecutionStartedEventAttributes{
190+
WorkflowExecutionStartedEventAttributes: &history.WorkflowExecutionStartedEventAttributes{
191+
WorkflowType: &common.WorkflowType{Name: "some-wf-type-2"},
192+
SearchAttributes: nil,
193+
},
194+
},
195+
},
196+
{
197+
Attributes: &history.HistoryEvent_WorkflowExecutionStartedEventAttributes{
198+
WorkflowExecutionStartedEventAttributes: &history.WorkflowExecutionStartedEventAttributes{
199+
WorkflowType: &common.WorkflowType{Name: "some-wf-type-2"},
200+
SearchAttributes: &common.SearchAttributes{
201+
IndexedFields: nil,
202+
},
203+
},
204+
},
205+
},
206+
{
207+
Attributes: &history.HistoryEvent_WorkflowExecutionStartedEventAttributes{
208+
WorkflowExecutionStartedEventAttributes: &history.WorkflowExecutionStartedEventAttributes{
209+
WorkflowType: &common.WorkflowType{Name: "some-wf-type-3"},
210+
SearchAttributes: &common.SearchAttributes{
211+
IndexedFields: map[string]*common.Payload{
212+
name: nil,
213+
},
214+
},
215+
},
216+
},
112217
},
113218
}
114219

proxy/adminservice.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
284284
for !shutdownChan.IsShutdown() {
285285
req, err := targetStreamServer.Recv()
286286
if err == io.EOF {
287+
logger.Info("targetStreamServer.Recv encountered EOF", tag.Error(err))
287288
return
288289
}
289290

@@ -296,7 +297,11 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
296297
case *adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState:
297298
logger.Debug(fmt.Sprintf("forwarding SyncReplicationState: inclusive %v", attr.SyncReplicationState.InclusiveLowWatermark))
298299
if err = sourceStreamClient.Send(req); err != nil {
299-
logger.Error("sourceStreamClient.Send encountered error", tag.Error(err))
300+
if err != io.EOF {
301+
logger.Error("sourceStreamClient.Send encountered error", tag.Error(err))
302+
} else {
303+
logger.Info("sourceStreamClient.Send encountered EOF", tag.Error(err))
304+
}
300305
return
301306
}
302307
default:
@@ -327,6 +332,7 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
327332
for !shutdownChan.IsShutdown() {
328333
resp, err := sourceStreamClient.Recv()
329334
if err == io.EOF {
335+
logger.Info("sourceStreamClient.Recv encountered EOF", tag.Error(err))
330336
return
331337
}
332338

@@ -340,7 +346,8 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
340346
if err = targetStreamServer.Send(resp); err != nil {
341347
if err != io.EOF {
342348
logger.Error("targetStreamServer.Send encountered error", tag.Error(err))
343-
349+
} else {
350+
logger.Info("targetStreamServer.Send encountered EOF", tag.Error(err))
344351
}
345352
return
346353
}

transport/mux_connection_manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ func (m *muxConnectMananger) clientLoop(setting config.TCPClientSetting) error {
189189
m.wg.Add(1)
190190
go func() {
191191
defer m.wg.Done()
192+
connect:
192193
for {
193194
select {
194195
case <-m.shutdownCh:
@@ -204,10 +205,11 @@ func (m *muxConnectMananger) clientLoop(setting config.TCPClientSetting) error {
204205
}
205206

206207
if err := backoff.ThrottleRetry(op, retryPolicy, func(err error) bool {
208+
m.logger.Error("mux client failed to dial", tag.Error(err))
207209
return !m.isShuttingDown()
208210
}); err != nil {
209-
m.logger.Error("mux client failed to dial", tag.Error(err))
210-
return
211+
m.logger.Error("mux client failed to dial with retry", tag.Error(err))
212+
continue connect
211213
}
212214

213215
var conn net.Conn

0 commit comments

Comments
 (0)