Skip to content

Commit 37a774d

Browse files
committed
session codec tests
1 parent 1f2fcf7 commit 37a774d

File tree

8 files changed

+128
-146
lines changed

8 files changed

+128
-146
lines changed

kvstore/src/test/scala/zio/kvstore/DeliverySpec.scala

Lines changed: 0 additions & 11 deletions
This file was deleted.

kvstore/src/test/scala/zio/kvstore/KVSessionSpec.scala

Lines changed: 0 additions & 47 deletions
This file was deleted.

kvstore/src/test/scala/zio/kvstore/KVStateMachineSpec.scala

Lines changed: 0 additions & 54 deletions
This file was deleted.

kvstore/src/test/scala/zio/kvstore/SessionExpirySpec.scala

Lines changed: 0 additions & 11 deletions
This file was deleted.

kvstore/src/test/scala/zio/kvstore/WatchSpec.scala

Lines changed: 0 additions & 15 deletions
This file was deleted.

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ import zio.raft.protocol.Codecs.{sessionIdCodec, requestIdCodec as requestIdCode
1414
* - PendingServerRequest[SR] (automatically derived from payload codec)
1515
*/
1616
object Codecs:
17+
val capabilitiesCodec: Codec[Map[String, String]] =
18+
listOfN(int32, (utf8_32 :: utf8_32).as[(String, String)]).xmap(_.toMap, _.toList)
1719

1820
/** Codec for SessionMetadata.
1921
*
2022
* Encodes as: Map[String, String] (capabilities) ++ Long (timestamp)
2123
*/
2224
given sessionMetadataCodec: Codec[SessionMetadata] =
2325
import scodec.codecs.*
24-
(list(utf8_32 :: utf8_32).xmap(_.toMap, _.toList) :: int64).xmap(
26+
(capabilitiesCodec :: int64).xmap(
2527
{ case (caps, ts) => SessionMetadata(caps, Instant.ofEpochMilli(ts)) },
2628
sm => (sm.capabilities, sm.createdAt.toEpochMilli)
2729
)
@@ -80,8 +82,6 @@ object Codecs:
8082
)
8183

8284
val createSessionV0: Codec[SessionCommand.CreateSession[SR]] =
83-
val capabilitiesCodec: Codec[Map[String, String]] =
84-
listOfN(int32, (utf8_32 :: utf8_32).as[(String, String)]).xmap(_.toMap, _.toList)
8585
(instantCodec :: sessionIdCodec :: capabilitiesCodec).as[SessionCommand.CreateSession[SR]]
8686
val createSessionCodec: Codec[SessionCommand.CreateSession[SR]] =
8787
(uint8 :: createSessionV0).xmap(
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package zio.raft.sessionstatemachine
2+
3+
import zio.test.*
4+
import scodec.Codec
5+
import scodec.codecs.*
6+
import zio.raft.sessionstatemachine.Codecs.given
7+
import zio.raft.protocol.*
8+
import java.time.Instant
9+
import zio.raft.Command
10+
11+
object CodecsSpec extends ZIOSpecDefault:
12+
def spec =
13+
suiteAll("Session State Machine Codecs") {
14+
test("sessionMetadata round-trip") {
15+
val sm = SessionMetadata(Map("a" -> "b"), Instant.ofEpochMilli(1234L))
16+
val bits = summon[Codec[SessionMetadata]].encode(sm).require
17+
val decoded = summon[Codec[SessionMetadata]].decode(bits).require.value
18+
assertTrue(decoded == sm)
19+
}
20+
21+
test("requestId round-trip") {
22+
val r = RequestId(42L)
23+
val bits = Codecs.requestIdCodec.encode(r).require
24+
val decoded = Codecs.requestIdCodec.decode(bits).require.value
25+
assertTrue(decoded == r)
26+
}
27+
28+
test("pendingServerRequest round-trip") {
29+
given Codec[String] = utf8_32
30+
val psr = PendingServerRequest("payload", Instant.ofEpochMilli(999L))
31+
val bits = summon[Codec[PendingServerRequest[String]]].encode(psr).require
32+
val decoded = summon[Codec[PendingServerRequest[String]]].decode(bits).require.value
33+
assertTrue(decoded == psr)
34+
}
35+
36+
suiteAll("sessionCommand discriminated union round-trips") {
37+
test("ClientRequest") {
38+
// define a concrete UC that extends zio.raft.Command and provide a codec for it
39+
case object DummyCmd extends Command:
40+
type Response = Unit
41+
given Codec[DummyCmd.type] = provide(DummyCmd)
42+
given Codec[Unit] = provide(())
43+
val cmd: SessionCommand[DummyCmd.type, Unit] = SessionCommand.ClientRequest[DummyCmd.type, Unit](
44+
createdAt = Instant.EPOCH,
45+
sessionId = SessionId.fromString("s-1"),
46+
requestId = RequestId(1L),
47+
lowestPendingRequestId = RequestId(0L),
48+
command = DummyCmd
49+
)
50+
val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]]
51+
val bits = codec.encode(cmd).require
52+
val decoded = codec.decode(bits).require.value
53+
assertTrue(decoded == cmd)
54+
}
55+
56+
test("ServerRequestAck") {
57+
// reuse Dummy UC type param for the union codec
58+
case object DummyCmd extends Command:
59+
type Response = Unit
60+
given Codec[DummyCmd.type] = provide(DummyCmd)
61+
given Codec[Unit] = provide(())
62+
val cmd = SessionCommand.ServerRequestAck[Unit](
63+
createdAt = Instant.EPOCH,
64+
sessionId = SessionId.fromString("s-2"),
65+
requestId = RequestId(2L)
66+
)
67+
val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]]
68+
val bits = codec.encode(cmd).require
69+
val decoded = codec.decode(bits).require.value
70+
assertTrue(decoded == cmd)
71+
}
72+
73+
test("CreateSession") {
74+
case object DummyCmd extends Command:
75+
type Response = Unit
76+
given Codec[DummyCmd.type] = provide(DummyCmd)
77+
given Codec[Unit] = provide(())
78+
val cmd = SessionCommand.CreateSession[Unit](
79+
createdAt = Instant.EPOCH,
80+
sessionId = SessionId.fromString("s-3"),
81+
capabilities = Map("k" -> "v")
82+
)
83+
val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]]
84+
val bits = codec.encode(cmd).require
85+
val decoded = codec.decode(bits).require.value
86+
assertTrue(decoded == cmd)
87+
}
88+
89+
test("SessionExpired") {
90+
case object DummyCmd extends Command:
91+
type Response = Unit
92+
given Codec[DummyCmd.type] = provide(DummyCmd)
93+
given Codec[Unit] = provide(())
94+
val cmd = SessionCommand.SessionExpired[Unit](
95+
createdAt = Instant.EPOCH,
96+
sessionId = SessionId.fromString("s-4")
97+
)
98+
val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]]
99+
val bits = codec.encode(cmd).require
100+
val decoded = codec.decode(bits).require.value
101+
assertTrue(decoded == cmd)
102+
}
103+
104+
test("GetRequestsForRetry") {
105+
case object DummyCmd extends Command:
106+
type Response = Unit
107+
given Codec[DummyCmd.type] = provide(DummyCmd)
108+
given Codec[Unit] = provide(())
109+
val cmd = SessionCommand.GetRequestsForRetry[Unit](
110+
createdAt = Instant.EPOCH,
111+
lastSentBefore = Instant.ofEpochMilli(500L)
112+
)
113+
val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]]
114+
val bits = codec.encode(cmd).require
115+
val decoded = codec.decode(bits).require.value
116+
assertTrue(decoded == cmd)
117+
}
118+
}
119+
}
120+
end CodecsSpec

