Skip to content

Commit 70dcf17

Browse files
authored
No-op close transfer task for SyncWorkflowState (temporalio#8507)
## What changed?

No-op the close transfer task for SyncWorkflowState replication tasks.

## Why?

We need non-state-based replication to be eligible for this optimization.

## How did you test it?

- [x] built
- [x] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [x] added new functional test(s)

Running

`go test -v -tags test_dep ./tests/xdc -run TestStreamBasedReplicationTestSuite/DisableTransitionHistory/TestCloseTransferTaskAckedReplication -timeout 10m -count=1`

shows

```
2025-10-20T08:15:08.290-0700 info Skipping close transfer task generation - already acked on active cluster {"cluster-name": "standby_aadnd", "host": "127.0.0.1:57179", "shard-id": 1, "address": "127.0.0.1:57179", "wf-namespace-id": "e550305e-0b43-4bcd-a490-8e3223f51ce1", "wf-id": "test-replication-e2c094d3-c34f-42d9-a166-a967d4e7f602", "wf-run-id": "019a0230-299a-74ba-a31f-ac247c05f2b9", "logging-call-at": "/Users/michaely520/projects/temporal/service/history/workflow/task_generator.go:206"}
stream_based_replication_test.go:975: Verified IsCloseTransferTaskAcked and IsForceReplication flags in SyncWorkflowStateTask
```
1 parent 661ad8c commit 70dcf17

File tree

12 files changed

+160
-57
lines changed

12 files changed

+160
-57
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 26 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/replication/v1/message.pb.go

Lines changed: 24 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/serialization/task_serializer.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,15 +1293,16 @@ func (s *TaskSerializer) replicationSyncWorkflowStateTaskToProto(
12931293
syncWorkflowStateTask *tasks.SyncWorkflowStateTask,
12941294
) *persistencespb.ReplicationTaskInfo {
12951295
return &persistencespb.ReplicationTaskInfo{
1296-
NamespaceId: syncWorkflowStateTask.WorkflowKey.NamespaceID,
1297-
WorkflowId: syncWorkflowStateTask.WorkflowKey.WorkflowID,
1298-
RunId: syncWorkflowStateTask.WorkflowKey.RunID,
1299-
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_WORKFLOW_STATE,
1300-
TaskId: syncWorkflowStateTask.TaskID,
1301-
Version: syncWorkflowStateTask.Version,
1302-
VisibilityTime: timestamppb.New(syncWorkflowStateTask.VisibilityTimestamp),
1303-
Priority: syncWorkflowStateTask.Priority,
1304-
TargetClusters: syncWorkflowStateTask.TargetClusters,
1296+
NamespaceId: syncWorkflowStateTask.NamespaceID,
1297+
WorkflowId: syncWorkflowStateTask.WorkflowID,
1298+
RunId: syncWorkflowStateTask.RunID,
1299+
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_WORKFLOW_STATE,
1300+
TaskId: syncWorkflowStateTask.TaskID,
1301+
Version: syncWorkflowStateTask.Version,
1302+
VisibilityTime: timestamppb.New(syncWorkflowStateTask.VisibilityTimestamp),
1303+
Priority: syncWorkflowStateTask.Priority,
1304+
TargetClusters: syncWorkflowStateTask.TargetClusters,
1305+
IsForceReplication: syncWorkflowStateTask.IsForceReplication,
13051306
}
13061307
}
13071308

@@ -1323,6 +1324,7 @@ func (s *TaskSerializer) replicationSyncWorkflowStateTaskFromProto(
13231324
TaskID: syncWorkflowStateTask.TaskId,
13241325
Priority: syncWorkflowStateTask.Priority,
13251326
TargetClusters: syncWorkflowStateTask.TargetClusters,
1327+
IsForceReplication: syncWorkflowStateTask.IsForceReplication,
13261328
}
13271329
}
13281330

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,8 @@ message ReplicateWorkflowStateRequest {
606606
temporal.server.api.persistence.v1.WorkflowMutableState workflow_state = 1;
607607
string remote_cluster = 2;
608608
string namespace_id= 3;
609+
bool is_force_replication = 4;
610+
bool is_close_transfer_task_acked = 5;
609611
}
610612

611613
message ReplicateWorkflowStateResponse {

proto/internal/temporal/server/api/replication/v1/message.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ message HistoryTaskAttributes {
179179

180180
message SyncWorkflowStateTaskAttributes {
181181
temporal.server.api.persistence.v1.WorkflowMutableState workflow_state = 1;
182+
bool is_force_replication = 2;
183+
bool is_close_transfer_task_acked = 3;
182184
}
183185

184186
message TaskQueueUserDataAttributes {

service/history/ndc/workflow_state_replicator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ func (r *WorkflowStateReplicatorImpl) SyncWorkflowState(
187187
default:
188188
return err
189189
}
190-
return r.applySnapshotWhenWorkflowNotExist(ctx, namespaceID, wid, rid, wfCtx, releaseFn, request.GetWorkflowState(), request.RemoteCluster, nil, false, false)
190+
skipCloseTransferTask := request.GetIsForceReplication() && request.GetIsCloseTransferTaskAcked()
191+
return r.applySnapshotWhenWorkflowNotExist(ctx, namespaceID, wid, rid, wfCtx, releaseFn, request.GetWorkflowState(), request.RemoteCluster, nil, false, skipCloseTransferTask)
191192
}
192193

193194
//nolint:revive // cognitive complexity 37 (> max enabled 25)

service/history/replication/executable_workflow_state_task.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ func NewExecutableWorkflowStateTask(
6262
replicationTask,
6363
),
6464
req: &historyservice.ReplicateWorkflowStateRequest{
65-
NamespaceId: namespaceID,
66-
WorkflowState: task.GetWorkflowState(),
67-
RemoteCluster: sourceClusterName,
65+
NamespaceId: namespaceID,
66+
WorkflowState: task.GetWorkflowState(),
67+
RemoteCluster: sourceClusterName,
68+
IsForceReplication: task.GetIsForceReplication(),
69+
IsCloseTransferTaskAcked: task.GetIsCloseTransferTaskAcked(),
6870
},
6971
}
7072
}

