Skip to content

Commit 03e88fb

Browse files
committed
fix lowest pending request id, to clear lower
1 parent 4feb5d5 commit 03e88fb

File tree

9 files changed

+73
-73
lines changed

9 files changed

+73
-73
lines changed

kvstore/src/test/scala/zio/kvstore/KVSessionSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ object KVSessionSpec extends ZIOSpecDefault:
2222
createdAt = now,
2323
sessionId = sessionId,
2424
requestId = RequestId(1),
25-
lowestRequestId = RequestId(1),
25+
lowestPendingRequestId = RequestId(1),
2626
command = KVCommand.Set("k", "v")
2727
)
2828
val (s2, _) = sm.apply(set).run(s1)
@@ -36,7 +36,7 @@ object KVSessionSpec extends ZIOSpecDefault:
3636
createdAt = now,
3737
sessionId = sessionId,
3838
requestId = RequestId(2),
39-
lowestRequestId = RequestId(1),
39+
lowestPendingRequestId = RequestId(1),
4040
command = KVCommand.Get("k")
4141
)
4242
val (_, result) = sm.apply(get).run(s3)

kvstore/src/test/scala/zio/kvstore/KVStateMachineSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ object KVStateMachineSpec extends ZIOSpecDefault:
2323
createdAt = now,
2424
sessionId = sessionId,
2525
requestId = RequestId(1),
26-
lowestRequestId = RequestId(1),
26+
lowestPendingRequestId = RequestId(1),
2727
command = KVCommand.Set("a", "v1")
2828
)
2929
val (s2, _) = sm.apply(set1).run(s1)
@@ -33,7 +33,7 @@ object KVStateMachineSpec extends ZIOSpecDefault:
3333
createdAt = now,
3434
sessionId = sessionId,
3535
requestId = RequestId(1),
36-
lowestRequestId = RequestId(1),
36+
lowestPendingRequestId = RequestId(1),
3737
command = KVCommand.Set("a", "v2")
3838
)
3939
val (s3, _) = sm.apply(set2).run(s2)
@@ -42,7 +42,7 @@ object KVStateMachineSpec extends ZIOSpecDefault:
4242
createdAt = now,
4343
sessionId = sessionId,
4444
requestId = RequestId(2),
45-
lowestRequestId = RequestId(1),
45+
lowestPendingRequestId = RequestId(1),
4646
command = KVCommand.Get("a")
4747
)
4848
val (_, result) = sm.apply(get).run(s3)

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ object SessionCommand:
5151
createdAt: Instant,
5252
sessionId: SessionId,
5353
requestId: RequestId,
54-
lowestRequestId: RequestId,
54+
lowestPendingRequestId: RequestId,
5555
command: UC
5656
) extends SessionCommand[UC, SR]:
5757
// Response type can be an error or the user command's response with server request envelopes

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import java.time.Instant
2828
* - Response caching for duplicate requests
2929
* - Server-initiated request management with cumulative acknowledgment
3030
* - Session lifecycle coordination
31-
* - Cache cleanup driven by client-provided lowestRequestId (inclusive: removes requestId <= lowestRequestId)
32-
* - Evicted response detection: if requestId <= highestLowestRequestIdSeen and not in cache, return
31+
* - Cache cleanup driven by client-provided lowestPendingRequestId (exclusive: removes requestId <
32+
* lowestPendingRequestId)
33+
* - Evicted response detection: if requestId < highestLowestPendingRequestIdSeen and not in cache, return
3334
* RequestError.ResponseEvicted
3435
*
3536
* ## Type Parameters
@@ -269,17 +270,17 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple]
269270
*
270271
* Implementation of Raft dissertation Chapter 6.3 session management protocol:
271272
* 1. Check cache for (sessionId, requestId) 2. If cache hit, return cached response 3. If cache miss, check if
272-
* requestId <= highestLowestRequestIdSeen → response was evicted, return error 4. If cache miss + requestId >
273-
* highestLowestRequestIdSeen, execute command and update highestLowestRequestIdSeen
273+
* requestId < highestLowestPendingRequestIdSeen → response was evicted, return error 4. If cache miss +
274+
* requestId ≥ highestLowestPendingRequestIdSeen, execute command and update highestLowestPendingRequestIdSeen
274275
*
275-
* This correctly handles out-of-order requests. The lowestRequestId from the client tells us which responses have
276-
* been acknowledged (inclusive) and can be evicted. We only update highestLowestRequestIdSeen for requests we
277-
* actually process.
276+
* This correctly handles out-of-order requests. The lowestPendingRequestId from the client tells us the lowest
277+
* sequence number without a response; we can evict responses with lower numbers. We only update
278+
* highestLowestPendingRequestIdSeen for requests we actually process.
278279
*/
279280
private def handleClientRequest(cmd: SessionCommand.ClientRequest[UC, SR])
280281
: State[HMap[Schema], Either[RequestError, (cmd.command.Response, List[ServerRequestEnvelope[SR]])]] =
281282
for
282-
highestLowestSeen <- getHighestLowestRequestIdSeen(cmd.sessionId)
283+
highestLowestSeen <- getHighestLowestPendingRequestIdSeen(cmd.sessionId)
283284
cachedOpt <- getCachedResponse((cmd.sessionId, cmd.requestId))
284285
result <- cachedOpt match
285286
case Some(cachedResponse) =>
@@ -288,17 +289,17 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple]
288289

