Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class LeaderState<T> implements EpochState {
// This field is non-empty if the voter set at epoch start came from a snapshot or log segment
private final OptionalLong offsetOfVotersAtEpochStart;
private final KRaftVersion kraftVersionAtEpochStart;
private final Optional<ReplicaKey> votedKey;

private Optional<LogOffsetMetadata> highWatermark = Optional.empty();
private Map<Integer, ReplicaState> voterStates = new HashMap<>();
Expand Down Expand Up @@ -116,6 +117,7 @@ protected LeaderState(
VoterSet voterSetAtEpochStart,
OptionalLong offsetOfVotersAtEpochStart,
KRaftVersion kraftVersionAtEpochStart,
Optional<ReplicaKey> votedKey,
Set<Integer> grantingVoters,
BatchAccumulator<T> accumulator,
int fetchTimeoutMs,
Expand Down Expand Up @@ -158,6 +160,7 @@ protected LeaderState(
this.voterSetAtEpochStart = voterSetAtEpochStart;
this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart;
this.kraftVersionAtEpochStart = kraftVersionAtEpochStart;
this.votedKey = Objects.requireNonNull(votedKey, "votedKey must be non-null");

kafkaRaftMetrics.addLeaderMetrics();
this.kafkaRaftMetrics = kafkaRaftMetrics;
Expand Down Expand Up @@ -687,7 +690,7 @@ public Optional<LogOffsetMetadata> highWatermark() {

@Override
public ElectionState election() {
return ElectionState.withElectedLeader(epoch, localVoterNode.voterKey().id(), Optional.empty(), voterStates.keySet());
return ElectionState.withElectedLeader(epoch, localVoterNode.voterKey().id(), votedKey, voterStates.keySet());
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
time,
localId.getAsInt(),
election.epoch(),
election.optionalVotedKey(),
partitionState.lastVoterSet().voterIds(),
randomElectionTimeoutMs(),
List.of(),
Expand Down Expand Up @@ -360,6 +361,7 @@ public void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
time,
localIdOrThrow(),
epoch,
votedKey(),
partitionState.lastVoterSet().voterIds(),
randomElectionTimeoutMs(),
preferredSuccessors,
Expand Down Expand Up @@ -715,6 +717,7 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
partitionState.lastVoterSet(),
partitionState.lastVoterSetOffset(),
partitionState.lastKraftVersion(),
candidateState.election().optionalVotedKey(),
candidateState.epochElection().grantingVoters(),
accumulator,
fetchTimeoutMs,
Expand Down
7 changes: 6 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/ResignedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

Expand All @@ -44,6 +45,7 @@
public class ResignedState implements EpochState {
private final int localId;
private final int epoch;
private final Optional<ReplicaKey> votedKey;
private final Endpoints endpoints;
private final Set<Integer> voters;
private final long electionTimeoutMs;
Expand All @@ -56,6 +58,7 @@ public ResignedState(
Time time,
int localId,
int epoch,
Optional<ReplicaKey> votedKey,
Set<Integer> voters,
long electionTimeoutMs,
List<ReplicaKey> preferredSuccessors,
Expand All @@ -64,6 +67,7 @@ public ResignedState(
) {
this.localId = localId;
this.epoch = epoch;
this.votedKey = Objects.requireNonNull(votedKey, "votedKey must be non-null");
this.voters = voters;
this.unackedVoters = new HashSet<>(voters);
this.unackedVoters.remove(localId);
Expand All @@ -76,7 +80,7 @@ public ResignedState(

@Override
public ElectionState election() {
return ElectionState.withElectedLeader(epoch, localId, Optional.empty(), voters);
return ElectionState.withElectedLeader(epoch, localId, votedKey, voters);
}

@Override
Expand Down Expand Up @@ -167,6 +171,7 @@ public String toString() {
return "ResignedState(" +
"localId=" + localId +
", epoch=" + epoch +
", votedKey=" + votedKey +
", voters=" + voters +
", electionTimeoutMs=" + electionTimeoutMs +
", unackedVoters=" + unackedVoters +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void testLeaderRejectPreVoteRequestOnSameEpoch(KRaftVersion kraftVersion)
context.client.poll();

context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false);
context.assertElectedLeader(leaderEpoch, localId);
context.assertElectedLeaderWithLocalVotedKey(leaderEpoch);
}

@ParameterizedTest
Expand Down
32 changes: 16 additions & 16 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testInitializeSingleMemberQuorum(boolean withKip853Rpc) throws IOExc
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Set.of(localId))
.withKip853Rpc(withKip853Rpc)
.build();
context.assertElectedLeader(1, localId);
context.assertElectedLeaderWithLocalVotedKey(1);
assertEquals(context.log.endOffset().offset(), context.client.logEndOffset());
}

Expand All @@ -114,7 +114,7 @@ public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum(boolean withK
assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
context.currentLeaderAndEpoch());
context.assertElectedLeader(initialEpoch + 1, localId);
context.assertElectedLeaderWithLocalVotedKey(initialEpoch + 1);
}

@ParameterizedTest
Expand Down Expand Up @@ -540,7 +540,7 @@ public void testResignWillCompleteFetchPurgatory(boolean withKip853Rpc) throws E
context.client.shutdown(1000);
context.client.poll();
context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch, OptionalInt.of(localId));
context.assertResignedLeader(epoch, localId);
context.assertResignedLeaderWithLocalVotedKey(epoch);

// shutting down finished
context.time.sleep(1000);
Expand Down Expand Up @@ -570,7 +570,7 @@ public void testResignInOlderEpochIgnored(boolean withKip853Rpc) throws Exceptio
// Ensure we are still leader even after expiration of the election timeout.
context.time.sleep(context.electionTimeoutMs() * 2L);
context.client.poll();
context.assertElectedLeader(currentEpoch, localId);
context.assertElectedLeaderWithLocalVotedKey(currentEpoch);
}

@ParameterizedTest
Expand Down Expand Up @@ -738,7 +738,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVote

// The leadership should get resigned now
assertTrue(context.client.quorum().isResigned());
context.assertResignedLeader(epoch, localId);
context.assertResignedLeaderWithLocalVotedKey(epoch);
}

@ParameterizedTest
Expand Down Expand Up @@ -947,7 +947,7 @@ public void testInitializeAsUnattachedAndBecomeLeader(boolean withKip853Rpc) thr

// Become leader after receiving the vote
context.pollUntil(() -> context.log.endOffset().offset() == 1L);
context.assertElectedLeader(1, localId);
context.assertElectedLeaderWithLocalVotedKey(1);
long electionTimestamp = context.time.milliseconds();

// Leader change record appended
Expand Down Expand Up @@ -999,7 +999,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree(boolean withKi

// Become leader after receiving the vote
context.pollUntil(() -> context.log.endOffset().offset() == 1L);
context.assertElectedLeader(2, localId);
context.assertElectedLeaderWithLocalVotedKey(2);
long electionTimestamp = context.time.milliseconds();

// Leader change record appended
Expand Down Expand Up @@ -1031,7 +1031,7 @@ public void testInitializeAsOnlyVoterWithEmptyElectionState(boolean withKip853Rp
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Set.of(localId))
.withKip853Rpc(withKip853Rpc)
.build();
context.assertElectedLeader(1, localId);
context.assertElectedLeaderWithLocalVotedKey(1);
assertEquals(0L, context.log.endOffset().offset());
assertTrue(context.client.quorum().isLeader());
}
Expand All @@ -1043,7 +1043,7 @@ public void testInitializeAsFollowerAndOnlyVoter() throws Exception {
.withRaftProtocol(KIP_853_PROTOCOL)
.withElectedLeader(2, localId + 1)
.build();
context.assertElectedLeader(3, localId);
context.assertElectedLeaderWithLocalVotedKey(3);
assertEquals(0L, context.log.endOffset().offset());
assertTrue(context.client.quorum().isLeader());
}
Expand All @@ -1055,7 +1055,7 @@ public void testInitializeAsCandidateAndOnlyVoter() throws Exception {
.withRaftProtocol(KIP_853_PROTOCOL)
.withVotedCandidate(2, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID))
.build();
context.assertElectedLeader(2, localId);
context.assertElectedLeaderWithLocalVotedKey(2);
assertTrue(context.client.quorum().isLeader());
}

Expand All @@ -1066,7 +1066,7 @@ public void testInitializeAsResignedAndOnlyVoter() throws Exception {
.withRaftProtocol(KIP_853_PROTOCOL)
.withElectedLeader(2, localId)
.build();
context.assertElectedLeader(3, localId);
context.assertElectedLeaderWithLocalVotedKey(3);
assertTrue(context.client.quorum().isLeader());
}

Expand Down Expand Up @@ -1219,7 +1219,7 @@ public void testEndQuorumIgnoredAsLeaderIfOlderEpoch(boolean withKip853Rpc) thro
// Replica should still be leader as long as fetch timeout has not expired
context.time.sleep(context.fetchTimeoutMs - 1);
context.client.poll();
context.assertElectedLeader(epoch, localId);
context.assertElectedLeaderWithLocalVotedKey(epoch);
}

