Skip to content

Commit b8b3cda

Browse files
authored
Do not send backfill task when event is empty (temporalio#7130)
## What changed? <!-- Describe what has changed in this PR --> Do not send backfill task when event is empty ## Why? <!-- Tell your future self why have you made these changes --> Backfill tasks should always have associated events. If there is no event(i.e. for a state only transition), then no backfill to perform and we should skip the task ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> n/a ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> n/a ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> n/a ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> no
1 parent 767cc29 commit b8b3cda

File tree

7 files changed

+1189
-1160
lines changed

7 files changed

+1189
-1160
lines changed

api/persistence/v1/executions.pb.go

Lines changed: 1135 additions & 1118 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/serialization/task_serializer.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1265,18 +1265,19 @@ func (s *TaskSerializer) replicationSyncVersionedTransitionTaskToProto(
12651265
}
12661266

12671267
return &persistencespb.ReplicationTaskInfo{
1268-
NamespaceId: syncVersionedTransitionTask.WorkflowKey.NamespaceID,
1269-
WorkflowId: syncVersionedTransitionTask.WorkflowKey.WorkflowID,
1270-
RunId: syncVersionedTransitionTask.WorkflowKey.RunID,
1271-
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_VERSIONED_TRANSITION,
1272-
TaskId: syncVersionedTransitionTask.TaskID,
1273-
VisibilityTime: timestamppb.New(syncVersionedTransitionTask.VisibilityTimestamp),
1274-
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
1275-
FirstEventId: syncVersionedTransitionTask.FirstEventID,
1276-
Version: syncVersionedTransitionTask.FirstEventVersion,
1277-
NextEventId: syncVersionedTransitionTask.NextEventID,
1278-
NewRunId: syncVersionedTransitionTask.NewRunID,
1279-
TaskEquivalents: taskInfoEquivalents,
1268+
NamespaceId: syncVersionedTransitionTask.WorkflowKey.NamespaceID,
1269+
WorkflowId: syncVersionedTransitionTask.WorkflowKey.WorkflowID,
1270+
RunId: syncVersionedTransitionTask.WorkflowKey.RunID,
1271+
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_VERSIONED_TRANSITION,
1272+
TaskId: syncVersionedTransitionTask.TaskID,
1273+
VisibilityTime: timestamppb.New(syncVersionedTransitionTask.VisibilityTimestamp),
1274+
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
1275+
FirstEventId: syncVersionedTransitionTask.FirstEventID,
1276+
Version: syncVersionedTransitionTask.FirstEventVersion,
1277+
NextEventId: syncVersionedTransitionTask.NextEventID,
1278+
NewRunId: syncVersionedTransitionTask.NewRunID,
1279+
LastVersionHistoryItem: syncVersionedTransitionTask.LastVersionHistoryItem,
1280+
TaskEquivalents: taskInfoEquivalents,
12801281
}, nil
12811282
}
12821283

@@ -1303,14 +1304,15 @@ func (s *TaskSerializer) replicationSyncVersionedTransitionTaskFromProto(
13031304
syncVersionedTransitionTask.WorkflowId,
13041305
syncVersionedTransitionTask.RunId,
13051306
),
1306-
VisibilityTimestamp: visibilityTimestamp,
1307-
TaskID: syncVersionedTransitionTask.TaskId,
1308-
FirstEventID: syncVersionedTransitionTask.FirstEventId,
1309-
FirstEventVersion: syncVersionedTransitionTask.Version,
1310-
NextEventID: syncVersionedTransitionTask.NextEventId,
1311-
NewRunID: syncVersionedTransitionTask.NewRunId,
1312-
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
1313-
TaskEquivalents: taskEquivalents,
1307+
VisibilityTimestamp: visibilityTimestamp,
1308+
TaskID: syncVersionedTransitionTask.TaskId,
1309+
FirstEventID: syncVersionedTransitionTask.FirstEventId,
1310+
FirstEventVersion: syncVersionedTransitionTask.Version,
1311+
NextEventID: syncVersionedTransitionTask.NextEventId,
1312+
NewRunID: syncVersionedTransitionTask.NewRunId,
1313+
VersionedTransition: syncVersionedTransitionTask.VersionedTransition,
1314+
LastVersionHistoryItem: syncVersionedTransitionTask.LastVersionHistoryItem,
1315+
TaskEquivalents: taskEquivalents,
13141316
}, nil
13151317
}
13161318

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ message ReplicationTaskInfo {
355355
// TODO: Remove this field when state-based replication is stable and
356356
// doesn't need to be disabled.
357357
repeated ReplicationTaskInfo task_equivalents = 20;
358+
history.v1.VersionHistoryItem last_version_history_item = 21;
358359
}
359360

360361
// visibility_task_data column

