Skip to content

[WIP] KAFKA-18562: standardize election/fetch timeout between Unattached and Followers #18921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3279,7 +3279,7 @@ private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
// If shutting down, then remain in this state until either the
// shutdown completes or an epoch bump forces another state transition
return shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
} else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the Unattached state should use the fetch timeout rather than the election timeout here, so that its transition to the Prospective state is triggered the same way as the Follower's.

transitionToProspective(currentTimeMs);
return 0L;
} else {
Expand Down
7 changes: 6 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
*
* Unattached transitions to:
* Unattached: After learning of a new election with a higher epoch or after giving a binding vote
* Prospective: After expiration of the election timeout
* Prospective: After expiration of the fetch timeout
* Follower: After discovering a leader with an equal or larger epoch
*
* Prospective transitions to:
Expand Down Expand Up @@ -170,6 +170,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
fetchTimeoutMs,
logContext
);
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
Expand Down Expand Up @@ -230,6 +231,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
fetchTimeoutMs,
logContext
);
} else {
Expand All @@ -254,6 +256,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
fetchTimeoutMs,
logContext
);
}
Expand Down Expand Up @@ -418,6 +421,7 @@ public void transitionToUnattached(int epoch, OptionalInt leaderId) {
partitionState.lastVoterSet().voterIds(),
state.highWatermark(),
electionTimeoutMs,
fetchTimeoutMs,
logContext
));
}
Expand Down Expand Up @@ -467,6 +471,7 @@ public void unattachedAddVotedState(
partitionState.lastVoterSet().voterIds(),
state.highWatermark(),
randomElectionTimeoutMs(),
fetchTimeoutMs,
logContext
)
);
Expand Down
14 changes: 14 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class UnattachedState implements EpochState {
private final Optional<ReplicaKey> votedKey;
private final Set<Integer> voters;
private final long electionTimeoutMs;
private final int fetchTimeoutMs;
private final Timer electionTimer;
private final Timer fetchTimer;
private final Optional<LogOffsetMetadata> highWatermark;
private final Logger log;

Expand All @@ -59,6 +61,7 @@ public UnattachedState(
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
long electionTimeoutMs,
int fetchTimeoutMs,
LogContext logContext
) {
this.epoch = epoch;
Expand All @@ -67,7 +70,9 @@ public UnattachedState(
this.voters = voters;
this.highWatermark = highWatermark;
this.electionTimeoutMs = electionTimeoutMs;
this.fetchTimeoutMs = fetchTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.fetchTimer = time.timer(fetchTimeoutMs);
this.log = logContext.logger(UnattachedState.class);
}

Expand Down Expand Up @@ -105,6 +110,10 @@ public long electionTimeoutMs() {
return electionTimeoutMs;
}

/**
 * Returns the fetch timeout, in milliseconds, that this state was constructed with.
 *
 * @return the configured fetch timeout in milliseconds
 */
public int fetchTimeoutMs() {
return fetchTimeoutMs;
}

public long remainingElectionTimeMs(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.remainingMs();
Expand All @@ -115,6 +124,11 @@ public boolean hasElectionTimeoutExpired(long currentTimeMs) {
return electionTimer.isExpired();
}

/**
 * Advances the fetch timer to {@code currentTimeMs} and reports whether it has expired.
 *
 * @param currentTimeMs the current time in milliseconds, used to update the fetch timer
 * @return {@code true} if the fetch timer is expired after the update
 */
public boolean hasFetchTimeoutExpired(long currentTimeMs) {
// The timer must be updated first so that the expiry check reflects the supplied time.
fetchTimer.update(currentTimeMs);
return fetchTimer.isExpired();
}

@Override
public Optional<LogOffsetMetadata> highWatermark() {
return highWatermark;
Expand Down
21 changes: 11 additions & 10 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,16 @@ public void testInitializeAsResignedAndUnableToContactQuorum(boolean withKip853R
assertEquals(0L, context.log.endOffset().offset());
context.assertElectedLeader(epoch, localId);

// Election timeout
context.time.sleep(context.electionTimeoutMs());
// fetch timeout
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();

// Become unattached with expired election timeout
assertTrue(context.client.quorum().isUnattached());
assertEquals(epoch + 1, context.currentEpoch());

// Become prospective immediately
// Become prospective after the fetch timeout
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();
assertTrue(context.client.quorum().isProspective());

Expand Down Expand Up @@ -763,15 +764,15 @@ public void testElectionTimeoutAfterUserInitiatedResign(boolean withKip853Rpc) t
OptionalInt.of(localId)
);

// After the election timer, local should become unattached.
context.time.sleep(2L * context.electionTimeoutMs());
// After the fetch timeout, local should become unattached.
context.time.sleep(context.fetchTimeoutMs);
context.pollUntil(context.client.quorum()::isUnattached);
assertEquals(resignedEpoch + 1, context.currentEpoch());
assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
context.listener.currentLeaderAndEpoch());

// Local will become prospective once the fetch timeout elapses
assertEquals(0, context.client.quorum().unattachedStateOrThrow().electionTimeoutMs());
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();
assertTrue(context.client.quorum().isProspective());
}
Expand Down Expand Up @@ -886,8 +887,8 @@ public void testInitializeAsUnattachedAndBecomeLeader(boolean withKip853Rpc) thr
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());

// after election timeout should become prospective
context.time.sleep(context.electionTimeoutMs() * 2L);
// after fetch timeout should become prospective
context.time.sleep(context.fetchTimeoutMs);
context.pollUntilRequest();
assertTrue(context.client.quorum().isProspective());

Expand Down Expand Up @@ -1604,7 +1605,7 @@ public void testHandleVoteRequestAsProspectiveWithVotedCandidate(boolean withKip
.build();

// Sleep a little to ensure that we become a prospective
context.time.sleep(context.electionTimeoutMs() * 2L);
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();
assertTrue(context.client.quorum().isProspectiveAndVoted());
context.assertVotedCandidate(epoch, votedCandidateKey.id());
Expand Down Expand Up @@ -4113,7 +4114,7 @@ public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean withKip853R
.withKip853Rpc(withKip853Rpc)
.build();

context.time.sleep(context.electionTimeoutMs());
context.time.sleep(context.fetchTimeoutMs);
context.expectAndGrantPreVotes(epoch - 1);
context.expectAndGrantVotes(epoch);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ static MemoryRecords buildBatch(
}

public void unattachedToCandidate() throws Exception {
time.sleep(electionTimeoutMs * 2L);
time.sleep(fetchTimeoutMs);
expectAndGrantPreVotes(currentEpoch());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class UnattachedStateTest {
private final LogContext logContext = new LogContext();
private final int epoch = 5;
private final int electionTimeoutMs = 10000;
private final int fetchTimeoutMs = 15000;
private final Set<Integer> voters = Set.of(1, 2, 3);
private final ReplicaKey voter1Key = ReplicaKey.of(1, Uuid.randomUuid());
private final ReplicaKey votedKey = voter1Key;
Expand All @@ -55,6 +56,7 @@ private UnattachedState newUnattachedState(
voters,
Optional.empty(),
electionTimeoutMs,
fetchTimeoutMs,
logContext
);
}
Expand Down