Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion internal/internal_search_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,31 @@ func serializeTypedSearchAttributes(searchAttributes map[SearchAttributeKey]inte
return &commonpb.SearchAttributes{IndexedFields: serializedAttr}, nil
}

// Start-like requests should not carry null or unset search attributes, even
// though workflow info may still contain them in the current run.
func sanitizeSearchAttributesForStart(attributes *commonpb.SearchAttributes) *commonpb.SearchAttributes {
if attributes == nil || len(attributes.IndexedFields) == 0 {
return attributes
}

filteredAttributes := make(map[string]*commonpb.Payload, len(attributes.IndexedFields))
filteredAny := false
for key, payload := range attributes.IndexedFields {
if payload == nil || string(payload.GetMetadata()[converter.MetadataEncoding]) == converter.MetadataEncodingNil {
filteredAny = true
continue
}
filteredAttributes[key] = payload
}
if !filteredAny {
return attributes
}
if len(filteredAttributes) == 0 {
return nil
}
return &commonpb.SearchAttributes{IndexedFields: filteredAttributes}
}

func serializeSearchAttributes(
untypedAttributes map[string]interface{},
typedAttributes SearchAttributes,
Expand All @@ -435,7 +460,7 @@ func serializeSearchAttributes(
return nil, err
}
}
return searchAttr, nil
return sanitizeSearchAttributesForStart(searchAttr), nil
}

func convertToTypedSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes {
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
WorkflowTaskTimeout: durationpb.New(contErr.WorkflowTaskTimeout),
Header: contErr.Header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
SearchAttributes: sanitizeSearchAttributesForStart(workflowContext.workflowInfo.SearchAttributes),
RetryPolicy: convertToPBRetryPolicy(retryPolicy),
InheritBuildId: useCompat,
InitialVersioningBehavior: continueAsNewVersioningBehaviorToProto(contErr.InitialVersioningBehavior),
Expand Down
70 changes: 68 additions & 2 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,8 +1761,12 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithMemoAndSearchAttr() {
memo := map[string]interface{}{
"testMemo": "memo value",
}
nilPayload, err := converter.GetDefaultDataConverter().ToPayload(nil)
s.NoError(err)
searchAttributes := map[string]interface{}{
"testAttr": "attr value",
"testAttr": "attr value",
"nilAttr": nil,
"nilPayloadAttr": nilPayload,
}
options := StartWorkflowOptions{
ID: workflowID,
Expand All @@ -1787,6 +1791,11 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithMemoAndSearchAttr() {
err = converter.GetDefaultDataConverter().FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr)
s.NoError(err)
s.Equal("attr value", resultAttr)
s.Len(req.SearchAttributes.IndexedFields, 1)
_, ok := req.SearchAttributes.IndexedFields["nilAttr"]
s.False(ok)
_, ok = req.SearchAttributes.IndexedFields["nilPayloadAttr"]
s.False(ok)
})
_, _ = s.client.ExecuteWorkflow(context.Background(), options, wf)
}
Expand All @@ -1795,8 +1804,12 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithMemoAndSearchAt
memo := map[string]interface{}{
"testMemo": "memo value",
}
nilPayload, err := converter.GetDefaultDataConverter().ToPayload(nil)
s.NoError(err)
searchAttributes := map[string]interface{}{
"testAttr": "attr value",
"testAttr": "attr value",
"nilAttr": nil,
"nilPayloadAttr": nilPayload,
}
options := StartWorkflowOptions{
ID: "wid",
Expand All @@ -1821,6 +1834,11 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithMemoAndSearchAt
err = converter.GetDefaultDataConverter().FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr)
s.NoError(err)
s.Equal("attr value", resultAttr)
s.Len(req.SearchAttributes.IndexedFields, 1)
_, ok := req.SearchAttributes.IndexedFields["nilAttr"]
s.False(ok)
_, ok = req.SearchAttributes.IndexedFields["nilPayloadAttr"]
s.False(ok)
})
_, _ = s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf)
}
Expand Down Expand Up @@ -2068,6 +2086,16 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() {
_ = converter.GetDefaultDataConverter().FromPayload(result3.IndexedFields["t1"], &resultString)
s.Equal("v1", resultString)

input1 = map[string]interface{}{
"nil-attr": nil,
}
resultNil, err := serializeUntypedSearchAttributes(input1)
s.NoError(err)
s.NotNil(resultNil)
s.Contains(resultNil.IndexedFields, "nil-attr")
s.NotNil(resultNil.IndexedFields["nil-attr"])
s.Nil(resultNil.IndexedFields["nil-attr"].GetData())

// *Payload type goes through.
p, err := converter.GetDefaultDataConverter().ToPayload("5eaf00d")
s.NoError(err)
Expand All @@ -2088,6 +2116,44 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() {
s.Error(err)
}

func (s *workflowClientTestSuite) TestSerializeSearchAttributesOmitsNilValuesOnStart() {
nilPayload, err := converter.GetDefaultDataConverter().ToPayload(nil)
s.NoError(err)
payload, err := converter.GetDefaultDataConverter().ToPayload("value")
s.NoError(err)
var nilBytes []byte
nilBytesPayload, err := converter.GetDefaultDataConverter().ToPayload(nilBytes)
s.NoError(err)

result, err := serializeSearchAttributes(map[string]interface{}{
"realAttr": payload,
"nilAttr": nil,
"nilPayloadAttr": nilPayload,
"nilBytesPayload": nilBytesPayload,
"typedNilBytesAttr": nilBytes,
}, SearchAttributes{})
s.NoError(err)
s.NotNil(result)
s.Len(result.IndexedFields, 3)
_, ok := result.IndexedFields["nilAttr"]
s.False(ok)
_, ok = result.IndexedFields["nilPayloadAttr"]
s.False(ok)
s.Equal(converter.MetadataEncodingBinary, string(result.IndexedFields["nilBytesPayload"].GetMetadata()[converter.MetadataEncoding]))
s.Nil(result.IndexedFields["nilBytesPayload"].GetData())
s.Equal(converter.MetadataEncodingBinary, string(result.IndexedFields["typedNilBytesAttr"].GetMetadata()[converter.MetadataEncoding]))
s.Nil(result.IndexedFields["typedNilBytesAttr"].GetData())

var resultString string
err = converter.GetDefaultDataConverter().FromPayload(result.IndexedFields["realAttr"], &resultString)
s.NoError(err)
s.Equal("value", resultString)

result, err = serializeSearchAttributes(map[string]interface{}{"nilAttr": nil}, SearchAttributes{})
s.NoError(err)
s.Nil(result)
}

func (s *workflowClientTestSuite) TestListWorkflow() {
request := &workflowservice.ListWorkflowExecutionsRequest{}
response := &workflowservice.ListWorkflowExecutionsResponse{}
Expand Down
2 changes: 1 addition & 1 deletion internal/workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ func (e *TestWorkflowEnvironment) SetSearchAttributesOnStart(searchAttributes ma
if err != nil {
return err
}
e.impl.workflowInfo.SearchAttributes = attr
e.impl.workflowInfo.SearchAttributes = sanitizeSearchAttributesForStart(attr)
return nil
}

Expand Down
19 changes: 19 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,25 @@ func (ts *IntegrationTestSuite) TestContinueAsNewCarryOver() {
ts.Equal("memoVal,searchAttr,123", result)
}

func (ts *IntegrationTestSuite) TestContinueAsNewOmitsUnsetSearchAttributes() {
var result string
stringKey := temporal.NewSearchAttributeKeyString("CustomStringField")
keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField")
startOptions := ts.startWorkflowOptions("test-continueasnew-omit-unset-search-attributes")
startOptions.TypedSearchAttributes = temporal.NewSearchAttributes(
stringKey.ValueSet("carry-over"),
keywordKey.ValueSet("drop-me"),
)
err := ts.executeWorkflowWithOption(
startOptions,
ts.workflows.ContinueAsNewAfterUnsetSearchAttribute,
&result,
false,
)
ts.NoError(err)
ts.Equal("carry-over", result)
}

func (ts *IntegrationTestSuite) TestContinueAsNewWithRetryPolicy() {
const (
initialMaximumAttempts = 3
Expand Down
54 changes: 54 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,59 @@ func (w *Workflows) ContinueAsNewWithOptions(ctx workflow.Context, count int, ta
return "", workflow.NewContinueAsNewError(ctx, w.ContinueAsNewWithOptions, count-1, taskQueue)
}

func (w *Workflows) ContinueAsNewAfterUnsetSearchAttribute(ctx workflow.Context, continued bool) (string, error) {
info := workflow.GetInfo(ctx)
if info.SearchAttributes == nil {
return "", errors.New("search attributes are not present")
}

fields := info.SearchAttributes.GetIndexedFields()
stringPayload, ok := fields["CustomStringField"]
if !ok {
return "", errors.New("search attribute CustomStringField not present")
}

var stringValue string
err := converter.GetDefaultDataConverter().FromPayload(stringPayload, &stringValue)
if err != nil {
return "", errors.New("error when get CustomStringField value")
}
if stringValue != "carry-over" {
return "", fmt.Errorf("unexpected CustomStringField value: %s", stringValue)
}

keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField")
if !continued {
keywordPayload, ok := fields["CustomKeywordField"]
if !ok {
return "", errors.New("search attribute CustomKeywordField not present before unset")
}

var keywordValue string
err = converter.GetDefaultDataConverter().FromPayload(keywordPayload, &keywordValue)
if err != nil {
return "", errors.New("error when get CustomKeywordField value")
}
if keywordValue != "drop-me" {
return "", fmt.Errorf("unexpected CustomKeywordField value: %s", keywordValue)
}

err = workflow.UpsertTypedSearchAttributes(ctx, keywordKey.ValueUnset())
if err != nil {
return "", err
}
return "", workflow.NewContinueAsNewError(ctx, w.ContinueAsNewAfterUnsetSearchAttribute, true)
}

if _, ok := fields["CustomKeywordField"]; ok {
return "", errors.New("unset search attribute carried over")
}
if keywordValue, ok := workflow.GetTypedSearchAttributes(ctx).GetKeyword(keywordKey); ok || keywordValue != "" {
return "", errors.New("unset search attribute unexpectedly present in typed search attributes")
}
return stringValue, nil
}

func (w *Workflows) ContinueAsNewWithRetryPolicy(
ctx workflow.Context,
initialMaximumAttempts int,
Expand Down Expand Up @@ -3697,6 +3750,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ContinueAsNew)
worker.RegisterWorkflow(w.UpsertSearchAttributesConditional)
worker.RegisterWorkflow(w.UpsertMemoConditional)
worker.RegisterWorkflow(w.ContinueAsNewAfterUnsetSearchAttribute)
worker.RegisterWorkflow(w.ContinueAsNewWithOptions)
worker.RegisterWorkflow(w.ContinueAsNewWithRetryPolicy)
worker.RegisterWorkflow(w.ContinueAsNewWithChildWF)
Expand Down
Loading