specs/004-add-client-server/tasks.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@
3838
- [X] T014 Implement `Node` 10s retry stream: periodically check `hasPendingRequests` (dirty read) and when true initiate `SessionCommand.GetRequestsForRetry`; publish resulting actions via the same `RaftServer` methods.
3939
- [X] T015 Implement `Node` raft state notifications stream: consume Raft core `stateNotifications` and map to `RaftServer.stepUp`/`stepDown` to reflect leadership changes.
4040
- [X] T016 Compose `Node.run` to run raftActions stream, retry stream, and state notifications stream concurrently; ensure backpressure/termination semantics; publish all outputs via appropriate `RaftServer` methods.
41-
- [ ] T017 Create `kvstore-cli` skeleton at `/Users/somdoron/git/zio-raft/kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala`; initialize client connection to Raft cluster using `client-server-client`. Read endpoints from `--endpoints` flag or `KVSTORE_ENDPOINTS`; default to localhost if unspecified.
42-
- [ ] T018 Implement `set` command in CLI: parse args and send set request via client; print confirmation.
43-
- [ ] T019 Implement `get` command in CLI: parse args and fetch value; print key/value.
44-
- [ ] T020 Implement `watch` command in CLI: parse args, subscribe to key; print initial value and stream subsequent updates until session ends.
41+
- [X] T017 Create `kvstore-cli` skeleton at `/Users/somdoron/git/zio-raft/kvstore-cli/src/main/scala/zio/kvstore/cli/Main.scala`; initialize client connection to Raft cluster using `client-server-client`. Read endpoints from `--endpoints` flag or `KVSTORE_ENDPOINTS`; default to localhost if unspecified.
42+
- [X] T018 Implement `set` command in CLI: parse args and send set request via client; print confirmation.
43+
- [X] T019 Implement `get` command in CLI: parse args and fetch value; print key/value.
44+
- [X] T020 Implement `watch` command in CLI: parse args, subscribe to key; print initial value and stream subsequent updates until session ends.
4545

4646
## Phase 3.4: Integration
47-
- [ ] T021 Wire `KVStoreServerApp` to start the Raft server and run `Node` (raftActions, retry, and state notifications streams) and expose graceful shutdown.
47+
- [X] T021 Wire `KVStoreServerApp` to start the Raft server and run `Node` (raftActions, retry, and state notifications streams) and expose graceful shutdown.
4848
- [ ] T022 Ensure subscriptions are removed on session expiry by updating `handleSessionExpired` in `KVStateMachine` and validating via logs/tests.
4949
- [ ] T023 Add minimal logging and metrics using ZIO logging for watch and Node retry/state paths.
5050

0 commit comments

Comments
 (0)