Skip to content

Commit beff261

Browse files
committed
KAFKA-18562: standardize election/fetch timeout between Unattached and Followers
JIRA: KAFKA-18562
1 parent 2b6e868 commit beff261

File tree

6 files changed

+35
-13
lines changed

6 files changed

+35
-13
lines changed

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3279,7 +3279,7 @@ private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
32793279
// If shutting down, then remain in this state until either the
32803280
// shutdown completes or an epoch bump forces another state transition
32813281
return shutdown.remainingTimeMs();
3282-
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
3282+
} else if (state.hasElectionTimeoutExpired(currentTimeMs) && state.hasFetchTimeoutExpired(currentTimeMs)) {
32833283
transitionToProspective(currentTimeMs);
32843284
return 0L;
32853285
} else {

raft/src/main/java/org/apache/kafka/raft/QuorumState.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
*
4646
* Unattached transitions to:
4747
* Unattached: After learning of a new election with a higher epoch or after giving a binding vote
48-
* Prospective: After expiration of the election timeout
48+
* Prospective: After expiration of the election and fetch timeout
4949
* Follower: After discovering a leader with an equal or larger epoch
5050
*
5151
* Prospective transitions to:
@@ -170,6 +170,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
170170
partitionState.lastVoterSet().voterIds(),
171171
Optional.empty(),
172172
randomElectionTimeoutMs(),
173+
fetchTimeoutMs,
173174
logContext
174175
);
175176
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
@@ -230,6 +231,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
230231
partitionState.lastVoterSet().voterIds(),
231232
Optional.empty(),
232233
randomElectionTimeoutMs(),
234+
fetchTimeoutMs,
233235
logContext
234236
);
235237
} else {
@@ -254,6 +256,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
254256
partitionState.lastVoterSet().voterIds(),
255257
Optional.empty(),
256258
randomElectionTimeoutMs(),
259+
fetchTimeoutMs,
257260
logContext
258261
);
259262
}
@@ -418,6 +421,7 @@ public void transitionToUnattached(int epoch, OptionalInt leaderId) {
418421
partitionState.lastVoterSet().voterIds(),
419422
state.highWatermark(),
420423
electionTimeoutMs,
424+
fetchTimeoutMs,
421425
logContext
422426
));
423427
}
@@ -467,6 +471,7 @@ public void unattachedAddVotedState(
467471
partitionState.lastVoterSet().voterIds(),
468472
state.highWatermark(),
469473
randomElectionTimeoutMs(),
474+
fetchTimeoutMs,
470475
logContext
471476
)
472477
);

raft/src/main/java/org/apache/kafka/raft/UnattachedState.java

