diff --git a/raft/src/main/scala/zio/raft/Raft.scala b/raft/src/main/scala/zio/raft/Raft.scala index 9082e25c..9619c87a 100644 --- a/raft/src/main/scala/zio/raft/Raft.scala +++ b/raft/src/main/scala/zio/raft/Raft.scala @@ -28,6 +28,7 @@ class Raft[S, A <: Command]( peers: Peers, private[raft] val raftState: Ref[State[S]], commandsQueue: Queue[StreamItem[A, S]], + stateNotificationsQueue: Queue[StateNotification], stable: Stable, private[raft] val logStore: LogStore[A], snapshotStore: SnapshotStore, @@ -39,6 +40,9 @@ class Raft[S, A <: Command]( val batchSize = 100 val numberOfServers = peers.size + 1 + def stateNotifications: ZStream[Any, Nothing, StateNotification] = + ZStream.fromQueue(stateNotificationsQueue) + private def stepDown(newTerm: Term, leaderId: Option[MemberId]) = for _ <- stable.newTerm(newTerm, None) @@ -55,6 +59,15 @@ class Raft[S, A <: Command]( _ <- ZIO.logDebug( s"memberId=${this.memberId} Following $leaderId $currentTerm" ) + + _ <- (currentState, leaderId) match + case (_: Leader[S], _) => + stateNotificationsQueue.offer(StateNotification.SteppedDown(leaderId)) + case (f: State.Follower[S], Some(leaderId)) if f.leaderId != Some(leaderId) => + stateNotificationsQueue.offer(StateNotification.LeaderChanged(leaderId)) + case (_: State.Candidate[S], Some(leaderId)) => + stateNotificationsQueue.offer(StateNotification.LeaderChanged(leaderId)) + case _ => ZIO.unit yield newTerm private def convertToFollower(leaderId: MemberId) = @@ -66,9 +79,14 @@ class Raft[S, A <: Command]( _ <- s match case s: State.Follower[S] if s.leaderId != Some(leaderId) => - ZIO.logDebug( - s"memberId=${this.memberId} Following $leaderId $currentTerm" - ) + for + _ <- ZIO.logDebug( + s"memberId=${this.memberId} Following $leaderId $currentTerm" + ) + + // Leader changed, so let's notify + _ <- stateNotificationsQueue.offer(StateNotification.LeaderChanged(leaderId)) + yield () case s: State.Follower[S] => ZIO.unit case s => ZIO.logDebug( @@ -514,6 +532,8 @@ class Raft[S, A <: Command]( PendingCommands.empty ) ) + + _ <- stateNotificationsQueue.offer(StateNotification.SteppedUp) yield () for @@ -1046,12 +1066,14 @@ object Raft: State.Follower[S](previousIndex, previousIndex, electionTimeout, None) ) commandsQueue <- Queue.unbounded[StreamItem[A, S]] // TODO: should this be bounded for back-pressure? + stateNotificationsQueue <- Queue.unbounded[StateNotification] raft = new Raft[S, A]( memberId, peers, state, commandsQueue, + stateNotificationsQueue, stable, logStore, snapshotStore, diff --git a/raft/src/main/scala/zio/raft/Types.scala b/raft/src/main/scala/zio/raft/Types.scala index 64d7451e..10fe295e 100644 --- a/raft/src/main/scala/zio/raft/Types.scala +++ b/raft/src/main/scala/zio/raft/Types.scala @@ -126,3 +126,9 @@ object InstallSnapshotResult: done: Boolean ) extends InstallSnapshotResult[A] case class Failure[A <: Command](from: MemberId, term: Term, index: Index) extends InstallSnapshotResult[A] + +sealed trait StateNotification +object StateNotification: + case object SteppedUp extends StateNotification + case class SteppedDown(leaderId: Option[MemberId]) extends StateNotification + case class LeaderChanged(leaderId: MemberId) extends StateNotification