Skip to content

Commit 1f2fcf7

Browse files
committed
cli
1 parent 450985a commit 1f2fcf7

File tree

16 files changed

+493
-119
lines changed

16 files changed

+493
-119
lines changed

build.sbt

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,39 @@ scalaVersion := mainScalaVersion
6262
resolvers +=
6363
"Sonatype OSS Snapshots" at "https://central.sonatype.com/repository/maven-snapshots/"
6464

65-
lazy val commonScalacOptions = Def.setting{
65+
lazy val commonScalacOptions = Def.setting {
6666
CrossVersion.partialVersion(scalaVersion.value) match {
6767
case Some((2, n)) => List(
68-
"-Xsource:3.7-migration",
69-
"-Ymacro-annotations",
70-
"-Wunused:imports"
68+
"-Xsource:3.7-migration",
69+
"-Ymacro-annotations",
70+
"-Wunused:imports"
7171
)
7272
case Some((3, n)) => List(
73-
"-Wunused:imports",
74-
"-source:future",
75-
"-deprecation",
73+
"-Wunused:imports",
74+
"-source:future",
75+
"-deprecation"
7676
)
77-
case _ => List()
77+
case _ => List()
7878
}
7979
}
8080

8181
lazy val root = project
8282
.in(file("."))
83-
.aggregate(raft, kvstore, zio1zmq, zio2zmq, raftZmq, stores, ziolmdb, clientServerProtocol, clientServerServer, clientServerClient, sessionStateMachine)
83+
.aggregate(
84+
raft,
85+
kvstore,
86+
kvstoreCli,
87+
kvstoreProtocol,
88+
zio1zmq,
89+
zio2zmq,
90+
raftZmq,
91+
stores,
92+
ziolmdb,
93+
clientServerProtocol,
94+
clientServerServer,
95+
clientServerClient,
96+
sessionStateMachine
97+
)
8498
.settings(
8599
publish / skip := true,
86100
crossScalaVersions := Nil
@@ -99,7 +113,7 @@ lazy val raft = project
99113
"dev.zio" %% "zio-test" % zio2Version % Test,
100114
"dev.zio" %% "zio-test-sbt" % zio2Version % Test,
101115
"dev.zio" %% "zio-nio" % "2.0.0",
102-
"dev.zio" %% "zio-prelude" % zioPreludeVersion,
116+
"dev.zio" %% "zio-prelude" % zioPreludeVersion
103117
),
104118
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
105119
)
@@ -114,13 +128,15 @@ lazy val kvstore = project
114128
libraryDependencies ++= Seq(
115129
"dev.zio" %% "zio" % zio2Version,
116130
"dev.zio" %% "zio-prelude" % zioPreludeVersion,
117-
"dev.zio" %% "zio-http" % "3.0.0-RC8",
131+
"dev.zio" %% "zio-config" % "4.0.4",
132+
"dev.zio" %% "zio-config-magnolia" % "4.0.4",
133+
"dev.zio" %% "zio-logging" % "2.5.1",
118134
"dev.zio" %% "zio-test" % zio2Version % Test,
119135
"dev.zio" %% "zio-test-sbt" % zio2Version % Test
120136
),
121137
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
122138
)
123-
.dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol, clientServerServer)
139+
.dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol, clientServerServer, kvstoreProtocol)
124140