+14
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ public class UnattachedState implements EpochState {
4747
private final Optional<ReplicaKey> votedKey;
4848
private final Set<Integer> voters;
4949
private final long electionTimeoutMs;
50+
private final int fetchTimeoutMs;
5051
private final Timer electionTimer;
52+
private final Timer fetchTimer;
5153
private final Optional<LogOffsetMetadata> highWatermark;
5254
private final Logger log;
5355

@@ -59,6 +61,7 @@ public UnattachedState(
5961
Set<Integer> voters,
6062
Optional<LogOffsetMetadata> highWatermark,
6163
long electionTimeoutMs,
64+
int fetchTimeoutMs,
6265
LogContext logContext
6366
) {
6467
this.epoch = epoch;
@@ -67,7 +70,9 @@ public UnattachedState(
6770
this.voters = voters;
6871
this.highWatermark = highWatermark;
6972
this.electionTimeoutMs = electionTimeoutMs;
73+
this.fetchTimeoutMs = fetchTimeoutMs;
7074
this.electionTimer = time.timer(electionTimeoutMs);
75+
this.fetchTimer = time.timer(fetchTimeoutMs);
7176
this.log = logContext.logger(UnattachedState.class);
7277
}
7378

@@ -105,6 +110,10 @@ public long electionTimeoutMs() {
105110
return electionTimeoutMs;
106111
}
107112

113+
public int fetchTimeoutMs() {
114+
return fetchTimeoutMs;
115+
}
116+
108117
public long remainingElectionTimeMs(long currentTimeMs) {
109118
electionTimer.update(currentTimeMs);
110119
return electionTimer.remainingMs();
@@ -115,6 +124,11 @@ public boolean hasElectionTimeoutExpired(long currentTimeMs) {
115124
return electionTimer.isExpired();
116125
}
117126

127+
public boolean hasFetchTimeoutExpired(long currentTimeMs) {
128+
fetchTimer.update(currentTimeMs);
129+
return fetchTimer.isExpired();
130+
}
131+
118132
@Override
119133
public Optional<LogOffsetMetadata> highWatermark() {
120134
return highWatermark;

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -321,15 +321,16 @@ public void testInitializeAsResignedAndUnableToContactQuorum(boolean withKip853R
321321
assertEquals(0L, context.log.endOffset().offset());
322322
context.assertElectedLeader(epoch, localId);
323323

324-
// Election timeout
325-
context.time.sleep(context.electionTimeoutMs());
324+
// fetch timeout
325+
context.time.sleep(context.fetchTimeoutMs);
326326
context.client.poll();
327327

328328
// Become unattached with expired election timeout
329329
assertTrue(context.client.quorum().isUnattached());
330330
assertEquals(epoch + 1, context.currentEpoch());
331331

332-
// Become prospective immediately
332+
// Become prospective after the fetch timeout
333+
context.time.sleep(context.fetchTimeoutMs);
333334
context.client.poll();
334335
assertTrue(context.client.quorum().isProspective());
335336

@@ -763,15 +764,15 @@ public void testElectionTimeoutAfterUserInitiatedResign(boolean withKip853Rpc) t
763764
OptionalInt.of(localId)
764765
);
765766

766-
// After the election timer, local should become unattached.
767-
context.time.sleep(2L * context.electionTimeoutMs());
767+
// After the fetch timeout, local should become unattached.
768+
context.time.sleep(context.fetchTimeoutMs);
768769
context.pollUntil(context.client.quorum()::isUnattached);
769770
assertEquals(resignedEpoch + 1, context.currentEpoch());
770771
assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
771772
context.listener.currentLeaderAndEpoch());
772773

773-
// Local will become prospective right away
774774
assertEquals(0, context.client.quorum().unattachedStateOrThrow().electionTimeoutMs());
775+
context.time.sleep(context.fetchTimeoutMs);
775776
context.client.poll();
776777
assertTrue(context.client.quorum().isProspective());
777778
}
@@ -886,8 +887,8 @@ public void testInitializeAsUnattachedAndBecomeLeader(boolean withKip853Rpc) thr
886887
assertTrue(context.client.quorum().isUnattached());
887888
assertTrue(context.client.quorum().isVoter());
888889

889-
// after election timeout should become prospective
890-
context.time.sleep(context.electionTimeoutMs() * 2L);
890+
// after fetch timeout should become prospective
891+
context.time.sleep(context.fetchTimeoutMs);
891892
context.pollUntilRequest();
892893
assertTrue(context.client.quorum().isProspective());
893894

@@ -1604,7 +1605,7 @@ public void testHandleVoteRequestAsProspectiveWithVotedCandidate(boolean withKip
16041605
.build();
16051606

16061607
// Sleep a little to ensure that we become a prospective
1607-
context.time.sleep(context.electionTimeoutMs() * 2L);
1608+
context.time.sleep(context.fetchTimeoutMs);
16081609
context.client.poll();
16091610
assertTrue(context.client.quorum().isProspectiveAndVoted());
16101611
context.assertVotedCandidate(epoch, votedCandidateKey.id());
@@ -4113,7 +4114,7 @@ public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean withKip853R
41134114
.withKip853Rpc(withKip853Rpc)
41144115
.build();
41154116

4116-
context.time.sleep(context.electionTimeoutMs());
4117+
context.time.sleep(context.fetchTimeoutMs);
41174118
context.expectAndGrantPreVotes(epoch - 1);
41184119
context.expectAndGrantVotes(epoch);
41194120

raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ static MemoryRecords buildBatch(
572572
}
573573

574574
public void unattachedToCandidate() throws Exception {
575-
time.sleep(electionTimeoutMs * 2L);
575+
time.sleep(fetchTimeoutMs);
576576
expectAndGrantPreVotes(currentEpoch());
577577
}
578578

raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class UnattachedStateTest {
3939
private final LogContext logContext = new LogContext();
4040
private final int epoch = 5;
4141
private final int electionTimeoutMs = 10000;
42+
private final int fetchTimeoutMs = 15000;
4243
private final Set<Integer> voters = Set.of(1, 2, 3);
4344
private final ReplicaKey voter1Key = ReplicaKey.of(1, Uuid.randomUuid());
4445
private final ReplicaKey votedKey = voter1Key;
@@ -55,6 +56,7 @@ private UnattachedState newUnattachedState(
5556
voters,
5657
Optional.empty(),
5758
electionTimeoutMs,
59+
fetchTimeoutMs,
5860
logContext
5961
);
6062
}

0 commit comments

Comments
 (0)