Skip to content

Commit 8977071

Browse files
committed
Time skipping: add replication support
1 parent e004436 commit 8977071

10 files changed

Lines changed: 1089 additions & 384 deletions

File tree

api/persistence/v1/executions.go-helpers.pb.go

Lines changed: 37 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/persistence/v1/executions.pb.go

Lines changed: 437 additions & 367 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,22 @@ message TimeSkippingInfo {
325325
temporal.api.workflow.v1.TimeSkippingConfig config = 1;
326326

327327
// Total skipped duration for the current workflow execution run, including any
328-
/* inherited skipped duration carried over from a preceding execution that started this run. */ google.protobuf.Duration accumulated_skipped_duration = 2;
328+
// inherited skipped duration carried over from a preceding execution that started this run.
329+
google.protobuf.Duration accumulated_skipped_duration = 2;
329330

330331
// The current bound based on elapsed duration for time skipping.
331332
TimeSkippingBoundInfo current_elapsed_duration_bound = 3;
333+
334+
repeated TimeSkipTaskRegenEntry task_regen_entries = 4;
335+
}
336+
337+
// TimeSkipTaskRegenEntry records timer-task regeneration bookkeeping for one
// time-skip transition, keyed by the event that produced the skip.
message TimeSkipTaskRegenEntry {
    // Event ID of the TimeSkippingTransitionedEvent that caused the skipped duration.
    int64 source_event_id = 1;

    // Idempotency marker for task recreation so timer tasks are regenerated at
    // most once per source event — in particular when tasks are regenerated on
    // the passive cluster during state-based replication. This status is
    // cluster-local state; receivers overwrite the wire value on replication.
    int32 task_regen_status = 2;
}
333345

334346
message TimeSkippingBoundInfo {

service/history/timer_queue_standby_task_executor.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,7 @@ func (t *timerQueueStandbyTaskExecutor) Execute(
110110
case *tasks.ChasmTask:
111111
err = t.executeChasmSideEffectTimerTask(ctx, task)
112112
case *tasks.TimeSkippingTimerTask:
113-
// todo@time-skipping: replication. The disable-after-bound transition is emitted
114-
// on the active side and will replicate; standby drops the local task.
115-
err = nil
113+
err = t.executeTimeSkippingTimerTask(ctx, task)
116114
default:
117115
err = queueserrors.NewUnprocessableTaskError("unknown task type")
118116
}
@@ -235,17 +233,32 @@ func (t *timerQueueStandbyTaskExecutor) discardChasmTask(
235233
)
236234
}
237235

