Skip to content

Commit f7b242f

Browse files
authored
KAFKA-10199: Revoke tasks from state updater with new remove (apache#15871)
Uses the new remove operation of the state updater that returns a future to remove revoked tasks from the state updater. Reviewer: Lucas Brutschy <[email protected]>
1 parent 3b43edd commit f7b242f

File tree

2 files changed

+87
-15
lines changed

2 files changed

+87
-15
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

+31-4
Original file line number | Diff line number | Diff line change
@@ -623,6 +623,21 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.
623623
});
624624
}
625625

626+
private void iterateAndActOnRemovedTask(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
627+
final Map<TaskId, RuntimeException> failedTasks,
628+
final java.util.function.Consumer<Task> action) {
629+
iterateAndActOnFuture(futures, removedTaskResult -> {
630+
final Task task = removedTaskResult.task();
631+
final Optional<RuntimeException> exception = removedTaskResult.exception();
632+
if (exception.isPresent()) {
633+
failedTasks.put(task.id(), exception.get());
634+
tasks.addTask(task);
635+
} else {
636+
action.accept(task);
637+
}
638+
});
639+
}
640+
626641
private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
627642
final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
628643
for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
@@ -632,7 +647,8 @@ private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpda
632647
final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
633648
action.accept(removedTaskResult);
634649
} catch (final ExecutionException executionException) {
635-
log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
650+
log.warn("An exception happened when removing task {} from the state updater. The task was added to the " +
651+
"failed task in the state updater: ",
636652
taskId, executionException);
637653
} catch (final InterruptedException shouldNotHappen) {
638654
Thread.currentThread().interrupt();
@@ -1085,7 +1101,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
10851101
}
10861102
}
10871103

1088-
addRevokedTasksInStateUpdaterToPendingTasksToSuspend(remainingRevokedPartitions);
1104+
revokeTasksInStateUpdater(remainingRevokedPartitions);
10891105

10901106
if (!remainingRevokedPartitions.isEmpty()) {
10911107
log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
@@ -1174,16 +1190,27 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
11741190
}
11751191
}
11761192

1177-
private void addRevokedTasksInStateUpdaterToPendingTasksToSuspend(final Set<TopicPartition> remainingRevokedPartitions) {
1193+
private void revokeTasksInStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
11781194
if (stateUpdater != null) {
1195+
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
1196+
final Map<TaskId, RuntimeException> failedTasksFromStateUpdater = new HashMap<>();
11791197
for (final Task restoringTask : stateUpdater.getTasks()) {
11801198
if (restoringTask.isActive()) {
11811199
if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
1182-
tasks.addPendingActiveTaskToSuspend(restoringTask.id());
1200+
futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id()));
11831201
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
11841202
}
11851203
}
11861204
}
1205+
iterateAndActOnRemovedTask(
1206+
futures,
1207+
failedTasksFromStateUpdater,
1208+
task -> {
1209+
task.suspend();
1210+
tasks.addTask(task);
1211+
}
1212+
);
1213+
maybeThrowTaskExceptions(failedTasksFromStateUpdater);
11871214
}
11881215
}
11891216

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

+56-11
Original file line number | Diff line number | Diff line change
@@ -1351,21 +1351,26 @@ public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAnd
13511351
}
13521352

13531353
@Test
1354-
public void shouldAddActiveTaskWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
1354+
public void shouldSuspendActiveTaskWithRevokedInputPartitionsInStateUpdater() {
13551355
final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
13561356
.inState(State.RESTORING)
13571357
.withInputPartitions(taskId00Partitions).build();
13581358
final TasksRegistry tasks = mock(TasksRegistry.class);
13591359
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task), tasks);
13601360
when(stateUpdater.getTasks()).thenReturn(mkSet(task));
1361+
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
1362+
when(stateUpdater.removeWithFuture(task.id())).thenReturn(future);
1363+
future.complete(new StateUpdater.RemovedTaskResult(task));
13611364

13621365
taskManager.handleRevocation(task.inputPartitions());
13631366

