Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,12 @@ HistoryCacheSizeBasedLimit is set to true.`,
true,
`EnableWorkflowExecutionTimeoutTimer controls whether to enable the new logic for generating a workflow execution
timeout timer when execution timeout is specified when starting a workflow.`,
)
EnableUpdateWorkflowModeIgnoreCurrent = NewGlobalBoolSetting(
"history.enableUpdateWorkflowModeIgnoreCurrent",
true,
`EnableUpdateWorkflowModeIgnoreCurrent controls whether to enable the new logic for updating closed workflow execution
by mutation using UpdateWorkflowModeIgnoreCurrent`,
)
EnableTransitionHistory = NewGlobalBoolSetting(
"history.enableTransitionHistory",
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/cassandra/mutable_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,9 @@ func (d *MutableStateStore) UpdateWorkflowExecution(
shardID := request.ShardID

switch request.Mode {
case p.UpdateWorkflowModeIgnoreCurrent:
// noop

case p.UpdateWorkflowModeBypassCurrent:
if err := d.assertNotCurrentExecution(
ctx,
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/data_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ const (
// UpdateWorkflowModeBypassCurrent update workflow, without current record
// NOTE: current record CANNOT point to the workflow to be updated
UpdateWorkflowModeBypassCurrent
// UpdateWorkflowModeIgnoreCurrent update workflow, without checking or update current record.
// This mode should only be used when we don't know if the workflow being updated is the current workflow or not in DB.
// For example, when updating a closed workflow, it may or may not be the current workflow.
// This is similar to SetWorkflowExecution, but UpdateWorkflowExecution with this mode persists the workflow as a mutation,
// instead of a snapshot.
UpdateWorkflowModeIgnoreCurrent
)

// ConflictResolveWorkflowMode conflict resolve mode
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/operation_mode_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func ValidateUpdateWorkflowModeState(
}
return nil

case UpdateWorkflowModeIgnoreCurrent:
// Cannot have new workflow when skipping current workflow check
if newWorkflowState != nil {
return newInvalidUpdateWorkflowWithNewMode(
mode,
currentWorkflowState,
*newWorkflowState,
)
}
return nil

default:
return serviceerror.NewInternal(fmt.Sprintf("unknown mode: %v", mode))
}
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/sql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ func (m *sqlExecutionStore) updateWorkflowExecutionTx(
shardID := request.ShardID

switch request.Mode {
case p.UpdateWorkflowModeIgnoreCurrent:
// noop

case p.UpdateWorkflowModeBypassCurrent:
if err := assertNotCurrentExecution(ctx,
tx,
Expand Down
108 changes: 108 additions & 0 deletions common/persistence/tests/execution_mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,114 @@ func (s *ExecutionMutableStateSuite) TestUpdate_Zombie_WithNew() {
s.AssertHEEqualWithDB(newBranchToken, newEvents3)
}

func (s *ExecutionMutableStateSuite) TestUpdate_ClosedWorkflow_IsCurrent() {
branchToken, newSnapshot, newEvents := s.CreateWorkflow(
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
rand.Int63(),
)

// NOTE: no new events for closed workflows
currentMutation, _ := RandomMutation(
s.T(),
s.NamespaceID,
s.WorkflowID,
s.RunID,
newSnapshot.NextEventID,
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
newSnapshot.DBRecordVersion+1,
branchToken,
)
_, err := s.ExecutionManager.UpdateWorkflowExecution(s.Ctx, &p.UpdateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.UpdateWorkflowModeIgnoreCurrent,

UpdateWorkflowMutation: *currentMutation,
UpdateWorkflowEvents: nil,

NewWorkflowSnapshot: nil,
NewWorkflowEvents: nil,
})
s.NoError(err)

s.AssertMSEqualWithDB(newSnapshot, currentMutation)
s.AssertHEEqualWithDB(branchToken, newEvents)
}

func (s *ExecutionMutableStateSuite) TestUpdate_ClosedWorkflow_IsNonCurrent() {
nonCurrentLastWriteVersion := rand.Int63()
nonCurrentBranchToken, nonCurrentSnapshot, nonCurrentEvents := s.CreateWorkflow(
nonCurrentLastWriteVersion,
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
rand.Int63(),
)

// make current workflow to a different run
currentRunID := uuid.New().String()
currentBranchToken := RandomBranchToken(s.NamespaceID, s.WorkflowID, currentRunID, s.historyBranchUtil)
currentSnapshot, currentEvents := RandomSnapshot(
s.T(),
s.NamespaceID,
s.WorkflowID,
currentRunID,
common.FirstEventID,
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
rand.Int63(),
currentBranchToken,
)
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeUpdateCurrent,

PreviousRunID: nonCurrentSnapshot.ExecutionState.RunId,
PreviousLastWriteVersion: nonCurrentLastWriteVersion,

NewWorkflowSnapshot: *currentSnapshot,
NewWorkflowEvents: currentEvents,
})
s.NoError(err)

// Update the original closed workflow
// NOTE: no new events for closed workflows
nonCurrentMutation, _ := RandomMutation(
s.T(),
s.NamespaceID,
s.WorkflowID,
s.RunID,
nonCurrentSnapshot.NextEventID,
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
nonCurrentSnapshot.DBRecordVersion+1,
nonCurrentBranchToken,
)
_, err = s.ExecutionManager.UpdateWorkflowExecution(s.Ctx, &p.UpdateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.UpdateWorkflowModeIgnoreCurrent,

UpdateWorkflowMutation: *nonCurrentMutation,
UpdateWorkflowEvents: nil,

NewWorkflowSnapshot: nil,
NewWorkflowEvents: nil,
})
s.NoError(err)

s.AssertMSEqualWithDB(nonCurrentSnapshot, nonCurrentMutation)
s.AssertHEEqualWithDB(nonCurrentBranchToken, nonCurrentEvents)
s.AssertMSEqualWithDB(currentSnapshot)
s.AssertHEEqualWithDB(currentBranchToken, currentEvents)
}

func (s *ExecutionMutableStateSuite) TestConflictResolve_SuppressCurrent() {
branchToken, currentSnapshot, currentEvents1 := s.CreateWorkflow(
rand.Int63(),
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
EnableHostLevelHistoryCache dynamicconfig.BoolPropertyFn
EnableNexus dynamicconfig.BoolPropertyFn
EnableWorkflowExecutionTimeoutTimer dynamicconfig.BoolPropertyFn
EnableUpdateWorkflowModeIgnoreCurrent dynamicconfig.BoolPropertyFn
EnableTransitionHistory dynamicconfig.BoolPropertyFn
MaxCallbacksPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter

Expand Down Expand Up @@ -438,6 +439,7 @@ func NewConfig(
EnableHostLevelHistoryCache: dynamicconfig.EnableHostHistoryCache.Get(dc),
EnableNexus: dynamicconfig.EnableNexus.Get(dc),
EnableWorkflowExecutionTimeoutTimer: dynamicconfig.EnableWorkflowExecutionTimeoutTimer.Get(dc),
EnableUpdateWorkflowModeIgnoreCurrent: dynamicconfig.EnableUpdateWorkflowModeIgnoreCurrent.Get(dc),
EnableTransitionHistory: dynamicconfig.EnableTransitionHistory.Get(dc),
MaxCallbacksPerWorkflow: dynamicconfig.MaxCallbacksPerWorkflow.Get(dc),

Expand Down
2 changes: 1 addition & 1 deletion service/history/history_engine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2681,7 +2681,7 @@ func (s *engine2Suite) TestRefreshWorkflowTasks() {
wfMs := workflow.TestCloneToProto(ms)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().SetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.SetWorkflowExecutionResponse{}, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
s.mockEventsCache.EXPECT().GetEvent(
gomock.Any(),
gomock.Any(),
Expand Down
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ type (
IsCancelRequested() bool
IsWorkflowCloseAttempted() bool
IsCurrentWorkflowGuaranteed() bool
IsNonCurrentWorkflowGuaranteed() (bool, error)
IsSignalRequested(requestID string) bool
GetApproximatePersistedSize() int

Expand Down
15 changes: 15 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions service/history/ndc/activity_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
return err
}

if r.shardContext.GetConfig().EnableUpdateWorkflowModeIgnoreCurrent() {
return executionContext.UpdateWorkflowExecutionAsPassive(ctx, r.shardContext)
}

// TODO: remove following code once EnableUpdateWorkflowModeIgnoreCurrent config is deprecated.
updateMode := persistence.UpdateWorkflowModeUpdateCurrent
if state, _ := mutableState.GetWorkflowStateStatus(); state == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous logic is kinda wrong/misleading as it making decision based on the in memory workflow state instead of the workflow state in DB. The updateMode though, is for doing current record CAS in db, so we need to pick it based on the state in db (i.e. the workflow state before the current mutable state transaction)

The logic still works because activity state replicator (and same for the hsm state replicator) below don't really change the workflow state, so in memory state == state in DB.

updateMode = persistence.UpdateWorkflowModeBypassCurrent
Expand Down Expand Up @@ -262,6 +267,11 @@ func (r *ActivityStateReplicatorImpl) SyncActivitiesState(
return err
}

if r.shardContext.GetConfig().EnableUpdateWorkflowModeIgnoreCurrent() {
return executionContext.UpdateWorkflowExecutionAsPassive(ctx, r.shardContext)
}

// TODO: remove following code once EnableUpdateWorkflowModeIgnoreCurrent config is deprecated.
updateMode := persistence.UpdateWorkflowModeUpdateCurrent
if state, _ := mutableState.GetWorkflowStateStatus(); state == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
updateMode = persistence.UpdateWorkflowModeBypassCurrent
Expand Down
40 changes: 4 additions & 36 deletions service/history/ndc/activity_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,15 +1035,7 @@ func (s *activityReplicatorStateSuite) TestSyncActivity_ActivityFound_Zombie() {
}).Return(false)
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{})

weContext.EXPECT().UpdateWorkflowExecutionWithNew(
gomock.Any(),
s.mockShard,
persistence.UpdateWorkflowModeBypassCurrent,
historyi.WorkflowContext(nil),
historyi.MutableState(nil),
historyi.TransactionPolicyPassive,
(*historyi.TransactionPolicy)(nil),
).Return(nil)
weContext.EXPECT().UpdateWorkflowExecutionAsPassive(gomock.Any(), s.mockShard).Return(nil)

s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(
namespace.NewGlobalNamespaceForTest(
Expand Down Expand Up @@ -1148,15 +1140,7 @@ func (s *activityReplicatorStateSuite) TestSyncActivities_ActivityFound_Zombie()
}).Return(false)
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{})

weContext.EXPECT().UpdateWorkflowExecutionWithNew(
gomock.Any(),
s.mockShard,
persistence.UpdateWorkflowModeBypassCurrent,
historyi.WorkflowContext(nil),
historyi.MutableState(nil),
historyi.TransactionPolicyPassive,
(*historyi.TransactionPolicy)(nil),
).Return(nil)
weContext.EXPECT().UpdateWorkflowExecutionAsPassive(gomock.Any(), s.mockShard).Return(nil)

s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(
namespace.NewGlobalNamespaceForTest(
Expand Down Expand Up @@ -1257,15 +1241,7 @@ func (s *activityReplicatorStateSuite) TestSyncActivity_ActivityFound_NonZombie(
}).Return(false)
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{})

weContext.EXPECT().UpdateWorkflowExecutionWithNew(
gomock.Any(),
s.mockShard,
persistence.UpdateWorkflowModeUpdateCurrent,
historyi.WorkflowContext(nil),
historyi.MutableState(nil),
historyi.TransactionPolicyPassive,
(*historyi.TransactionPolicy)(nil),
).Return(nil)
weContext.EXPECT().UpdateWorkflowExecutionAsPassive(gomock.Any(), s.mockShard).Return(nil)

s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(
namespace.NewGlobalNamespaceForTest(
Expand Down Expand Up @@ -1370,15 +1346,7 @@ func (s *activityReplicatorStateSuite) TestSyncActivities_ActivityFound_NonZombi
}).Return(false)
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{})

weContext.EXPECT().UpdateWorkflowExecutionWithNew(
gomock.Any(),
s.mockShard,
persistence.UpdateWorkflowModeUpdateCurrent,
historyi.WorkflowContext(nil),
historyi.MutableState(nil),
historyi.TransactionPolicyPassive,
(*historyi.TransactionPolicy)(nil),
).Return(nil)
weContext.EXPECT().UpdateWorkflowExecutionAsPassive(gomock.Any(), s.mockShard).Return(nil)

s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(
namespace.NewGlobalNamespaceForTest(
Expand Down
5 changes: 5 additions & 0 deletions service/history/ndc/hsm_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func (r *HSMStateReplicatorImpl) SyncHSMState(
return consts.ErrDuplicate
}

if r.shardContext.GetConfig().EnableUpdateWorkflowModeIgnoreCurrent() {
return workflowContext.UpdateWorkflowExecutionAsPassive(ctx, r.shardContext)
}

// TODO: remove following code once EnableUpdateWorkflowModeIgnoreCurrent config is deprecated.
state, _ := mutableState.GetWorkflowStateStatus()
if state == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
return workflowContext.SubmitClosedWorkflowSnapshot(
Expand Down
Loading
Loading