236+
// executeTimeSkippingTimerTask is a deliberate no-op on the standby side. The
237+
// disable-after-bound transition is emitted on the active cluster as a regular
238+
// history event and replicates through the normal path; the bound flag is
239+
// advisory on standby (it doesn't drive any local progress), so there's nothing
240+
// for the verifier to gain by checking it.
241+
func (t *timerQueueStandbyTaskExecutor) executeTimeSkippingTimerTask(
242+
_ context.Context,
243+
_ *tasks.TimeSkippingTimerTask,
244+
) error {
245+
return nil
246+
}
247+
238248
func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
239249
ctx context.Context,
240250
timerTask *tasks.UserTimerTask,
241251
) error {
242-
referenceTime := t.Now()
243252
actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) {
244253
if !mutableState.IsWorkflowExecutionRunning() {
245254
// workflow already finished, no need to process the timer
246255
return nil, nil
247256
}
248257

258+
// Use mutableState.Now() as reference time as the mutable state may use virtual time
259+
// which can skip duration and be before the wall clock time.
260+
referenceTime := mutableState.Now()
261+
249262
timerSequence := t.getTimerSequence(mutableState)
250263
timerSequenceIDs := timerSequence.LoadAndSortUserTimers()
251264
if len(timerSequenceIDs) > 0 {
@@ -299,13 +312,14 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
299312
//
300313
// the overall solution is to attempt to generate a new activity timer task whenever the
301314
// task passed in is safe to be throw away.
302-
referenceTime := t.Now()
303315
actionFn := func(ctx context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) {
304316
if !mutableState.IsWorkflowExecutionRunning() {
305317
// workflow already finished, no need to process the timer
306318
return nil, nil
307319
}
308320

321+
referenceTime := mutableState.Now()
322+
309323
timerSequence := t.getTimerSequence(mutableState)
310324
updateMutableState := false
311325
timerSequenceIDs := timerSequence.LoadAndSortActivityTimers()
@@ -778,6 +792,12 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity(
778792
)
779793
}
780794

795+
// getCurrentTime returns the shard's wall-clock view of "now" for t.clusterName.
796+
// Must stay wall-clock: it gates standby task-retry timing against VisibilityTime
797+
// (also wall-clock); mutableState.Now() is virtual time and would force-discard
798+
// time-skipping workflows. actionFn closures compare against virtual timestamps,
799+
// so they use mutableState.Now() instead.
800+
//
781801
// TODO: deprecate this function and always use t.Now()
782802
// Only test code sets t.clusterName to be non-current cluster name
783803
// and advance the time by setting calling shardContext.SetCurrentTime.

service/history/workflow/mutable_state_impl.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4012,8 +4012,12 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionTimeSkippingTransitionedEvent(
40124012
tsi.AccumulatedSkippedDuration = durationpb.New(0)
40134013
}
40144014
accumulatedSkippedDuration := tsi.GetAccumulatedSkippedDuration().AsDuration()
4015+
4016+
// todo@time-skipping: add precision to the entire time-skipping feature
4017+
var skipDelta time.Duration
40154018
if !timeNotSet(attr.TargetTime) {
4016-
accumulatedSkippedDuration += attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime())
4019+
skipDelta = attr.TargetTime.AsTime().Sub(event.GetEventTime().AsTime())
4020+
accumulatedSkippedDuration += skipDelta
40174021
}
40184022
tsi.AccumulatedSkippedDuration = durationpb.New(accumulatedSkippedDuration)
40194023
tsi.Config.Enabled = !attr.GetDisabledAfterBound()
@@ -4022,6 +4026,13 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionTimeSkippingTransitionedEvent(
40224026
tsi.CurrentElapsedDurationBound.HasReached = true
40234027
}
40244028

4029+
if skipDelta > 0 {
4030+
tsi.TaskRegenEntries = append(tsi.TaskRegenEntries, &persistencespb.TimeSkipTaskRegenEntry{
4031+
SourceEventId: event.GetEventId(),
4032+
TaskRegenStatus: TimeSkippingTaskRegenStatusNone,
4033+
})
4034+
}
4035+
40254036
ms.timeSkippingInfoUpdated = true
40264037
return nil
40274038
}
@@ -8483,7 +8494,6 @@ func (ms *MutableStateImpl) closeTransactionRegenerateTimerTasksForTimeSkipping(
84838494
) error {
84848495
switch transactionPolicy {
84858496
case historyi.TransactionPolicyActive:
8486-
// todo@time-skipping: impacts of paused workflow to be considered
84878497
if !ms.IsWorkflowExecutionRunning() {
84888498
return nil
84898499
}
@@ -9011,6 +9021,8 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
90119021
}
90129022
}
90139023

