Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8b20d7d
KAFKA-20131: SubscriptionState endOffsetRequested is left set when LI…
kirktrue Feb 5, 2026
af7def2
WIP
kirktrue Feb 10, 2026
e91784c
Updates
kirktrue Feb 11, 2026
39bb381
Clearing partition end offset request flag for new consumer
kirktrue Feb 11, 2026
49b51b3
Comments and minor formatting
kirktrue Feb 11, 2026
5cda4f8
More comments
kirktrue Feb 11, 2026
047d3cf
Catching more cases
kirktrue Feb 12, 2026
d56e17e
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 12, 2026
4d7073c
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 13, 2026
15dbff5
Fixed typo in log message and verifying requestPartitionEndOffset inv…
kirktrue Feb 13, 2026
a1a71f5
Updates to remove changes to AsyncKafkaConsumer
kirktrue Feb 13, 2026
d393a59
Updates to remove changes to AsyncKafkaConsumer
kirktrue Feb 13, 2026
820f91e
More clean up for PR
kirktrue Feb 13, 2026
29e8810
Removed unnecessary changes to OffsetFetcher
kirktrue Feb 13, 2026
aedb1b4
Another minor refactor
kirktrue Feb 13, 2026
9ba4c96
Removed checking for log message output
kirktrue Feb 17, 2026
90aa258
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 18, 2026
c5d5317
Revising the logic around currentLag() in ClassicKafkaConsumer and Of…
kirktrue Feb 18, 2026
6c72eb8
Reverting changes in ApplicationEventProcessor
kirktrue Feb 18, 2026
4d0727f
Reverted several changes to SubscriptionState
kirktrue Feb 18, 2026
ee3399c
Moved both setting and clearing of partitionEndOffsets flag to fetchO…
kirktrue Feb 18, 2026
ba79cf1
Minor clean up to reduce diff noise
kirktrue Feb 18, 2026
9fd51f8
Added setPartitionEndOffsetRequests
kirktrue Feb 19, 2026
5b13372
Renamed shouldUpdatePartitionEndOffsets → updatePartitionEndOffsetsFl…
kirktrue Feb 19, 2026
f3ead35
Adding testCurrentLagPreventsMultipleInFlightRequests test and minor …
kirktrue Feb 19, 2026
6a0613c
Minor refactoring of request count check to reformat and include message
kirktrue Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1049,25 +1049,7 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
public OptionalLong currentLag(TopicPartition topicPartition) {
acquireAndEnsureOpen();
try {
final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

// if the log end offset is not known and hence cannot return lag and there is
// no in-flight list offset requested yet,
// issue a list offset request for that partition so that next time
// we may get the answer; we do not need to wait for the return value
// since we would not try to poll the network client synchronously
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
}

return OptionalLong.empty();
}

return OptionalLong.of(lag);
return offsetFetcher.currentLag(topicPartition);
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
Expand Down Expand Up @@ -126,7 +127,7 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio

try {
Map<TopicPartition, ListOffsetData> fetchedOffsets = fetchOffsetsByTimes(timestampsToSearch,
timer, true).fetchedOffsets;
timer, true, false).fetchedOffsets;

return buildOffsetsForTimesResult(timestampsToSearch, fetchedOffsets);
} finally {
Expand All @@ -136,11 +137,16 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio

private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
Timer timer,
boolean requireTimestamps) {
boolean requireTimestamps,
boolean shouldUpdatePartitionEndOffsets) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this name is very confusing because it clearly says it will update end offsets (first thing that comes to mind is an actual change to positions, not a flag).

Would it help if we rename to mention it's to update a flag (maybe updatePartitionEndOffsetsFlag), or at least a description of the param?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the variable name to updatePartitionEndOffsetsFlag.

ListOffsetResult result = new ListOffsetResult();
if (timestampsToSearch.isEmpty())
return result;

if (shouldUpdatePartitionEndOffsets) {
offsetFetcherUtils.setPartitionEndOffsetRequests(timestampsToSearch.keySet());
}

Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
do {
RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
Expand All @@ -153,11 +159,17 @@ public void onSuccess(ListOffsetResult value) {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);

offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);