125141
lazy val raftZmq = project
126142
.in(file("raft-zmq"))
@@ -190,7 +206,7 @@ lazy val ziolmdb = project
190206
"dev.zio" %% "zio-nio" % "2.0.0",
191207
"dev.zio" %% "zio-streams" % zio2Version,
192208
"dev.zio" %% "zio-test" % zio2Version % Test,
193-
"dev.zio" %% "zio-test-sbt" % zio2Version % Test,
209+
"dev.zio" %% "zio-test-sbt" % zio2Version % Test,
194210
"org.lmdbjava" % "lmdbjava" % "0.9.0"
195211
)
196212
)
@@ -227,11 +243,11 @@ lazy val clientServerProtocol = project
227243
"dev.zio" %% "zio-test-sbt" % zio2Version % Test
228244
) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
229245
case Some((2, _)) => Seq(
230-
"org.scodec" %% "scodec-core" % "1.11.10"
231-
)
246+
"org.scodec" %% "scodec-core" % "1.11.10"
247+
)
232248
case Some((3, _)) => Seq(
233-
"org.scodec" %% "scodec-core" % "2.3.2"
234-
)
249+
"org.scodec" %% "scodec-core" % "2.3.2"
250+
)
235251
case _ => Seq.empty
236252
}),
237253
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
@@ -297,4 +313,36 @@ lazy val sessionStateMachine = project
297313
),
298314
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
299315
)
300-
.dependsOn(raft, clientServerProtocol)
316+
.dependsOn(raft, clientServerProtocol)
317+
318+
lazy val kvstoreProtocol = project
319+
.in(file("kvstore-protocol"))
320+
.settings(
321+
name := "kvstore-protocol",
322+
publish / skip := true,
323+
scalaVersion := mainScalaVersion,
324+
scalacOptions ++= commonScalacOptions.value,
325+
libraryDependencies ++= Seq(
326+
"dev.zio" %% "zio" % zio2Version,
327+
"org.scodec" %% "scodec-core" % "2.3.2",
328+
"org.scodec" %% "scodec-bits" % "1.1.37"
329+
)
330+
)
331+
332+
lazy val kvstoreCli = project
333+
.in(file("kvstore-cli"))
334+
.settings(
335+
name := "kvstore-cli",
336+
publish / skip := true,
337+
scalaVersion := mainScalaVersion,
338+
scalacOptions ++= commonScalacOptions.value,
339+
libraryDependencies ++= Seq(
340+
"dev.zio" %% "zio" % zio2Version,
341+
"dev.zio" %% "zio-streams" % zio2Version,
342+
"dev.zio" %% "zio-cli" % "0.7.3",
343+
"dev.zio" %% "zio-test" % zio2Version % Test,
344+
"dev.zio" %% "zio-test-sbt" % zio2Version % Test
345+
),
346+
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
347+
)
348+
.dependsOn(kvstore, clientServerClient, kvstoreProtocol)