@ParameterizedTest
Expand Down Expand Up @@ -1538,7 +1538,7 @@ public void testVoteRequestTimeout(boolean withKip853Rpc) throws Exception {
context.voteResponse(true, OptionalInt.empty(), 1)
);
context.client.poll();
context.assertElectedLeader(epoch, localId);
context.assertElectedLeaderWithLocalVotedKey(epoch);
}

@ParameterizedTest
Expand Down Expand Up @@ -1721,7 +1721,7 @@ public void testLeaderIgnoreVoteRequestOnSameEpoch(boolean withKip853Rpc) throws
context.client.poll();

context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false);
context.assertElectedLeader(leaderEpoch, localId);
context.assertElectedLeaderWithLocalVotedKey(leaderEpoch);
}

@ParameterizedTest
Expand Down Expand Up @@ -3797,7 +3797,7 @@ public void testGracefulShutdownSingleMemberQuorum(boolean withKip853Rpc) throws
.withKip853Rpc(withKip853Rpc)
.build();

context.assertElectedLeader(1, localId);
context.assertElectedLeaderWithLocalVotedKey(1);
context.client.poll();
assertEquals(0, context.channel.drainSendQueue().size());
int shutdownTimeoutMs = 5000;
Expand Down Expand Up @@ -3984,7 +3984,7 @@ public void testLeaderAppendSingleMemberQuorum(boolean withKip853Rpc) throws Exc
long now = context.time.milliseconds();

