Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8b20d7d
KAFKA-20131: SubscriptionState endOffsetRequested is left set when LI…
kirktrue Feb 5, 2026
af7def2
WIP
kirktrue Feb 10, 2026
e91784c
Updates
kirktrue Feb 11, 2026
39bb381
Clearing partition end offset request flag for new consumer
kirktrue Feb 11, 2026
49b51b3
Comments and minor formatting
kirktrue Feb 11, 2026
5cda4f8
More comments
kirktrue Feb 11, 2026
047d3cf
Catching more cases
kirktrue Feb 12, 2026
d56e17e
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 12, 2026
4d7073c
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 13, 2026
15dbff5
Fixed typo in log message and verifying requestPartitionEndOffset inv…
kirktrue Feb 13, 2026
a1a71f5
Updates to remove changes to AsyncKafkaConsumer
kirktrue Feb 13, 2026
d393a59
Updates to remove changes to AsyncKafkaConsumer
kirktrue Feb 13, 2026
820f91e
More clean up for PR
kirktrue Feb 13, 2026
29e8810
Removed unnecessary changes to OffsetFetcher
kirktrue Feb 13, 2026
aedb1b4
Another minor refactor
kirktrue Feb 13, 2026
9ba4c96
Removed checking for log message output
kirktrue Feb 17, 2026
90aa258
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 18, 2026
c5d5317
Revising the logic around currentLag() in ClassicKafkaConsumer and Of…
kirktrue Feb 18, 2026
6c72eb8
Reverting changes in ApplicationEventProcessor
kirktrue Feb 18, 2026
4d0727f
Reverted several changes to SubscriptionState
kirktrue Feb 18, 2026
ee3399c
Moved both setting and clearing of partitionEndOffsets flag to fetchO…
kirktrue Feb 18, 2026
ba79cf1
Minor clean up to reduce diff noise
kirktrue Feb 18, 2026
9fd51f8
Added setPartitionEndOffsetRequests
kirktrue Feb 19, 2026
5b13372
Renamed shouldUpdatePartitionEndOffsets → updatePartitionEndOffsetsFl…
kirktrue Feb 19, 2026
f3ead35
Adding testCurrentLagPreventsMultipleInFlightRequests test and minor …
kirktrue Feb 19, 2026
6a0613c
Minor refactoring of request count check to reformat and include message
kirktrue Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1057,11 +1057,17 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
// we may get the answer; we do not need to wait for the return value
// since we would not try to poll the network client synchronously
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
// The LIST_OFFSETS lag lookup is serialized, so if there's an inflight request it must
// finish before another request can be issued. This serialization mechanism is controlled
// by the 'end offset requested' flag in SubscriptionState.
if (subscriptions.partitionEndOffsetRequested(topicPartition)) {
log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", topicPartition);
} else {
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
}
}

return OptionalLong.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
if (timestampsToSearch.isEmpty())
return result;

// In the case that the user supplied a zero timeout to this method, only a single pass of the loop below will
// be performed before exiting. No TimeoutException will be thrown in that case. In the case that the single
// pass did not yield a response (either transient or fatal error), make sure to clear the relevant partitions'
// respective 'end offset requested' flags so that another attempt can be made by the user.
//
// If the timeout is not zero, the loop will be executed at least once. In the case that not all the partitions
// were found, a TimeoutException will be thrown.
boolean isZeroTimestamp = timer.timeoutMs() == 0L;

Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
do {
RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
Expand All @@ -153,11 +162,14 @@ public void onSuccess(ListOffsetResult value) {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);

offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we're clearing the flag for the partitions that didn't get offsets yet. I agree we need this if we don't have any time left to retry. But if there's still time, the do-while will try again. In that case, do we want to clear the flag here?

I would imagine we don't, because we'll continue retrying while there is time. It could be the case of missing leader info for instance: we want to keep the flag on for those partitions, hit the client.awaitMetadataUpdate(timer) below, and try again in the next iteration of the do-while, right?

If so, I imagine we could take the timer into consideration here? (clear the flag for the failed partitions only if timer expired?). Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we need this if we don't have any time left to retry. But if there's still time, the do-while will try again. In that case, do we want to clear the flag here?

That's precisely what happens in the currentLag() case, though. It's always using a timeout of 0, so there's never a second pass in that loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, we both agree we need it for currentLag/timerExpired. But in the way it's called now it applies to all cases, that's my concern. Isn't this going to clear the flag also in the case where there is time left to retry, and there is a partition that didn't have a known leader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added an explicit parameter to 'clear end offsets requests' that only the ClassicKafkaConsumer.currentLag() sets to true. This should prevent other callers from clearing the flag, regardless of the timeout setting.

}
}

