Skip to content

Commit 154b40d

Browse files
authored
fix state-based replication for nexus and enable related test (#6932)
## What changed? <!-- Describe what has changed in this PR --> - Use deserialized data when applying hsm node mutation. - Invalid cache after update hsm node. - Skip current run check when dispatch workflow update if transition history is enabled. - xdc nexus: add test with transition history enabled. ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Unit test. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent 62d71af commit 154b40d

8 files changed

+65
-21
lines changed

Diff for: service/history/hsm/tree.go

+4
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,10 @@ func (n *Node) Child(path []Key) (*Node, error) {
356356
return child.Child(rest)
357357
}
358358

359+
func (n *Node) InvalidateCache() {
360+
n.cache.dataLoaded = false
361+
}
362+
359363
// AddChild adds an immediate child to a node, serializing the given data.
360364
// Returns [ErrStateMachineAlreadyExists] if a child with the given key already exists, [ErrNotRegistered] if the key's
361365
// type is not found in the node's state machine registry and serialization errors.

Diff for: service/history/ndc/transaction_manager_existing_workflow.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,9 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
9393
)
9494
}
9595

96-
targetExecutionInfo := targetWorkflow.GetMutableState().GetExecutionInfo()
97-
targetExecutionState := targetWorkflow.GetMutableState().GetExecutionState()
96+
mutableState := targetWorkflow.GetMutableState()
97+
targetExecutionInfo := mutableState.GetExecutionInfo()
98+
targetExecutionState := mutableState.GetExecutionState()
9899
namespaceID := namespace.ID(targetExecutionInfo.NamespaceId)
99100
workflowID := targetExecutionInfo.WorkflowId
100101
targetRunID := targetExecutionState.RunId
@@ -115,7 +116,7 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
115116
}
116117

117118
if currentRunID == targetRunID {
118-
if !isWorkflowRebuilt {
119+
if !mutableState.IsTransitionHistoryEnabled() && !isWorkflowRebuilt {
119120
return serviceerror.NewInternal("transactionMgr: encountered workflow not rebuilt & current workflow not guaranteed")
120121
}
121122

Diff for: service/history/ndc/transaction_manager_existing_workflow_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow
139139
targetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
140140
RunId: targetRunID,
141141
}).AnyTimes()
142+
targetMutableState.EXPECT().IsTransitionHistoryEnabled().Return(false)
142143
s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(targetRunID, nil)
143144

144145
err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow)
@@ -482,6 +483,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow
482483
targetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
483484
RunId: targetRunID,
484485
}).AnyTimes()
486+
targetMutableState.EXPECT().IsTransitionHistoryEnabled().Return(false)
485487
s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(targetRunID, nil)
486488