context.pollUntil(() -> context.log.endOffset().offset() == 1L);
context.assertElectedLeader(1, localId);
context.assertElectedLeaderWithLocalVotedKey(1);

// We still write the leader change message
assertEquals(OptionalLong.of(1L), context.client.highWatermark());
Expand Down
2 changes: 2 additions & 0 deletions raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private LeaderState<?> newLeaderState(
voters,
OptionalLong.of(0L),
kraftVersion,
Optional.empty(),
voters.voterIds(),
accumulator,
fetchTimeoutMs,
Expand Down Expand Up @@ -134,6 +135,7 @@ public void testRequireNonNullAccumulator() {
voterSet,
OptionalLong.of(0),
KRaftVersion.KRAFT_VERSION_1,
Optional.empty(),
Set.of(),
null,
fetchTimeoutMs,
Expand Down
56 changes: 50 additions & 6 deletions raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,25 @@ public void testInitializeAsResignedLeader(KRaftVersion kraftVersion) {
int node2 = 2;
int epoch = 5;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion);
ElectionState election = ElectionState.withElectedLeader(epoch, localId, Optional.empty(), voters.voterIds());
ElectionState election = ElectionState.withElectedLeader(
epoch,
localId,
Optional.of(localVoterKey),
voters.voterIds()
);
store.writeElectionState(election, kraftVersion);
ElectionState persistedElection = ElectionState.withElectedLeader(
epoch,
localId,
Optional.of(persistedVotedKey(localVoterKey, kraftVersion)),
persistedVoters(voters.voterIds(), kraftVersion)
);
ElectionState resignedElection = ElectionState.withElectedLeader(
epoch,
localId,
Optional.of(persistedVotedKey(localVoterKey, kraftVersion)),
voters.voterIds()
);

// If we were previously a leader, we will start as resigned in order to ensure
// a new leader gets elected. This ensures that records are always uniquely
Expand All @@ -568,7 +585,8 @@ public void testInitializeAsResignedLeader(KRaftVersion kraftVersion) {

ResignedState resignedState = state.resignedStateOrThrow();
assertEquals(epoch, resignedState.epoch());
assertEquals(election, resignedState.election());
assertEquals(resignedElection, resignedState.election());
assertEquals(persistedElection, store.readElectionState().get());
assertEquals(Set.of(node1, node2), resignedState.unackedVoters());
assertEquals(
electionTimeoutMs + jitterMs,
Expand Down Expand Up @@ -2240,6 +2258,15 @@ public void testCandidateToLeader(KRaftVersion kraftVersion) {
assertTrue(state.isLeader());
assertEquals(1, leaderState.epoch());
assertEquals(Optional.empty(), leaderState.highWatermark());
assertEquals(
ElectionState.withElectedLeader(
1,
localId,
Optional.of(persistedVotedKey(localVoterKey, kraftVersion)),
persistedVoters(voters.voterIds(), kraftVersion)
),
store.readElectionState().get()
);
}

@ParameterizedTest
Expand Down Expand Up @@ -2463,7 +2490,7 @@ public void testLeaderToResigned(KRaftVersion kraftVersion) {
assertTrue(state.isResigned());
ResignedState resignedState = state.resignedStateOrThrow();
assertEquals(
ElectionState.withElectedLeader(1, localId, Optional.empty(), voters.voterIds()),
ElectionState.withElectedLeader(1, localId, Optional.of(localVoterKey), voters.voterIds()),
resignedState.election()
);
assertEquals(1, resignedState.epoch());
Expand Down Expand Up @@ -2496,7 +2523,7 @@ public void testLeaderToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
ElectionState.withElectedLeader(
6,
localId,
Optional.empty(),
Optional.of(persistedVotedKey(localVoterKey, kraftVersion)),
persistedVoters(voters.voterIds(), kraftVersion)
),
store.readElectionState().get()
Expand All @@ -2514,14 +2541,31 @@ public void testResignedToFollowerInSameEpoch(KRaftVersion kraftVersion) {
int node2 = 2;
int epoch = 5;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion);
ElectionState election = ElectionState.withElectedLeader(epoch, localId, Optional.empty(), voters.voterIds());
ElectionState election = ElectionState.withElectedLeader(
epoch,
localId,
Optional.of(localVoterKey),
voters.voterIds()
);
store.writeElectionState(election, kraftVersion);
QuorumState state = buildQuorumState(OptionalInt.of(localId), voters, kraftVersion);
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
assertTrue(state.isResigned());
assertThrows(IllegalStateException.class, () -> state.transitionToFollower(epoch, localId, voters.listeners(localId)));
assertThrows(
IllegalStateException.class,
() -> state.transitionToFollower(epoch, localId, voters.listeners(localId))
);
// KAFKA-18379 will fix this
state.transitionToFollower(epoch, node1, voters.listeners(node1));
assertEquals(
ElectionState.withElectedLeader(
epoch,
node1,
Optional.of(persistedVotedKey(localVoterKey, kraftVersion)),
persistedVoters(voters.voterIds(), kraftVersion)
),
store.readElectionState().get()
);
}

@ParameterizedTest
Expand Down
Loading