Skip to content

Commit ab88df7

Browse files
KAFKA-20535: Improve async consumer CPU usage under low max.poll.records. (apache#22199)
### Description KAFKA-20332 fixed a correctness issue in the async consumer where the application thread could collect buffered records before the background thread had checked for pending reconciliations. The fix added a wait on `inflightPoll.reconciliationCheckFuture()` in `AsyncKafkaConsumer.collectFetch()`. This restored the correctness guarantee, but it also increased CPU usage in low `max.poll.records` scenarios. With `max.poll.records=5`, profiling shows that the additional cost mainly comes from the application thread waiting on `ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs)` even when the consumer group member is not reconciling. This patch avoids that unnecessary wait by tracking the consumer group member state in `AsyncKafkaConsumer`. `AbstractMembershipManager` now notifies `MemberStateListener` whenever the consumer member transitions to a new state. `AsyncKafkaConsumer` uses this signal to wait for the reconciliation check only while the member is in `MemberState.RECONCILING`. ### Test Condition - Broker - stand alone - 12 vCPU, 32GB RAM. - Producer - Use `bin/kafka-producer-perf-test.sh` in Broker. - throughput 50000 - record-size 100 - Consumer (before, after, optimized) - kubernetes environment. - All consumers are scheduled on the same worker node. - Profiler - async-profiler - duration 180 seconds. - branch - `before`: 5a2dcf8 - `after`: 7e1c9db - `optimized`: this PR ### Test Result 1. Check the throughput of each consumer ``` Before - RATE: 49997.400259974005 records/sec, total=7050826 After - RATE: 50002.199780022 records/sec, total=7557854 Optimized - RATE: 50002.199780022 records/sec, total=7584198 ``` - All consumers have same throughput. 2. Average CPU usage from `kubectl top pod` | Revision | Average CPU | |---|---:| | Before | 225.7m | | After | 325.7m | | Optimized | 248.4m | every 30second, 10 times. The optimized version reduced CPU usage by about 23.7% compared with `after`. ### Flame Graph Summary | Metric | Before | After | Optimized | |---|---:|---:|---:| | Samples | 2,402 | 3,160 | 2,542 | | markReconciliationCheckComplete | 0.00% | 2.82% | 0.51% | | setActiveTask | 0.00% | 0.06% | 0.00% | | pollTimeMs | 0.00% | 0.00% | 0.00% | | AsyncPollEvent | 0.37% | 2.91% | 0.87% | | processBackgroundEvents | 2.37% | 1.93% | 2.28% | | Reaper | 0.17% | 0.19% | 0.35% | | parkNanos | 1.08% | 4.05% | 0.94% | | unpark | 0.37% | 2.06% | 0.63% | | AsyncKafkaConsumer.poll | 38.68% | 38.32% | 38.20% | | AsyncKafkaConsumer.collectFetch | 20.32% | 23.61% | 19.39% | | ApplicationEventProcessor.process | 2.66% | 6.46% | 2.83% | | ApplicationEventHandler.add | 7.87% | 7.37% | 8.06% | `After` shows higher CPU usage, and the profile also shows increased time in `parkNanos` and `unpark`. This suggests that the additional wait on `reconciliationCheckFuture` introduced more application/background thread coordination overhead. ### AsyncKafkaConsumer.collectFetch | Metric | Before | After | Optimized | |---|---:|---:|---:| | AsyncKafkaConsumer.collectFetch samples | 488 | 746 | 493 | | AsyncKafkaConsumer.collectFetch % | 20.32% | 23.61% | 19.39% | | FetchCollector.collectFetch samples | 482 | 512 | 477 | | FetchCollector.collectFetch % | 20.07% | 16.20% | 18.76% | | ConsumerUtils.getResult samples | 0 | 194 | 0 | | ConsumerUtils.getResult % | 0.00% | 6.14% | 0.00% | In `after`, the application thread spends additional time in `ConsumerUtils.getResult()` while waiting for the reconciliation check future. This also increases related park/unpark activity and application event processing on the background thread. In the optimized version, this wait is skipped unless the member is actually in `RECONCILING` state. ### ConsumerNetworkThread.runOnce | Metric | Before | After | Optimized | |---|---:|---:|---:| | ConsumerNetworkThread.runOnce samples | 1,216 | 1,621 | 1,295 | | ConsumerNetworkThread.runOnce % | 50.62% | 51.30% | 50.94% | | ConsumerNetworkThread.processApplicationEvents samples | 283 | 442 | 263 | | ConsumerNetworkThread.processApplicationEvents % | 11.78% | 13.99% | 10.35% | | FetchRequestManager.poll samples | 169 | 194 | 158 | | FetchRequestManager.poll % | 7.04% | 6.13% | 6.22% | | NetworkClientDelegate.poll samples | 600 | 755 | 684 | | NetworkClientDelegate.poll % | 24.98% | 23.89% | 26.91% | The higher CPU usage in `onsumerNetworkThread` appears to be a secondary effect of the application thread waiting on `reconciliationCheckFuture` more often. Each wait requires coordination between the application thread and the ackground thread: the app thread enqueues an `AsyncPollEvent`, waits for the reconciliation check to complete, and the background thread processes that event and completes the future. As a result, `ConsumerNetworkThread.processApplicationEvents` and related `AsyncPollEvent` processing show higher CPU usage in the after profile. Reviewers: Lianet Magrans <lmagrans@confluent.io>
1 parent 7b7fae7 commit ab88df7

4 files changed

Lines changed: 77 additions & 1 deletion

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ protected void transitionTo(MemberState nextState) {
245245

246246
log.info("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
247247
this.state = nextState;
248+
stateUpdatesListeners.forEach(listener -> listener.onMemberStateChange(nextState));
248249
}
249250

250251
private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,8 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
420420
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
421421
private final AtomicInteger refCount = new AtomicInteger(0);
422422

423+
private volatile boolean hasPendingReconciliation = false;
424+
423425
private final MemberStateListener memberStateListener = new MemberStateListener() {
424426
@Override
425427
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
@@ -430,6 +432,11 @@ public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId)
430432
public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
431433
setGroupAssignmentSnapshot(partitions);
432434
}
435+
436+
@Override
437+
public void onMemberStateChange(MemberState memberState) {
438+
setHasPendingReconciliation(memberState == MemberState.RECONCILING);
439+
}
433440
};
434441

