Skip to content

Commit f95d8fc

Browse files
committed
add client/server to kvstore
1 parent c1b91c4 commit f95d8fc

File tree

26 files changed

+999
-165
lines changed

26 files changed

+999
-165
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ lazy val kvstore = project
120120
),
121121
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
122122
)
123-
.dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol)
123+
.dependsOn(raft, raftZmq, stores, sessionStateMachine, clientServerProtocol, clientServerServer)
124124

125125
lazy val raftZmq = project
126126
.in(file("raft-zmq"))

client-server-server/src/main/scala/zio/raft/server/RaftServer.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ final class RaftServer(
4949
def stepDown(leaderId: Option[MemberId]): UIO[Unit] =
5050
actionQueue.offer(RaftServer.ServerAction.StepDown(leaderId)).unit
5151

52+
/** Notify server that this node has changed leader.
53+
* @param leaderId
54+
* The new leader ID
55+
*/
56+
def leaderChanged(leaderId: MemberId): UIO[Unit] =
57+
actionQueue.offer(RaftServer.ServerAction.LeaderChanged(leaderId)).unit
58+
5259
object RaftServer:
5360

5461
/** Actions initiated by the server (internal or from Raft).

kvstore/src/main/scala/zio/kvstore/App.scala

Lines changed: 1 addition & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1,130 +1,6 @@
11
package zio.kvstore
22

3-
import zio.raft.{Command, Index, HMap}
4-
import zio.raft.sessionstatemachine.{SessionStateMachine, ScodecSerialization, ServerRequestForSession, StateWriter}
5-
import zio.raft.sessionstatemachine.asResponseType // Extension method
6-
import zio.raft.protocol.SessionId
7-
import zio.{ZIO}
8-
import zio.prelude.Newtype
9-
10-
import scodec.Codec
11-
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
12-
import java.time.Instant
13-
14-
// Simple KV store - no sessions, no server requests
15-
// We just use the session framework for the template pattern and HMap
16-
17-
sealed trait KVCommand extends Command
18-
19-
object KVCommand:
20-
case class Set(key: String, value: String) extends KVCommand:
21-
type Response = SetDone
22-
23-
case class Get(key: String) extends KVCommand:
24-
type Response = GetResult
25-
26-
// Codec for KVCommand (needed by ZmqRpc)
27-
val getCodec = utf8_32.as[Get]
28-
val setCodec = (utf8_32 :: utf8_32).as[Set]
29-
given commandCodec: Codec[KVCommand] = discriminated[KVCommand]
30-
.by(fixedSizeBytes(1, ascii))
31-
.typecase("S", setCodec)
32-
.typecase("G", getCodec)
33-
34-
// Response marker type - encompasses all KV command responses
35-
sealed trait KVResponse
36-
case class SetDone() extends KVResponse
37-
case class GetResult(value: Option[String]) extends KVResponse
38-
39-
// KV Schema - single prefix for key-value data
40-
object KVKey extends Newtype[String]
41-
type KVKey = KVKey.Type
42-
given HMap.KeyLike[KVKey] = HMap.KeyLike.forNewtype(KVKey)
43-
44-
type KVSchema = ("kv", KVKey, String) *: EmptyTuple
45-
type KVCompleteSchema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]
46-
47-
// Dummy server request type (KV store doesn't use server requests)
48-
case class NoServerRequest()
49-
50-
type KVServerRequest = NoServerRequest
51-
52-
// KV store with scodec serialization mixin
53-
class KVStateMachine extends SessionStateMachine[KVCommand, KVResponse, KVServerRequest, KVSchema]
54-
with ScodecSerialization[KVResponse, KVServerRequest, KVSchema]:
55-
56-
// Import provided codecs from Codecs object
57-
import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec}
58-
59-
// Provide codecs for response types
60-
given Codec[SetDone] = scodec.codecs.provide(SetDone())
61-
given Codec[GetResult] = scodec.codecs.optional(scodec.codecs.bool, utf8_32).xmap(
62-
opt => GetResult(opt),
63-
gr => gr.value
64-
)
65-
66-
// Provide codec for response marker type
67-
given Codec[KVResponse] = discriminated[KVResponse]
68-
.by(fixedSizeBytes(1, ascii))
69-
.typecase("S", summon[Codec[SetDone]])
70-
.typecase("G", summon[Codec[GetResult]])
71-
72-
// Provide codec for our value types
73-
given Codec[String] = utf8_32
74-
75-
// Dummy codec for NoServerRequest
76-
given Codec[NoServerRequest] = scodec.codecs.provide(NoServerRequest())
77-
78-
val codecs = summon[HMap.TypeclassMap[Schema, Codec]]
79-
80-
protected def applyCommand(
81-
createdAt: Instant,
82-
sessionId: SessionId,
83-
command: KVCommand
84-
): StateWriter[HMap[KVCompleteSchema], ServerRequestForSession[KVServerRequest], command.Response & KVResponse] =
85-
command match
86-
case set @ KVCommand.Set(key, value) =>
87-
for
88-
state <- StateWriter.get[HMap[KVCompleteSchema]]
89-
newState = state.updated["kv"](KVKey(key), value)
90-
_ <- StateWriter.set(newState)
91-
yield SetDone().asResponseType(command, set)
92-
93-
case get @ KVCommand.Get(key) =>
94-
for
95-
state <- StateWriter.get[HMap[KVCompleteSchema]]
96-
result = state.get["kv"](KVKey(key))
97-
yield GetResult(result).asResponseType(command, get)
98-
99-
protected def handleSessionCreated(
100-
createdAt: Instant,
101-
sessionId: SessionId,
102-
capabilities: Map[String, String]
103-
): StateWriter[HMap[KVCompleteSchema], ServerRequestForSession[KVServerRequest], Unit] =
104-
StateWriter.succeed(())
105-
106-
protected def handleSessionExpired(
107-
createdAt: Instant,
108-
sessionId: SessionId,
109-
capabilities: Map[String, String]
110-
): StateWriter[HMap[KVCompleteSchema], ServerRequestForSession[KVServerRequest], Unit] =
111-
StateWriter.succeed(())
112-
113-
// takeSnapshot and restoreFromSnapshot are now provided by SessionStateMachine base class!
114-
// They use the TypeclassMap[Schema, Codec] passed in constructor
115-
116-
override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean =
117-
false // Disable snapshots for now
118-
end KVStateMachine
119-
120-
// HttpServer commented out for now - SessionCommand type system complexity
121-
// The KVStateMachine implementation demonstrates:
122-
// - SessionStateMachine extension
123-
// - HMap with typed schema
124-
// - Scodec-based serialization
125-
// - TypeclassMap usage
126-
127-
// TODO: Implement proper HTTP server with session management
3+
import zio.ZIO
1284

1295
object KVStoreApp extends zio.ZIOAppDefault:
1306
override def run =
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package zio.kvstore
2+
3+
import scodec.Codec
4+
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
5+
6+
object Codecs:
7+
// Response codecs
8+
given Codec[KVResponse.SetDone.type] = scodec.codecs.provide(KVResponse.SetDone)
9+
given Codec[KVResponse.WatchDone.type] = scodec.codecs.provide(KVResponse.WatchDone)
10+
given Codec[KVResponse.GetResult] = scodec.codecs.optional(scodec.codecs.bool, utf8_32).xmap(
11+
opt => KVResponse.GetResult(opt),
12+
gr => gr.value
13+
)
14+
15+
given Codec[KVResponse] =
16+
discriminated[KVResponse]
17+
.by(fixedSizeBytes(1, ascii))
18+
.typecase("S", summon[Codec[KVResponse.SetDone.type]])
19+
.typecase("W", summon[Codec[KVResponse.WatchDone.type]])
20+
.typecase("G", summon[Codec[KVResponse.GetResult]])
21+
22+
// Value codec for KV schema values
23+
given Codec[String] = utf8_32
24+
private val sessionIdCodec = zio.raft.protocol.Codecs.sessionIdCodec
25+
given sessionIdSetCodec: Codec[Set[zio.raft.protocol.SessionId]] =
26+
scodec.codecs.listOfN(scodec.codecs.uint16, sessionIdCodec).xmap(_.toSet, _.toList)
27+
given kvKeySetCodec: Codec[Set[KVKey]] =
28+
scodec.codecs.listOfN(scodec.codecs.uint16, utf8_32).xmap(_.map(KVKey(_)).toSet, _.toList.map(KVKey.unwrap))
29+
30+
// Command codecs
31+
private val getCodec: Codec[KVCommand.Get] = utf8_32.as[KVCommand.Get]
32+
private val setCodec: Codec[KVCommand.Set] = (utf8_32 :: utf8_32).as[KVCommand.Set]
33+
private val watchCodec: Codec[KVCommand.Watch] = utf8_32.as[KVCommand.Watch]
34+
given Codec[KVCommand] =
35+
discriminated[KVCommand]
36+
.by(fixedSizeBytes(1, ascii))
37+
.typecase("S", setCodec)
38+
.typecase("G", getCodec)
39+
.typecase("W", watchCodec)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package zio.kvstore
2+
3+
import zio.raft.{Index, HMap}
4+
import zio.raft.sessionstatemachine.{SessionStateMachine, ScodecSerialization, ServerRequestForSession, StateWriter}
5+
import zio.raft.sessionstatemachine.asResponseType
6+
import zio.raft.protocol.SessionId
7+
import zio.kvstore.protocol.KVServerRequest
8+
import scodec.Codec
9+
import java.time.Instant
10+
import zio.kvstore.Codecs.given
11+
import zio.raft.sessionstatemachine.given
12+
import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec}
13+
14+
class KVStateMachine extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]
15+
with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]:
16+
17+
// Local alias to aid match-type reduction bug in scala 3.3
18+
type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]
19+
private type SW[A] = StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], A]
20+
21+
// Helpers
22+
private def putValue(key: KVKey, value: String): SW[Unit] =
23+
StateWriter.update[HMap[Schema], HMap[Schema]](s => s.updated["kv"](key, value))
24+
25+
private def getValue(key: KVKey): SW[Option[String]] =
26+
StateWriter.get[HMap[Schema]].map { s =>
27+
s.get["kv"](key)
28+
}
29+
30+
private def subscribersFor(key: KVKey): SW[Set[zio.raft.protocol.SessionId]] =
31+
StateWriter.get[HMap[Schema]].map { s =>
32+
s.get["subsByKey"](key).getOrElse(Set.empty)
33+
}
34+
35+
private def addSubscription(sessionId: zio.raft.protocol.SessionId, key: KVKey): SW[Unit] =
36+
StateWriter.update[HMap[Schema], HMap[Schema]] { s =>
37+
val subs = s.get["subsByKey"](key).getOrElse(Set.empty)
38+
val s1 = s.updated["subsByKey"](key, subs + sessionId)
39+
val keys = s1.get["subsBySession"](sessionId).getOrElse(Set.empty)
40+
s1.updated["subsBySession"](sessionId, keys + key)
41+
}
42+
43+
private def removeAllSubscriptions(sessionId: zio.raft.protocol.SessionId): SW[Unit] =
44+
StateWriter.update[HMap[Schema], HMap[Schema]] { s =>
45+
val keys = s.get["subsBySession"](sessionId).getOrElse(Set.empty)
46+
val cleared = keys.foldLeft(s) { (acc, k) =>
47+
val set = acc.get["subsByKey"](k).getOrElse(Set.empty) - sessionId
48+
acc.updated["subsByKey"](k, set)
49+
}
50+
cleared.removed["subsBySession"](sessionId)
51+
}
52+
53+
// Typeclass map for Schema → Codec
54+
val codecs = summon[HMap.TypeclassMap[Schema, Codec]]
55+
56+
protected def applyCommand(
57+
createdAt: Instant,
58+
sessionId: SessionId,
59+
command: KVCommand
60+
): SW[command.Response & KVResponse] =
61+
command match
62+
case set @ KVCommand.Set(key, value) =>
63+
val k = KVKey(key)
64+
for
65+
_ <- putValue(k, value)
66+
sessions <- subscribersFor(k)
67+
_ <- StateWriter.foreach(sessions) { sid =>
68+
StateWriter.log(
69+
ServerRequestForSession[
70+
KVServerRequest
71+
](
72+
sid,
73+
KVServerRequest.Notification(key, value)
74+
)
75+
)
76+
}
77+
yield KVResponse.SetDone.asResponseType(command, set)
78+
79+
case get @ KVCommand.Get(key) =>
80+
getValue(KVKey(key)).map(result => KVResponse.GetResult(result).asResponseType(command, get))
81+
82+
case watch @ KVCommand.Watch(key) =>
83+
val k = KVKey(key)
84+
for
85+
_ <- addSubscription(sessionId, k)
86+
current <- getValue(k)
87+
_ <- current match
88+
case Some(v) =>
89+
StateWriter.log(
90+
ServerRequestForSession[
91+
KVServerRequest
92+
](
93+
sessionId,
94+
KVServerRequest.Notification(key, v)
95+
)
96+
)
97+
case None => StateWriter.unit
98+
yield KVResponse.WatchDone.asResponseType(command, watch)
99+
100+
protected def handleSessionCreated(
101+
createdAt: Instant,
102+
sessionId: SessionId,
103+
capabilities: Map[String, String]
104+
): SW[Unit] =
105+
StateWriter.unit
106+
107+
protected def handleSessionExpired(
108+
createdAt: Instant,
109+
sessionId: SessionId,
110+
capabilities: Map[String, String]
111+
): SW[Unit] =
112+
removeAllSubscriptions(sessionId)
113+
114+
override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean =
115+
(commitIndex.value - lastSnapshotIndex.value) >= 100
116+
117+
end KVStateMachine

0 commit comments

Comments
 (0)