Skip to content

Commit b5fcaa5

Browse files
committed
hmap refactoring
1 parent ae90ff7 commit b5fcaa5

File tree

13 files changed

+458
-389
lines changed

13 files changed

+458
-389
lines changed

client-server-protocol/src/main/scala/zio/raft/protocol/package.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ package object protocol {
6060
*/
6161
val zero: RequestId = RequestId(0L)
6262

63+
val max: RequestId = RequestId(Long.MaxValue)
64+
6365
/** Create a RequestId from a long value (for testing/deserialization).
6466
*/
6567
def fromLong(id: Long): RequestId = {

raft/src/main/scala/zio/raft/HMap.scala

Lines changed: 161 additions & 124 deletions
Large diffs are not rendered by default.

raft/src/test/scala/zio/raft/HMapTypedKeysSpec.scala

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,23 +85,35 @@ object HMapTypedKeysSpec extends ZIOSpecDefault:
8585
)
8686
},
8787

88-
test("schema narrowing with typed keys") {
89-
type FullSchema =
90-
("users", UserId, UserData) *:
91-
("orders", OrderId, OrderData) *:
92-
EmptyTuple
93-
94-
type PartialSchema = ("users", UserId, UserData) *: EmptyTuple
95-
96-
val full: HMap[FullSchema] = HMap.empty[FullSchema]
88+
test("different prefixes with same key values don't interfere") {
89+
val hmap = HMap.empty[TestSchema]
9790
.updated["users"](UserId("u1"), UserData("Alice", "[email protected]"))
98-
.updated["orders"](OrderId("o1"), OrderData(1, 10.0))
91+
.updated["orders"](OrderId("u1"), OrderData(5, 50.0)) // Same string "u1" but different typed key
9992

100-
val partial: HMap[PartialSchema] = full.narrowTo[PartialSchema]
93+
val user = hmap.get["users"](UserId("u1"))
94+
val order = hmap.get["orders"](OrderId("u1"))
10195

102-
val user = partial.get["users"](UserId("u1"))
96+
assertTrue(
97+
user == Some(UserData("Alice", "[email protected]")),
98+
order == Some(OrderData(5, 50.0))
99+
)
100+
},
101+
102+
test("range queries return entries within key range") {
103+
val hmap = HMap.empty[TestSchema]
104+
.updated["users"](UserId("user001"), UserData("Alice", "[email protected]"))
105+
.updated["users"](UserId("user005"), UserData("Bob", "[email protected]"))
106+
.updated["users"](UserId("user010"), UserData("Charlie", "[email protected]"))
107+
.updated["users"](UserId("user015"), UserData("Diana", "[email protected]"))
103108

104-
assertTrue(user == Some(UserData("Alice", "[email protected]")))
109+
// Get users from "user003" (inclusive) to "user012" (exclusive)
110+
val rangeResults = hmap.range["users"](UserId("user003"), UserId("user012")).toList
111+
112+
assertTrue(
113+
rangeResults.length == 2,
114+
rangeResults.exists((k, v) => k == UserId("user005") && v.name == "Bob"),
115+
rangeResults.exists((k, v) => k == UserId("user010") && v.name == "Charlie")
116+
)
105117
}
106118
)
107119

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package zio.raft.sessionstatemachine
22

3-
import zio.raft.protocol.{SessionId, RequestId}
43
import java.time.Instant
54

