Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8b20d7d
KAFKA-20131: SubscriptionState endOffsetRequested is left set when LI…
kirktrue Feb 5, 2026
af7def2
WIP
kirktrue Feb 10, 2026
e91784c
Updates
kirktrue Feb 11, 2026
39bb381
Clearing partition end offset request flag for new consumer
kirktrue Feb 11, 2026
49b51b3
Comments and minor formatting
kirktrue Feb 11, 2026
5cda4f8
More comments
kirktrue Feb 11, 2026
047d3cf
Catching more cases
kirktrue Feb 12, 2026
d56e17e
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 12, 2026
4d7073c
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 13, 2026
15dbff5
Fixed typo in log message and verifying requestPartitionEndOffset inv…
kirktrue Feb 13, 2026
a1a71f5
Updates to remove changes to AsyncKafkaConsumer
kirktrue Feb 13, 2026
d393a59
Updates to remove changes to AsyncKafkaConsumer
kirktrue Feb 13, 2026
820f91e
More clean up for PR
kirktrue Feb 13, 2026
29e8810
Removed unnecessary changes to OffsetFetcher
kirktrue Feb 13, 2026
aedb1b4
Another minor refactor
kirktrue Feb 13, 2026
9ba4c96
Removed checking for log message output
kirktrue Feb 17, 2026
90aa258
Merge branch 'trunk' into KAFKA-20131-clear-endOffsetRequested-on-LIS…
kirktrue Feb 18, 2026
c5d5317
Revising the logic around currentLag() in ClassicKafkaConsumer and Of…
kirktrue Feb 18, 2026
6c72eb8
Reverting changes in ApplicationEventProcessor
kirktrue Feb 18, 2026
4d0727f
Reverted several changes to SubscriptionState
kirktrue Feb 18, 2026
ee3399c
Moved both setting and clearing of partitionEndOffsets flag to fetchO…
kirktrue Feb 18, 2026
ba79cf1
Minor clean up to reduce diff noise
kirktrue Feb 18, 2026
9fd51f8
Added setPartitionEndOffsetRequests
kirktrue Feb 19, 2026
5b13372
Renamed shouldUpdatePartitionEndOffsets → updatePartitionEndOffsetsFl…
kirktrue Feb 19, 2026
f3ead35
Adding testCurrentLagPreventsMultipleInFlightRequests test and minor …
kirktrue Feb 19, 2026
6a0613c
Minor refactoring of request count check to reformat and include message
kirktrue Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1049,25 +1049,7 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
public OptionalLong currentLag(TopicPartition topicPartition) {
acquireAndEnsureOpen();
try {
final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

// if the log end offset is not known and hence cannot return lag and there is
// no in-flight list offset requested yet,
// issue a list offset request for that partition so that next time
// we may get the answer; we do not need to wait for the return value
// since we would not try to poll the network client synchronously
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
}

return OptionalLong.empty();
}

return OptionalLong.of(lag);
return offsetFetcher.currentLag(topicPartition);
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
Expand Down Expand Up @@ -126,7 +127,7 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio

try {
Map<TopicPartition, ListOffsetData> fetchedOffsets = fetchOffsetsByTimes(timestampsToSearch,
timer, true).fetchedOffsets;
timer, true, false).fetchedOffsets;

return buildOffsetsForTimesResult(timestampsToSearch, fetchedOffsets);
} finally {
Expand All @@ -136,7 +137,8 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio

private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
Timer timer,
boolean requireTimestamps) {
boolean requireTimestamps,
boolean updatePartitionEndOffsetsFlag) {
ListOffsetResult result = new ListOffsetResult();
if (timestampsToSearch.isEmpty())
return result;
Expand All @@ -153,11 +155,17 @@ public void onSuccess(ListOffsetResult value) {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);

offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);

if (updatePartitionEndOffsetsFlag)
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
}
}

@Override
public void onFailure(RuntimeException e) {
if (updatePartitionEndOffsetsFlag)
offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());

