Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 66 additions & 18 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,39 @@ scalaVersion := mainScalaVersion
resolvers +=
"Sonatype OSS Snapshots" at "https://central.sonatype.com/repository/maven-snapshots/"

lazy val commonScalacOptions = Def.setting{
lazy val commonScalacOptions = Def.setting {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, n)) => List(
"-Xsource:3.7-migration",
"-Ymacro-annotations",
"-Wunused:imports"
"-Xsource:3.7-migration",
"-Ymacro-annotations",
"-Wunused:imports"
)
case Some((3, n)) => List(
"-Wunused:imports",
"-source:future",
"-deprecation",
"-Wunused:imports",
"-source:future",
"-deprecation"
)
case _ => List()
case _ => List()
}
}

lazy val root = project
.in(file("."))
.aggregate(raft, kvstore, zio1zmq, zio2zmq, raftZmq, stores, ziolmdb, clientServerProtocol, clientServerServer, clientServerClient, sessionStateMachine)
.aggregate(
raft,
kvstore,
kvstoreCli,
kvstoreProtocol,
zio1zmq,
zio2zmq,
raftZmq,
stores,
ziolmdb,
clientServerProtocol,
clientServerServer,
clientServerClient,
sessionStateMachine
)
.settings(
publish / skip := true,
crossScalaVersions := Nil
Expand All @@ -99,7 +113,7 @@ lazy val raft = project
"dev.zio" %% "zio-test" % zio2Version % Test,
"dev.zio" %% "zio-test-sbt" % zio2Version % Test,
"dev.zio" %% "zio-nio" % "2.0.0",
"dev.zio" %% "zio-prelude" % zioPreludeVersion,
"dev.zio" %% "zio-prelude" % zioPreludeVersion
),
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
)
Expand All @@ -114,13 +128,15 @@ lazy val kvstore = project
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio2Version,
"dev.zio" %% "zio-prelude" % zioPreludeVersion,
"dev.zio" %% "zio-http" % "3.0.0-RC8",
"dev.zio" %% "zio-config" % "4.0.4",
"dev.zio" %% "zio-config-magnolia" % "4.0.4",
"dev.zio" %% "zio-logging" % "2.5.1",
"dev.zio" %% "zio-test" % zio2Version % Test,
"dev.zio" %% "zio-test-sbt" % zio2Version % Test
),
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
)
.dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol)
.dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol, clientServerServer, kvstoreProtocol)

lazy val raftZmq = project
.in(file("raft-zmq"))
Expand Down Expand Up @@ -190,7 +206,7 @@ lazy val ziolmdb = project
"dev.zio" %% "zio-nio" % "2.0.0",
"dev.zio" %% "zio-streams" % zio2Version,
"dev.zio" %% "zio-test" % zio2Version % Test,
"dev.zio" %% "zio-test-sbt" % zio2Version % Test,
"dev.zio" %% "zio-test-sbt" % zio2Version % Test,
"org.lmdbjava" % "lmdbjava" % "0.9.0"
)
)
Expand Down Expand Up @@ -227,11 +243,11 @@ lazy val clientServerProtocol = project
"dev.zio" %% "zio-test-sbt" % zio2Version % Test
) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, _)) => Seq(
"org.scodec" %% "scodec-core" % "1.11.10"
)
"org.scodec" %% "scodec-core" % "1.11.10"
)
case Some((3, _)) => Seq(
"org.scodec" %% "scodec-core" % "2.3.2"
)
"org.scodec" %% "scodec-core" % "2.3.2"
)
case _ => Seq.empty
}),
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
Expand Down Expand Up @@ -297,4 +313,36 @@ lazy val sessionStateMachine = project
),
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
)
.dependsOn(raft, clientServerProtocol)
.dependsOn(raft, clientServerProtocol)

lazy val kvstoreProtocol = project
.in(file("kvstore-protocol"))
.settings(
name := "kvstore-protocol",
publish / skip := true,
scalaVersion := mainScalaVersion,
scalacOptions ++= commonScalacOptions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio2Version,
"org.scodec" %% "scodec-core" % "2.3.2",
"org.scodec" %% "scodec-bits" % "1.1.37"
)
)