client-server-client/src/main/scala/zio/raft/client/RaftClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ object RaftClient {
8282

8383
// Register finalizer to cleanly close session on scope exit
8484
_ <- ZIO.addFinalizer(
85-
zmqTransport.sendMessage(CloseSession(CloseReason.ClientShutdown)).orDie
85+
zmqTransport.sendMessage(CloseSession(CloseReason.ClientShutdown)).ignore
8686
)
8787

8888
} yield client
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package zio.kvstore.cli
2+
3+
import zio.*
4+
import zio.stream.*
5+
import zio.raft.client.RaftClient
6+
import zio.raft.protocol.MemberId
7+
import scodec.Codec
8+
import scodec.bits.ByteVector
9+
import zio.kvstore.protocol.{KVClientRequest, KVClientResponse, KVServerRequest}
10+
import zio.kvstore.protocol.KVClientRequest.given
11+
import zio.kvstore.protocol.KVClientResponse.given
12+
import zio.kvstore.protocol.KVServerRequest.given
13+
14+
final class KVClient private (private val raft: RaftClient):
15+
16+
def run(): ZIO[Scope, Throwable, Unit] = raft.run().unit
17+
18+
def connect(): ZIO[Any, Throwable, Unit] = raft.connect()
19+
20+
def set(key: String, value: String): ZIO[Any, Throwable, Unit] =
21+
val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Set(key, value)).require.bytes
22+
raft.submitCommand(payload).map(bytes => summon[Codec[Unit]].decode(bytes.bits).require.value)
23+
24+
def get(key: String): ZIO[Any, Throwable, Option[String]] =
25+
val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Get(key)).require.bytes
26+
raft.submitCommand(payload).map(bytes => summon[Codec[Option[String]]].decode(bytes.bits).require.value)
27+
28+
// Registers the watch on the server; notifications are emitted via `notifications` stream
29+
def watch(key: String): ZIO[Any, Throwable, Unit] =
30+
val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Watch(key)).require.bytes
31+
raft.submitCommand(payload).map(bytes => summon[Codec[Unit]].decode(bytes.bits).require.value)
32+
33+
// Stream of server-side watch notifications
34+
val notifications: ZStream[Any, Nothing, KVServerRequest.Notification] =
35+
raft.serverRequests.map { env =>
36+
summon[Codec[KVServerRequest]].decode(env.payload.bits).require.value
37+
}.collect { case n: KVServerRequest.Notification => n }
38+
39+
object KVClient:
40+
private val defaultCapabilities: Map[String, String] = Map("app" -> "kvstore-cli")
41+
42+
def make(endpoints: Map[MemberId, String]): ZIO[zio.zmq.ZContext & Scope, Throwable, KVClient] =
43+
RaftClient.make(endpoints, defaultCapabilities).map(new KVClient(_))
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package zio.kvstore.cli
2+
3+
import zio.*
4+
import zio.cli.*
5+
import zio.cli.HelpDoc.Span.text
6+
import zio.raft.protocol.MemberId
7+
8+
object Main extends ZIOCliDefault:
9+
10+
private def parseEndpoints(endpoints: String): Map[MemberId, String] =
11+
endpoints
12+
.split(",")
13+
.iterator
14+
.map(_.trim)
15+
.filter(_.nonEmpty)
16+
.map { kv =>
17+
val Array(id, addr) = kv.split("=", 2)
18+
MemberId(id) -> addr
19+
}
20+
.toMap
21+
22+
// Model for CLI
23+
sealed trait Action
24+
final case class DoSet(endpoints: String, key: String, value: String) extends Action
25+
final case class DoGet(endpoints: String, key: String) extends Action
26+
final case class DoWatch(endpoints: String, key: String) extends Action
27+
28+
// Shared options/args
29+
private val endpointsOpt: Options[String] =
30+
Options.text("endpoints").alias("e").withDefault("node-1=tcp://127.0.0.1:7001")
31+
private val keyArg: Args[String] = Args.text("key")
32+
private val valueArg: Args[String] = Args.text("value")
33+
34+
// Commands
35+
private val setCmd: Command[Action] =
36+
Command("set", endpointsOpt, keyArg ++ valueArg)
37+
.withHelp(HelpDoc.p("Set key to value"))
38+
.map { case (endpoints, (key, value)) => DoSet(endpoints, key, value) }
39+
40+
private val getCmd: Command[Action] =
41+
Command("get", endpointsOpt, keyArg)
42+
.withHelp(HelpDoc.p("Get key"))
43+
.map { case (endpoints, key) => DoGet(endpoints, key) }
44+
45+
private val watchCmd: Command[Action] =
46+
Command("watch", endpointsOpt, keyArg)
47+
.withHelp(HelpDoc.p("Watch key for updates"))
48+
.map { case (endpoints, key) => DoWatch(endpoints, key) }
49+
50+
private val root: Command[Action] = Command("kvstore").subcommands(setCmd, getCmd, watchCmd)
51+
52+
val cliApp: CliApp[Any, Any, Unit] =
53+
CliApp.make[Any, Any, Action, Unit](
54+
name = "kvstore",
55+
version = "0.1.0",
56+
summary = text("KVStore CLI"),
57+
command = root
58+
) {
59+
case DoSet(endpoints, key, value) =>
60+
val members = parseEndpoints(endpoints)
61+
(for
62+
client <- KVClient.make(members)
63+
_ <- client.run().forkScoped
64+
_ <- client.connect()
65+
_ <- client.set(key, value)
66+
_ <- Console.printLine("OK")
67+
yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default)
68+
69+
case DoGet(endpoints, key) =>
70+
val members = parseEndpoints(endpoints)
71+
(for
72+
client <- KVClient.make(members)
73+
_ <- client.run().forkScoped
74+
_ <- client.connect()
75+
result <- client.get(key)
76+
_ <- Console.printLine(s"${key} = ${result.getOrElse("<none>")}")
77+
yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default)
78+
79+
case DoWatch(endpoints, key) =>
80+
val members = parseEndpoints(endpoints)
81+
(for
82+
client <- KVClient.make(members)
83+
_ <- client.run().forkScoped
84+
_ <- client.connect()
85+
_ <- client.watch(key)
86+
_ <- Console.printLine(s"watching ${key} - press Ctrl+C to stop")
87+
_ <- client.notifications.map(n => s"${n.key} -> ${n.value}").foreach(Console.printLine(_))
88+
yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default)
89+
}
90+
end Main
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package zio.kvstore.protocol
2+
3+
import scodec.Codec
4+
import scodec.codecs.{ascii, bool, optional, discriminated, fixedSizeBytes, utf8_32}
5+
6+
// Client-visible request types (distinct from internal Raft state machine commands)
7+
sealed trait KVClientRequest
8+
object KVClientRequest:
9+
final case class Set(key: String, value: String) extends KVClientRequest
10+
final case class Get(key: String) extends KVClientRequest
11+
final case class Watch(key: String) extends KVClientRequest
12+
13+
private val setCodec: Codec[Set] = (utf8_32 :: utf8_32).as[Set]
14+
private val getCodec: Codec[Get] = utf8_32.as[Get]
15+
private val watchCodec: Codec[Watch] = utf8_32.as[Watch]
16+
17+
given Codec[KVClientRequest] =
18+
discriminated[KVClientRequest].by(fixedSizeBytes(1, ascii))
19+
.typecase("S", setCodec)
20+
.typecase("G", getCodec)
21+
.typecase("W", watchCodec)
22+
23+
// Plain Scala response types for the client API
24+
object KVClientResponse:
25+
type SetDone = Unit
26+
type GetResult = Option[String]
27+
type WatchDone = Unit
28+
29+
given unitCodec: Codec[Unit] = scodec.codecs.provide(())
30+
31+
given optionStringCodec: Codec[Option[String]] =
32+
optional(bool(8), utf8_32)