435442
public AsyncKafkaConsumer(final ConsumerConfig config,
@@ -873,6 +880,10 @@ void setGroupAssignmentSnapshot(final Set<TopicPartition> partitions) {
873880
groupAssignmentSnapshot.set(Collections.unmodifiableSet(partitions));
874881
}
875882

883+
void setHasPendingReconciliation(final boolean hasPendingReconciliation) {
884+
this.hasPendingReconciliation = hasPendingReconciliation;
885+
}
886+
876887
@Override
877888
public void registerMetricForSubscription(KafkaMetric metric) {
878889
if (!metrics().containsKey(metric.metricName())) {
@@ -2028,7 +2039,7 @@ private Fetch<K, V> collectFetch() {
20282039
// This is key because partitions may need revocation, so we need to wait for the reconciliation check
20292040
// that triggers commits and marks partitions as pending revocation, before we can
20302041
// safely collect records from the buffer.
2031-
if (inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
2042+
if (hasPendingReconciliation && inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
20322043
// If the background hasn't had the time to check for pending reconciliation,
20332044
// we need to wait for that check before moving on (instead of returning empty right away,
20342045
// which will lead to blocking on buffer data)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,13 @@ public interface MemberStateListener {
4747
default void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
4848

4949
}
50+
51+
/**
52+
* Called whenever the member transitions to a new state.
53+
*
54+
* @param memberState The member state.
55+
*/
56+
default void onMemberStateChange(MemberState memberState) {
57+
58+
}
5059
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ public void testPollWaitsForReconciliationCheckComplete() {
609609
// Do NOT mark reconciliation check complete - simulating background hasn't processed it yet
610610
return null;
611611
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
612+
consumer.setHasPendingReconciliation(true);
612613

613614
// Poll should return empty because reconciliation check is not complete.
614615
ConsumerRecords<?, ?> result1 = consumer.poll(Duration.ZERO);
@@ -623,6 +624,60 @@ public void testPollWaitsForReconciliationCheckComplete() {
623624
assertEquals(2, result2.count(), "Expected 2 records after reconciliation check is complete");
624625
}
625626

627+
@Test
628+
public void testPollDoesNotWaitForReconciliationCheckIfNoPendingReconciliation() {
629+
final String topicName = "foo";
630+
final int partition = 3;
631+
final TopicPartition tp = new TopicPartition(topicName, partition);
632+
final List<ConsumerRecord<String, String>> records = asList(
633+
new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
634+
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
635+
);
636+
637+
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
638+
consumer = newConsumer(
639+
mock(FetchBuffer.class),
640+
new ConsumerInterceptors<>(Collections.emptyList(), metrics),
641+
mock(ConsumerRebalanceListenerInvoker.class),
642+
subscriptions);
643+
644+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
645+
// PositionsValidator starts with metadataUpdateVersion=-1. Stub metadata.updateVersion() to match,
646+
// so canSkipUpdateFetchPositions() passes and we test the reconciliation check path.
647+
doReturn(-1).when(metadata).updateVersion();
648+
649+
completeTopicSubscriptionChangeEventSuccessfully();
650+
consumer.subscribe(singleton(topicName), mock(ConsumerRebalanceListener.class));
651+
// Simulate partition assignment from group coordinator
652+
subscriptions.assignFromSubscribed(singleton(tp));
653+
654+
// Set up position so canSkipUpdateFetchPositions() returns true (partition in FETCHING state)
655+
completeSeekUnvalidatedEventSuccessfully();
656+
subscriptions.seek(tp, 0);
657+
658+
// Set up fetch collector to return records when called
659+
doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")))
660+
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
661+
662+
// Capture the AsyncPollEvent but leave the reconciliation check incomplete.
663+
// Since there is no pending reconciliation, poll should not wait for it.
664+
AtomicReference<AsyncPollEvent> capturedEvent = new AtomicReference<>();
665+
doAnswer(invocation -> {
666+
AsyncPollEvent event = invocation.getArgument(0);
667+
assertTrue(capturedEvent.compareAndSet(null, event));
668+
// Do NOT mark reconciliation check complete - simulating background hasn't processed it yet
669+
return null;
670+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
671+
consumer.setHasPendingReconciliation(false);
672+
673+
// Poll does not wait AsyncPollEvent if there is no pending reconciliation.
674+
ConsumerRecords<?, ?> result = consumer.poll(Duration.ZERO);
675+
676+
assertNotNull(capturedEvent.get(), "AsyncPollEvent should have been captured");
677+
assertFalse(capturedEvent.get().isReconciliationCheckComplete(), "Reconciliation check should still be incomplete");
678+
assertEquals(2, result.count(), "Expected records without waiting when no reconciliation is pending");
679+
}
680+
626681
@Test
627682
public void testEnsureCallbackExecutedByApplicationThread() {
628683
consumer = newConsumer();

0 commit comments

Comments
 (0)