Skip to content

Commit f717213

Browse files
authored
Refactor persistence layer to support inserting history tasks of new categories (#6671)
**Detailed Description** Introduce data and data_encoding columns to the executions table of Cassandra. These fields are used to replace other columns in the `executions` table. We'll start by replacing the history task columns (`transfer`, `timer` and `replication`). **Impact Analysis** - **Backward Compatibility**: Yes. It only changes the schema, no data is written to the new columns in this PR. - **Forward Compatibility**: Yes. It only changes the schema, no data is written to the new columns in this PR. **Testing Plan** - **Unit Tests**: Yes. - **Persistence Tests**: No. The code implementation is not fully done in this PR. - **Integration Tests**: No. - **Compatibility Tests**: No. **Rollout Plan** - What is the rollout plan? No special rollout plan. - Does the order of deployment matter? Doesn't matter. - Is it safe to rollback? Does the order of rollback matter? Yes. The order doesn't matter. - Is there a kill switch to mitigate the impact immediately? No. <!-- Describe what has changed in this PR --> **What changed?** - Introduce `data` and `data_encoding` columns to the executions table of Cassandra. (NOTE: no data is written to Cassandra in this PR, it will be introduced in a separate PR) - Refactor persistence data type to replace transfer tasks, timer tasks, and replication tasks with tasksByCategory map of tasks - Remove cross cluster tasks logic from persistence layer <!-- Tell your future self why have you made these changes --> **Why?** - The new columns will be used to store data in the `executions` table. A migration is required later to migrate transfer, timer and replication tasks data from their corresponding columns to the `data` and `data_encoding` columns. The migration will be done in several phases: 1. Dual writes: data is written to both the `transfer/timer/replication` columns and the `data`, `data_encoding` columns 2. Read from old columns
3. Read from new columns. Several other migrations will be initiated to migrate data from other columns of other types to these 2 columns. - The refactoring is to make it easier to introduce new categories of history tasks. - Cross cluster task is deprecated. - This change should only affect cadence-history <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 0383b18 commit f717213

39 files changed

+910
-1464
lines changed

common/persistence/data_manager_interfaces.go

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,46 @@ const (
174174
TaskListKindSticky
175175
)
176176

177+
// HistoryTaskCategory represents various categories of history tasks
178+
type HistoryTaskCategory struct {
179+
categoryType int
180+
categoryID int
181+
}
182+
183+
func (c *HistoryTaskCategory) Type() int {
184+
return c.categoryType
185+
}
186+
187+
func (c *HistoryTaskCategory) ID() int {
188+
return c.categoryID
189+
}
190+
191+
const (
192+
HistoryTaskCategoryTypeImmediate = iota + 1
193+
HistoryTaskCategoryTypeScheduled
194+
)
195+
196+
const (
197+
HistoryTaskCategoryIDTransfer = 1
198+
HistoryTaskCategoryIDTimer = 2
199+
HistoryTaskCategoryIDReplication = 3
200+
)
201+
202+
var (
203+
HistoryTaskCategoryTransfer = HistoryTaskCategory{
204+
categoryType: HistoryTaskCategoryTypeImmediate,
205+
categoryID: HistoryTaskCategoryIDTransfer,
206+
}
207+
HistoryTaskCategoryTimer = HistoryTaskCategory{
208+
categoryType: HistoryTaskCategoryTypeScheduled,
209+
categoryID: HistoryTaskCategoryIDTimer,
210+
}
211+
HistoryTaskCategoryReplication = HistoryTaskCategory{
212+
categoryType: HistoryTaskCategoryTypeImmediate,
213+
categoryID: HistoryTaskCategoryIDReplication,
214+
}
215+
)
216+
177217
// Transfer task types
178218
const (
179219
TransferTaskTypeDecisionTask = iota
@@ -806,10 +846,7 @@ type (
806846
NewBufferedEvents []*types.HistoryEvent
807847
ClearBufferedEvents bool
808848

809-
TransferTasks []Task
810-
CrossClusterTasks []Task
811-
ReplicationTasks []Task
812-
TimerTasks []Task
849+
TasksByCategory map[HistoryTaskCategory][]Task
813850

814851
WorkflowRequests []*WorkflowRequest
815852

@@ -830,10 +867,7 @@ type (
830867
SignalInfos []*SignalInfo
831868
SignalRequestedIDs []string
832869

833-
TransferTasks []Task
834-
CrossClusterTasks []Task
835-
ReplicationTasks []Task
836-
TimerTasks []Task
870+
TasksByCategory map[HistoryTaskCategory][]Task
837871

838872
WorkflowRequests []*WorkflowRequest
839873

@@ -1326,10 +1360,7 @@ type (
13261360
DeleteSignalInfoCount int
13271361
DeleteRequestCancelInfoCount int
13281362

1329-
TransferTasksCount int
1330-
CrossClusterTaskCount int
1331-
TimerTasksCount int
1332-
ReplicationTasksCount int
1363+
TaskCountByCategory map[HistoryTaskCategory]int
13331364
}
13341365

13351366
// UpdateWorkflowExecutionResponse is response for UpdateWorkflowExecutionRequest

common/persistence/data_store_interfaces.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,7 @@ type (
466466
NewBufferedEvents *DataBlob
467467
ClearBufferedEvents bool
468468

469-
TransferTasks []Task
470-
CrossClusterTasks []Task
471-
TimerTasks []Task
472-
ReplicationTasks []Task
469+
TasksByCategory map[HistoryTaskCategory][]Task
473470

474471
WorkflowRequests []*WorkflowRequest
475472

@@ -493,10 +490,7 @@ type (
493490
SignalInfos []*SignalInfo
494491
SignalRequestedIDs []string
495492

496-
TransferTasks []Task
497-
CrossClusterTasks []Task
498-
TimerTasks []Task
499-
ReplicationTasks []Task
493+
TasksByCategory map[HistoryTaskCategory][]Task
500494

501495
WorkflowRequests []*WorkflowRequest
502496

common/persistence/execution_manager.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -676,10 +676,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
676676
NewBufferedEvents: serializedNewBufferedEvents,
677677
ClearBufferedEvents: input.ClearBufferedEvents,
678678

679-
TransferTasks: input.TransferTasks,
680-
CrossClusterTasks: input.CrossClusterTasks,
681-
ReplicationTasks: input.ReplicationTasks,
682-
TimerTasks: input.TimerTasks,
679+
TasksByCategory: input.TasksByCategory,
683680

684681
WorkflowRequests: input.WorkflowRequests,
685682

@@ -742,10 +739,7 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
742739
SignalInfos: input.SignalInfos,
743740
SignalRequestedIDs: input.SignalRequestedIDs,
744741

745-
TransferTasks: input.TransferTasks,
746-
CrossClusterTasks: input.CrossClusterTasks,
747-
ReplicationTasks: input.ReplicationTasks,
748-
TimerTasks: input.TimerTasks,
742+
TasksByCategory: input.TasksByCategory,
749743

750744
WorkflowRequests: input.WorkflowRequests,
751745

common/persistence/execution_manager_test.go

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -426,14 +426,15 @@ func TestExecutionManager_UpdateWorkflowExecution(t *testing.T) {
426426
res, err := manager.UpdateWorkflowExecution(context.Background(), request)
427427
assert.NoError(t, err)
428428
stats := &MutableStateUpdateSessionStats{
429-
MutableStateSize: 90,
430-
ExecutionInfoSize: 40,
431-
ActivityInfoSize: 20,
432-
TimerInfoSize: 10,
433-
ChildInfoSize: 20,
434-
ActivityInfoCount: 1,
435-
TimerInfoCount: 2,
436-
ChildInfoCount: 1,
429+
MutableStateSize: 90,
430+
ExecutionInfoSize: 40,
431+
ActivityInfoSize: 20,
432+
TimerInfoSize: 10,
433+
ChildInfoSize: 20,
434+
ActivityInfoCount: 1,
435+
TimerInfoCount: 1,
436+
ChildInfoCount: 1,
437+
TaskCountByCategory: map[HistoryTaskCategory]int{},
437438
}
438439
assert.Equal(t, stats, res.MutableStateUpdateSessionStats)
439440
}
@@ -1016,14 +1017,15 @@ func TestCreateWorkflowExecution(t *testing.T) {
10161017
checkRes: func(t *testing.T, response *CreateWorkflowExecutionResponse, err error) {
10171018
assert.Equal(t, &CreateWorkflowExecutionResponse{
10181019
MutableStateUpdateSessionStats: &MutableStateUpdateSessionStats{
1019-
MutableStateSize: 91,
1020-
ExecutionInfoSize: 20,
1021-
ActivityInfoSize: 29,
1022-
TimerInfoSize: 22,
1023-
ChildInfoSize: 20,
1024-
ActivityInfoCount: 1,
1025-
TimerInfoCount: 2,
1026-
ChildInfoCount: 1,
1020+
MutableStateSize: 91,
1021+
ExecutionInfoSize: 20,
1022+
ActivityInfoSize: 29,
1023+
TimerInfoSize: 22,
1024+
ChildInfoSize: 20,
1025+
ActivityInfoCount: 1,
1026+
TimerInfoCount: 2,
1027+
ChildInfoCount: 1,
1028+
TaskCountByCategory: map[HistoryTaskCategory]int{},
10271029
},
10281030
}, response)
10291031
},
@@ -1118,14 +1120,15 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
11181120
assert.NoError(t, err)
11191121
assert.Equal(t, &ConflictResolveWorkflowExecutionResponse{
11201122
MutableStateUpdateSessionStats: &MutableStateUpdateSessionStats{
1121-
MutableStateSize: 91,
1122-
ExecutionInfoSize: 20,
1123-
ActivityInfoSize: 29,
1124-
TimerInfoSize: 22,
1125-
ChildInfoSize: 20,
1126-
ActivityInfoCount: 1,
1127-
TimerInfoCount: 2,
1128-
ChildInfoCount: 1,
1123+
MutableStateSize: 91,
1124+
ExecutionInfoSize: 20,
1125+
ActivityInfoSize: 29,
1126+
TimerInfoSize: 22,
1127+
ChildInfoSize: 20,
1128+
ActivityInfoCount: 1,
1129+
TimerInfoCount: 2,
1130+
ChildInfoCount: 1,
1131+
TaskCountByCategory: map[HistoryTaskCategory]int{},
11291132
},
11301133
}, response)
11311134
},
@@ -1184,14 +1187,15 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
11841187
assert.NoError(t, err)
11851188
assert.Equal(t, &ConflictResolveWorkflowExecutionResponse{
11861189
MutableStateUpdateSessionStats: &MutableStateUpdateSessionStats{
1187-
MutableStateSize: 161,
1188-
ExecutionInfoSize: 40,
1189-
ActivityInfoSize: 49,
1190-
TimerInfoSize: 32,
1191-
ChildInfoSize: 40,
1192-
ActivityInfoCount: 2,
1193-
TimerInfoCount: 6,
1194-
ChildInfoCount: 2,
1190+
MutableStateSize: 161,
1191+
ExecutionInfoSize: 40,
1192+
ActivityInfoSize: 49,
1193+
TimerInfoSize: 32,
1194+
ChildInfoSize: 40,
1195+
ActivityInfoCount: 2,
1196+
TimerInfoCount: 3,
1197+
ChildInfoCount: 2,
1198+
TaskCountByCategory: map[HistoryTaskCategory]int{},
11951199
},
11961200
}, response)
11971201
},

common/persistence/nosql/nosql_execution_store.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,11 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
9090
return nil, err
9191
}
9292

93-
transferTasks, crossClusterTasks, replicationTasks, timerTasks, err := d.prepareNoSQLTasksForWorkflowTxn(
93+
tasksByCategory := map[persistence.HistoryTaskCategory][]*nosqlplugin.HistoryMigrationTask{}
94+
err = d.prepareNoSQLTasksForWorkflowTxn(
9495
domainID, workflowID, runID,
95-
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
96-
nil, nil, nil, nil,
96+
newWorkflow.TasksByCategory,
97+
tasksByCategory,
9798
)
9899
if err != nil {
99100
return nil, err
@@ -115,7 +116,7 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
115116
ctx,
116117
workflowRequestsWriteRequest,
117118
currentWorkflowWriteReq, workflowExecutionWriteReq,
118-
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
119+
tasksByCategory,
119120
shardCondition,
120121
)
121122
if err != nil {
@@ -287,21 +288,18 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
287288
}
288289

289290
var mutateExecution, insertExecution *nosqlplugin.WorkflowExecutionRequest
290-
var nosqlTransferTasks []*nosqlplugin.TransferTask
291-
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
292-
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
293-
var nosqlTimerTasks []*nosqlplugin.TimerTask
294291
var workflowRequests []*nosqlplugin.WorkflowRequestRow
292+
tasksByCategory := map[persistence.HistoryTaskCategory][]*nosqlplugin.HistoryMigrationTask{}
295293

296294
// 1. current
297295
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&updateWorkflow)
298296
if err != nil {
299297
return err
300298
}
301-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
299+
err = d.prepareNoSQLTasksForWorkflowTxn(
302300
domainID, workflowID, updateWorkflow.ExecutionInfo.RunID,
303-
updateWorkflow.TransferTasks, updateWorkflow.CrossClusterTasks, updateWorkflow.ReplicationTasks, updateWorkflow.TimerTasks,
304-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
301+
updateWorkflow.TasksByCategory,
302+
tasksByCategory,
305303
)
306304
if err != nil {
307305
return err
@@ -315,10 +313,10 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
315313
return err
316314
}
317315

318-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
316+
err = d.prepareNoSQLTasksForWorkflowTxn(
319317
domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
320-
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
321-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
318+
newWorkflow.TasksByCategory,
319+
tasksByCategory,
322320
)
323321
if err != nil {
324322
return err
@@ -339,7 +337,7 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
339337
err = d.db.UpdateWorkflowExecutionWithTasks(
340338
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
341339
mutateExecution, insertExecution, nil, // no workflow to reset here
342-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
340+
tasksByCategory,
343341
shardCondition)
344342

345343
return d.processUpdateWorkflowResult(err, request.RangeID)
@@ -422,22 +420,19 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
422420
}
423421

424422
var mutateExecution, insertExecution, resetExecution *nosqlplugin.WorkflowExecutionRequest
425-
var nosqlTransferTasks []*nosqlplugin.TransferTask
426-
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
427-
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
428-
var nosqlTimerTasks []*nosqlplugin.TimerTask
429423
var workflowRequests []*nosqlplugin.WorkflowRequestRow
424+
tasksByCategory := map[persistence.HistoryTaskCategory][]*nosqlplugin.HistoryMigrationTask{}
430425

431426
// 1. current
432427
if currentWorkflow != nil {
433428
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(currentWorkflow)
434429
if err != nil {
435430
return err
436431
}
437-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
432+
err = d.prepareNoSQLTasksForWorkflowTxn(
438433
domainID, workflowID, currentWorkflow.ExecutionInfo.RunID,
439-
currentWorkflow.TransferTasks, currentWorkflow.CrossClusterTasks, currentWorkflow.ReplicationTasks, currentWorkflow.TimerTasks,
440-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
434+
currentWorkflow.TasksByCategory,
435+
tasksByCategory,
441436
)
442437
if err != nil {
443438
return err
@@ -450,10 +445,10 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
450445
if err != nil {
451446
return err
452447
}
453-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
448+
err = d.prepareNoSQLTasksForWorkflowTxn(
454449
domainID, workflowID, resetWorkflow.ExecutionInfo.RunID,
455-
resetWorkflow.TransferTasks, resetWorkflow.CrossClusterTasks, resetWorkflow.ReplicationTasks, resetWorkflow.TimerTasks,
456-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
450+
resetWorkflow.TasksByCategory,
451+
tasksByCategory,
457452
)
458453
if err != nil {
459454
return err
@@ -467,10 +462,10 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
467462
return err
468463
}
469464

470-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
465+
err = d.prepareNoSQLTasksForWorkflowTxn(
471466
domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
472-
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
473-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
467+
newWorkflow.TasksByCategory,
468+
tasksByCategory,
474469
)
475470
if err != nil {
476471
return err
@@ -491,7 +486,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
491486
err = d.db.UpdateWorkflowExecutionWithTasks(
492487
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
493488
mutateExecution, insertExecution, resetExecution,
494-
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
489+
tasksByCategory,
495490
shardCondition)
496491
return d.processUpdateWorkflowResult(err, request.RangeID)
497492
}
@@ -759,7 +754,10 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ(
759754
ctx context.Context,
760755
request *persistence.InternalPutReplicationTaskToDLQRequest,
761756
) error {
762-
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo)
757+
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, &nosqlplugin.HistoryMigrationTask{
758+
Replication: request.TaskInfo,
759+
Task: nil, // TODO: encode task infor into datablob
760+
})
763761
if err != nil {
764762
return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err)
765763
}
@@ -829,15 +827,20 @@ func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(
829827
request *persistence.CreateFailoverMarkersRequest,
830828
) error {
831829

832-
var nosqlTasks []*nosqlplugin.ReplicationTask
830+
var nosqlTasks []*nosqlplugin.HistoryMigrationTask
833831
for _, task := range request.Markers {
834832
ts := []persistence.Task{task}
835833

836834
tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts)
837835
if err != nil {
838836
return err
839837
}
840-
nosqlTasks = append(nosqlTasks, tasks...)
838+
for _, task := range tasks {
839+
nosqlTasks = append(nosqlTasks, &nosqlplugin.HistoryMigrationTask{
840+
Replication: task,
841+
Task: nil, // TODO: encode replication task into datablob
842+
})
843+
}
841844
}
842845

843846
err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{

0 commit comments

Comments
 (0)