@@ -908,59 +908,12 @@ class Raft[S, A <: Command](
908908 res <- promiseArg.await
909909 yield (res)
910910
911- private def validateLeadership (leader : Leader [S ]): ZIO [Any , NotALeaderError , Unit ] =
912- for
913- now <- zio.Clock .instant
914- peersRequiringHeartbeat = peers.filter(peer => leader.heartbeatDue.due(now, peer))
915-
916- _ <- ZIO .logDebug(
917- s " memberId= ${this .memberId} validateLeadership: ${peersRequiringHeartbeat.length}/ ${peers.length} peers requiring heartbeat "
918- )
919-
920- // TODO (eran): TBD with Doron: as per Raft paper, we might want to always send heartbeat and not rely on the heartbeat due time
921-
922- // If majority of peers don't require heartbeat, leadership is likely valid
923- // peersRequiringHeartbeat.length < numberOfServers / 2 means:
924- // - peers not requiring heartbeat + self > numberOfServers / 2 (majority)
925- _ <- if peersRequiringHeartbeat.length < numberOfServers / 2 then
926- ZIO .logDebug(s " memberId= ${this .memberId} Leadership validation: recent heartbeats sufficient " )
927- *> ZIO .unit
928- else
929- // Need to send heartbeats to validate leadership
930- for
931- _ <- ZIO .logDebug(
932- s " memberId= ${this .memberId} Leadership validation: sending heartbeats to ${peersRequiringHeartbeat.mkString(" , " )}"
933- )
934-
935- // Reuse existing sendHeartbeatRule for each peer requiring heartbeat
936- _ <- ZIO .foreachDiscard(peersRequiringHeartbeat)(sendHeartbeatRule)
937-
938- // Wait briefly for responses to validate leadership
939- // If we don't receive majority responses within timeout, assume leadership lost
940- _ <- zio.Clock .sleep(rpcTimeout)
941-
942- // Check if we're still a leader after sending heartbeats
943- finalState <- raftState.get
944- _ <- finalState match
945- case _ : Leader [S ] =>
946- ZIO .logDebug(s " memberId= ${this .memberId} Leadership validation: successful " )
947- case f : Follower [S ] =>
948- ZIO .logDebug(s " memberId= ${this .memberId} Leadership validation: failed, now following ${f.leaderId}" )
949- *> ZIO .fail(NotALeaderError (f.leaderId))
950- case _ : Candidate [S ] =>
951- ZIO .logDebug(s " memberId= ${this .memberId} Leadership validation: failed, now candidate " )
952- *> ZIO .fail(NotALeaderError (None ))
953- yield ()
954- yield ()
955-
956911 def readState : ZIO [Any , NotALeaderError , S ] =
957912 for
958913 s <- raftState.get
959914 result <- s match
960915 case l : Leader [S ] =>
961916 for
962- _ <- validateLeadership(l)
963-
964917 promise <- Promise .make[NotALeaderError , S ]
965918 now <- zio.Clock .instant
966919 lastApplied <- logStore.lastIndex
0 commit comments