MINOR: Replace generic Task type with StreamTask and StandbyTask#21502
Conversation
mjsax
left a comment
There was a problem hiding this comment.
Great cleanup! It makes me very happy to see all these unnecessary casts and instanceof stuff and others boiler plat go away!
| // TODO: convert to StreamTask when we remove TaskManager#StateMachineTask with mocks | ||
| public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, | ||
| public Collection<StreamTask> createTasks(final Consumer<byte[], byte[]> consumer, | ||
| final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) { |
| private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeTasksToClose, | ||
| private Collection<StreamTask> tryCloseCleanActiveTasks(final Collection<StreamTask> activeTasksToClose, | ||
| final boolean clean, | ||
| final AtomicReference<RuntimeException> firstException) { |
| private Collection<Task> tryCloseCleanStandbyTasks(final Collection<Task> standbyTasksToClose, | ||
| private Collection<StandbyTask> tryCloseCleanStandbyTasks(final Collection<StandbyTask> standbyTasksToClose, | ||
| final boolean clean, | ||
| final AtomicReference<RuntimeException> firstException) { |
| } | ||
|
|
||
| if (standbyTasksPerId.containsKey(taskId)) { | ||
| throw new IllegalStateException("Attempted to create an active task while we already own its standby: " + taskId); |
There was a problem hiding this comment.
Both original error messages say Attempted to create an active task -- so wondering why the original method is called addTask? Or was the original message not fully correct to begin with? In the end, we cannot have active and standby (with task task-id) at the same time, but it goes both ways. So it would also be a valid error message to say Attempted to create an standby task...?
Also, by extracting checkTaskDoesNotExist(), we actually get a single more general error message -- given that checkTaskDoesNotExist() is very short, might be better to keep the code inlined, and use two different error messages.
There was a problem hiding this comment.
yeah, there was addActiveTask with these errors, then it became a more generic addTask method, but the errors stayed the same
sounds good. will do
mjsax
left a comment
There was a problem hiding this comment.
Thanks. LGTM. Can merge if CI passes.
| private Stream<Task> activeTaskStream() { | ||
| return Stream.concat( | ||
| activeRunningTaskStream(), | ||
| stateUpdater.tasks().stream().filter(Task::isActive) |
There was a problem hiding this comment.
not 100% if stateUpdater.tasks() needs to return ReadOnlyTask. or maybe we should have a separate method to get all active tasks to call it here, as we do for standby tasks
This PR improves type safety by replacing the generic Task type with
StreamTask and StandbyTask. As a side effect, we can avoid unnecessary
casts, usage of instanceof, or checking the task type via .isActiveTask.
Reviewers: Matthias J. Sax matthias@confluent.io