diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 13e4a194eaa54..19e9dacceec2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -984,6 +984,8 @@ public void close(final Timer timer) { } } finally { super.close(timer); + // Super-class close may wait for more commit callbacks to complete. + invokeCompletedOffsetCommitCallbacks(); } } @@ -1033,16 +1035,22 @@ public RequestFuture commitOffsetsAsync(final Map() { @Override public void onSuccess(Void value) { - pendingAsyncCommits.decrementAndGet(); - doCommitOffsetsAsync(offsets, callback); - client.pollNoWakeup(); + try { + doCommitOffsetsAsync(offsets, callback); + client.pollNoWakeup(); + } finally { + pendingAsyncCommits.decrementAndGet(); + } } @Override public void onFailure(RuntimeException e) { - pendingAsyncCommits.decrementAndGet(); - completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, + try { + completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e))); + } finally { + pendingAsyncCommits.decrementAndGet(); + } } }); } @@ -1061,25 +1069,29 @@ private RequestFuture doCommitOffsetsAsync(final Map() { @Override public void onSuccess(Void value) { - inFlightAsyncCommits.decrementAndGet(); - - if (interceptors != null) - interceptors.onCommit(offsets); - completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null)); + try { + if (interceptors != null) + interceptors.onCommit(offsets); + completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null)); + } finally { + inFlightAsyncCommits.decrementAndGet(); + } } @Override public void onFailure(RuntimeException e) { - inFlightAsyncCommits.decrementAndGet(); + try { + Exception commitException = e; - Exception commitException = e; - - if (e instanceof RetriableException) { - commitException = new RetriableCommitFailedException(e); - } - completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); - if (commitException instanceof FencedInstanceIdException) { - asyncCommitFenced.set(true); + if (e instanceof RetriableException) { + commitException = new RetriableCommitFailedException(e); + } + completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); + if (commitException instanceof FencedInstanceIdException) { + asyncCommitFenced.set(true); + } + } finally { + inFlightAsyncCommits.decrementAndGet(); } } }); @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) { } private boolean invokePendingAsyncCommits(Timer timer) { - if (inFlightAsyncCommits.get() == 0) { + if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { + invokeCompletedOffsetCommitCallbacks(); return true; } @@ -1174,7 +1187,8 @@ private boolean invokePendingAsyncCommits(Timer timer) { client.poll(timer); invokeCompletedOffsetCommitCallbacks(); - if (inFlightAsyncCommits.get() == 0) { + if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { + invokeCompletedOffsetCommitCallbacks(); return true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 954ed1c11e09b..944a9d189af1c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance @AfterEach public void teardown() { this.metrics.close(); - this.coordinator.close(time.timer(0)); + try { + this.coordinator.close(time.timer(0)); + } catch (Exception e) { + // ignore + } } @Test diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala index 1843696766ec5..5c95b074bc2d8 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala @@ -304,13 +304,12 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } - // TODO: This only works in the new consumer, but should be fixed for the old consumer as well @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { // This is testing the contract that asynchronous offset commit are completed before the consumer // is closed, even when no commit sync is performed as part of the close (due to auto-commit - // disabled, or simply because there are no consumed offsets). + // disabled, or simply because there no consumed offsets). val producer = createProducer() sendRecords(producer, numRecords = 3, tp) sendRecords(producer, numRecords = 3, tp2) @@ -326,9 +325,8 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { assertEquals(2, cb.successCount) } - // TODO: This only works in the new consumer, but should be fixed for the old consumer as well @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { // This is testing the contract that asynchronous offset commits sent previously with the // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of