Skip to content

Commit c1b91c4

Browse files
committed
add state notification support in raft core
1 parent f9bc358 commit c1b91c4

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

raft/src/main/scala/zio/raft/Raft.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class Raft[S, A <: Command](
2828
peers: Peers,
2929
private[raft] val raftState: Ref[State[S]],
3030
commandsQueue: Queue[StreamItem[A, S]],
31+
stateNotificationsQueue: Queue[StateNotification],
3132
stable: Stable,
3233
private[raft] val logStore: LogStore[A],
3334
snapshotStore: SnapshotStore,
@@ -39,6 +40,9 @@ class Raft[S, A <: Command](
3940
val batchSize = 100
4041
val numberOfServers = peers.size + 1
4142

43+
def stateNotifications: ZStream[Any, Nothing, StateNotification] =
44+
ZStream.fromQueue(stateNotificationsQueue)
45+
4246
private def stepDown(newTerm: Term, leaderId: Option[MemberId]) =
4347
for
4448
_ <- stable.newTerm(newTerm, None)
@@ -55,6 +59,15 @@ class Raft[S, A <: Command](
5559
_ <- ZIO.logDebug(
5660
s"memberId=${this.memberId} Following $leaderId $currentTerm"
5761
)
62+
63+
_ <- (currentState, leaderId) match
64+
case (l: Leader[S], _) =>
65+
stateNotificationsQueue.offer(StateNotification.SteppedDown(leaderId))
66+
case (f: State.Follower[S], Some(leaderId)) if f.leaderId != Some(leaderId) =>
67+
stateNotificationsQueue.offer(StateNotification.LeaderChanged(leaderId))
68+
case (c: State.Candidate[S], Some(leaderId)) =>
69+
stateNotificationsQueue.offer(StateNotification.LeaderChanged(leaderId))
70+
case _ => ZIO.unit
5871
yield newTerm
5972

6073
private def convertToFollower(leaderId: MemberId) =
@@ -66,9 +79,14 @@ class Raft[S, A <: Command](
6679

6780
_ <- s match
6881
case s: State.Follower[S] if s.leaderId != Some(leaderId) =>
69-
ZIO.logDebug(
70-
s"memberId=${this.memberId} Following $leaderId $currentTerm"
71-
)
82+
for
83+
_ <- ZIO.logDebug(
84+
s"memberId=${this.memberId} Following $leaderId $currentTerm"
85+
)
86+
87+
// Leader changed, so let's notify
88+
_ <- stateNotificationsQueue.offer(StateNotification.LeaderChanged(leaderId))
89+
yield ()
7290
case s: State.Follower[S] => ZIO.unit
7391
case s =>
7492
ZIO.logDebug(
@@ -1046,12 +1064,14 @@ object Raft:
10461064
State.Follower[S](previousIndex, previousIndex, electionTimeout, None)
10471065
)
10481066
commandsQueue <- Queue.unbounded[StreamItem[A, S]] // TODO: should this be bounded for back-pressure?
1067+
stateNotificationsQueue <- Queue.unbounded[StateNotification]
10491068

10501069
raft = new Raft[S, A](
10511070
memberId,
10521071
peers,
10531072
state,
10541073
commandsQueue,
1074+
stateNotificationsQueue,
10551075
stable,
10561076
logStore,
10571077
snapshotStore,

raft/src/main/scala/zio/raft/Types.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,9 @@ object InstallSnapshotResult:
126126
done: Boolean
127127
) extends InstallSnapshotResult[A]
128128
case class Failure[A <: Command](from: MemberId, term: Term, index: Index) extends InstallSnapshotResult[A]
129+
130+
sealed trait StateNotification
131+
object StateNotification:
132+
case object SteppedUp extends StateNotification
133+
case class SteppedDown(leaderId: Option[MemberId]) extends StateNotification
134+
case class LeaderChanged(leaderId: MemberId) extends StateNotification

0 commit comments

Comments
 (0)