1364-
verify(tasks).addPendingActiveTaskToSuspend(task.id());
1365-
verify(stateUpdater, never()).remove(task.id());
1367+
verify(task).suspend();
1368+
verify(tasks).addTask(task);
1369+
verify(stateUpdater).removeWithFuture(task.id());
13661370
}
13671371

1368-
public void shouldAddMultipleActiveTasksWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
1372+
@Test
1373+
public void shouldSuspendMultipleActiveTasksWithRevokedInputPartitionsInStateUpdater() {
13691374
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
13701375
.inState(State.RESTORING)
13711376
.withInputPartitions(taskId00Partitions).build();
@@ -1374,15 +1379,23 @@ public void shouldAddMultipleActiveTasksWithRevokedInputPartitionsInStateUpdater
13741379
.withInputPartitions(taskId01Partitions).build();
13751380
final TasksRegistry tasks = mock(TasksRegistry.class);
13761381
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);
1382+
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
1383+
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
1384+
future1.complete(new StateUpdater.RemovedTaskResult(task1));
1385+
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new CompletableFuture<>();
1386+
when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future2);
1387+
future2.complete(new StateUpdater.RemovedTaskResult(task2));
13771388

13781389
taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
13791390

1380-
verify(tasks).addPendingActiveTaskToSuspend(task1.id());
1381-
verify(tasks).addPendingActiveTaskToSuspend(task2.id());
1391+
verify(task1).suspend();
1392+
verify(tasks).addTask(task1);
1393+
verify(task2).suspend();
1394+
verify(tasks).addTask(task2);
13821395
}
13831396

13841397
@Test
1385-
public void shouldNotAddActiveTaskWithoutRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
1398+
public void shouldNotSuspendActiveTaskWithoutRevokedInputPartitionsInStateUpdater() {
13861399
final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
13871400
.inState(State.RESTORING)
13881401
.withInputPartitions(taskId00Partitions).build();
@@ -1391,8 +1404,9 @@ public void shouldNotAddActiveTaskWithoutRevokedInputPartitionsInStateUpdaterToP
13911404

13921405
taskManager.handleRevocation(taskId01Partitions);
13931406

1394-
verify(stateUpdater, never()).remove(task.id());
1395-
verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
1407+
verify(task, never()).suspend();
1408+
verify(tasks, never()).addTask(task);
1409+
verify(stateUpdater, never()).removeWithFuture(task.id());
13961410
}
13971411

13981412
@Test
@@ -1405,8 +1419,39 @@ public void shouldNotRevokeStandbyTaskInStateUpdaterOnRevocation() {
14051419

14061420
taskManager.handleRevocation(taskId00Partitions);
14071421

1408-
verify(stateUpdater, never()).remove(task.id());
1409-
verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
1422+
verify(task, never()).suspend();
1423+
verify(tasks, never()).addTask(task);
1424+
verify(stateUpdater, never()).removeWithFuture(task.id());
1425+
}
1426+
1427+
@Test
1428+
public void shouldThrowIfRevokingTasksInStateUpdaterFindsFailedTasks() {
1429+
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
1430+
.inState(State.RESTORING)
1431+
.withInputPartitions(taskId00Partitions).build();
1432+
final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
1433+
.inState(State.RESTORING)
1434+
.withInputPartitions(taskId01Partitions).build();
1435+
final TasksRegistry tasks = mock(TasksRegistry.class);
1436+
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);
1437+
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
1438+
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
1439+
future1.complete(new StateUpdater.RemovedTaskResult(task1));
1440+
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new CompletableFuture<>();
1441+
when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future2);
1442+
final StreamsException streamsException = new StreamsException("Something happened");
1443+
future2.complete(new StateUpdater.RemovedTaskResult(task2, streamsException));
1444+
1445+
final StreamsException thrownException = assertThrows(
1446+
StreamsException.class,
1447+
() -> taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions))
1448+
);
1449+
1450+
assertEquals(thrownException, streamsException);
1451+
verify(task1).suspend();
1452+
verify(tasks).addTask(task1);
1453+
verify(task2, never()).suspend();
1454+
verify(tasks).addTask(task2);
14101455
}
14111456

14121457
@Test

0 commit comments

Comments
 (0)