diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index bc7e591d78b87..448e3436bd9b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -204,6 +204,7 @@ private void process(final ErrorEvent event) { // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); + private SharePollEvent inFlightPoll; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); // currentThread holds the threadId of the current thread accessing the KafkaShareConsumer @@ -614,15 +615,19 @@ public synchronized ConsumerRecords poll(final Duration timeout) { shouldSendShareFetchEvent = true; - do { - // Make sure the network thread can tell the application is actively polling - applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs())); + // This distinguishes the first pass of the inner do/while loop from subsequent passes for the + // in-flight poll event logic. + boolean firstPass = true; + do { // We must not allow wake-ups between polling for fetches and returning the records. // A wake-up between returned fetches and returning records would lead to never // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches. wakeupTrigger.maybeTriggerWakeup(); + // Make sure the network thread can tell the application is actively polling + checkInFlightPoll(timer, firstPass); + firstPass = false; final ShareFetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { currentFetch = fetch; @@ -649,6 +654,49 @@ public synchronized ConsumerRecords poll(final Duration timeout) { } } + private void checkInFlightPoll(Timer timer, boolean firstPass) { + if (firstPass && inFlightPoll != null) { + maybeClearPreviousInFlightPoll(); + } + + boolean newlySubmittedEvent = false; + + if (inFlightPoll == null) { + inFlightPoll = new SharePollEvent(calculateDeadlineMs(timer), timer.currentTimeMs()); + newlySubmittedEvent = true; + log.trace("In-flight event {} submitted", inFlightPoll); + applicationEventHandler.add(inFlightPoll); + } + + timer.update(); + + if (inFlightPoll != null) { + maybeClearCurrentInFlightPoll(newlySubmittedEvent); + } + } + + private void maybeClearPreviousInFlightPoll() { + if (inFlightPoll.isComplete()) { + log.trace("Previous in-flight event {} completed, clearing", inFlightPoll); + inFlightPoll = null; + } else if (inFlightPoll.isExpired(time)) { + log.trace("Previous in-flight event {} expired without completing, clearing", inFlightPoll); + inFlightPoll = null; + } + } + + private void maybeClearCurrentInFlightPoll(boolean newlySubmittedEvent) { + if (inFlightPoll.isComplete()) { + log.trace("In-flight event {} completed without error, clearing", inFlightPoll); + inFlightPoll = null; + } else if (!newlySubmittedEvent) { + if (inFlightPoll.isExpired(time)) { + log.trace("In-flight event {} expired without completing, clearing", inFlightPoll); + inFlightPoll = null; + } + } + } + private ShareFetch pollForFetches(final Timer timer) { long pollTimeout = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 314684b612978..5e11b3a0ef6e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; +import org.apache.kafka.clients.consumer.internals.ShareMembershipManager; import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; @@ -228,12 +229,16 @@ public void process(ApplicationEvent event) { } private void process(final SharePollEvent event) { - requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> { - shareMembershipManager.maybeReconcile(true); - shareMembershipManager.onConsumerPoll(); + requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> + shareMembershipManager.maybeReconcile(true)); + + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { + ShareMembershipManager membershipManager = hrm.membershipManager(); + membershipManager.onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); }); - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> - hrm.resetPollTimer(event.pollTimeMs())); + + event.completeSuccessfully(); } private void process(final CreateFetchRequestsEvent event) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java index 2db7b18173c01..577f95a483707 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java @@ -16,21 +16,53 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.utils.Time; + +import java.time.Duration; + public class SharePollEvent extends ApplicationEvent { + private final long deadlineMs; private final long pollTimeMs; + private volatile boolean isComplete; - public SharePollEvent(final long pollTimeMs) { + /** + * @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the + * {@link Duration} passed to {@link ShareConsumer#poll(Duration)} + * @param pollTimeMs Time, in milliseconds, at which point the event was created + */ + public SharePollEvent(final long deadlineMs, final long pollTimeMs) { super(Type.SHARE_POLL); + this.deadlineMs = deadlineMs; this.pollTimeMs = pollTimeMs; } + public long deadlineMs() { + return deadlineMs; + } + public long pollTimeMs() { return pollTimeMs; } + public boolean isExpired(final Time time) { + return time.milliseconds() >= deadlineMs(); + } + + public boolean isComplete() { + return isComplete; + } + + public void completeSuccessfully() { + isComplete = true; + } + @Override public String toStringBase() { - return super.toStringBase() + ", pollTimeMs=" + pollTimeMs; + return super.toStringBase() + + ", deadlineMs=" + deadlineMs + + ", pollTimeMs=" + pollTimeMs + + ", isComplete=" + isComplete; } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 938a4b866186c..e3a9a05c4cd8c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -306,14 +306,14 @@ public void testAsyncPollEvent() { @Test public void testSharePollEventCallsShareManagers() { - SharePollEvent event = new SharePollEvent(12345); + SharePollEvent event = new SharePollEvent(12346, 12345); setupShareProcessor(); + when(shareHeartbeatRequestManager.membershipManager()).thenReturn(shareMembershipManager); processor.process(event); - + assertTrue(event.isComplete()); verify(shareMembershipManager).maybeReconcile(true); verify(shareMembershipManager).onConsumerPoll(); - verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs()); }