if (shouldUpdatePartitionEndOffsets)
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
}
}

@Override
public void onFailure(RuntimeException e) {
if (shouldUpdatePartitionEndOffsets)
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());

if (!(e instanceof RetriableException)) {
throw future.exception();
}
Expand Down Expand Up @@ -185,23 +197,48 @@ public void onFailure(RuntimeException e) {
}

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer);
return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer, false);
}

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Timer timer) {
return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer);
return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer, false);
}

public OptionalLong currentLag(TopicPartition topicPartition) {
final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

// if the log end offset is not known and hence cannot return lag and there is
// no in-flight list offset requested yet,
// issue a list offset request for that partition so that next time
// we may get the answer; we do not need to wait for the return value
// since we would not try to poll the network client synchronously
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't we missing the check here to ensure there is no request in-flight?

We should also ensure we have a test for this: 2 consecutive calls to currentLag at this level — the first one should generate a request with no response, and the second call should not generate a request. I would expect such a test to be failing now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test to catch multiple in-flight LIST_OFFSETS requests.

beginningOrEndOffset(
Set.of(topicPartition),
ListOffsetsRequest.LATEST_TIMESTAMP,
time.timer(0L),
true
);
}

return OptionalLong.empty();
}

return OptionalLong.of(lag);
}

private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
long timestamp,
Timer timer) {
Timer timer,
boolean shouldUpdatePartitionEndOffsets) {
metadata.addTransientTopics(topicsForPartitions(partitions));
try {
Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
.distinct()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));

ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false);
ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false, shouldUpdatePartitionEndOffsets);

return result.fetchedOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,44 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
log.trace("Updating high watermark for partition {} to {}", partition, offset);
subscriptionState.updateHighWatermark(partition, offset);
}
} else {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {
log.warn("Not updating last stable offset for partition {} as it is no longer assigned", partition);
} else {
log.warn("Not updating high watermark for partition {} as it is no longer assigned", partition);
}
}
}
}

/**
 * Marks each of the given partitions as having an outstanding end offset request, unless one is
 * already in flight. The {@code LIST_OFFSETS} lag lookup is serialized: an inflight request must
 * complete before another can be issued, and that serialization is tracked by the
 * 'end offset requested' flag in {@link SubscriptionState}.
 */
void setPartitionEndOffsetRequests(Collection<TopicPartition> partitions) {
    for (final TopicPartition tp : partitions) {
        if (!subscriptionState.partitionEndOffsetRequested(tp)) {
            log.info("Requesting the log end offset for {} in order to compute lag", tp);
            subscriptionState.requestPartitionEndOffset(tp);
        } else {
            // An earlier request is still outstanding; do not issue another.
            log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", tp);
        }
    }
}

/**
 * For each given partition that is still assigned, clears its 'end offset requested' flag so the
 * next lag lookup will issue a fresh <code>LIST_OFFSETS</code> request. Intended only for the case
 * where <code>LIST_OFFSETS</code> fails; on success, use
 * {@link #updateSubscriptionState(Map, IsolationLevel)} instead.
 *
 * @param partitions Partitions for which the 'end offset requested' flag should be cleared (if still assigned)
 */
