Skip to content

Commit f568932

Browse files
MINOR: rename TaskRegistry methods to better reflect their purpose. (apache#21448)
Changed the name of method that work only with initialized tasks(not pending) to better reflect their purpose. Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
1 parent 4d0c588 commit f568932

9 files changed

Lines changed: 228 additions & 227 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ int process(final int maxNumRecords, final Time time) {
6868
int totalProcessed = 0;
6969
Task lastProcessed = null;
7070

71-
for (final Task task : tasks.activeTasks()) {
71+
for (final Task task : tasks.activeInitializedTasks()) {
7272
final long now = time.milliseconds();
7373
try {
7474
if (executionMetadata.canProcessTask(task, now)) {
@@ -233,7 +233,7 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
233233

234234
private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
235235
if (!allOffsets.isEmpty()) {
236-
for (final Task task : tasks.activeTasks()) {
236+
for (final Task task : tasks.activeInitializedTasks()) {
237237
if (task instanceof StreamTask) {
238238
for (final TopicPartition topicPartition : task.inputPartitions()) {
239239
if (allOffsets.containsKey(topicPartition)) {
@@ -261,7 +261,7 @@ private void commitSuccessfullyProcessedTasks() {
261261
int punctuate() {
262262
int punctuated = 0;
263263

264-
for (final Task task : tasks.activeTasks()) {
264+
for (final Task task : tasks.activeInitializedTasks()) {
265265
try {
266266
if (executionMetadata.canPunctuateTask(task)) {
267267
if (task.maybePunctuateStreamTime()) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ void handleRebalanceComplete() {
197197
// we should pause consumer only within the listener since
198198
// before then the assignment has not been updated yet.
199199
// All tasks that are owned by the task manager are ready and do not need to be paused
200-
final Set<TopicPartition> partitionsNotToPause = tasks.allNonFailedTasks()
200+
final Set<TopicPartition> partitionsNotToPause = tasks.allNonFailedInitializedTasks()
201201
.stream()
202202
.flatMap(task -> task.inputPartitions().stream())
203203
.collect(Collectors.toSet());
@@ -214,7 +214,7 @@ void handleRebalanceComplete() {
214214
* @throws TaskMigratedException
215215
*/
216216
boolean handleCorruption(final Set<TaskId> corruptedTasks) {
217-
final Set<TaskId> activeTasks = new HashSet<>(tasks.activeTaskIds());
217+
final Set<TaskId> activeTasks = new HashSet<>(tasks.activeInitializedTaskIds());
218218

219219
// We need to stop all processing, since we need to commit non-corrupted tasks as well.
220220
maybeLockTasks(activeTasks);
@@ -223,7 +223,7 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
223223
final Set<Task> corruptedStandbyTasks = new TreeSet<>(Comparator.comparing(Task::id));
224224

225225
for (final TaskId taskId : corruptedTasks) {
226-
final Task task = tasks.task(taskId);
226+
final Task task = tasks.initializedTask(taskId);
227227
if (task.isActive()) {
228228
corruptedActiveTasks.add(task);
229229
} else {
@@ -237,7 +237,7 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
237237

238238
// We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
239239
try {
240-
final Collection<Task> tasksToCommit = tasks.allTasksPerId()
240+
final Collection<Task> tasksToCommit = tasks.allInitializedTasksPerId()
241241
.values()
242242
.stream()
243243
.filter(t -> t.state() == Task.State.RUNNING)
@@ -247,10 +247,10 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) {
247247
} catch (final TaskCorruptedException e) {
248248
log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
249249
"tasks to clean and revive: {}", e.corruptedTasks());
250-
corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
250+
corruptedActiveTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
251251
} catch (final TimeoutException e) {
252252
log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
253-
final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeTasks());
253+
final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeInitializedTasks());
254254
uncorruptedTasks.removeAll(corruptedActiveTasks);
255255
// Those tasks which just timed out can just be closed dirty without marking changelogs as corrupted
256256
closeDirtyAndRevive(uncorruptedTasks, false);
@@ -366,7 +366,7 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
366366
final Set<Task> tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id));
367367

368368
final Set<TaskId> tasksToLock =
369-
tasks.allTaskIds().stream()
369+
tasks.allInitializedTaskIds().stream()
370370
.filter(x -> activeTasksToCreate.containsKey(x) || standbyTasksToCreate.containsKey(x))
371371
.collect(Collectors.toSet());
372372

@@ -533,7 +533,7 @@ private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition
533533
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
534534
final Map<Task, Set<TopicPartition>> tasksToRecycle,
535535
final Set<Task> tasksToCloseClean) {
536-
for (final Task task : tasks.allNonFailedTasks()) {
536+
for (final Task task : tasks.allNonFailedInitializedTasks()) {
537537
if (!task.isActive()) {
538538
throw new IllegalStateException("Standby tasks should only be managed by the state updater, " +
539539
"but standby task " + task.id() + " is managed by the stream thread");
@@ -733,7 +733,7 @@ private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<TaskId,
733733
while (iter.hasNext()) {
734734
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
735735
final TaskId taskId = entry.getKey();
736-
final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
736+
final boolean taskIsOwned = tasks.allInitializedTaskIds().contains(taskId)
737737
|| (stateUpdater.tasks().stream().anyMatch(task -> task.id().equals(taskId)));
738738
if (taskId.topologyName() != null && !taskIsOwned && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
739739
log.info("Cannot create the assigned task {} since it's topology name cannot be recognized, will put it " +
@@ -1079,7 +1079,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
10791079
e.corruptedTasks());
10801080

10811081
// If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
1082-
dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
1082+
dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
10831083
closeDirtyAndRevive(dirtyTasks, true);
10841084
} catch (final TimeoutException e) {
10851085
log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
@@ -1193,8 +1193,8 @@ void handleLostAll() {
11931193
}
11941194

11951195
private void closeRunningTasksDirty() {
1196-
final Set<Task> allTask = tasks.allTasks();
1197-
final Set<TaskId> allTaskIds = tasks.allTaskIds();
1196+
final Set<Task> allTask = tasks.allInitializedTasks();
1197+
final Set<TaskId> allTaskIds = tasks.allInitializedTaskIds();
11981198
maybeLockTasks(allTaskIds);
11991199
for (final Task task : allTask) {
12001200
// Even though we've apparently dropped out of the group, we can continue safely to maintain our
@@ -1413,10 +1413,10 @@ void shutdown(final boolean clean) {
14131413

14141414
// TODO: change type to `StreamTask`
14151415
final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));
1416-
activeTasks.addAll(tasks.activeTasks());
1416+
activeTasks.addAll(tasks.activeInitializedTasks());
14171417
// TODO: change type to `StandbyTask`
14181418
final Set<Task> standbyTasks = new TreeSet<>(Comparator.comparing(Task::id));
1419-
standbyTasks.addAll(tasks.standbyTasks());
1419+
standbyTasks.addAll(tasks.standbyInitializedTasks());
14201420

14211421
final Set<Task> pendingActiveTasks = tasks.drainPendingActiveTasksToInit();
14221422
activeTasks.addAll(pendingActiveTasks);
@@ -1670,7 +1670,7 @@ Map<TaskId, Task> allTasks() {
16701670
// not bothering with an unmodifiable map, since the tasks themselves are mutable, but
16711671
// if any outside code modifies the map or the tasks, it would be a severe transgression.
16721672
final Map<TaskId, Task> ret = stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
1673-
ret.putAll(tasks.allTasksPerId());
1673+
ret.putAll(tasks.allInitializedTasksPerId());
16741674
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id, x -> x)));
16751675
return ret;
16761676
}
@@ -1683,19 +1683,19 @@ Map<TaskId, Task> allTasks() {
16831683
Map<TaskId, Task> allRunningTasks() {
16841684
// not bothering with an unmodifiable map, since the tasks themselves are mutable, but
16851685
// if any outside code modifies the map or the tasks, it would be a severe transgression.
1686-
return tasks.allTasksPerId();
1686+
return tasks.allInitializedTasksPerId();
16871687
}
16881688

16891689
Set<Task> readOnlyAllTasks() {
16901690
// not bothering with an unmodifiable map, since the tasks themselves are mutable, but
16911691
// if any outside code modifies the map or the tasks, it would be a severe transgression.
16921692
final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
1693-
ret.addAll(tasks.allTasks());
1693+
ret.addAll(tasks.allInitializedTasks());
16941694
return Collections.unmodifiableSet(ret);
16951695
}
16961696

16971697
Map<TaskId, Task> notPausedTasks() {
1698-
return Collections.unmodifiableMap(tasks.allTasks()
1698+
return Collections.unmodifiableMap(tasks.allInitializedTasks()
16991699
.stream()
17001700
.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
17011701
.collect(Collectors.toMap(Task::id, v -> v)));
@@ -1721,7 +1721,7 @@ private Stream<Task> activeTaskStream() {
17211721
}
17221722

17231723
private Stream<Task> activeRunningTaskStream() {
1724-
return tasks.allTasks().stream().filter(Task::isActive);
1724+
return tasks.allInitializedTasks().stream().filter(Task::isActive);
17251725
}
17261726

17271727
Map<TaskId, Task> standbyTaskMap() {
@@ -1733,23 +1733,23 @@ private List<Task> standbyTaskIterable() {
17331733
}
17341734

17351735
private Stream<Task> standbyTaskStream() {
1736-
final Stream<Task> standbyTasksInTaskRegistry = tasks.allTasks().stream().filter(t -> !t.isActive());
1736+
final Stream<Task> standbyTasksInTaskRegistry = tasks.allInitializedTasks().stream().filter(t -> !t.isActive());
17371737
return Stream.concat(
17381738
stateUpdater.standbyTasks().stream(),
17391739
standbyTasksInTaskRegistry
17401740
);
17411741
}
17421742
// For testing only.
17431743
int commitAll() {
1744-
return commit(tasks.allTasks());
1744+
return commit(tasks.allInitializedTasks());
17451745
}
17461746

17471747
/**
17481748
* Resumes polling in the main consumer for all partitions for which
17491749
* the corresponding record queues have capacity (again).
17501750
*/
17511751
public void resumePollingForPartitionsWithAvailableSpace() {
1752-
for (final Task t: tasks.activeTasks()) {
1752+
for (final Task t: tasks.activeInitializedTasks()) {
17531753
t.resumePollingForPartitionsWithAvailableSpace();
17541754
}
17551755
}
@@ -1758,7 +1758,7 @@ public void resumePollingForPartitionsWithAvailableSpace() {
17581758
* Fetches up-to-date lag information from the consumer.
17591759
*/
17601760
public void updateLags() {
1761-
for (final Task t: tasks.activeTasks()) {
1761+
for (final Task t: tasks.activeInitializedTasks()) {
17621762
t.updateLags();
17631763
}
17641764
}
@@ -1808,7 +1808,7 @@ void maybeInitTaskTimeoutsOrThrow(
18081808
}
18091809

18101810
private Task getActiveTask(final TopicPartition partition) {
1811-
final Task activeTask = tasks.activeTasksForInputPartition(partition);
1811+
final Task activeTask = tasks.activeInitializedTasksForInputPartition(partition);
18121812

18131813
if (activeTask == null) {
18141814
log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
@@ -1912,7 +1912,7 @@ private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> t
19121912
}
19131913

19141914
public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
1915-
for (final Task task : tasks.activeTasks()) {
1915+
for (final Task task : tasks.activeInitializedTasks()) {
19161916
if (task instanceof StreamTask) {
19171917
if (task.inputPartitions().contains(topicPartition)) {
19181918
((StreamTask) task).updateEndOffsets(topicPartition, offset);
@@ -1943,7 +1943,7 @@ void maybeCloseTasksFromRemovedTopologies(final Set<String> currentNamedTopologi
19431943
try {
19441944
final Set<Task> activeTasksToRemove = new TreeSet<>(Comparator.comparing(Task::id));
19451945
final Set<Task> standbyTasksToRemove = new TreeSet<>(Comparator.comparing(Task::id));
1946-
for (final Task task : tasks.allTasks()) {
1946+
for (final Task task : tasks.allInitializedTasks()) {
19471947
if (!currentNamedTopologies.contains(task.id().topologyName())) {
19481948
if (task.isActive()) {
19491949
activeTasksToRemove.add(task);
@@ -2032,7 +2032,7 @@ public String toString(final String indent) {
20322032
stringBuilder.append("TaskManager\n");
20332033
stringBuilder.append(indent).append("\tMetadataState:\n");
20342034
stringBuilder.append(indent).append("\tTasks:\n");
2035-
for (final Task task : tasks.allTasks()) {
2035+
for (final Task task : tasks.allInitializedTasks()) {
20362036
stringBuilder.append(indent)
20372037
.append("\t\t")
20382038
.append(task.id())

streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ private void removePartitionsForActiveTask(final TaskId taskId) {
280280
@Override
281281
public synchronized void clear() {
282282
pendingTasksToInit.clear();
283+
pendingTasksToClose.clear();
283284
pendingActiveTasksToCreate.clear();
284285
pendingStandbyTasksToCreate.clear();
285286
activeTasksPerId.clear();
@@ -290,7 +291,7 @@ public synchronized void clear() {
290291

291292
// TODO: change return type to `StreamTask`
292293
@Override
293-
public Task activeTasksForInputPartition(final TopicPartition partition) {
294+
public Task activeInitializedTasksForInputPartition(final TopicPartition partition) {
294295
return activeTasksPerPartition.get(partition);
295296
}
296297

@@ -305,7 +306,7 @@ private synchronized Task getTask(final TaskId taskId) {
305306
}
306307

307308
@Override
308-
public Task task(final TaskId taskId) {
309+
public Task initializedTask(final TaskId taskId) {
309310
final Task task = getTask(taskId);
310311

311312
if (task != null)
@@ -315,26 +316,26 @@ public Task task(final TaskId taskId) {
315316
}
316317

317318
@Override
318-
public Collection<Task> tasks(final Collection<TaskId> taskIds) {
319+
public Collection<Task> initializedTasks(final Collection<TaskId> taskIds) {
319320
final Set<Task> tasks = new HashSet<>();
320321
for (final TaskId taskId : taskIds) {
321-
tasks.add(task(taskId));
322+
tasks.add(initializedTask(taskId));
322323
}
323324
return tasks;
324325
}
325326

326327
@Override
327-
public synchronized Collection<TaskId> activeTaskIds() {
328+
public synchronized Collection<TaskId> activeInitializedTaskIds() {
328329
return Collections.unmodifiableCollection(activeTasksPerId.keySet());
329330
}
330331

331332
@Override
332-
public synchronized Collection<Task> activeTasks() {
333+
public synchronized Collection<Task> activeInitializedTasks() {
333334
return Collections.unmodifiableCollection(activeTasksPerId.values());
334335
}
335336

336337
@Override
337-
public synchronized Collection<Task> standbyTasks() {
338+
public synchronized Collection<Task> standbyInitializedTasks() {
338339
return Collections.unmodifiableCollection(standbyTasksPerId.values());
339340
}
340341

@@ -343,12 +344,12 @@ public synchronized Collection<Task> standbyTasks() {
343344
* and the returned task could be modified by other threads concurrently
344345
*/
345346
@Override
346-
public synchronized Set<Task> allTasks() {
347+
public synchronized Set<Task> allInitializedTasks() {
347348
return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), new HashSet<>(standbyTasksPerId.values()));
348349
}
349350

350351
@Override
351-
public synchronized Set<Task> allNonFailedTasks() {
352+
public synchronized Set<Task> allNonFailedInitializedTasks() {
352353
final Set<Task> nonFailedActiveTasks = activeTasksPerId.values().stream()
353354
.filter(task -> !failedTaskIds.contains(task.id()))
354355
.collect(Collectors.toSet());
@@ -359,20 +360,20 @@ public synchronized Set<Task> allNonFailedTasks() {
359360
}
360361

361362
@Override
362-
public synchronized Set<TaskId> allTaskIds() {
363+
public synchronized Set<TaskId> allInitializedTaskIds() {
363364
return union(HashSet::new, activeTasksPerId.keySet(), standbyTasksPerId.keySet());
364365
}
365366

366367
@Override
367-
public synchronized Map<TaskId, Task> allTasksPerId() {
368+
public synchronized Map<TaskId, Task> allInitializedTasksPerId() {
368369
final Map<TaskId, Task> ret = new HashMap<>();
369370
ret.putAll(activeTasksPerId);
370371
ret.putAll(standbyTasksPerId);
371372
return ret;
372373
}
373374

374375
@Override
375-
public boolean contains(final TaskId taskId) {
376+
public boolean containsInitialized(final TaskId taskId) {
376377
return getTask(taskId) != null;
377378
}
378379

streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,25 +69,25 @@ public interface TasksRegistry {
6969

7070
void clear();
7171

72-
Task activeTasksForInputPartition(final TopicPartition partition);
72+
Task activeInitializedTasksForInputPartition(final TopicPartition partition);
7373

74-
Task task(final TaskId taskId);
74+
Task initializedTask(final TaskId taskId);
7575

76-
Collection<Task> tasks(final Collection<TaskId> taskIds);
76+
Collection<Task> initializedTasks(final Collection<TaskId> taskIds);
7777

78-
Collection<TaskId> activeTaskIds();
78+
Collection<TaskId> activeInitializedTaskIds();
7979

80-
Collection<Task> activeTasks();
80+
Collection<Task> activeInitializedTasks();
8181

82-
Collection<Task> standbyTasks();
82+
Collection<Task> standbyInitializedTasks();
8383

84-
Set<Task> allTasks();
84+
Set<Task> allInitializedTasks();
8585

86-
Set<Task> allNonFailedTasks();
86+
Set<Task> allNonFailedInitializedTasks();
8787

88-
Map<TaskId, Task> allTasksPerId();
88+
Map<TaskId, Task> allInitializedTasksPerId();
8989

90-
Set<TaskId> allTaskIds();
90+
Set<TaskId> allInitializedTaskIds();
9191

92-
boolean contains(final TaskId taskId);
92+
boolean containsInitialized(final TaskId taskId);
9393
}

0 commit comments

Comments
 (0)