Skip to content

Commit 722e086

Browse files
xwduan and yycptt authored
Special Handling for new workflow replication (#7561)
## What changed? <!-- Describe what has changed in this PR --> Add special handling for replication of new workflows. ## Why? <!-- Tell your future self why have you made these changes --> To reduce the number of attempts on the passive side that fail to load mutable state with a "not found" error. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Unit test. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> No risk. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> n/a ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> No --------- Co-authored-by: Yichao Yang <[email protected]>
1 parent 2ec03b0 commit 722e086

File tree

12 files changed

+637
-18
lines changed

12 files changed

+637
-18
lines changed

api/persistence/v1/executions.pb.go

+11-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/serialization/task_serializer.go

+2
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,7 @@ func (s *TaskSerializer) replicationSyncVersionedTransitionTaskToProto(
12771277
NextEventId: syncVersionedTransitionTask.NextEventID,
12781278
NewRunId: syncVersionedTransitionTask.NewRunID,
12791279
LastVersionHistoryItem: syncVersionedTransitionTask.LastVersionHistoryItem,
1280+
IsFirstTask: syncVersionedTransitionTask.IsFirstTask,
12801281
TaskEquivalents: taskInfoEquivalents,
12811282
}, nil
12821283
}
@@ -1313,6 +1314,7 @@ func (s *TaskSerializer) replicationSyncVersionedTransitionTaskFromProto(
13131314
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
13141315
LastVersionHistoryItem: syncVersionedTransitionTask.LastVersionHistoryItem,
13151316
TaskEquivalents: taskEquivalents,
1317+
IsFirstTask: syncVersionedTransitionTask.IsFirstTask,
13161318
}, nil
13171319
}
13181320

proto/internal/temporal/server/api/persistence/v1/executions.proto

+1
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ message ReplicationTaskInfo {
379379
// doesn't need to be disabled.
380380
repeated ReplicationTaskInfo task_equivalents = 20;
381381
history.v1.VersionHistoryItem last_version_history_item = 21;
382+
bool is_first_task = 22;
382383
}
383384

384385
// visibility_task_data column

service/history/ndc/workflow_state_replicator.go

