Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ private void process(final ErrorEvent event) {
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();

private SharePollEvent inFlightPoll;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();

// currentThread holds the threadId of the current thread accessing the KafkaShareConsumer
Expand Down Expand Up @@ -614,15 +615,19 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {

shouldSendShareFetchEvent = true;

do {
// Make sure the network thread can tell the application is actively polling
applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs()));
// This distinguishes the first pass of the inner do/while loop from subsequent passes for the
// in-flight poll event logic.
boolean firstPass = true;

do {
// We must not allow wake-ups between polling for fetches and returning the records.
// A wake-up between returned fetches and returning records would lead to never
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
wakeupTrigger.maybeTriggerWakeup();

// Make sure the network thread can tell the application is actively polling
checkInFlightPoll(timer, firstPass);
firstPass = false;
final ShareFetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
currentFetch = fetch;
Expand All @@ -649,6 +654,49 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
}
}

private void checkInFlightPoll(Timer timer, boolean firstPass) {
if (firstPass && inFlightPoll != null) {
maybeClearPreviousInFlightPoll();
}

boolean newlySubmittedEvent = false;

if (inFlightPoll == null) {
inFlightPoll = new SharePollEvent(calculateDeadlineMs(timer), timer.currentTimeMs());
newlySubmittedEvent = true;
log.trace("In-flight event {} submitted", inFlightPoll);
applicationEventHandler.add(inFlightPoll);
}

timer.update();

if (inFlightPoll != null) {
maybeClearCurrentInFlightPoll(newlySubmittedEvent);
}
}

private void maybeClearPreviousInFlightPoll() {
if (inFlightPoll.isComplete()) {
log.trace("Previous in-flight event {} completed, clearing", inFlightPoll);
inFlightPoll = null;
} else if (inFlightPoll.isExpired(time)) {
log.trace("Previous in-flight event {} expired without completing, clearing", inFlightPoll);
inFlightPoll = null;
}
}

private void maybeClearCurrentInFlightPoll(boolean newlySubmittedEvent) {
if (inFlightPoll.isComplete()) {
log.trace("In-flight event {} completed without error, clearing", inFlightPoll);
inFlightPoll = null;
} else if (!newlySubmittedEvent) {
if (inFlightPoll.isExpired(time)) {
log.trace("In-flight event {} expired without completing, clearing", inFlightPoll);
inFlightPoll = null;
}
}
}

private ShareFetch<K, V> pollForFetches(final Timer timer) {
long pollTimeout = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -228,12 +229,16 @@ public void process(ApplicationEvent event) {
}

private void process(final SharePollEvent event) {
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> {
shareMembershipManager.maybeReconcile(true);
shareMembershipManager.onConsumerPoll();
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager ->
shareMembershipManager.maybeReconcile(true));

requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
ShareMembershipManager membershipManager = hrm.membershipManager();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my understanding, is there a reason why we prefer to go through the HBReqMgr to get the shareMembershipManager here, instead of just calling onConsumerPoll on the shareMembershipManager we already had above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just followed the pattern with the other group types. I don't think there was a reason this was different.

membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
hrm.resetPollTimer(event.pollTimeMs()));

event.completeSuccessfully();
}

private void process(final CreateFetchRequestsEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,53 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.utils.Time;

import java.time.Duration;

public class SharePollEvent extends ApplicationEvent {

private final long deadlineMs;
private final long pollTimeMs;
private volatile boolean isComplete;

public SharePollEvent(final long pollTimeMs) {
/**
* @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
* {@link Duration} passed to {@link ShareConsumer#poll(Duration)}
* @param pollTimeMs Time, in milliseconds, at which point the event was created
*/
public SharePollEvent(final long deadlineMs, final long pollTimeMs) {
super(Type.SHARE_POLL);
this.deadlineMs = deadlineMs;
this.pollTimeMs = pollTimeMs;
}

public long deadlineMs() {
return deadlineMs;
}

public long pollTimeMs() {
return pollTimeMs;
}

public boolean isExpired(final Time time) {
return time.milliseconds() >= deadlineMs();
}

public boolean isComplete() {
return isComplete;
}

public void completeSuccessfully() {
isComplete = true;
}

@Override
public String toStringBase() {
return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
return super.toStringBase() +
", deadlineMs=" + deadlineMs +
", pollTimeMs=" + pollTimeMs +
", isComplete=" + isComplete;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,14 @@ public void testAsyncPollEvent() {

@Test
public void testSharePollEventCallsShareManagers() {
SharePollEvent event = new SharePollEvent(12345);
SharePollEvent event = new SharePollEvent(12346, 12345);

setupShareProcessor();
when(shareHeartbeatRequestManager.membershipManager()).thenReturn(shareMembershipManager);
processor.process(event);

assertTrue(event.isComplete());
verify(shareMembershipManager).maybeReconcile(true);
verify(shareMembershipManager).onConsumerPoll();

verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs());
}

Expand Down
Loading