lazy val kvstoreCli = project
.in(file("kvstore-cli"))
.settings(
name := "kvstore-cli",
publish / skip := true,
scalaVersion := mainScalaVersion,
scalacOptions ++= commonScalacOptions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio2Version,
"dev.zio" %% "zio-streams" % zio2Version,
"dev.zio" %% "zio-cli" % "0.7.3",
"dev.zio" %% "zio-test" % zio2Version % Test,
"dev.zio" %% "zio-test-sbt" % zio2Version % Test
),
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
)
.dependsOn(kvstore, clientServerClient, kvstoreProtocol)
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object RaftClient {

// Register finalizer to cleanly close session on scope exit
_ <- ZIO.addFinalizer(
zmqTransport.sendMessage(CloseSession(CloseReason.ClientShutdown)).orDie
zmqTransport.sendMessage(CloseSession(CloseReason.ClientShutdown)).ignore
)

} yield client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ final class RaftServer(
def stepDown(leaderId: Option[MemberId]): UIO[Unit] =
actionQueue.offer(RaftServer.ServerAction.StepDown(leaderId)).unit

/** Notify server that this node has changed leader.
* @param leaderId
* The new leader ID
*/
def leaderChanged(leaderId: MemberId): UIO[Unit] =
actionQueue.offer(RaftServer.ServerAction.LeaderChanged(leaderId)).unit

object RaftServer:

/** Actions initiated by the server (internal or from Raft).
Expand All @@ -62,6 +69,7 @@ object RaftServer:
case class SessionCreationConfirmed(sessionId: SessionId) extends ServerAction
case class StepUp(sessions: Map[SessionId, SessionMetadata]) extends ServerAction
case class StepDown(leaderId: Option[MemberId]) extends ServerAction
case class LeaderChanged(leaderId: MemberId) extends ServerAction

/** Actions to forward to Raft state machine.
*/
Expand Down Expand Up @@ -183,6 +191,9 @@ object RaftServer:
case StreamEvent.Action(ServerAction.StepDown(_)) =>
ZIO.succeed(this)

case StreamEvent.Action(ServerAction.LeaderChanged(leaderId)) =>
ZIO.succeed(this.copy(leaderId = Some(leaderId)))

case StreamEvent.Action(ServerAction.SendResponse(_, _)) =>
ZIO.logWarning("Cannot send response - not leader").as(this)

Expand Down Expand Up @@ -262,6 +273,9 @@ object RaftServer:
_ <- sessions.stepDown(transport, leaderId)
yield Follower(leaderId)

case StreamEvent.Action(ServerAction.LeaderChanged(leaderId)) =>
ZIO.logWarning(s"We received LeaderChanged event while in Leader state, this should not happen").as(this)

case StreamEvent.Action(ServerAction.SendResponse(sessionId, response)) =>
sessions.getRoutingId(sessionId) match
case Some(routingId) =>
Expand Down
43 changes: 43 additions & 0 deletions kvstore-cli/src/main/scala/zio/kvstore/cli/KVClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package zio.kvstore.cli

import zio.*
import zio.stream.*
import zio.raft.client.RaftClient
import zio.raft.protocol.MemberId
import scodec.Codec
import scodec.bits.ByteVector
import zio.kvstore.protocol.{KVClientRequest, KVClientResponse, KVServerRequest}
import zio.kvstore.protocol.KVClientRequest.given
import zio.kvstore.protocol.KVClientResponse.given
import zio.kvstore.protocol.KVServerRequest.given

final class KVClient private (private val raft: RaftClient):

def run(): ZIO[Scope, Throwable, Unit] = raft.run().unit

def connect(): ZIO[Any, Throwable, Unit] = raft.connect()

def set(key: String, value: String): ZIO[Any, Throwable, Unit] =
val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Set(key, value)).require.bytes
raft.submitCommand(payload).map(bytes => summon[Codec[Unit]].decode(bytes.bits).require.value)

def get(key: String): ZIO[Any, Throwable, Option[String]] =
val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Get(key)).require.bytes
raft.submitCommand(payload).map(bytes => summon[Codec[Option[String]]].decode(bytes.bits).require.value)

// Registers the watch on the server; notifications are emitted via `notifications` stream
def watch(key: String): ZIO[Any, Throwable, Unit] =
val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Watch(key)).require.bytes
raft.submitCommand(payload).map(bytes => summon[Codec[Unit]].decode(bytes.bits).require.value)

// Stream of server-side watch notifications
val notifications: ZStream[Any, Nothing, KVServerRequest.Notification] =
raft.serverRequests.map { env =>
summon[Codec[KVServerRequest]].decode(env.payload.bits).require.value
}.collect { case n: KVServerRequest.Notification => n }

object KVClient:
private val defaultCapabilities: Map[String, String] = Map("app" -> "kvstore-cli")

def make(endpoints: Map[MemberId, String]): ZIO[zio.zmq.ZContext & Scope, Throwable, KVClient] =
RaftClient.make(endpoints, defaultCapabilities).map(new KVClient(_))
90 changes: 90 additions & 0 deletions kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package zio.kvstore.cli