65
/**
76
* Represents a server-initiated request that has been sent to a client
87
* and is awaiting acknowledgment.
98
*
10-
* These requests are tracked in the session state and managed by the
11-
* SessionStateMachine base class. When a client sends a ServerRequestAck,
9+
* These requests are tracked in the session state using composite keys
10+
* (SessionId, RequestId) in the HMap. When a client sends a ServerRequestAck,
1211
* the base class uses cumulative acknowledgment: acknowledging request N
1312
* removes all pending requests with ID ≤ N.
1413
*
15-
* @param id Unique identifier for this server-initiated request (monotonic per session)
16-
* @param sessionId The session this request was sent to
14+
* The id and sessionId are stored in the HMap key, not duplicated here,
15+
* for efficiency.
16+
*
1717
* @param payload The actual request data (generic type SR)
1818
* @param lastSentAt Timestamp when this request was last sent (required, not optional)
1919
*
@@ -24,8 +24,6 @@ import java.time.Instant
2424
* @note lastSentAt is NOT optional - it's always present and required
2525
*/
2626
case class PendingServerRequest[SR](
27-
id: RequestId,
28-
sessionId: SessionId,
2927
payload: SR,
3028
lastSentAt: Instant
3129
)

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala

Lines changed: 122 additions & 150 deletions
Large diffs are not rendered by default.

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import zio.Chunk
1414
* Key types:
1515
* - SessionStateMachine: Abstract base class (template pattern)
1616
* - SessionSchema: Fixed 4-prefix schema for session management
17-
* - CombinedSchema: Type-level concatenation of SessionSchema + UserSchema
17+
* - Schema: Type-level concatenation of SessionSchema + UserSchema
1818
* - SessionCommand: ADT of commands the session state machine accepts
1919
*/
2020
package object sessionstatemachine {
@@ -73,49 +73,94 @@ package object sessionstatemachine {
7373
*
7474
* This schema defines 4 prefixes with their key and value types:
7575
* - "metadata": (SessionId, SessionMetadata) - session information
76-
* - "cache": (SessionId, Map[RequestId, Any]) - cached responses per session for idempotency
77-
* - "serverRequests": (SessionId, List[PendingServerRequest[?]]) - pending requests per session
76+
* - "cache": ((SessionId, RequestId), Any) - cached responses with composite key for efficient range queries and streaming
77+
* - "serverRequests": ((SessionId, RequestId), PendingServerRequest[?]) - pending requests with composite key for efficiency
7878
* - "lastServerRequestId": (SessionId, RequestId) - last assigned server request ID per session
7979
*
80-
* All keys use SessionId for type safety and efficiency.
81-
*
82-
* The cache design groups responses by session, making cleanup O(1) instead of O(n):
83-
* - Cache lookup: get session's map, then lookup requestId
84-
* - Cache cleanup: get session's map, filter by lowestRequestId, update once
85-
* - Session expiration: remove single cache entry (not iterate all requests)
86-
*
87-
* The SessionStateMachine base class uses this schema to manage session state
88-
* automatically. Users don't interact with this schema directly - it's an
89-
* implementation detail of the template pattern.
80+
* Both cache and serverRequests use composite keys (SessionId, RequestId) for better performance:
81+
* - Direct key access: O(log n) lookups
82+
* - Range queries: Efficient iteration over session-specific entries
83+
* - Session expiration: Use range to find all entries for a session
84+
* - Streaming-friendly: Each entry is a separate key-value pair, not nested collections
85+
* - Proper ordering: RequestId ordering is numeric (big-endian encoding), not lexicographic
86+
* - No data duplication: sessionId and requestId are not stored in the value, only in the key
9087
*/
9188
type SessionSchema =
9289
("metadata", SessionId, SessionMetadata) *:
93-
("cache", SessionId, Map[RequestId, Any]) *:
94-
("serverRequests", SessionId, List[PendingServerRequest[?]]) *:
90+
("cache", (SessionId, RequestId), Any) *:
91+
("serverRequests", (SessionId, RequestId), PendingServerRequest[?]) *:
9592
("lastServerRequestId", SessionId, RequestId) *:
9693
EmptyTuple
9794

9895
/**
99-
* Combined schema that concatenates SessionSchema and UserSchema.
100-
*
101-
* This type alias uses Tuple.Concat to merge the session management prefixes
102-
* with the user-defined schema at the type level. The result is a schema with
103-
* both session prefixes and user prefixes, all with compile-time type safety.
104-
*
105-
* @tparam UserSchema The user-defined schema (tuple of (Prefix, KeyType, ValueType) triples)
106-
*
107-
* Example:
108-
* {{{
109-
* type MyUserSchema = ("counter", CounterId, Int) *: ("name", NameId, String) *: EmptyTuple
110-
* type MyCombined = CombinedSchema[MyUserSchema]
111-
* // MyCombined has all 4 SessionSchema prefixes plus "counter" and "name"
112-
* }}}
96+
* KeyLike instance for SessionId keys.
97+
* Used by metadata, serverRequests, and lastServerRequestId prefixes.
11398
*/
114-
type CombinedSchema[UserSchema <: Tuple] = Tuple.Concat[SessionSchema, UserSchema]
99+
given HMap.KeyLike[SessionId] = HMap.KeyLike.forNewtype(SessionId)
115100

116101
/**
117-
* KeyLike instance for SessionId keys.
118-
* All session management prefixes use SessionId as the key type.
102+
* KeyLike instance for composite (SessionId, RequestId) keys.
103+
* Used by the cache prefix for efficient range queries and proper numeric ordering of RequestIds.
104+
*
105+
* Encoding format:
106+
* - 4 bytes: length of SessionId (big-endian Int)
107+
* - N bytes: SessionId as UTF-8 string
108+
* - 8 bytes: RequestId as big-endian Long
109+
*
110+
* This encoding ensures:
111+
* 1. Sessions are grouped together (sorted by SessionId first)
112+
* 2. Within a session, RequestIds are ordered numerically (not lexicographically)
113+
* 3. Range queries work efficiently: range((sid, rid1), (sid, rid2)) selects requests within the range
119114
*/
120-
given HMap.KeyLike[SessionId] = HMap.KeyLike.forNewtype(SessionId)
115+
given HMap.KeyLike[(SessionId, RequestId)] = new HMap.KeyLike[(SessionId, RequestId)]:
116+
import java.nio.charset.StandardCharsets
117+
118+
def asBytes(key: (SessionId, RequestId)): Array[Byte] =
119+
// Encode SessionId as UTF-8
120+
val sessionBytes = SessionId.unwrap(key._1).getBytes(StandardCharsets.UTF_8)
121+
122+
// Encode RequestId as 8-byte big-endian long
123+
val requestId = RequestId.unwrap(key._2)
124+
val requestBytes = Array(
125+
(requestId >> 56).toByte, (requestId >> 48).toByte,
126+
(requestId >> 40).toByte, (requestId >> 32).toByte,
127+
(requestId >> 24).toByte, (requestId >> 16).toByte,
128+
(requestId >> 8).toByte, requestId.toByte
129+
)
130+
131+
// Length-prefix the SessionId (4 bytes big-endian)
132+
val lengthBytes = Array(
133+
(sessionBytes.length >> 24).toByte,
134+
(sessionBytes.length >> 16).toByte,
135+
(sessionBytes.length >> 8).toByte,
136+
sessionBytes.length.toByte
137+
)
138+
139+
lengthBytes ++ sessionBytes ++ requestBytes
140+
141+
def fromBytes(bytes: Array[Byte]): (SessionId, RequestId) =
142+
// Read length prefix
143+
val length =
144+
((bytes(0) & 0xFF) << 24) |
145+
((bytes(1) & 0xFF) << 16) |
146+
((bytes(2) & 0xFF) << 8) |
147+
(bytes(3) & 0xFF)
148+
149+
// Extract SessionId
150+
val sessionBytes = bytes.slice(4, 4 + length)
151+
val sessionId = SessionId(new String(sessionBytes, StandardCharsets.UTF_8))
152+
153+
// Extract RequestId (8 bytes big-endian)
154+
val offset = 4 + length
155+
val requestId =
156+
((bytes(offset).toLong & 0xFF) << 56) |
157+
((bytes(offset + 1).toLong & 0xFF) << 48) |
158+
((bytes(offset + 2).toLong & 0xFF) << 40) |
159+
((bytes(offset + 3).toLong & 0xFF) << 32) |
160+
((bytes(offset + 4).toLong & 0xFF) << 24) |
161+
((bytes(offset + 5).toLong & 0xFF) << 16) |
162+
((bytes(offset + 6).toLong & 0xFF) << 8) |
163+
(bytes(offset + 7).toLong & 0xFF)
164+
165+
(sessionId, RequestId(requestId))
121166
}

session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CumulativeAckSpec.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,24 @@ object CumulativeAckSpec extends ZIOSpecDefault:
5050
protected def handleSessionExpired(sid: SessionId, capabilities: Map[String, String], createdAt: Instant): StateWriter[HMap[TestSchema], ServerReq, Unit] =
5151
StateWriter.succeed(())
5252

53-
def takeSnapshot(state: HMap[CombinedSchema[TestSchema]]): Stream[Nothing, Byte] =
53+
def takeSnapshot(state: HMap[Schema[TestSchema]]): Stream[Nothing, Byte] =
5454
zio.stream.ZStream.empty
5555

56-
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[CombinedSchema[TestSchema]]] =
56+
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[Schema[TestSchema]]] =
5757
ZIO.succeed(HMap.empty)
5858

