Skip to content

Commit e6f1617

Browse files
somdoronCopilot
andauthored
support error in session state machine apply command (#39)
* support error in session state machine apply command * add test case for error cache * fix failed test * assigned requests propegated on errors * Update session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala Co-authored-by: Copilot <[email protected]> * Update kvstore/src/main/scala/zio/kvstore/Codecs.scala Co-authored-by: Copilot <[email protected]> * fix order of type params * fmt * fix session codec * add copilot comments * fix test * Update session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent 5aac189 commit e6f1617

File tree

21 files changed

+389
-186
lines changed

21 files changed

+389
-186
lines changed

.specify/templates/plan-template.md

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ specs/[###-feature]/
8181
├── plan.md # This file (/plan command output)
8282
├── research.md # Phase 0 output (/plan command)
8383
├── data-model.md # Phase 1 output (/plan command)
84+
├── tests.md # Phase 1 output (/plan command)
85+
├── design.md # Phase 1 output (/plan command)
8486
├── quickstart.md # Phase 1 output (/plan command)
8587
└── tasks.md # Phase 2 output (/tasks command - NOT created by /plan)
8688
```
@@ -114,11 +116,30 @@ specs/[###-feature]/
114116
- Validation rules from requirements
115117
- State transitions if applicable
116118

117-
2. **Extract test scenarios** from user stories:
119+
2. **Update plan with high-level design and architecture**`design.md`
120+
- Design high-level solution, based on the `research.md`, `data-model.md`, `plan.md`, and `spec.md`.
121+
- Read the relevant chapter from the Raft Paper, you can find it over in memory folder.
122+
- What new projects are you going to add?
123+
- What are the new dependencies?
124+
- What files are going to be changed?
125+
- What new entities or classes you need to add?
126+
- What areas are affected? Where do we need to add more tests to cover the new functionality?
127+
- What procotol changes are required, if any?
128+
- Do we need a new version of a codec for the protocol change?
129+
- Do we need a new protocol?
130+
- In general, give high-level overview of the solution in plain english and drawing as well.
131+
132+
3. **Extract test scenarios from user stories**`tests.md`:
118133
- Each story → integration test scenario
134+
- For each functional requirement, evaluate if a test case is required → test case
135+
- Each new entity that requires codec → codec test case
136+
- For each edge case, prompt the user if a test is required → test case
137+
- Based on the `design.md`, what additional test cases we need to add?
138+
- Collect the different test cases in the `tests.md` in plain english
139+
- We are NOT doing Test Driven Development. Only collect the tests to the `tests.md` file.
119140
- Quickstart test = story validation steps
120141

121-
3. **Update agent file incrementally** (O(1) operation):
142+
4. **Update agent file incrementally** (O(1) operation):
122143
- Run `.specify/scripts/bash/update-agent-context.sh cursor`
123144
**IMPORTANT**: Execute it exactly as specified above. Do not add or remove any arguments.
124145
- If exists: Add only NEW tech from current plan
@@ -127,20 +148,20 @@ specs/[###-feature]/
127148
- Keep under 150 lines for token efficiency
128149
- Output to repository root
129150

130-
**Output**: data-model.md, failing tests, quickstart.md, agent-specific file
151+
**Output**: data-model.md, tests.md, design.md, quickstart.md, agent-specific file
131152

132153
## Phase 2: Task Planning Approach
133154
*This section describes what the /tasks command will do - DO NOT execute during /plan*
134155

135156
**Task Generation Strategy**:
136157
- Load `.specify/templates/tasks-template.md` as base
137-
- Generate tasks from Phase 1 design docs (data model, quickstart)
158+
- Generate tasks from Phase 1 design docs (data model, design, tests, quickstart)
138159
- Each entity → model creation task [P]
139-
- Each user storyintegration test task
140-
- Implementation tasks to make tests pass
160+
- Each new projectmodel creation task
161+
- Implementation tasks to the tests in `tests.md`, no TDD, implement each test to completion.
141162

142163
**Ordering Strategy**:
143-
- TDD order: Tests before implementation
164+
- No TDD order: implementations tasks before tests tasks
144165
- Dependency order: Models before services before UI
145166
- Mark [P] for parallel execution (independent files)
146167

client-server-protocol/src/main/scala/zio/raft/protocol/Codecs.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ object Codecs {
5757
/** Codec for SessionId (UUID string).
5858
*/
5959
implicit val sessionIdCodec: Codec[SessionId] = {
60-
variableSizeBytes(uint16, utf8).xmap(
60+
variableSizeBytes(uint8, utf8).xmap(
6161
str => SessionId(str),
62-
sessionId => SessionId.unwrap(sessionId)
62+
sessionId => sessionId.value
6363
)
6464
}
6565

client-server-protocol/src/main/scala/zio/raft/protocol/package.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ package object protocol {
4242
require(id.nonEmpty, "Session ID cannot be empty")
4343
SessionId(id)
4444
}
45+
46+
implicit class SessionIdSyntax(private val sessionId: SessionId) extends AnyVal {
47+
48+
/** Get the value of the session ID.
49+
*/
50+
def value: String = SessionId.unwrap(sessionId)
51+
}
4552
}
4653

4754
type SessionId = SessionId.Type

kvstore/src/main/scala/zio/kvstore/Codecs.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package zio.kvstore
22

33
import scodec.Codec
4-
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
4+
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32, uint8, provide}
55

66
object Codecs:
77
// Response codecs
@@ -19,6 +19,11 @@ object Codecs:
1919
.typecase("W", summon[Codec[KVResponse.WatchDone.type]])
2020
.typecase("G", summon[Codec[KVResponse.GetResult]])
2121

22+
given Codec[Either[Nothing, KVResponse]] =
23+
discriminated[Either[Nothing, KVResponse]].by(uint8)
24+
.typecase(0, scodec.codecs.fail(scodec.Err("Cannot decode Left[Nothing]")))
25+
.typecase(1, summon[Codec[KVResponse]].xmap(Right(_), _.value))
26+
2227
// Value codec for KV schema values
2328
given Codec[String] = utf8_32
2429
private val sessionIdCodec = zio.raft.protocol.Codecs.sessionIdCodec

kvstore/src/main/scala/zio/kvstore/KVServer.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ class KVServer(server: RaftServer):
3737
val bytes = summon[Codec[A]].encode(response).require.bytes
3838
server.sendQueryResponse(sessionId, QueryResponse(correlationId, bytes))
3939

40-
def requestError(sessionId: SessionId, requestId: RequestId, reason: zio.raft.sessionstatemachine.RequestError) =
40+
def requestError(
41+
sessionId: SessionId,
42+
requestId: RequestId,
43+
reason: zio.raft.sessionstatemachine.RequestError[Nothing]
44+
) =
4145
val serverReason = reason match
4246
case zio.raft.sessionstatemachine.RequestError.ResponseEvicted =>
4347
zio.raft.protocol.RequestErrorReason.ResponseEvicted

kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ import zio.kvstore.Codecs.given
1111
import zio.raft.sessionstatemachine.given
1212
import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec}
1313

14-
class KVStateMachine extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]
15-
with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]:
14+
class KVStateMachine
15+
extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, Nothing, KVSchema]
16+
with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, Nothing, KVSchema]:
1617

1718
// Local alias to aid match-type reduction bug in scala 3.3
18-
type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]
19-
private type SW[A] = StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], A]
19+
type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest, Nothing], KVSchema]
20+
private type SW[A] =
21+
StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], Nothing, A]
2022

2123
// Helpers
2224
private def putValue(key: KVKey, value: String): SW[Unit] =

kvstore/src/main/scala/zio/kvstore/node/Node.scala

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@ import zio.raft.zmq.ZmqRpc
2525
final case class Node(
2626
kvServer: KVServer,
2727
raft: zio.raft.Raft[
28-
zio.raft.HMap[Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]],
29-
SessionCommand[KVCommand, KVServerRequest]
28+
zio.raft.HMap[Tuple.Concat[
29+
zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest, Nothing],
30+
KVSchema
31+
]],
32+
SessionCommand[KVCommand, KVServerRequest, Nothing]
3033
]
3134
):
3235

@@ -115,7 +118,10 @@ final case class Node(
115118
for
116119
state <- raft.readStateDirty
117120
hasPending =
118-
SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, KVSchema](state, lastSentBefore)
121+
SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, Nothing, KVSchema](
122+
state,
123+
lastSentBefore
124+
)
119125
_ <-
120126
if hasPending then
121127
val cmd = SessionCommand.GetRequestsForRetry[KVServerRequest](now, lastSentBefore)
@@ -135,13 +141,14 @@ final case class Node(
135141
ZIO.logInfo("Node stepped up") *>
136142
raft.readState.either.flatMap {
137143
case Right(state) =>
138-
val sessions = SessionStateMachine.getSessions[KVResponse, KVServerRequest, KVSchema](state).map {
139-
case (sessionId: SessionId, metadata) =>
140-
(
141-
sessionId,
142-
zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt)
143-
)
144-
}
144+
val sessions =
145+
SessionStateMachine.getSessions[KVResponse, KVServerRequest, Nothing, KVSchema](state).map {
146+
case (sessionId: SessionId, metadata) =>
147+
(
148+
sessionId,
149+
zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt)
150+
)
151+
}
145152
kvServer.stepUp(sessions)
146153
case Left(_) => ZIO.unit
147154
}
@@ -169,7 +176,7 @@ final case class Node(
169176
): UIO[Option[command.Response]] =
170177
for
171178
now <- Clock.instant
172-
cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest](
179+
cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest, Nothing](
173180
now,
174181
sessionId,
175182
requestId,
@@ -178,16 +185,19 @@ final case class Node(
178185
)
179186
either <- raft.sendCommand(cmd).either
180187
result <- either match
181-
case Right(Right((resp, envelopes))) =>
188+
case Right(envelopes, Right(resp)) =>
182189
for
183190
_ <- ZIO.foreachDiscard(envelopes) { env =>
184191
kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload)
185192
}
186193
yield Some(resp.asInstanceOf[command.Response])
187-
case Right(Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) =>
188-
kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted).as(
189-
None
190-
)
194+
case Right(envelopes, Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) =>
195+
for
196+
_ <- ZIO.foreachDiscard(envelopes) { env =>
197+
kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload)
198+
}
199+
_ <- kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted)
200+
yield None
191201
case Left(_: zio.raft.NotALeaderError) =>
192202
// Ignore not leader error, server will handle it eventually
193203
ZIO.none
@@ -212,9 +222,10 @@ object Node:
212222
for
213223
stable <- LmdbStable.make.debug("LmdbStable.make")
214224

215-
logStore <- SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest]](logDirectory).debug("SegmentedLog.make")
225+
logStore <-
226+
SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest, Nothing]](logDirectory).debug("SegmentedLog.make")
216227
snapshotStore <- FileSnapshotStore.make(zio.nio.file.Path(snapshotDirectory)).debug("FileSnapshotStore.make")
217-
rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest]](
228+
rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest, Nothing]](
218229
nodeAddress,
219230
peers
220231
).debug("ZmqRpc.make")

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ object Codecs:
6060
/** Codec for SessionCommand, parameterized by UC (user command) and SR (server request payload). Requires codecs for
6161
* UC and SR in scope.
6262
*/
63-
given sessionCommandCodec[UC <: zio.raft.Command, SR](using
63+
given sessionCommandCodec[UC <: zio.raft.Command, SR, E](using
6464
ucCodec: Codec[UC],
6565
srCodec: Codec[SR]
66-
): Codec[SessionCommand[UC, SR]] =
67-
val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR]] =
66+
): Codec[SessionCommand[UC, SR, E]] =
67+
val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR, E]] =
6868
(instantCodec :: sessionIdCodec :: requestIdCodec :: requestIdCodec :: ucCodec)
69-
.as[SessionCommand.ClientRequest[UC, SR]]
70-
val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR]] =
69+
.as[SessionCommand.ClientRequest[UC, SR, E]]
70+
val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR, E]] =
7171
(uint8 :: clientRequestV0).xmap(
7272
{ case (_, cmd) => cmd },
7373
cmd => (0, cmd)
@@ -105,7 +105,7 @@ object Codecs:
105105
cmd => (0, cmd)
106106
)
107107

108-
discriminated[SessionCommand[UC, SR]]
108+
discriminated[SessionCommand[UC, SR, E]]
109109
.by(uint8)
110110
.typecase(0, clientRequestCodec)
111111
.typecase(1, serverRequestAckCodec)

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/ScodecSerialization.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ import scodec.Codec
3232
* // takeSnapshot and restoreFromSnapshot provided!
3333
* }}}
3434
*/
35-
trait ScodecSerialization[R, SR, UserSchema <: Tuple]:
36-
this: StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]], ?] =>
35+
trait ScodecSerialization[R, SR, E, UserSchema <: Tuple]:
36+
this: StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR, E], UserSchema]], ?] =>
3737

3838
// Type alias for convenience
39-
type Schema = Tuple.Concat[SessionSchema[R, SR], UserSchema]
39+
type Schema = Tuple.Concat[SessionSchema[R, SR, E], UserSchema]
4040

4141
/** TypeclassMap providing codecs for all value types in the schema.
4242
*

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import java.time.Instant
1414
* @tparam SR
1515
* Server-initiated request payload type
1616
*/
17-
sealed trait SessionCommand[+UC <: Command, SR] extends Command
17+
sealed trait SessionCommand[+UC <: Command, SR, +E] extends Command
1818

1919
object SessionCommand:
2020

@@ -47,15 +47,15 @@ object SessionCommand:
4747
* @note
4848
* Requests can arrive out of order. Eviction detection uses lowestRequestId, not requestId ordering
4949
*/
50-
case class ClientRequest[UC <: Command, SR](
50+
case class ClientRequest[UC <: Command, SR, E](
5151
createdAt: Instant,
5252
sessionId: SessionId,
5353
requestId: RequestId,
5454
lowestPendingRequestId: RequestId,
5555
command: UC
56-
) extends SessionCommand[UC, SR]:
56+
) extends SessionCommand[UC, SR, E]:
5757
// Response type can be an error or the user command's response with server request envelopes
58-
type Response = Either[RequestError, (command.Response, List[ServerRequestEnvelope[SR]])]
58+
type Response = (List[ServerRequestEnvelope[SR]], Either[RequestError[E], command.Response])
5959

6060
/** Acknowledgment from a client for a server-initiated request.
6161
*
@@ -71,7 +71,7 @@ object SessionCommand:
7171
createdAt: Instant,
7272
sessionId: SessionId,
7373
requestId: RequestId
74-
) extends SessionCommand[Nothing, SR]:
74+
) extends SessionCommand[Nothing, SR, Nothing]:
7575
type Response = Unit
7676

7777
/** Create a new session.
@@ -89,7 +89,7 @@ object SessionCommand:
8989
createdAt: Instant,
9090
sessionId: SessionId,
9191
capabilities: Map[String, String]
92-
) extends SessionCommand[Nothing, SR]:
92+
) extends SessionCommand[Nothing, SR, Nothing]:
9393
type Response = List[ServerRequestEnvelope[SR]] // server request envelopes
9494

9595
/** Notification that a session has expired.
@@ -104,7 +104,7 @@ object SessionCommand:
104104
case class SessionExpired[SR](
105105
createdAt: Instant,
106106
sessionId: SessionId
107-
) extends SessionCommand[Nothing, SR]:
107+
) extends SessionCommand[Nothing, SR, Nothing]:
108108
type Response = List[ServerRequestEnvelope[SR]] // server request envelopes
109109

110110
/** Command to atomically retrieve requests needing retry and update lastSentAt.
@@ -122,6 +122,6 @@ object SessionCommand:
122122
case class GetRequestsForRetry[SR](
123123
createdAt: Instant,
124124
lastSentBefore: Instant
125-
) extends SessionCommand[Nothing, SR]:
125+
) extends SessionCommand[Nothing, SR, Nothing]:
126126
type Response = List[ServerRequestEnvelope[SR]]
127127
end SessionCommand

0 commit comments

Comments
 (0)