289290
case None =>
290291
// Cache miss - check if response was evicted
291-
// If requestId <= highestLowestRequestIdSeen, client has acknowledged receiving this response
292-
if cmd.requestId.isLowerOrEqual(highestLowestSeen) then
293-
// Client said "I have responses for all requestIds <= highestLowest", so this was evicted
292+
// If requestId < highestLowestPendingRequestIdSeen, client has acknowledged receiving this response
293+
if cmd.requestId.isLowerThan(highestLowestSeen) then
294+
// Client said "I have responses for all requestIds < highestLowestPending", so this was evicted
294295
State.succeed(Left(RequestError.ResponseEvicted(cmd.sessionId, cmd.requestId)))
295296
else
296-
// requestId >= highestLowestRequestIdSeen
297+
// requestId >= highestLowestPendingRequestIdSeen
297298
// This is a valid request (not yet acknowledged), execute the command
298299
for
299-
// Update highestLowestRequestIdSeen ONLY when actually executing a new request
300-
_ <- updateHighestLowestRequestIdSeen(cmd.sessionId, cmd.lowestRequestId)
301-
_ <- cleanupCache(cmd.sessionId, cmd.lowestRequestId)
300+
// Update highestLowestPendingRequestIdSeen ONLY when actually executing a new request
301+
_ <- updateHighestLowestPendingRequestIdSeen(cmd.sessionId, cmd.lowestPendingRequestId)
302+
_ <- cleanupCache(cmd.sessionId, cmd.lowestPendingRequestId)
302303
(serverRequestsLog, response) <- applyCommand(cmd.command, cmd.createdAt).withLog
303304
assignedRequests <- addServerRequests(cmd.createdAt, serverRequestsLog)
304305
_ <- cacheResponse((cmd.sessionId, cmd.requestId), response)
@@ -318,31 +319,31 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple]
318319
): State[HMap[Schema], Unit] =
319320
State.update(_.updated["cache"](key, response))
320321

321-
/** Get the highest lowestRequestId seen from the client for a session.
322+
/** Get the highest lowestPendingRequestId seen from the client for a session.
322323
*
323-
* This tracks the highest value of lowestRequestId that the client has sent, indicating which responses the client
324-
* has acknowledged receiving.
324+
* This tracks the highest value of lowestPendingRequestId that the client has sent, indicating below which requestId
325+
* responses have been acknowledged and can be discarded.
325326
*
326-
* Returns RequestId.zero if no lowestRequestId has been seen yet (no requests have been acknowledged).
327+
* Returns RequestId.zero if no lowestPendingRequestId has been seen yet (no requests have been acknowledged).
327328
*/
328-
private def getHighestLowestRequestIdSeen(sessionId: SessionId): State[HMap[Schema], RequestId] =
329-
State.get.map(_.get["highestLowestRequestIdSeen"](sessionId).getOrElse(RequestId.zero))
329+
private def getHighestLowestPendingRequestIdSeen(sessionId: SessionId): State[HMap[Schema], RequestId] =
330+
State.get.map(_.get["highestLowestPendingRequestIdSeen"](sessionId).getOrElse(RequestId.zero))
330331