if (!(e instanceof RetriableException)) {
throw future.exception();
}
Expand Down Expand Up @@ -185,23 +193,49 @@ public void onFailure(RuntimeException e) {
}

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer);
return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer, false);
}

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Timer timer) {
return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer);
return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer, false);
}

public OptionalLong currentLag(TopicPartition topicPartition) {
final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);

// if the log end offset is not known and hence cannot return lag and there is
// no in-flight list offset requested yet,
// issue a list offset request for that partition so that next time
// we may get the answer; we do not need to wait for the return value
// since we would not try to poll the network client synchronously
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
offsetFetcherUtils.maybeSetPartitionEndOffsetRequest(topicPartition)) {
beginningOrEndOffset(
Set.of(topicPartition),
ListOffsetsRequest.LATEST_TIMESTAMP,
time.timer(0L),
true
);
}

return OptionalLong.empty();
}

return OptionalLong.of(lag);
}

private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
long timestamp,
Timer timer) {
Timer timer,
boolean updatePartitionEndOffsetsFlag) {
metadata.addTransientTopics(topicsForPartitions(partitions));
try {
Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
.distinct()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));

ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false);
ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false, updatePartitionEndOffsetsFlag);

return result.fetchedOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,47 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
log.trace("Updating high watermark for partition {} to {}", partition, offset);
subscriptionState.updateHighWatermark(partition, offset);
}
} else {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {
log.warn("Not updating last stable offset for partition {} as it is no longer assigned", partition);
} else {
log.warn("Not updating high watermark for partition {} as it is no longer assigned", partition);
}
}
}
}

/**
* The {@code LIST_OFFSETS} lag lookup is serialized, so if there's an inflight request it must finish before
* another request can be issued. This serialization mechanism is controlled by the 'end offset requested'
* flag in {@link SubscriptionState}.
*
* @return {@code true} if the partition's end offset can be requested, {@code false} if there's already an
* in-flight request
*/
boolean maybeSetPartitionEndOffsetRequest(TopicPartition partition) {
if (subscriptionState.partitionEndOffsetRequested(partition)) {
log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", partition);
return false;
} else {
log.info("Requesting the log end offset for {} in order to compute lag", partition);
subscriptionState.requestPartitionEndOffset(partition);
return true;
}
}

/**
* If any of the given partitions are assigned, this will clear the partition's 'end offset requested' flag so
* that the next attempt to look up the lag will properly issue another <code>LIST_OFFSETS</code> request. This
* is only intended to be called when <code>LIST_OFFSETS</code> fails. Successful <code>LIST_OFFSETS</code> calls
* should use {@link #updateSubscriptionState(Map, IsolationLevel)}.
*
* @param partitions Partitions for which the 'end offset requested' flag should be cleared (if still assigned)
*/
void clearPartitionEndOffsetRequests(Collection<TopicPartition> partitions) {
for (final TopicPartition partition : partitions) {
if (subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) {
log.trace("Clearing end offset requested for partition {}", partition);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,17 @@ public synchronized boolean partitionEndOffsetRequested(TopicPartition tp) {
return topicPartitionState.endOffsetRequested();
}

public synchronized boolean maybeClearPartitionEndOffsetRequested(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedStateOrNull(tp);

if (topicPartitionState != null && topicPartitionState.endOffsetRequested()) {
topicPartitionState.clearEndOffset();
return true;
} else {
return false;
}
}

synchronized Long partitionLead(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position.offset - topicPartitionState.logStartOffset;
Expand Down Expand Up @@ -1037,6 +1048,10 @@ public void requestEndOffset() {
endOffsetRequested = true;
}

public void clearEndOffset() {
endOffsetRequested = false;
}

private void transitionState(FetchState newState, Runnable runIfTransitioned) {
FetchState nextState = this.fetchState.transitionTo(newState);
if (nextState.equals(newState)) {
Expand Down
Loading