import zio.*
import zio.cli.*
import zio.cli.HelpDoc.Span.text
import zio.raft.protocol.MemberId

object Main extends ZIOCliDefault:

private def parseEndpoints(endpoints: String): Map[MemberId, String] =
endpoints
.split(",")
.iterator
.map(_.trim)
.filter(_.nonEmpty)
.map { kv =>
val Array(id, addr) = kv.split("=", 2)
MemberId(id) -> addr
}
.toMap

// Model for CLI
sealed trait Action
final case class DoSet(endpoints: String, key: String, value: String) extends Action
final case class DoGet(endpoints: String, key: String) extends Action
final case class DoWatch(endpoints: String, key: String) extends Action

// Shared options/args
private val endpointsOpt: Options[String] =
Options.text("endpoints").alias("e").withDefault("node-1=tcp://127.0.0.1:7001")
private val keyArg: Args[String] = Args.text("key")
private val valueArg: Args[String] = Args.text("value")

// Commands
private val setCmd: Command[Action] =
Command("set", endpointsOpt, keyArg ++ valueArg)
.withHelp(HelpDoc.p("Set key to value"))
.map { case (endpoints, (key, value)) => DoSet(endpoints, key, value) }

private val getCmd: Command[Action] =
Command("get", endpointsOpt, keyArg)
.withHelp(HelpDoc.p("Get key"))
.map { case (endpoints, key) => DoGet(endpoints, key) }

private val watchCmd: Command[Action] =
Command("watch", endpointsOpt, keyArg)
.withHelp(HelpDoc.p("Watch key for updates"))
.map { case (endpoints, key) => DoWatch(endpoints, key) }

private val root: Command[Action] = Command("kvstore").subcommands(setCmd, getCmd, watchCmd)

val cliApp: CliApp[Any, Any, Unit] =
CliApp.make[Any, Any, Action, Unit](
name = "kvstore",
version = "0.1.0",
summary = text("KVStore CLI"),
command = root
) {
case DoSet(endpoints, key, value) =>
val members = parseEndpoints(endpoints)
(for
client <- KVClient.make(members)
_ <- client.run().forkScoped
_ <- client.connect()
_ <- client.set(key, value)
_ <- Console.printLine("OK")
yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default)

case DoGet(endpoints, key) =>
val members = parseEndpoints(endpoints)
(for
client <- KVClient.make(members)
_ <- client.run().forkScoped
_ <- client.connect()
result <- client.get(key)
_ <- Console.printLine(s"${key} = ${result.getOrElse("<none>")}")
yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default)

case DoWatch(endpoints, key) =>
val members = parseEndpoints(endpoints)
(for
client <- KVClient.make(members)
_ <- client.run().forkScoped
_ <- client.connect()
_ <- client.watch(key)
_ <- Console.printLine(s"watching ${key} - press Ctrl+C to stop")
_ <- client.notifications.map(n => s"${n.key} -> ${n.value}").foreach(Console.printLine(_))
yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default)
}
end Main
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package zio.kvstore.protocol

import scodec.Codec
import scodec.codecs.{ascii, bool, optional, discriminated, fixedSizeBytes, utf8_32}

// Client-visible request types (distinct from internal Raft state machine commands)
sealed trait KVClientRequest
object KVClientRequest:
final case class Set(key: String, value: String) extends KVClientRequest
final case class Get(key: String) extends KVClientRequest
final case class Watch(key: String) extends KVClientRequest

private val setCodec: Codec[Set] = (utf8_32 :: utf8_32).as[Set]
private val getCodec: Codec[Get] = utf8_32.as[Get]
private val watchCodec: Codec[Watch] = utf8_32.as[Watch]

given Codec[KVClientRequest] =
discriminated[KVClientRequest].by(fixedSizeBytes(1, ascii))
.typecase("S", setCodec)
.typecase("G", getCodec)
.typecase("W", watchCodec)

// Plain Scala response types for the client API
object KVClientResponse:
type SetDone = Unit
type GetResult = Option[String]
type WatchDone = Unit

given unitCodec: Codec[Unit] = scodec.codecs.provide(())

given optionStringCodec: Codec[Option[String]] =
optional(bool(8), utf8_32)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package zio.kvstore.protocol

import scodec.Codec
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}

sealed trait KVServerRequest
object KVServerRequest:
final case class Notification(key: String, value: String) extends KVServerRequest

private val notificationCodec: Codec[Notification] = (utf8_32 :: utf8_32).as[Notification]

given Codec[KVServerRequest] =
discriminated[KVServerRequest]
.by(fixedSizeBytes(1, ascii))
.typecase("N", notificationCodec)
Loading