Skip to content

Commit ffab49b

Browse files
committed
fmt and tests
1 parent c167165 commit ffab49b

File tree

10 files changed

+226
-51
lines changed

10 files changed

+226
-51
lines changed

client-server-client/src/main/scala/zio/raft/client/PendingQueries.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,3 @@ object PendingQueries {
6262
lastSentAt: Instant
6363
)
6464
}
65-
66-

client-server-client/src/main/scala/zio/raft/client/RaftClient.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ object RaftClient {
165165

166166
case StreamEvent.Action(ClientAction.SubmitCommand(_, promise)) =>
167167
promise.die(new IllegalStateException("Not connected")).ignore.as(this)
168-
168+
169169
case StreamEvent.Action(ClientAction.SubmitQuery(_, promise)) =>
170170
promise.die(new IllegalStateException("Not connected")).ignore.as(this)
171171

@@ -266,7 +266,7 @@ object RaftClient {
266266
now <- Clock.instant
267267
newPending = pendingRequests.add(requestId, payload, promise, now)
268268
} yield copy(pendingRequests = newPending)
269-
269+
270270
case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
271271
// Queue query while connecting (handled after session established)
272272
for {
@@ -425,7 +425,7 @@ object RaftClient {
425425
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
426426
newPending = pendingQueries.add(correlationId, payload, promise, now)
427427
} yield copy(pendingQueries = newPending)
428-
428+
429429
case StreamEvent.TimeoutCheck =>
430430
for {
431431
now <- Clock.instant
@@ -551,7 +551,7 @@ object RaftClient {
551551
// correlationId via client-side generator (to be implemented in T024)
552552
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
553553
newPending = pendingQueries.add(correlationId, payload, promise, now)
554-
_ <- transport.sendMessage(Query(correlationId, payload, now)).orDie
554+
_ <- transport.sendMessage(Query(correlationId, payload, now)).orDie
555555
} yield copy(pendingQueries = newPending)
556556

557557
case StreamEvent.ServerMsg(ClientResponse(requestId, result)) =>

client-server-server/src/main/scala/zio/raft/server/RaftServer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ final class RaftServer(
6060
*/
6161
def leaderChanged(leaderId: MemberId): UIO[Unit] =
6262
actionQueue.offer(RaftServer.ServerAction.LeaderChanged(leaderId)).unit
63+
end RaftServer
6364

6465
object RaftServer:
6566

@@ -91,6 +92,7 @@ object RaftServer:
9192
) extends RaftAction
9293
case class ServerRequestAck(sessionId: SessionId, requestId: RequestId) extends RaftAction
9394
case class ExpireSession(sessionId: SessionId) extends RaftAction
95+
9496
/** Read-only Query forwarded to the application/handler layer. */
9597
case class Query(sessionId: SessionId, correlationId: CorrelationId, payload: ByteVector) extends RaftAction
9698

client-server-server/src/test/scala/zio/raft/server/QueryServerSpec.scala

Lines changed: 96 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,105 @@ package zio.raft.server
22

33
import zio.*
44
import zio.test.*
5+
import zio.test.Assertion.*
6+
import zio.raft.protocol.*
7+
import zio.raft.protocol.Codecs.*
8+
import zio.raft.server.RaftServer.*
9+
import zio.zmq.*
10+
import scodec.bits.ByteVector
511

6-
object QueryServerSpec extends ZIOSpecDefault {
12+
object QueryServerSpec extends ZIOSpec[TestEnvironment & ZContext]:
713

8-
override def spec: Spec[Environment & TestEnvironment & Scope, Any] =
9-
suiteAll("Server Query Handling") {
14+
override def bootstrap: ZLayer[Any, Any, TestEnvironment & ZContext] =
15+
testEnvironment ++ ZContext.live
1016

11-
test("Follower rejects Query with NotLeaderAnymore SessionClosed") {
12-
assertTrue(true) // Placeholder: full ZMQ harness out of scope here
13-
}
17+
// Helpers
18+
def sendClientMessage(socket: ZSocket, message: ClientMessage): Task[Unit] =
19+
for
20+
bytes <- ZIO.attempt(clientMessageCodec.encode(message).require.toByteArray)
21+
_ <- socket.send(bytes)
22+
yield ()
23+
24+
def receiveServerMessage(socket: ZSocket): Task[ServerMessage] =
25+
for
26+
chunk <- socket.receive
27+
bytes = ByteVector(chunk.toArray)
28+
message <- ZIO
29+
.fromEither(serverMessageCodec.decode(bytes.bits).toEither.map(_.value))
30+
.mapError(err => new RuntimeException(s"Failed to decode: $err"))
31+
yield message
32+
33+
def waitForMessage[A <: ServerMessage](socket: ZSocket, timeout: Duration = 3.seconds)(implicit
34+
tt: scala.reflect.TypeTest[ServerMessage, A],
35+
ct: scala.reflect.ClassTag[A]
36+
): Task[A] =
37+
receiveServerMessage(socket).timeout(timeout).flatMap {
38+
case Some(msg: A) => ZIO.succeed(msg)
39+
case Some(other) => ZIO.fail(new RuntimeException(s"Expected ${ct.runtimeClass.getSimpleName}, got ${other.getClass.getSimpleName}"))
40+
case None => ZIO.fail(new RuntimeException(s"Timeout waiting for ${ct.runtimeClass.getSimpleName}"))
41+
}
42+
43+
private val testPort = 25556
44+
private val serverAddress = s"tcp://127.0.0.1:$testPort"
45+
46+
override def spec = suiteAll("Server Query Handling") {
47+
48+
test("Follower rejects Query with NotLeaderAnymore SessionClosed") {
49+
for
50+
server <- RaftServer.make(s"tcp://0.0.0.0:$testPort")
51+
_ <- ZIO.sleep(200.millis)
52+
client <- ZSocket.client
53+
_ <- client.connect(serverAddress)
54+
// Send Query without a session (server derives routing but is follower)
55+
now <- Clock.instant
56+
_ <- sendClientMessage(client, Query(CorrelationId.fromString("q1"), ByteVector(1,2,3), now))
57+
msg <- waitForMessage[SessionClosed](client)
58+
yield assertTrue(msg.reason == SessionCloseReason.NotLeaderAnymore)
59+
}
60+
61+
test("Leader forwards Query to Raft and send QueryResponse back") {
62+
for
63+
server <- RaftServer.make(s"tcp://0.0.0.0:$testPort")
64+
_ <- ZIO.sleep(200.millis)
65+
_ <- server.stepUp(Map.empty)
66+
_ <- ZIO.sleep(100.millis)
67+
68+
client <- ZSocket.client
69+
_ <- client.connect(serverAddress)
70+
71+
// Create session first
72+
nonce <- Nonce.generate()
73+
_ <- sendClientMessage(client, CreateSession(Map("kv" -> "v1"), nonce))
74+
_ <- ZIO.sleep(100.millis)
75+
action <- server.raftActions.take(1).runCollect.map(_.head)
76+
sessionId = action.asInstanceOf[RaftAction.CreateSession].sessionId
77+
_ <- server.confirmSessionCreation(sessionId)
78+
_ <- waitForMessage[SessionCreated](client)
79+
80+
// Send Query
81+
corr = CorrelationId.fromString("corr-xyz")
82+
payload = ByteVector(0x0, 0x1, 0x2)
83+
now <- Clock.instant
84+
_ <- sendClientMessage(client, Query(corr, payload, now))
85+
86+
// Verify RaftAction.Query queued
87+
_ <- ZIO.sleep(100.millis)
88+
qAction <- server.raftActions.take(1).runCollect.map(_.head)
89+
verified = qAction.isInstanceOf[RaftAction.Query]
90+
_ <- ZIO.fail(new RuntimeException("Expected RaftAction.Query")).unless(verified).ignore
91+
q = qAction.asInstanceOf[RaftAction.Query]
92+
93+
// Send QueryResponse back
94+
result = ByteVector(0xA, 0xB)
95+
_ <- server.sendQueryResponse(sessionId, QueryResponse(corr, result))
96+
qr <- waitForMessage[QueryResponse](client)
97+
yield assertTrue(q.sessionId == sessionId) &&
98+
assertTrue(q.correlationId == corr) &&
99+
assertTrue(q.payload == payload) &&
100+
assertTrue(qr.correlationId == corr) &&
101+
assertTrue(qr.result == result)
14102
}
15-
}
103+
} @@ TestAspect.sequential @@ TestAspect.withLiveClock
104+
end QueryServerSpec
16105

17106

kvstore/src/main/scala/zio/kvstore/Codecs.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ object Codecs:
2727
given kvKeySetCodec: Codec[Set[KVKey]] =
2828
scodec.codecs.listOfN(scodec.codecs.uint16, utf8_32).xmap(_.map(KVKey(_)).toSet, _.toList.map(KVKey.unwrap))
2929

30-
// Command codecs
30+
// Command codecs
3131
private val setCodec: Codec[KVCommand.Set] = (utf8_32 :: utf8_32).as[KVCommand.Set]
3232
private val watchCodec: Codec[KVCommand.Watch] = utf8_32.as[KVCommand.Watch]
3333
given Codec[KVCommand] =

kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ class KVStateMachine extends SessionStateMachine[KVCommand, KVResponse, zio.kvst
7676
}
7777
yield KVResponse.SetDone.asResponseType(command, set)
7878

79-
80-
8179
case watch @ KVCommand.Watch(key) =>
8280
val k = KVKey(key)
8381
for

kvstore/src/main/scala/zio/kvstore/node/Node.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,6 @@ final case class Node(
9292
}
9393
case _ => ZIO.unit
9494

95-
96-
9795
case NodeAction.FromServer(KVServerAction.ServerRequestAck(sessionId, requestId)) =>
9896
for
9997
now <- Clock.instant

kvstore/src/test/scala/zio/kvstore/KVStoreQueryGetSpec.scala

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,106 @@ package zio.kvstore
22

33
import zio.*
44
import zio.test.*
5+
import zio.test.Assertion.*
6+
import zio.raft.protocol.*
7+
import zio.raft.protocol.Codecs.*
8+
import zio.kvstore.protocol.*
9+
import zio.kvstore.protocol.KVClientResponse.given
10+
import scodec.bits.ByteVector
11+
import scodec.Codec
12+
import zio.zmq.*
13+
import java.nio.file.Files
14+
import zio.lmdb.Environment as LmdbEnv
515

6-
object KVStoreQueryGetSpec extends ZIOSpecDefault {
16+
object KVStoreQueryGetSpec extends ZIOSpec[TestEnvironment & ZContext]:
717

8-
override def spec: Spec[Environment & TestEnvironment & Scope, Any] =
9-
suiteAll("kvstore GET via Query") {
18+
override def bootstrap: ZLayer[Any, Any, TestEnvironment & ZContext] =
19+
testEnvironment ++ ZContext.live
1020

11-
test("placeholder integration") {
12-
assertTrue(true)
21+
// Helpers
22+
def sendClientMessage(socket: ZSocket, message: ClientMessage): Task[Unit] =
23+
for
24+
bytes <- ZIO.attempt(clientMessageCodec.encode(message).require.toByteArray)
25+
_ <- socket.send(bytes)
26+
yield ()
27+
28+
def receiveServerMessage(socket: ZSocket): Task[ServerMessage] =
29+
for
30+
chunk <- socket.receive
31+
bytes = ByteVector(chunk.toArray)
32+
message <- ZIO
33+
.fromEither(serverMessageCodec.decode(bytes.bits).toEither.map(_.value))
34+
.mapError(err => new RuntimeException(s"Failed to decode: $err"))
35+
yield message
36+
37+
def waitForMessage[A <: ServerMessage](socket: ZSocket, timeout: Duration = 5.seconds)(implicit
38+
tt: scala.reflect.TypeTest[ServerMessage, A],
39+
ct: scala.reflect.ClassTag[A]
40+
): Task[A] =
41+
receiveServerMessage(socket).timeout(timeout).flatMap {
42+
case Some(msg: A) => ZIO.succeed(msg)
43+
case Some(other) => ZIO.fail(new RuntimeException(s"Expected ${ct.runtimeClass.getSimpleName}, got ${other.getClass.getSimpleName}"))
44+
case None => ZIO.fail(new RuntimeException(s"Timeout waiting for ${ct.runtimeClass.getSimpleName}"))
45+
}
46+
47+
private val serverPort = 26555
48+
private val raftRpcPort = 26556
49+
private val serverAddress = s"tcp://127.0.0.1:$serverPort"
50+
private val nodeAddress = s"tcp://127.0.0.1:$raftRpcPort"
51+
52+
override def spec = suiteAll("kvstore GET via Query") {
53+
54+
test("set then get returns current value; GET is served via Query") {
55+
ZIO.scoped {
56+
for
57+
logDir <- ZIO.attempt(Files.createTempDirectory("kv-log-").toFile).map(_.getAbsolutePath)
58+
snapDir <- ZIO.attempt(Files.createTempDirectory("kv-snap-").toFile).map(_.getAbsolutePath)
59+
lmdbDir <- ZIO.attempt(Files.createTempDirectory("kv-lmdb-").toFile).map(_.getAbsolutePath)
60+
61+
result <- (for
62+
// Start node (single-node bootstrap)
63+
node <- zio.kvstore.node.Node.make(
64+
serverAddress = s"tcp://0.0.0.0:$serverPort",
65+
nodeAddress = nodeAddress,
66+
logDirectory = logDir,
67+
snapshotDirectory = snapDir,
68+
memberId = zio.raft.MemberId("node-1"),
69+
peers = Map.empty
70+
)
71+
_ <- node.run.forkScoped
72+
_ <- ZIO.sleep(600.millis) // let services bind
73+
74+
// Client connects
75+
client <- ZSocket.client
76+
_ <- client.connect(serverAddress)
77+
78+
// Create session
79+
nonce <- Nonce.generate()
80+
_ <- sendClientMessage(client, CreateSession(Map("kv" -> "v1"), nonce))
81+
created <- waitForMessage[SessionCreated](client)
82+
sessionId = created.sessionId
83+
84+
// Send ClientRequest(Set)
85+
rid = RequestId.fromLong(1L)
86+
payloadSet <- ZIO.fromEither(implicitly[Codec[KVClientRequest]].encode(KVClientRequest.Set("foo", "bar")).toEither).map(_.bytes)
87+
_ <- sendClientMessage(client, ClientRequest(rid, rid, payloadSet, java.time.Instant.now()))
88+
_ <- waitForMessage[ClientResponse](client)
89+
90+
// Send Query(Get)
91+
corr = CorrelationId.fromString("q-1")
92+
payloadGet <- ZIO.fromEither(implicitly[Codec[KVQuery]].encode(KVQuery.Get("foo")).toEither).map(_.bytes)
93+
_ <- sendClientMessage(client, Query(corr, payloadGet, java.time.Instant.now()))
94+
qres <- waitForMessage[QueryResponse](client)
95+
96+
// Decode Query result Option[String]
97+
getResult <- ZIO.fromEither(implicitly[Codec[Option[String]]].decode(qres.result.bits).toEither.map(_.value))
98+
yield assertTrue(getResult.contains("bar"))).provideSomeLayer(
99+
LmdbEnv.builder.withMaxDbs(3).layer(new java.io.File(lmdbDir))
100+
)
101+
yield result
13102
}
14103
}
15-
}
104+
} @@ TestAspect.sequential @@ TestAspect.withLiveClock
105+
end KVStoreQueryGetSpec
16106

17107

specs/005-client-server-libraries/plan.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,18 @@ specs/[###-feature]/
170170
*This checklist is updated during execution flow*
171171

172172
**Phase Status**:
173-
- [ ] Phase 0: Research complete (/plan command)
174-
- [ ] Phase 1: Design complete (/plan command)
175-
- [ ] Phase 2: Task planning complete (/plan command - describe approach only)
176-
- [ ] Phase 3: Tasks generated (/tasks command)
177-
- [ ] Phase 4: Implementation complete
178-
- [ ] Phase 5: Validation passed
173+
- [X] Phase 0: Research complete (/plan command)
174+
- [X] Phase 1: Design complete (/plan command)
175+
- [X] Phase 2: Task planning complete (/plan command - describe approach only)
176+
- [X] Phase 3: Tasks generated (/tasks command)
177+
- [X] Phase 4: Implementation complete
178+
- [X] Phase 5: Validation passed
179179

180180
**Gate Status**:
181-
- [ ] Initial Constitution Check: PASS
182-
- [ ] Post-Design Constitution Check: PASS
183-
- [ ] All NEEDS CLARIFICATION resolved
184-
- [ ] Complexity deviations documented
181+
- [X] Initial Constitution Check: PASS
182+
- [X] Post-Design Constitution Check: PASS
183+
- [X] All NEEDS CLARIFICATION resolved
184+
- [X] Complexity deviations documented
185185

186186
---
187187
*Based on Constitution v1.0.0 - See `.specify/memory/constitution.md`*

0 commit comments

Comments
 (0)