Skip to content

Commit c167165

Browse files
committed
add query to client/server
1 parent be7a3c8 commit c167165

File tree

28 files changed

+967
-46
lines changed

28 files changed

+967
-46
lines changed

.cursor/rules/specify-rules.mdc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Auto-generated from all feature plans. Last updated: 2025-09-24
1010
- **Code Quality**: scalafmt, scalafix, unused import warnings
1111
- Scala 3.3+ + ZIO 2.1+, scodec (for serialization), existing client-server-server library (002-composable-raft-state)
1212
- State maintained in Raft log + snapshots; uses shared Map[String, ByteVector] dictionary with key prefixes (002-composable-raft-state)
13+
- Scala 3 + ZIO 2, existing zio-raft client/server libraries (005-client-server-libraries)
14+
- N/A (read-only Query, no persistence change) (005-client-server-libraries)
1315

1416
## Project Structure
1517
```
@@ -41,6 +43,7 @@ All code changes must adhere to ZIO Raft Constitution v1.0.0:
4143
- Functional programming patterns (no var, mutable collections)
4244

4345
## Recent Changes
46+
- 005-client-server-libraries: Added Scala 3 + ZIO 2, existing zio-raft client/server libraries
4447
- 002-composable-raft-state: Added Scala 3.3+ + ZIO 2.1+, scodec (for serialization), existing client-server-server library
4548
- 001-implement-client-server: Added Scala 3.3+ with ZIO 2.1+ + ZeroMQ (zio-zmq), scodec (protocol serialization), ZIO (effects), ZIO Test (testing)
4649

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package zio.raft.client
2+
3+
import zio.*
4+
import zio.raft.protocol.*
5+
import scodec.bits.ByteVector
6+
import java.time.Instant
7+
8+
/** Manages pending queries with lastSentAt timestamps for retry. */
9+
case class PendingQueries(
10+
queries: Map[CorrelationId, PendingQueries.PendingQueryData]
11+
) {
12+
def contains(correlationId: CorrelationId): Boolean = queries.contains(correlationId)
13+
14+
def add(
15+
correlationId: CorrelationId,
16+
payload: ByteVector,
17+
promise: Promise[Nothing, ByteVector],
18+
sentAt: Instant
19+
): PendingQueries =
20+
copy(queries = queries.updated(correlationId, PendingQueries.PendingQueryData(payload, promise, sentAt, sentAt)))
21+
22+
def complete(correlationId: CorrelationId, result: ByteVector): UIO[PendingQueries] =
23+
queries.get(correlationId) match {
24+
case Some(data) => data.promise.succeed(result).as(copy(queries = queries.removed(correlationId)))
25+
case None => ZIO.succeed(this)
26+
}
27+
28+
/** Resend all pending queries (used after successful connection). */
29+
def resendAll(transport: ClientTransport): UIO[PendingQueries] =
30+
ZIO.foldLeft(queries.toList)(this) { case (pending, (correlationId, data)) =>
31+
for {
32+
now <- Clock.instant
33+
_ <- transport.sendMessage(Query(correlationId, data.payload, now)).orDie
34+
_ <- ZIO.logDebug(s"Resending pending query: ${CorrelationId.unwrap(correlationId)}")
35+
updatedData = data.copy(lastSentAt = now)
36+
} yield PendingQueries(pending.queries.updated(correlationId, updatedData))
37+
}
38+
39+
/** Resend expired queries and update lastSentAt. */
40+
def resendExpired(transport: ClientTransport, currentTime: Instant, timeout: Duration): UIO[PendingQueries] =
41+
ZIO.foldLeft(queries.toList)(this) { case (pending, (correlationId, data)) =>
42+
val elapsed = Duration.fromInterval(data.lastSentAt, currentTime)
43+
if (elapsed > timeout) {
44+
for {
45+
_ <- transport.sendMessage(Query(correlationId, data.payload, currentTime)).orDie
46+
_ <- ZIO.logDebug(s"Resending timed out query: ${CorrelationId.unwrap(correlationId)}")
47+
updatedData = data.copy(lastSentAt = currentTime)
48+
} yield PendingQueries(pending.queries.updated(correlationId, updatedData))
49+
} else {
50+
ZIO.succeed(pending)
51+
}
52+
}
53+
}
54+
55+
object PendingQueries {
56+
def empty: PendingQueries = PendingQueries(Map.empty)
57+
58+
case class PendingQueryData(
59+
payload: ByteVector,
60+
promise: Promise[Nothing, ByteVector],
61+
createdAt: Instant,
62+
lastSentAt: Instant
63+
)
64+
}
65+
66+

client-server-client/src/main/scala/zio/raft/client/PendingRequests.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ case class PendingRequests(
1818
def add(
1919
requestId: RequestId,
2020
payload: ByteVector,
21-
promise: Promise[Throwable, ByteVector],
21+
promise: Promise[Nothing, ByteVector],
2222
sentAt: Instant
2323
): PendingRequests =
2424
copy(requests = requests.updated(requestId, PendingRequests.PendingRequestData(payload, promise, sentAt, sentAt)))
@@ -78,7 +78,7 @@ object PendingRequests {
7878

7979
case class PendingRequestData(
8080
payload: ByteVector,
81-
promise: Promise[Throwable, ByteVector],
81+
promise: Promise[Nothing, ByteVector],
8282
createdAt: Instant,
8383
lastSentAt: Instant
8484
)

client-server-client/src/main/scala/zio/raft/client/RaftClient.scala

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,24 @@ class RaftClient private (
5151
def connect(): UIO[Unit] =
5252
actionQueue.offer(ClientAction.Connect).unit
5353

54-
def submitCommand(payload: ByteVector): Task[ByteVector] =
54+
def submitCommand(payload: ByteVector): UIO[ByteVector] =
5555
for {
56-
promise <- Promise.make[Throwable, ByteVector]
56+
promise <- Promise.make[Nothing, ByteVector]
5757
action = ClientAction.SubmitCommand(payload, promise)
5858
_ <- actionQueue.offer(action)
5959
result <- promise.await
6060
} yield result
6161

62+
/** Submit a read-only Query (leader-served, linearizable).
63+
*/
64+
def query(payload: ByteVector): UIO[ByteVector] =
65+
for {
66+
promise <- Promise.make[Nothing, ByteVector]
67+
action = ClientAction.SubmitQuery(payload, promise)
68+
_ <- actionQueue.offer(action)
69+
result <- promise.await
70+
} yield result
71+
6272
def disconnect(): UIO[Unit] =
6373
actionQueue.offer(ClientAction.Disconnect).unit
6474

@@ -146,14 +156,18 @@ object RaftClient {
146156
currentMemberId = memberId,
147157
createdAt = now,
148158
nextRequestId = nextRequestId,
149-
pendingRequests = PendingRequests.empty
159+
pendingRequests = PendingRequests.empty,
160+
pendingQueries = PendingQueries.empty
150161
)
151162

152163
case StreamEvent.ServerMsg(message) =>
153164
ZIO.logWarning(s"Received message while disconnected: ${message.getClass.getSimpleName}").as(this)
154165

155166
case StreamEvent.Action(ClientAction.SubmitCommand(_, promise)) =>
156-
promise.fail(new IllegalStateException("Not connected")).ignore.as(this)
167+
promise.die(new IllegalStateException("Not connected")).ignore.as(this)
168+
169+
case StreamEvent.Action(ClientAction.SubmitQuery(_, promise)) =>
170+
promise.die(new IllegalStateException("Not connected")).ignore.as(this)
157171

158172
case StreamEvent.Action(ClientAction.Disconnect) =>
159173
ZIO.succeed(this)
@@ -173,7 +187,8 @@ object RaftClient {
173187
currentMemberId: MemberId,
174188
createdAt: Instant,
175189
nextRequestId: RequestIdRef,
176-
pendingRequests: PendingRequests
190+
pendingRequests: PendingRequests,
191+
pendingQueries: PendingQueries
177192
) extends ClientState {
178193
override def stateName: String = "ConnectingNewSession"
179194

@@ -210,8 +225,9 @@ object RaftClient {
210225
for {
211226
_ <- ZIO.logInfo(s"Session created: $sessionId")
212227
now <- Clock.instant
213-
// Send all pending requests
228+
// Send all pending requests/queries
214229
updatedPending <- pendingRequests.resendAll(transport)
230+
updatedPendingQueries <- pendingQueries.resendAll(transport)
215231
} yield Connected(
216232
config = config,
217233
sessionId = sessionId,
@@ -220,6 +236,7 @@ object RaftClient {
220236
serverRequestTracker = ServerRequestTracker(),
221237
nextRequestId = nextRequestId,
222238
pendingRequests = updatedPending,
239+
pendingQueries = updatedPendingQueries,
223240
currentMemberId = currentMemberId
224241
)
225242
} else {
@@ -249,6 +266,14 @@ object RaftClient {
249266
now <- Clock.instant
250267
newPending = pendingRequests.add(requestId, payload, promise, now)
251268
} yield copy(pendingRequests = newPending)
269+
270+
case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
271+
// Queue query while connecting (handled after session established)
272+
for {
273+
now <- Clock.instant
274+
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
275+
newPending = pendingQueries.add(correlationId, payload, promise, now)
276+
} yield copy(pendingQueries = newPending)
252277

253278
case StreamEvent.TimeoutCheck =>
254279
for {
@@ -268,6 +293,9 @@ object RaftClient {
268293
case StreamEvent.ServerMsg(ClientResponse(_, _)) =>
269294
ZIO.logWarning("Received ClientResponse while connecting, ignoring").as(this)
270295

296+
case StreamEvent.ServerMsg(QueryResponse(correlationId, result)) =>
297+
ZIO.logWarning("Received QueryResponse while connecting, ignoring").as(this)
298+
271299
case StreamEvent.ServerMsg(_: RequestError) =>
272300
ZIO.logWarning("Received RequestError while connecting, ignoring").as(this)
273301

@@ -305,7 +333,8 @@ object RaftClient {
305333
createdAt: Instant,
306334
serverRequestTracker: ServerRequestTracker,
307335
nextRequestId: RequestIdRef,
308-
pendingRequests: PendingRequests
336+
pendingRequests: PendingRequests,
337+
pendingQueries: PendingQueries
309338
) extends ClientState {
310339
override def stateName: String = s"ConnectingExistingSession($sessionId)"
311340

@@ -345,8 +374,9 @@ object RaftClient {
345374
s"Session continued: $sessionId at $currentMemberId (${currentAddr.getOrElse("unknown")})"
346375
)
347376
now <- Clock.instant
348-
// Send all pending requests
377+
// Send all pending requests/queries
349378
updatedPending <- pendingRequests.resendAll(transport)
379+
updatedPendingQueries <- pendingQueries.resendAll(transport)
350380
} yield Connected(
351381
config = config,
352382
sessionId = sessionId,
@@ -355,6 +385,7 @@ object RaftClient {
355385
serverRequestTracker = serverRequestTracker,
356386
nextRequestId = nextRequestId,
357387
pendingRequests = updatedPending,
388+
pendingQueries = updatedPendingQueries,
358389
currentMemberId = currentMemberId
359390
)
360391
} else {
@@ -387,6 +418,14 @@ object RaftClient {
387418
newPending = pendingRequests.add(requestId, payload, promise, now)
388419
} yield copy(pendingRequests = newPending)
389420

421+
case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
422+
// Queue query while connecting (handled after session established)
423+
for {
424+
now <- Clock.instant
425+
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
426+
newPending = pendingQueries.add(correlationId, payload, promise, now)
427+
} yield copy(pendingQueries = newPending)
428+
390429
case StreamEvent.TimeoutCheck =>
391430
for {
392431
now <- Clock.instant
@@ -405,6 +444,9 @@ object RaftClient {
405444
case StreamEvent.ServerMsg(ClientResponse(_, _)) =>
406445
ZIO.logWarning("Received ClientResponse while connecting, ignoring").as(this)
407446

447+
case StreamEvent.ServerMsg(QueryResponse(correlationId, result)) =>
448+
ZIO.logWarning("Received QueryResponse while connecting existing session, ignoring").as(this)
449+
408450
case StreamEvent.ServerMsg(KeepAliveResponse(_)) =>
409451
ZIO.succeed(this)
410452

@@ -439,6 +481,7 @@ object RaftClient {
439481
serverRequestTracker: ServerRequestTracker,
440482
nextRequestId: RequestIdRef,
441483
pendingRequests: PendingRequests,
484+
pendingQueries: PendingQueries,
442485
currentMemberId: MemberId
443486
) extends ClientState {
444487
override def stateName: String = s"Connected($sessionId)"
@@ -472,7 +515,8 @@ object RaftClient {
472515
createdAt = now,
473516
serverRequestTracker = serverRequestTracker,
474517
nextRequestId = nextRequestId,
475-
pendingRequests = pendingRequests
518+
pendingRequests = pendingRequests,
519+
pendingQueries = pendingQueries
476520
)
477521
}
478522

@@ -501,9 +545,19 @@ object RaftClient {
501545
_ <- transport.sendMessage(request).orDie
502546
newPending = pendingRequests.add(requestId, payload, promise, now)
503547
} yield copy(pendingRequests = newPending)
548+
case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
549+
for {
550+
now <- Clock.instant
551+
// correlationId via client-side generator (to be implemented in T024)
552+
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
553+
newPending = pendingQueries.add(correlationId, payload, promise, now)
554+
_ <- transport.sendMessage(Query(correlationId, payload, now)).orDie
555+
} yield copy(pendingQueries = newPending)
504556

505557
case StreamEvent.ServerMsg(ClientResponse(requestId, result)) =>
506558
pendingRequests.complete(requestId, result).map(newPending => copy(pendingRequests = newPending))
559+
case StreamEvent.ServerMsg(QueryResponse(correlationId, result)) =>
560+
pendingQueries.complete(correlationId, result).map(newPending => copy(pendingQueries = newPending))
507561

508562
case StreamEvent.ServerMsg(RequestError(requestId, RequestErrorReason.ResponseEvicted)) =>
509563
if (pendingRequests.contains(requestId))
@@ -581,8 +635,9 @@ object RaftClient {
581635
case StreamEvent.TimeoutCheck =>
582636
for {
583637
now <- Clock.instant
584-
newPending <- pendingRequests.resendExpired(transport, now, config.requestTimeout)
585-
} yield copy(pendingRequests = newPending)
638+
newPendingReqs <- pendingRequests.resendExpired(transport, now, config.requestTimeout)
639+
newPendingQs <- pendingQueries.resendExpired(transport, now, config.requestTimeout)
640+
} yield copy(pendingRequests = newPendingReqs, pendingQueries = newPendingQs)
586641

587642
case StreamEvent.ServerMsg(SessionCreated(_, _)) =>
588643
ZIO.logWarning("Received SessionCreated while connected, ignoring").as(this)

client-server-client/src/main/scala/zio/raft/client/package.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@ package object client {
7878
case object Disconnect extends ClientAction
7979
case class SubmitCommand(
8080
payload: scodec.bits.ByteVector,
81-
promise: Promise[Throwable, scodec.bits.ByteVector]
81+
promise: Promise[Nothing, scodec.bits.ByteVector]
82+
) extends ClientAction
83+
case class SubmitQuery(
84+
payload: scodec.bits.ByteVector,
85+
promise: Promise[Nothing, scodec.bits.ByteVector]
8286
) extends ClientAction
8387
}
8488
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package zio.raft.client
2+
3+
import zio.*
4+
import zio.test.*
5+
import zio.test.Assertion.*
6+
import scodec.bits.ByteVector
7+
import zio.raft.protocol.*
8+
import zio.stream.ZStream
9+
10+
object PendingQueriesSpec extends ZIOSpecDefault {
11+
12+
override def spec: Spec[Environment & TestEnvironment & Scope, Any] =
13+
suiteAll("PendingQueries") {
14+
15+
test("resendAll resends all pending queries and updates lastSentAt") {
16+
for {
17+
p <- Promise.make[Nothing, ByteVector]
18+
now <- Clock.instant
19+
pq = PendingQueries.empty.add(CorrelationId.fromString("c1"), ByteVector(1,2,3), p, now)
20+
sentRef <- Ref.make(0)
21+
transport = new ClientTransport {
22+
def connect(address: String) = ZIO.unit
23+
def disconnect() = ZIO.unit
24+
def sendMessage(message: ClientMessage) = sentRef.update(_ + 1).unit
25+
def incomingMessages = ZStream.empty
26+
}
27+
_ <- pq.resendAll(transport)
28+
sent <- sentRef.get
29+
} yield assertTrue(sent == 1)
30+
}
31+
32+
test("complete delivers single completion and removes pending entry") {
33+
for {
34+
p <- Promise.make[Nothing, ByteVector]
35+
now <- Clock.instant
36+
cid = CorrelationId.fromString("c2")
37+
pq = PendingQueries.empty.add(cid, ByteVector(9), p, now)
38+
pq2 <- pq.complete(cid, ByteVector(7))
39+
r <- p.await
40+
} yield assertTrue(r == ByteVector(7))
41+
}
42+
}
43+
}
44+
45+

0 commit comments

Comments
 (0)