void clearPartitionEndOffsetRequests(Collection<TopicPartition> partitions) {
    partitions.forEach(tp -> {
        if (subscriptionState.maybeClearPartitionEndOffsetRequested(tp)) {
            log.trace("Clearing end offset requested for partition {}", tp);
        }
    });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,17 @@ public synchronized boolean partitionEndOffsetRequested(TopicPartition tp) {
return topicPartitionState.endOffsetRequested();
}

/**
 * Clears the 'end offset requested' flag for the given partition, provided the partition is still
 * assigned and the flag is currently set.
 *
 * @param tp Partition whose flag should be cleared
 * @return {@code true} if the flag was set and has now been cleared; {@code false} otherwise
 */
public synchronized boolean maybeClearPartitionEndOffsetRequested(TopicPartition tp) {
    final TopicPartitionState state = assignedStateOrNull(tp);

    // Nothing to do if the partition is no longer assigned or no request was outstanding.
    if (state == null || !state.endOffsetRequested()) {
        return false;
    }

    state.clearEndOffset();
    return true;
}

synchronized Long partitionLead(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position.offset - topicPartitionState.logStartOffset;
Expand Down Expand Up @@ -1037,6 +1048,10 @@ public void requestEndOffset() {
endOffsetRequested = true;
}

// Clears the 'end offset requested' flag so that a subsequent lag lookup may issue a new
// LIST_OFFSETS request for this partition.
public void clearEndOffset() {
    endOffsetRequested = false;
}

private void transitionState(FetchState newState, Runnable runIfTransitioned) {
FetchState nextState = this.fetchState.transitionTo(newState);
if (nextState.equals(newState)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Headers;
Expand Down Expand Up @@ -177,6 +178,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -233,7 +235,7 @@ public class KafkaConsumerTest {

private final Collection<TopicPartition> singleTopicPartition = Set.of(new TopicPartition(topic, 0));
private final Time time = new MockTime();
private final SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);
private final SubscriptionState subscription = spy(new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST));
private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();

private KafkaConsumer<?, ?> consumer;
Expand Down Expand Up @@ -2729,6 +2731,148 @@ public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedExcept
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
}

// TODO: this test validates that the consumer clears the endOffsetRequested flag, but this is not yet implemented
// in the CONSUMER group protocol (see KAFKA-20187).
// Once it is implemented, this should use both group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCurrentLagClearsFlagOnFatalPartitionError(GroupProtocol groupProtocol) throws InterruptedException {
    // Verifies that a fatal (non-retriable) partition error in the LIST_OFFSETS response clears the
    // endOffsetRequested flag so that a later currentLag() call can issue a fresh request.
    final ConsumerMetadata metadata = createMetadata(subscription);
    final MockClient client = new MockClient(time, metadata);

    initMetadata(client, Map.of(topic, 1));

    consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false,
            groupId, groupInstanceId, false);
    consumer.assign(Set.of(tp0));

    // Poll once to update with the current metadata, then answer the consumer's FIND_COORDINATOR request.
    consumer.poll(Duration.ofMillis(0));
    TestUtils.waitForCondition(
            () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
            "No FIND_COORDINATOR request sent within allotted timeout"
    );
    client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));

    // Validate the state of the endOffsetRequested flag. It should be unset before the call to currentLag(),
    // then set immediately afterward (the lag is unknown, so currentLag() requests the end offset).
    assertFalse(subscription.partitionEndOffsetRequested(tp0));
    assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
    assertTrue(subscription.partitionEndOffsetRequested(tp0));
    verify(subscription).requestPartitionEndOffset(tp0);

    if (groupProtocol == GroupProtocol.CLASSIC) {
        // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll),
        // different from the new async consumer, that will send the LIST_OFFSETS request in the background
        // thread on the next background thread poll.
        consumer.poll(Duration.ofMillis(0));
    }

    TestUtils.waitForCondition(
            () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
            "No LIST_OFFSETS request sent within allotted timeout"
    );

    // Reset recorded interactions so the verify(..., never()) below only observes calls made after this point.
    clearInvocations(subscription);

    // Validate the state of the endOffsetRequested flag. It should still be set before the call to
    // currentLag(), because the previous LIST_OFFSETS call has not received a response. In this case,
    // the SubscriptionState.requestPartitionEndOffset() method should *not* have been invoked.
    assertTrue(subscription.partitionEndOffsetRequested(tp0));
    assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
    assertTrue(subscription.partitionEndOffsetRequested(tp0));
    verify(subscription, never()).requestPartitionEndOffset(tp0);

    // Now respond to the LIST_OFFSETS request with a fatal (non-retriable) error in the partition.
    ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
    client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(), Map.of(tp0, Errors.TOPIC_AUTHORIZATION_FAILED)));

    if (groupProtocol == GroupProtocol.CLASSIC) {
        // The classic consumer processes the error response on the next explicit poll, which surfaces
        // the fatal partition error to the caller as a TopicAuthorizationException.
        assertThrows(TopicAuthorizationException.class, () -> consumer.poll(Duration.ofMillis(0)));
    }

    // AsyncKafkaConsumer may take a moment to poll and process the LIST_OFFSETS response, so a repeated
    // wait is appropriate here.
    TestUtils.waitForCondition(
            () -> !subscription.partitionEndOffsetRequested(tp0),
            "endOffsetRequested flag was not cleared within allotted timeout"
    );
}

