Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions kvstore/src/main/scala/zio/kvstore/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import scodec.bits.BitVector
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
import zio.raft.stores.FileStable
import zio.raft.stores.segmentedlog.SegmentedLog
import zio.prelude.fx.ZPure

sealed trait KVCommand extends Command

Expand All @@ -34,7 +35,7 @@ object KVCommand:
.typecase("S", setCodec)
.typecase("G", getCodec)

class KVStateMachine extends StateMachine[Map[String, String], KVCommand]:
class KVStateMachine extends StateMachine[Any, Map[String, String], KVCommand]:

override def emptyState: Map[String, String] = Map.empty

Expand All @@ -52,13 +53,14 @@ class KVStateMachine extends StateMachine[Map[String, String], KVCommand]:
override def shouldTakeSnapshot(lastSnaphotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = false
// commitIndex.value - lastSnaphotIndex.value > 2

override def apply(command: KVCommand): State[Map[String, String], command.Response] =
override def apply(command: KVCommand)
: ZPure[Any, Map[String, String], Map[String, String], Any, Nothing, command.Response] =
(command match
case Set(k, v) => State.update((map: Map[String, String]) => map.updated(k, v))
case Get(k) => State.get.map((map: Map[String, String]) => map.get(k).getOrElse(""))
).map(_.asInstanceOf[command.Response])

class HttpServer(raft: Raft[Map[String, String], KVCommand]):
class HttpServer(raft: Raft[Any, Map[String, String], KVCommand]):

val app = Routes(
GET / "" -> handler(ZIO.succeed(Response.text("Hello World!"))),
Expand Down
14 changes: 7 additions & 7 deletions raft/src/main/scala/zio/raft/Raft.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object StreamItem:
val command: A
val promise: CommandPromise[command.Response]

class Raft[S, A <: Command](
class Raft[W, S, A <: Command](
val memberId: MemberId,
peers: Peers,
private[raft] val raftState: Ref[State[S]],
Expand All @@ -32,7 +32,7 @@ class Raft[S, A <: Command](
private[raft] val logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A],
stateMachine: StateMachine[W, S, A],
appStateRef: Ref[S]
):
val rpcTimeout = 50.millis
Expand Down Expand Up @@ -1010,14 +1010,14 @@ object Raft:
val electionTimeout = 150
val heartbeartInterval = (electionTimeout / 2).millis

def make[S, A <: Command](
def make[W, S, A <: Command](
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A]
stateMachine: StateMachine[W, S, A]
) =
for
now <- zio.Clock.instant
Expand Down Expand Up @@ -1047,7 +1047,7 @@ object Raft:
)
commandsQueue <- Queue.unbounded[StreamItem[A, S]] // TODO: should this be bounded for back-pressure?

raft = new Raft[S, A](
raft = new Raft[W, S, A](
memberId,
peers,
state,
Expand All @@ -1061,14 +1061,14 @@ object Raft:
)
yield raft

def makeScoped[S, A <: Command](
def makeScoped[W, S, A <: Command](
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A]
stateMachine: StateMachine[W, S, A]
) =
for
raft <- make(
Expand Down
6 changes: 3 additions & 3 deletions raft/src/main/scala/zio/raft/StateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package zio.raft

import zio.UIO
import zio.stream.Stream
import zio.prelude.State
import zio.prelude.fx.ZPure

trait StateMachine[S, A <: Command]:
trait StateMachine[W, S, A <: Command]:

def emptyState: S

def apply(command: A): State[S, command.Response]
def apply(command: A): ZPure[W, S, S, Any, Nothing, command.Response]

def takeSnapshot(state: S): Stream[Nothing, Byte]

Expand Down
18 changes: 9 additions & 9 deletions raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
}

private def findTheNewLeader(
currentLeader: Raft[Int, TestCommands],
raft1: Raft[Int, TestCommands],
raft2: Raft[Int, TestCommands],
raft3: Raft[Int, TestCommands]
currentLeader: Raft[Any, Int, TestCommands],
raft1: Raft[Any, Int, TestCommands],
raft2: Raft[Any, Int, TestCommands],
raft3: Raft[Any, Int, TestCommands]
) =
for
r1IsLeader <- raft1.isTheLeader
Expand All @@ -43,10 +43,10 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
else None

private def waitForNewLeader(
currentLeader: Raft[Int, TestCommands],
raft1: Raft[Int, TestCommands],
raft2: Raft[Int, TestCommands],
raft3: Raft[Int, TestCommands]
currentLeader: Raft[Any, Int, TestCommands],
raft1: Raft[Any, Int, TestCommands],
raft2: Raft[Any, Int, TestCommands],
raft3: Raft[Any, Int, TestCommands]
) =
findTheNewLeader(currentLeader, raft1, raft2, raft3)
.tap:
Expand Down Expand Up @@ -75,7 +75,7 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
snapshotStore1 <- SnapshotStore.makeInMemory
snapshotStore2 <- SnapshotStore.makeInMemory
snapshotStore3 <- SnapshotStore.makeInMemory
raft1 <- Raft.makeScoped[Int, TestCommands](
raft1 <- Raft.makeScoped[Any, Int, TestCommands](
MemberId("peer1"),
peers.filter(_ != MemberId("peer1")),
stable1,
Expand Down
24 changes: 12 additions & 12 deletions raft/src/test/scala/zio/raft/RaftSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object RaftSpec extends ZIOSpecDefault:
// )) ++ testEnvironment


def makeRaft(memberId: MemberId, peers: Peers, enableSnapshot: Boolean): ZIO[Any, Nothing, (Raft[Int, TestCommands], MockRpc[TestCommands])] =
def makeRaft(memberId: MemberId, peers: Peers, enableSnapshot: Boolean): ZIO[Any, Nothing, (Raft[Any, Int, TestCommands], MockRpc[TestCommands])] =
(for
stable <- Stable.makeInMemory
logStore <- LogStore.makeInMemory[TestCommands]
Expand All @@ -37,20 +37,20 @@ object RaftSpec extends ZIOSpecDefault:
)
yield (raft, rpc))

def isCandidate(raft: Raft[Int, TestCommands]) =
def isCandidate(raft: Raft[Any, Int, TestCommands]) =
for s <- raft.raftState.get
yield if s.isInstanceOf[State.Candidate[Int]] then true else false

def isFollower(raft: Raft[Int, TestCommands]) =
def isFollower(raft: Raft[Any, Int, TestCommands]) =
for s <- raft.raftState.get
yield if s.isInstanceOf[State.Follower[Int]] then true else false

def expectFollower(raft: Raft[Int, TestCommands]) =
def expectFollower(raft: Raft[Any, Int, TestCommands]) =
raft.raftState.get.flatMap:
case f: State.Follower[Int] => ZIO.succeed(f)
case _ => ZIO.die(new Exception("Expected follower"))

def getLeader(raft: Raft[Int, TestCommands]) =
def getLeader(raft: Raft[Any, Int, TestCommands]) =
for s <- raft.raftState.get
yield s match
case State.Follower(commitIndex, lastApplied, electionTimeout, leaderId) =>
Expand All @@ -69,7 +69,7 @@ object RaftSpec extends ZIOSpecDefault:
Some(raft.memberId)

def handleHeartbeat(
raft: Raft[Int, TestCommands],
raft: Raft[Any, Int, TestCommands],
term: Term,
leaderId: MemberId,
commitIndex: Index
Expand All @@ -81,7 +81,7 @@ object RaftSpec extends ZIOSpecDefault:
)

def handleVoteGranted(
raft: Raft[Int, TestCommands],
raft: Raft[Any, Int, TestCommands],
term: Term,
memberId: MemberId
) =
Expand All @@ -92,7 +92,7 @@ object RaftSpec extends ZIOSpecDefault:
)

def handelAppendEntries(
raft: Raft[Int, TestCommands],
raft: Raft[Any, Int, TestCommands],
term: Term,
leaderId: MemberId,
previousIndex: Index,
Expand All @@ -113,13 +113,13 @@ object RaftSpec extends ZIOSpecDefault:
)
)

def handleBootstrap(raft: Raft[Int, TestCommands]) =
def handleBootstrap(raft: Raft[Any, Int, TestCommands]) =
raft.handleStreamItem(StreamItem.Bootstrap[TestCommands, Int]())

def handleTick(raft: Raft[Int, TestCommands]) =
def handleTick(raft: Raft[Any, Int, TestCommands]) =
raft.handleStreamItem(StreamItem.Tick[TestCommands, Int]())

def sendCommand(raft: Raft[Int, TestCommands], commandArg: TestCommands) =
def sendCommand(raft: Raft[Any, Int, TestCommands], commandArg: TestCommands) =
for
promiseArg <- zio.Promise.make[NotALeaderError, Int]
_ <- raft.handleStreamItem(new CommandMessage[TestCommands, Int] {
Expand All @@ -128,7 +128,7 @@ object RaftSpec extends ZIOSpecDefault:
})
yield ()

def bootstrap(raft: Raft[Int, TestCommands]) =
def bootstrap(raft: Raft[Any, Int, TestCommands]) =
for
_ <- handleBootstrap(raft)
_ <- handleVoteGranted(raft, Term(1), MemberId("peer2"))
Expand Down
5 changes: 3 additions & 2 deletions raft/src/test/scala/zio/raft/TestStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package zio.raft
import zio.UIO
import zio.stream.{Stream, ZStream}
import zio.prelude.State
import zio.prelude.fx.ZPure

sealed trait TestCommands extends Command:
type Response = Int
case object Increase extends TestCommands
case object Get extends TestCommands

case class TestStateMachine(enableSnapshot: Boolean) extends StateMachine[Int, TestCommands]:
case class TestStateMachine(enableSnapshot: Boolean) extends StateMachine[Any, Int, TestCommands]:
override def emptyState: Int = 0

def apply(command: TestCommands): State[Int, command.Response] =
def apply(command: TestCommands): ZPure[Any, Int, Int, Any, Nothing, command.Response] =
command match
case Increase => State.modify(s => (s + 1, s + 1))
case Get => State.get
Expand Down