diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 0f7e726a38e44..05b2f99f7e5fe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -47,6 +47,7 @@ import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics; import org.apache.kafka.common.KafkaException; @@ -886,6 +887,8 @@ private void close(final Duration timeout, final boolean swallowException) { // Prepare shutting down the network thread swallow(log, Level.ERROR, "Failed to release assignment before closing consumer", () -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException); + swallow(log, Level.ERROR, "Failed to stop finding coordinator", + this::stopFindCoordinatorOnClose, firstException); swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback", this::handleCompletedAcknowledgements, firstException); if (applicationEventHandler != null) @@ -914,6 +917,11 @@ private void close(final Duration timeout, final boolean swallowException) { } } + private void stopFindCoordinatorOnClose() { + log.debug("Stop finding coordinator during consumer close"); + applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent()); + } + private Timer createTimerForCloseRequests(Duration timeout) { // this.time could be null if an exception occurs in constructor prior to setting the this.time field final Time time = (this.time == null) ? Time.SYSTEM : this.time; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 48b63301af509..45cabc7b40714 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -48,6 +49,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; +import org.mockito.InOrder; import org.mockito.Mockito; import java.time.Duration; @@ -77,6 +79,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -340,6 +343,25 @@ public void testCloseWithTopicAuthorizationException() { assertDoesNotThrow(() -> consumer.close()); } + @Test + public void testStopFindCoordinatorOnClose() { + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer(subscriptions); + + // Setup the expected successful completion of close events + completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); + completeShareUnsubscribeApplicationEventSuccessfully(subscriptions); + + // Close the consumer + consumer.close(); + + // Verify events are sent in correct order using InOrder + InOrder inOrder = inOrder(applicationEventHandler); + inOrder.verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class)); + inOrder.verify(applicationEventHandler).add(any(ShareUnsubscribeEvent.class)); + inOrder.verify(applicationEventHandler).add(any(StopFindCoordinatorOnCloseEvent.class)); + } + @Test public void testVerifyApplicationEventOnShutdown() { SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);