Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions kvstore/src/main/scala/zio/kvstore/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import zio.raft.{Command, Index, MemberId, Raft, SnapshotStore, StateMachine}
import zio.stream.{Stream, ZStream}
import zio.zmq.ZContext
import zio.{Chunk, UIO, ZIO, ZIOAppArgs, ZLayer}
import zio.prelude.State

import scodec.Codec
import scodec.bits.BitVector
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
import zio.raft.stores.FileStable
import zio.raft.stores.segmentedlog.SegmentedLog
import zio.prelude.fx.ZPure

sealed trait KVCommand extends Command

Expand All @@ -34,7 +34,7 @@ object KVCommand:
.typecase("S", setCodec)
.typecase("G", getCodec)

class KVStateMachine extends StateMachine[Any, Map[String, String], KVCommand]:
class KVStateMachine extends StateMachine[Map[String, String], KVCommand]:

override def emptyState: Map[String, String] = Map.empty

Expand All @@ -52,14 +52,13 @@ class KVStateMachine extends StateMachine[Any, Map[String, String], KVCommand]:
override def shouldTakeSnapshot(lastSnaphotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = false
// commitIndex.value - lastSnaphotIndex.value > 2

override def apply(command: KVCommand)
: ZPure[Any, Map[String, String], Map[String, String], Any, Nothing, command.Response] =
override def apply(command: KVCommand): State[Map[String, String], command.Response] =
(command match
case Set(k, v) => ZPure.update((map: Map[String, String]) => map.updated(k, v))
case Get(k) => ZPure.get.map((map: Map[String, String]) => map.get(k).getOrElse(""))
case Set(k, v) => State.update((map: Map[String, String]) => map.updated(k, v))
case Get(k) => State.get.map((map: Map[String, String]) => map.get(k).getOrElse(""))
).map(_.asInstanceOf[command.Response])

class HttpServer(raft: Raft[Any, Map[String, String], KVCommand]):
class HttpServer(raft: Raft[Map[String, String], KVCommand]):

val app = Routes(
GET / "" -> handler(ZIO.succeed(Response.text("Hello World!"))),
Expand Down
14 changes: 7 additions & 7 deletions raft/src/main/scala/zio/raft/Raft.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object StreamItem:
val command: A
val promise: CommandPromise[command.Response]

class Raft[W, S, A <: Command](
class Raft[S, A <: Command](
val memberId: MemberId,
peers: Peers,
private[raft] val raftState: Ref[State[S]],
Expand All @@ -32,7 +32,7 @@ class Raft[W, S, A <: Command](
private[raft] val logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[W, S, A],
stateMachine: StateMachine[S, A],
appStateRef: Ref[S]
):
val rpcTimeout = 50.millis
Expand Down Expand Up @@ -1010,14 +1010,14 @@ object Raft:
val electionTimeout = 150
val heartbeartInterval = (electionTimeout / 2).millis

def make[W, S, A <: Command](
def make[S, A <: Command](
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[W, S, A]
stateMachine: StateMachine[S, A]
) =
for
now <- zio.Clock.instant
Expand Down Expand Up @@ -1047,7 +1047,7 @@ object Raft:
)
commandsQueue <- Queue.unbounded[StreamItem[A, S]] // TODO: should this be bounded for back-pressure?

raft = new Raft[W, S, A](
raft = new Raft[S, A](
memberId,
peers,
state,
Expand All @@ -1061,14 +1061,14 @@ object Raft:
)
yield raft

def makeScoped[W, S, A <: Command](
def makeScoped[S, A <: Command](
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[W, S, A]
stateMachine: StateMachine[S, A]
) =
for
raft <- make(
Expand Down
6 changes: 3 additions & 3 deletions raft/src/main/scala/zio/raft/StateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package zio.raft

import zio.UIO
import zio.stream.Stream
import zio.prelude.fx.ZPure
import zio.prelude.State

trait StateMachine[W, S, A <: Command]:
trait StateMachine[S, A <: Command]:

def emptyState: S

def apply(command: A): ZPure[W, S, S, Any, Nothing, command.Response]
def apply(command: A): State[S, command.Response]

def takeSnapshot(state: S): Stream[Nothing, Byte]

Expand Down
18 changes: 9 additions & 9 deletions raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
}

private def findTheNewLeader(
currentLeader: Raft[Any, Int, TestCommands],
raft1: Raft[Any, Int, TestCommands],
raft2: Raft[Any, Int, TestCommands],
raft3: Raft[Any, Int, TestCommands]
currentLeader: Raft[Int, TestCommands],
raft1: Raft[Int, TestCommands],
raft2: Raft[Int, TestCommands],
raft3: Raft[Int, TestCommands]
) =
for
r1IsLeader <- raft1.isTheLeader
Expand All @@ -43,10 +43,10 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
else None

private def waitForNewLeader(
currentLeader: Raft[Any, Int, TestCommands],
raft1: Raft[Any, Int, TestCommands],
raft2: Raft[Any, Int, TestCommands],
raft3: Raft[Any, Int, TestCommands]
currentLeader: Raft[Int, TestCommands],
raft1: Raft[Int, TestCommands],
raft2: Raft[Int, TestCommands],
raft3: Raft[Int, TestCommands]
) =
findTheNewLeader(currentLeader, raft1, raft2, raft3)
.tap:
Expand Down Expand Up @@ -75,7 +75,7 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
snapshotStore1 <- SnapshotStore.makeInMemory
snapshotStore2 <- SnapshotStore.makeInMemory
snapshotStore3 <- SnapshotStore.makeInMemory
raft1 <- Raft.makeScoped[Any, Int, TestCommands](
raft1 <- Raft.makeScoped[Int, TestCommands](
MemberId("peer1"),
peers.filter(_ != MemberId("peer1")),
stable1,
Expand Down
24 changes: 12 additions & 12 deletions raft/src/test/scala/zio/raft/RaftSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object RaftSpec extends ZIOSpecDefault:
// )) ++ testEnvironment


def makeRaft(memberId: MemberId, peers: Peers, enableSnapshot: Boolean): ZIO[Any, Nothing, (Raft[Any, Int, TestCommands], MockRpc[TestCommands])] =
def makeRaft(memberId: MemberId, peers: Peers, enableSnapshot: Boolean): ZIO[Any, Nothing, (Raft[Int, TestCommands], MockRpc[TestCommands])] =
(for
stable <- Stable.makeInMemory
logStore <- LogStore.makeInMemory[TestCommands]
Expand All @@ -37,20 +37,20 @@ object RaftSpec extends ZIOSpecDefault:
)
yield (raft, rpc))

def isCandidate(raft: Raft[Any, Int, TestCommands]) =
def isCandidate(raft: Raft[Int, TestCommands]) =
for s <- raft.raftState.get
yield if s.isInstanceOf[State.Candidate[Int]] then true else false

def isFollower(raft: Raft[Any, Int, TestCommands]) =
def isFollower(raft: Raft[Int, TestCommands]) =
for s <- raft.raftState.get
yield if s.isInstanceOf[State.Follower[Int]] then true else false

def expectFollower(raft: Raft[Any, Int, TestCommands]) =
def expectFollower(raft: Raft[Int, TestCommands]) =
raft.raftState.get.flatMap:
case f: State.Follower[Int] => ZIO.succeed(f)
case _ => ZIO.die(new Exception("Expected follower"))

def getLeader(raft: Raft[Any, Int, TestCommands]) =
def getLeader(raft: Raft[Int, TestCommands]) =
for s <- raft.raftState.get
yield s match
case State.Follower(commitIndex, lastApplied, electionTimeout, leaderId) =>
Expand All @@ -69,7 +69,7 @@ object RaftSpec extends ZIOSpecDefault:
Some(raft.memberId)

def handleHeartbeat(
raft: Raft[Any, Int, TestCommands],
raft: Raft[Int, TestCommands],
term: Term,
leaderId: MemberId,
commitIndex: Index
Expand All @@ -81,7 +81,7 @@ object RaftSpec extends ZIOSpecDefault:
)

def handleVoteGranted(
raft: Raft[Any, Int, TestCommands],
raft: Raft[Int, TestCommands],
term: Term,
memberId: MemberId
) =
Expand All @@ -92,7 +92,7 @@ object RaftSpec extends ZIOSpecDefault:
)

def handelAppendEntries(
raft: Raft[Any, Int, TestCommands],
raft: Raft[Int, TestCommands],
term: Term,
leaderId: MemberId,
previousIndex: Index,
Expand All @@ -113,13 +113,13 @@ object RaftSpec extends ZIOSpecDefault:
)
)

def handleBootstrap(raft: Raft[Any, Int, TestCommands]) =
def handleBootstrap(raft: Raft[Int, TestCommands]) =
raft.handleStreamItem(StreamItem.Bootstrap[TestCommands, Int]())

def handleTick(raft: Raft[Any, Int, TestCommands]) =
def handleTick(raft: Raft[Int, TestCommands]) =
raft.handleStreamItem(StreamItem.Tick[TestCommands, Int]())

def sendCommand(raft: Raft[Any, Int, TestCommands], commandArg: TestCommands) =
def sendCommand(raft: Raft[Int, TestCommands], commandArg: TestCommands) =
for
promiseArg <- zio.Promise.make[NotALeaderError, Int]
_ <- raft.handleStreamItem(new CommandMessage[TestCommands, Int] {
Expand All @@ -128,7 +128,7 @@ object RaftSpec extends ZIOSpecDefault:
})
yield ()

def bootstrap(raft: Raft[Any, Int, TestCommands]) =
def bootstrap(raft: Raft[Int, TestCommands]) =
for
_ <- handleBootstrap(raft)
_ <- handleVoteGranted(raft, Term(1), MemberId("peer2"))
Expand Down
10 changes: 5 additions & 5 deletions raft/src/test/scala/zio/raft/TestStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package zio.raft

import zio.UIO
import zio.stream.{Stream, ZStream}
import zio.prelude.fx.ZPure
import zio.prelude.State

sealed trait TestCommands extends Command:
type Response = Int
case object Increase extends TestCommands
case object Get extends TestCommands

case class TestStateMachine(enableSnapshot: Boolean) extends StateMachine[Any, Int, TestCommands]:
case class TestStateMachine(enableSnapshot: Boolean) extends StateMachine[Int, TestCommands]:
override def emptyState: Int = 0

def apply(command: TestCommands): ZPure[Any, Int, Int, Any, Nothing, command.Response] =
def apply(command: TestCommands): State[Int, command.Response] =
command match
case Increase => ZPure.modify(s => (s + 1, s + 1))
case Get => ZPure.get
case Increase => State.modify(s => (s + 1, s + 1))
case Get => State.get

override def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[Int] =
stream.runCollect.map(b => new String(b.toArray).toInt)
Expand Down