487489
targetContext.EXPECT().ConflictResolveWorkflowExecution(

Diff for: service/history/shard/task_key_generator.go

+2
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ func (a *taskKeyGenerator) setTaskKeys(
106106
tag.WorkflowNamespaceID(task.GetNamespaceID()),
107107
tag.WorkflowID(task.GetWorkflowID()),
108108
tag.WorkflowRunID(task.GetRunID()),
109+
tag.TaskType(task.GetType()),
110+
tag.TaskID(id),
109111
tag.Timestamp(taskScheduledTime),
110112
tag.CursorTimestamp(a.taskMinScheduledTime),
111113
tag.ValueShardAllocateTimerBeforeRead,

Diff for: service/history/workflow/mutable_state_impl.go

+19-6
Original file line numberDiff line numberDiff line change
@@ -6929,7 +6929,7 @@ func (ms *MutableStateImpl) applyUpdatesToStateMachineNodes(
69296929
for _, p := range nodeMutation.Path.Path {
69306930
incomingPath = append(incomingPath, hsm.Key{Type: p.Type, ID: p.Id})
69316931
}
6932-
currentNode, err := root.Child(incomingPath)
6932+
node, err := root.Child(incomingPath)
69336933
if err != nil {
69346934
if !errors.Is(err, hsm.ErrStateMachineNotFound) {
69356935
return err
@@ -6943,16 +6943,29 @@ func (ms *MutableStateImpl) applyUpdatesToStateMachineNodes(
69436943
// we don't have enough information to recreate all parents
69446944
return err
69456945
}
6946-
newNode, err := parentNode.AddChild(incomingPath[len(incomingPath)-1], nodeMutation.Data)
6947-
if err != nil {
6948-
return err
6946+
6947+
key := incomingPath[len(incomingPath)-1]
6948+
parentInternalNode := parentNode.InternalRepr()
6949+
6950+
internalNode = &persistencespb.StateMachineNode{
6951+
Children: make(map[string]*persistencespb.StateMachineMap),
6952+
}
6953+
children, ok := parentInternalNode.Children[key.Type]
6954+
if !ok {
6955+
children = &persistencespb.StateMachineMap{MachinesById: make(map[string]*persistencespb.StateMachineNode)}
6956+
// Children may be nil if the map was empty and the proto message we serialized and deserialized.
6957+
if parentInternalNode.Children == nil {
6958+
parentInternalNode.Children = make(map[string]*persistencespb.StateMachineMap, 1)
6959+
}
6960+
parentInternalNode.Children[key.Type] = children
69496961
}
6950-
internalNode = newNode.InternalRepr()
6962+
children.MachinesById[key.ID] = internalNode
69516963
} else {
6952-
internalNode = currentNode.InternalRepr()
6964+
internalNode = node.InternalRepr()
69536965
if CompareVersionedTransition(nodeMutation.LastUpdateVersionedTransition, internalNode.LastUpdateVersionedTransition) == 0 {
69546966
continue
69556967
}
6968+
node.InvalidateCache()
69566969
}
69576970
internalNode.Data = nodeMutation.Data
69586971
internalNode.InitialVersionedTransition = nodeMutation.InitialVersionedTransition

Diff for: tests/xdc/base.go

+3
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ type (
7272

7373
startTime time.Time
7474
onceClusterConnect sync.Once
75+
76+
enableTransitionHistory bool
7577
}
7678
)
7779

@@ -98,6 +100,7 @@ func (s *xdcBaseSuite) setupSuite(clusterNames []string, opts ...testcore.Option
98100
s.dynamicConfigOverrides = make(map[dynamicconfig.Key]interface{})
99101
}
100102
s.dynamicConfigOverrides[dynamicconfig.ClusterMetadataRefreshInterval.Key()] = time.Second * 5
103+
s.dynamicConfigOverrides[dynamicconfig.EnableTransitionHistory.Key()] = s.enableTransitionHistory
101104

102105
fileName := "../testdata/xdc_clusters.yaml"
103106
if testcore.TestFlags.TestClusterConfigFile != "" {

Diff for: tests/xdc/nexus_state_replication_test.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"go.temporal.io/api/workflow/v1"
4848
"go.temporal.io/api/workflowservice/v1"
4949
sdkclient "go.temporal.io/sdk/client"
50+
"go.temporal.io/server/common"
5051
"go.temporal.io/server/common/dynamicconfig"
5152
commonnexus "go.temporal.io/server/common/nexus"
5253
"go.temporal.io/server/common/nexus/nexustest"
@@ -62,7 +63,25 @@ type NexusStateReplicationSuite struct {
6263

6364
func TestNexusStateReplicationTestSuite(t *testing.T) {
6465
t.Parallel()
65-
suite.Run(t, new(NexusStateReplicationSuite))
66+
for _, tc := range []struct {
67+
name string
68+
enableTransitionHistory bool
69+
}{
70+
{
71+
name: "DisableTransitionHistory",
72+
enableTransitionHistory: false,
73+
},
74+
{
75+
name: "EnableTransitionHistory",
76+
enableTransitionHistory: true,
77+
},
78+
} {
79+
t.Run(tc.name, func(t *testing.T) {
80+
s := &NexusStateReplicationSuite{}
81+
s.enableTransitionHistory = tc.enableTransitionHistory
82+
suite.Run(t, s)
83+
})
84+
}
6685
}
6786

6887
func (s *NexusStateReplicationSuite) SetupSuite() {
@@ -73,7 +92,8 @@ func (s *NexusStateReplicationSuite) SetupSuite() {
7392
dynamicconfig.RefreshNexusEndpointsMinWait.Key(): 1 * time.Millisecond,
7493
callbacks.AllowedAddresses.Key(): []any{map[string]any{"Pattern": "*", "AllowInsecure": true}},
7594
}
76-
s.setupSuite([]string{"nexus_state_replication_active", "nexus_state_replication_standby"})
95+
suffix := "_" + common.GenerateRandomString(5)
96+
s.setupSuite([]string{"nexus_state_replication_active" + suffix, "nexus_state_replication_standby" + suffix})
7797
}
7898

7999
func (s *NexusStateReplicationSuite) SetupTest() {

Diff for: tests/xdc/stream_based_replication_test.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,17 @@ import (
6565
type (
6666
streamBasedReplicationTestSuite struct {
6767
xdcBaseSuite
68-
controller *gomock.Controller
69-
namespaceName string
70-
namespaceID string
71-
serializer serialization.Serializer
72-
generator test.Generator
73-
once sync.Once
74-
enableTransitionHistory bool
68+
controller *gomock.Controller
69+
namespaceName string
70+
namespaceID string
71+
serializer serialization.Serializer
72+
generator test.Generator
73+
once sync.Once
7574
}
7675
)
7776

7877
func TestStreamBasedReplicationTestSuite(t *testing.T) {
78+
t.Parallel()
7979
for _, tc := range []struct {
8080
name string
8181
enableTransitionHistory bool
@@ -91,9 +91,9 @@ func TestStreamBasedReplicationTestSuite(t *testing.T) {
9191
} {
9292
t.Run(tc.name, func(t *testing.T) {
9393
s := &streamBasedReplicationTestSuite{
94-
namespaceName: "replication-test-" + common.GenerateRandomString(5),
95-
enableTransitionHistory: tc.enableTransitionHistory,
94+
namespaceName: "replication-test-" + common.GenerateRandomString(5),
9695
}
96+
s.enableTransitionHistory = tc.enableTransitionHistory
9797
suite.Run(t, s)
9898
})
9999
}
@@ -106,7 +106,6 @@ func (s *streamBasedReplicationTestSuite) SetupSuite() {
106106
dynamicconfig.EnableEagerNamespaceRefresher.Key(): true,
107107
dynamicconfig.EnableReplicationTaskBatching.Key(): true,
108108
dynamicconfig.EnableReplicateLocalGeneratedEvents.Key(): true,
109-
dynamicconfig.EnableTransitionHistory.Key(): s.enableTransitionHistory,
110109
}
111110
s.logger = log.NewTestLogger()
112111
s.serializer = serialization.NewSerializer()

0 commit comments

Comments
 (0)