diff --git a/build.sbt b/build.sbt index 1e6e0bc0..0ebbb7cf 100644 --- a/build.sbt +++ b/build.sbt @@ -62,25 +62,39 @@ scalaVersion := mainScalaVersion resolvers += "Sonatype OSS Snapshots" at "https://central.sonatype.com/repository/maven-snapshots/" -lazy val commonScalacOptions = Def.setting{ +lazy val commonScalacOptions = Def.setting { CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, n)) => List( - "-Xsource:3.7-migration", - "-Ymacro-annotations", - "-Wunused:imports" + "-Xsource:3.7-migration", + "-Ymacro-annotations", + "-Wunused:imports" ) case Some((3, n)) => List( - "-Wunused:imports", - "-source:future", - "-deprecation", + "-Wunused:imports", + "-source:future", + "-deprecation" ) - case _ => List() + case _ => List() } } lazy val root = project .in(file(".")) - .aggregate(raft, kvstore, zio1zmq, zio2zmq, raftZmq, stores, ziolmdb, clientServerProtocol, clientServerServer, clientServerClient, sessionStateMachine) + .aggregate( + raft, + kvstore, + kvstoreCli, + kvstoreProtocol, + zio1zmq, + zio2zmq, + raftZmq, + stores, + ziolmdb, + clientServerProtocol, + clientServerServer, + clientServerClient, + sessionStateMachine + ) .settings( publish / skip := true, crossScalaVersions := Nil @@ -99,7 +113,7 @@ lazy val raft = project "dev.zio" %% "zio-test" % zio2Version % Test, "dev.zio" %% "zio-test-sbt" % zio2Version % Test, "dev.zio" %% "zio-nio" % "2.0.0", - "dev.zio" %% "zio-prelude" % zioPreludeVersion, + "dev.zio" %% "zio-prelude" % zioPreludeVersion ), excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" ) @@ -114,13 +128,15 @@ lazy val kvstore = project libraryDependencies ++= Seq( "dev.zio" %% "zio" % zio2Version, "dev.zio" %% "zio-prelude" % zioPreludeVersion, - "dev.zio" %% "zio-http" % "3.0.0-RC8", + "dev.zio" %% "zio-config" % "4.0.4", + "dev.zio" %% "zio-config-magnolia" % "4.0.4", + "dev.zio" %% "zio-logging" % "2.5.1", "dev.zio" %% "zio-test" % zio2Version % Test, "dev.zio" %% "zio-test-sbt" % zio2Version % Test ), excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" ) - .dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol) + .dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol, clientServerServer, kvstoreProtocol) lazy val raftZmq = project .in(file("raft-zmq")) @@ -190,7 +206,7 @@ lazy val ziolmdb = project "dev.zio" %% "zio-nio" % "2.0.0", "dev.zio" %% "zio-streams" % zio2Version, "dev.zio" %% "zio-test" % zio2Version % Test, - "dev.zio" %% "zio-test-sbt" % zio2Version % Test, + "dev.zio" %% "zio-test-sbt" % zio2Version % Test, "org.lmdbjava" % "lmdbjava" % "0.9.0" ) ) @@ -227,11 +243,11 @@ lazy val clientServerProtocol = project "dev.zio" %% "zio-test-sbt" % zio2Version % Test ) ++ (CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, _)) => Seq( - "org.scodec" %% "scodec-core" % "1.11.10" - ) + "org.scodec" %% "scodec-core" % "1.11.10" + ) case Some((3, _)) => Seq( - "org.scodec" %% "scodec-core" % "2.3.2" - ) + "org.scodec" %% "scodec-core" % "2.3.2" + ) case _ => Seq.empty }), excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" @@ -297,4 +313,36 @@ lazy val sessionStateMachine = project ), excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" ) - .dependsOn(raft, clientServerProtocol) \ No newline at end of file + .dependsOn(raft, clientServerProtocol) + +lazy val kvstoreProtocol = project + .in(file("kvstore-protocol")) + .settings( + name := "kvstore-protocol", + 
publish / skip := true, + scalaVersion := mainScalaVersion, + scalacOptions ++= commonScalacOptions.value, + libraryDependencies ++= Seq( + "dev.zio" %% "zio" % zio2Version, + "org.scodec" %% "scodec-core" % "2.3.2", + "org.scodec" %% "scodec-bits" % "1.1.37" + ) + ) + +lazy val kvstoreCli = project + .in(file("kvstore-cli")) + .settings( + name := "kvstore-cli", + publish / skip := true, + scalaVersion := mainScalaVersion, + scalacOptions ++= commonScalacOptions.value, + libraryDependencies ++= Seq( + "dev.zio" %% "zio" % zio2Version, + "dev.zio" %% "zio-streams" % zio2Version, + "dev.zio" %% "zio-cli" % "0.7.3", + "dev.zio" %% "zio-test" % zio2Version % Test, + "dev.zio" %% "zio-test-sbt" % zio2Version % Test + ), + excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" + ) + .dependsOn(kvstore, clientServerClient, kvstoreProtocol) diff --git a/client-server-client/src/main/scala/zio/raft/client/RaftClient.scala b/client-server-client/src/main/scala/zio/raft/client/RaftClient.scala index f09fb383..df1172d9 100644 --- a/client-server-client/src/main/scala/zio/raft/client/RaftClient.scala +++ b/client-server-client/src/main/scala/zio/raft/client/RaftClient.scala @@ -82,7 +82,7 @@ object RaftClient { // Register finalizer to cleanly close session on scope exit _ <- ZIO.addFinalizer( - zmqTransport.sendMessage(CloseSession(CloseReason.ClientShutdown)).orDie + zmqTransport.sendMessage(CloseSession(CloseReason.ClientShutdown)).ignore ) } yield client diff --git a/client-server-server/src/main/scala/zio/raft/server/RaftServer.scala b/client-server-server/src/main/scala/zio/raft/server/RaftServer.scala index 8ce82f6e..9c46f8bc 100644 --- a/client-server-server/src/main/scala/zio/raft/server/RaftServer.scala +++ b/client-server-server/src/main/scala/zio/raft/server/RaftServer.scala @@ -49,6 +49,13 @@ final class RaftServer( def stepDown(leaderId: Option[MemberId]): UIO[Unit] = actionQueue.offer(RaftServer.ServerAction.StepDown(leaderId)).unit + /** Notify server that this node has changed leader. + * @param leaderId + * The new leader ID + */ + def leaderChanged(leaderId: MemberId): UIO[Unit] = + actionQueue.offer(RaftServer.ServerAction.LeaderChanged(leaderId)).unit + object RaftServer: /** Actions initiated by the server (internal or from Raft). @@ -62,6 +69,7 @@ object RaftServer: case class SessionCreationConfirmed(sessionId: SessionId) extends ServerAction case class StepUp(sessions: Map[SessionId, SessionMetadata]) extends ServerAction case class StepDown(leaderId: Option[MemberId]) extends ServerAction + case class LeaderChanged(leaderId: MemberId) extends ServerAction /** Actions to forward to Raft state machine. 
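 *
 * Illustrative sketch (not part of this change): a host application folds these actions
 * into `SessionCommand`s and submits them to Raft, as the kvstore `Node` added later in
 * this diff does. `forward`, the `raft` handle, and `SR` are hypothetical names here:
 * {{{
 * // SR is the application's server-request payload type
 * def forward(action: RaftAction): UIO[Unit] = action match
 *   case RaftAction.ExpireSession(sessionId) =>
 *     Clock.instant.flatMap(now => raft.sendCommand(SessionCommand.SessionExpired[SR](now, sessionId)).ignore)
 *   case other => ZIO.unit // remaining cases map analogously
 * }}}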
*/ @@ -183,6 +191,9 @@ object RaftServer: case StreamEvent.Action(ServerAction.StepDown(_)) => ZIO.succeed(this) + case StreamEvent.Action(ServerAction.LeaderChanged(leaderId)) => + ZIO.succeed(this.copy(leaderId = Some(leaderId))) + case StreamEvent.Action(ServerAction.SendResponse(_, _)) => ZIO.logWarning("Cannot send response - not leader").as(this) @@ -262,6 +273,9 @@ object RaftServer: _ <- sessions.stepDown(transport, leaderId) yield Follower(leaderId) + case StreamEvent.Action(ServerAction.LeaderChanged(leaderId)) => + ZIO.logWarning(s"We received LeaderChanged event while in Leader state, this should not happen").as(this) + case StreamEvent.Action(ServerAction.SendResponse(sessionId, response)) => sessions.getRoutingId(sessionId) match case Some(routingId) => diff --git a/kvstore-cli/src/main/scala/zio/kvstore/cli/KVClient.scala b/kvstore-cli/src/main/scala/zio/kvstore/cli/KVClient.scala new file mode 100644 index 00000000..22673791 --- /dev/null +++ b/kvstore-cli/src/main/scala/zio/kvstore/cli/KVClient.scala @@ -0,0 +1,43 @@ +package zio.kvstore.cli + +import zio.* +import zio.stream.* +import zio.raft.client.RaftClient +import zio.raft.protocol.MemberId +import scodec.Codec +import scodec.bits.ByteVector +import zio.kvstore.protocol.{KVClientRequest, KVClientResponse, KVServerRequest} +import zio.kvstore.protocol.KVClientRequest.given +import zio.kvstore.protocol.KVClientResponse.given +import zio.kvstore.protocol.KVServerRequest.given + +final class KVClient private (private val raft: RaftClient): + + def run(): ZIO[Scope, Throwable, Unit] = raft.run().unit + + def connect(): ZIO[Any, Throwable, Unit] = raft.connect() + + def set(key: String, value: String): ZIO[Any, Throwable, Unit] = + val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Set(key, value)).require.bytes + raft.submitCommand(payload).map(bytes => summon[Codec[Unit]].decode(bytes.bits).require.value) + + def get(key: String): ZIO[Any, Throwable, Option[String]] = + val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Get(key)).require.bytes + raft.submitCommand(payload).map(bytes => summon[Codec[Option[String]]].decode(bytes.bits).require.value) + + // Registers the watch on the server; notifications are emitted via `notifications` stream + def watch(key: String): ZIO[Any, Throwable, Unit] = + val payload: ByteVector = summon[Codec[KVClientRequest]].encode(KVClientRequest.Watch(key)).require.bytes + raft.submitCommand(payload).map(bytes => summon[Codec[Unit]].decode(bytes.bits).require.value) + + // Stream of server-side watch notifications + val notifications: ZStream[Any, Nothing, KVServerRequest.Notification] = + raft.serverRequests.map { env => + summon[Codec[KVServerRequest]].decode(env.payload.bits).require.value + }.collect { case n: KVServerRequest.Notification => n } + +object KVClient: + private val defaultCapabilities: Map[String, String] = Map("app" -> "kvstore-cli") + + def make(endpoints: Map[MemberId, String]): ZIO[zio.zmq.ZContext & Scope, Throwable, KVClient] = + RaftClient.make(endpoints, defaultCapabilities).map(new KVClient(_)) diff --git a/kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala b/kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala new file mode 100644 index 00000000..d59481ce --- /dev/null +++ b/kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala @@ -0,0 +1,90 @@ +package zio.kvstore.cli + +import zio.* +import zio.cli.* +import zio.cli.HelpDoc.Span.text +import zio.raft.protocol.MemberId + +object Main extends 
ZIOCliDefault: + + private def parseEndpoints(endpoints: String): Map[MemberId, String] = + endpoints + .split(",") + .iterator + .map(_.trim) + .filter(_.nonEmpty) + .map { kv => + val Array(id, addr) = kv.split("=", 2) + MemberId(id) -> addr + } + .toMap + + // Model for CLI + sealed trait Action + final case class DoSet(endpoints: String, key: String, value: String) extends Action + final case class DoGet(endpoints: String, key: String) extends Action + final case class DoWatch(endpoints: String, key: String) extends Action + + // Shared options/args + private val endpointsOpt: Options[String] = + Options.text("endpoints").alias("e").withDefault("node-1=tcp://127.0.0.1:7001") + private val keyArg: Args[String] = Args.text("key") + private val valueArg: Args[String] = Args.text("value") + + // Commands + private val setCmd: Command[Action] = + Command("set", endpointsOpt, keyArg ++ valueArg) + .withHelp(HelpDoc.p("Set key to value")) + .map { case (endpoints, (key, value)) => DoSet(endpoints, key, value) } + + private val getCmd: Command[Action] = + Command("get", endpointsOpt, keyArg) + .withHelp(HelpDoc.p("Get key")) + .map { case (endpoints, key) => DoGet(endpoints, key) } + + private val watchCmd: Command[Action] = + Command("watch", endpointsOpt, keyArg) + .withHelp(HelpDoc.p("Watch key for updates")) + .map { case (endpoints, key) => DoWatch(endpoints, key) } + + private val root: Command[Action] = Command("kvstore").subcommands(setCmd, getCmd, watchCmd) + + val cliApp: CliApp[Any, Any, Unit] = + CliApp.make[Any, Any, Action, Unit]( + name = "kvstore", + version = "0.1.0", + summary = text("KVStore CLI"), + command = root + ) { + case DoSet(endpoints, key, value) => + val members = parseEndpoints(endpoints) + (for + client <- KVClient.make(members) + _ <- client.run().forkScoped + _ <- client.connect() + _ <- client.set(key, value) + _ <- Console.printLine("OK") + yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default) + + case DoGet(endpoints, key) => + val members = parseEndpoints(endpoints) + (for + client <- KVClient.make(members) + _ <- client.run().forkScoped + _ <- client.connect() + result <- client.get(key) + _ <- Console.printLine(s"${key} = ${result.getOrElse("")}") + yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default) + + case DoWatch(endpoints, key) => + val members = parseEndpoints(endpoints) + (for + client <- KVClient.make(members) + _ <- client.run().forkScoped + _ <- client.connect() + _ <- client.watch(key) + _ <- Console.printLine(s"watching ${key} - press Ctrl+C to stop") + _ <- client.notifications.map(n => s"${n.key} -> ${n.value}").foreach(Console.printLine(_)) + yield ()).provideSomeLayer(zio.zmq.ZContext.live ++ Scope.default) + } +end Main diff --git a/kvstore-protocol/src/main/scala/zio/kvstore/protocol/ClientApi.scala b/kvstore-protocol/src/main/scala/zio/kvstore/protocol/ClientApi.scala new file mode 100644 index 00000000..14374b22 --- /dev/null +++ b/kvstore-protocol/src/main/scala/zio/kvstore/protocol/ClientApi.scala @@ -0,0 +1,32 @@ +package zio.kvstore.protocol + +import scodec.Codec +import scodec.codecs.{ascii, bool, optional, discriminated, fixedSizeBytes, utf8_32} + +// Client-visible request types (distinct from internal Raft state machine commands) +sealed trait KVClientRequest +object KVClientRequest: + final case class Set(key: String, value: String) extends KVClientRequest + final case class Get(key: String) extends KVClientRequest + final case class Watch(key: String) extends KVClientRequest + + private val 
setCodec: Codec[Set] = (utf8_32 :: utf8_32).as[Set] + private val getCodec: Codec[Get] = utf8_32.as[Get] + private val watchCodec: Codec[Watch] = utf8_32.as[Watch] + + given Codec[KVClientRequest] = + discriminated[KVClientRequest].by(fixedSizeBytes(1, ascii)) + .typecase("S", setCodec) + .typecase("G", getCodec) + .typecase("W", watchCodec) + +// Plain Scala response types for the client API +object KVClientResponse: + type SetDone = Unit + type GetResult = Option[String] + type WatchDone = Unit + + given unitCodec: Codec[Unit] = scodec.codecs.provide(()) + + given optionStringCodec: Codec[Option[String]] = + optional(bool(8), utf8_32) diff --git a/kvstore-protocol/src/main/scala/zio/kvstore/protocol/KVServerRequest.scala b/kvstore-protocol/src/main/scala/zio/kvstore/protocol/KVServerRequest.scala new file mode 100644 index 00000000..ed1cfabf --- /dev/null +++ b/kvstore-protocol/src/main/scala/zio/kvstore/protocol/KVServerRequest.scala @@ -0,0 +1,15 @@ +package zio.kvstore.protocol + +import scodec.Codec +import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32} + +sealed trait KVServerRequest +object KVServerRequest: + final case class Notification(key: String, value: String) extends KVServerRequest + + private val notificationCodec: Codec[Notification] = (utf8_32 :: utf8_32).as[Notification] + + given Codec[KVServerRequest] = + discriminated[KVServerRequest] + .by(fixedSizeBytes(1, ascii)) + .typecase("N", notificationCodec) diff --git a/kvstore/src/main/scala/zio/kvstore/App.scala b/kvstore/src/main/scala/zio/kvstore/App.scala deleted file mode 100644 index e2b46571..00000000 --- a/kvstore/src/main/scala/zio/kvstore/App.scala +++ /dev/null @@ -1,133 +0,0 @@ -package zio.kvstore - -import zio.raft.{Command, Index, HMap} -import zio.raft.sessionstatemachine.{SessionStateMachine, ScodecSerialization, ServerRequestForSession, StateWriter} -import zio.raft.sessionstatemachine.asResponseType // Extension method -import zio.raft.protocol.SessionId -import zio.{ZIO} -import zio.prelude.Newtype - -import scodec.Codec -import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32} -import java.time.Instant - -// Simple KV store - no sessions, no server requests -// We just use the session framework for the template pattern and HMap - -sealed trait KVCommand extends Command - -object KVCommand: - case class Set(key: String, value: String) extends KVCommand: - type Response = SetDone - - case class Get(key: String) extends KVCommand: - type Response = GetResult - - // Codec for KVCommand (needed by ZmqRpc) - val getCodec = utf8_32.as[Get] - val setCodec = (utf8_32 :: utf8_32).as[Set] - given commandCodec: Codec[KVCommand] = discriminated[KVCommand] - .by(fixedSizeBytes(1, ascii)) - .typecase("S", setCodec) - .typecase("G", getCodec) - -// Response marker type - encompasses all KV command responses -sealed trait KVResponse -case class SetDone() extends KVResponse -case class GetResult(value: Option[String]) extends KVResponse - -// KV Schema - single prefix for key-value data -object KVKey extends Newtype[String] -type KVKey = KVKey.Type -given HMap.KeyLike[KVKey] = HMap.KeyLike.forNewtype(KVKey) - -type KVSchema = ("kv", KVKey, String) *: EmptyTuple -type KVCompleteSchema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema] - -// Dummy server request type (KV store doesn't use server requests) -case class NoServerRequest() - -type KVServerRequest = NoServerRequest - -// KV store with scodec serialization mixin -class KVStateMachine 
extends SessionStateMachine[KVCommand, KVResponse, KVServerRequest, KVSchema] - with ScodecSerialization[KVResponse, KVServerRequest, KVSchema]: - - // Import provided codecs from Codecs object - import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} - - // Provide codecs for response types - given Codec[SetDone] = scodec.codecs.provide(SetDone()) - given Codec[GetResult] = scodec.codecs.optional(scodec.codecs.bool, utf8_32).xmap( - opt => GetResult(opt), - gr => gr.value - ) - - // Provide codec for response marker type - given Codec[KVResponse] = discriminated[KVResponse] - .by(fixedSizeBytes(1, ascii)) - .typecase("S", summon[Codec[SetDone]]) - .typecase("G", summon[Codec[GetResult]]) - - // Provide codec for our value types - given Codec[String] = utf8_32 - - // Dummy codec for NoServerRequest - given Codec[NoServerRequest] = scodec.codecs.provide(NoServerRequest()) - - val codecs = summon[HMap.TypeclassMap[Schema, Codec]] - - protected def applyCommand( - createdAt: Instant, - sessionId: SessionId, - command: KVCommand - ): StateWriter[HMap[KVCompleteSchema], ServerRequestForSession[KVServerRequest], command.Response & KVResponse] = - command match - case set @ KVCommand.Set(key, value) => - for - state <- StateWriter.get[HMap[KVCompleteSchema]] - newState = state.updated["kv"](KVKey(key), value) - _ <- StateWriter.set(newState) - yield SetDone().asResponseType(command, set) - - case get @ KVCommand.Get(key) => - for - state <- StateWriter.get[HMap[KVCompleteSchema]] - result = state.get["kv"](KVKey(key)) - yield GetResult(result).asResponseType(command, get) - - protected def handleSessionCreated( - createdAt: Instant, - sessionId: SessionId, - capabilities: Map[String, String] - ): StateWriter[HMap[KVCompleteSchema], ServerRequestForSession[KVServerRequest], Unit] = - StateWriter.succeed(()) - - protected def handleSessionExpired( - createdAt: Instant, - sessionId: SessionId, - capabilities: Map[String, String] - ): StateWriter[HMap[KVCompleteSchema], ServerRequestForSession[KVServerRequest], Unit] = - StateWriter.succeed(()) - - // takeSnapshot and restoreFromSnapshot are now provided by SessionStateMachine base class! 
- // They use the TypeclassMap[Schema, Codec] passed in constructor - - override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = - false // Disable snapshots for now -end KVStateMachine - -// HttpServer commented out for now - SessionCommand type system complexity -// The KVStateMachine implementation demonstrates: -// - SessionStateMachine extension -// - HMap with typed schema -// - Scodec-based serialization -// - TypeclassMap usage - -// TODO: Implement proper HTTP server with session management - -object KVStoreApp extends zio.ZIOAppDefault: - override def run = - // Simplified demo - just show that KVStateMachine compiles - // Full integration would require session management setup - ZIO.succeed(println("KVStore with SessionStateMachine - compilation successful")).exitCode diff --git a/kvstore/src/main/scala/zio/kvstore/Codecs.scala b/kvstore/src/main/scala/zio/kvstore/Codecs.scala new file mode 100644 index 00000000..ee26df7c --- /dev/null +++ b/kvstore/src/main/scala/zio/kvstore/Codecs.scala @@ -0,0 +1,39 @@ +package zio.kvstore + +import scodec.Codec +import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32} + +object Codecs: + // Response codecs + given Codec[KVResponse.SetDone.type] = scodec.codecs.provide(KVResponse.SetDone) + given Codec[KVResponse.WatchDone.type] = scodec.codecs.provide(KVResponse.WatchDone) + given Codec[KVResponse.GetResult] = scodec.codecs.optional(scodec.codecs.bool, utf8_32).xmap( + opt => KVResponse.GetResult(opt), + gr => gr.value + ) + + given Codec[KVResponse] = + discriminated[KVResponse] + .by(fixedSizeBytes(1, ascii)) + .typecase("S", summon[Codec[KVResponse.SetDone.type]]) + .typecase("W", summon[Codec[KVResponse.WatchDone.type]]) + .typecase("G", summon[Codec[KVResponse.GetResult]]) + + // Value codec for KV schema values + given Codec[String] = utf8_32 + private val sessionIdCodec = zio.raft.protocol.Codecs.sessionIdCodec + given sessionIdSetCodec: Codec[Set[zio.raft.protocol.SessionId]] = + scodec.codecs.listOfN(scodec.codecs.uint16, sessionIdCodec).xmap(_.toSet, _.toList) + given kvKeySetCodec: Codec[Set[KVKey]] = + scodec.codecs.listOfN(scodec.codecs.uint16, utf8_32).xmap(_.map(KVKey(_)).toSet, _.toList.map(KVKey.unwrap)) + + // Command codecs + private val getCodec: Codec[KVCommand.Get] = utf8_32.as[KVCommand.Get] + private val setCodec: Codec[KVCommand.Set] = (utf8_32 :: utf8_32).as[KVCommand.Set] + private val watchCodec: Codec[KVCommand.Watch] = utf8_32.as[KVCommand.Watch] + given Codec[KVCommand] = + discriminated[KVCommand] + .by(fixedSizeBytes(1, ascii)) + .typecase("S", setCodec) + .typecase("G", getCodec) + .typecase("W", watchCodec) diff --git a/kvstore/src/main/scala/zio/kvstore/KVServer.scala b/kvstore/src/main/scala/zio/kvstore/KVServer.scala new file mode 100644 index 00000000..fe52bd5c --- /dev/null +++ b/kvstore/src/main/scala/zio/kvstore/KVServer.scala @@ -0,0 +1,70 @@ +package zio.kvstore + +import zio.raft.server.RaftServer +import zio.raft.server.RaftServer.RaftAction +import zio.raft.protocol.SessionId +import zio.raft.protocol.RequestId +import zio.kvstore.KVServer.KVServerAction +import scodec.bits.ByteVector +import zio.kvstore.protocol.KVClientRequest +import scodec.Codec +import zio.raft.protocol.ClientResponse +import zio.kvstore.protocol.KVServerRequest +import zio.raft.MemberId +import zio.raft.sessionstatemachine.SessionMetadata +import zio.raft.protocol.ServerRequest +import java.time.Instant + +class KVServer(server: RaftServer): + private def 
decodeClientRequest(bytes: ByteVector): KVClientRequest = + summon[Codec[KVClientRequest]].decode(bytes.bits).require.value + + def stream = server.raftActions.map { + case RaftAction.CreateSession(sessionId, capabilities) => KVServerAction.CreateSession(sessionId, capabilities) + case RaftAction.ClientRequest(sessionId, requestId, lowestPendingRequestId, payload) => + KVServerAction.ClientRequest(sessionId, requestId, lowestPendingRequestId, decodeClientRequest(payload)) + case RaftAction.ServerRequestAck(sessionId, requestId) => KVServerAction.ServerRequestAck(sessionId, requestId) + case RaftAction.ExpireSession(sessionId) => KVServerAction.ExpireSession(sessionId) + } + + def reply[A](sessionId: SessionId, requestId: RequestId, response: A)(using Codec[A]) = + val bytes = summon[Codec[A]].encode(response).require.bytes + server.sendClientResponse(sessionId, ClientResponse(requestId, bytes)) + + def requestError(sessionId: SessionId, requestId: RequestId, reason: zio.raft.sessionstatemachine.RequestError) = + val serverReason = reason match + case zio.raft.sessionstatemachine.RequestError.ResponseEvicted => + zio.raft.protocol.RequestErrorReason.ResponseEvicted + server.sendRequestError(sessionId, zio.raft.protocol.RequestError(requestId, serverReason)) + + def confirmSessionCreation(sessionId: SessionId) = server.confirmSessionCreation(sessionId) + + def stepUp(sessions: Map[SessionId, SessionMetadata]) = + server.stepUp(sessions.map((k, v) => (k, zio.raft.server.SessionMetadata(v.capabilities, v.createdAt)))) + + def stepDown(leaderId: Option[MemberId]) = server.stepDown(leaderId.map(id => zio.raft.protocol.MemberId(id.value))) + + def leaderChanged(leaderId: MemberId) = server.leaderChanged(zio.raft.protocol.MemberId(leaderId.value)) + + def sendServerRequest(now: Instant, sessionId: SessionId, requestId: RequestId, request: KVServerRequest) = + val payload = summon[Codec[KVServerRequest]].encode(request).require.bytes + server.sendServerRequest(sessionId, ServerRequest(requestId, payload, now)) + +object KVServer: + def make(serverBindAddress: String) = + for + server <- RaftServer.make(serverBindAddress) + kvServer = new KVServer(server) + yield kvServer + + trait KVServerAction + object KVServerAction: + case class CreateSession(sessionId: SessionId, capabilities: Map[String, String]) extends KVServerAction + case class ClientRequest( + sessionId: SessionId, + requestId: RequestId, + lowestPendingRequestId: RequestId, + request: KVClientRequest + ) extends KVServerAction + case class ServerRequestAck(sessionId: SessionId, requestId: RequestId) extends KVServerAction + case class ExpireSession(sessionId: SessionId) extends KVServerAction diff --git a/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala b/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala new file mode 100644 index 00000000..38f717a5 --- /dev/null +++ b/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala @@ -0,0 +1,117 @@ +package zio.kvstore + +import zio.raft.{Index, HMap} +import zio.raft.sessionstatemachine.{SessionStateMachine, ScodecSerialization, ServerRequestForSession, StateWriter} +import zio.raft.sessionstatemachine.asResponseType +import zio.raft.protocol.SessionId +import zio.kvstore.protocol.KVServerRequest +import scodec.Codec +import java.time.Instant +import zio.kvstore.Codecs.given +import zio.raft.sessionstatemachine.given +import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} + +class KVStateMachine extends 
SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema] + with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]: + + // Local alias to aid match-type reduction bug in scala 3.3 + type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema] + private type SW[A] = StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], A] + + // Helpers + private def putValue(key: KVKey, value: String): SW[Unit] = + StateWriter.update[HMap[Schema], HMap[Schema]](s => s.updated["kv"](key, value)) + + private def getValue(key: KVKey): SW[Option[String]] = + StateWriter.get[HMap[Schema]].map { s => + s.get["kv"](key) + } + + private def subscribersFor(key: KVKey): SW[Set[zio.raft.protocol.SessionId]] = + StateWriter.get[HMap[Schema]].map { s => + s.get["subsByKey"](key).getOrElse(Set.empty) + } + + private def addSubscription(sessionId: zio.raft.protocol.SessionId, key: KVKey): SW[Unit] = + StateWriter.update[HMap[Schema], HMap[Schema]] { s => + val subs = s.get["subsByKey"](key).getOrElse(Set.empty) + val s1 = s.updated["subsByKey"](key, subs + sessionId) + val keys = s1.get["subsBySession"](sessionId).getOrElse(Set.empty) + s1.updated["subsBySession"](sessionId, keys + key) + } + + private def removeAllSubscriptions(sessionId: zio.raft.protocol.SessionId): SW[Unit] = + StateWriter.update[HMap[Schema], HMap[Schema]] { s => + val keys = s.get["subsBySession"](sessionId).getOrElse(Set.empty) + val cleared = keys.foldLeft(s) { (acc, k) => + val set = acc.get["subsByKey"](k).getOrElse(Set.empty) - sessionId + acc.updated["subsByKey"](k, set) + } + cleared.removed["subsBySession"](sessionId) + } + + // Typeclass map for Schema → Codec + val codecs = summon[HMap.TypeclassMap[Schema, Codec]] + + protected def applyCommand( + createdAt: Instant, + sessionId: SessionId, + command: KVCommand + ): SW[command.Response & KVResponse] = + command match + case set @ KVCommand.Set(key, value) => + val k = KVKey(key) + for + _ <- putValue(k, value) + sessions <- subscribersFor(k) + _ <- StateWriter.foreach(sessions) { sid => + StateWriter.log( + ServerRequestForSession[ + KVServerRequest + ]( + sid, + KVServerRequest.Notification(key, value) + ) + ) + } + yield KVResponse.SetDone.asResponseType(command, set) + + case get @ KVCommand.Get(key) => + getValue(KVKey(key)).map(result => KVResponse.GetResult(result).asResponseType(command, get)) + + case watch @ KVCommand.Watch(key) => + val k = KVKey(key) + for + _ <- addSubscription(sessionId, k) + current <- getValue(k) + _ <- current match + case Some(v) => + StateWriter.log( + ServerRequestForSession[ + KVServerRequest + ]( + sessionId, + KVServerRequest.Notification(key, v) + ) + ) + case None => StateWriter.unit + yield KVResponse.WatchDone.asResponseType(command, watch) + + protected def handleSessionCreated( + createdAt: Instant, + sessionId: SessionId, + capabilities: Map[String, String] + ): SW[Unit] = + StateWriter.unit + + protected def handleSessionExpired( + createdAt: Instant, + sessionId: SessionId, + capabilities: Map[String, String] + ): SW[Unit] = + removeAllSubscriptions(sessionId) + + override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = + (commitIndex.value - lastSnapshotIndex.value) >= 100 + +end KVStateMachine diff --git a/kvstore/src/main/scala/zio/kvstore/KVStoreServerApp.scala b/kvstore/src/main/scala/zio/kvstore/KVStoreServerApp.scala new file mode 
100644 index 00000000..1473b7ef --- /dev/null +++ b/kvstore/src/main/scala/zio/kvstore/KVStoreServerApp.scala @@ -0,0 +1,105 @@ +package zio.kvstore + +import zio.* +import zio.raft.{MemberId as CoreMemberId} +import zio.kvstore.node.Node +import zio.lmdb.Environment as LmdbEnv +import zio.config.magnolia.* +import zio.logging.{consoleLogger, ConsoleLoggerConfig, LogFilter} +import java.io.File +import java.net.URI + +final case class ServerAppConfig( + serverPort: Int = 7001, + memberId: String = "node-1", + members: String = "node-1=tcp://127.0.0.1:7002", + logDir: String = "data/log", + snapshotDir: String = "data/snapshots", + lmdbDirectory: String = "data/lmdb" +) + +object ServerAppConfig: + val config: Config[ServerAppConfig] = + deriveConfig[ServerAppConfig] + +object KVStoreServerApp extends ZIOAppDefault: + + private def parseEndpoints(endpoints: String): Map[CoreMemberId, String] = + endpoints + .split(",") + .iterator + .map(_.trim) + .filter(_.nonEmpty) + .map { kv => + val Array(id, addr) = kv.split("=", 2) + CoreMemberId(id) -> addr + } + .toMap + + private def rewriteHostToLocalhost(address: String): String = + val uri = URI.create(address) + new URI( + uri.getScheme, + uri.getUserInfo, + "localhost", + uri.getPort, + uri.getPath, + uri.getQuery, + uri.getFragment + ).toString + + val logConfig = ConsoleLoggerConfig.default.copy(filter = LogFilter.LogLevelByNameConfig(zio.LogLevel.Debug)) + + override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = + ZLayer.fromZIO(ZIO.service[ZIOAppArgs].map(args => Runtime.setConfigProvider(ConfigProvider.fromAppArgs(args)))) ++ + zio.Runtime.removeDefaultLoggers >>> consoleLogger(logConfig) + + override def run = + for + config <- ZIO.config(ServerAppConfig.config) + + _ <- ZIO.logInfo(s"Config: $config") + + // Ensure the lmdb directory exists + _ <- ZIO.attemptBlockingIO { + val dir = new File(config.lmdbDirectory) + if !dir.exists() then dir.mkdirs() + } + + // Ensure the log directory exists + _ <- ZIO.attemptBlockingIO { + val dir = new File(config.logDir) + if !dir.exists() then dir.mkdirs() + } + + // Ensure the snapshot directory exists + _ <- ZIO.attemptBlockingIO { + val dir = new File(config.snapshotDir) + if !dir.exists() then dir.mkdirs() + } + + program = + for + endpoints <- ZIO.succeed(parseEndpoints(config.members)) + selfId = CoreMemberId(config.memberId) + nodeAddr <- ZIO.fromOption(endpoints.get(selfId)).orElseFail(new IllegalArgumentException( + "self member must be included in members" + )) + localNodeAddr = rewriteHostToLocalhost(nodeAddr) + peersMap = endpoints - selfId + + node <- Node.make( + serverAddress = s"tcp://localhost:${config.serverPort}", + nodeAddress = localNodeAddr, + logDirectory = config.logDir, + snapshotDirectory = config.snapshotDir, + memberId = selfId, + peers = peersMap + ).debug("Node.make") + _ <- node.run + yield () + _ <- program.provideSomeLayer( + (LmdbEnv.builder.withMaxDbs(3).layer(new File(config.lmdbDirectory)) ++ zio.zmq.ZContext.live) + ) + yield () +end KVStoreServerApp diff --git a/kvstore/src/main/scala/zio/kvstore/node/Node.scala b/kvstore/src/main/scala/zio/kvstore/node/Node.scala new file mode 100644 index 00000000..c5b93c85 --- /dev/null +++ b/kvstore/src/main/scala/zio/kvstore/node/Node.scala @@ -0,0 +1,240 @@ +package zio.kvstore.node + +import zio.* +import zio.stream.* +import zio.kvstore.* +import zio.kvstore.protocol.KVServerRequest +import zio.kvstore.protocol.KVClientRequest +import zio.kvstore.protocol.KVClientResponse.given +import zio.raft.Raft +import 
zio.raft.protocol.* +import zio.raft.sessionstatemachine.{SessionCommand, SessionStateMachine} +import zio.raft.sessionstatemachine.Codecs.given +import zio.kvstore.KVServer.KVServerAction +import java.time.Instant +import zio.kvstore.Codecs.given +import zio.kvstore.node.Node.NodeAction +import zio.kvstore.node.Node.NodeAction.* +import zio.raft.Peers +import zio.raft.stores.LmdbStable +import zio.raft.stores.segmentedlog.SegmentedLog +import zio.raft.stores.FileSnapshotStore +import zio.raft.zmq.ZmqRpc + +/** Node wiring KVServer, Raft core, and KV state machine. */ +final case class Node( + kvServer: KVServer, + raft: zio.raft.Raft[ + zio.raft.HMap[Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]], + SessionCommand[KVCommand, KVServerRequest] + ] +): + + private def dispatchServerRequests( + now: Instant, + envelopes: List[zio.raft.sessionstatemachine.ServerRequestEnvelope[KVServerRequest]] + ): UIO[Unit] = + ZIO.foreachDiscard(envelopes) { env => + kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload) + } + + private def handleAction(action: NodeAction): UIO[Unit] = + action match + // Process KVServer actions → SessionCommand → raft → publish via KVServer + case NodeAction.FromServer(KVServerAction.CreateSession(sessionId, capabilities)) => + for + now <- Clock.instant + cmd = SessionCommand.CreateSession[KVServerRequest](now, sessionId, capabilities) + res <- raft.sendCommand(cmd).either + _ <- res match + case Right(envelopes) => + for + // It's important to confirm session creation before sending server requests + // As the session might be one of the recipients of the server requests + _ <- kvServer.confirmSessionCreation(sessionId) + _ <- dispatchServerRequests(now, envelopes) + yield () + case Left(_) => ZIO.unit + yield () + + case NodeAction.FromServer(KVServerAction.ClientRequest( + sessionId, + requestId, + lowestPendingRequestId, + clientReq + )) => + clientReq match + case KVClientRequest.Set(k, v) => + val cmd = KVCommand.Set(k, v) + for + maybe <- applyCommand(sessionId, requestId, lowestPendingRequestId, cmd) + _ <- maybe match + case Some(_) => kvServer.reply(sessionId, requestId, ()) + case _ => ZIO.unit + yield () + + case KVClientRequest.Get(k) => + val cmd = KVCommand.Get(k) + for + maybe <- applyCommand(sessionId, requestId, lowestPendingRequestId, cmd) + _ <- maybe match + case Some(result) => kvServer.reply(sessionId, requestId, result.value) + case _ => ZIO.unit + yield () + + case KVClientRequest.Watch(k) => + val cmd = KVCommand.Watch(k) + for + maybe <- applyCommand(sessionId, requestId, lowestPendingRequestId, cmd) + _ <- maybe match + case Some(_) => kvServer.reply(sessionId, requestId, ()) + case _ => ZIO.unit + yield () + + case NodeAction.FromServer(KVServerAction.ServerRequestAck(sessionId, requestId)) => + for + now <- Clock.instant + cmd = SessionCommand.ServerRequestAck[KVServerRequest](now, sessionId, requestId) + _ <- raft.sendCommand(cmd).ignore + yield () + + case NodeAction.FromServer(KVServerAction.ExpireSession(sessionId)) => + for + now <- Clock.instant + cmd = SessionCommand.SessionExpired[KVServerRequest](now, sessionId) + res <- raft.sendCommand(cmd).either + _ <- res match + case Right(envelopes) => dispatchServerRequests(now, envelopes) + case Left(_) => ZIO.unit + yield () + + // Retry stream every 10s - dirty read + command to bump lastSentAt + case NodeAction.RetryTick(now) => + val lastSentBefore = now.minusSeconds(10) + for + state <- raft.readStateDirty + 
hasPending = + SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, KVSchema](state, lastSentBefore) + _ <- + if hasPending then + val cmd = SessionCommand.GetRequestsForRetry[KVServerRequest](now, lastSentBefore) + raft.sendCommand(cmd).either.flatMap { + case Right(envelopes) => dispatchServerRequests(now, envelopes) + case Left(_) => + // node is always running the job, regardless if leader or not + ZIO.unit + } + else ZIO.unit + yield () + + // State notifications mapping to server leadership signals + case NodeAction.StateNotificationReceived(notification) => + notification match + case zio.raft.StateNotification.SteppedUp => + ZIO.logInfo("Node stepped up") *> + raft.readState.either.flatMap { + case Right(state) => + val sessions = SessionStateMachine.getSessions[KVResponse, KVServerRequest, KVSchema](state).map { + case (sessionId: SessionId, metadata) => + ( + sessionId, + zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt) + ) + } + kvServer.stepUp(sessions) + case Left(_) => ZIO.unit + } + case zio.raft.StateNotification.SteppedDown(leaderId) => + kvServer.stepDown(leaderId) + case zio.raft.StateNotification.LeaderChanged(leaderId) => + kvServer.leaderChanged(leaderId) + + case NodeAction.FromServer(_) => ZIO.unit + + // Unified stream construction + private val unifiedStream: ZStream[Any, Nothing, NodeAction] = + ZStream.mergeAllUnbounded(16)( + kvServer.stream.map(NodeAction.FromServer.apply), + // TODO: filter this if we are not the leader + ZStream.tick(10.seconds).mapZIO(_ => Clock.instant.map(NodeAction.RetryTick.apply)), + raft.stateNotifications.map(NodeAction.StateNotificationReceived.apply) + ) + + def applyCommand( + sessionId: SessionId, + requestId: RequestId, + lowestPendingRequestId: RequestId, + command: KVCommand + ): UIO[Option[command.Response]] = + for + now <- Clock.instant + cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( + now, + sessionId, + requestId, + lowestPendingRequestId, + command + ) + either <- raft.sendCommand(cmd).either + result <- either match + case Right(Right((resp, envelopes))) => + for + _ <- ZIO.foreachDiscard(envelopes) { env => + kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload) + } + yield Some(resp.asInstanceOf[command.Response]) + case Right(Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) => + kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted).as( + None + ) + case Left(_: zio.raft.NotALeaderError) => + // Ignore not leader error, server will handle it eventually + ZIO.none + yield result + + // Run unified loop + def run: UIO[Unit] = + ZIO.logInfo("Node started") *> + unifiedStream.mapZIO(handleAction).runDrain +end Node + +object Node: + + def make( + serverAddress: String, + nodeAddress: String, + logDirectory: String, + snapshotDirectory: String, + memberId: zio.raft.MemberId, + peers: Map[zio.raft.MemberId, String] + ) = + for + stable <- LmdbStable.make.debug("LmdbStable.make") + + logStore <- SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest]](logDirectory).debug("SegmentedLog.make") + snapshotStore <- FileSnapshotStore.make(zio.nio.file.Path(snapshotDirectory)).debug("FileSnapshotStore.make") + rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest]]( + nodeAddress, + peers + ).debug("ZmqRpc.make") + + raft <- Raft.make( + memberId = memberId, + peers = peers.keySet, + stable = stable, + logStore = logStore, + snapshotStore = snapshotStore, + rpc = 
rpc, + stateMachine = new KVStateMachine() + ) + _ <- raft.run.forkScoped + _ <- raft.bootstrap.when(memberId == zio.raft.MemberId("node-1")) + kvServer <- zio.kvstore.KVServer.make(serverAddress).debug("KVServer.make") + node = Node(kvServer, raft) + yield node + + sealed trait NodeAction + object NodeAction: + case class FromServer(action: KVServerAction) extends NodeAction + case class RetryTick(now: Instant) extends NodeAction + case class StateNotificationReceived(notification: zio.raft.StateNotification) extends NodeAction diff --git a/kvstore/src/main/scala/zio/kvstore/package.scala b/kvstore/src/main/scala/zio/kvstore/package.scala new file mode 100644 index 00000000..c39a68c3 --- /dev/null +++ b/kvstore/src/main/scala/zio/kvstore/package.scala @@ -0,0 +1,31 @@ +package zio + +import zio.raft.{Command, HMap} +import zio.prelude.Newtype + +package object kvstore: + + sealed trait KVCommand extends Command + object KVCommand: + final case class Set(key: String, value: String) extends KVCommand: + type Response = KVResponse.SetDone.type + final case class Get(key: String) extends KVCommand: + type Response = KVResponse.GetResult + final case class Watch(key: String) extends KVCommand: + type Response = KVResponse.WatchDone.type + + sealed trait KVResponse + object KVResponse: + case object SetDone extends KVResponse + final case class GetResult(value: Option[String]) extends KVResponse + case object WatchDone extends KVResponse + + object KVKey extends Newtype[String] + type KVKey = KVKey.Type + given HMap.KeyLike[KVKey] = HMap.KeyLike.forNewtype(KVKey) + + type KVSchema = + ("kv", KVKey, String) *: + ("subsByKey", KVKey, Set[zio.raft.protocol.SessionId]) *: + ("subsBySession", zio.raft.protocol.SessionId, Set[KVKey]) *: + EmptyTuple diff --git a/kvstore/src/test/scala/zio/kvstore/KVSessionSpec.scala b/kvstore/src/test/scala/zio/kvstore/KVSessionSpec.scala deleted file mode 100644 index 111198c3..00000000 --- a/kvstore/src/test/scala/zio/kvstore/KVSessionSpec.scala +++ /dev/null @@ -1,46 +0,0 @@ -package zio.kvstore - -import zio.test.* -import zio.raft.HMap -import zio.raft.protocol.{RequestId, SessionId} -import zio.raft.sessionstatemachine.SessionCommand -import java.time.Instant - -object KVSessionSpec extends ZIOSpecDefault: - def spec = suite("KVStateMachine - session lifecycle")( - test("create session, set/get, expire clears metadata and cache") { - val sm = new KVStateMachine() - val sessionId = SessionId("s1") - val now = Instant.now() - - val create: SessionCommand[KVCommand, KVServerRequest] = - SessionCommand.CreateSession[KVServerRequest](now, sessionId, Map("x" -> "y")) - .asInstanceOf[SessionCommand[KVCommand, KVServerRequest]] - val (s1, _) = sm.apply(create).run(HMap.empty[sm.Schema]) - - val set = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( - createdAt = now, - sessionId = sessionId, - requestId = RequestId(1), - lowestPendingRequestId = RequestId(1), - command = KVCommand.Set("k", "v") - ) - val (s2, _) = sm.apply(set).run(s1) - - val expire = SessionCommand.SessionExpired[KVServerRequest](now, sessionId) - .asInstanceOf[SessionCommand[KVCommand, KVServerRequest]] - val (s3, _) = sm.apply(expire).run(s2) - - // After expiration, a get should behave as empty (new session needed normally; here we just check state cleared) - val get = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( - createdAt = now, - sessionId = sessionId, - requestId = RequestId(2), - lowestPendingRequestId = RequestId(1), - command = KVCommand.Get("k") - ) - val (_, result) 
= sm.apply(get).run(s3) - val Right((resp, _)) = result.asInstanceOf[Either[Any, (KVResponse, List[Any])]]: @unchecked - assertTrue(resp == GetResult(Some("v"))) - } - ) diff --git a/kvstore/src/test/scala/zio/kvstore/KVStateMachineSpec.scala b/kvstore/src/test/scala/zio/kvstore/KVStateMachineSpec.scala deleted file mode 100644 index 7b2a15a7..00000000 --- a/kvstore/src/test/scala/zio/kvstore/KVStateMachineSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -package zio.kvstore - -import zio.test.* -import zio.raft.HMap -import zio.raft.protocol.{RequestId, SessionId} -import zio.raft.sessionstatemachine.SessionCommand -import java.time.Instant - -object KVStateMachineSpec extends ZIOSpecDefault: - - def spec = suite("KVStateMachine - idempotency")( - test("duplicate Set with same requestId ignores new payload (cached)") { - val sm = new KVStateMachine() - val sessionId = SessionId("s1") - val now = Instant.now() - - val create: SessionCommand[KVCommand, KVServerRequest] = - SessionCommand.CreateSession[KVServerRequest](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[KVCommand, KVServerRequest]] - val (s1, _) = sm.apply(create).run(HMap.empty[sm.Schema]) - - val set1 = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( - createdAt = now, - sessionId = sessionId, - requestId = RequestId(1), - lowestPendingRequestId = RequestId(1), - command = KVCommand.Set("a", "v1") - ) - val (s2, _) = sm.apply(set1).run(s1) - - // Same requestId but different payload – should return cached response of first, not apply second payload - val set2 = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( - createdAt = now, - sessionId = sessionId, - requestId = RequestId(1), - lowestPendingRequestId = RequestId(1), - command = KVCommand.Set("a", "v2") - ) - val (s3, _) = sm.apply(set2).run(s2) - - val get = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( - createdAt = now, - sessionId = sessionId, - requestId = RequestId(2), - lowestPendingRequestId = RequestId(1), - command = KVCommand.Get("a") - ) - val (_, result) = sm.apply(get).run(s3) - - val Right((resp, _)) = result.asInstanceOf[Either[Any, (KVResponse, List[Any])]]: @unchecked - assertTrue(resp == GetResult(Some("v1"))) - } - ) diff --git a/raft/src/main/scala/zio/raft/HMap.scala b/raft/src/main/scala/zio/raft/HMap.scala index ac98afa5..74eda00f 100644 --- a/raft/src/main/scala/zio/raft/HMap.scala +++ b/raft/src/main/scala/zio/raft/HMap.scala @@ -181,7 +181,7 @@ final case class HMap[M <: Tuple](private val m: TreeMap[Array[Byte], Any] = * hmap.get["invalid"](...) // ERROR: Prefix 'invalid' is not allowed * }}} */ - def get[P <: String & Singleton: ValueOf](key: KeyAt[M, P])(using + inline def get[P <: String & Singleton: ValueOf](key: KeyAt[M, P])(using Contains[M, P], KeyLike[KeyAt[M, P]] ): Option[ValueAt[M, P]] = @@ -564,7 +564,7 @@ object HMap: * type BadType = HMap.KeyAt[Schema, "invalid"] // Nothing * }}} */ - type KeyAt[M <: Tuple, P <: String] = M match + type KeyAt[M <: Tuple, P <: String & Singleton] = M match case (P, k, v) *: t => k case (?, ?, ?) 
*: t => KeyAt[t, P] case EmptyTuple => Nothing diff --git a/raft/src/main/scala/zio/raft/Raft.scala b/raft/src/main/scala/zio/raft/Raft.scala index 9619c87a..681655c8 100644 --- a/raft/src/main/scala/zio/raft/Raft.scala +++ b/raft/src/main/scala/zio/raft/Raft.scala @@ -969,6 +969,12 @@ class Raft[S, A <: Command]( ZIO .logDebug(s"memberId=${this.memberId} read pending command index=$index") .as(l.withReadPendingCommand(r.promise, index)) + case None if peers.size == 0 => + for + _ <- ZIO.logDebug("No peers, returning state") + appState <- appStateRef.get + _ <- r.promise.succeed(appState) + yield l case None => for now <- zio.Clock.instant @@ -987,6 +993,8 @@ class Raft[S, A <: Command]( result <- promise.await yield result + def readStateDirty: ZIO[Any, Nothing, S] = appStateRef.get + // bootstrap the node and wait until the node would become the leader, only works when the current term is zero def bootstrap = for diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala index 7bc108ed..90574bb2 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala @@ -3,6 +3,8 @@ package zio.raft.sessionstatemachine import zio.raft.protocol.RequestId import scodec.Codec import java.time.Instant +import scodec.codecs.{utf8_32, listOfN, uint8, discriminated, int32} +import zio.raft.protocol.Codecs.{sessionIdCodec, requestIdCodec as requestIdCodecProtocol, instantCodec} /** Scodec codec definitions for SessionStateMachine types. * @@ -12,6 +14,8 @@ import java.time.Instant * - PendingServerRequest[SR] (automatically derived from payload codec) */ object Codecs: + val capabilitiesCodec: Codec[Map[String, String]] = + listOfN(int32, (utf8_32 :: utf8_32).as[(String, String)]).xmap(_.toMap, _.toList) /** Codec for SessionMetadata. * @@ -19,7 +23,7 @@ object Codecs: */ given sessionMetadataCodec: Codec[SessionMetadata] = import scodec.codecs.* - (list(utf8_32 :: utf8_32).xmap(_.toMap, _.toList) :: int64).xmap( + (capabilitiesCodec :: int64).xmap( { case (caps, ts) => SessionMetadata(caps, Instant.ofEpochMilli(ts)) }, sm => (sm.capabilities, sm.createdAt.toEpochMilli) ) @@ -28,9 +32,7 @@ object Codecs: * * Encodes as: Long */ - given requestIdCodec: Codec[RequestId] = - import scodec.codecs.int64 - int64.xmap(RequestId(_), RequestId.unwrap) + given requestIdCodec: Codec[RequestId] = requestIdCodecProtocol /** Codec for PendingServerRequest - automatically derived from payload codec. * @@ -54,3 +56,61 @@ object Codecs: { case (payload, ts) => PendingServerRequest(payload, Instant.ofEpochMilli(ts)) }, p => (p.payload, p.lastSentAt.toEpochMilli) ) + + /** Codec for SessionCommand, parameterized by UC (user command) and SR (server request payload). Requires codecs for + * UC and SR in scope. 
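 *
 * Usage sketch (mirrors the `CodecsSpec` pattern added in this diff; `Ping` is a
 * stand-in command type):
 * {{{
 * case object Ping extends zio.raft.Command:
 *   type Response = Unit
 * given Codec[Ping.type] = scodec.codecs.provide(Ping)
 * given Codec[Unit] = scodec.codecs.provide(())
 * val codec = summon[Codec[SessionCommand[Ping.type, Unit]]]
 * }}}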
+ */ + given sessionCommandCodec[UC <: zio.raft.Command, SR](using + ucCodec: Codec[UC], + srCodec: Codec[SR] + ): Codec[SessionCommand[UC, SR]] = + val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR]] = + (instantCodec :: sessionIdCodec :: requestIdCodec :: requestIdCodec :: ucCodec) + .as[SessionCommand.ClientRequest[UC, SR]] + val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR]] = + (uint8 :: clientRequestV0).xmap( + { case (_, cmd) => cmd }, + cmd => (0, cmd) + ) + + val serverRequestAckV0: Codec[SessionCommand.ServerRequestAck[SR]] = + (instantCodec :: sessionIdCodec :: requestIdCodec).as[SessionCommand.ServerRequestAck[SR]] + val serverRequestAckCodec: Codec[SessionCommand.ServerRequestAck[SR]] = + (uint8 :: serverRequestAckV0).xmap( + { case (_, cmd) => cmd }, + cmd => (0, cmd) + ) + + val createSessionV0: Codec[SessionCommand.CreateSession[SR]] = + (instantCodec :: sessionIdCodec :: capabilitiesCodec).as[SessionCommand.CreateSession[SR]] + val createSessionCodec: Codec[SessionCommand.CreateSession[SR]] = + (uint8 :: createSessionV0).xmap( + { case (_, cmd) => cmd }, + cmd => (0, cmd) + ) + + val sessionExpiredV0: Codec[SessionCommand.SessionExpired[SR]] = + (instantCodec :: sessionIdCodec).as[SessionCommand.SessionExpired[SR]] + val sessionExpiredCodec: Codec[SessionCommand.SessionExpired[SR]] = + (uint8 :: sessionExpiredV0).xmap( + { case (_, cmd) => cmd }, + cmd => (0, cmd) + ) + + val getRequestsForRetryV0: Codec[SessionCommand.GetRequestsForRetry[SR]] = + (instantCodec :: instantCodec).as[SessionCommand.GetRequestsForRetry[SR]] + val getRequestsForRetryCodec: Codec[SessionCommand.GetRequestsForRetry[SR]] = + (uint8 :: getRequestsForRetryV0).xmap( + { case (_, cmd) => cmd }, + cmd => (0, cmd) + ) + + discriminated[SessionCommand[UC, SR]] + .by(uint8) + .typecase(0, clientRequestCodec) + .typecase(1, serverRequestAckCodec) + .typecase(2, createSessionCodec) + .typecase(3, sessionExpiredCodec) + .typecase(4, getRequestsForRetryCodec) + end sessionCommandCodec +end Codecs diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala index 895c69fc..6a9cc824 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala @@ -14,7 +14,7 @@ import java.time.Instant * @tparam SR * Server-initiated request payload type */ -sealed trait SessionCommand[UC <: Command, SR] extends Command +sealed trait SessionCommand[+UC <: Command, SR] extends Command object SessionCommand: @@ -123,5 +123,5 @@ object SessionCommand: createdAt: Instant, lastSentBefore: Instant ) extends SessionCommand[Nothing, SR]: - type Response = List[PendingServerRequest[SR]] + type Response = List[ServerRequestEnvelope[SR]] end SessionCommand diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala index 0884a1d3..a13d46c5 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala @@ -299,7 +299,7 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] // If requestId < highestLowestPendingRequestIdSeen, client has acknowledged receiving this 
response if cmd.requestId.isLowerThan(highestLowestSeen) then // Client said "I have responses for all requestIds < highestLowestPending", so this was evicted - State.succeed(Left(RequestError.ResponseEvicted(cmd.sessionId, cmd.requestId))) + State.succeed(Left(RequestError.ResponseEvicted)) else // requestId >= highestLowestPendingRequestIdSeen // This is a valid request (not yet acknowledged), execute the command @@ -414,18 +414,18 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] * iterating through all pending requests. */ private def handleGetRequestsForRetry(cmd: SessionCommand.GetRequestsForRetry[SR]) - : State[HMap[Schema], List[PendingServerRequest[SR]]] = + : State[HMap[Schema], List[ServerRequestEnvelope[SR]]] = State.modify { state => // Use foldRight to collect updated requests and build new state // This avoids vars and the need to reverse at the end - state.iterator["serverRequests"].foldRight((List.empty[PendingServerRequest[SR]], state)) { + state.iterator["serverRequests"].foldRight((List.empty[ServerRequestEnvelope[SR]], state)) { case (((sessionId, requestId), pending), (accumulated, currentState)) => // Check if this request needs retry (lastSentAt before threshold) if pending.lastSentAt.isBefore(cmd.lastSentBefore) then // Update lastSentAt and add to accumulated list val updatedReq = pending.copy(lastSentAt = cmd.createdAt) val updatedState = currentState.updated["serverRequests"]((sessionId, requestId), updatedReq) - (updatedReq :: accumulated, updatedState) + (ServerRequestEnvelope(sessionId, requestId, updatedReq.payload) :: accumulated, updatedState) else (accumulated, currentState) } @@ -552,27 +552,20 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] // Remove all old cache entries in one efficient operation state.removedAll["cache"](keysToRemove) } - - /** Dirty read helper - check if ANY session has pending requests needing retry. - * - * This method can be called directly (outside Raft consensus) to optimize the retry process. The retry process - * performs a dirty read, applies policy locally, and only sends GetRequestsForRetry command if retries are needed. - * - * Uses HMap.exists for efficient short-circuit evaluation - stops as soon as it finds ANY request (across all - * sessions) that needs retry. 
- * - * @param state - * Current state (can be stale - dirty read) - * @param lastSentBefore - * Retry threshold - check for requests sent before this time - * @return - * true if any pending requests (across ALL sessions) have lastSentAt < lastSentBefore - */ - def hasPendingRequests( - state: HMap[Schema], +end SessionStateMachine +object SessionStateMachine: + def hasPendingRequests[R, SR, UserSchema <: Tuple]( + state: HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]], lastSentBefore: Instant ): Boolean = state.exists["serverRequests"] { (_, pending) => pending.lastSentAt.isBefore(lastSentBefore) } -end SessionStateMachine + + def getSessions[R, SR, UserSchema <: Tuple]( + state: HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]] + ): Map[SessionId, SessionMetadata] = + state.iterator["metadata"].collect { + case (sessionId: SessionId, metadata) => + (sessionId, SessionMetadata(metadata.capabilities, metadata.createdAt)) + }.toMap diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala index 1a02eee2..663c0ae9 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala @@ -146,7 +146,7 @@ package object sessionstatemachine: * * Per Raft dissertation Chapter 6.3, the client should create a new session and retry the operation. */ - case ResponseEvicted(sessionId: SessionId, requestId: RequestId) + case ResponseEvicted /** Fixed schema for session management state with typed keys. * @@ -185,7 +185,7 @@ package object sessionstatemachine: /** KeyLike instance for SessionId keys. Used by metadata, serverRequests, and lastServerRequestId prefixes. */ - given HMap.KeyLike[SessionId] = HMap.KeyLike.forNewtype(SessionId) + given sessionIdKeyLike: HMap.KeyLike[SessionId] = HMap.KeyLike.forNewtype(SessionId) /** Helper type to concatenate SessionSchema with user schema. 
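 * For example, the kvstore module in this diff instantiates the concatenated schema as:
 * {{{
 * type Schema = Tuple.Concat[SessionSchema[KVResponse, KVServerRequest], KVSchema]
 * }}}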
* diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala new file mode 100644 index 00000000..27f78dfc --- /dev/null +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala @@ -0,0 +1,120 @@ +package zio.raft.sessionstatemachine + +import zio.test.* +import scodec.Codec +import scodec.codecs.* +import zio.raft.sessionstatemachine.Codecs.given +import zio.raft.protocol.* +import java.time.Instant +import zio.raft.Command + +object CodecsSpec extends ZIOSpecDefault: + def spec = + suiteAll("Session State Machine Codecs") { + test("sessionMetadata round-trip") { + val sm = SessionMetadata(Map("a" -> "b"), Instant.ofEpochMilli(1234L)) + val bits = summon[Codec[SessionMetadata]].encode(sm).require + val decoded = summon[Codec[SessionMetadata]].decode(bits).require.value + assertTrue(decoded == sm) + } + + test("requestId round-trip") { + val r = RequestId(42L) + val bits = Codecs.requestIdCodec.encode(r).require + val decoded = Codecs.requestIdCodec.decode(bits).require.value + assertTrue(decoded == r) + } + + test("pendingServerRequest round-trip") { + given Codec[String] = utf8_32 + val psr = PendingServerRequest("payload", Instant.ofEpochMilli(999L)) + val bits = summon[Codec[PendingServerRequest[String]]].encode(psr).require + val decoded = summon[Codec[PendingServerRequest[String]]].decode(bits).require.value + assertTrue(decoded == psr) + } + + suiteAll("sessionCommand discriminated union round-trips") { + test("ClientRequest") { + // define a concrete UC that extends zio.raft.Command and provide a codec for it + case object DummyCmd extends Command: + type Response = Unit + given Codec[DummyCmd.type] = provide(DummyCmd) + given Codec[Unit] = provide(()) + val cmd: SessionCommand[DummyCmd.type, Unit] = SessionCommand.ClientRequest[DummyCmd.type, Unit]( + createdAt = Instant.EPOCH, + sessionId = SessionId.fromString("s-1"), + requestId = RequestId(1L), + lowestPendingRequestId = RequestId(0L), + command = DummyCmd + ) + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val bits = codec.encode(cmd).require + val decoded = codec.decode(bits).require.value + assertTrue(decoded == cmd) + } + + test("ServerRequestAck") { + // reuse Dummy UC type param for the union codec + case object DummyCmd extends Command: + type Response = Unit + given Codec[DummyCmd.type] = provide(DummyCmd) + given Codec[Unit] = provide(()) + val cmd = SessionCommand.ServerRequestAck[Unit]( + createdAt = Instant.EPOCH, + sessionId = SessionId.fromString("s-2"), + requestId = RequestId(2L) + ) + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val bits = codec.encode(cmd).require + val decoded = codec.decode(bits).require.value + assertTrue(decoded == cmd) + } + + test("CreateSession") { + case object DummyCmd extends Command: + type Response = Unit + given Codec[DummyCmd.type] = provide(DummyCmd) + given Codec[Unit] = provide(()) + val cmd = SessionCommand.CreateSession[Unit]( + createdAt = Instant.EPOCH, + sessionId = SessionId.fromString("s-3"), + capabilities = Map("k" -> "v") + ) + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val bits = codec.encode(cmd).require + val decoded = codec.decode(bits).require.value + assertTrue(decoded == cmd) + } + + test("SessionExpired") { + case object DummyCmd extends Command: + type Response = Unit + given Codec[DummyCmd.type] = provide(DummyCmd) + given Codec[Unit] = 
provide(()) + val cmd = SessionCommand.SessionExpired[Unit]( + createdAt = Instant.EPOCH, + sessionId = SessionId.fromString("s-4") + ) + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val bits = codec.encode(cmd).require + val decoded = codec.decode(bits).require.value + assertTrue(decoded == cmd) + } + + test("GetRequestsForRetry") { + case object DummyCmd extends Command: + type Response = Unit + given Codec[DummyCmd.type] = provide(DummyCmd) + given Codec[Unit] = provide(()) + val cmd = SessionCommand.GetRequestsForRetry[Unit]( + createdAt = Instant.EPOCH, + lastSentBefore = Instant.ofEpochMilli(500L) + ) + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val bits = codec.encode(cmd).require + val decoded = codec.decode(bits).require.value + assertTrue(decoded == cmd) + } + } + } +end CodecsSpec diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala index 0f2da8b4..2389a88c 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala @@ -188,10 +188,8 @@ object IdempotencySpec extends ZIOSpecDefault: val (state5, result4) = sm.apply(cmd4).run(state4) (result4.asInstanceOf[Either[RequestError, (Int, List[Any])]]: @unchecked) match - case Left(RequestError.ResponseEvicted(sid, rid)) => + case Left(RequestError.ResponseEvicted) => assertTrue( - sid == sessionId && - rid == RequestId(1) && sm.callCount == 3 // Command was NOT executed again (only 3 commands processed) ) case Right(_) => diff --git a/specs/004-add-client-server/data-model.md b/specs/004-add-client-server/data-model.md new file mode 100644 index 00000000..86d0fb2b --- /dev/null +++ b/specs/004-add-client-server/data-model.md @@ -0,0 +1,39 @@ +# Data Model: Add client-server to kvstore, CLI, and watch command + +## Entities + +### Session +- Represents a client's active interaction window +- Attributes: `sessionId` +- Lifecycle: Active → Expired (on expire-session) + +### Subscription +- Associates a `sessionId` with a `key` +- Removed when the session expires +- Multiple subscriptions per session allowed (unlimited) + +### KeyValue +- The key and its current value +- Operations: create/update (deletes out of scope) + +### Notification/Event +- Payload: `key`, `value` +- Emitted to all sessions subscribed to `key` at delivery time +- Delivery: all updates delivered, no coalescing; no cross-session ordering guarantee + +## Relationships +- One Session ↔ many Subscriptions +- One KeyValue ↔ many Sessions (fan-out) + +## Derived Views (conceptual) +- Subscriptions Map: `key -> Set[sessionId]` +- Session Index: `sessionId -> Set[key]` + +## Validation Rules +- Duplicate watch requests (same session/key) are idempotent +- Watch requires active session (enforced by client-server libraries) + +## Integration Mapping (conceptual) +- Server `RaftAction` → `SessionCommand` +- State machine responses → server `ServerAction` +- Retry job: check `hasPendingRequests` before `SessionCommand.GetRequestsForRetry` diff --git a/specs/004-add-client-server/plan.md b/specs/004-add-client-server/plan.md new file mode 100644 index 00000000..df367c11 --- /dev/null +++ b/specs/004-add-client-server/plan.md @@ -0,0 +1,159 @@ +# Implementation Plan: Add client-server to kvstore, CLI, and watch command + +**Branch**: `004-add-client-server` | **Date**: 
2025-10-26 | **Spec**: /Users/somdoron/git/zio-raft/specs/004-add-client-server/spec.md
+**Input**: Feature specification from `/specs/004-add-client-server/spec.md`
+
+## Execution Flow (/plan command scope)
+```
+1. Load feature spec from Input path
+   → If not found: ERROR "No feature spec at {path}"
+2. Fill Technical Context (scan for NEEDS CLARIFICATION)
+   → Detect Project Type from context (web=frontend+backend, mobile=app+api)
+   → Set Structure Decision based on project type
+3. Fill the Constitution Check section based on the content of the constitution document.
+4. Evaluate Constitution Check section below
+   → If violations exist: Document in Complexity Tracking
+   → If no justification possible: ERROR "Simplify approach first"
+   → Update Progress Tracking: Initial Constitution Check
+5. Execute Phase 0 → research.md
+   → If NEEDS CLARIFICATION remain: ERROR "Resolve unknowns"
+6. Execute Phase 1 → data-model.md, quickstart.md, agent-specific template file (if applicable)
+7. Re-evaluate Constitution Check section
+   → If new violations: Refactor design, return to Phase 1
+   → Update Progress Tracking: Post-Design Constitution Check
+8. Plan Phase 2 → Describe task generation approach (DO NOT create tasks.md)
+9. STOP - Ready for /tasks command
+```
+
+## Summary
+Use the existing client-server libraries (Feature 001) and the session state machine (Feature 002) to replace the HTTP example in `kvstore`, add a `kvstore-cli` for demonstration, implement a watch command that returns the current value immediately and then streams subsequent updates, and manage per-session subscriptions. Introduce a stream-based `Node` that binds the Raft server, Raft core, and state machine: it consumes `RaftServer.raftActions`, maps them to `SessionCommand`, and maps responses back to `ServerAction`; it also merges Raft core `stateNotifications`, mapped to `RaftServer.stepUp`/`stepDown`. Finally, `Node` runs a 10s periodic retry stream that checks `hasPendingRequests` (dirty read) before initiating `SessionCommand.GetRequestsForRetry`.
+
+## Technical Context
+**Language/Version**: Scala 3.3+, ZIO 2.1+
+**Primary Dependencies**: ZIO, zio-raft client-server libraries (Feature 001), session-state-machine (Feature 002)
+**Storage**: In-memory example (no deletes)
+**Testing**: ZIO Test (suiteAll, assertTrue) mandated by Constitution
+**Target Platform**: JVM (Linux/macOS)
+**Project Type**: single (library + examples)
+**Performance Goals**: Reasonable dev-demo responsiveness; all updates delivered to watchers
+**Constraints**:
+- No HTTP server in example
+- Unlimited concurrent watches
+- No cross-session ordering guarantees; deliver all updates, no coalescing
+- Watch returns current value immediately
+- Duplicate watches idempotent
+- Retry logic runs inside `Node`: must check `hasPendingRequests` before `GetRequestsForRetry`
+- Reuse existing libraries, no duplicate mechanisms
+
+### CLI Configuration
+- Endpoints provided via `--endpoints` flag or `KVSTORE_ENDPOINTS` env var; defaults to localhost dev endpoints if unspecified.
+
+## Constitution Check
+*GATE: Must pass before Phase 0 research. Re-check after Phase 1 design.*
+
+### I. Functional Purity & Type Safety
+- [ ] All new code uses immutable data structures and ZIO effect types
+- [ ] No unsafe operations (casting, reflection) introduced
+- [ ] Type safety preserved throughout implementation
+
+### II. 
Explicit Error Handling +- [ ] All external interactions have explicit error handling +- [ ] Business logic errors use ZIO.fail or Either types, not exceptions +- [ ] Timeout and resource failures properly modeled + +### III. Existing Code Preservation (NON-NEGOTIABLE) +- [ ] Core interfaces (StateMachine, RPC, LogStore) not modified without architectural review +- [ ] Backward compatibility maintained for public APIs +- [ ] No performance degradation without measurement and justification + +### IV. ZIO Ecosystem Consistency +- [ ] ZIO primitives used for all concurrent operations +- [ ] ZStream used for streaming, no external streaming libraries +- [ ] Resource management follows ZIO Scope patterns +- [ ] `suiteAll` is used instead of `suite` + +### V. Test-Driven Maintenance +- [ ] Bug fixes include reproducing test cases +- [ ] Performance changes include benchmark tests +- [ ] Complex Raft scenarios have property-based tests + +## Project Structure + +### Documentation (this feature) +``` +specs/004-add-client-server/ +├── plan.md # This file (/plan command output) +├── research.md # Phase 0 output (/plan command) +├── data-model.md # Phase 1 output (/plan command) +├── quickstart.md # Phase 1 output (/plan command) +└── tasks.md # Phase 2 output (/tasks command - NOT created by /plan) +``` + +## Phase 0: Outline & Research +1. Extract unknowns from Technical Context above: + - Event payload fields → resolved: key, value + - Delivery semantics → resolved: all updates, no coalescing, unordered across sessions + - Reconnection/expiry → resolved: handled by client-server; expire-session means terminal + - Limits → resolved: unlimited concurrent watches +2. Consolidate findings in `research.md` using Decision/Rationale/Alternatives + +**Output**: research.md with all NEEDS CLARIFICATION resolved + +## Phase 1: Design +*Prerequisites: research.md complete* + +1. Extract entities from feature spec → `data-model.md`: + - Session, Subscription, KeyValue, Notification/Event +2. Extract test scenarios from user stories → quickstart.md walkthrough +3. Define `Node` design: binds server, Raft core, and state machine; stream-consumes `raftActions` (→ `SessionCommand` → `ServerAction`), merges Raft `stateNotifications` (→ `RaftServer.stepUp`/`stepDown`), and runs 10s periodic retry stream using `hasPendingRequests` → `GetRequestsForRetry`. Publishing back to the server uses `RaftServer` methods: `sendClientResponse`, `sendServerRequest`, `sendRequestError`, and `confirmSessionCreation`. 
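+
+   A minimal sketch of the intended wiring, where all names below (`raftActions`, `stateNotifications`, `submit`, `toCommand`, `mkRetryCommand`, `hasPending`, `stepUp`, `stepDown`) are placeholders standing in for the Feature 001/002 APIs rather than their final signatures:
+
+   ```scala
+   import zio.*
+   import zio.stream.*
+
+   // Sketch only: three concurrent streams merged into a single run loop.
+   final class Node[A, C](
+       raftActions: UStream[A],              // placeholder for RaftServer.raftActions
+       stateNotifications: UStream[Boolean], // placeholder: true = this node stepped up
+       submit: C => UIO[Unit],               // placeholder: apply a SessionCommand via Raft
+       toCommand: A => C,                    // placeholder: RaftAction -> SessionCommand
+       mkRetryCommand: UIO[C],               // placeholder: build GetRequestsForRetry
+       hasPending: UIO[Boolean],             // placeholder: dirty read of hasPendingRequests
+       stepUp: UIO[Unit],                    // placeholder for RaftServer.stepUp
+       stepDown: UIO[Unit]                   // placeholder for RaftServer.stepDown
+   ):
+     val run: UIO[Unit] =
+       val actions    = raftActions.mapZIO(a => submit(toCommand(a)))
+       val leadership = stateNotifications.mapZIO(up => if up then stepUp else stepDown)
+       val retries = ZStream
+         .tick(10.seconds)                   // periodic retry check
+         .mapZIO(_ => hasPending)            // dirty read, outside Raft consensus
+         .filter(identity)                   // only touch the Raft log when retries are needed
+         .mapZIO(_ => mkRetryCommand.flatMap(submit))
+       ZStream.mergeAllUnbounded()(actions, leadership, retries).runDrain
+   ```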
+ +**Output**: data-model.md, quickstart.md + +## Phase 2: Task Planning Approach +*This section describes what the /tasks command will do - DO NOT execute during /plan* + +**Task Generation Strategy**: +- Generate tasks from Phase 1 design docs (data model, quickstart) +- Each entity → model update task [P] +- Each user story → integration test task +- Implementation tasks to make tests pass + +**Ordering Strategy**: +- TDD order: Tests before implementation +- Dependency order: State machine integration before CLI +- Mark [P] for parallel execution (independent files) + +**Estimated Output**: 25-30 numbered, ordered tasks in tasks.md + +## Phase 3+: Future Implementation +*These phases are beyond the scope of the /plan command* + +**Phase 3**: Task execution (/tasks command creates tasks.md) +**Phase 4**: Implementation (execute tasks.md following constitutional principles) +**Phase 5**: Validation (run tests, execute quickstart.md, performance validation) + +## Complexity Tracking +*Fill ONLY if Constitution Check has violations that must be justified* + +| Violation | Why Needed | Simpler Alternative Rejected Because | +|-----------|------------|-------------------------------------| + +## Progress Tracking +*This checklist is updated during execution flow* + +**Phase Status**: +- [x] Phase 0: Research complete (/plan command) +- [x] Phase 1: Design complete (/plan command) +- [ ] Phase 2: Task planning complete (/plan command - describe approach only) +- [ ] Phase 3: Tasks generated (/tasks command) +- [ ] Phase 4: Implementation complete +- [ ] Phase 5: Validation passed + +**Gate Status**: +- [x] Initial Constitution Check: PASS +- [x] Post-Design Constitution Check: PASS +- [x] All NEEDS CLARIFICATION resolved +- [ ] Complexity deviations documented + +--- +*Based on Constitution v1.0.0 - See `.specify/memory/constitution.md`* diff --git a/specs/004-add-client-server/quickstart.md b/specs/004-add-client-server/quickstart.md new file mode 100644 index 00000000..a4d2cc0a --- /dev/null +++ b/specs/004-add-client-server/quickstart.md @@ -0,0 +1,39 @@ +# Quickstart: kvstore client-server example with watch + +## Prerequisites +- Scala 3.3+, sbt + +## Run the server +```bash +sbt "kvstore/runServer" +``` + +## Run the CLI +```bash +sbt "kvstore-cli/run" +``` + +## Basic usage +- Set a key: +```bash +kvstore-cli set k1 v1 +``` +- Get a key: +```bash +kvstore-cli get k1 +``` +- Watch a key (returns current value, then streams updates until session expires): +```bash +kvstore-cli watch k1 +``` +- Update the key from another terminal: +```bash +kvstore-cli set k1 v2 +``` +- Observe the watch output showing v1 (initial) then v2... 
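+
+For illustration, the watch session might then print something like this (hypothetical output format; the exact rendering depends on the CLI implementation):
+
+```bash
+$ kvstore-cli watch k1
+k1 = v1
+k1 = v2
+```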
+
+## Notes
+- Unlimited concurrent watches; duplicate watch requests are idempotent
+- All updates are delivered; ordering is not guaranteed across sessions
+- Deletes are not supported in this example
+- If the session expires (expire-session), the watch stops and subscriptions are removed
diff --git a/specs/004-add-client-server/research.md b/specs/004-add-client-server/research.md
new file mode 100644
index 00000000..1216e996
--- /dev/null
+++ b/specs/004-add-client-server/research.md
@@ -0,0 +1,27 @@
+# Phase 0 Research: Add client-server to kvstore, CLI, and watch command
+
+## Decisions
+- Event payload fields = key, value
+  - Rationale: Minimal, sufficient for example; deletes out of scope
+  - Alternatives: Include timestamps/versioning → deferred for simplicity
+- Delivery semantics = deliver all updates, no coalescing; no cross-session ordering
+  - Rationale: Matches client-server guarantees and example goals
+  - Alternatives: Exactly-once/ordering → higher complexity, unnecessary for example
+- Initial snapshot on watch = return current value immediately
+  - Rationale: Better UX; aligns with watch semantics in many KV systems
+  - Alternatives: Only subsequent changes → requires extra read in clients
+- Reconnection/expiry = managed by client-server; expire-session is terminal
+  - Rationale: Division of responsibilities; simplifies example logic
+  - Alternatives: Session resumption → not required for example scope
+- Concurrency limits = unlimited watches
+  - Rationale: Keep example unbounded; real systems may enforce quotas
+  - Alternatives: Configurable limits → deferred
+- Retry job = check `hasPendingRequests` (dirty read) before `GetRequestsForRetry`
+  - Rationale: Prevent Raft log spam; efficient retries
+  - Alternatives: Periodic unconditional `GetRequestsForRetry` → wasteful
+- Mapping = server `RaftAction` → `SessionCommand`; responses → server `ServerAction`
+  - Rationale: Integrates Feature 001 and 002 cleanly
+  - Alternatives: Duplicate pathways → violates reuse requirement
+
+## Open Questions
+- None (all clarifications captured in spec Clarifications section)
diff --git a/specs/004-add-client-server/spec.md b/specs/004-add-client-server/spec.md
new file mode 100644
index 00000000..07e00e4b
--- /dev/null
+++ b/specs/004-add-client-server/spec.md
@@ -0,0 +1,156 @@
+# Feature Specification: Add client-server to kvstore, CLI, and watch command
+
+**Feature Branch**: `004-add-client-server`
+**Created**: 2025-10-26
+**Status**: Draft
+**Input**: User description: "add client-server to kvstore example project, remove the existing HttpServer. Add a new kvstore-cli to demonstrate the client. To demonstrate the server request, let's add a watch command to kvstore, session will be able to watch a specific key, and for the life of that session will receive updates when that key is updated. Manage the subscriptions in the hmap, remove any subscription when the session expires."
+
+## Execution Flow (main)
+```
+1. Parse user description from Input
+   → If empty: ERROR "No feature description provided"
+2. Extract key concepts from description
+   → Identify: actors, actions, data, constraints
+3. For each unclear aspect:
+   → Mark with [NEEDS CLARIFICATION: specific question]
+4. Fill User Scenarios & Testing section
+   → If no clear user flow: ERROR "Cannot determine user scenarios"
+5. Generate Functional Requirements
+   → Each requirement must be testable
+   → Mark ambiguous requirements
+6. Identify Key Entities (if data involved)
+7. 
Run Review Checklist + → If any [NEEDS CLARIFICATION]: WARN "Spec has uncertainties" + → If implementation details found: ERROR "Remove tech details" +8. Return: SUCCESS (spec ready for planning) +``` + +--- + +## ⚡ Quick Guidelines +- ✅ Focus on WHAT users need and WHY +- ❌ Avoid HOW to implement (no tech stack, APIs, code structure) +- 👥 Written for business stakeholders, not developers + +### Section Requirements +- **Mandatory sections**: Must be completed for every feature +- **Optional sections**: Include only when relevant to the feature +- When a section doesn't apply, remove it entirely (don't leave as "N/A") + +### For AI Generation +When creating this spec from a user prompt: +1. **Mark all ambiguities**: Use [NEEDS CLARIFICATION: specific question] for any assumption you'd need to make +2. **Don't guess**: If the prompt doesn't specify something (e.g., "login system" without auth method), mark it +3. **Think like a tester**: Every vague requirement should fail the "testable and unambiguous" checklist item +4. **Common underspecified areas**: + - User types and permissions + - Data retention/deletion policies + - Performance targets and scale + - Error handling behaviors + - Integration requirements + - Security/compliance needs + +--- + +## User Scenarios & Testing (mandatory) + +### Primary User Story +As a developer using the kvstore example, I want to run a server that exposes a client-server interface (instead of HTTP) and use a `kvstore-cli` to set/get keys and to watch a specific key, so that during my session I receive real-time updates when that key changes, and the watch stops when my session ends. + +### Acceptance Scenarios +1. Given the kvstore server is running and a client establishes a session, When the client issues a watch command for key "k1" and another client updates "k1", Then the watching client sees an update event for "k1" during the active session. +2. Given an active watch on key "k1" for a session, When the session expires or is closed, Then the watch stops and no further updates are delivered to that session. +3. Given the project previously exposed an HTTP server, When the feature is applied, Then the example uses only the client-server interface and the HTTP server is not present in the example run path and docs. +4. Given multiple clients watch the same key, When the key is updated, Then all active sessions with a watch on that key receive the update during their session lifetimes; delivery ordering is not guaranteed and fan-out includes all sessions found in the subscriptions map at delivery time. +5. Given a client starts a watch on a key that currently has a value, When the watch begins, Then the client receives the current value immediately and continues to receive subsequent changes. + +### Edge Cases +- Deletes are out of scope for the example project and are not supported. +- If the same session attempts to watch the same key multiple times, subsequent watch requests have no effect (idempotent). +- Reconnection behavior is managed by client-server; watches persist while the session remains active. Upon receiving an expire-session, the session will not resume and all subscriptions are removed. +- If a key is updated many times rapidly, all updates are delivered (no coalescing). Ordering is not guaranteed across sessions. +- Unlimited concurrent watches per session and across the server. 
+
+## Requirements (mandatory)
+
+### Functional Requirements
+- **FR-001**: The kvstore example MUST use the client-server architecture; the existing HTTP server MUST be removed from the example path and documentation.
+- **FR-002**: A `kvstore-cli` MUST be provided to demonstrate client interactions with the server.
+- **FR-003**: The CLI MUST support a watch command that subscribes the current session to updates for a specific key for the duration of that session.
+- **FR-004**: The watch feature MUST emit notifications for create and update events on the watched key.
+- **FR-005**: When a session expires or is closed, the system MUST remove associated subscriptions so that no further updates are sent to that session.
+- **FR-006**: Subscriptions MUST be tracked per session and per key to enable multiple simultaneous watches across sessions.
+- **FR-008**: The CLI MUST display incoming watch updates in real-time while the watch is active.
+- **FR-009**: The system MUST fan out updates to all sessions currently subscribed to the key; delivery ordering across sessions is not guaranteed.
+- **FR-011**: Upon subscription, the watch MUST return the current value of the key (if any) immediately, then deliver subsequent changes.
+- **FR-012**: The system MUST allow a session to watch multiple keys concurrently and impose no feature-level limit on the number of concurrent watches per session or across the server.
+- **FR-013**: Duplicate watch requests from the same session for the same key MUST be idempotent and have no effect.
+- **FR-014**: The system MUST deliver all updates for a watched key without coalescing; ordering across sessions is not guaranteed.
+- **FR-016**: The system MUST run a server-request retry job that initiates `SessionCommand.GetRequestsForRetry` and MUST first check `hasPendingRequests` (dirty read) to avoid spamming the Raft log.
+- **FR-017**: The system MUST map server library `RaftAction` to `SessionCommand`, and map state machine responses back to server `ServerAction` for delivery.
+
+### Key Entities (include if feature involves data)
+- **Session**: Represents a client's active interaction window; has an identity and a lifetime; governs eligibility to receive watch events.
+- **Subscription**: Association between a `Session` and a specific `Key`; removed when the session ends; enables event delivery to the session.
+- **KeyValue**: A key with its latest value; subject of watch subscriptions; may undergo create/update.
+- **Notification/Event**: An update related to a watched `KeyValue` sent to all subscribed sessions during their lifetimes; payload includes the key and value.
+
+---
+
+## Dependencies & Assumptions
+- Client-server capabilities are provided by libraries from Feature 001 (client-server) and are reused here. No new transports or protocols are introduced.
+- Session lifecycle and expiry are governed by the session state machine from Feature 002, already integrated into `kvstore`.
+- The example project does not include an HTTP server.
+
+### CLI Configuration
+- CLI discovers the Raft cluster endpoints via either:
+  - Command-line flag `--endpoints` (comma-separated list), or
+  - Environment variable `KVSTORE_ENDPOINTS` (comma-separated list)
+- If neither is provided, default to `localhost` development endpoints.
+
+---
+
+## Clarifications
+
+### Session
+- Initial snapshot: upon watch subscription, return the current value immediately. 
+- Fan-out and ordering: deliver to all subscribed sessions found at delivery time; ordering across sessions is not guaranteed. +- Reconnection/expiry: handled by client-server; on expire-session, the session will not resume and subscriptions are removed. +- Idempotency: duplicate watch requests from the same session for the same key have no effect. +- Concurrency limits: unlimited concurrent watches per session and across the server. +- Delivery semantics: all updates are delivered (no coalescing). +- Event payload: notification includes key and value. + +--- + +## Review & Acceptance Checklist +*GATE: Automated checks run during main() execution* + +### Content Quality +- [ ] No implementation details (languages, frameworks, APIs) +- [ ] Focused on user value and business needs +- [ ] Written for non-technical stakeholders +- [ ] All mandatory sections completed + +### Requirement Completeness +- [ ] No [NEEDS CLARIFICATION] markers remain +- [ ] Requirements are testable and unambiguous +- [ ] Success criteria are measurable +- [ ] Scope is clearly bounded +- [ ] Dependencies and assumptions identified + +--- + +## Execution Status +*Updated by main() during processing* + +- [ ] User description parsed +- [ ] Key concepts extracted +- [ ] Ambiguities marked +- [ ] User scenarios defined +- [ ] Requirements generated +- [ ] Entities identified +- [ ] Review checklist passed + +--- + diff --git a/specs/004-add-client-server/tasks.md b/specs/004-add-client-server/tasks.md new file mode 100644 index 00000000..89cc12ec --- /dev/null +++ b/specs/004-add-client-server/tasks.md @@ -0,0 +1,85 @@ +# Tasks: Add client-server to kvstore, CLI, and watch command + +**Input**: Design documents from `/specs/004-add-client-server/` +**Prerequisites**: plan.md (required), research.md, data-model.md + +## Execution Flow (main) +``` +1. Load plan.md from feature directory +2. Load data-model.md and research.md +3. Generate ordered tasks per template (TDD: tests before implementation) +4. Mark [P] for tasks in different files that can run in parallel +5. Output tasks.md in feature directory +``` + +## Format: `[ID] [P?] Description` +- **[P]**: Can run in parallel (different files, no dependencies) +- Include exact file paths in descriptions + +## Phase 3.1: Setup +- [X] T001 Update `/Users/somdoron/git/zio-raft/build.sbt` to add subproject `kvstore-cli` depending on `client-server-client` and `kvstore`; ensure `kvstore` depends on `clientServerServer` as needed. Enable Scala 3.3+ and ZIO 2.1+. +- [X] T002 [P] Remove stale HTTP server comment from `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/App.scala`; ensure example no longer references HTTP anywhere. +- [X] T003 [P] Extract `KVStateMachine` from `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/App.scala` to `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala`; create package object at `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/package.scala` and move remaining types (`KVCommand`, `KVResponse`, `SetDone`, `GetResult`, `KVKey`, type aliases/codecs) into the package object. + +## Phase 3.2: Tests First (TDD) ⚠️ MUST COMPLETE BEFORE 3.3 +- [ ] T004 [P] Create `Watch returns current value and updates` integration test at `/Users/somdoron/git/zio-raft/kvstore/src/test/scala/zio/kvstore/WatchSpec.scala` that: starts server, uses client to `set k1 v1`, `watch k1`, asserts initial event v1 then after `set k1 v2` receives v2. Use ZIO Test `suiteAll` and `assertTrue`. 
+- [ ] T005 [P] Create `Duplicate watch is idempotent` test in `/Users/somdoron/git/zio-raft/kvstore/src/test/scala/zio/kvstore/WatchSpec.scala` asserting second `watch k1` by same session has no effect (no duplicate events/registrations). Use `suiteAll` and `assertTrue`.
+- [ ] T006 [P] Create `Session expiry removes subscriptions` test in `/Users/somdoron/git/zio-raft/kvstore/src/test/scala/zio/kvstore/SessionExpirySpec.scala` asserting after expire-session, no further watch notifications delivered. Use `suiteAll` and `assertTrue`.
+- [ ] T007 [P] Create `All updates delivered, no coalescing` test in `/Users/somdoron/git/zio-raft/kvstore/src/test/scala/zio/kvstore/DeliverySpec.scala` asserting multiple rapid sets emit all events to watcher (ordering across sessions not asserted). Use `suiteAll` and `assertTrue`.
+
+## Phase 3.3: Core Implementation (ONLY after tests are failing)
+- [X] T008 Define `KVServerRequest` ADT at `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/protocol/KVServerRequest.scala` with `Notification(key: String, value: String)`; update usages to replace previous `NoServerRequest` alias.
+- [X] T009 Add `KVCommand.Watch(key: String)` in `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/package.scala`; define watch semantics including initial snapshot (return current value immediately) and idempotency (duplicate watch has no effect).
+- [X] T010 Update `KVStateMachine` at `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala` to manage subscriptions in `HMap` per spec: add schema entries for (a) key → Set[SessionId], (b) sessionId → Set[key]; register on watch; remove in `handleSessionExpired`; in `applyCommand(Set)`, after updating state, fan out `KVServerRequest.Notification` to all sessions subscribed to that key.
+- [X] T011 Update scodec codecs in `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/package.scala` and `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/protocol/KVServerRequest.scala` to cover new command/response/server-request types.
+- [X] T012 Create `Node` skeleton at `/Users/somdoron/git/zio-raft/kvstore/src/main/scala/zio/kvstore/node/Node.scala` with constructor wiring (server, raft core, state machine) and `run` method signature; no logic yet.
+- [X] T013 Implement `Node` raft actions processing stream: consume `RaftServer.raftActions`, map to `SessionCommand`, call state machine, and based on results publish via `RaftServer.sendClientResponse`, `sendServerRequest`, `sendRequestError`, or `confirmSessionCreation`.
+- [X] T014 Implement `Node` 10s retry stream: periodically check `hasPendingRequests` (dirty read) and when true initiate `SessionCommand.GetRequestsForRetry`; publish resulting actions via the same `RaftServer` methods.
+- [X] T015 Implement `Node` raft state notifications stream: consume Raft core `stateNotifications` and map to `RaftServer.stepUp`/`stepDown` to reflect leadership changes.
+- [X] T016 Compose `Node.run` to run raftActions stream, retry stream, and state notifications stream concurrently; ensure backpressure/termination semantics; publish all outputs via appropriate `RaftServer` methods. 
+- [X] T017 Create `kvstore-cli` skeleton at `/Users/somdoron/git/zio-raft/kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala`; initialize client connection to Raft cluster using `client-server-client`. Read endpoints from `--endpoints` flag or `KVSTORE_ENDPOINTS`; default to localhost if unspecified. +- [X] T018 Implement `set` command in CLI: parse args and send set request via client; print confirmation. +- [X] T019 Implement `get` command in CLI: parse args and fetch value; print key/value. +- [X] T020 Implement `watch` command in CLI: parse args, subscribe to key; print initial value and stream subsequent updates until session ends. + +## Phase 3.4: Integration +- [X] T021 Wire `KVStoreServerApp` to start the Raft server and run `Node` (raftActions, retry, and state notifications streams) and expose graceful shutdown. +- [ ] T022 Ensure subscriptions are removed on session expiry by updating `handleSessionExpired` in `KVStateMachine` and validating via logs/tests. +- [ ] T023 Add minimal logging and metrics using ZIO logging for watch and Node retry/state paths. + +## Phase 3.5: Polish +- [ ] T024 [P] Update `/Users/somdoron/git/zio-raft/specs/004-add-client-server/quickstart.md` with exact sbt module names and run commands for server and CLI. +- [ ] T025 [P] Update `/Users/somdoron/git/zio-raft/README.md` to reference the new example and CLI. +- [ ] T026 [P] Unit tests for codec roundtrips for `KVServerRequest.Notification` and `KVCommand.Watch` in `/Users/somdoron/git/zio-raft/kvstore/src/test/scala/zio/kvstore/CodecSpec.scala`. +- [ ] T027 Code review: ensure ZIO ecosystem consistency and constitution compliance; remove any lingering HTTP comments. + +## Dependencies +- Setup (T001-T003) before tests and implementation +- Tests (T004-T007) before core implementation (T008-T020) +- T008 blocks T009-T011 +- T012-T016 block T021 +- Implementation before polish (T024-T027) + +## Parallel Example +``` +# Launch independent tests in parallel: +Task: "T004 Watch returns current value and updates" +Task: "T005 Duplicate watch is idempotent" +Task: "T006 Session expiry removes subscriptions" +Task: "T007 All updates delivered, no coalescing" + +# Launch independent polish tasks in parallel: +Task: "T024 Update quickstart.md" +Task: "T025 Update README.md" +Task: "T026 Codec unit tests" +``` + +## Validation Checklist +- [ ] All functionality has corresponding ZIO Test specifications +- [ ] All data models use immutable structures and type safety +- [ ] All tests come before implementation (TDD compliance) +- [ ] Parallel tasks truly independent and don't affect shared state +- [ ] Each task preserves existing abstractions and APIs +- [ ] No task introduces unsafe operations or exceptions for business logic +- [ ] ZIO ecosystem consistency maintained throughout diff --git a/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala b/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala index 8c9ee337..edd9f621 100644 --- a/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala +++ b/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala @@ -22,6 +22,7 @@ import zio.nio.channels.AsynchronousFileChannel import java.nio.file.StandardOpenOption import zio.nio.Buffer import zio.stream.ZStream +import org.lmdbjava.DbiFlags class FileSnapshotStore( environment: Environment, @@ -222,7 +223,7 @@ object FileSnapshotStore: def make(directory: Path): ZIO[Environment, Nothing, FileSnapshotStore] = for environment <- ZIO.service[Environment] - database <- 
Database.open("snapshots").orDie + database <- Database.open("snapshots", DbiFlags.MDB_CREATE).orDie latest <- environment .transact( database.stream diff --git a/stores/src/main/scala/zio/raft/stores/LmdbStable.scala b/stores/src/main/scala/zio/raft/stores/LmdbStable.scala index 6ea8dbce..1df36562 100644 --- a/stores/src/main/scala/zio/raft/stores/LmdbStable.scala +++ b/stores/src/main/scala/zio/raft/stores/LmdbStable.scala @@ -9,6 +9,7 @@ import zio.UIO import zio.ZIO import zio.lmdb.Environment import scodec.bits.BitVector +import org.lmdbjava.DbiFlags class LmdbStable(environment: Environment, ref: Ref[(Term, Option[MemberId])], database: Database, key: Array[Byte]) extends Stable: @@ -37,7 +38,7 @@ object LmdbStable: def make: ZIO[Environment, Nothing, LmdbStable] = for environment <- ZIO.service[Environment] - database <- Database.open("stable").orDie + database <- Database.open("stable", DbiFlags.MDB_CREATE).orDie key = "stable".getBytes("UTF-8") bytes <- environment.transactReadOnly(database.get(key)).orDie tuple <- bytes match