diff --git a/internal/internal_search_attributes.go b/internal/internal_search_attributes.go index 00c0d009f..2e7278786 100644 --- a/internal/internal_search_attributes.go +++ b/internal/internal_search_attributes.go @@ -416,6 +416,31 @@ func serializeTypedSearchAttributes(searchAttributes map[SearchAttributeKey]inte return &commonpb.SearchAttributes{IndexedFields: serializedAttr}, nil } +// Start-like requests should not carry null or unset search attributes, even +// though workflow info may still contain them in the current run. +func sanitizeSearchAttributesForStart(attributes *commonpb.SearchAttributes) *commonpb.SearchAttributes { + if attributes == nil || len(attributes.IndexedFields) == 0 { + return attributes + } + + filteredAttributes := make(map[string]*commonpb.Payload, len(attributes.IndexedFields)) + filteredAny := false + for key, payload := range attributes.IndexedFields { + if payload == nil || string(payload.GetMetadata()[converter.MetadataEncoding]) == converter.MetadataEncodingNil { + filteredAny = true + continue + } + filteredAttributes[key] = payload + } + if !filteredAny { + return attributes + } + if len(filteredAttributes) == 0 { + return nil + } + return &commonpb.SearchAttributes{IndexedFields: filteredAttributes} +} + func serializeSearchAttributes( untypedAttributes map[string]interface{}, typedAttributes SearchAttributes, @@ -435,7 +460,7 @@ func serializeSearchAttributes( return nil, err } } - return searchAttr, nil + return sanitizeSearchAttributesForStart(searchAttr), nil } func convertToTypedSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes { diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 72a74ed93..28514df4d 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1896,7 +1896,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( WorkflowTaskTimeout: durationpb.New(contErr.WorkflowTaskTimeout), Header: contErr.Header, Memo: workflowContext.workflowInfo.Memo, - SearchAttributes: workflowContext.workflowInfo.SearchAttributes, + SearchAttributes: sanitizeSearchAttributesForStart(workflowContext.workflowInfo.SearchAttributes), RetryPolicy: convertToPBRetryPolicy(retryPolicy), InheritBuildId: useCompat, InitialVersioningBehavior: continueAsNewVersioningBehaviorToProto(contErr.InitialVersioningBehavior), diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index ad04a8964..45d895239 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1761,8 +1761,12 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithMemoAndSearchAttr() { memo := map[string]interface{}{ "testMemo": "memo value", } + nilPayload, err := converter.GetDefaultDataConverter().ToPayload(nil) + s.NoError(err) searchAttributes := map[string]interface{}{ - "testAttr": "attr value", + "testAttr": "attr value", + "nilAttr": nil, + "nilPayloadAttr": nilPayload, } options := StartWorkflowOptions{ ID: workflowID, @@ -1787,6 +1791,11 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithMemoAndSearchAttr() { err = converter.GetDefaultDataConverter().FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) s.NoError(err) s.Equal("attr value", resultAttr) + s.Len(req.SearchAttributes.IndexedFields, 1) + _, ok := req.SearchAttributes.IndexedFields["nilAttr"] + s.False(ok) + _, ok = req.SearchAttributes.IndexedFields["nilPayloadAttr"] + s.False(ok) }) _, _ = s.client.ExecuteWorkflow(context.Background(), options, wf) } @@ -1795,8 +1804,12 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithMemoAndSearchAt memo := map[string]interface{}{ "testMemo": "memo value", } + nilPayload, err := converter.GetDefaultDataConverter().ToPayload(nil) + s.NoError(err) searchAttributes := map[string]interface{}{ - "testAttr": "attr value", + "testAttr": "attr value", + "nilAttr": nil, + "nilPayloadAttr": nilPayload, } options := StartWorkflowOptions{ ID: "wid", @@ -1821,6 +1834,11 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflowWithMemoAndSearchAt err = converter.GetDefaultDataConverter().FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) s.NoError(err) s.Equal("attr value", resultAttr) + s.Len(req.SearchAttributes.IndexedFields, 1) + _, ok := req.SearchAttributes.IndexedFields["nilAttr"] + s.False(ok) + _, ok = req.SearchAttributes.IndexedFields["nilPayloadAttr"] + s.False(ok) }) _, _ = s.client.SignalWithStartWorkflow(context.Background(), "wid", "signal", "value", options, wf) } @@ -2068,6 +2086,16 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { _ = converter.GetDefaultDataConverter().FromPayload(result3.IndexedFields["t1"], &resultString) s.Equal("v1", resultString) + input1 = map[string]interface{}{ + "nil-attr": nil, + } + resultNil, err := serializeUntypedSearchAttributes(input1) + s.NoError(err) + s.NotNil(resultNil) + s.Contains(resultNil.IndexedFields, "nil-attr") + s.NotNil(resultNil.IndexedFields["nil-attr"]) + s.Nil(resultNil.IndexedFields["nil-attr"].GetData()) + // *Payload type goes through. p, err := converter.GetDefaultDataConverter().ToPayload("5eaf00d") s.NoError(err) @@ -2088,6 +2116,44 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { s.Error(err) } +func (s *workflowClientTestSuite) TestSerializeSearchAttributesOmitsNilValuesOnStart() { + nilPayload, err := converter.GetDefaultDataConverter().ToPayload(nil) + s.NoError(err) + payload, err := converter.GetDefaultDataConverter().ToPayload("value") + s.NoError(err) + var nilBytes []byte + nilBytesPayload, err := converter.GetDefaultDataConverter().ToPayload(nilBytes) + s.NoError(err) + + result, err := serializeSearchAttributes(map[string]interface{}{ + "realAttr": payload, + "nilAttr": nil, + "nilPayloadAttr": nilPayload, + "nilBytesPayload": nilBytesPayload, + "typedNilBytesAttr": nilBytes, + }, SearchAttributes{}) + s.NoError(err) + s.NotNil(result) + s.Len(result.IndexedFields, 3) + _, ok := result.IndexedFields["nilAttr"] + s.False(ok) + _, ok = result.IndexedFields["nilPayloadAttr"] + s.False(ok) + s.Equal(converter.MetadataEncodingBinary, string(result.IndexedFields["nilBytesPayload"].GetMetadata()[converter.MetadataEncoding])) + s.Nil(result.IndexedFields["nilBytesPayload"].GetData()) + s.Equal(converter.MetadataEncodingBinary, string(result.IndexedFields["typedNilBytesAttr"].GetMetadata()[converter.MetadataEncoding])) + s.Nil(result.IndexedFields["typedNilBytesAttr"].GetData()) + + var resultString string + err = converter.GetDefaultDataConverter().FromPayload(result.IndexedFields["realAttr"], &resultString) + s.NoError(err) + s.Equal("value", resultString) + + result, err = serializeSearchAttributes(map[string]interface{}{"nilAttr": nil}, SearchAttributes{}) + s.NoError(err) + s.Nil(result) +} + func (s *workflowClientTestSuite) TestListWorkflow() { request := &workflowservice.ListWorkflowExecutionsRequest{} response := &workflowservice.ListWorkflowExecutionsResponse{} diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 3d15a2c59..eec0c73d4 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -1271,7 +1271,7 @@ func (e *TestWorkflowEnvironment) SetSearchAttributesOnStart(searchAttributes ma if err != nil { return err } - e.impl.workflowInfo.SearchAttributes = attr + e.impl.workflowInfo.SearchAttributes = sanitizeSearchAttributesForStart(attr) return nil } diff --git a/test/integration_test.go b/test/integration_test.go index 07af77117..198c40890 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -670,6 +670,25 @@ func (ts *IntegrationTestSuite) TestContinueAsNewCarryOver() { ts.Equal("memoVal,searchAttr,123", result) } +func (ts *IntegrationTestSuite) TestContinueAsNewOmitsUnsetSearchAttributes() { + var result string + stringKey := temporal.NewSearchAttributeKeyString("CustomStringField") + keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField") + startOptions := ts.startWorkflowOptions("test-continueasnew-omit-unset-search-attributes") + startOptions.TypedSearchAttributes = temporal.NewSearchAttributes( + stringKey.ValueSet("carry-over"), + keywordKey.ValueSet("drop-me"), + ) + err := ts.executeWorkflowWithOption( + startOptions, + ts.workflows.ContinueAsNewAfterUnsetSearchAttribute, + &result, + false, + ) + ts.NoError(err) + ts.Equal("carry-over", result) +} + func (ts *IntegrationTestSuite) TestContinueAsNewWithRetryPolicy() { const ( initialMaximumAttempts = 3 diff --git a/test/workflow_test.go b/test/workflow_test.go index 2962bb7d7..ecbe3c820 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -545,6 +545,59 @@ func (w *Workflows) ContinueAsNewWithOptions(ctx workflow.Context, count int, ta return "", workflow.NewContinueAsNewError(ctx, w.ContinueAsNewWithOptions, count-1, taskQueue) } +func (w *Workflows) ContinueAsNewAfterUnsetSearchAttribute(ctx workflow.Context, continued bool) (string, error) { + info := workflow.GetInfo(ctx) + if info.SearchAttributes == nil { + return "", errors.New("search attributes are not present") + } + + fields := info.SearchAttributes.GetIndexedFields() + stringPayload, ok := fields["CustomStringField"] + if !ok { + return "", errors.New("search attribute CustomStringField not present") + } + + var stringValue string + err := converter.GetDefaultDataConverter().FromPayload(stringPayload, &stringValue) + if err != nil { + return "", errors.New("error when get CustomStringField value") + } + if stringValue != "carry-over" { + return "", fmt.Errorf("unexpected CustomStringField value: %s", stringValue) + } + + keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField") + if !continued { + keywordPayload, ok := fields["CustomKeywordField"] + if !ok { + return "", errors.New("search attribute CustomKeywordField not present before unset") + } + + var keywordValue string + err = converter.GetDefaultDataConverter().FromPayload(keywordPayload, &keywordValue) + if err != nil { + return "", errors.New("error when get CustomKeywordField value") + } + if keywordValue != "drop-me" { + return "", fmt.Errorf("unexpected CustomKeywordField value: %s", keywordValue) + } + + err = workflow.UpsertTypedSearchAttributes(ctx, keywordKey.ValueUnset()) + if err != nil { + return "", err + } + return "", workflow.NewContinueAsNewError(ctx, w.ContinueAsNewAfterUnsetSearchAttribute, true) + } + + if _, ok := fields["CustomKeywordField"]; ok { + return "", errors.New("unset search attribute carried over") + } + if keywordValue, ok := workflow.GetTypedSearchAttributes(ctx).GetKeyword(keywordKey); ok || keywordValue != "" { + return "", errors.New("unset search attribute unexpectedly present in typed search attributes") + } + return stringValue, nil +} + func (w *Workflows) ContinueAsNewWithRetryPolicy( ctx workflow.Context, initialMaximumAttempts int, @@ -3697,6 +3750,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ContinueAsNew) worker.RegisterWorkflow(w.UpsertSearchAttributesConditional) worker.RegisterWorkflow(w.UpsertMemoConditional) + worker.RegisterWorkflow(w.ContinueAsNewAfterUnsetSearchAttribute) worker.RegisterWorkflow(w.ContinueAsNewWithOptions) worker.RegisterWorkflow(w.ContinueAsNewWithRetryPolicy) worker.RegisterWorkflow(w.ContinueAsNewWithChildWF)