+175-1
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,17 @@ func (r *WorkflowStateReplicatorImpl) ReplicateVersionedTransition(
229229
default:
230230
return serviceerror.NewInvalidArgument(fmt.Sprintf("unknown artifact type %T", artifactType))
231231
}
232+
233+
if mutation != nil && mutation.ExclusiveStartVersionedTransition.TransitionCount == 0 {
234+
// this is the first replication task for this workflow
235+
// TODO: Handle the reset case to reduce the number of history events written
236+
err := r.handleFirstReplicationTask(ctx, versionedTransition, sourceClusterName)
237+
if !errors.Is(err, consts.ErrDuplicate) {
238+
// if ErrDuplicate is returned from creation, the workflow already exists; continue to apply the mutation
239+
return err
240+
}
241+
}
242+
232243
executionState, executionInfo := func() (*persistencespb.WorkflowExecutionState, *persistencespb.WorkflowExecutionInfo) {
233244
if snapshot != nil {
234245
return snapshot.State.ExecutionState, snapshot.State.ExecutionInfo
@@ -337,6 +348,168 @@ func (r *WorkflowStateReplicatorImpl) ReplicateVersionedTransition(
337348
}
338349
}
339350

351+
//nolint:revive // cognitive complexity 35 (> max enabled 25)
352+
func (r *WorkflowStateReplicatorImpl) handleFirstReplicationTask(
353+
ctx context.Context,
354+
versionedTransitionArtifact *replicationspb.VersionedTransitionArtifact,
355+
sourceClusterName string,
356+
) (retErr error) {
357+
mutation := versionedTransitionArtifact.GetSyncWorkflowStateMutationAttributes()
358+
executionInfo := mutation.StateMutation.ExecutionInfo
359+
executionState := mutation.StateMutation.ExecutionState
360+
sourceVersionHistories := mutation.StateMutation.ExecutionInfo.VersionHistories
361+
currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(sourceVersionHistories)
362+
if err != nil {
363+
return err
364+
}
365+
lastVersionHistoryItem, err := versionhistory.GetLastVersionHistoryItem(currentVersionHistory)
366+
if err != nil {
367+
return err
368+
}
369+
370+
var historyEventBatchs [][]*historypb.HistoryEvent
371+
for _, blob := range versionedTransitionArtifact.EventBatches {
372+
e, err := r.historySerializer.DeserializeEvents(blob)
373+
if err != nil {
374+
return err
375+
}
376+
historyEventBatchs = append(historyEventBatchs, e)
377+
}
378+
lastBatch := historyEventBatchs[len(historyEventBatchs)-1]
379+
lastEvent := lastBatch[len(lastBatch)-1]
380+
if lastEvent.EventId < lastVersionHistoryItem.EventId {
381+
remoteHistoryIterator := collection.NewPagingIterator(r.getHistoryFromRemotePaginationFn(
382+
ctx,
383+
sourceClusterName,
384+
namespace.ID(executionInfo.NamespaceId),
385+
executionInfo.WorkflowId,
386+
executionState.RunId,
387+
lastEvent.EventId,
388+
lastEvent.Version,
389+
lastVersionHistoryItem.EventId+1,
390+
lastVersionHistoryItem.Version),
391+
)
392+
for remoteHistoryIterator.HasNext() {
393+
batch, err := remoteHistoryIterator.Next()
394+
if err != nil {
395+
return err
396+
}
397+
sourceEvents, err := r.historySerializer.DeserializeEvents(batch.rawHistory)
398+
if err != nil {
399+
return err
400+
}
401+
historyEventBatchs = append(historyEventBatchs, sourceEvents)
402+
}
403+
}
404+
405+
wfCtx, releaseFn, err := r.workflowCache.GetOrCreateWorkflowExecution(
406+
ctx,
407+
r.shardContext,
408+
namespace.ID(executionInfo.NamespaceId),
409+
&commonpb.WorkflowExecution{
410+
WorkflowId: executionInfo.WorkflowId,
411+
RunId: executionState.RunId,
412+
},
413+
locks.PriorityLow,
414+
)
415+
if err != nil {
416+
return err
417+
}
418+
defer func() {
419+
if rec := recover(); rec != nil {
420+
releaseFn(errPanic)
421+
panic(rec)
422+
}
423+
releaseFn(retErr)
424+
}()
425+
426+
nsEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(executionInfo.NamespaceId))
427+
if err != nil {
428+
return err
429+
}
430+
localMutableState := workflow.NewMutableState(
431+
r.shardContext,
432+
r.shardContext.GetEventsCache(),
433+
r.logger,
434+
nsEntry,
435+
executionInfo.WorkflowId,
436+
executionState.RunId,
437+
timestamp.TimeValue(executionState.StartTime),
438+
)
439+
err = localMutableState.ApplyMutation(mutation.StateMutation)
440+
if err != nil {
441+
return err
442+
}
443+
444+
err = localMutableState.SetHistoryTree(executionInfo.WorkflowExecutionTimeout, executionInfo.WorkflowRunTimeout, executionState.RunId)
445+
if err != nil {
446+
return nil
447+
}
448+
localCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(localMutableState.GetExecutionInfo().VersionHistories)
449+
if err != nil {
450+
return err
451+
}
452+
defer func() {
453+
if retErr != nil {
454+
// if we fail to create workflow, we need to clean up the history branch
455+
if err := r.shardContext.GetExecutionManager().DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
456+
ShardID: r.shardContext.GetShardID(),
457+
BranchToken: localCurrentVersionHistory.BranchToken,
458+
}); err != nil {
459+
r.logger.Error("failed to clean up workflow execution", tag.Error(err))
460+
}
461+
}
462+
}()
463+
if err != nil {
464+
return err
465+
}
466+
467+
localCurrentVersionHistory.Items = versionhistory.CopyVersionHistoryItems(currentVersionHistory.Items)
468+
if err != nil {
469+
return err
470+
}
471+
472+
localMutableState.SetHistoryBuilder(historybuilder.NewImmutable(historyEventBatchs...))
473+
for _, historyEventBatch := range historyEventBatchs {
474+
for _, historyEvent := range historyEventBatch {
475+
r.addEventToCache(definition.WorkflowKey{
476+
NamespaceID: executionInfo.NamespaceId,
477+
WorkflowID: executionInfo.WorkflowId,
478+
RunID: executionState.RunId,
479+
}, historyEvent)
480+
}
481+
}
482+
if versionedTransitionArtifact.NewRunInfo != nil {
483+
err = r.createNewRunWorkflow(
484+
ctx,
485+
namespace.ID(executionInfo.NamespaceId),
486+
executionInfo.WorkflowId,
487+
versionedTransitionArtifact.NewRunInfo,
488+
localMutableState,
489+
true,
490+
)
491+
if err != nil {
492+
return err
493+
}
494+
}
495+
496+
err = r.taskRefresher.Refresh(ctx, localMutableState)
497+
498+
if err != nil {
499+
return err
500+
}
501+
502+
return r.transactionMgr.CreateWorkflow(
503+
ctx,
504+
NewWorkflow(
505+
r.clusterMetadata,
506+
wfCtx,
507+
localMutableState,
508+
releaseFn,
509+
),
510+
)
511+
}
512+
340513
func (r *WorkflowStateReplicatorImpl) applyMutation(
341514
ctx context.Context,
342515
namespaceID namespace.ID,
@@ -370,7 +543,8 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
370543
if workflow.TransitionHistoryStalenessCheck(localTransitionHistory, mutation.ExclusiveStartVersionedTransition) != nil ||
371544
workflow.TransitionHistoryStalenessCheck(sourceTransitionHistory, localVersionedTransition) != nil {
372545
return serviceerrors.NewSyncState(
373-
fmt.Sprintf("Failed to apply mutation due to version check failed. local transition history: %v, source transition history: %v", localTransitionHistory, sourceTransitionHistory),
546+
fmt.Sprintf("Failed to apply mutation due to version check failed. local transition history: %v, source transition history: %v, exclusiveStartVersionedTransition: %v",
547+
localTransitionHistory, sourceTransitionHistory, mutation.ExclusiveStartVersionedTransition),
374548
namespaceID.String(),
375549
workflowID,
376550
runID,

service/history/ndc/workflow_state_replicator_test.go

+97
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,103 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
878878
s.NoError(err)
879879
}
880880

881+
func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_FirstTask_SyncMutation() {
882+
workflowStateReplicator := NewWorkflowStateReplicator(
883+
s.mockShard,
884+
s.mockWorkflowCache,
885+
nil,
886+
serialization.NewSerializer(),
887+
s.logger,
888+
)
889+
mockTransactionManager := NewMockTransactionManager(s.controller)
890+
mockTaskRefresher := workflow.NewMockTaskRefresher(s.controller)
891+
workflowStateReplicator.transactionMgr = mockTransactionManager
892+
workflowStateReplicator.taskRefresher = mockTaskRefresher
893+
namespaceID := uuid.New()
894+
s.workflowStateReplicator.transactionMgr = NewMockTransactionManager(s.controller)
895+
versionHistories := &historyspb.VersionHistories{
896+
CurrentVersionHistoryIndex: 0,
897+
Histories: []*historyspb.VersionHistory{
898+
{
899+
BranchToken: []byte("branchToken"),
900+
Items: []*historyspb.VersionHistoryItem{
901+
{
902+
EventId: int64(4),
903+
Version: int64(2),
904+
},
905+
},
906+
},
907+
},
908+
}
909+
eventBatches := []*historypb.HistoryEvent{
910+
{EventId: 1, Version: 2}, {EventId: 2, Version: 2},
911+
{EventId: 3, Version: 2}, {EventId: 4, Version: 2},
912+
}
913+
eventBatchBlob, err := serialization.NewSerializer().SerializeEvents(eventBatches, enumspb.ENCODING_TYPE_PROTO3)
914+
s.NoError(err)
915+
transitionHistory := []*persistencespb.VersionedTransition{
916+
{NamespaceFailoverVersion: 2, TransitionCount: 10},
917+
}
918+
versionedTransitionArtifact := &replicationspb.VersionedTransitionArtifact{
919+
StateAttributes: &replicationspb.VersionedTransitionArtifact_SyncWorkflowStateMutationAttributes{
920+
SyncWorkflowStateMutationAttributes: &replicationspb.SyncWorkflowStateMutationAttributes{
921+
StateMutation: &persistencespb.WorkflowMutableStateMutation{
922+
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
923+
WorkflowId: s.workflowID,
924+
NamespaceId: namespaceID,
925+
VersionHistories: versionHistories,
926+
TransitionHistory: transitionHistory,
927+
},
928+
ExecutionState: &persistencespb.WorkflowExecutionState{
929+
RunId: s.runID,
930+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
931+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
932+
},
933+
},
934+
ExclusiveStartVersionedTransition: &persistencespb.VersionedTransition{
935+
NamespaceFailoverVersion: 2, TransitionCount: 0,
936+
},
937+
},
938+
},
939+
EventBatches: []*commonpb.DataBlob{eventBatchBlob},
940+
}
941+
mockWeCtx := historyi.NewMockWorkflowContext(s.controller)
942+
s.mockWorkflowCache.EXPECT().GetOrCreateWorkflowExecution(
943+
gomock.Any(),
944+
s.mockShard,
945+
namespace.ID(namespaceID),
946+
&commonpb.WorkflowExecution{
947+
WorkflowId: s.workflowID,
948+
RunId: s.runID,
949+
},
950+
locks.PriorityLow,
951+
).Return(mockWeCtx, wcache.NoopReleaseFn, nil)
952+
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest(
953+
&persistencespb.NamespaceInfo{},
954+
nil,
955+
false,
956+
nil,
957+
int64(100),
958+
), nil).AnyTimes()
959+
mockTaskRefresher.EXPECT().Refresh(gomock.Any(), gomock.Any()).Return(nil).Times(1)
960+
961+
mockTransactionManager.EXPECT().CreateWorkflow(
962+
gomock.Any(),
963+
gomock.AssignableToTypeOf(&WorkflowImpl{}),
964+
).DoAndReturn(func(ctx context.Context, wf Workflow) error {
965+
// Capture localMutableState from the workflow
966+
localMutableState := wf.GetMutableState()
967+
968+
// Perform your comparisons here
969+
s.Equal(localMutableState.GetExecutionInfo().TransitionHistory, transitionHistory)
970+
971+
return nil
972+
}).Times(1)
973+
err = workflowStateReplicator.ReplicateVersionedTransition(context.Background(), versionedTransitionArtifact, "test")
974+
s.NoError(err)
975+
976+
}
977+
881978
func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_MutationProvidedWithGap_ReturnSyncStateError() {
882979
workflowStateReplicator := NewWorkflowStateReplicator(
883980
s.mockShard,

0 commit comments

Comments
 (0)