Skip to content

Commit c46d915

Browse files
committed
impl
1 parent f6c9fee commit c46d915

File tree

3 files changed

+98
-22
lines changed

3 files changed

+98
-22
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
984984
}
985985
} finally {
986986
super.close(timer);
987+
// Super-class close may wait for more commit callbacks to complete.
988+
invokeCompletedOffsetCommitCallbacks();
987989
}
988990
}
989991

@@ -1033,16 +1035,22 @@ public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAn
10331035
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
10341036
@Override
10351037
public void onSuccess(Void value) {
1036-
pendingAsyncCommits.decrementAndGet();
1037-
doCommitOffsetsAsync(offsets, callback);
1038-
client.pollNoWakeup();
1038+
try {
1039+
doCommitOffsetsAsync(offsets, callback);
1040+
client.pollNoWakeup();
1041+
} finally {
1042+
pendingAsyncCommits.decrementAndGet();
1043+
}
10391044
}
10401045

10411046
@Override
10421047
public void onFailure(RuntimeException e) {
1043-
pendingAsyncCommits.decrementAndGet();
1044-
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
1048+
try {
1049+
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
10451050
new RetriableCommitFailedException(e)));
1051+
} finally {
1052+
pendingAsyncCommits.decrementAndGet();
1053+
}
10461054
}
10471055
});
10481056
}
@@ -1061,25 +1069,29 @@ private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, Offse
10611069
future.addListener(new RequestFutureListener<Void>() {
10621070
@Override
10631071
public void onSuccess(Void value) {
1064-
inFlightAsyncCommits.decrementAndGet();
1065-
1066-
if (interceptors != null)
1067-
interceptors.onCommit(offsets);
1068-
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
1072+
try {
1073+
if (interceptors != null)
1074+
interceptors.onCommit(offsets);
1075+
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
1076+
} finally {
1077+
inFlightAsyncCommits.decrementAndGet();
1078+
}
10691079
}
10701080

10711081
@Override
10721082
public void onFailure(RuntimeException e) {
1073-
inFlightAsyncCommits.decrementAndGet();
1083+
try {
1084+
Exception commitException = e;
10741085

1075-
Exception commitException = e;
1076-
1077-
if (e instanceof RetriableException) {
1078-
commitException = new RetriableCommitFailedException(e);
1079-
}
1080-
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
1081-
if (commitException instanceof FencedInstanceIdException) {
1082-
asyncCommitFenced.set(true);
1086+
if (e instanceof RetriableException) {
1087+
commitException = new RetriableCommitFailedException(e);
1088+
}
1089+
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
1090+
if (commitException instanceof FencedInstanceIdException) {
1091+
asyncCommitFenced.set(true);
1092+
}
1093+
} finally {
1094+
inFlightAsyncCommits.decrementAndGet();
10831095
}
10841096
}
10851097
});
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
11641176
}
11651177

11661178
private boolean invokePendingAsyncCommits(Timer timer) {
1167-
if (inFlightAsyncCommits.get() == 0) {
1179+
if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {
1180+
invokeCompletedOffsetCommitCallbacks();
11681181
return true;
11691182
}
11701183

@@ -1174,7 +1187,8 @@ private boolean invokePendingAsyncCommits(Timer timer) {
11741187
client.poll(timer);
11751188
invokeCompletedOffsetCommitCallbacks();
11761189

1177-
if (inFlightAsyncCommits.get() == 0) {
1190+
if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {
1191+
invokeCompletedOffsetCommitCallbacks();
11781192
return true;
11791193
}
11801194

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance
229229
@AfterEach
230230
public void teardown() {
231231
this.metrics.close();
232-
this.coordinator.close(time.timer(0));
232+
try {
233+
this.coordinator.close(time.timer(0));
234+
} catch (Exception e) {
235+
// ignore
236+
}
233237
}
234238

235239
@Test

core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,62 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
304304
consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
305305
}
306306

307+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
308+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
309+
def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = {
310+
// This is testing the contract that asynchronous offset commit are completed before the consumer
311+
// is closed, even when no commit sync is performed as part of the close (due to auto-commit
312+
// disabled, or simply because there no consumed offsets).
313+
val producer = createProducer()
314+
sendRecords(producer, numRecords = 3, tp)
315+
sendRecords(producer, numRecords = 3, tp2)
316+
317+
val consumer = createConsumer()
318+
consumer.assign(List(tp, tp2).asJava)
319+
320+
// Try without looking up the coordinator first
321+
val cb = new CountConsumerCommitCallback
322+
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
323+
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
324+
consumer.close()
325+
assertEquals(2, cb.successCount)
326+
}
327+
328+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
329+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
330+
def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
331+
// This is testing the contract that asynchronous offset commits sent previously with the
332+
// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
333+
// `commitSync` (given that it does not time out).
334+
val producer = createProducer()
335+
sendRecords(producer, numRecords = 3, tp)
336+
sendRecords(producer, numRecords = 3, tp2)
337+
338+
val consumer = createConsumer()
339+
consumer.assign(List(tp, tp2).asJava)
340+
341+
// Try without looking up the coordinator first
342+
val cb = new CountConsumerCommitCallback
343+
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
344+
consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
345+
assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
346+
assertEquals(1, cb.successCount)
347+
348+
// Try with coordinator known
349+
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb)
350+
consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava)
351+
assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
352+
assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
353+
assertEquals(2, cb.successCount)
354+
355+
// Try with empty sync commit
356+
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb)
357+
consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
358+
assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
359+
assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
360+
assertEquals(3, cb.successCount)
361+
}
362+
307363
def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
308364
topicsToSubscribe: List[String],
309365
expectedAssignment: Set[TopicPartition],
@@ -314,6 +370,8 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
314370
}
315371

316372
object PlaintextConsumerCommitTest {
373+
def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly: Stream[Arguments] =
374+
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly()
317375

318376
def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
319377
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()

0 commit comments

Comments
 (0)