9024+
ms.applyIncomingTimeSkippingInfo(current, incoming)
9025+
90149026
doNotSync := func(v any) []any {
90159027
info, ok := v.(*persistencespb.WorkflowExecutionInfo)
90169028
if !ok || info == nil {
@@ -9044,6 +9056,10 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
90449056
&info.StateMachineTimers,
90459057
&info.TaskGenerationShardClockTimestamp,
90469058
&info.UpdateInfos,
9059+
// TimeSkippingInfo is handled by applyIncomingTimeSkippingInfo above
9060+
// because TimeSkipTaskRegenEntry.TaskRegenStatus is per-cluster local state
9061+
// and must not be clobbered by the wire value.
9062+
&info.TimeSkippingInfo,
90479063
}
90489064
if !isSnapshot {
90499065
ignoreFields = append(ignoreFields, &info.SubStateMachineTombstoneBatches)
@@ -9060,6 +9076,40 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
90609076
return nil
90619077
}
90629078

9079+
// applyIncomingTimeSkippingInfo merges the incoming TimeSkippingInfo (from a
9080+
// replication mutation/snapshot) into current, preserving each local
9081+
// TimeSkipTaskRegenEntry's TaskRegenStatus by SourceEventId. Entries that are
9082+
// new to this cluster get TaskRegenStatus reset to None so the local refresh
9083+
// can regenerate timer tasks. Mirrors the TimerTaskStatus override pattern at
9084+
// applyUpdatesToSubStateMachines.
9085+
func (ms *MutableStateImpl) applyIncomingTimeSkippingInfo(
9086+
current *persistencespb.WorkflowExecutionInfo,
9087+
incoming *persistencespb.WorkflowExecutionInfo,
9088+
) {
9089+
if incoming.GetTimeSkippingInfo() == nil {
9090+
current.TimeSkippingInfo = nil
9091+
return
9092+
}
9093+
9094+
localStatusByEventID := make(map[int64]int32)
9095+
if current.GetTimeSkippingInfo() != nil {
9096+
for _, d := range current.TimeSkippingInfo.GetTaskRegenEntries() {
9097+
localStatusByEventID[d.GetSourceEventId()] = d.GetTaskRegenStatus()
9098+
}
9099+
}
9100+
9101+
newTSI := common.CloneProto(incoming.TimeSkippingInfo)
9102+
for _, d := range newTSI.GetTaskRegenEntries() {
9103+
if localStatus, ok := localStatusByEventID[d.GetSourceEventId()]; ok {
9104+
d.TaskRegenStatus = localStatus
9105+
} else {
9106+
d.TaskRegenStatus = TimeSkippingTaskRegenStatusNone
9107+
}
9108+
}
9109+
9110+
current.TimeSkippingInfo = newTSI
9111+
}
9112+
90639113
func (ms *MutableStateImpl) syncSubStateMachinesByType(incoming map[string]*persistencespb.StateMachineMap) error {
90649114
// check if there is node been deleted
90659115
currentHSM := ms.HSM()
@@ -9541,10 +9591,17 @@ func (ms *MutableStateImpl) initTimeSkippingInfo(
95419591
initialSkippedDuration *durationpb.Duration,
95429592
currentEventID int64,
95439593
) {
9544-
ms.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
9594+
tsi := &persistencespb.TimeSkippingInfo{
95459595
Config: config,
95469596
AccumulatedSkippedDuration: initialSkippedDuration,
95479597
}
9598+
if initialSkippedDuration != nil && initialSkippedDuration.AsDuration() > 0 {
9599+
tsi.TaskRegenEntries = []*persistencespb.TimeSkipTaskRegenEntry{{
9600+
SourceEventId: currentEventID,
9601+
TaskRegenStatus: TimeSkippingTaskRegenStatusNone,
9602+
}}
9603+
}
9604+
ms.executionInfo.TimeSkippingInfo = tsi
95489605
ms.wrapTimeSourceWithTimeSkipping()
95499606
ms.shiftWorkflowTimes(initialSkippedDuration)
95509607
ms.applyTimeSkippingBound(currentEventID)

service/history/workflow/mutable_state_impl_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8153,3 +8153,115 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping_Bound() {
81538153
s.InDelta(float64(time.Hour), float64(got), float64(time.Millisecond))
81548154
})
81558155
}
8156+
8157+
// TestApplyIncomingTimeSkippingInfo_PreservesLocalRegenStatus verifies the
8158+
// merge contract: TimeSkipTaskRegenEntry.TaskRegenStatus is per-cluster local
8159+
// state. For details the receiver already has (matched by SourceEventId), the
8160+
// local status is preserved; new details are reset to None so the local
8161+
// refresh can regenerate timer tasks.
8162+
func TestApplyIncomingTimeSkippingInfo_PreservesLocalRegenStatus(t *testing.T) {
8163+
t.Parallel()
8164+
8165+
ms := &MutableStateImpl{}
8166+
8167+
for _, tc := range []struct {
8168+
name string
8169+
current *persistencespb.TimeSkippingInfo
8170+
incoming *persistencespb.TimeSkippingInfo
8171+
wantStatuses map[int64]int32 // SourceEventId -> expected TaskRegenStatus
8172+
}{
8173+
{
8174+
name: "current nil",
8175+
current: nil,
8176+
incoming: &persistencespb.TimeSkippingInfo{
8177+
TaskRegenEntries: []*persistencespb.TimeSkipTaskRegenEntry{
8178+
{SourceEventId: 5, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8179+
{SourceEventId: 9, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8180+
},
8181+
},
8182+
wantStatuses: map[int64]int32{
8183+
5: TimeSkippingTaskRegenStatusNone,
8184+
9: TimeSkippingTaskRegenStatusNone,
8185+
},
8186+
},
8187+
{
8188+
name: "matching by SourceEventId preserves local status; new entry resets to None",
8189+
current: &persistencespb.TimeSkippingInfo{
8190+
TaskRegenEntries: []*persistencespb.TimeSkipTaskRegenEntry{
8191+
{SourceEventId: 5, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8192+
{SourceEventId: 9, TaskRegenStatus: TimeSkippingTaskRegenStatusNone},
8193+
},
8194+
},
8195+
incoming: &persistencespb.TimeSkippingInfo{
8196+
TaskRegenEntries: []*persistencespb.TimeSkipTaskRegenEntry{
8197+
// Wire always carries Regenerated (sender marked before sending).
8198+
{SourceEventId: 5, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8199+
{SourceEventId: 9, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8200+
{SourceEventId: 13, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8201+
},
8202+
},
8203+
wantStatuses: map[int64]int32{
8204+
5: TimeSkippingTaskRegenStatusRegenerated, // local kept
8205+
9: TimeSkippingTaskRegenStatusNone, // local kept
8206+
13: TimeSkippingTaskRegenStatusNone, // new -> None
8207+
},
8208+
},
8209+
} {
8210+
tc := tc
8211+
t.Run(tc.name, func(t *testing.T) {
8212+
t.Parallel()
8213+
8214+
current := &persistencespb.WorkflowExecutionInfo{TimeSkippingInfo: tc.current}
8215+
incoming := &persistencespb.WorkflowExecutionInfo{TimeSkippingInfo: tc.incoming}
8216+
8217+
ms.applyIncomingTimeSkippingInfo(current, incoming)
8218+
8219+
if tc.wantStatuses == nil {
8220+
require.Nil(t, current.TimeSkippingInfo)
8221+
return
8222+
}
8223+
require.NotNil(t, current.TimeSkippingInfo)
8224+
require.Len(t, current.TimeSkippingInfo.TaskRegenEntries, len(tc.wantStatuses))
8225+
for _, d := range current.TimeSkippingInfo.TaskRegenEntries {
8226+
want, ok := tc.wantStatuses[d.GetSourceEventId()]
8227+
require.True(t, ok, "unexpected detail SourceEventId=%d in result", d.GetSourceEventId())
8228+
require.Equal(t, want, d.GetTaskRegenStatus(),
8229+
"detail SourceEventId=%d: expected status %d, got %d",
8230+
d.GetSourceEventId(), want, d.GetTaskRegenStatus())
8231+
}
8232+
})
8233+
}
8234+
}
8235+
8236+
// TestApplyIncomingTimeSkippingInfo_ClonesToBreakAliasing verifies the merge
8237+
// clones the incoming TimeSkippingInfo so subsequent local mutations
8238+
// (e.g. flipping TaskRegenStatus during PartialRefresh) cannot leak back into
8239+
// the wire message.
8240+
func TestApplyIncomingTimeSkippingInfo_ClonesToBreakAliasing(t *testing.T) {
8241+
t.Parallel()
8242+
8243+
ms := &MutableStateImpl{}
8244+
8245+
incoming := &persistencespb.WorkflowExecutionInfo{
8246+
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{
8247+
TaskRegenEntries: []*persistencespb.TimeSkipTaskRegenEntry{
8248+
{SourceEventId: 5, TaskRegenStatus: TimeSkippingTaskRegenStatusRegenerated},
8249+
},
8250+
},
8251+
}
8252+
current := &persistencespb.WorkflowExecutionInfo{}
8253+
8254+
ms.applyIncomingTimeSkippingInfo(current, incoming)
8255+
8256+
require.NotSame(t, current.TimeSkippingInfo, incoming.TimeSkippingInfo,
8257+
"current.TimeSkippingInfo must be a cloned copy, not aliased to incoming")
8258+
require.NotSame(t, current.TimeSkippingInfo.TaskRegenEntries[0],
8259+
incoming.TimeSkippingInfo.TaskRegenEntries[0],
8260+
"detail must also be cloned")
8261+
8262+
// Mutate local; incoming must not change.
8263+
current.TimeSkippingInfo.TaskRegenEntries[0].TaskRegenStatus = TimeSkippingTaskRegenStatusRegenerated
8264+
require.Equal(t, TimeSkippingTaskRegenStatusRegenerated,
8265+
incoming.TimeSkippingInfo.TaskRegenEntries[0].GetTaskRegenStatus(),
8266+
"sanity: incoming was already Regenerated; this assertion is a regression guard if the wire value were ever mutated")
8267+
}

0 commit comments

Comments
 (0)