service/history/ndc/workflow_state_replicator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (r *WorkflowStateReplicatorImpl) ReplicateVersionedTransition(
267267
// TODO: Revisit these logic when working on roll out/back plan
268268
if snapshot == nil {
269269
return serviceerrors.NewSyncState(
270-
"failed to apply mutation due to missing mutable state",
270+
"failed to apply versioned transition due to missing snapshot",
271271
namespaceID.String(),
272272
wid,
273273
rid,
@@ -350,7 +350,7 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
350350
}
351351
if localMutableState == nil {
352352
return serviceerrors.NewSyncState(
353-
"failed to apply mutation due to missing mutable state",
353+
"failed to apply mutation due to missing local mutable state",
354354
namespaceID.String(),
355355
workflowID,
356356
runID,
@@ -438,7 +438,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
438438
versionHistories = localMutableState.GetExecutionInfo().VersionHistories
439439
}
440440
return serviceerrors.NewSyncState(
441-
"failed to apply mutation due to missing mutable state",
441+
"failed to apply mutation due to missing task snapshot",
442442
namespaceID.String(),
443443
workflowID,
444444
runID,

service/history/replication/raw_task_converter.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -730,12 +730,16 @@ func (c *syncVersionedTransitionTaskConverter) generateVerifyVersionedTransition
730730
if err != nil {
731731
return nil, err
732732
}
733-
lastEventVersion, err := versionhistory.GetVersionHistoryEventVersion(currentHistory, taskInfo.NextEventID-1)
733+
var nextEventId = taskInfo.NextEventID
734+
if nextEventId == common.EmptyEventID {
735+
nextEventId = taskInfo.LastVersionHistoryItem.GetEventId() + 1
736+
}
737+
lastEventVersion, err := versionhistory.GetVersionHistoryEventVersion(currentHistory, nextEventId-1)
734738
if err != nil {
735739
return nil, err
736740
}
737741
capItems, err := versionhistory.CopyVersionHistoryUntilLCAVersionHistoryItem(currentHistory, &historyspb.VersionHistoryItem{
738-
EventId: taskInfo.NextEventID - 1,
742+
EventId: nextEventId - 1,
739743
Version: lastEventVersion,
740744
})
741745
if err != nil {
@@ -751,7 +755,7 @@ func (c *syncVersionedTransitionTaskConverter) generateVerifyVersionedTransition
751755
RunId: taskInfo.RunID,
752756
NewRunId: taskInfo.NewRunID,
753757
EventVersionHistory: capItems.Items,
754-
NextEventId: taskInfo.NextEventID,
758+
NextEventId: nextEventId,
755759
},
756760
},
757761
VersionedTransition: taskInfo.VersionedTransition,

service/history/tasks/sync_versioned_transition_task.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"time"
2828

2929
enumsspb "go.temporal.io/server/api/enums/v1"
30+
historyspb "go.temporal.io/server/api/history/v1"
3031
persistencespb "go.temporal.io/server/api/persistence/v1"
3132
"go.temporal.io/server/common/definition"
3233
)
@@ -40,11 +41,12 @@ type (
4041
TaskID int64
4142
Priority enumsspb.TaskPriority
4243

43-
VersionedTransition *persistencespb.VersionedTransition
44-
FirstEventVersion int64
45-
FirstEventID int64
46-
NextEventID int64
47-
NewRunID string
44+
VersionedTransition *persistencespb.VersionedTransition
45+
FirstEventVersion int64
46+
FirstEventID int64 // First event ID of version transition
47+
NextEventID int64 // Next event ID after version transition
48+
LastVersionHistoryItem *historyspb.VersionHistoryItem // Last version history item of version transition when version transition does not have associated events
49+
NewRunID string
4850

4951
TaskEquivalents []Task
5052
}

service/history/workflow/mutable_state_impl.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6096,6 +6096,7 @@ func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks(
60966096
ms.executionState.RunId,
60976097
)
60986098
var firstEventID, firstEventVersion, nextEventID int64
6099+
var lastVersionHistoryItem *historyspb.VersionHistoryItem
60996100
if len(eventBatches) > 0 {
61006101
firstEventID = eventBatches[0][0].EventId
61016102
firstEventVersion = eventBatches[0][0].Version
@@ -6110,24 +6111,26 @@ func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks(
61106111
if err != nil {
61116112
return err
61126113
}
6113-
firstEventID = item.EventId
6114-
firstEventVersion = item.Version
6115-
nextEventID = item.EventId + 1
6114+
firstEventID = common.EmptyEventID
6115+
firstEventVersion = common.EmptyVersion
6116+
nextEventID = common.EmptyEventID
6117+
lastVersionHistoryItem = versionhistory.CopyVersionHistoryItem(item)
61166118
}
61176119
transitionHistory := ms.executionInfo.TransitionHistory
61186120
if len(transitionHistory) > 0 && CompareVersionedTransition(
61196121
ms.versionedTransitionInDB,
61206122
ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1],
61216123
) != 0 {
61226124
syncVersionedTransitionTask := &tasks.SyncVersionedTransitionTask{
6123-
WorkflowKey: workflowKey,
6124-
VisibilityTimestamp: now,
6125-
Priority: enumsspb.TASK_PRIORITY_HIGH,
6126-
VersionedTransition: transitionHistory[len(transitionHistory)-1],
6127-
FirstEventID: firstEventID,
6128-
FirstEventVersion: firstEventVersion,
6129-
NextEventID: nextEventID,
6130-
TaskEquivalents: replicationTasks,
6125+
WorkflowKey: workflowKey,
6126+
VisibilityTimestamp: now,
6127+
Priority: enumsspb.TASK_PRIORITY_HIGH,
6128+
VersionedTransition: transitionHistory[len(transitionHistory)-1],
6129+
FirstEventID: firstEventID,
6130+
FirstEventVersion: firstEventVersion,
6131+
NextEventID: nextEventID,
6132+
TaskEquivalents: replicationTasks,
6133+
LastVersionHistoryItem: lastVersionHistoryItem,
61316134
}
61326135

61336136
// versioned transition updated in the transaction

0 commit comments

Comments
 (0)