Skip to content

Commit bd0c628

Browse files
authored
KAFKA-19994: TaskManager may not close all tasks on task timeouts (apache#21155)
When a TimeoutException occurs while trying to put multiple active tasks back into running, we will add the timed out task back to the state updater, so that we retry it. However, if we run into a Task timeout (failing to make progress for a long time), we will rethrow a StreamsException wrapping the TimeoutException we have drained multiple tasks from the state updater, they will be lost, and not added back to the state updater, and therefore not be closed correctly. The task directories remain locked, causing issues trying to replace the stream thread. Reviewers: Matthias J. Sax <matthias@confluent.io>
1 parent 95d164a commit bd0c628

2 files changed

Lines changed: 67 additions & 3 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.HashSet;
5858
import java.util.Iterator;
5959
import java.util.LinkedHashMap;
60+
import java.util.LinkedHashSet;
6061
import java.util.LinkedList;
6162
import java.util.List;
6263
import java.util.Map;
@@ -974,9 +975,12 @@ private void closeTaskClean(final Task task,
974975
}
975976
}
976977

978+
/**
979+
* @throws StreamsException if fetching committed offsets timed out often enough to exceed task timeout
980+
*/
977981
private void transitRestoredTaskToRunning(final Task task,
978982
final long now,
979-
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
983+
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) throws StreamsException {
980984
try {
981985
task.completeRestoration(offsetResetter);
982986
tasks.addTask(task);
@@ -1062,8 +1066,22 @@ public Map<TaskId, RuntimeException> collectExceptionsAndFailedTasksFromStateUpd
10621066
private void handleRestoredTasksFromStateUpdater(final long now,
10631067
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
10641068
final Duration timeout = Duration.ZERO;
1065-
for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
1066-
transitRestoredTaskToRunning(task, now, offsetResetter);
1069+
// Create a mutable copy to support iterator.remove()
1070+
final Set<StreamTask> restoredTasks = new LinkedHashSet<>(stateUpdater.drainRestoredActiveTasks(timeout));
1071+
final Iterator<StreamTask> iterator = restoredTasks.iterator();
1072+
1073+
try {
1074+
while (iterator.hasNext()) {
1075+
final Task task = iterator.next();
1076+
transitRestoredTaskToRunning(task, now, offsetResetter);
1077+
iterator.remove(); // Remove successfully transitioned tasks
1078+
}
1079+
} finally {
1080+
// Add back any tasks that we drained but didn't successfully transition
1081+
// from the state updater, so that they are closed during shutdown.
1082+
for (final Task task : restoredTasks) {
1083+
stateUpdater.add(task);
1084+
}
10671085
}
10681086
}
10691087

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1590,6 +1590,52 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
15901590
verifyNoInteractions(consumer);
15911591
}
15921592

1593+
@Test
1594+
public void shouldAddFailedRestoredTasksBackToStateUpdaterOnException() {
1595+
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
1596+
.inState(State.RESTORING)
1597+
.withInputPartitions(taskId00Partitions).build();
1598+
final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
1599+
.inState(State.RESTORING)
1600+
.withInputPartitions(taskId01Partitions).build();
1601+
final StreamTask task3 = statefulTask(taskId02, taskId02ChangelogPartitions)
1602+
.inState(State.RESTORING)
1603+
.withInputPartitions(taskId02Partitions).build();
1604+
1605+
// Use LinkedHashSet to ensure predictable iteration order
1606+
final Set<StreamTask> restoredTasks = new java.util.LinkedHashSet<>();
1607+
restoredTasks.add(task1);
1608+
restoredTasks.add(task2);
1609+
restoredTasks.add(task3);
1610+
1611+
final TasksRegistry tasks = mock(TasksRegistry.class);
1612+
final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(restoredTasks, tasks);
1613+
1614+
// task1 completes successfully, task2 throws StreamsException from maybeInitTaskTimeoutOrThrow
1615+
// task3 is never processed because task2 throws
1616+
final TimeoutException timeoutException = new TimeoutException();
1617+
doThrow(timeoutException).when(task2).completeRestoration(noOpResetter);
1618+
doThrow(new StreamsException("Task timeout exceeded", task2.id())).when(task2).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
1619+
1620+
assertThrows(StreamsException.class, () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
1621+
1622+
// task1 should be successfully transitioned
1623+
verify(tasks).addTask(task1);
1624+
verify(consumer).resume(task1.inputPartitions());
1625+
verify(task1).clearTaskTimeout();
1626+
1627+
// task2 should be added back to state updater once in the finally block
1628+
// (the add in the catch block doesn't execute because maybeInitTaskTimeoutOrThrow throws)
1629+
verify(stateUpdater).add(task2);
1630+
verify(tasks, never()).addTask(task2);
1631+
verify(task2, never()).clearTaskTimeout();
1632+
1633+
// task3 should also be added back to state updater in the finally block
1634+
verify(stateUpdater).add(task3);
1635+
verify(tasks, never()).addTask(task3);
1636+
verify(task3, never()).clearTaskTimeout();
1637+
}
1638+
15931639
private TaskManager setUpTransitionToRunningOfRestoredTask(final Set<StreamTask> statefulTasks,
15941640
final TasksRegistry tasks) {
15951641
when(stateUpdater.restoresActiveTasks()).thenReturn(true);

0 commit comments

Comments
 (0)