Skip to content

KAFKA-16599: LegacyConsumer should always await pending async commits on commitSync and close #15693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,8 @@ public void close(final Timer timer) {
}
} finally {
super.close(timer);
// Super-class close may wait for more commit callbacks to complete.
invokeCompletedOffsetCommitCallbacks();
Comment on lines +987 to +988
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar enough with this code. What situations would we want to invoke callbacks after we've closed the coordinator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may async commits that are completed during coordinator close. This is done to provide the guarantee that we want to complete async commits in Consumer.close (as long as we don't timeout). We also provide the guarantee that if an async commit completes, the corresponding callback is executed. So then we need to execute callbacks here.

This change could cause issues if we somehow interact with the consumer within the commit callback, but the consumer is already partially closed. Note that callback is already called during close (further above) where the coordinator isn't called yet. So essentially, this change will just make it "more commonplace" to call the callbacks during close.

But it would be an option to just leave the legacy consumer to have somewhat inconsistent behavior here, and just change it for the new consumer. WDYT?

}
}

Expand Down Expand Up @@ -1033,16 +1035,22 @@ public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAn
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
try {
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
} finally {
pendingAsyncCommits.decrementAndGet();
}
}

@Override
public void onFailure(RuntimeException e) {
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
try {
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
} finally {
pendingAsyncCommits.decrementAndGet();
}
Comment on lines -1043 to +1053
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand moving the counter decrement to a finally block, but are we expecting that the add() method or constructor would throw an exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is just defensive programming

}
});
}
Expand All @@ -1061,25 +1069,29 @@ private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, Offse
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
inFlightAsyncCommits.decrementAndGet();

if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
try {
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
} finally {
inFlightAsyncCommits.decrementAndGet();
}
}

@Override
public void onFailure(RuntimeException e) {
inFlightAsyncCommits.decrementAndGet();
try {
Exception commitException = e;

Exception commitException = e;

if (e instanceof RetriableException) {
commitException = new RetriableCommitFailedException(e);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
if (commitException instanceof FencedInstanceIdException) {
asyncCommitFenced.set(true);
if (e instanceof RetriableException) {
commitException = new RetriableCommitFailedException(e);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
if (commitException instanceof FencedInstanceIdException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you didn't write the original logic, but it seems funny to compare the exception against a FencedInstanceIdException here if up above commitException was converted to a RetriableCommitFailedException. I guess there's not a better way to construct this without potentially changing the logic to be incorrect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FencedInstanceIdException is not retriable, so this shouldn't make a difference, right?

asyncCommitFenced.set(true);
}
} finally {
inFlightAsyncCommits.decrementAndGet();
}
}
});
Expand Down Expand Up @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
}

private boolean invokePendingAsyncCommits(Timer timer) {
if (inFlightAsyncCommits.get() == 0) {
if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {
invokeCompletedOffsetCommitCallbacks();
return true;
}

Expand All @@ -1174,7 +1187,8 @@ private boolean invokePendingAsyncCommits(Timer timer) {
client.poll(timer);
invokeCompletedOffsetCommitCallbacks();

if (inFlightAsyncCommits.get() == 0) {
if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {
invokeCompletedOffsetCommitCallbacks();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance
@AfterEach
public void teardown() {
this.metrics.close();
this.coordinator.close(time.timer(0));
try {
this.coordinator.close(time.timer(0));
} catch (Exception e) {
// ignore
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,12 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
}

// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = {
// This is testing the contract that asynchronous offset commit are completed before the consumer
// is closed, even when no commit sync is performed as part of the close (due to auto-commit
// disabled, or simply because there are no consumed offsets).
// disabled, or simply because there no consumed offsets).
val producer = createProducer()
sendRecords(producer, numRecords = 3, tp)
sendRecords(producer, numRecords = 3, tp2)
Expand All @@ -326,9 +325,8 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
assertEquals(2, cb.successCount)
}

// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
// This is testing the contract that asynchronous offset commits sent previously with the
// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
Expand Down