Skip to content

Commit 1b11ed2

Browse files
authored
[Reset] Track child workflows initiated after reset (temporalio#7210)
## What changed? In this PR I'm recording the set of children that were initialized after the reset point. Each child is identified by a "workflow_type:workflow_id" key. ## Why? When the parent starts to make progress after a reset, it needs to know all the children that were already started after the reset point. In the current iteration of the feature, we will use this information to terminate any running child before starting another instance. We need this information for two main reasons: 1. If we don't attempt to terminate the running child, the parent may not make progress because of the ID conflict policy. 2. On the other hand, always terminating all children before starting is also wrong, since we may: - Terminate children that are started in the future and were not present at the time of reset. - Terminate children belonging to a different parent. So, we need to record the exact set of children that were observed (at the time of reset) to have been initialized after the reset point. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent 8de299c commit 1b11ed2

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

service/history/ndc/workflow_resetter.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ import (
5959
"google.golang.org/protobuf/types/known/timestamppb"
6060
)
6161

62+
const (
63+
maxChildrenInResetMutableState = 1000 // max number of children tracked in reset mutable state
64+
)
65+
66+
var (
67+
errWorkflowResetterMaxChildren = serviceerror.NewInvalidArgument(fmt.Sprintf("WorkflowResetter encountered max allowed children [%d] while resetting.", maxChildrenInResetMutableState))
68+
)
69+
6270
type (
6371
workflowResetReapplyEventsFn func(ctx context.Context, resetMutableState workflow.MutableState) error
6472

@@ -648,6 +656,11 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
648656
// All subsequent events should be excluded from being re-applied. So, do nothing and return.
649657
return lastVisitedRunID, nil
650658
}
659+
660+
// track the child workflows initiated after reset.
661+
// This will be saved in the parent workflow (in execution info) and used by the parent later to determine how to start these child workflows again.
662+
childrenInitializedAfterReset := make(map[string]*persistencespb.ResetChildInfo)
663+
651664
// First, special handling of remaining events for base workflow
652665
nextRunID, err := r.reapplyEventsFromBranch(
653666
ctx,
@@ -656,6 +669,7 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
656669
baseNextEventID,
657670
baseBranchToken,
658671
resetReapplyExcludeTypes,
672+
childrenInitializedAfterReset,
659673
)
660674
switch err.(type) {
661675
case nil:
@@ -721,6 +735,7 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
721735
nextWorkflowNextEventID,
722736
nextWorkflowBranchToken,
723737
resetReapplyExcludeTypes,
738+
childrenInitializedAfterReset,
724739
)
725740
switch err.(type) {
726741
case nil:
@@ -733,6 +748,9 @@ func (r *workflowResetterImpl) reapplyContinueAsNewWorkflowEvents(
733748
return "", err
734749
}
735750
}
751+
if len(childrenInitializedAfterReset) > 0 {
752+
resetMutableState.SetChildrenInitializedPostResetPoint(childrenInitializedAfterReset)
753+
}
736754
return lastVisitedRunID, nil
737755
}
738756

@@ -743,6 +761,7 @@ func (r *workflowResetterImpl) reapplyEventsFromBranch(
743761
nextEventID int64,
744762
branchToken []byte,
745763
resetReapplyExcludeTypes map[enumspb.ResetReapplyExcludeType]struct{},
764+
childrenInitializedAfterReset map[string]*persistencespb.ResetChildInfo,
746765
) (string, error) {
747766

748767
// TODO change this logic to fetching all workflow [baseWorkflow, currentWorkflow]
@@ -768,6 +787,21 @@ func (r *workflowResetterImpl) reapplyEventsFromBranch(
768787
if _, err := r.reapplyEvents(ctx, mutableState, lastEvents, resetReapplyExcludeTypes); err != nil {
769788
return "", err
770789
}
790+
// track the child workflows initiated after reset-point
791+
for _, event := range lastEvents {
792+
if event.GetEventType() == enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED {
793+
attr := event.GetStartChildWorkflowExecutionInitiatedEventAttributes()
794+
// TODO: there is a possibility the childIDs constructed this way may not be unique. But the probability of that is very low.
795+
// Need to figure out a better way to track these child workflows.
796+
childID := fmt.Sprintf("%s:%s", attr.GetWorkflowType().Name, attr.GetWorkflowId())
797+
childrenInitializedAfterReset[childID] = &persistencespb.ResetChildInfo{
798+
ShouldTerminateAndStart: true,
799+
}
800+
if len(childrenInitializedAfterReset) > maxChildrenInResetMutableState {
801+
return "", errWorkflowResetterMaxChildren
802+
}
803+
}
804+
}
771805
}
772806

773807
if len(lastEvents) > 0 {

service/history/ndc/workflow_resetter_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,7 @@ func (s *workflowResetterSuite) TestReapplyWorkflowEvents() {
824824
nextEventID,
825825
branchToken,
826826
nil,
827+
map[string]*persistencespb.ResetChildInfo{},
827828
)
828829
s.NoError(err)
829830
s.Equal(newRunID, nextRunID)

0 commit comments

Comments
 (0)