Skip to content

Commit 657481c

Browse files
Copilotsomdoron
andcommitted
Add dieAll methods to handle pending requests/queries on client termination
Co-authored-by: somdoron <[email protected]>
1 parent 0bce353 commit 657481c

File tree

5 files changed

+84
-10
lines changed

5 files changed

+84
-10
lines changed

client-server-client/src/main/scala/zio/raft/client/PendingQueries.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ case class PendingQueries(
5050
ZIO.succeed(pending)
5151
}
5252
}
53+
54+
/** Die all pending queries with the given error. */
55+
def dieAll(error: Throwable): UIO[Unit] =
56+
ZIO.foreach(queries.values)(data => data.promise.die(error).ignore).unit
5357
}
5458

5559
object PendingQueries {

client-server-client/src/main/scala/zio/raft/client/PendingRequests.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ case class PendingRequests(
7171
case Some(data) => data.promise.die(error).ignore.as(copy(requests = requests.removed(requestId)))
7272
case None => ZIO.succeed(this)
7373
}
74+
75+
/** Die all pending requests with the given error. */
76+
def dieAll(error: Throwable): UIO[Unit] =
77+
ZIO.foreach(requests.values)(data => data.promise.die(error).ignore).unit
7478
}
7579

7680
object PendingRequests {

client-server-client/src/main/scala/zio/raft/client/RaftClient.scala

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,14 @@ object RaftClient {
250250
connectToMember(leaderId, transport, s"Not leader at $currentMemberId")
251251

252252
case RejectionReason.SessionExpired =>
253-
ZIO.dieMessage("Session not found - cannot continue")
253+
pendingRequests.dieAll(new RuntimeException("Session expired")) *>
254+
pendingQueries.dieAll(new RuntimeException("Session expired")) *>
255+
ZIO.dieMessage("Session not found - cannot continue")
254256

255257
case RejectionReason.InvalidCapabilities =>
256-
ZIO.dieMessage(s"Invalid capabilities - cannot connect: ${capabilities}")
258+
pendingRequests.dieAll(new RuntimeException("Invalid capabilities")) *>
259+
pendingQueries.dieAll(new RuntimeException("Invalid capabilities")) *>
260+
ZIO.dieMessage(s"Invalid capabilities - cannot connect: ${capabilities}")
257261
}
258262
} else {
259263
ZIO.logWarning("Nonce mismatch, ignoring SessionRejected").as(this)
@@ -399,12 +403,15 @@ object RaftClient {
399403
connectToMember(leaderId, transport, s"Not leader at $currentMemberId")
400404

401405
case RejectionReason.SessionExpired =>
402-
ZIO.logWarning("Session not found - cannot continue") *> ZIO.dieMessage(
403-
"Session not found - cannot continue"
404-
)
406+
pendingRequests.dieAll(new RuntimeException("Session expired")) *>
407+
pendingQueries.dieAll(new RuntimeException("Session expired")) *>
408+
ZIO.logWarning("Session not found - cannot continue") *>
409+
ZIO.dieMessage("Session not found - cannot continue")
405410

406411
case RejectionReason.InvalidCapabilities =>
407-
ZIO.dieMessage(s"Invalid capabilities - cannot connect: ${capabilities}")
412+
pendingRequests.dieAll(new RuntimeException("Invalid capabilities")) *>
413+
pendingQueries.dieAll(new RuntimeException("Invalid capabilities")) *>
414+
ZIO.dieMessage(s"Invalid capabilities - cannot connect: ${capabilities}")
408415
}
409416
} else {
410417
ZIO.logWarning("Nonce mismatch, ignoring SessionRejected").as(this)
@@ -561,7 +568,8 @@ object RaftClient {
561568
case StreamEvent.ServerMsg(RequestError(requestId, RequestErrorReason.ResponseEvicted)) =>
562569
if (pendingRequests.contains(requestId))
563570
ZIO.logError(s"RequestError: ResponseEvicted for request $requestId, terminating client") *>
564-
pendingRequests.die(requestId, new RuntimeException("ResponseEvicted")) *>
571+
pendingRequests.dieAll(new RuntimeException("ResponseEvicted")) *>
572+
pendingQueries.dieAll(new RuntimeException("ResponseEvicted")) *>
565573
ZIO.dieMessage("ResponseEvicted")
566574
else
567575
ZIO.logWarning(s"RequestError for non-pending request $requestId, ignoring").as(this)
@@ -621,9 +629,10 @@ object RaftClient {
621629
)
622630

623631
case StreamEvent.ServerMsg(SessionClosed(SessionCloseReason.SessionExpired, _)) =>
624-
ZIO.logWarning("Session closed due to timeout, terminating client") *> ZIO.dieMessage(
625-
"Session timed out: shutting down client."
626-
)
632+
pendingRequests.dieAll(new RuntimeException("Session expired")) *>
633+
pendingQueries.dieAll(new RuntimeException("Session expired")) *>
634+
ZIO.logWarning("Session closed due to timeout, terminating client") *>
635+
ZIO.dieMessage("Session timed out: shutting down client.")
627636

628637
case StreamEvent.KeepAliveTick =>
629638
for {

client-server-client/src/test/scala/zio/raft/client/PendingQueriesSpec.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,33 @@ object PendingQueriesSpec extends ZIOSpecDefault {
7272
assertTrue(d1.lastSentAt == currentTime) &&
7373
assertTrue(d2.lastSentAt == currentTime.minusSeconds(5))
7474
}
75+
76+
test("dieAll completes all pending promises with death") {
77+
for {
78+
p1 <- Promise.make[Nothing, ByteVector]
79+
p2 <- Promise.make[Nothing, ByteVector]
80+
p3 <- Promise.make[Nothing, ByteVector]
81+
now <- Clock.instant
82+
pq = PendingQueries.empty
83+
.add(CorrelationId.fromString("c1"), ByteVector(1), p1, now)
84+
.add(CorrelationId.fromString("c2"), ByteVector(2), p2, now)
85+
.add(CorrelationId.fromString("c3"), ByteVector(3), p3, now)
86+
_ <- pq.dieAll(new RuntimeException("all dead"))
87+
fiber1 <- p1.await.fork
88+
fiber2 <- p2.await.fork
89+
fiber3 <- p3.await.fork
90+
exit1 <- fiber1.await
91+
exit2 <- fiber2.await
92+
exit3 <- fiber3.await
93+
} yield assertTrue(exit1.isFailure) && assertTrue(exit2.isFailure) && assertTrue(exit3.isFailure)
94+
}
95+
96+
test("dieAll on empty pending queries succeeds") {
97+
for {
98+
pq <- ZIO.succeed(PendingQueries.empty)
99+
_ <- pq.dieAll(new RuntimeException("boom"))
100+
} yield assertTrue(true)
101+
}
75102
}
76103
}
77104

client-server-client/src/test/scala/zio/raft/client/PendingRequestsSpec.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,35 @@ object PendingRequestsSpec extends ZIOSpecDefault {
8888
pending1 <- pending.die(rid, new RuntimeException("boom"))
8989
} yield assertTrue(pending1 == pending)
9090
}
91+
92+
test("dieAll completes all pending promises with death") {
93+
val rid1 = RequestId.fromLong(1L)
94+
val rid2 = RequestId.fromLong(2L)
95+
val rid3 = RequestId.fromLong(3L)
96+
for {
97+
p1 <- Promise.make[Nothing, ByteVector]
98+
p2 <- Promise.make[Nothing, ByteVector]
99+
p3 <- Promise.make[Nothing, ByteVector]
100+
now <- Clock.instant
101+
pending = PendingRequests.empty
102+
.add(rid1, ByteVector.fromValidHex("aa"), p1, now)
103+
.add(rid2, ByteVector.fromValidHex("bb"), p2, now)
104+
.add(rid3, ByteVector.fromValidHex("cc"), p3, now)
105+
_ <- pending.dieAll(new RuntimeException("all dead"))
106+
fiber1 <- p1.await.fork
107+
fiber2 <- p2.await.fork
108+
fiber3 <- p3.await.fork
109+
exit1 <- fiber1.await
110+
exit2 <- fiber2.await
111+
exit3 <- fiber3.await
112+
} yield assertTrue(exit1.isFailure) && assertTrue(exit2.isFailure) && assertTrue(exit3.isFailure)
113+
}
114+
115+
test("dieAll on empty pending requests succeeds") {
116+
for {
117+
pending <- ZIO.succeed(PendingRequests.empty)
118+
_ <- pending.dieAll(new RuntimeException("boom"))
119+
} yield assertTrue(true)
120+
}
91121
}
92122
}

0 commit comments

Comments
 (0)