5959
def shouldTakeSnapshot(lastSnapshotIndex: zio.raft.Index, lastSnapshotSize: Long, commitIndex: zio.raft.Index): Boolean =
6060
false
6161

6262
// Helper to check pending requests
63-
def getPendingRequests(state: HMap[CombinedSchema[TestSchema]], sid: SessionId): List[PendingServerRequest[ServerReq]] =
63+
def getPendingRequests(state: HMap[Schema[TestSchema]], sid: SessionId): List[PendingServerRequest[ServerReq]] =
6464
state.get["serverRequests"](sid).getOrElse(List.empty).asInstanceOf[List[PendingServerRequest[ServerReq]]]
6565

6666
def spec = suite("Cumulative Acknowledgment")(
6767

6868
test("PC-3: Ack N removes all pending requests with ID ≤ N") {
6969
val sm = new TestStateMachine()
70-
val state0 = HMap.empty[CombinedSchema[TestSchema]]
70+
val state0 = HMap.empty[Schema[TestSchema]]
7171
val now = Instant.now()
7272

7373
// Create session
@@ -99,7 +99,7 @@ object CumulativeAckSpec extends ZIOSpecDefault:
9999

100100
test("Acknowledging all requests clears pending list") {
101101
val sm = new TestStateMachine()
102-
val state0 = HMap.empty[CombinedSchema[TestSchema]]
102+
val state0 = HMap.empty[Schema[TestSchema]]
103103
val now = Instant.now()
104104

105105
val sessionId = SessionId("s1")
@@ -120,7 +120,7 @@ object CumulativeAckSpec extends ZIOSpecDefault:
120120

121121
test("Acknowledging first request only removes that request") {
122122
val sm = new TestStateMachine()
123-
val state0 = HMap.empty[CombinedSchema[TestSchema]]
123+
val state0 = HMap.empty[Schema[TestSchema]]
124124
val now = Instant.now()
125125

126126
val sessionId = SessionId("s1")
@@ -142,7 +142,7 @@ object CumulativeAckSpec extends ZIOSpecDefault:
142142

143143
test("Acknowledging non-existent high ID clears all requests") {
144144
val sm = new TestStateMachine()
145-
val state0 = HMap.empty[CombinedSchema[TestSchema]]
145+
val state0 = HMap.empty[Schema[TestSchema]]
146146
val now = Instant.now()
147147

148148
val sessionId = SessionId("s1")
@@ -161,7 +161,7 @@ object CumulativeAckSpec extends ZIOSpecDefault:
161161

162162
test("Acknowledging same request twice is idempotent") {
163163
val sm = new TestStateMachine()
164-
val state0 = HMap.empty[CombinedSchema[TestSchema]]
164+
val state0 = HMap.empty[Schema[TestSchema]]
165165
val now = Instant.now()
166166

167167
val sessionId = SessionId("s1")
@@ -184,7 +184,7 @@ object CumulativeAckSpec extends ZIOSpecDefault:
184184
test("Property: Ack N where N < max removes only requests ≤ N") {
185185
check(Gen.int(1, 10)) { ackN =>
186186
val sm = new TestStateMachine()
187-
val state0 = HMap.empty[CombinedSchema[TestSchema]]
187+
val state0 = HMap.empty[Schema[TestSchema]]
188188
val now = Instant.now()
189189

190190
val sessionId = SessionId("s1")

session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ object IdempotencySpec extends ZIOSpecDefault:
6767
protected def handleSessionExpired(sid: SessionId, capabilities: Map[String, String], createdAt: Instant): StateWriter[HMap[CounterSchema], String, Unit] =
6868
StateWriter.succeed(())
6969

70-
def takeSnapshot(state: HMap[CombinedSchema[CounterSchema]]): Stream[Nothing, Byte] =
70+
def takeSnapshot(state: HMap[Schema[CounterSchema]]): Stream[Nothing, Byte] =
7171
zio.stream.ZStream.empty
7272

73-
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[CombinedSchema[CounterSchema]]] =
73+
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[Schema[CounterSchema]]] =
7474
ZIO.succeed(HMap.empty)
7575

7676
def shouldTakeSnapshot(lastSnapshotIndex: zio.raft.Index, lastSnapshotSize: Long, commitIndex: zio.raft.Index): Boolean =
@@ -80,7 +80,7 @@ object IdempotencySpec extends ZIOSpecDefault:
8080

8181
test("PC-1: Cache hit returns cached response without calling applyCommand") {
8282
val sm = new CounterStateMachine()
83-
val state0 = HMap.empty[CombinedSchema[CounterSchema]]
83+
val state0 = HMap.empty[Schema[CounterSchema]]
8484
val now = Instant.now()
8585

8686
// First request - should call applyCommand
@@ -107,7 +107,7 @@ object IdempotencySpec extends ZIOSpecDefault:
107107

108108
test("Different request IDs should NOT be cached") {
109109
val sm = new CounterStateMachine()
110-
val state0 = HMap.empty[CombinedSchema[CounterSchema]]
110+
val state0 = HMap.empty[Schema[CounterSchema]]
111111
val now = Instant.now()
112112

113113
val cmd1 = SessionCommand.ClientRequest[CounterCommand, String](
@@ -130,7 +130,7 @@ object IdempotencySpec extends ZIOSpecDefault:
130130

131131
test("Different session IDs should NOT be cached") {
132132
val sm = new CounterStateMachine()
133-
val state0 = HMap.empty[CombinedSchema[CounterSchema]]
133+
val state0 = HMap.empty[Schema[CounterSchema]]
134134
val now = Instant.now()
135135

136136
val cmd1 = SessionCommand.ClientRequest[CounterCommand, String](
@@ -151,7 +151,7 @@ object IdempotencySpec extends ZIOSpecDefault:
151151

152152
test("State should be unchanged on duplicate request") {
153153
val sm = new CounterStateMachine()
154-
val state0 = HMap.empty[CombinedSchema[CounterSchema]]
154+
val state0 = HMap.empty[Schema[CounterSchema]]
155155
val now = Instant.now()
156156

157157
// First request
@@ -178,7 +178,7 @@ object IdempotencySpec extends ZIOSpecDefault:
178178

179179
test("Idempotency works across different command types") {
180180
val sm = new CounterStateMachine()
181-
val state0 = HMap.empty[CombinedSchema[CounterSchema]]
181+
val state0 = HMap.empty[Schema[CounterSchema]]
182182
val now = Instant.now()
183183

184184
// First: Set command

0 commit comments

Comments
 (0)