service/history/replication/raw_task_converter.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,21 @@ func convertWorkflowStateReplicationTask(
223223
if err := common.DiscardUnknownProto(workflowMutableState); err != nil {
224224
return nil, err
225225
}
226+
227+
isCloseTransferTaskAcked := isCloseTransferTaskAckedForWorkflow(shardContext, &tasks.CloseExecutionTask{
228+
WorkflowKey: mutableState.GetWorkflowKey(),
229+
TaskID: mutableState.GetExecutionInfo().GetCloseTransferTaskId(),
230+
})
231+
226232
return &replicationspb.ReplicationTask{
227233
TaskType: enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK,
228234
SourceTaskId: taskInfo.TaskID,
229235
Priority: taskInfo.Priority,
230236
Attributes: &replicationspb.ReplicationTask_SyncWorkflowStateTaskAttributes{
231237
SyncWorkflowStateTaskAttributes: &replicationspb.SyncWorkflowStateTaskAttributes{
232-
WorkflowState: workflowMutableState,
238+
WorkflowState: workflowMutableState,
239+
IsForceReplication: taskInfo.IsForceReplication,
240+
IsCloseTransferTaskAcked: isCloseTransferTaskAcked,
233241
},
234242
},
235243
VisibilityTime: timestamppb.New(taskInfo.VisibilityTimestamp),
@@ -864,12 +872,19 @@ func (c *syncVersionedTransitionTaskConverter) generateBackfillHistoryTask(
864872

865873
func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked(
866874
closeTransferTask *tasks.CloseExecutionTask,
875+
) bool {
876+
return isCloseTransferTaskAckedForWorkflow(c.shardContext, closeTransferTask)
877+
}
878+
879+
func isCloseTransferTaskAckedForWorkflow(
880+
shardContext historyi.ShardContext,
881+
closeTransferTask *tasks.CloseExecutionTask,
867882
) bool {
868883
if closeTransferTask.TaskID == 0 {
869884
return false
870885
}
871886

872-
transferQueueState, ok := c.shardContext.GetQueueState(tasks.CategoryTransfer)
887+
transferQueueState, ok := shardContext.GetQueueState(tasks.CategoryTransfer)
873888
if !ok {
874889
return false
875890
}

service/history/replication/raw_task_converter_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,16 @@ func (s *rawTaskConverterSuite) TestConvertWorkflowStateReplicationTask_Workflow
560560
},
561561
}).AnyTimes()
562562
s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED).AnyTimes()
563+
// Mock for watermark check
564+
executionInfo := &persistencespb.WorkflowExecutionInfo{
565+
NamespaceId: s.namespaceID,
566+
WorkflowId: s.workflowID,
567+
TaskGenerationShardClockTimestamp: 123,
568+
CloseVisibilityTaskId: 456,
569+
CloseTransferTaskId: 789,
570+
}
571+
s.mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
572+
s.mutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.runID)).AnyTimes()
563573

564574
result, err := convertWorkflowStateReplicationTask(ctx, s.shardContext, task, s.workflowCache)
565575
s.NoError(err)
@@ -571,7 +581,9 @@ func (s *rawTaskConverterSuite) TestConvertWorkflowStateReplicationTask_Workflow
571581
SourceTaskId: task.TaskID,
572582
Attributes: &replicationspb.ReplicationTask_SyncWorkflowStateTaskAttributes{
573583
SyncWorkflowStateTaskAttributes: &replicationspb.SyncWorkflowStateTaskAttributes{
574-
WorkflowState: sanitizedMutableState,
584+
WorkflowState: sanitizedMutableState,
585+
IsForceReplication: task.IsForceReplication,
586+
IsCloseTransferTaskAcked: false, // No queue state available
575587
},
576588
},
577589
VisibilityTime: timestamppb.New(task.VisibilityTimestamp),

service/history/tasks/sync_workflow_state_task.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ type (
1515
VisibilityTimestamp time.Time
1616
TaskID int64
1717
// TODO: validate this version in source task converter
18-
Version int64
19-
Priority enumsspb.TaskPriority
20-
TargetClusters []string
18+
Version int64
19+
Priority enumsspb.TaskPriority
20+
TargetClusters []string
21+
IsForceReplication bool
2122
}
2223
)
2324

0 commit comments

Comments
 (0)