Skip to content

Commit 1ae278c

Browse files
committed
KAFKA-19804: Improve heartbeat request manager initial HB interval
1 parent 96f83be commit 1ae278c

File tree

6 files changed

+46
-12
lines changed

6 files changed

+46
-12
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
8787
private final BackgroundEventHandler backgroundEventHandler;
8888

8989
/**
90-
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
90+
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
9191
* sending heartbeat until the next poll.
9292
*/
9393
private final Timer pollTimer;
@@ -114,7 +114,8 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
114114
this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
115115
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
116116
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
117-
this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
117+
int requestTimeout = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
118+
this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, requestTimeout, retryBackoffMs,
118119
retryBackoffMaxMs, RETRY_BACKOFF_JITTER);
119120
this.pollTimer = time.timer(maxPollIntervalMs);
120121
this.metricsManager = metricsManager;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestState.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public void onFailedAttempt(final long currentTimeMs) {
8888
@Override
8989
public boolean canSendRequest(final long currentTimeMs) {
9090
update(currentTimeMs);
91+
// Allow the first heartbeat to be sent immediately, after the first heartbeat is sent,
92+
// lastSentMs will be set by onSendAttempt(), and subsequent heartbeats will be controlled by the timer.
93+
if (lastSentMs == -1) {
94+
return super.canSendRequest(currentTimeMs);
95+
}
9196
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
9297
}
9398

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,10 +326,11 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
326326
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
327327
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
328328
this.heartbeatState = new HeartbeatState(streamsRebalanceData, membershipManager, maxPollIntervalMs);
329+
int requestTimeout = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
329330
this.heartbeatRequestState = new HeartbeatRequestState(
330331
logContext,
331332
time,
332-
0,
333+
requestTimeout,
333334
retryBackoffMs,
334335
retryBackoffMaxMs,
335336
RETRY_BACKOFF_JITTER

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import static org.mockito.Mockito.mock;
8989
import static org.mockito.Mockito.never;
9090
import static org.mockito.Mockito.spy;
91+
import static org.mockito.Mockito.times;
9192
import static org.mockito.Mockito.verify;
9293
import static org.mockito.Mockito.when;
9394

@@ -97,6 +98,7 @@ public class ConsumerHeartbeatRequestManagerTest {
9798
private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
9899
private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
99100
private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
101+
private static final int DEFAULT_REQUEST_TIMEOUT_MS = 30000;
100102
private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
101103
private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
102104
private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
@@ -218,7 +220,7 @@ public void testHeartBeatRequestStateToStringBase() {
218220
@Test
219221
public void testHeartbeatOnStartup() {
220222
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
221-
assertEquals(0, result.unsentRequests.size());
223+
assertEquals(1, result.unsentRequests.size());
222224

223225
createHeartbeatRequestStateWithZeroHeartbeatInterval();
224226
assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
@@ -232,6 +234,7 @@ public void testHeartbeatOnStartup() {
232234

233235
@Test
234236
public void testSuccessfulHeartbeatTiming() {
237+
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); // the initial heartbeat request
235238
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
236239
assertEquals(0, result.unsentRequests.size(),
237240
"No heartbeat should be sent while interval has not expired");
@@ -316,6 +319,7 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) {
316319

317320
@Test
318321
public void testTimerNotDue() {
322+
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); // the initial heartbeat request
319323
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
320324
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
321325

@@ -780,18 +784,21 @@ public void testPollOnLeaving(Optional<String> groupInstanceId, CloseOptions.Gro
780784
heartbeatState,
781785
heartbeatRequestState,
782786
backgroundEventHandler);
787+
// Clear the initial heartbeat request
788+
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
783789
when(membershipManager.state()).thenReturn(MemberState.LEAVING);
784790
when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
785791
when(membershipManager.leaveGroupOperation()).thenReturn(operation);
786792

787793
if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) {
788794
assertNoHeartbeat(heartbeatRequestManager);
789-
verify(membershipManager, never()).onHeartbeatRequestGenerated();
795+
// The onHeartbeatRequestGenerated was triggered by the initial heartbeat request
796+
verify(membershipManager, times(1)).onHeartbeatRequestGenerated();
790797
} else {
791798
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
792-
verify(membershipManager).onHeartbeatRequestGenerated();
799+
// The onHeartbeatRequestGenerated was triggered by the initial heartbeat request and leaveGroupOperation
800+
verify(membershipManager, times(2)).onHeartbeatRequestGenerated();
793801
}
794-
795802
}
796803

797804
/**

clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestStateTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public void testCanSendRequestAndTimeToNextHeartbeatMs() {
4747
JITTER
4848
);
4949

50+
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt
51+
heartbeatRequestState.onSendAttempt(time.milliseconds());
52+
heartbeatRequestState.onSuccessfulAttempt(time.milliseconds());
53+
5054
assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
5155
assertEquals(HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
5256
time.sleep(HEARTBEAT_INTERVAL_MS - 1);
@@ -70,6 +74,11 @@ public void testResetTimer() {
7074
RETRY_BACKOFF_MAX_MS,
7175
JITTER
7276
);
77+
78+
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt
79+
heartbeatRequestState.onSendAttempt(time.milliseconds());
80+
heartbeatRequestState.onSuccessfulAttempt(time.milliseconds());
81+
7382
time.sleep(HEARTBEAT_INTERVAL_MS + 100);
7483
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
7584
assertEquals(0, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
@@ -90,6 +99,11 @@ public void testUpdateHeartbeatIntervalMs() {
9099
RETRY_BACKOFF_MAX_MS,
91100
JITTER
92101
);
102+
103+
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt
104+
heartbeatRequestState.onSendAttempt(time.milliseconds());
105+
heartbeatRequestState.onSuccessfulAttempt(time.milliseconds());
106+
93107
final long updatedHeartbeatIntervalMs = 2 * HEARTBEAT_INTERVAL_MS;
94108
time.sleep(HEARTBEAT_INTERVAL_MS + 100);
95109

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ private void createHeartbeatStateAndRequestManager() {
169169
@Test
170170
public void testHeartbeatOnStartup() {
171171
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
172-
assertEquals(0, result.unsentRequests.size());
172+
assertEquals(1, result.unsentRequests.size()); // the initial heartbeat request attempt
173173

174174
createHeartbeatRequestStateWithZeroHeartbeatInterval();
175175
assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
@@ -184,14 +184,16 @@ public void testHeartbeatOnStartup() {
184184
@Test
185185
public void testSuccessfulHeartbeatTiming() {
186186
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
187-
assertEquals(0, result.unsentRequests.size(),
188-
"No heartbeat should be sent while interval has not expired");
187+
assertEquals(1, result.unsentRequests.size(),
188+
"initial heartbeat request should be send on the first poll");
189189
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs);
190+
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
191+
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
190192
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
191193

192194
result = heartbeatRequestManager.poll(time.milliseconds());
193195
assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
194-
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
196+
inflightReq = result.unsentRequests.get(0);
195197
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
196198
heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
197199
"Heartbeat timer was not reset to the interval when the heartbeat request was sent.");
@@ -262,8 +264,12 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) {
262264

263265
@Test
264266
public void testTimerNotDue() {
265-
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
266267
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
268+
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); // the initial heartbeat request attempt
269+
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
270+
271+
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
272+
result = heartbeatRequestManager.poll(time.milliseconds());
267273

268274
assertEquals(0, result.unsentRequests.size());
269275
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs);

0 commit comments

Comments
 (0)