Skip to content

Commit 06f7cb3

Browse files
authored
MINOR: Unwrap exceptions in CoordinatorTimerImpl and CoordinatorExecutorImpl (#21458)
The exceptionally handlers in CoordinatorTimerImpl and CoordinatorExecutorImpl use instanceof checks to determine how to handle failures. However, exceptions propagated through CompletableFuture chains may be wrapped in CompletionException, causing the instanceof checks to fail and the exceptions to be misclassified. This could lead to incorrect behavior such as retrying when the coordinator is no longer active or logging at the wrong level.

This patch unwraps exceptions using Errors.maybeUnwrapException at the top of each exceptionally handler before performing the instanceof checks. It also adds parameterized test coverage in CoordinatorTimerImplTest to verify correct behavior with both wrapped and unwrapped exceptions.

Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 6d9ba76 commit 06f7cb3

File tree

3 files changed

+30
-9
lines changed

3 files changed

+30
-9
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
2020
import org.apache.kafka.common.errors.NotCoordinatorException;
21+
import org.apache.kafka.common.protocol.Errors;
2122
import org.apache.kafka.common.utils.LogContext;
2223

2324
import org.slf4j.Logger;
@@ -85,6 +86,11 @@ public <R> boolean schedule(
8586
return operation.onComplete(result.result(), result.exception());
8687
}
8788
).exceptionally(exception -> {
89+
// Exceptions may be wrapped in CompletionException when propagated
90+
// through CompletableFuture chains, so we unwrap them before
91+
// checking types with instanceof.
92+
exception = Errors.maybeUnwrapException(exception);
93+
8894
// Remove the task after a failure.
8995
tasks.remove(key, task);
9096

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
2020
import org.apache.kafka.common.errors.NotCoordinatorException;
21+
import org.apache.kafka.common.protocol.Errors;
2122
import org.apache.kafka.common.utils.LogContext;
2223
import org.apache.kafka.server.util.timer.Timer;
2324
import org.apache.kafka.server.util.timer.TimerTask;
@@ -97,6 +98,11 @@ public void run() {
9798
return operation.generateRecords();
9899
}
99100
).exceptionally(ex -> {
101+
// Exceptions may be wrapped in CompletionException when propagated
102+
// through CompletableFuture chains, so we unwrap them before
103+
// checking types with instanceof.
104+
ex = Errors.maybeUnwrapException(ex);
105+
100106
// Remove the task after a failure.
101107
tasks.remove(key, this);
102108

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import org.apache.kafka.server.util.timer.MockTimer;
2323

2424
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.ValueSource;
2527

2628
import java.util.List;
2729
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CompletionException;
2831
import java.util.concurrent.RejectedExecutionException;
2932
import java.util.concurrent.TimeUnit;
3033
import java.util.concurrent.atomic.AtomicBoolean;
@@ -313,15 +316,17 @@ public void testTimerNoRetryOnFailure() throws InterruptedException {
313316
assertEquals(1, callCount.get());
314317
}
315318

316-
@Test
317-
public void testTimerIgnoredOnNotCoordinatorException() throws InterruptedException {
319+
@ParameterizedTest
320+
@ValueSource(booleans = {true, false})
321+
public void testTimerIgnoredOnNotCoordinatorException(boolean wrapException) throws InterruptedException {
318322
var mockTimer = new MockTimer();
319323
var callCount = new AtomicInteger(0);
320324

321325
CoordinatorShardScheduler<String> scheduler = (operationName, operation) -> {
322326
operation.generate();
323327
callCount.incrementAndGet();
324-
return CompletableFuture.failedFuture(new NotCoordinatorException("Not coordinator"));
328+
var ex = new NotCoordinatorException("Not coordinator");
329+
return CompletableFuture.failedFuture(wrapException ? new CompletionException(ex) : ex);
325330
};
326331

327332
var timer = new CoordinatorTimerImpl<>(
@@ -354,15 +359,17 @@ public void testTimerIgnoredOnNotCoordinatorException() throws InterruptedExcept
354359
assertEquals(1, callCount.get());
355360
}
356361

357-
@Test
358-
public void testTimerIgnoredOnCoordinatorLoadInProgressException() throws InterruptedException {
362+
@ParameterizedTest
363+
@ValueSource(booleans = {true, false})
364+
public void testTimerIgnoredOnCoordinatorLoadInProgressException(boolean wrapException) throws InterruptedException {
359365
var mockTimer = new MockTimer();
360366
var callCount = new AtomicInteger(0);
361367

362368
CoordinatorShardScheduler<String> scheduler = (operationName, operation) -> {
363369
operation.generate();
364370
callCount.incrementAndGet();
365-
return CompletableFuture.failedFuture(new CoordinatorLoadInProgressException("Loading"));
371+
var ex = new CoordinatorLoadInProgressException("Loading");
372+
return CompletableFuture.failedFuture(wrapException ? new CompletionException(ex) : ex);
366373
};
367374

368375
var timer = new CoordinatorTimerImpl<>(
@@ -578,8 +585,9 @@ public void testDefaultRetryBackoff() throws InterruptedException {
578585
assertEquals(0, timer.size());
579586
}
580587

581-
@Test
582-
public void testTaskCleanupOnFailedFutureWithoutOperationExecution() throws InterruptedException {
588+
@ParameterizedTest
589+
@ValueSource(booleans = {true, false})
590+
public void testTaskCleanupOnFailedFutureWithoutOperationExecution(boolean wrapException) throws InterruptedException {
583591
var mockTimer = new MockTimer();
584592
var operationCalled = new AtomicBoolean(false);
585593

@@ -588,7 +596,8 @@ public void testTaskCleanupOnFailedFutureWithoutOperationExecution() throws Inte
588596
// (2) events failing before being executed.
589597
CoordinatorShardScheduler<String> scheduler = (operationName, operation) -> {
590598
// Don't call operation.generate() - simulates event never being executed
591-
return CompletableFuture.failedFuture(new NotCoordinatorException("Not coordinator"));
599+
NotCoordinatorException ex = new NotCoordinatorException("Not coordinator");
600+
return CompletableFuture.failedFuture(wrapException ? new CompletionException(ex) : ex);
592601
};
593602

594603
var timer = new CoordinatorTimerImpl<>(

0 commit comments

Comments
 (0)