// TODO: this test validates that the consumer clears the endOffsetRequested flag, but this is not yet implemented
// in the CONSUMER group protocol (see KAFKA-20187).
// Once it is implemented, this should use both group protocols.
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testCurrentLagClearsFlagOnRetriablePartitionError(GroupProtocol groupProtocol) throws InterruptedException {
    // Verifies that a retriable partition error in the LIST_OFFSETS response also clears the
    // endOffsetRequested flag, allowing the lag lookup to be retried on a later currentLag() call.
    final ConsumerMetadata metadata = createMetadata(subscription);
    final MockClient client = new MockClient(time, metadata);

    initMetadata(client, Map.of(topic, 1));

    consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false,
            groupId, groupInstanceId, false);
    consumer.assign(Set.of(tp0));

    // Poll once to update with the current metadata, then answer the consumer's FIND_COORDINATOR request.
    consumer.poll(Duration.ofMillis(0));
    TestUtils.waitForCondition(
            () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
            "No FIND_COORDINATOR request sent within allotted timeout"
    );
    client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));

    // Validate the state of the endOffsetRequested flag. It should be unset before the call to currentLag(),
    // then set immediately afterward (the lag is unknown, so currentLag() requests the end offset).
    assertFalse(subscription.partitionEndOffsetRequested(tp0));
    assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
    assertTrue(subscription.partitionEndOffsetRequested(tp0));
    verify(subscription).requestPartitionEndOffset(tp0);

    if (groupProtocol == GroupProtocol.CLASSIC) {
        // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll),
        // different from the new async consumer, that will send the LIST_OFFSETS request in the background
        // thread on the next background thread poll.
        consumer.poll(Duration.ofMillis(0));
    }

    TestUtils.waitForCondition(
            () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
            "No LIST_OFFSETS request sent within allotted timeout"
    );

    // Reset recorded interactions so the verify(..., never()) below only observes calls made after this point.
    clearInvocations(subscription);

    // Validate the state of the endOffsetRequested flag. It should still be set before the call to
    // currentLag(), because the previous LIST_OFFSETS call has not received a response. In this case,
    // the SubscriptionState.requestPartitionEndOffset() method should *not* have been invoked.
    assertTrue(subscription.partitionEndOffsetRequested(tp0));
    assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
    assertTrue(subscription.partitionEndOffsetRequested(tp0));
    verify(subscription, never()).requestPartitionEndOffset(tp0);

    // Now respond to the LIST_OFFSETS request with a retriable error in the partition.
    ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
    client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(), Map.of(tp0, Errors.OFFSET_NOT_AVAILABLE)));

    if (groupProtocol == GroupProtocol.CLASSIC) {
        // The classic consumer processes the response on the next explicit poll; the retriable error is
        // handled internally (no exception is surfaced to the caller here), but the flag should still be
        // cleared so the request can be retried.
        consumer.poll(Duration.ofMillis(0));
    }

    // AsyncKafkaConsumer may take a moment to poll and process the LIST_OFFSETS response, so a repeated
    // wait is appropriate here.
    TestUtils.waitForCondition(
            () -> !subscription.partitionEndOffsetRequested(tp0),
            "endOffsetRequested flag was not cleared within allotted timeout"
    );
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
Expand Down