@Override
public void onFailure(RuntimeException e) {
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above


if (!(e instanceof RetriableException)) {
throw future.exception();
}
Expand All @@ -167,7 +179,7 @@ public void onFailure(RuntimeException e) {
// if timeout is set to zero, do not try to poll the network client at all
// and return empty immediately; otherwise try to get the results synchronously
// and throw timeout exception if it cannot complete in time
if (timer.timeoutMs() == 0L)
if (isZeroTimestamp)
return result;

client.poll(future, timer);
Expand All @@ -181,6 +193,10 @@ public void onFailure(RuntimeException e) {
}
} while (timer.notExpired());

// If there are any remaining partitions that have not received responses, clear their respective
// 'end offset requested' flags so that another attempt can be made.
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());

throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
/**
* Utility functions for fetching offsets, validating and resetting positions.
*/
class OffsetFetcherUtils {
public class OffsetFetcherUtils {
private final ConsumerMetadata metadata;
private final SubscriptionState subscriptionState;
private final Time time;
Expand Down Expand Up @@ -277,6 +277,28 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
log.trace("Updating high watermark for partition {} to {}", partition, offset);
subscriptionState.updateHighWatermark(partition, offset);
}
} else {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {
log.warn("Not updating last stable offset for partition {} as it is no longer assigned", partition);
} else {
log.warn("Not updating high watermark for partition {} as it is no longer assigned", partition);
}
}
}
}

/**
 * Clears the 'end offset requested' flag for every supplied partition that is still assigned,
 * so that the next lag lookup is free to issue a fresh <code>LIST_OFFSETS</code> request. This
 * is intended solely for the failure path of <code>LIST_OFFSETS</code>; successful calls should
 * go through {@link #updateSubscriptionState(Map, IsolationLevel)} instead.
 *
 * @param partitions Partitions whose 'end offset requested' flag should be cleared (if still assigned)
 */