kvstore/src/main/scala/zio/kvstore/App.scala

Lines changed: 0 additions & 9 deletions
This file was deleted.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package zio.kvstore
2+
3+
import zio.raft.server.RaftServer
4+
import zio.raft.server.RaftServer.RaftAction
5+
import zio.raft.protocol.SessionId
6+
import zio.raft.protocol.RequestId
7+
import zio.kvstore.KVServer.KVServerAction
8+
import scodec.bits.ByteVector
9+
import zio.kvstore.protocol.KVClientRequest
10+
import scodec.Codec
11+
import zio.raft.protocol.ClientResponse
12+
import zio.kvstore.protocol.KVServerRequest
13+
import zio.raft.MemberId
14+
import zio.raft.sessionstatemachine.SessionMetadata
15+
import zio.raft.protocol.ServerRequest
16+
import java.time.Instant
17+
18+
class KVServer(server: RaftServer):
19+
private def decodeClientRequest(bytes: ByteVector): KVClientRequest =
20+
summon[Codec[KVClientRequest]].decode(bytes.bits).require.value
21+
22+
def stream = server.raftActions.map {
23+
case RaftAction.CreateSession(sessionId, capabilities) => KVServerAction.CreateSession(sessionId, capabilities)
24+
case RaftAction.ClientRequest(sessionId, requestId, lowestPendingRequestId, payload) =>
25+
KVServerAction.ClientRequest(sessionId, requestId, lowestPendingRequestId, decodeClientRequest(payload))
26+
case RaftAction.ServerRequestAck(sessionId, requestId) => KVServerAction.ServerRequestAck(sessionId, requestId)
27+
case RaftAction.ExpireSession(sessionId) => KVServerAction.ExpireSession(sessionId)
28+
}
29+
30+
def reply[A](sessionId: SessionId, requestId: RequestId, response: A)(using Codec[A]) =
31+
val bytes = summon[Codec[A]].encode(response).require.bytes
32+
server.sendClientResponse(sessionId, ClientResponse(requestId, bytes))
33+
34+
def requestError(sessionId: SessionId, requestId: RequestId, reason: zio.raft.sessionstatemachine.RequestError) =
35+
val serverReason = reason match
36+
case zio.raft.sessionstatemachine.RequestError.ResponseEvicted =>
37+
zio.raft.protocol.RequestErrorReason.ResponseEvicted
38+
server.sendRequestError(sessionId, zio.raft.protocol.RequestError(requestId, serverReason))
39+
40+
def confirmSessionCreation(sessionId: SessionId) = server.confirmSessionCreation(sessionId)
41+
42+
def stepUp(sessions: Map[SessionId, SessionMetadata]) =
43+
server.stepUp(sessions.map((k, v) => (k, zio.raft.server.SessionMetadata(v.capabilities, v.createdAt))))
44+
45+
def stepDown(leaderId: Option[MemberId]) = server.stepDown(leaderId.map(id => zio.raft.protocol.MemberId(id.value)))
46+
47+
def leaderChanged(leaderId: MemberId) = server.leaderChanged(zio.raft.protocol.MemberId(leaderId.value))
48+
49+
def sendServerRequest(now: Instant, sessionId: SessionId, requestId: RequestId, request: KVServerRequest) =
50+
val payload = summon[Codec[KVServerRequest]].encode(request).require.bytes
51+
server.sendServerRequest(sessionId, ServerRequest(requestId, payload, now))
52+
53+
object KVServer:
54+
def make(serverBindAddress: String) =
55+
for
56+
server <- RaftServer.make(serverBindAddress)
57+
kvServer = new KVServer(server)
58+
yield kvServer
59+
60+
trait KVServerAction
61+
object KVServerAction:
62+
case class CreateSession(sessionId: SessionId, capabilities: Map[String, String]) extends KVServerAction
63+
case class ClientRequest(
64+
sessionId: SessionId,
65+
requestId: RequestId,
66+
lowestPendingRequestId: RequestId,
67+
request: KVClientRequest
68+
) extends KVServerAction
69+
case class ServerRequestAck(sessionId: SessionId, requestId: RequestId) extends KVServerAction
70+
case class ExpireSession(sessionId: SessionId) extends KVServerAction

0 commit comments

Comments
 (0)