diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index e05007e6986c2..dc8df7d8dc6cd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -72,6 +72,7 @@ public class LeaderState implements EpochState { // This field is non-empty if the voter set at epoch start came from a snapshot or log segment private final OptionalLong offsetOfVotersAtEpochStart; private final KRaftVersion kraftVersionAtEpochStart; + private final Optional votedKey; private Optional highWatermark = Optional.empty(); private Map voterStates = new HashMap<>(); @@ -116,6 +117,7 @@ protected LeaderState( VoterSet voterSetAtEpochStart, OptionalLong offsetOfVotersAtEpochStart, KRaftVersion kraftVersionAtEpochStart, + Optional votedKey, Set grantingVoters, BatchAccumulator accumulator, int fetchTimeoutMs, @@ -158,6 +160,7 @@ protected LeaderState( this.voterSetAtEpochStart = voterSetAtEpochStart; this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart; this.kraftVersionAtEpochStart = kraftVersionAtEpochStart; + this.votedKey = Objects.requireNonNull(votedKey, "votedKey must be non-null"); kafkaRaftMetrics.addLeaderMetrics(); this.kafkaRaftMetrics = kafkaRaftMetrics; @@ -687,7 +690,7 @@ public Optional highWatermark() { @Override public ElectionState election() { - return ElectionState.withElectedLeader(epoch, localVoterNode.voterKey().id(), Optional.empty(), voterStates.keySet()); + return ElectionState.withElectedLeader(epoch, localVoterNode.voterKey().id(), votedKey, voterStates.keySet()); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index 7703a69d4d40b..57d40ad1504cb 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -181,6 +181,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE time, localId.getAsInt(), election.epoch(), + election.optionalVotedKey(), partitionState.lastVoterSet().voterIds(), randomElectionTimeoutMs(), List.of(), @@ -360,6 +361,7 @@ public void transitionToResigned(List preferredSuccessors) { time, localIdOrThrow(), epoch, + votedKey(), partitionState.lastVoterSet().voterIds(), randomElectionTimeoutMs(), preferredSuccessors, @@ -715,6 +717,7 @@ public LeaderState transitionToLeader(long epochStartOffset, BatchAccumul partitionState.lastVoterSet(), partitionState.lastVoterSetOffset(), partitionState.lastKraftVersion(), + candidateState.election().optionalVotedKey(), candidateState.epochElection().grantingVoters(), accumulator, fetchTimeoutMs, diff --git a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java index 65d16c1f72bd9..fd69f2b8d1194 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java +++ b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -44,6 +45,7 @@ public class ResignedState implements EpochState { private final int localId; private final int epoch; + private final Optional votedKey; private final Endpoints endpoints; private final Set voters; private final long electionTimeoutMs; @@ -56,6 +58,7 @@ public ResignedState( Time time, int localId, int epoch, + Optional votedKey, Set voters, long electionTimeoutMs, List preferredSuccessors, @@ -64,6 +67,7 @@ public ResignedState( ) { this.localId = localId; this.epoch = epoch; + this.votedKey = Objects.requireNonNull(votedKey, "votedKey must be non-null"); this.voters = voters; this.unackedVoters = new HashSet<>(voters); this.unackedVoters.remove(localId); @@ -76,7 +80,7 @@ public ResignedState( @Override public ElectionState election() { - return ElectionState.withElectedLeader(epoch, localId, Optional.empty(), voters); + return ElectionState.withElectedLeader(epoch, localId, votedKey, voters); } @Override @@ -167,6 +171,7 @@ public String toString() { return "ResignedState(" + "localId=" + localId + ", epoch=" + epoch + + ", votedKey=" + votedKey + ", voters=" + voters + ", electionTimeoutMs=" + electionTimeoutMs + ", unackedVoters=" + unackedVoters + diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java index 1a4ca403db9c7..a6fc6416cbd78 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java @@ -415,7 +415,7 @@ public void testLeaderRejectPreVoteRequestOnSameEpoch(KRaftVersion kraftVersion) context.client.poll(); context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false); - context.assertElectedLeader(leaderEpoch, localId); + context.assertElectedLeaderWithLocalVotedKey(leaderEpoch); } @ParameterizedTest diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index e4d879c64edf3..87a3ff8e4ce25 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -93,7 +93,7 @@ public void testInitializeSingleMemberQuorum(boolean withKip853Rpc) throws IOExc RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Set.of(localId)) .withKip853Rpc(withKip853Rpc) .build(); - context.assertElectedLeader(1, localId); + context.assertElectedLeaderWithLocalVotedKey(1); assertEquals(context.log.endOffset().offset(), context.client.logEndOffset()); } @@ -114,7 +114,7 @@ public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum(boolean withK assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), context.currentLeaderAndEpoch()); - context.assertElectedLeader(initialEpoch + 1, localId); + context.assertElectedLeaderWithLocalVotedKey(initialEpoch + 1); } @ParameterizedTest @@ -540,7 +540,7 @@ public void testResignWillCompleteFetchPurgatory(boolean withKip853Rpc) throws E context.client.shutdown(1000); context.client.poll(); context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch, OptionalInt.of(localId)); - context.assertResignedLeader(epoch, localId); + context.assertResignedLeaderWithLocalVotedKey(epoch); // shutting down finished context.time.sleep(1000); @@ -570,7 +570,7 @@ public void testResignInOlderEpochIgnored(boolean withKip853Rpc) throws Exceptio // Ensure we are still leader even after expiration of the election timeout. context.time.sleep(context.electionTimeoutMs() * 2L); context.client.poll(); - context.assertElectedLeader(currentEpoch, localId); + context.assertElectedLeaderWithLocalVotedKey(currentEpoch); } @ParameterizedTest @@ -738,7 +738,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVote // The leadership should get resigned now assertTrue(context.client.quorum().isResigned()); - context.assertResignedLeader(epoch, localId); + context.assertResignedLeaderWithLocalVotedKey(epoch); } @ParameterizedTest @@ -947,7 +947,7 @@ public void testInitializeAsUnattachedAndBecomeLeader(boolean withKip853Rpc) thr // Become leader after receiving the vote context.pollUntil(() -> context.log.endOffset().offset() == 1L); - context.assertElectedLeader(1, localId); + context.assertElectedLeaderWithLocalVotedKey(1); long electionTimestamp = context.time.milliseconds(); // Leader change record appended @@ -999,7 +999,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree(boolean withKi // Become leader after receiving the vote context.pollUntil(() -> context.log.endOffset().offset() == 1L); - context.assertElectedLeader(2, localId); + context.assertElectedLeaderWithLocalVotedKey(2); long electionTimestamp = context.time.milliseconds(); // Leader change record appended @@ -1031,7 +1031,7 @@ public void testInitializeAsOnlyVoterWithEmptyElectionState(boolean withKip853Rp RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Set.of(localId)) .withKip853Rpc(withKip853Rpc) .build(); - context.assertElectedLeader(1, localId); + context.assertElectedLeaderWithLocalVotedKey(1); assertEquals(0L, context.log.endOffset().offset()); assertTrue(context.client.quorum().isLeader()); } @@ -1043,7 +1043,7 @@ public void testInitializeAsFollowerAndOnlyVoter() throws Exception { .withRaftProtocol(KIP_853_PROTOCOL) .withElectedLeader(2, localId + 1) .build(); - context.assertElectedLeader(3, localId); + context.assertElectedLeaderWithLocalVotedKey(3); assertEquals(0L, context.log.endOffset().offset()); assertTrue(context.client.quorum().isLeader()); } @@ -1055,7 +1055,7 @@ public void testInitializeAsCandidateAndOnlyVoter() throws Exception { .withRaftProtocol(KIP_853_PROTOCOL) .withVotedCandidate(2, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID)) .build(); - context.assertElectedLeader(2, localId); + context.assertElectedLeaderWithLocalVotedKey(2); assertTrue(context.client.quorum().isLeader()); } @@ -1066,7 +1066,7 @@ public void testInitializeAsResignedAndOnlyVoter() throws Exception { .withRaftProtocol(KIP_853_PROTOCOL) .withElectedLeader(2, localId) .build(); - context.assertElectedLeader(3, localId); + context.assertElectedLeaderWithLocalVotedKey(3); assertTrue(context.client.quorum().isLeader()); } @@ -1219,7 +1219,7 @@ public void testEndQuorumIgnoredAsLeaderIfOlderEpoch(boolean withKip853Rpc) thro // Replica should still be leader as long as fetch timeout has not expired context.time.sleep(context.fetchTimeoutMs - 1); context.client.poll(); - context.assertElectedLeader(epoch, localId); + context.assertElectedLeaderWithLocalVotedKey(epoch); } @ParameterizedTest @@ -1538,7 +1538,7 @@ public void testVoteRequestTimeout(boolean withKip853Rpc) throws Exception { context.voteResponse(true, OptionalInt.empty(), 1) ); context.client.poll(); - context.assertElectedLeader(epoch, localId); + context.assertElectedLeaderWithLocalVotedKey(epoch); } @ParameterizedTest @@ -1721,7 +1721,7 @@ public void testLeaderIgnoreVoteRequestOnSameEpoch(boolean withKip853Rpc) throws context.client.poll(); context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false); - context.assertElectedLeader(leaderEpoch, localId); + context.assertElectedLeaderWithLocalVotedKey(leaderEpoch); } @ParameterizedTest @@ -3797,7 +3797,7 @@ public void testGracefulShutdownSingleMemberQuorum(boolean withKip853Rpc) throws .withKip853Rpc(withKip853Rpc) .build(); - context.assertElectedLeader(1, localId); + context.assertElectedLeaderWithLocalVotedKey(1); context.client.poll(); assertEquals(0, context.channel.drainSendQueue().size()); int shutdownTimeoutMs = 5000; @@ -3984,7 +3984,7 @@ public void testLeaderAppendSingleMemberQuorum(boolean withKip853Rpc) throws Exc long now = context.time.milliseconds(); context.pollUntil(() -> context.log.endOffset().offset() == 1L); - context.assertElectedLeader(1, localId); + context.assertElectedLeaderWithLocalVotedKey(1); // We still write the leader change message assertEquals(OptionalLong.of(1L), context.client.highWatermark()); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 1ed1dd972d1e2..09a486669c5b9 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -83,6 +83,7 @@ private LeaderState newLeaderState( voters, OptionalLong.of(0L), kraftVersion, + Optional.empty(), voters.voterIds(), accumulator, fetchTimeoutMs, @@ -134,6 +135,7 @@ public void testRequireNonNullAccumulator() { voterSet, OptionalLong.of(0), KRaftVersion.KRAFT_VERSION_1, + Optional.empty(), Set.of(), null, fetchTimeoutMs, diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 6c3418d64e8aa..b02a316df9c17 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -550,8 +550,25 @@ public void testInitializeAsResignedLeader(KRaftVersion kraftVersion) { int node2 = 2; int epoch = 5; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); - ElectionState election = ElectionState.withElectedLeader(epoch, localId, Optional.empty(), voters.voterIds()); + ElectionState election = ElectionState.withElectedLeader( + epoch, + localId, + Optional.of(localVoterKey), + voters.voterIds() + ); store.writeElectionState(election, kraftVersion); + ElectionState persistedElection = ElectionState.withElectedLeader( + epoch, + localId, + Optional.of(persistedVotedKey(localVoterKey, kraftVersion)), + persistedVoters(voters.voterIds(), kraftVersion) + ); + ElectionState resignedElection = ElectionState.withElectedLeader( + epoch, + localId, + Optional.of(persistedVotedKey(localVoterKey, kraftVersion)), + voters.voterIds() + ); // If we were previously a leader, we will start as resigned in order to ensure // a new leader gets elected. This ensures that records are always uniquely @@ -568,7 +585,8 @@ public void testInitializeAsResignedLeader(KRaftVersion kraftVersion) { ResignedState resignedState = state.resignedStateOrThrow(); assertEquals(epoch, resignedState.epoch()); - assertEquals(election, resignedState.election()); + assertEquals(resignedElection, resignedState.election()); + assertEquals(persistedElection, store.readElectionState().get()); assertEquals(Set.of(node1, node2), resignedState.unackedVoters()); assertEquals( electionTimeoutMs + jitterMs, @@ -2240,6 +2258,15 @@ public void testCandidateToLeader(KRaftVersion kraftVersion) { assertTrue(state.isLeader()); assertEquals(1, leaderState.epoch()); assertEquals(Optional.empty(), leaderState.highWatermark()); + assertEquals( + ElectionState.withElectedLeader( + 1, + localId, + Optional.of(persistedVotedKey(localVoterKey, kraftVersion)), + persistedVoters(voters.voterIds(), kraftVersion) + ), + store.readElectionState().get() + ); } @ParameterizedTest @@ -2463,7 +2490,7 @@ public void testLeaderToResigned(KRaftVersion kraftVersion) { assertTrue(state.isResigned()); ResignedState resignedState = state.resignedStateOrThrow(); assertEquals( - ElectionState.withElectedLeader(1, localId, Optional.empty(), voters.voterIds()), + ElectionState.withElectedLeader(1, localId, Optional.of(localVoterKey), voters.voterIds()), resignedState.election() ); assertEquals(1, resignedState.epoch()); @@ -2496,7 +2523,7 @@ public void testLeaderToAnyStateLowerEpoch(KRaftVersion kraftVersion) { ElectionState.withElectedLeader( 6, localId, - Optional.empty(), + Optional.of(persistedVotedKey(localVoterKey, kraftVersion)), persistedVoters(voters.voterIds(), kraftVersion) ), store.readElectionState().get() @@ -2514,14 +2541,31 @@ public void testResignedToFollowerInSameEpoch(KRaftVersion kraftVersion) { int node2 = 2; int epoch = 5; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); - ElectionState election = ElectionState.withElectedLeader(epoch, localId, Optional.empty(), voters.voterIds()); + ElectionState election = ElectionState.withElectedLeader( + epoch, + localId, + Optional.of(localVoterKey), + voters.voterIds() + ); store.writeElectionState(election, kraftVersion); QuorumState state = buildQuorumState(OptionalInt.of(localId), voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); assertTrue(state.isResigned()); - assertThrows(IllegalStateException.class, () -> state.transitionToFollower(epoch, localId, voters.listeners(localId))); + assertThrows( + IllegalStateException.class, + () -> state.transitionToFollower(epoch, localId, voters.listeners(localId)) + ); // KAFKA-18379 will fix this state.transitionToFollower(epoch, node1, voters.listeners(node1)); + assertEquals( + ElectionState.withElectedLeader( + epoch, + node1, + Optional.of(persistedVotedKey(localVoterKey, kraftVersion)), + persistedVoters(voters.voterIds(), kraftVersion) + ), + store.readElectionState().get() + ); } @ParameterizedTest diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 02f92500711ba..397ad346acbad 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -643,7 +643,7 @@ void expectAndGrantVotes(int epoch) throws Exception { } client.poll(); - assertElectedLeader(epoch, localIdOrThrow()); + assertElectedLeaderAndVotedKey(epoch, localIdOrThrow(), localVotedKey()); } void expectAndGrantPreVotes(int epoch) throws Exception { @@ -682,6 +682,10 @@ public ReplicaKey localReplicaKey() { ReplicaKey.of(localIdOrThrow(), ReplicaKey.NO_DIRECTORY_ID); } + private ReplicaKey localVotedKey() { + return ReplicaKey.of(localIdOrThrow(), localDirectoryId); + } + private void expectBeginEpoch(int epoch) throws Exception { pollUntilRequest(); for (RaftRequest.Outbound request : collectBeginEpochRequests(epoch)) { @@ -741,6 +745,23 @@ public void assertElectedLeaderAndVotedKey(int epoch, int leaderId, ReplicaKey c ); } + public void assertElectedLeaderWithLocalVotedKey(int epoch) { + assertElectedLeaderAndVotedKey(epoch, localIdOrThrow(), localVotedKey()); + } + + public void assertResignedLeaderWithLocalVotedKey(int epoch) { + assertTrue(client.quorum().isResigned()); + assertEquals( + ElectionState.withElectedLeader( + epoch, + localIdOrThrow(), + Optional.of(persistedVotedKey(localVotedKey(), kraftVersion)), + expectedVoters() + ), + quorumStateStore.readElectionState().get() + ); + } + private static ReplicaKey persistedVotedKey(ReplicaKey replicaKey, KRaftVersion kraftVersion) { if (kraftVersion.isReconfigSupported()) { return replicaKey; diff --git a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java index 0564e35582e35..4d1893ac16c43 100644 --- a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java @@ -50,10 +50,15 @@ class ResignedStateTest { ); private ResignedState newResignedState(Set voters) { + return newResignedState(Optional.empty(), voters); + } + + private ResignedState newResignedState(Optional votedKey, Set voters) { return new ResignedState( time, localId, epoch, + votedKey, voters, electionTimeoutMs, List.of(), @@ -86,6 +91,16 @@ public void testResignedState() { assertTrue(state.hasElectionTimeoutExpired(time.milliseconds())); } + @Test + public void testElectionPreservesVotedKey() { + ReplicaKey votedKey = ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID); + Set voters = Set.of(localId, 1); + + ResignedState state = newResignedState(Optional.of(votedKey), voters); + + assertEquals(ElectionState.withElectedLeader(epoch, localId, Optional.of(votedKey), voters), state.election()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testGrantVote(boolean isLogUpToDate) {