diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index c227a9511b7b6..5b54a759a9844 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -1049,25 +1049,7 @@ public Map endOffsets(Collection partition public OptionalLong currentLag(TopicPartition topicPartition) { acquireAndEnsureOpen(); try { - final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); - - // if the log end offset is not known and hence cannot return lag and there is - // no in-flight list offset requested yet, - // issue a list offset request for that partition so that next time - // we may get the answer; we do not need to wait for the return value - // since we would not try to poll the network client synchronously - if (lag == null) { - if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null && - !subscriptions.partitionEndOffsetRequested(topicPartition)) { - log.info("Requesting the log end offset for {} in order to compute lag", topicPartition); - subscriptions.requestPartitionEndOffset(topicPartition); - offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L)); - } - - return OptionalLong.empty(); - } - - return OptionalLong.of(lag); + return offsetFetcher.currentLag(topicPartition); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java index f9cca5c1339a2..446f39686de5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java @@ -44,6 +44,7 @@ import 
java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -126,7 +127,7 @@ public Map offsetsForTimes(Map fetchedOffsets = fetchOffsetsByTimes(timestampsToSearch, - timer, true).fetchedOffsets; + timer, true, false).fetchedOffsets; return buildOffsetsForTimesResult(timestampsToSearch, fetchedOffsets); } finally { @@ -136,7 +137,8 @@ public Map offsetsForTimes(Map timestampsToSearch, Timer timer, - boolean requireTimestamps) { + boolean requireTimestamps, + boolean updatePartitionEndOffsetsFlag) { ListOffsetResult result = new ListOffsetResult(); if (timestampsToSearch.isEmpty()) return result; @@ -153,11 +155,17 @@ public void onSuccess(ListOffsetResult value) { remainingToSearch.keySet().retainAll(value.partitionsToRetry); offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel); + + if (updatePartitionEndOffsetsFlag) + offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet()); } } @Override public void onFailure(RuntimeException e) { + if (updatePartitionEndOffsetsFlag) + offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet()); + if (!(e instanceof RetriableException)) { throw future.exception(); } @@ -185,23 +193,49 @@ public void onFailure(RuntimeException e) { } public Map beginningOffsets(Collection partitions, Timer timer) { - return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer); + return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer, false); } public Map endOffsets(Collection partitions, Timer timer) { - return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer); + return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer, false); + } + + public OptionalLong currentLag(TopicPartition topicPartition) { + final Long lag = 
subscriptions.partitionLag(topicPartition, isolationLevel); + + // if the log end offset is not known and hence cannot return lag and there is + // no in-flight list offset requested yet, + // issue a list offset request for that partition so that next time + // we may get the answer; we do not need to wait for the return value + // since we would not try to poll the network client synchronously + if (lag == null) { + if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null && + offsetFetcherUtils.maybeSetPartitionEndOffsetRequest(topicPartition)) { + beginningOrEndOffset( + Set.of(topicPartition), + ListOffsetsRequest.LATEST_TIMESTAMP, + time.timer(0L), + true + ); + } + + return OptionalLong.empty(); + } + + return OptionalLong.of(lag); } private Map beginningOrEndOffset(Collection partitions, long timestamp, - Timer timer) { + Timer timer, + boolean updatePartitionEndOffsetsFlag) { metadata.addTransientTopics(topicsForPartitions(partitions)); try { Map timestampsToSearch = partitions.stream() .distinct() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); - ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false); + ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false, updatePartitionEndOffsetsFlag); return result.fetchedOffsets.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java index a92237d0fc9fc..f6f324be070a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java @@ -277,6 +277,47 @@ void updateSubscriptionState(MapLIST_OFFSETS request. 
This + * is only intended to be called when LIST_OFFSETS fails. Successful LIST_OFFSETS calls + * should use {@link #updateSubscriptionState(Map, IsolationLevel)}. + * + * @param partitions Partitions for which the 'end offset requested' flag should be cleared (if still assigned) + */ + void clearPartitionEndOffsetRequests(Collection partitions) { + for (final TopicPartition partition : partitions) { + if (subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) { + log.trace("Clearing end offset requested for partition {}", partition); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 38ed6c668d93f..820b99235e8a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -667,6 +667,17 @@ public synchronized boolean partitionEndOffsetRequested(TopicPartition tp) { return topicPartitionState.endOffsetRequested(); } + public synchronized boolean maybeClearPartitionEndOffsetRequested(TopicPartition tp) { + TopicPartitionState topicPartitionState = assignedStateOrNull(tp); + + if (topicPartitionState != null && topicPartitionState.endOffsetRequested()) { + topicPartitionState.clearEndOffset(); + return true; + } else { + return false; + } + } + synchronized Long partitionLead(TopicPartition tp) { TopicPartitionState topicPartitionState = assignedState(tp); return topicPartitionState.logStartOffset == null ? 
null : topicPartitionState.position.offset - topicPartitionState.logStartOffset; @@ -1037,6 +1048,10 @@ public void requestEndOffset() { endOffsetRequested = true; } + public void clearEndOffset() { + endOffsetRequested = false; + } + private void transitionState(FetchState newState, Runnable runIfTransitioned) { FetchState nextState = this.fetchState.transitionTo(newState); if (nextState.equals(newState)) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a6fa2f0fa796f..9d1011601ab05 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.Headers; @@ -177,6 +178,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -233,7 +235,7 @@ public class KafkaConsumerTest { private final Collection singleTopicPartition = Set.of(new TopicPartition(topic, 0)); private final Time time = new MockTime(); - private final SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST); + private final SubscriptionState subscription = spy(new SubscriptionState(new LogContext(), 
AutoOffsetResetStrategy.EARLIEST)); private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); private KafkaConsumer consumer; @@ -2663,21 +2665,7 @@ public void testInvalidGroupMetadata(GroupProtocol groupProtocol) throws Interru public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedException { final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); - - initMetadata(client, Map.of(topic, 1)); - - consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId); - - // throws for unassigned partition - assertThrows(IllegalStateException.class, () -> consumer.currentLag(tp0)); - - consumer.assign(Set.of(tp0)); - - // poll once to update with the current metadata - consumer.poll(Duration.ofMillis(0)); - TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), - "No metadata requests sent"); - client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); + consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata); // no error for no current position assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); @@ -2729,6 +2717,155 @@ public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedExcept assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0)); } + + // TODO: this test validates that the consumer sets the endOffsetRequested flag to prevent duplicate in-flight + // LIST_OFFSETS requests, but this flag handling is not yet implemented in the CONSUMER group protocol + // (see KAFKA-20187). Once it is implemented, this should use both group protocols. 
+ @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") + public void testCurrentLagPreventsMultipleInFlightRequests(GroupProtocol groupProtocol) throws InterruptedException { + final ConsumerMetadata metadata = createMetadata(subscription); + final MockClient client = new MockClient(time, metadata); + consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata); + + // Validate the state of the endOffsetRequested flag. It should be unset before the call to currentLag(), + // then set immediately afterward. + assertFalse(subscription.partitionEndOffsetRequested(tp0)); + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); + + if (groupProtocol == GroupProtocol.CLASSIC) { + // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), + // different from the new async consumer, which will send the LIST_OFFSETS request on the + // next background thread poll. + consumer.poll(Duration.ofMillis(0)); + } + + long count = client.requests().stream() + .filter(request -> request.requestBuilder().apiKey().equals(ApiKeys.LIST_OFFSETS)) + .count(); + assertEquals( + 1L, + count, + "Expected only one in-flight LIST_OFFSETS request for currentLag(), but consumer submitted " + count + " requests" + ); + } + + // TODO: this test validates that the consumer clears the endOffsetRequested flag, but this is not yet implemented + // in the CONSUMER group protocol (see KAFKA-20187). + // Once it is implemented, this should use both group protocols. 
+ @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") + public void testCurrentLagClearsFlagOnFatalPartitionError(GroupProtocol groupProtocol) throws InterruptedException { + final ConsumerMetadata metadata = createMetadata(subscription); + final MockClient client = new MockClient(time, metadata); + consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata); + + // Validate the state of the endOffsetRequested flag. It should be unset before the call to currentLag(), + // then set immediately afterward. + assertFalse(subscription.partitionEndOffsetRequested(tp0)); + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); + assertTrue(subscription.partitionEndOffsetRequested(tp0)); + verify(subscription).requestPartitionEndOffset(tp0); + + if (groupProtocol == GroupProtocol.CLASSIC) { + // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), + // different from the new async consumer, that will send the LIST_OFFSETS request in the background + // thread on the next background thread poll. + consumer.poll(Duration.ofMillis(0)); + } + + TestUtils.waitForCondition( + () -> requestGenerated(client, ApiKeys.LIST_OFFSETS), + "No LIST_OFFSETS request sent within allotted timeout" + ); + + clearInvocations(subscription); + + // Validate the state of the endOffsetRequested flag. It should still be set before the call to + // currentLag(), because the previous LIST_OFFSETS call has not received a response. In this case, + // the SubscriptionState.requestPartitionEndOffset() method should *not* have been invoked. + assertTrue(subscription.partitionEndOffsetRequested(tp0)); + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); + assertTrue(subscription.partitionEndOffsetRequested(tp0)); + verify(subscription, never()).requestPartitionEndOffset(tp0); + + // Now respond to the LIST_OFFSETS request with an error in the partition. 
+ ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS); + client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(), Map.of(tp0, Errors.TOPIC_AUTHORIZATION_FAILED))); + + if (groupProtocol == GroupProtocol.CLASSIC) { + // Classic consumer requires an explicit poll to process the LIST_OFFSETS response; the + // partition-level error from that response surfaces to the caller as a TopicAuthorizationException. + // The new async consumer processes the response on its background thread instead. + assertThrows(TopicAuthorizationException.class, () -> consumer.poll(Duration.ofMillis(0))); + } + + // AsyncKafkaConsumer may take a moment to poll and process the LIST_OFFSETS response, so a repeated + // wait is appropriate here. + TestUtils.waitForCondition( + () -> !subscription.partitionEndOffsetRequested(tp0), + "endOffsetRequested flag was not cleared within allotted timeout" + ); + } + + // TODO: this test validates that the consumer clears the endOffsetRequested flag, but this is not yet implemented + // in the CONSUMER group protocol (see KAFKA-20187). + // Once it is implemented, this should use both group protocols. + @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") + public void testCurrentLagClearsFlagOnRetriablePartitionError(GroupProtocol groupProtocol) throws InterruptedException { + final ConsumerMetadata metadata = createMetadata(subscription); + final MockClient client = new MockClient(time, metadata); + consumer = setUpConsumerForCurrentLag(groupProtocol, client, metadata); + + // Validate the state of the endOffsetRequested flag. It should be unset before the call to currentLag(), + // then set immediately afterward. 
+ assertFalse(subscription.partitionEndOffsetRequested(tp0)); + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); + assertTrue(subscription.partitionEndOffsetRequested(tp0)); + verify(subscription).requestPartitionEndOffset(tp0); + + if (groupProtocol == GroupProtocol.CLASSIC) { + // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), + // different from the new async consumer, that will send the LIST_OFFSETS request in the background + // thread on the next background thread poll. + consumer.poll(Duration.ofMillis(0)); + } + + TestUtils.waitForCondition( + () -> requestGenerated(client, ApiKeys.LIST_OFFSETS), + "No LIST_OFFSETS request sent within allotted timeout" + ); + + clearInvocations(subscription); + + // Validate the state of the endOffsetRequested flag. It should still be set before the call to + // currentLag(), because the previous LIST_OFFSETS call has not received a response. In this case, + // the SubscriptionState.requestPartitionEndOffset() method should *not* have been invoked. + assertTrue(subscription.partitionEndOffsetRequested(tp0)); + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); + assertTrue(subscription.partitionEndOffsetRequested(tp0)); + verify(subscription, never()).requestPartitionEndOffset(tp0); + + // Now respond to the LIST_OFFSETS request with an error in the partition. + ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS); + client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(), Map.of(tp0, Errors.OFFSET_NOT_AVAILABLE))); + + if (groupProtocol == GroupProtocol.CLASSIC) { + // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), + // different from the new async consumer, that will send the LIST_OFFSETS request in the background + // thread on the next background thread poll. 
+ consumer.poll(Duration.ofMillis(0)); + } + + // AsyncKafkaConsumer may take a moment to poll and process the LIST_OFFSETS response, so a repeated + // wait is appropriate here. + TestUtils.waitForCondition( + () -> !subscription.partitionEndOffsetRequested(tp0), + "endOffsetRequested flag was not cleared within allotted timeout" + ); + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) { @@ -2748,6 +2885,30 @@ public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0)); } + private KafkaConsumer setUpConsumerForCurrentLag(GroupProtocol groupProtocol, + MockClient client, + ConsumerMetadata metadata) throws InterruptedException { + initMetadata(client, Map.of(topic, 1)); + + KafkaConsumer consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, + groupId, groupInstanceId, false); + + // throws for unassigned partition + assertThrows(IllegalStateException.class, () -> consumer.currentLag(tp0)); + + consumer.assign(Set.of(tp0)); + + // poll once to update with the current metadata + consumer.poll(Duration.ofMillis(0)); + TestUtils.waitForCondition( + () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), + "No FIND_COORDINATOR request sent within allotted timeout" + ); + client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); + + return consumer; + } + private ClientRequest findRequest(MockClient client, ApiKeys apiKey) { Optional request = client.requests().stream().filter(r -> r.requestBuilder().apiKey().equals(apiKey)).findFirst(); assertTrue(request.isPresent(), "No " + apiKey + " request was submitted to the client");