Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,19 @@ public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}

/**
* Returns true if we can send a {@link ApiKeys#LEAVE_GROUP} request to the given node at {@code now},
* ignoring the metadata-update priority gating in isReady().
* This method is internal helper for consumers: allow sending {@link ApiKeys#LEAVE_GROUP} even when metadata update is due.
*
* @param node The node
* @param now The current time in ms
* @return true if the node is ready
*/
public boolean isReadyForLeaveGroup(Node node, long now) {
return canSendRequest(node.idString(), now);
}

/**
* Are we connected and ready and able to send more requests to the given connection?
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
Expand All @@ -28,6 +29,7 @@
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -512,7 +514,24 @@ long trySend(long now) {

while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {

AbstractRequest.Builder<?> builder = request.requestBuilder();
boolean ready = client.ready(node, now);

if (!ready
&& builder.apiKey() == ApiKeys.LEAVE_GROUP
&& client instanceof NetworkClient) {
// KAFKA-17397 && KAFKA-18031
// When a consumer is leaving a group, it does not need to
// wait for a metadata update. Otherwise, the LeaveGroup request can remain in unsent
// and an InterruptException may be thrown on the final poll(),
// preventing the LeaveGroup request from being sent to the group coordinator.
// Even though this is a normal shutdown path, it can delay termination until 'session.timeout.ms' elapses.
NetworkClient networkClient = (NetworkClient) client;
ready = networkClient.isReadyForLeaveGroup(node, now);
}

if (ready) {
client.send(request, now);
iterator.remove();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import java.util
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.test.api.Flaky
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
Expand All @@ -27,7 +26,6 @@ import java.util.concurrent.ExecutionException
@Timeout(60)
class PlaintextConsumerTest extends AbstractConsumerTest {

@Flaky("KAFKA-18031")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testCloseLeavesGroupOnInterrupt(groupProtocol: String): Unit = {
Expand Down