-
Notifications
You must be signed in to change notification settings - Fork 937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Special Handling for new workflow replication #7561
Conversation
} | ||
} | ||
|
||
wfCtx, releaseFn, err := r.workflowCache.GetOrCreateWorkflowExecution( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could use current workflow lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetOrCreateWorkflowExecution does not return the workflow context which is necessary parameter for ndc createWorkflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Looks like normal start workflow (manually creates the workflowContext object) and replication start workflow follows a different approach.
|
||
if mutation != nil && mutation.ExclusiveStartVersionedTransition.TransitionCount == 0 { | ||
// this is the first replication task for this workflow | ||
// TODO: Handle reset case to reduce the amount of history events write |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @meiliang86 FYI
err = r.taskRefresher.Refresh(ctx, localMutableState) | ||
|
||
if err != nil { | ||
println(err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit; remove
//nolint:revive // cognitive complexity 35 (> max enabled 25) | ||
func (r *WorkflowStateReplicatorImpl) handleFirstReplicationTask( | ||
ctx context.Context, | ||
versionedTransition *replicationspb.VersionedTransitionArtifact, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: artifact maybe?
return err | ||
} | ||
|
||
localMutableState.SetHistoryBuilder(historybuilder.NewImmutable(historyEventBatchs...)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this update the HistorySize in ExecutionStats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is handled here:
newSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newHistoryDiff.SizeDiff) |
SyncWorkflowStateMutationAttributes: &replicationspb.SyncWorkflowStateMutationAttributes{ | ||
StateMutation: mutation, | ||
ExclusiveStartVersionedTransition: &persistencespb.VersionedTransition{ | ||
NamespaceFailoverVersion: taskVersionedTransition.NamespaceFailoverVersion, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this is for making sure workflow.TransitionHistoryStalenessCheck(localTransitionHistory, mutation.ExclusiveStartVersionedTransition) != nil
check can pass when applying mutation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. so we don't need special handling for that.
} | ||
} | ||
|
||
wfCtx, releaseFn, err := r.workflowCache.GetOrCreateWorkflowExecution( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Looks like normal start workflow (manually creates the workflowContext object) and replication start workflow follows a different approach.
What changed?
Add special handling for new workflow replication
Why?
To reduce passive side loadmutablestate not found attempt
How did you test it?
unit test
Potential risks
no risk
Documentation
n/a
Is hotfix candidate?
no