void clearPartitionEndOffsetRequests(Collection<TopicPartition> partitions) {
    partitions.forEach(partition -> {
        // maybeClear... is a no-op (returns false) for unassigned partitions or an unset flag.
        boolean cleared = subscriptionState.maybeClearPartitionEndOffsetRequested(partition);

        if (cleared) {
            log.trace("Clearing partition end offset requested for partition {}", partition);
        }
    });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,30 +181,36 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
* @param timestampsToSearch Partitions and target timestamps to get offsets for
* @param requireTimestamps True if this should fail with an UnsupportedVersionException if the
* broker does not support fetching precise timestamps for offsets
* @param shouldRetry Determines if the code should attempt subsequent {@code LIST_OFFSETS} calls for
* any partitions that fail the first attempt
* @return Future containing the map of {@link TopicPartition} and {@link OffsetAndTimestamp}
* found .The future will complete when the requests responses are received and
* found. The future will complete when the requests responses are received and
* processed, following a call to {@link #poll(long)}
*/
public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsets(
Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamps) {
boolean requireTimestamps,
final boolean shouldRetry) {
if (timestampsToSearch.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));
ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState(
timestampsToSearch,
requireTimestamps,
shouldRetry,
offsetFetcherUtils,
isolationLevel);
listOffsetsRequestState.globalResult.whenComplete((result, error) -> {
metadata.clearTransientTopics();
if (error != null) {
log.debug("Fetch offsets completed with error for partitions and timestamps {}.",
timestampsToSearch, error);
offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
} else {
log.debug("Fetch offsets completed successfully for partitions and timestamps {}." +
" Result {}", timestampsToSearch, result);
offsetFetcherUtils.clearPartitionEndOffsetRequests(result.partitionsToRetry);
}
});

Expand Down Expand Up @@ -559,7 +565,17 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
if (timestampsToSearchByNode.isEmpty()) {
throw new StaleMetadataException();
if (listOffsetsRequestState.shouldRetry) {
// This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
// of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
throw new StaleMetadataException();
} else {
// If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
// 'end offsets requested' flag since there won't be another chance. Then return an empty
// list to signal that there are no requests to be sent.
offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
return List.of();
}
}

final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
Expand All @@ -568,7 +584,13 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
// Done sending request to a set of known leaders
if (error == null) {
listOffsetsRequestState.fetchedOffsets.putAll(multiNodeResult.fetchedOffsets);
listOffsetsRequestState.addPartitionsToRetry(multiNodeResult.partitionsToRetry);

if (listOffsetsRequestState.shouldRetry) {
listOffsetsRequestState.addPartitionsToRetry(multiNodeResult.partitionsToRetry);
} else {
offsetFetcherUtils.clearPartitionEndOffsetRequests(multiNodeRequest.partitionsToRetry);
}

offsetFetcherUtils.updateSubscriptionState(multiNodeResult.fetchedOffsets,
isolationLevel);

Expand All @@ -578,7 +600,8 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
listOffsetsRequestState.remainingToSearch.keySet());
listOffsetsRequestState.globalResult.complete(listOffsetResult);
} else {
requestsToRetry.add(listOffsetsRequestState);
if (listOffsetsRequestState.shouldRetry)
requestsToRetry.add(listOffsetsRequestState);
metadata.requestUpdate(false);
}
} else {
Expand Down Expand Up @@ -822,11 +845,13 @@ private static class ListOffsetsRequestState {
private final Map<TopicPartition, Long> remainingToSearch;
private final CompletableFuture<ListOffsetResult> globalResult;
final boolean requireTimestamps;
final boolean shouldRetry;
final OffsetFetcherUtils offsetFetcherUtils;
final IsolationLevel isolationLevel;

private ListOffsetsRequestState(Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamps,
boolean shouldRetry,
OffsetFetcherUtils offsetFetcherUtils,
IsolationLevel isolationLevel) {
remainingToSearch = new HashMap<>();
Expand All @@ -835,11 +860,15 @@ private ListOffsetsRequestState(Map<TopicPartition, Long> timestampsToSearch,

this.timestampsToSearch = timestampsToSearch;
this.requireTimestamps = requireTimestamps;
this.shouldRetry = shouldRetry;
this.offsetFetcherUtils = offsetFetcherUtils;
this.isolationLevel = isolationLevel;
}

/**
 * Re-queues the given partitions for another LIST_OFFSETS attempt, pairing each with the
 * timestamp it was originally requested with. Only legal when this request state was created
 * with retries enabled.
 */
private void addPartitionsToRetry(Set<TopicPartition> partitionsToRetry) {
    if (!shouldRetry)
        throw new IllegalStateException("Unexpected attempt to retry LIST_OFFSETS call for partitions (" + partitionsToRetry + ")");

    // Rebuild the partition → timestamp associations from the original search map.
    final Map<TopicPartition, Long> retriedEntries = partitionsToRetry.stream()
        .collect(Collectors.toMap(tp -> tp, timestampsToSearch::get));
    remainingToSearch.putAll(retriedEntries);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,14 +659,25 @@ public synchronized Long partitionEndOffset(TopicPartition tp, IsolationLevel is

public synchronized void requestPartitionEndOffset(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
topicPartitionState.requestEndOffset();
topicPartitionState.setRequestEndOffset(true);
}

/**
 * Returns whether a request for the partition's end offset is currently outstanding.
 * Throws if the partition is not assigned (via {@code assignedState}).
 */
public synchronized boolean partitionEndOffsetRequested(TopicPartition tp) {
    return assignedState(tp).endOffsetRequested();
}

/**
 * Clears the 'end offset requested' flag for the given partition, provided it is still
 * assigned and the flag is currently set.
 *
 * @return true if the flag was set and has now been cleared; false otherwise
 */
public synchronized boolean maybeClearPartitionEndOffsetRequested(TopicPartition tp) {
    TopicPartitionState state = assignedStateOrNull(tp);

    // Guard clauses: nothing to do if the partition is unassigned or the flag is already clear.
    if (state == null)
        return false;
    if (!state.endOffsetRequested())
        return false;

    state.setRequestEndOffset(false);
    return true;
}

synchronized Long partitionLead(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position.offset - topicPartitionState.logStartOffset;
Expand Down Expand Up @@ -1033,8 +1044,8 @@ public boolean endOffsetRequested() {
return endOffsetRequested;
}

public void requestEndOffset() {
endOffsetRequested = true;
// Sets or clears the 'end offset requested' flag that serializes LIST_OFFSETS lag lookups
// for this partition: true when a request is issued, false once a response (or failure)
// arrives so a subsequent lookup may issue a new request.
public void setRequestEndOffset(boolean endOffsetRequested) {
this.endOffsetRequested = endOffsetRequested;

private void transitionState(FetchState newState, Runnable runIfTransitioned) {
Expand Down Expand Up @@ -1238,7 +1249,7 @@ private boolean isFetchable() {

private void highWatermark(Long highWatermark) {
this.highWatermark = highWatermark;
this.endOffsetRequested = false;
setRequestEndOffset(false);
}

private void logStartOffset(Long logStartOffset) {
Expand All @@ -1247,7 +1258,7 @@ private void logStartOffset(Long logStartOffset) {

private void lastStableOffset(Long lastStableOffset) {
this.lastStableOffset = lastStableOffset;
this.endOffsetRequested = false;
setRequestEndOffset(false);
}

private AutoOffsetResetStrategy resetStrategy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,11 @@ private void process(final AssignmentChangeEvent event) {
* Handles ListOffsetsEvent by fetching the offsets for the given partitions and timestamps.
*/
private void process(final ListOffsetsEvent event) {
final CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> future =
requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(), event.requireTimestamps());
final CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> future = requestManagers.offsetsRequestManager.fetchOffsets(
event.timestampsToSearch(),
event.requireTimestamps(),
true
);
future.whenComplete(complete(event.future()));
}

Expand Down Expand Up @@ -656,21 +659,33 @@ private void process(final CurrentLagEvent event) {

final OptionalLong lagOpt;
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
// If the log end offset is unknown and there isn't already an in-flight list offset
// request, issue one with the goal that the lag will be available the next time the
// user calls currentLag().
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);

// Emulates the Consumer.endOffsets() logic...
Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP
);

requestManagers.offsetsRequestManager.fetchOffsets(timestampToSearch, false);
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
// The LIST_OFFSETS lag lookup is serialized, so if there's an inflight request it must
// finish before another request can be issued. This serialization mechanism is controlled
// by the 'end offset requested' flag in SubscriptionState.
if (subscriptions.partitionEndOffsetRequested(topicPartition)) {
log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", topicPartition);
} else {
// If the log end offset is unknown and there isn't already an in-flight list offset
// request, issue one with the goal that the lag will be available the next time the
// user calls currentLag().
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);

// Emulates the Consumer.endOffsets() logic...
Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP
);

// The currentLag() API is a "best effort" attempt at calling the LIST_OFFSETS RPC. If it
// fails, don't retry the attempt internally, but let the user attempt it again.
requestManagers.offsetsRequestManager.fetchOffsets(
timestampToSearch,
false,
false
);
}
}

lagOpt = OptionalLong.empty();
Expand Down
Loading