331-
/** Update the highest lowestRequestId seen from the client (only if lowestRequestId > current highest).
332+
/** Update the highest lowestPendingRequestId seen from the client (only if it increased).
332333
*
333-
* The lowestRequestId from the client indicates "I have received all responses for requestIds <= this value
334-
* (inclusive)". We track the highest such value to detect evicted responses.
334+
* The lowestPendingRequestId from the client indicates "I have not yet received a response for this requestId". We
335+
* discard cached responses with lower requestIds and track the highest such value to detect evicted responses.
335336
*/
336-
private def updateHighestLowestRequestIdSeen(
337+
private def updateHighestLowestPendingRequestIdSeen(
337338
sessionId: SessionId,
338-
lowestRequestId: RequestId
339+
lowestPendingRequestId: RequestId
339340
): State[HMap[Schema], Unit] =
340341
State.update(state =>
341-
state.get["highestLowestRequestIdSeen"](sessionId) match
342-
case Some(current) if !lowestRequestId.isGreaterThan(current) =>
343-
state // Don't update if lowestRequestId is not higher
342+
state.get["highestLowestPendingRequestIdSeen"](sessionId) match
343+
case Some(current) if !lowestPendingRequestId.isGreaterThan(current) =>
344+
state // Don't update if not higher
344345
case _ =>
345-
state.updated["highestLowestRequestIdSeen"](sessionId, lowestRequestId)
346+
state.updated["highestLowestPendingRequestIdSeen"](sessionId, lowestPendingRequestId)
346347
)
347348

348349
/** Handle ServerRequestAck with cumulative acknowledgment.
@@ -512,32 +513,33 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple]
512513
.removedAll["serverRequests"](serverRequestKeysToRemove)
513514
.removed["metadata"](sessionId)
514515
.removed["lastServerRequestId"](sessionId)
515-
.removed["highestLowestRequestIdSeen"](sessionId)
516+
.removed["highestLowestPendingRequestIdSeen"](sessionId)
516517
}
517518

518519
// ====================================================================================
519520
// HELPER METHODS - All return State[HMap[Schema], A] for composition
520521
// ====================================================================================
521522

522-
/** Clean up cached responses based on lowestRequestId (Lowest Sequence Number Protocol).
523+
/** Clean up cached responses based on lowestPendingRequestId (Lowest Sequence Number Protocol).
523524
*
524-
* Removes all cached responses for the session with requestId <= lowestRequestId. This allows the client to control
525-
* cache cleanup by telling the server which responses it no longer needs (Chapter 6.3 of Raft dissertation).
525+
* Removes all cached responses for the session with requestId < lowestPendingRequestId. This allows the client to
526+
* control cache cleanup by telling the server the lowest requestId without a response (Chapter 6.3 of Raft
527+
* dissertation).
526528
*
527-
* The client sends lowestRequestId to indicate "I have received all responses up to and including this ID".
529+
* The client sends lowestPendingRequestId to indicate "I have not yet received a response for this ID".
528530
*
529531
* Uses range queries to efficiently find and remove old cache entries.
530532
*/
531533
private def cleanupCache(
532534
sessionId: SessionId,
533-
lowestRequestId: RequestId
535+
lowestPendingRequestId: RequestId
534536
): State[HMap[Schema], Unit] =
535537
State.update { state =>
536-
// Use range to find all cache entries for this session with requestId <= lowestRequestId
537-
// Range is [from, until), so to include lowestRequestId, we use lowestRequestId.next as upper bound
538+
// Use range to find all cache entries for this session with requestId < lowestPendingRequestId
539+
// Range is [from, until), so use lowestPendingRequestId as the exclusive upper bound
538540
val keysToRemove = state.range["cache"](
539541
(sessionId, RequestId.zero),
540-
(sessionId, lowestRequestId.next)
542+
(sessionId, lowestPendingRequestId)
541543
).map((key, _) => key)
542544

543545
// Remove all old cache entries in one efficient operation

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ package object sessionstatemachine:
141141
/** Response was cached but has been evicted. Client must create a new session.
142142
*
143143
* This error occurs when:
144-
* 1. Client retries a request with requestId <= highestLowestRequestIdSeen for the session 2. The response is
145-
* not in the cache (was evicted via cleanupCache)
144+
* 1. Client retries a request with requestId < highestLowestPendingRequestIdSeen for the session 2. The response
145+
* is not in the cache (was evicted via cleanupCache)
146146
*
147147
* Per Raft dissertation Chapter 6.3, the client should create a new session and retry the operation.
148148
*/
@@ -157,7 +157,7 @@ package object sessionstatemachine:
157157
* - "serverRequests": ((SessionId, RequestId), PendingServerRequest[?]) - pending requests with composite key for
158158
* efficiency
159159
* - "lastServerRequestId": (SessionId, RequestId) - last assigned server request ID per session
160-
* - "highestLowestRequestIdSeen": (SessionId, RequestId) - highest lowestRequestId acknowledged by client (for
160+
* - "highestLowestPendingRequestIdSeen": (SessionId, RequestId) - highest lowestPendingRequestId observed (for
161161
* eviction detection)
162162
*
163163
* Both cache and serverRequests use composite keys (SessionId, RequestId) for better performance:
@@ -168,10 +168,10 @@ package object sessionstatemachine:
168168
* - Proper ordering: RequestId ordering is numeric (big-endian encoding), not lexicographic
169169
* - No data duplication: sessionId and requestId are not stored in the value, only in the key
170170
*
171-
* The highestLowestRequestIdSeen prefix enables detection of evicted responses:
172-
* - Client sends lowestRequestId indicating "I have received all responses <= this ID (inclusive)"
173-
* - We track the highest lowestRequestId value we've seen from the client
174-
* - When a ClientRequest arrives, we check if requestId <= highestLowestRequestIdSeen
171+
* The highestLowestPendingRequestIdSeen prefix enables detection of evicted responses:
172+
* - Client sends lowestPendingRequestId indicating the lowest sequence number without a response
173+
* - We track the highest such value received from the client
174+
* - When a ClientRequest arrives, we check if requestId < highestLowestPendingRequestIdSeen
175175
* - If yes and response is not in cache, we know it was evicted (client already acknowledged it)
176176
* - This correctly handles out-of-order requests while preventing re-execution of acknowledged commands
177177
*/
@@ -180,7 +180,7 @@ package object sessionstatemachine:
180180
("cache", (SessionId, RequestId), R) *:
181181
("serverRequests", (SessionId, RequestId), PendingServerRequest[SR]) *:
182182
("lastServerRequestId", SessionId, RequestId) *:
183-
("highestLowestRequestIdSeen", SessionId, RequestId) *:
183+
("highestLowestPendingRequestIdSeen", SessionId, RequestId) *:
184184
EmptyTuple
185185

186186
/** KeyLike instance for SessionId keys. Used by metadata, serverRequests, and lastServerRequestId prefixes.

session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -162,42 +162,41 @@ object IdempotencySpec extends ZIOSpecDefault:
162162
.asInstanceOf[SessionCommand[TestCommand, String]]
163163
val (state1, _) = sm.apply(createCmd).run(state0)
164164

165-
// First request (requestId=1, lowestRequestId=1)
165+
// First request (requestId=1, lowestPendingRequestId=1)
166166
val cmd1: SessionCommand[TestCommand, String] =
167167
SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(10))
168168
val (state2, result1) = sm.apply(cmd1).run(state1)
169169
val Right((response1, _)) = (result1.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked
170170

171-
// Second request (requestId=2, lowestRequestId=1)
171+
// Second request (requestId=2, lowestPendingRequestId=1)
172172
val cmd2: SessionCommand[TestCommand, String] =
173173
SessionCommand.ClientRequest(now, sessionId, RequestId(2), RequestId(1), TestCommand.Increment(5))
174174
val (state3, result2) = sm.apply(cmd2).run(state2)
175175
val Right((response2, _)) = (result2.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked
176176

177-
// Third request (requestId=3, lowestRequestId=2)
178-
// Client says "I have received responses for requestIds <= 2 (inclusive)", so responses 1 and 2 can be evicted
179-
// This triggers cache cleanup AND updates highestLowestRequestIdSeen to 2
177+
// Third request (requestId=3, lowestPendingRequestId=2)
178+
// Client says "lowestPendingRequestId=2", so responses with requestId < 2 can be evicted (only 1)
179+
// This triggers cache cleanup AND updates highestLowestPendingRequestIdSeen to 2
180180
val cmd3: SessionCommand[TestCommand, String] =
181181
SessionCommand.ClientRequest(now, sessionId, RequestId(3), RequestId(2), TestCommand.Increment(1))
182182
val (state4, _) = sm.apply(cmd3).run(state3)
183183

184-
// Now retry request 2 - should fail with ResponseEvicted
185-
// requestId(2) <= highestLowestRequestIdSeen(2) AND not in cache (was evicted)
184+
// Now retry request 1 - should fail with ResponseEvicted (evicted)
186185
val cmd4: SessionCommand[TestCommand, String] =
187-
SessionCommand.ClientRequest(now, sessionId, RequestId(2), RequestId(2), TestCommand.Increment(999))
186+
SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(2), TestCommand.Increment(999))
188187
val (state5, result4) = sm.apply(cmd4).run(state4)
189188

190189
(result4.asInstanceOf[Either[RequestError, (Int, List[Any])]]: @unchecked) match
191190
case Left(RequestError.ResponseEvicted(sid, rid)) =>
192191
assertTrue(
193192
sid == sessionId &&
194-
rid == RequestId(2) &&
193+
rid == RequestId(1) &&
195194
sm.callCount == 3 // Command was NOT executed again (only 3 commands processed)
196195
)
197196
case Right(_) =>
198197
assertTrue(false) // Should not succeed
199198
},
200-
test("PC-3: Cache cleanup removes responses based on lowestRequestId") {
199+
test("PC-3: Cache cleanup removes responses based on lowestPendingRequestId (exclusive)") {
201200
val sm = new TestStateMachine()
202201
val state0: HMap[CombinedSchema] = HMap.empty[CombinedSchema]
203202
val now = Instant.now()
@@ -228,16 +227,15 @@ object IdempotencySpec extends ZIOSpecDefault:
228227
assertTrue(cache4.get["cache"]((sessionId, RequestId(2))).isDefined) &&
229228
assertTrue(cache4.get["cache"]((sessionId, RequestId(3))).isDefined)
230229

231-
// Execute request 4 with lowestRequestId=2 (client says "I have received 1 and 2 (inclusive)")
230+
// Execute request 4 with lowestPendingRequestId=2 (client says "lowest pending is 2")
232231
val cmd4: SessionCommand[TestCommand, String] =
233232
SessionCommand.ClientRequest(now, sessionId, RequestId(4), RequestId(2), TestCommand.Increment(1))
234233
val (state5, _) = sm.apply(cmd4).run(state4)
235234

236-
// Requests 1 and 2 should be cleaned up (<= lowestRequestId=2, inclusive)
237-
// Request 3 and 4 should still be cached (> lowestRequestId=2)
235+
// Requests with id < 2 should be cleaned up (only 1). 2, 3, 4 should still be cached.
238236
val cache5 = state5.asInstanceOf[HMap[CombinedSchema]]
239237
assertTrue(cache5.get["cache"]((sessionId, RequestId(1))).isEmpty) &&
240-
assertTrue(cache5.get["cache"]((sessionId, RequestId(2))).isEmpty) &&
238+
assertTrue(cache5.get["cache"]((sessionId, RequestId(2))).isDefined) &&
241239
assertTrue(cache5.get["cache"]((sessionId, RequestId(3))).isDefined) &&
242240
assertTrue(cache5.get["cache"]((sessionId, RequestId(4))).isDefined)
243241
}

specs/002-session-state-machine/ARCHITECTURE_FINAL.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ object SessionCommand:
162162
case class ClientRequest[UC <: Command](
163163
sessionId: SessionId,
164164
requestId: RequestId,
165-
lowestRequestId: RequestId, // For cache cleanup (Lowest Sequence Number Protocol)
165+
lowestPendingRequestId: RequestId, // For cache cleanup (Lowest Sequence Number Protocol, exclusive: evict < this)
166166
command: UC // Already decoded! UC is a Command subtype
167167
) extends SessionCommand[UC]:
168168
type Response = (command.Response, List[Any]) // (user response, server requests)

0 commit comments

Comments
 (0)