Skip to content

Commit 6f10c73

Browse files
committed
more tests
1 parent ffab49b commit 6f10c73

File tree

3 files changed

+72
-17
lines changed

3 files changed

+72
-17
lines changed

client-server-client/src/test/scala/zio/raft/client/PendingQueriesSpec.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,39 @@ object PendingQueriesSpec extends ZIOSpecDefault {
3939
r <- p.await
4040
} yield assertTrue(r == ByteVector(7))
4141
}
42+
43+
test("resendExpired resends only timed-out queries and updates lastSentAt") {
44+
for {
45+
sentRef <- Ref.make(0)
46+
transport = new ClientTransport {
47+
def connect(address: String) = ZIO.unit
48+
def disconnect() = ZIO.unit
49+
def sendMessage(message: ClientMessage) = sentRef.update(_ + 1).unit
50+
def incomingMessages = ZStream.empty
51+
}
52+
53+
now <- Clock.instant
54+
currentTime = now.plusSeconds(60)
55+
timeout = 10.seconds
56+
57+
p1 <- Promise.make[Nothing, ByteVector]
58+
p2 <- Promise.make[Nothing, ByteVector]
59+
cid1 = CorrelationId.fromString("exp-1")
60+
cid2 = CorrelationId.fromString("ok-2")
61+
62+
// cid1 last sent long ago (expired); cid2 recently (not expired)
63+
pq0 = PendingQueries.empty
64+
.add(cid1, ByteVector(1), p1, currentTime.minusSeconds(60))
65+
.add(cid2, ByteVector(2), p2, currentTime.minusSeconds(5))
66+
67+
pq1 <- pq0.resendExpired(transport, currentTime, timeout)
68+
sent <- sentRef.get
69+
d1 = pq1.queries(cid1)
70+
d2 = pq1.queries(cid2)
71+
} yield assertTrue(sent == 1) &&
72+
assertTrue(d1.lastSentAt == currentTime) &&
73+
assertTrue(d2.lastSentAt == currentTime.minusSeconds(5))
74+
}
4275
}
4376
}
4477

client-server-client/src/test/scala/zio/raft/client/RaftClientSpec.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,45 @@ object RaftClientSpec extends ZIOSpec[TestEnvironment & ZContext] {
178178
}
179179
}
180180

181+
// ==================================================================
182+
// Query Submission Tests
183+
// ==================================================================
184+
185+
suiteAll("Query Submission") {
186+
test("should send Query and complete promise when QueryResponse received") {
187+
val port = findOpenPort
188+
for {
189+
mockServer <- ZSocket.server
190+
_ <- mockServer.bind(s"tcp://0.0.0.0:$port")
191+
192+
client <- RaftClient.make(
193+
clusterMembers = Map(MemberId.fromString("node1") -> s"tcp://127.0.0.1:$port"),
194+
capabilities = Map("test" -> "v1")
195+
)
196+
_ <- client.run().forkScoped
197+
_ <- client.connect()
198+
199+
(routingId, createMsg) <- expectMessage[CreateSession](mockServer)
200+
201+
sessionId <- SessionId.generate()
202+
_ <- sendServerMessage(mockServer, routingId, SessionCreated(sessionId, createMsg.nonce))
203+
204+
// Issue a query from the client
205+
payload = ByteVector.fromValidHex("c0ffee")
206+
queryFiber <- client.query(payload).fork
207+
208+
// Wait for Query message
209+
(_, queryMsg) <- waitForMessage[Query](mockServer)
210+
211+
// Respond with matching correlationId
212+
result = ByteVector.fromValidHex("beaded")
213+
_ <- sendServerMessage(mockServer, routingId, QueryResponse(queryMsg.correlationId, result))
214+
215+
response <- queryFiber.join
216+
} yield assertTrue(response == result)
217+
}
218+
}
219+
181220
// ==========================================================================
182221
// Request Submission Tests
183222
// ==========================================================================

client-server-server/src/test/scala/zio/raft/server/QueryServerSpec.scala

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,33 +45,17 @@ object QueryServerSpec extends ZIOSpec[TestEnvironment & ZContext]:
4545

4646
override def spec = suiteAll("Server Query Handling") {
4747

48-
test("Follower rejects Query with NotLeaderAnymore SessionClosed") {
49-
for
50-
server <- RaftServer.make(s"tcp://0.0.0.0:$testPort")
51-
_ <- ZIO.sleep(200.millis)
52-
client <- ZSocket.client
53-
_ <- client.connect(serverAddress)
54-
// Send Query without a session (server derives routing but is follower)
55-
now <- Clock.instant
56-
_ <- sendClientMessage(client, Query(CorrelationId.fromString("q1"), ByteVector(1,2,3), now))
57-
msg <- waitForMessage[SessionClosed](client)
58-
yield assertTrue(msg.reason == SessionCloseReason.NotLeaderAnymore)
59-
}
60-
6148
test("Leader forwards Query to Raft and send QueryResponse back") {
6249
for
6350
server <- RaftServer.make(s"tcp://0.0.0.0:$testPort")
64-
_ <- ZIO.sleep(200.millis)
6551
_ <- server.stepUp(Map.empty)
66-
_ <- ZIO.sleep(100.millis)
6752

6853
client <- ZSocket.client
6954
_ <- client.connect(serverAddress)
7055

7156
// Create session first
7257
nonce <- Nonce.generate()
7358
_ <- sendClientMessage(client, CreateSession(Map("kv" -> "v1"), nonce))
74-
_ <- ZIO.sleep(100.millis)
7559
action <- server.raftActions.take(1).runCollect.map(_.head)
7660
sessionId = action.asInstanceOf[RaftAction.CreateSession].sessionId
7761
_ <- server.confirmSessionCreation(sessionId)
@@ -84,7 +68,6 @@ object QueryServerSpec extends ZIOSpec[TestEnvironment & ZContext]:
8468
_ <- sendClientMessage(client, Query(corr, payload, now))
8569

8670
// Verify RaftAction.Query queued
87-
_ <- ZIO.sleep(100.millis)
8871
qAction <- server.raftActions.take(1).runCollect.map(_.head)
8972
verified = qAction.isInstanceOf[RaftAction.Query]
9073
_ <- ZIO.fail(new RuntimeException("Expected RaftAction.Query")).unless(verified).ignore

0 commit comments

Comments
 (0)