Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cursor/rules/specify-rules.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Auto-generated from all feature plans. Last updated: 2025-09-24
- **Code Quality**: scalafmt, scalafix, unused import warnings
- Scala 3.3+ + ZIO 2.1+, scodec (for serialization), existing client-server-server library (002-composable-raft-state)
- State maintained in Raft log + snapshots; uses shared Map[String, ByteVector] dictionary with key prefixes (002-composable-raft-state)
- Scala 3 + ZIO 2, existing zio-raft client/server libraries (005-client-server-libraries)
- N/A (read-only Query, no persistence change) (005-client-server-libraries)

## Project Structure
```
Expand Down Expand Up @@ -41,6 +43,7 @@ All code changes must adhere to ZIO Raft Constitution v1.0.0:
- Functional programming patterns (no var, mutable collections)

## Recent Changes
- 005-client-server-libraries: Added Scala 3 + ZIO 2, existing zio-raft client/server libraries
- 002-composable-raft-state: Added Scala 3.3+ + ZIO 2.1+, scodec (for serialization), existing client-server-server library
- 001-implement-client-server: Added Scala 3.3+ with ZIO 2.1+ + ZeroMQ (zio-zmq), scodec (protocol serialization), ZIO (effects), ZIO Test (testing)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package zio.raft.client

import zio.*
import zio.raft.protocol.*
import scodec.bits.ByteVector
import java.time.Instant

/** Manages pending queries with lastSentAt timestamps for retry. */
case class PendingQueries(
queries: Map[CorrelationId, PendingQueries.PendingQueryData]
) {
def contains(correlationId: CorrelationId): Boolean = queries.contains(correlationId)

def add(
correlationId: CorrelationId,
payload: ByteVector,
promise: Promise[Nothing, ByteVector],
sentAt: Instant
): PendingQueries =
copy(queries = queries.updated(correlationId, PendingQueries.PendingQueryData(payload, promise, sentAt, sentAt)))

def complete(correlationId: CorrelationId, result: ByteVector): UIO[PendingQueries] =
queries.get(correlationId) match {
case Some(data) => data.promise.succeed(result).as(copy(queries = queries.removed(correlationId)))
case None => ZIO.succeed(this)
}

/** Resend all pending queries (used after successful connection). */
def resendAll(transport: ClientTransport): UIO[PendingQueries] =
ZIO.foldLeft(queries.toList)(this) { case (pending, (correlationId, data)) =>
for {
now <- Clock.instant
_ <- transport.sendMessage(Query(correlationId, data.payload, now)).orDie
_ <- ZIO.logDebug(s"Resending pending query: ${CorrelationId.unwrap(correlationId)}")
updatedData = data.copy(lastSentAt = now)
} yield PendingQueries(pending.queries.updated(correlationId, updatedData))
}

/** Resend expired queries and update lastSentAt. */
def resendExpired(transport: ClientTransport, currentTime: Instant, timeout: Duration): UIO[PendingQueries] =
ZIO.foldLeft(queries.toList)(this) { case (pending, (correlationId, data)) =>
val elapsed = Duration.fromInterval(data.lastSentAt, currentTime)
if (elapsed > timeout) {
for {
_ <- transport.sendMessage(Query(correlationId, data.payload, currentTime)).orDie
_ <- ZIO.logDebug(s"Resending timed out query: ${CorrelationId.unwrap(correlationId)}")
updatedData = data.copy(lastSentAt = currentTime)
} yield PendingQueries(pending.queries.updated(correlationId, updatedData))
} else {
ZIO.succeed(pending)
}
}
}

object PendingQueries {
def empty: PendingQueries = PendingQueries(Map.empty)

case class PendingQueryData(
payload: ByteVector,
promise: Promise[Nothing, ByteVector],
createdAt: Instant,
lastSentAt: Instant
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ case class PendingRequests(
def add(
requestId: RequestId,
payload: ByteVector,
promise: Promise[Throwable, ByteVector],
promise: Promise[Nothing, ByteVector],
sentAt: Instant
): PendingRequests =
copy(requests = requests.updated(requestId, PendingRequests.PendingRequestData(payload, promise, sentAt, sentAt)))
Expand Down Expand Up @@ -78,7 +78,7 @@ object PendingRequests {

case class PendingRequestData(
payload: ByteVector,
promise: Promise[Throwable, ByteVector],
promise: Promise[Nothing, ByteVector],
createdAt: Instant,
lastSentAt: Instant
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,24 @@ class RaftClient private (
def connect(): UIO[Unit] =
actionQueue.offer(ClientAction.Connect).unit

def submitCommand(payload: ByteVector): Task[ByteVector] =
def submitCommand(payload: ByteVector): UIO[ByteVector] =
for {
promise <- Promise.make[Throwable, ByteVector]
promise <- Promise.make[Nothing, ByteVector]
action = ClientAction.SubmitCommand(payload, promise)
_ <- actionQueue.offer(action)
result <- promise.await
} yield result

/** Submit a read-only Query (leader-served, linearizable).
*/
def query(payload: ByteVector): UIO[ByteVector] =
for {
promise <- Promise.make[Nothing, ByteVector]
action = ClientAction.SubmitQuery(payload, promise)
_ <- actionQueue.offer(action)
result <- promise.await
} yield result

def disconnect(): UIO[Unit] =
actionQueue.offer(ClientAction.Disconnect).unit

Expand Down Expand Up @@ -146,14 +156,18 @@ object RaftClient {
currentMemberId = memberId,
createdAt = now,
nextRequestId = nextRequestId,
pendingRequests = PendingRequests.empty
pendingRequests = PendingRequests.empty,
pendingQueries = PendingQueries.empty
)

case StreamEvent.ServerMsg(message) =>
ZIO.logWarning(s"Received message while disconnected: ${message.getClass.getSimpleName}").as(this)

case StreamEvent.Action(ClientAction.SubmitCommand(_, promise)) =>
promise.fail(new IllegalStateException("Not connected")).ignore.as(this)
promise.die(new IllegalStateException("Not connected")).ignore.as(this)

case StreamEvent.Action(ClientAction.SubmitQuery(_, promise)) =>
promise.die(new IllegalStateException("Not connected")).ignore.as(this)

case StreamEvent.Action(ClientAction.Disconnect) =>
ZIO.succeed(this)
Expand All @@ -173,7 +187,8 @@ object RaftClient {
currentMemberId: MemberId,
createdAt: Instant,
nextRequestId: RequestIdRef,
pendingRequests: PendingRequests
pendingRequests: PendingRequests,
pendingQueries: PendingQueries
) extends ClientState {
override def stateName: String = "ConnectingNewSession"

Expand Down Expand Up @@ -210,8 +225,9 @@ object RaftClient {
for {
_ <- ZIO.logInfo(s"Session created: $sessionId")
now <- Clock.instant
// Send all pending requests
// Send all pending requests/queries
updatedPending <- pendingRequests.resendAll(transport)
updatedPendingQueries <- pendingQueries.resendAll(transport)
} yield Connected(
config = config,
sessionId = sessionId,
Expand All @@ -220,6 +236,7 @@ object RaftClient {
serverRequestTracker = ServerRequestTracker(),
nextRequestId = nextRequestId,
pendingRequests = updatedPending,
pendingQueries = updatedPendingQueries,
currentMemberId = currentMemberId
)
} else {
Expand Down Expand Up @@ -250,6 +267,14 @@ object RaftClient {
newPending = pendingRequests.add(requestId, payload, promise, now)
} yield copy(pendingRequests = newPending)

case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
// Queue query while connecting (handled after session established)
for {
now <- Clock.instant
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
newPending = pendingQueries.add(correlationId, payload, promise, now)
} yield copy(pendingQueries = newPending)

case StreamEvent.TimeoutCheck =>
for {
now <- Clock.instant
Expand All @@ -268,6 +293,9 @@ object RaftClient {
case StreamEvent.ServerMsg(ClientResponse(_, _)) =>
ZIO.logWarning("Received ClientResponse while connecting, ignoring").as(this)

case StreamEvent.ServerMsg(QueryResponse(correlationId, result)) =>
ZIO.logWarning("Received QueryResponse while connecting, ignoring").as(this)

case StreamEvent.ServerMsg(_: RequestError) =>
ZIO.logWarning("Received RequestError while connecting, ignoring").as(this)

Expand Down Expand Up @@ -305,7 +333,8 @@ object RaftClient {
createdAt: Instant,
serverRequestTracker: ServerRequestTracker,
nextRequestId: RequestIdRef,
pendingRequests: PendingRequests
pendingRequests: PendingRequests,
pendingQueries: PendingQueries
) extends ClientState {
override def stateName: String = s"ConnectingExistingSession($sessionId)"

Expand Down Expand Up @@ -345,8 +374,9 @@ object RaftClient {
s"Session continued: $sessionId at $currentMemberId (${currentAddr.getOrElse("unknown")})"
)
now <- Clock.instant
// Send all pending requests
// Send all pending requests/queries
updatedPending <- pendingRequests.resendAll(transport)
updatedPendingQueries <- pendingQueries.resendAll(transport)
} yield Connected(
config = config,
sessionId = sessionId,
Expand All @@ -355,6 +385,7 @@ object RaftClient {
serverRequestTracker = serverRequestTracker,
nextRequestId = nextRequestId,
pendingRequests = updatedPending,
pendingQueries = updatedPendingQueries,
currentMemberId = currentMemberId
)
} else {
Expand Down Expand Up @@ -387,6 +418,14 @@ object RaftClient {
newPending = pendingRequests.add(requestId, payload, promise, now)
} yield copy(pendingRequests = newPending)

case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
// Queue query while connecting (handled after session established)
for {
now <- Clock.instant
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
newPending = pendingQueries.add(correlationId, payload, promise, now)
} yield copy(pendingQueries = newPending)

case StreamEvent.TimeoutCheck =>
for {
now <- Clock.instant
Expand All @@ -405,6 +444,9 @@ object RaftClient {
case StreamEvent.ServerMsg(ClientResponse(_, _)) =>
ZIO.logWarning("Received ClientResponse while connecting, ignoring").as(this)

case StreamEvent.ServerMsg(QueryResponse(correlationId, result)) =>
ZIO.logWarning("Received QueryResponse while connecting existing session, ignoring").as(this)

case StreamEvent.ServerMsg(KeepAliveResponse(_)) =>
ZIO.succeed(this)

Expand Down Expand Up @@ -439,6 +481,7 @@ object RaftClient {
serverRequestTracker: ServerRequestTracker,
nextRequestId: RequestIdRef,
pendingRequests: PendingRequests,
pendingQueries: PendingQueries,
currentMemberId: MemberId
) extends ClientState {
override def stateName: String = s"Connected($sessionId)"
Expand Down Expand Up @@ -472,7 +515,8 @@ object RaftClient {
createdAt = now,
serverRequestTracker = serverRequestTracker,
nextRequestId = nextRequestId,
pendingRequests = pendingRequests
pendingRequests = pendingRequests,
pendingQueries = pendingQueries
)
}

Expand Down Expand Up @@ -501,9 +545,19 @@ object RaftClient {
_ <- transport.sendMessage(request).orDie
newPending = pendingRequests.add(requestId, payload, promise, now)
} yield copy(pendingRequests = newPending)
case StreamEvent.Action(ClientAction.SubmitQuery(payload, promise)) =>
for {
now <- Clock.instant
// correlationId via client-side generator (to be implemented in T024)
Copy link

Copilot AI Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment references task T024 which is already marked as complete in tasks.md. This comment should be removed or updated since the implementation is now present.

Suggested change
// correlationId via client-side generator (to be implemented in T024)

Copilot uses AI. Check for mistakes.
correlationId <- Random.nextUUID.map(u => CorrelationId.fromString(u.toString))
newPending = pendingQueries.add(correlationId, payload, promise, now)
_ <- transport.sendMessage(Query(correlationId, payload, now)).orDie
} yield copy(pendingQueries = newPending)

case StreamEvent.ServerMsg(ClientResponse(requestId, result)) =>
pendingRequests.complete(requestId, result).map(newPending => copy(pendingRequests = newPending))
case StreamEvent.ServerMsg(QueryResponse(correlationId, result)) =>
pendingQueries.complete(correlationId, result).map(newPending => copy(pendingQueries = newPending))

case StreamEvent.ServerMsg(RequestError(requestId, RequestErrorReason.ResponseEvicted)) =>
if (pendingRequests.contains(requestId))
Expand Down Expand Up @@ -581,8 +635,9 @@ object RaftClient {
case StreamEvent.TimeoutCheck =>
for {
now <- Clock.instant
newPending <- pendingRequests.resendExpired(transport, now, config.requestTimeout)
} yield copy(pendingRequests = newPending)
newPendingReqs <- pendingRequests.resendExpired(transport, now, config.requestTimeout)
newPendingQs <- pendingQueries.resendExpired(transport, now, config.requestTimeout)
} yield copy(pendingRequests = newPendingReqs, pendingQueries = newPendingQs)

case StreamEvent.ServerMsg(SessionCreated(_, _)) =>
ZIO.logWarning("Received SessionCreated while connected, ignoring").as(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ package object client {
case object Disconnect extends ClientAction
case class SubmitCommand(
payload: scodec.bits.ByteVector,
promise: Promise[Throwable, scodec.bits.ByteVector]
promise: Promise[Nothing, scodec.bits.ByteVector]
) extends ClientAction
case class SubmitQuery(
payload: scodec.bits.ByteVector,
promise: Promise[Nothing, scodec.bits.ByteVector]
) extends ClientAction
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package zio.raft.client

import zio.*
import zio.test.*
import zio.test.Assertion.*
import scodec.bits.ByteVector
import zio.raft.protocol.*
import zio.stream.ZStream

object PendingQueriesSpec extends ZIOSpecDefault {

override def spec: Spec[Environment & TestEnvironment & Scope, Any] =
suiteAll("PendingQueries") {

test("resendAll resends all pending queries and updates lastSentAt") {
for {
p <- Promise.make[Nothing, ByteVector]
now <- Clock.instant
pq = PendingQueries.empty.add(CorrelationId.fromString("c1"), ByteVector(1,2,3), p, now)
sentRef <- Ref.make(0)
transport = new ClientTransport {
def connect(address: String) = ZIO.unit
def disconnect() = ZIO.unit
def sendMessage(message: ClientMessage) = sentRef.update(_ + 1).unit
def incomingMessages = ZStream.empty
}
_ <- pq.resendAll(transport)
sent <- sentRef.get
} yield assertTrue(sent == 1)
}

test("complete delivers single completion and removes pending entry") {
for {
p <- Promise.make[Nothing, ByteVector]
now <- Clock.instant
cid = CorrelationId.fromString("c2")
pq = PendingQueries.empty.add(cid, ByteVector(9), p, now)
pq2 <- pq.complete(cid, ByteVector(7))
r <- p.await
} yield assertTrue(r == ByteVector(7))
}
}
}


Loading
Loading