Skip to content

Commit 7e97270

Browse files
authored
Refactor task executor to use specific task types (#6760)
<!-- Describe what has changed in this PR --> **What changed?** - Delete task.Info interface and use persistence.Task instead - Refactor task executors to use concrete persistence.Task types for task execution - Unify some helper methods in task package <!-- Tell your future self why have you made these changes --> **Why?** - To get rid of persistence.TransferTaskInfo and persistence.TimerTaskInfo - To simplify code and reduce type conversions <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit tests, integration tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 0d54d4e commit 7e97270

12 files changed

+316
-670
lines changed

service/history/task/interface.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
package task
2424

2525
import (
26-
"time"
27-
2826
"github.com/uber/cadence/common"
2927
"github.com/uber/cadence/common/future"
3028
"github.com/uber/cadence/common/persistence"
@@ -34,17 +32,6 @@ import (
3432
)
3533

3634
type (
37-
// Info contains the metadata for a task
38-
Info interface {
39-
GetVersion() int64
40-
GetTaskID() int64
41-
GetTaskType() int
42-
GetVisibilityTimestamp() time.Time
43-
GetWorkflowID() string
44-
GetRunID() string
45-
GetDomainID() string
46-
}
47-
4835
// Task is the interface for all tasks generated by history service
4936
Task interface {
5037
task.PriorityTask

service/history/task/interface_mock.go

Lines changed: 0 additions & 122 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/task/standby_task_util.go

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ import (
3535

3636
type (
3737
standbyActionFn func(context.Context, execution.Context, execution.MutableState) (interface{}, error)
38-
standbyPostActionFn func(context.Context, Info, interface{}, log.Logger) error
38+
standbyPostActionFn func(context.Context, persistence.Task, interface{}, log.Logger) error
3939

4040
standbyCurrentTimeFn func() time.Time
4141
)
4242

4343
func standbyTaskPostActionNoOp(
4444
ctx context.Context,
45-
taskInfo Info,
45+
taskInfo persistence.Task,
4646
postActionInfo interface{},
4747
_ log.Logger,
4848
) error {
@@ -55,9 +55,9 @@ func standbyTaskPostActionNoOp(
5555
return &redispatchError{Reason: fmt.Sprintf("post action is %T", postActionInfo)}
5656
}
5757

58-
func standbyTransferTaskPostActionTaskDiscarded(
58+
func standbyTaskPostActionTaskDiscarded(
5959
ctx context.Context,
60-
taskInfo Info,
60+
task persistence.Task,
6161
postActionInfo interface{},
6262
logger log.Logger,
6363
) error {
@@ -66,41 +66,14 @@ func standbyTransferTaskPostActionTaskDiscarded(
6666
return nil
6767
}
6868

69-
transferTask := taskInfo.(*persistence.TransferTaskInfo)
70-
logger.Error("Discarding standby transfer task due to task being pending for too long.",
71-
tag.WorkflowID(transferTask.WorkflowID),
72-
tag.WorkflowRunID(transferTask.RunID),
73-
tag.WorkflowDomainID(transferTask.DomainID),
74-
tag.TaskID(transferTask.TaskID),
75-
tag.TaskType(transferTask.TaskType),
76-
tag.FailoverVersion(transferTask.GetVersion()),
77-
tag.Timestamp(transferTask.VisibilityTimestamp),
78-
tag.WorkflowEventID(transferTask.ScheduleID))
79-
return ErrTaskDiscarded
80-
}
81-
82-
func standbyTimerTaskPostActionTaskDiscarded(
83-
ctx context.Context,
84-
taskInfo Info,
85-
postActionInfo interface{},
86-
logger log.Logger,
87-
) error {
88-
89-
if postActionInfo == nil {
90-
return nil
91-
}
92-
93-
timerTask := taskInfo.(*persistence.TimerTaskInfo)
94-
logger.Error("Discarding standby timer task due to task being pending for too long.",
95-
tag.WorkflowID(timerTask.WorkflowID),
96-
tag.WorkflowRunID(timerTask.RunID),
97-
tag.WorkflowDomainID(timerTask.DomainID),
98-
tag.TaskID(timerTask.TaskID),
99-
tag.TaskType(timerTask.TaskType),
100-
tag.WorkflowTimeoutType(int64(timerTask.TimeoutType)),
101-
tag.FailoverVersion(timerTask.GetVersion()),
102-
tag.Timestamp(timerTask.VisibilityTimestamp),
103-
tag.WorkflowEventID(timerTask.EventID))
69+
logger.Error("Discarding standby task due to task being pending for too long.",
70+
tag.WorkflowID(task.GetWorkflowID()),
71+
tag.WorkflowRunID(task.GetRunID()),
72+
tag.WorkflowDomainID(task.GetDomainID()),
73+
tag.TaskID(task.GetTaskID()),
74+
tag.TaskType(task.GetTaskType()),
75+
tag.FailoverVersion(task.GetVersion()),
76+
tag.Timestamp(task.GetVisibilityTimestamp()))
10477
return ErrTaskDiscarded
10578
}
10679

@@ -170,7 +143,7 @@ func getHistoryResendInfo(
170143
}
171144

172145
func getStandbyPostActionFn(
173-
taskInfo Info,
146+
taskInfo persistence.Task,
174147
standbyNow standbyCurrentTimeFn,
175148
standbyTaskMissingEventsResendDelay time.Duration,
176149
standbyTaskMissingEventsDiscardDelay time.Duration,

0 commit comments

Comments
 (0)