Skip to content

Commit 8be4979

Browse files
committed
convert StateMachine to ZPure
1 parent b0160bd commit 8be4979

File tree

6 files changed

+39
-36
lines changed

6 files changed

+39
-36
lines changed

kvstore/src/main/scala/zio/kvstore/App.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import scodec.bits.BitVector
1515
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
1616
import zio.raft.stores.FileStable
1717
import zio.raft.stores.segmentedlog.SegmentedLog
18+
import zio.prelude.fx.ZPure
1819

1920
sealed trait KVCommand extends Command
2021

@@ -34,7 +35,7 @@ object KVCommand:
3435
.typecase("S", setCodec)
3536
.typecase("G", getCodec)
3637

37-
class KVStateMachine extends StateMachine[Map[String, String], KVCommand]:
38+
class KVStateMachine extends StateMachine[Any, Map[String, String], KVCommand]:
3839

3940
override def emptyState: Map[String, String] = Map.empty
4041

@@ -52,13 +53,14 @@ class KVStateMachine extends StateMachine[Map[String, String], KVCommand]:
5253
override def shouldTakeSnapshot(lastSnaphotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = false
5354
// commitIndex.value - lastSnaphotIndex.value > 2
5455

55-
override def apply(command: KVCommand): State[Map[String, String], command.Response] =
56+
override def apply(command: KVCommand)
57+
: ZPure[Any, Map[String, String], Map[String, String], Any, Nothing, command.Response] =
5658
(command match
5759
case Set(k, v) => State.update((map: Map[String, String]) => map.updated(k, v))
5860
case Get(k) => State.get.map((map: Map[String, String]) => map.get(k).getOrElse(""))
5961
).map(_.asInstanceOf[command.Response])
6062

61-
class HttpServer(raft: Raft[Map[String, String], KVCommand]):
63+
class HttpServer(raft: Raft[Any, Map[String, String], KVCommand]):
6264

6365
val app = Routes(
6466
GET / "" -> handler(ZIO.succeed(Response.text("Hello World!"))),

raft/src/main/scala/zio/raft/Raft.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ object StreamItem:
2323
val command: A
2424
val promise: CommandPromise[command.Response]
2525

26-
class Raft[S, A <: Command](
26+
class Raft[W, S, A <: Command](
2727
val memberId: MemberId,
2828
peers: Peers,
2929
private[raft] val raftState: Ref[State[S]],
@@ -32,7 +32,7 @@ class Raft[S, A <: Command](
3232
private[raft] val logStore: LogStore[A],
3333
snapshotStore: SnapshotStore,
3434
rpc: RPC[A],
35-
stateMachine: StateMachine[S, A],
35+
stateMachine: StateMachine[W, S, A],
3636
appStateRef: Ref[S]
3737
):
3838
val rpcTimeout = 50.millis
@@ -1010,14 +1010,14 @@ object Raft:
10101010
val electionTimeout = 150
10111011
val heartbeartInterval = (electionTimeout / 2).millis
10121012

1013-
def make[S, A <: Command](
1013+
def make[W, S, A <: Command](
10141014
memberId: MemberId,
10151015
peers: Peers,
10161016
stable: Stable,
10171017
logStore: LogStore[A],
10181018
snapshotStore: SnapshotStore,
10191019
rpc: RPC[A],
1020-
stateMachine: StateMachine[S, A]
1020+
stateMachine: StateMachine[W, S, A]
10211021
) =
10221022
for
10231023
now <- zio.Clock.instant
@@ -1047,7 +1047,7 @@ object Raft:
10471047
)
10481048
commandsQueue <- Queue.unbounded[StreamItem[A, S]] // TODO: should this be bounded for back-pressure?
10491049

1050-
raft = new Raft[S, A](
1050+
raft = new Raft[W, S, A](
10511051
memberId,
10521052
peers,
10531053
state,
@@ -1061,14 +1061,14 @@ object Raft:
10611061
)
10621062
yield raft
10631063

1064-
def makeScoped[S, A <: Command](
1064+
def makeScoped[W, S, A <: Command](
10651065
memberId: MemberId,
10661066
peers: Peers,
10671067
stable: Stable,
10681068
logStore: LogStore[A],
10691069
snapshotStore: SnapshotStore,
10701070
rpc: RPC[A],
1071-
stateMachine: StateMachine[S, A]
1071+
stateMachine: StateMachine[W, S, A]
10721072
) =
10731073
for
10741074
raft <- make(

raft/src/main/scala/zio/raft/StateMachine.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package zio.raft
22

33
import zio.UIO
44
import zio.stream.Stream
5-
import zio.prelude.State
5+
import zio.prelude.fx.ZPure
66

7-
trait StateMachine[S, A <: Command]:
7+
trait StateMachine[W, S, A <: Command]:
88

99
def emptyState: S
1010

11-
def apply(command: A): State[S, command.Response]
11+
def apply(command: A): ZPure[W, S, S, Any, Nothing, command.Response]
1212

1313
def takeSnapshot(state: S): Stream[Nothing, Byte]
1414

raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
2727
}
2828

2929
private def findTheNewLeader(
30-
currentLeader: Raft[Int, TestCommands],
31-
raft1: Raft[Int, TestCommands],
32-
raft2: Raft[Int, TestCommands],
33-
raft3: Raft[Int, TestCommands]
30+
currentLeader: Raft[Any, Int, TestCommands],
31+
raft1: Raft[Any, Int, TestCommands],
32+
raft2: Raft[Any, Int, TestCommands],
33+
raft3: Raft[Any, Int, TestCommands]
3434
) =
3535
for
3636
r1IsLeader <- raft1.isTheLeader
@@ -43,10 +43,10 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
4343
else None
4444

4545
private def waitForNewLeader(
46-
currentLeader: Raft[Int, TestCommands],
47-
raft1: Raft[Int, TestCommands],
48-
raft2: Raft[Int, TestCommands],
49-
raft3: Raft[Int, TestCommands]
46+
currentLeader: Raft[Any, Int, TestCommands],
47+
raft1: Raft[Any, Int, TestCommands],
48+
raft2: Raft[Any, Int, TestCommands],
49+
raft3: Raft[Any, Int, TestCommands]
5050
) =
5151
findTheNewLeader(currentLeader, raft1, raft2, raft3)
5252
.tap:
@@ -75,7 +75,7 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
7575
snapshotStore1 <- SnapshotStore.makeInMemory
7676
snapshotStore2 <- SnapshotStore.makeInMemory
7777
snapshotStore3 <- SnapshotStore.makeInMemory
78-
raft1 <- Raft.makeScoped[Int, TestCommands](
78+
raft1 <- Raft.makeScoped[Any, Int, TestCommands](
7979
MemberId("peer1"),
8080
peers.filter(_ != MemberId("peer1")),
8181
stable1,

raft/src/test/scala/zio/raft/RaftSpec.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object RaftSpec extends ZIOSpecDefault:
1818
// )) ++ testEnvironment
1919

2020

21-
def makeRaft(memberId: MemberId, peers: Peers, enableSnapshot: Boolean): ZIO[Any, Nothing, (Raft[Int, TestCommands], MockRpc[TestCommands])] =
21+
def makeRaft(memberId: MemberId, peers: Peers, enableSnapshot: Boolean): ZIO[Any, Nothing, (Raft[Any, Int, TestCommands], MockRpc[TestCommands])] =
2222
(for
2323
stable <- Stable.makeInMemory
2424
logStore <- LogStore.makeInMemory[TestCommands]
@@ -37,20 +37,20 @@ object RaftSpec extends ZIOSpecDefault:
3737
)
3838
yield (raft, rpc))
3939

40-
def isCandidate(raft: Raft[Int, TestCommands]) =
40+
def isCandidate(raft: Raft[Any, Int, TestCommands]) =
4141
for s <- raft.raftState.get
4242
yield if s.isInstanceOf[State.Candidate[Int]] then true else false
4343

44-
def isFollower(raft: Raft[Int, TestCommands]) =
44+
def isFollower(raft: Raft[Any, Int, TestCommands]) =
4545
for s <- raft.raftState.get
4646
yield if s.isInstanceOf[State.Follower[Int]] then true else false
4747

48-
def expectFollower(raft: Raft[Int, TestCommands]) =
48+
def expectFollower(raft: Raft[Any, Int, TestCommands]) =
4949
raft.raftState.get.flatMap:
5050
case f: State.Follower[Int] => ZIO.succeed(f)
5151
case _ => ZIO.die(new Exception("Expected follower"))
5252

53-
def getLeader(raft: Raft[Int, TestCommands]) =
53+
def getLeader(raft: Raft[Any, Int, TestCommands]) =
5454
for s <- raft.raftState.get
5555
yield s match
5656
case State.Follower(commitIndex, lastApplied, electionTimeout, leaderId) =>
@@ -69,7 +69,7 @@ object RaftSpec extends ZIOSpecDefault:
6969
Some(raft.memberId)
7070

7171
def handleHeartbeat(
72-
raft: Raft[Int, TestCommands],
72+
raft: Raft[Any, Int, TestCommands],
7373
term: Term,
7474
leaderId: MemberId,
7575
commitIndex: Index
@@ -81,7 +81,7 @@ object RaftSpec extends ZIOSpecDefault:
8181
)
8282

8383
def handleVoteGranted(
84-
raft: Raft[Int, TestCommands],
84+
raft: Raft[Any, Int, TestCommands],
8585
term: Term,
8686
memberId: MemberId
8787
) =
@@ -92,7 +92,7 @@ object RaftSpec extends ZIOSpecDefault:
9292
)
9393

9494
def handelAppendEntries(
95-
raft: Raft[Int, TestCommands],
95+
raft: Raft[Any, Int, TestCommands],
9696
term: Term,
9797
leaderId: MemberId,
9898
previousIndex: Index,
@@ -113,13 +113,13 @@ object RaftSpec extends ZIOSpecDefault:
113113
)
114114
)
115115

116-
def handleBootstrap(raft: Raft[Int, TestCommands]) =
116+
def handleBootstrap(raft: Raft[Any, Int, TestCommands]) =
117117
raft.handleStreamItem(StreamItem.Bootstrap[TestCommands, Int]())
118118

119-
def handleTick(raft: Raft[Int, TestCommands]) =
119+
def handleTick(raft: Raft[Any, Int, TestCommands]) =
120120
raft.handleStreamItem(StreamItem.Tick[TestCommands, Int]())
121121

122-
def sendCommand(raft: Raft[Int, TestCommands], commandArg: TestCommands) =
122+
def sendCommand(raft: Raft[Any, Int, TestCommands], commandArg: TestCommands) =
123123
for
124124
promiseArg <- zio.Promise.make[NotALeaderError, Int]
125125
_ <- raft.handleStreamItem(new CommandMessage[TestCommands, Int] {
@@ -128,7 +128,7 @@ object RaftSpec extends ZIOSpecDefault:
128128
})
129129
yield ()
130130

131-
def bootstrap(raft: Raft[Int, TestCommands]) =
131+
def bootstrap(raft: Raft[Any, Int, TestCommands]) =
132132
for
133133
_ <- handleBootstrap(raft)
134134
_ <- handleVoteGranted(raft, Term(1), MemberId("peer2"))

raft/src/test/scala/zio/raft/TestStateMachine.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@ package zio.raft
33
import zio.UIO
44
import zio.stream.{Stream, ZStream}
55
import zio.prelude.State
6+
import zio.prelude.fx.ZPure
67

78
sealed trait TestCommands extends Command:
89
type Response = Int
910
case object Increase extends TestCommands
1011
case object Get extends TestCommands
1112

12-
case class TestStateMachine(enableSnapshot: Boolean) extends StateMachine[Int, TestCommands]:
13+
case class TestStateMachine(enableSnapshot: Boolean) extends StateMachine[Any, Int, TestCommands]:
1314
override def emptyState: Int = 0
1415

15-
def apply(command: TestCommands): State[Int, command.Response] =
16+
def apply(command: TestCommands): ZPure[Any, Int, Int, Any, Nothing, command.Response] =
1617
command match
1718
case Increase => State.modify(s => (s + 1, s + 1))
1819
case Get => State.get

0 commit comments

Comments
 (0)