diff --git a/.specify/templates/plan-template.md b/.specify/templates/plan-template.md index 0826609b..792e1354 100644 --- a/.specify/templates/plan-template.md +++ b/.specify/templates/plan-template.md @@ -81,6 +81,8 @@ specs/[###-feature]/ ├── plan.md # This file (/plan command output) ├── research.md # Phase 0 output (/plan command) ├── data-model.md # Phase 1 output (/plan command) +├── tests.md # Phase 1 output (/plan command) +├── design.md # Phase 1 output (/plan command) ├── quickstart.md # Phase 1 output (/plan command) └── tasks.md # Phase 2 output (/tasks command - NOT created by /plan) ``` @@ -114,11 +116,30 @@ specs/[###-feature]/ - Validation rules from requirements - State transitions if applicable -2. **Extract test scenarios** from user stories: +2. **Update plan with high-level design and architecture** → `design.md` + - Design high-level solution, based on the `research.md`, `data-model.md`, `plan.md`, and `spec.md`. + - Read the relevant chapter from the Raft Paper, you can find it over in memory folder. + - What new projects are you going to add? + - What are the new dependencies? + - What files are going to be changed? + - What new entities or classes you need to add? + - What areas are affected? Where do we need to add more tests to cover the new functionality? + - What procotol changes are required, if any? + - Do we need a new version of a codec for the protocol change? + - Do we need a new protocol? + - In general, give high-level overview of the solution in plain english and drawing as well. + +3. **Extract test scenarios from user stories** → `tests.md`: - Each story → integration test scenario + - For each functional requirement, evaluate if a test case is required → test case + - Each new entity that requires codec → codec test case + - For each edge case, prompt the user if a test is required → test case + - Based on the `design.md`, what additional test cases we need to add? + - Collect the different test cases in the `tests.md` in plain english + - We are NOT doing Test Driven Development. Only collect the tests to the `tests.md` file. - Quickstart test = story validation steps -3. **Update agent file incrementally** (O(1) operation): +4. **Update agent file incrementally** (O(1) operation): - Run `.specify/scripts/bash/update-agent-context.sh cursor` **IMPORTANT**: Execute it exactly as specified above. Do not add or remove any arguments. - If exists: Add only NEW tech from current plan @@ -127,20 +148,20 @@ specs/[###-feature]/ - Keep under 150 lines for token efficiency - Output to repository root -**Output**: data-model.md, failing tests, quickstart.md, agent-specific file +**Output**: data-model.md, tests.md, design.md, quickstart.md, agent-specific file ## Phase 2: Task Planning Approach *This section describes what the /tasks command will do - DO NOT execute during /plan* **Task Generation Strategy**: - Load `.specify/templates/tasks-template.md` as base -- Generate tasks from Phase 1 design docs (data model, quickstart) +- Generate tasks from Phase 1 design docs (data model, design, tests, quickstart) - Each entity → model creation task [P] -- Each user story → integration test task -- Implementation tasks to make tests pass +- Each new project → model creation task +- Implementation tasks to the tests in `tests.md`, no TDD, implement each test to completion. **Ordering Strategy**: -- TDD order: Tests before implementation +- No TDD order: implementations tasks before tests tasks - Dependency order: Models before services before UI - Mark [P] for parallel execution (independent files) diff --git a/client-server-protocol/src/main/scala/zio/raft/protocol/Codecs.scala b/client-server-protocol/src/main/scala/zio/raft/protocol/Codecs.scala index 34533de2..b2bba247 100644 --- a/client-server-protocol/src/main/scala/zio/raft/protocol/Codecs.scala +++ b/client-server-protocol/src/main/scala/zio/raft/protocol/Codecs.scala @@ -57,9 +57,9 @@ object Codecs { /** Codec for SessionId (UUID string). */ implicit val sessionIdCodec: Codec[SessionId] = { - variableSizeBytes(uint16, utf8).xmap( + variableSizeBytes(uint8, utf8).xmap( str => SessionId(str), - sessionId => SessionId.unwrap(sessionId) + sessionId => sessionId.value ) } diff --git a/client-server-protocol/src/main/scala/zio/raft/protocol/package.scala b/client-server-protocol/src/main/scala/zio/raft/protocol/package.scala index abea3a46..22c6d2cc 100644 --- a/client-server-protocol/src/main/scala/zio/raft/protocol/package.scala +++ b/client-server-protocol/src/main/scala/zio/raft/protocol/package.scala @@ -42,6 +42,13 @@ package object protocol { require(id.nonEmpty, "Session ID cannot be empty") SessionId(id) } + + implicit class SessionIdSyntax(private val sessionId: SessionId) extends AnyVal { + + /** Get the value of the session ID. + */ + def value: String = SessionId.unwrap(sessionId) + } } type SessionId = SessionId.Type diff --git a/kvstore/src/main/scala/zio/kvstore/Codecs.scala b/kvstore/src/main/scala/zio/kvstore/Codecs.scala index abacbb75..910a873b 100644 --- a/kvstore/src/main/scala/zio/kvstore/Codecs.scala +++ b/kvstore/src/main/scala/zio/kvstore/Codecs.scala @@ -1,7 +1,7 @@ package zio.kvstore import scodec.Codec -import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32} +import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32, uint8, provide} object Codecs: // Response codecs @@ -19,6 +19,11 @@ object Codecs: .typecase("W", summon[Codec[KVResponse.WatchDone.type]]) .typecase("G", summon[Codec[KVResponse.GetResult]]) + given Codec[Either[Nothing, KVResponse]] = + discriminated[Either[Nothing, KVResponse]].by(uint8) + .typecase(0, scodec.codecs.fail(scodec.Err("Cannot decode Left[Nothing]"))) + .typecase(1, summon[Codec[KVResponse]].xmap(Right(_), _.value)) + // Value codec for KV schema values given Codec[String] = utf8_32 private val sessionIdCodec = zio.raft.protocol.Codecs.sessionIdCodec diff --git a/kvstore/src/main/scala/zio/kvstore/KVServer.scala b/kvstore/src/main/scala/zio/kvstore/KVServer.scala index 86578d29..23fed83c 100644 --- a/kvstore/src/main/scala/zio/kvstore/KVServer.scala +++ b/kvstore/src/main/scala/zio/kvstore/KVServer.scala @@ -37,7 +37,11 @@ class KVServer(server: RaftServer): val bytes = summon[Codec[A]].encode(response).require.bytes server.sendQueryResponse(sessionId, QueryResponse(correlationId, bytes)) - def requestError(sessionId: SessionId, requestId: RequestId, reason: zio.raft.sessionstatemachine.RequestError) = + def requestError( + sessionId: SessionId, + requestId: RequestId, + reason: zio.raft.sessionstatemachine.RequestError[Nothing] + ) = val serverReason = reason match case zio.raft.sessionstatemachine.RequestError.ResponseEvicted => zio.raft.protocol.RequestErrorReason.ResponseEvicted diff --git a/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala b/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala index b3984b37..97a7437b 100644 --- a/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala +++ b/kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala @@ -11,12 +11,14 @@ import zio.kvstore.Codecs.given import zio.raft.sessionstatemachine.given import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} -class KVStateMachine extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema] - with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]: +class KVStateMachine + extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, Nothing, KVSchema] + with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, Nothing, KVSchema]: // Local alias to aid match-type reduction bug in scala 3.3 - type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema] - private type SW[A] = StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], A] + type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest, Nothing], KVSchema] + private type SW[A] = + StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], Nothing, A] // Helpers private def putValue(key: KVKey, value: String): SW[Unit] = diff --git a/kvstore/src/main/scala/zio/kvstore/node/Node.scala b/kvstore/src/main/scala/zio/kvstore/node/Node.scala index 5f8dcba8..dbd94e7d 100644 --- a/kvstore/src/main/scala/zio/kvstore/node/Node.scala +++ b/kvstore/src/main/scala/zio/kvstore/node/Node.scala @@ -25,8 +25,11 @@ import zio.raft.zmq.ZmqRpc final case class Node( kvServer: KVServer, raft: zio.raft.Raft[ - zio.raft.HMap[Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]], - SessionCommand[KVCommand, KVServerRequest] + zio.raft.HMap[Tuple.Concat[ + zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest, Nothing], + KVSchema + ]], + SessionCommand[KVCommand, KVServerRequest, Nothing] ] ): @@ -115,7 +118,10 @@ final case class Node( for state <- raft.readStateDirty hasPending = - SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, KVSchema](state, lastSentBefore) + SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, Nothing, KVSchema]( + state, + lastSentBefore + ) _ <- if hasPending then val cmd = SessionCommand.GetRequestsForRetry[KVServerRequest](now, lastSentBefore) @@ -135,13 +141,14 @@ final case class Node( ZIO.logInfo("Node stepped up") *> raft.readState.either.flatMap { case Right(state) => - val sessions = SessionStateMachine.getSessions[KVResponse, KVServerRequest, KVSchema](state).map { - case (sessionId: SessionId, metadata) => - ( - sessionId, - zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt) - ) - } + val sessions = + SessionStateMachine.getSessions[KVResponse, KVServerRequest, Nothing, KVSchema](state).map { + case (sessionId: SessionId, metadata) => + ( + sessionId, + zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt) + ) + } kvServer.stepUp(sessions) case Left(_) => ZIO.unit } @@ -169,7 +176,7 @@ final case class Node( ): UIO[Option[command.Response]] = for now <- Clock.instant - cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest]( + cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest, Nothing]( now, sessionId, requestId, @@ -178,16 +185,19 @@ final case class Node( ) either <- raft.sendCommand(cmd).either result <- either match - case Right(Right((resp, envelopes))) => + case Right(envelopes, Right(resp)) => for _ <- ZIO.foreachDiscard(envelopes) { env => kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload) } yield Some(resp.asInstanceOf[command.Response]) - case Right(Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) => - kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted).as( - None - ) + case Right(envelopes, Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) => + for + _ <- ZIO.foreachDiscard(envelopes) { env => + kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload) + } + _ <- kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted) + yield None case Left(_: zio.raft.NotALeaderError) => // Ignore not leader error, server will handle it eventually ZIO.none @@ -212,9 +222,10 @@ object Node: for stable <- LmdbStable.make.debug("LmdbStable.make") - logStore <- SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest]](logDirectory).debug("SegmentedLog.make") + logStore <- + SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest, Nothing]](logDirectory).debug("SegmentedLog.make") snapshotStore <- FileSnapshotStore.make(zio.nio.file.Path(snapshotDirectory)).debug("FileSnapshotStore.make") - rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest]]( + rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest, Nothing]]( nodeAddress, peers ).debug("ZmqRpc.make") diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala index 90574bb2..546a0a26 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala @@ -60,14 +60,14 @@ object Codecs: /** Codec for SessionCommand, parameterized by UC (user command) and SR (server request payload). Requires codecs for * UC and SR in scope. */ - given sessionCommandCodec[UC <: zio.raft.Command, SR](using + given sessionCommandCodec[UC <: zio.raft.Command, SR, E](using ucCodec: Codec[UC], srCodec: Codec[SR] - ): Codec[SessionCommand[UC, SR]] = - val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR]] = + ): Codec[SessionCommand[UC, SR, E]] = + val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR, E]] = (instantCodec :: sessionIdCodec :: requestIdCodec :: requestIdCodec :: ucCodec) - .as[SessionCommand.ClientRequest[UC, SR]] - val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR]] = + .as[SessionCommand.ClientRequest[UC, SR, E]] + val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR, E]] = (uint8 :: clientRequestV0).xmap( { case (_, cmd) => cmd }, cmd => (0, cmd) @@ -105,7 +105,7 @@ object Codecs: cmd => (0, cmd) ) - discriminated[SessionCommand[UC, SR]] + discriminated[SessionCommand[UC, SR, E]] .by(uint8) .typecase(0, clientRequestCodec) .typecase(1, serverRequestAckCodec) diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/ScodecSerialization.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/ScodecSerialization.scala index ca3e2019..7e03a24b 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/ScodecSerialization.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/ScodecSerialization.scala @@ -32,11 +32,11 @@ import scodec.Codec * // takeSnapshot and restoreFromSnapshot provided! * }}} */ -trait ScodecSerialization[R, SR, UserSchema <: Tuple]: - this: StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]], ?] => +trait ScodecSerialization[R, SR, E, UserSchema <: Tuple]: + this: StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR, E], UserSchema]], ?] => // Type alias for convenience - type Schema = Tuple.Concat[SessionSchema[R, SR], UserSchema] + type Schema = Tuple.Concat[SessionSchema[R, SR, E], UserSchema] /** TypeclassMap providing codecs for all value types in the schema. * diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala index 6a9cc824..3004e69f 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionCommand.scala @@ -14,7 +14,7 @@ import java.time.Instant * @tparam SR * Server-initiated request payload type */ -sealed trait SessionCommand[+UC <: Command, SR] extends Command +sealed trait SessionCommand[+UC <: Command, SR, +E] extends Command object SessionCommand: @@ -47,15 +47,15 @@ object SessionCommand: * @note * Requests can arrive out of order. Eviction detection uses lowestRequestId, not requestId ordering */ - case class ClientRequest[UC <: Command, SR]( + case class ClientRequest[UC <: Command, SR, E]( createdAt: Instant, sessionId: SessionId, requestId: RequestId, lowestPendingRequestId: RequestId, command: UC - ) extends SessionCommand[UC, SR]: + ) extends SessionCommand[UC, SR, E]: // Response type can be an error or the user command's response with server request envelopes - type Response = Either[RequestError, (command.Response, List[ServerRequestEnvelope[SR]])] + type Response = (List[ServerRequestEnvelope[SR]], Either[RequestError[E], command.Response]) /** Acknowledgment from a client for a server-initiated request. * @@ -71,7 +71,7 @@ object SessionCommand: createdAt: Instant, sessionId: SessionId, requestId: RequestId - ) extends SessionCommand[Nothing, SR]: + ) extends SessionCommand[Nothing, SR, Nothing]: type Response = Unit /** Create a new session. @@ -89,7 +89,7 @@ object SessionCommand: createdAt: Instant, sessionId: SessionId, capabilities: Map[String, String] - ) extends SessionCommand[Nothing, SR]: + ) extends SessionCommand[Nothing, SR, Nothing]: type Response = List[ServerRequestEnvelope[SR]] // server request envelopes /** Notification that a session has expired. @@ -104,7 +104,7 @@ object SessionCommand: case class SessionExpired[SR]( createdAt: Instant, sessionId: SessionId - ) extends SessionCommand[Nothing, SR]: + ) extends SessionCommand[Nothing, SR, Nothing]: type Response = List[ServerRequestEnvelope[SR]] // server request envelopes /** Command to atomically retrieve requests needing retry and update lastSentAt. @@ -122,6 +122,6 @@ object SessionCommand: case class GetRequestsForRetry[SR]( createdAt: Instant, lastSentBefore: Instant - ) extends SessionCommand[Nothing, SR]: + ) extends SessionCommand[Nothing, SR, Nothing]: type Response = List[ServerRequestEnvelope[SR]] end SessionCommand diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala index a13d46c5..5b15f52a 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala @@ -71,15 +71,15 @@ import java.time.Instant * @see * StateWriter for the state + writer monad used in abstract methods */ -trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] - extends StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]], SessionCommand[UC, SR]]: +trait SessionStateMachine[UC <: Command, R, SR, E, UserSchema <: Tuple] + extends StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR, E], UserSchema]], SessionCommand[UC, SR, E]]: /** Type alias for the complete schema (SessionSchema[R, SR] ++ UserSchema). * * This allows using `HMap[Schema]` instead of `HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]]` throughout the * implementation, making signatures cleaner. */ - type Schema = Tuple.Concat[SessionSchema[R, SR], UserSchema] + type Schema = Tuple.Concat[SessionSchema[R, SR, E], UserSchema] // ==================================================================================== // ABSTRACT METHODS - Users must implement @@ -133,7 +133,7 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] createdAt: Instant, sessionId: SessionId, command: UC - ): StateWriter[HMap[Schema], ServerRequestForSession[SR], command.Response & R] + ): StateWriter[HMap[Schema], ServerRequestForSession[SR], E, command.Response & R] /** Handle session creation event. * @@ -162,7 +162,7 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] createdAt: Instant, sessionId: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[Schema], ServerRequestForSession[SR], Unit] + ): StateWriter[HMap[Schema], ServerRequestForSession[SR], Nothing, Unit] /** Handle session expiration event. * @@ -194,7 +194,7 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] createdAt: Instant, sessionId: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[Schema], ServerRequestForSession[SR], Unit] + ): StateWriter[HMap[Schema], ServerRequestForSession[SR], Nothing, Unit] // ==================================================================================== // StateMachine INTERFACE - Implemented by this trait @@ -225,9 +225,9 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] * Response type, and the compiler cannot verify exhaustiveness due to type erasure. This is safe because we handle * all sealed trait cases explicitly. */ - final def apply(command: SessionCommand[UC, SR]): State[HMap[Schema], command.Response] = + final def apply(command: SessionCommand[UC, SR, E]): State[HMap[Schema], command.Response] = command match - case cmd: SessionCommand.ClientRequest[UC, SR] @unchecked => + case cmd: SessionCommand.ClientRequest[UC, SR, E] @unchecked => handleClientRequest(cmd).map(_.asResponseType(command, cmd)) case cmd: SessionCommand.ServerRequestAck[SR] @unchecked => @@ -284,22 +284,26 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] * sequence number without a response; we can evict responses with lower numbers. We only update * highestLowestPendingRequestIdSeen for requests we actually process. */ - private def handleClientRequest(cmd: SessionCommand.ClientRequest[UC, SR]) - : State[HMap[Schema], Either[RequestError, (cmd.command.Response, List[ServerRequestEnvelope[SR]])]] = + private def handleClientRequest(cmd: SessionCommand.ClientRequest[UC, SR, E]) + : State[HMap[Schema], (List[ServerRequestEnvelope[SR]], Either[RequestError[E], cmd.command.Response])] = for highestLowestSeen <- getHighestLowestPendingRequestIdSeen(cmd.sessionId) cachedOpt <- getCachedResponse((cmd.sessionId, cmd.requestId)) result <- cachedOpt match case Some(cachedResponse) => // Cache hit - return cached response without calling user method - State.succeed(Right((cachedResponse.asInstanceOf[cmd.command.Response], Nil))) + val response = + cachedResponse match + case Left(value) => Left(RequestError.UserError(value)) + case Right(value) => Right(value.asInstanceOf[cmd.command.Response]) + State.succeed((Nil, response)) case None => // Cache miss - check if response was evicted // If requestId < highestLowestPendingRequestIdSeen, client has acknowledged receiving this response if cmd.requestId.isLowerThan(highestLowestSeen) then // Client said "I have responses for all requestIds < highestLowestPending", so this was evicted - State.succeed(Left(RequestError.ResponseEvicted)) + State.succeed((Nil, Left(RequestError.ResponseEvicted))) else // requestId >= highestLowestPendingRequestIdSeen // This is a valid request (not yet acknowledged), execute the command @@ -307,22 +311,22 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] // Update highestLowestPendingRequestIdSeen ONLY when actually executing a new request _ <- updateHighestLowestPendingRequestIdSeen(cmd.sessionId, cmd.lowestPendingRequestId) _ <- cleanupCache(cmd.sessionId, cmd.lowestPendingRequestId) - (serverRequestsLog, response) <- applyCommand(cmd.createdAt, cmd.sessionId, cmd.command).withLog - assignedRequests <- addServerRequests(cmd.createdAt, serverRequestsLog) + (serverRequestsLog, response) <- applyCommand(cmd.createdAt, cmd.sessionId, cmd.command).either.withLog _ <- cacheResponse((cmd.sessionId, cmd.requestId), response) - yield Right((response, assignedRequests)) + assignedRequests <- addServerRequests(cmd.createdAt, serverRequestsLog) + yield (assignedRequests, response.left.map(RequestError.UserError(_))) yield result /** Get cached response for a composite key. */ - private def getCachedResponse(key: (SessionId, RequestId)): State[HMap[Schema], Option[Any]] = + private def getCachedResponse(key: (SessionId, RequestId)): State[HMap[Schema], Option[Either[E, Any]]] = State.get.map(_.get["cache"](key)) /** Cache a response at the given composite key. */ private def cacheResponse( key: (SessionId, RequestId), - response: R + response: Either[E, R] ): State[HMap[Schema], Unit] = State.update(_.updated["cache"](key, response)) @@ -554,16 +558,16 @@ trait SessionStateMachine[UC <: Command, R, SR, UserSchema <: Tuple] } end SessionStateMachine object SessionStateMachine: - def hasPendingRequests[R, SR, UserSchema <: Tuple]( - state: HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]], + def hasPendingRequests[R, SR, E, UserSchema <: Tuple]( + state: HMap[Tuple.Concat[SessionSchema[R, SR, E], UserSchema]], lastSentBefore: Instant ): Boolean = state.exists["serverRequests"] { (_, pending) => pending.lastSentAt.isBefore(lastSentBefore) } - def getSessions[R, SR, UserSchema <: Tuple]( - state: HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]] + def getSessions[R, SR, E, UserSchema <: Tuple]( + state: HMap[Tuple.Concat[SessionSchema[R, SR, E], UserSchema]] ): Map[SessionId, SessionMetadata] = state.iterator["metadata"].collect { case (sessionId: SessionId, metadata) => diff --git a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala index 663c0ae9..2af94f78 100644 --- a/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala +++ b/session-state-machine/src/main/scala/zio/raft/sessionstatemachine/package.scala @@ -89,7 +89,7 @@ package object sessionstatemachine: * } yield CommandResponse(...) * }}} */ - type StateWriter[S, W, A] = ZPure[W, S, S, Any, Nothing, A] + type StateWriter[S, W, E, A] = ZPure[W, S, S, Any, E, A] val StateWriter: zio.prelude.fx.ZPure.type = zio.prelude.fx.ZPure /** Extension methods for ZPure to simplify working with state + log. @@ -137,7 +137,7 @@ package object sessionstatemachine: /** Error type for request handling failures. */ - enum RequestError: + enum RequestError[+E]: /** Response was cached but has been evicted. Client must create a new session. * * This error occurs when: @@ -146,7 +146,18 @@ package object sessionstatemachine: * * Per Raft dissertation Chapter 6.3, the client should create a new session and retry the operation. */ - case ResponseEvicted + case ResponseEvicted extends RequestError[Nothing] + + /** Command execution failed with a user-defined error. + * + * This error occurs when the state machine's applyCommand method returns a failure. The error is cached to + * maintain idempotency - duplicate requests will return the same error. Unlike ResponseEvicted, the client can + * retry with the same session or handle the error appropriately. + * + * @param e + * The user-defined error value + */ + case UserError(e: E) extends RequestError[E] /** Fixed schema for session management state with typed keys. * @@ -175,9 +186,9 @@ package object sessionstatemachine: * - If yes and response is not in cache, we know it was evicted (client already acknowledged it) * - This correctly handles out-of-order requests while preventing re-execution of acknowledged commands */ - type SessionSchema[R, SR] = + type SessionSchema[R, SR, E] = ("metadata", SessionId, SessionMetadata) *: - ("cache", (SessionId, RequestId), R) *: + ("cache", (SessionId, RequestId), Either[E, R]) *: ("serverRequests", (SessionId, RequestId), PendingServerRequest[SR]) *: ("lastServerRequestId", SessionId, RequestId) *: ("highestLowestPendingRequestIdSeen", SessionId, RequestId) *: @@ -196,7 +207,7 @@ package object sessionstatemachine: * @tparam UserSchema * User-defined schema */ - type Schema[R, SR, UserSchema <: Tuple] = Tuple.Concat[SessionSchema[R, SR], UserSchema] + type Schema[R, SR, E, UserSchema <: Tuple] = Tuple.Concat[SessionSchema[R, SR, E], UserSchema] given HMap.KeyLike[(SessionId, RequestId)] = new HMap.KeyLike[(SessionId, RequestId)]: import java.nio.charset.StandardCharsets diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala index 27f78dfc..a778e229 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CodecsSpec.scala @@ -40,14 +40,14 @@ object CodecsSpec extends ZIOSpecDefault: type Response = Unit given Codec[DummyCmd.type] = provide(DummyCmd) given Codec[Unit] = provide(()) - val cmd: SessionCommand[DummyCmd.type, Unit] = SessionCommand.ClientRequest[DummyCmd.type, Unit]( + val cmd: SessionCommand[DummyCmd.type, Unit, Nothing] = SessionCommand.ClientRequest[DummyCmd.type, Unit, Nothing]( createdAt = Instant.EPOCH, sessionId = SessionId.fromString("s-1"), requestId = RequestId(1L), lowestPendingRequestId = RequestId(0L), command = DummyCmd ) - val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit, Nothing]]] val bits = codec.encode(cmd).require val decoded = codec.decode(bits).require.value assertTrue(decoded == cmd) @@ -64,7 +64,7 @@ object CodecsSpec extends ZIOSpecDefault: sessionId = SessionId.fromString("s-2"), requestId = RequestId(2L) ) - val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit, Nothing]]] val bits = codec.encode(cmd).require val decoded = codec.decode(bits).require.value assertTrue(decoded == cmd) @@ -80,7 +80,7 @@ object CodecsSpec extends ZIOSpecDefault: sessionId = SessionId.fromString("s-3"), capabilities = Map("k" -> "v") ) - val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit, Nothing]]] val bits = codec.encode(cmd).require val decoded = codec.decode(bits).require.value assertTrue(decoded == cmd) @@ -95,7 +95,7 @@ object CodecsSpec extends ZIOSpecDefault: createdAt = Instant.EPOCH, sessionId = SessionId.fromString("s-4") ) - val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit, Nothing]]] val bits = codec.encode(cmd).require val decoded = codec.decode(bits).require.value assertTrue(decoded == cmd) @@ -110,7 +110,7 @@ object CodecsSpec extends ZIOSpecDefault: createdAt = Instant.EPOCH, lastSentBefore = Instant.ofEpochMilli(500L) ) - val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit]]] + val codec = summon[Codec[SessionCommand[DummyCmd.type, Unit, Nothing]]] val bits = codec.encode(cmd).require val decoded = codec.decode(bits).require.value assertTrue(decoded == cmd) diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CumulativeAckSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CumulativeAckSpec.scala index bfb7724c..4befcfc5 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CumulativeAckSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/CumulativeAckSpec.scala @@ -16,17 +16,26 @@ object CumulativeAckSpec extends ZIOSpecDefault: type TestResponse = Unit type TestSchema = EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String], TestSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String, Nothing], TestSchema] // Minimal codecs for ScodecSerialization import scodec.codecs.* given scodec.Codec[Unit] = provide(()) + // Codec for Either[Nothing, TestResponse] to satisfy cache value type + given scodec.Codec[Either[Nothing, TestResponse]] = + summon[scodec.Codec[TestResponse]].exmap[Either[Nothing, TestResponse]]( + r => scodec.Attempt.successful(Right(r)), + (e: Either[Nothing, TestResponse]) => + e match + case Right(r) => scodec.Attempt.successful(r) + case Left(_) => scodec.Attempt.failure(scodec.Err("Left (Nothing) is not encodable/decodable")) + ) import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} given scodec.Codec[PendingServerRequest[?]] = summon[scodec.Codec[PendingServerRequest[String]]].asInstanceOf[scodec.Codec[PendingServerRequest[?]]] - class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, TestSchema] - with ScodecSerialization[TestResponse, String, TestSchema]: + class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, Nothing, TestSchema] + with ScodecSerialization[TestResponse, String, Nothing, TestSchema]: val codecs = summon[HMap.TypeclassMap[CombinedSchema, scodec.Codec]] @@ -34,21 +43,21 @@ object CumulativeAckSpec extends ZIOSpecDefault: createdAt: Instant, sessionId: SessionId, cmd: TestCommand - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response & TestResponse] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, cmd.Response & TestResponse] = StateWriter.succeed(().asInstanceOf[cmd.Response & TestResponse]) protected def handleSessionCreated( createdAt: Instant, sid: SessionId, caps: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) protected def handleSessionExpired( createdAt: Instant, sid: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = @@ -63,7 +72,7 @@ object CumulativeAckSpec extends ZIOSpecDefault: val createCmd = SessionCommand.CreateSession[String](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(createCmd).run(state0) // Seed pending server requests manually (IDs 1..5) @@ -84,9 +93,9 @@ object CumulativeAckSpec extends ZIOSpecDefault: val state2 = s2.asInstanceOf[HMap[sm.Schema]] // Acknowledge up to RequestId(1) should remove first batch - val ack1: SessionCommand[TestCommand, String] = + val ack1: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ServerRequestAck[String](now, sessionId, RequestId(1)) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state3, _) = sm.apply(ack1).run(state2) val pendingAfterAck1 = state3.asInstanceOf[HMap[CombinedSchema]].iterator["serverRequests"].toList diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala index 2389a88c..d9b15837 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/IdempotencySpec.scala @@ -26,7 +26,7 @@ object IdempotencySpec extends ZIOSpecDefault: given HMap.KeyLike[CounterKey] = HMap.KeyLike.forNewtype(CounterKey) type TestSchema = ("counter", CounterKey, Int) *: EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String], TestSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String, Nothing], TestSchema] val counterKey = CounterKey("value") @@ -39,6 +39,15 @@ object IdempotencySpec extends ZIOSpecDefault: // Codec for Int (our counter value) given scodec.Codec[Int] = int32 + // Codec for Either[Nothing, TestResponse] to satisfy cache value type + given scodec.Codec[Either[Nothing, TestResponse]] = + summon[scodec.Codec[TestResponse]].exmap[Either[Nothing, TestResponse]]( + r => scodec.Attempt.successful(Right(r)), + (e: Either[Nothing, TestResponse]) => + e match + case Right(r) => scodec.Attempt.successful(r) + case Left(_) => scodec.Attempt.failure(scodec.Err("Left (Nothing) is not encodable/decodable")) + ) // Use provided codecs from Codecs object import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} @@ -48,8 +57,8 @@ object IdempotencySpec extends ZIOSpecDefault: given scodec.Codec[PendingServerRequest[?]] = summon[scodec.Codec[PendingServerRequest[String]]].asInstanceOf[scodec.Codec[PendingServerRequest[?]]] - class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, TestSchema] - with ScodecSerialization[TestResponse, String, TestSchema]: + class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, Nothing, TestSchema] + with ScodecSerialization[TestResponse, String, Nothing, TestSchema]: val codecs = summon[HMap.TypeclassMap[CombinedSchema, scodec.Codec]] var callCount = 0 // Track how many times applyCommand is called @@ -58,7 +67,7 @@ object IdempotencySpec extends ZIOSpecDefault: createdAt: Instant, sessionId: SessionId, cmd: TestCommand - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response & TestResponse] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, cmd.Response & TestResponse] = callCount += 1 cmd match case TestCommand.Increment(by) => @@ -74,14 +83,14 @@ object IdempotencySpec extends ZIOSpecDefault: createdAt: Instant, sid: SessionId, caps: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) protected def handleSessionExpired( createdAt: Instant, sid: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) // takeSnapshot and restoreFromSnapshot are now provided by SessionStateMachine base class! @@ -97,25 +106,27 @@ object IdempotencySpec extends ZIOSpecDefault: val sessionId = SessionId("s1") // Create session first (cast to match state machine type) - val createCmd: SessionCommand[TestCommand, String] = + val createCmd: SessionCommand[TestCommand, String, Nothing] = SessionCommand.CreateSession[String](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(createCmd).run(state0) // First request - should call applyCommand - val cmd1: SessionCommand[TestCommand, String] = + val cmd1: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(10)) val (state2, result1) = sm.apply(cmd1).run(state1) - val Right((response1, _)) = (result1.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response1)) = + (result1.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked assertTrue(sm.callCount == 1) && assertTrue(response1 == 10) // Second request with same ID - should NOT call applyCommand - val cmd2: SessionCommand[TestCommand, String] = + val cmd2: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(999)) val (state3, result2) = sm.apply(cmd2).run(state2) - val Right((response2, _)) = (result2.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response2)) = + (result2.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked assertTrue( sm.callCount == 1 && // Still 1, not called again! @@ -128,22 +139,24 @@ object IdempotencySpec extends ZIOSpecDefault: val now = Instant.now() val sessionId = SessionId("s1") - val createCmd: SessionCommand[TestCommand, String] = + val createCmd: SessionCommand[TestCommand, String, Nothing] = SessionCommand.CreateSession[String](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(createCmd).run(state0) // First request - val cmd1: SessionCommand[TestCommand, String] = + val cmd1: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(5)) val (state2, result1) = sm.apply(cmd1).run(state1) - val Right((response1, _)) = (result1.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response1)) = + (result1.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked // Second request with DIFFERENT ID - val cmd2: SessionCommand[TestCommand, String] = + val cmd2: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(2), RequestId(1), TestCommand.Increment(3)) val (state3, result2) = sm.apply(cmd2).run(state2) - val Right((response2, _)) = (result2.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response2)) = + (result2.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked assertTrue( sm.callCount == 2 && // Both requests processed @@ -158,41 +171,43 @@ object IdempotencySpec extends ZIOSpecDefault: val sessionId = SessionId("s1") // Create session - val createCmd: SessionCommand[TestCommand, String] = + val createCmd: SessionCommand[TestCommand, String, Nothing] = SessionCommand.CreateSession[String](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(createCmd).run(state0) // First request (requestId=1, lowestPendingRequestId=1) - val cmd1: SessionCommand[TestCommand, String] = + val cmd1: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(10)) val (state2, result1) = sm.apply(cmd1).run(state1) - val Right((response1, _)) = (result1.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response1)) = + (result1.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked // Second request (requestId=2, lowestPendingRequestId=1) - val cmd2: SessionCommand[TestCommand, String] = + val cmd2: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(2), RequestId(1), TestCommand.Increment(5)) val (state3, result2) = sm.apply(cmd2).run(state2) - val Right((response2, _)) = (result2.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response2)) = + (result2.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked // Third request (requestId=3, lowestPendingRequestId=2) // Client says "lowestPendingRequestId=2", so responses with requestId < 2 can be evicted (only 1) // This triggers cache cleanup AND updates highestLowestPendingRequestIdSeen to 2 - val cmd3: SessionCommand[TestCommand, String] = + val cmd3: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(3), RequestId(2), TestCommand.Increment(1)) val (state4, _) = sm.apply(cmd3).run(state3) // Now retry request 1 - should fail with ResponseEvicted (evicted) - val cmd4: SessionCommand[TestCommand, String] = + val cmd4: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(2), TestCommand.Increment(999)) val (state5, result4) = sm.apply(cmd4).run(state4) - (result4.asInstanceOf[Either[RequestError, (Int, List[Any])]]: @unchecked) match - case Left(RequestError.ResponseEvicted) => + (result4.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]: @unchecked) match + case (_, Left(RequestError.ResponseEvicted)) => assertTrue( - sm.callCount == 3 // Command was NOT executed again (only 3 commands processed) + sm.callCount == 3 // Command was NOT executed again (only 3 commands processed) ) - case Right(_) => + case (_, Right(_)) => assertTrue(false) // Should not succeed }, test("PC-3: Cache cleanup removes responses based on lowestPendingRequestId (exclusive)") { @@ -202,21 +217,21 @@ object IdempotencySpec extends ZIOSpecDefault: val sessionId = SessionId("s1") // Create session - val createCmd: SessionCommand[TestCommand, String] = + val createCmd: SessionCommand[TestCommand, String, Nothing] = SessionCommand.CreateSession[String](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(createCmd).run(state0) // Execute 3 requests (requestIds 1, 2, 3) - val cmd1: SessionCommand[TestCommand, String] = + val cmd1: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(10)) val (state2, _) = sm.apply(cmd1).run(state1) - val cmd2: SessionCommand[TestCommand, String] = + val cmd2: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(2), RequestId(1), TestCommand.Increment(5)) val (state3, _) = sm.apply(cmd2).run(state2) - val cmd3: SessionCommand[TestCommand, String] = + val cmd3: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(3), RequestId(1), TestCommand.Increment(3)) val (state4, _) = sm.apply(cmd3).run(state3) @@ -227,7 +242,7 @@ object IdempotencySpec extends ZIOSpecDefault: assertTrue(cache4.get["cache"]((sessionId, RequestId(3))).isDefined) // Execute request 4 with lowestPendingRequestId=2 (client says "lowest pending is 2") - val cmd4: SessionCommand[TestCommand, String] = + val cmd4: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(4), RequestId(2), TestCommand.Increment(1)) val (state5, _) = sm.apply(cmd4).run(state4) @@ -237,6 +252,79 @@ object IdempotencySpec extends ZIOSpecDefault: assertTrue(cache5.get["cache"]((sessionId, RequestId(2))).isDefined) && assertTrue(cache5.get["cache"]((sessionId, RequestId(3))).isDefined) && assertTrue(cache5.get["cache"]((sessionId, RequestId(4))).isDefined) + }, + test("PC-1 error: duplicate request returns cached error without re-execution") { + // A separate state machine that can fail with String error type + sealed trait ErrorCommand extends Command + object ErrorCommand: + case object Fail extends ErrorCommand: + type Response = Int + + type ErrResponse = Int + type ErrSchema = EmptyTuple + + import zio.stream.Stream + import zio.{UIO, ZIO} + + class ErrorStateMachine extends SessionStateMachine[ErrorCommand, ErrResponse, String, String, ErrSchema]: + var callCount = 0 + + protected def applyCommand( + createdAt: Instant, + sessionId: SessionId, + cmd: ErrorCommand + ): StateWriter[HMap[Schema], ServerRequestForSession[String], String, cmd.Response & ErrResponse] = + callCount += 1 + StateWriter.fail("boom") + + protected def handleSessionCreated( + createdAt: Instant, + sid: SessionId, + caps: Map[String, String] + ): StateWriter[HMap[Schema], ServerRequestForSession[String], Nothing, Unit] = + StateWriter.succeed(()) + + protected def handleSessionExpired( + createdAt: Instant, + sid: SessionId, + capabilities: Map[String, String] + ): StateWriter[HMap[Schema], ServerRequestForSession[String], Nothing, Unit] = + StateWriter.succeed(()) + + def takeSnapshot(state: HMap[Schema]): Stream[Nothing, Byte] = zio.stream.ZStream.empty + def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[Schema]] = ZIO.succeed(HMap.empty[Schema]) + def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = false + + val sm = new ErrorStateMachine() + val state0 = HMap.empty[sm.Schema] + val now = Instant.now() + val sessionId = SessionId("s-error") + + // Create session + val createCmd: SessionCommand[ErrorCommand, String, String] = + SessionCommand.CreateSession[String](now, sessionId, Map.empty) + .asInstanceOf[SessionCommand[ErrorCommand, String, String]] + val (state1, _) = sm.apply(createCmd).run(state0) + + // First request - should execute and cache the error + val cmd1: SessionCommand[ErrorCommand, String, String] = + SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), ErrorCommand.Fail) + val (state2, result1) = sm.apply(cmd1).run(state1) + val (_, Left(RequestError.UserError(err1))) = + (result1.asInstanceOf[(List[Any], Either[RequestError[String], Int])]): @unchecked + + // Duplicate request - should NOT execute and should return cached error + val cmd2: SessionCommand[ErrorCommand, String, String] = + SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), ErrorCommand.Fail) + val (_, result2) = sm.apply(cmd2).run(state2) + val (_, Left(RequestError.UserError(err2))) = + (result2.asInstanceOf[(List[Any], Either[RequestError[String], Int])]): @unchecked + + assertTrue( + sm.callCount == 1, // executed only once + err1 == "boom", + err2 == "boom" // same cached error returned + ) } ) end IdempotencySpec diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ResponseCachingSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ResponseCachingSpec.scala index d8132562..e7ec10a7 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ResponseCachingSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ResponseCachingSpec.scala @@ -21,7 +21,7 @@ object ResponseCachingSpec extends ZIOSpecDefault: given HMap.KeyLike[CounterKey] = HMap.KeyLike.forNewtype(CounterKey) type TestSchema = ("counter", CounterKey, Int) *: EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String], TestSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String, Nothing], TestSchema] val counterKey = CounterKey("value") @@ -29,12 +29,21 @@ object ResponseCachingSpec extends ZIOSpecDefault: import scodec.codecs.* given scodec.Codec[Any] = scodec.Codec[String].upcast[Any] given scodec.Codec[Int] = int32 + // Codec for Either[Nothing, TestResponse] to satisfy cache value type + given scodec.Codec[Either[Nothing, TestResponse]] = + summon[scodec.Codec[TestResponse]].exmap[Either[Nothing, TestResponse]]( + r => scodec.Attempt.successful(Right(r)), + (e: Either[Nothing, TestResponse]) => + e match + case Right(r) => scodec.Attempt.successful(r) + case Left(_) => scodec.Attempt.failure(scodec.Err("Left (Nothing) is not encodable/decodable")) + ) import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} given scodec.Codec[PendingServerRequest[?]] = summon[scodec.Codec[PendingServerRequest[String]]].asInstanceOf[scodec.Codec[PendingServerRequest[?]]] - class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, TestSchema] - with ScodecSerialization[TestResponse, String, TestSchema]: + class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, Nothing, TestSchema] + with ScodecSerialization[TestResponse, String, Nothing, TestSchema]: val codecs = summon[HMap.TypeclassMap[CombinedSchema, scodec.Codec]] var callCount = 0 @@ -43,7 +52,7 @@ object ResponseCachingSpec extends ZIOSpecDefault: createdAt: Instant, sessionId: SessionId, cmd: TestCommand - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response & TestResponse] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, cmd.Response & TestResponse] = callCount += 1 cmd match case TestCommand.Increment(by) => @@ -58,14 +67,14 @@ object ResponseCachingSpec extends ZIOSpecDefault: createdAt: Instant, sid: SessionId, caps: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) protected def handleSessionExpired( createdAt: Instant, sid: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = @@ -80,20 +89,22 @@ object ResponseCachingSpec extends ZIOSpecDefault: val createCmd = SessionCommand.CreateSession[String](now, sessionId, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(createCmd).run(state0) - val cmd1: SessionCommand[TestCommand, String] = + val cmd1: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(10)) val (state2, result1) = sm.apply(cmd1).run(state1) - val Right((response1, _)) = (result1.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response1)) = + (result1.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked assertTrue(sm.callCount == 1) && assertTrue(response1 == 10) - val cmd2: SessionCommand[TestCommand, String] = + val cmd2: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(999)) val (_, result2) = sm.apply(cmd2).run(state2) - val Right((response2, _)) = (result2.asInstanceOf[Either[RequestError, (Int, List[Any])]]): @unchecked + val (_, Right(response2)) = + (result2.asInstanceOf[(List[Any], Either[RequestError[Nothing], Int])]): @unchecked assertTrue( sm.callCount == 1 && diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SchemaSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SchemaSpec.scala index 55ca90e1..37cb579f 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SchemaSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SchemaSpec.scala @@ -30,26 +30,26 @@ object SchemaSpec extends ZIOSpecDefault: type TestResponse = String // Simple marker type for test type TestServerReq = String - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, TestServerReq], TestUserSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, TestServerReq, Nothing], TestUserSchema] def spec = suite("Schema with Composite Keys")( test("SessionSchema has composite key for cache") { val sessionId = SessionId("session-1") - val state = HMap.empty[SessionSchema[TestResponse, TestServerReq]] + val state = HMap.empty[SessionSchema[TestResponse, TestServerReq, Nothing]] // Cache uses composite key (SessionId, RequestId) val withCache = state.updated["cache"]( (sessionId, RequestId(1)), - "cached-response" + Right("cached-response") ) - val retrieved: Option[Any] = withCache.get["cache"]((sessionId, RequestId(1))) + val retrieved: Option[Either[Nothing, String]] = withCache.get["cache"]((sessionId, RequestId(1))).asInstanceOf[Option[Either[Nothing, String]]] - assertTrue(retrieved.contains("cached-response")) + assertTrue(retrieved.contains(Right("cached-response"))) }, test("SessionSchema has composite key for serverRequests") { val sessionId = SessionId("session-1") - val state = HMap.empty[SessionSchema[TestResponse, TestServerReq]] + val state = HMap.empty[SessionSchema[TestResponse, TestServerReq, Nothing]] // serverRequests uses composite key (SessionId, RequestId) val pending = PendingServerRequest( @@ -79,28 +79,28 @@ object SchemaSpec extends ZIOSpecDefault: // Can use SessionSchema prefixes with composite keys val withSession = state .updated["metadata"](sessionId, SessionMetadata(Map.empty, Instant.now())) - .updated["cache"]((sessionId, RequestId(1)), "value1") + .updated["cache"]((sessionId, RequestId(1)), Right("value1")) // Can use UserSchema prefixes val withUser = withSession .updated["counter"](CounterKey("main"), 42) val metadata: Option[SessionMetadata] = withUser.get["metadata"](sessionId) - val cached: Option[Any] = withUser.get["cache"]((sessionId, RequestId(1))) + val cached: Option[Either[Nothing, String]] = withUser.get["cache"]((sessionId, RequestId(1))).asInstanceOf[Option[Either[Nothing, String]]] val counter: Option[Int] = withUser.get["counter"](CounterKey("main")) assertTrue( metadata.isDefined && - cached.contains("value1") && + cached.contains(Right("value1")) && counter.contains(42) ) }, test("Composite keys enable range queries for session") { val sessionId = SessionId("session-1") - val state = HMap.empty[SessionSchema[TestResponse, TestServerReq]] - .updated["cache"]((sessionId, RequestId(1)), "resp1") - .updated["cache"]((sessionId, RequestId(5)), "resp5") - .updated["cache"]((sessionId, RequestId(10)), "resp10") + val state = HMap.empty[SessionSchema[TestResponse, TestServerReq, Nothing]] + .updated["cache"]((sessionId, RequestId(1)), Right("resp1")) + .updated["cache"]((sessionId, RequestId(5)), Right("resp5")) + .updated["cache"]((sessionId, RequestId(10)), Right("resp10")) // Range query: get all cache entries for session with RequestId in [0, 7) val rangeResults = state.range["cache"]( @@ -115,10 +115,10 @@ object SchemaSpec extends ZIOSpecDefault: }, test("Numeric ordering works correctly for RequestIds") { val sessionId = SessionId("session-1") - val state = HMap.empty[SessionSchema[TestResponse, TestServerReq]] - .updated["cache"]((sessionId, RequestId(9)), "nine") - .updated["cache"]((sessionId, RequestId(42)), "forty-two") - .updated["cache"]((sessionId, RequestId(100)), "hundred") + val state = HMap.empty[SessionSchema[TestResponse, TestServerReq, Nothing]] + .updated["cache"]((sessionId, RequestId(9)), Right("nine")) + .updated["cache"]((sessionId, RequestId(42)), Right("forty-two")) + .updated["cache"]((sessionId, RequestId(100)), Right("hundred")) // Range should use numeric ordering, not lexicographic // RequestId uses big-endian encoding, so 9 < 42 < 100 diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestChunkSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestChunkSpec.scala index 48d466ff..fe4e2c45 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestChunkSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestChunkSpec.scala @@ -16,17 +16,26 @@ object ServerRequestChunkSpec extends ZIOSpecDefault: type TestResponse = Int type TestSchema = EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String], TestSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String, Nothing], TestSchema] // Minimal codecs import scodec.codecs.* given scodec.Codec[Int] = int32 + // Codec for Either[Nothing, TestResponse] to satisfy cache value type + given scodec.Codec[Either[Nothing, TestResponse]] = + summon[scodec.Codec[TestResponse]].exmap[Either[Nothing, TestResponse]]( + r => scodec.Attempt.successful(Right(r)), + (e: Either[Nothing, TestResponse]) => + e match + case Right(r) => scodec.Attempt.successful(r) + case Left(_) => scodec.Attempt.failure(scodec.Err("Left (Nothing) is not encodable/decodable")) + ) import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} given scodec.Codec[PendingServerRequest[?]] = summon[scodec.Codec[PendingServerRequest[String]]].asInstanceOf[scodec.Codec[PendingServerRequest[?]]] - class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, TestSchema] - with ScodecSerialization[TestResponse, String, TestSchema]: + class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, Nothing, TestSchema] + with ScodecSerialization[TestResponse, String, Nothing, TestSchema]: val codecs = summon[HMap.TypeclassMap[CombinedSchema, scodec.Codec]] @@ -34,7 +43,7 @@ object ServerRequestChunkSpec extends ZIOSpecDefault: createdAt: Instant, sessionId: SessionId, cmd: TestCommand - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response & TestResponse] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, cmd.Response & TestResponse] = cmd match case TestCommand.Emit(count) => // Log a Chunk of server requests in order 1..count to current session @@ -43,7 +52,7 @@ object ServerRequestChunkSpec extends ZIOSpecDefault: if requests.isEmpty then StateWriter.succeed(count.asInstanceOf[cmd.Response & TestResponse]) else - val init: StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + val init: StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.get[HMap[CombinedSchema]].as(()) val all = requests.foldLeft(init) { (acc, r) => acc.flatMap(_ => StateWriter.log(r)) } all.as(count.asInstanceOf[cmd.Response & TestResponse]) @@ -52,14 +61,14 @@ object ServerRequestChunkSpec extends ZIOSpecDefault: createdAt: Instant, sid: SessionId, caps: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) protected def handleSessionExpired( createdAt: Instant, sid: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = @@ -74,14 +83,14 @@ object ServerRequestChunkSpec extends ZIOSpecDefault: val create = SessionCommand.CreateSession[String](now, s1, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(create).run(state0) - val cmd: SessionCommand[TestCommand, String] = + val cmd: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, s1, RequestId(1), RequestId(0), TestCommand.Emit(5)) val (state2, result) = sm.apply(cmd).run(state1) - val Right((resp, envelopes)) = - (result.asInstanceOf[Either[RequestError, (Int, List[ServerRequestEnvelope[String]])]]): @unchecked + val (envelopes, Right(resp)) = + (result.asInstanceOf[(List[ServerRequestEnvelope[String]], Either[RequestError[Nothing], Int])]): @unchecked val h = state2.asInstanceOf[HMap[CombinedSchema]] val s1Reqs = h.iterator["serverRequests"].toList.collect { diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestTargetingSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestTargetingSpec.scala index 7beec580..63997635 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestTargetingSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/ServerRequestTargetingSpec.scala @@ -15,17 +15,26 @@ object ServerRequestTargetingSpec extends ZIOSpecDefault: type TestResponse = Unit type TestSchema = EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String], TestSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String, Nothing], TestSchema] // Minimal codecs import scodec.codecs.* given scodec.Codec[Unit] = provide(()) + // Codec for Either[Nothing, TestResponse] to satisfy cache value type + given scodec.Codec[Either[Nothing, TestResponse]] = + summon[scodec.Codec[TestResponse]].exmap[Either[Nothing, TestResponse]]( + r => scodec.Attempt.successful(Right(r)), + (e: Either[Nothing, TestResponse]) => + e match + case Right(r) => scodec.Attempt.successful(r) + case Left(_) => scodec.Attempt.failure(scodec.Err("Left (Nothing) is not encodable/decodable")) + ) import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} given scodec.Codec[PendingServerRequest[?]] = summon[scodec.Codec[PendingServerRequest[String]]].asInstanceOf[scodec.Codec[PendingServerRequest[?]]] - class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, TestSchema] - with ScodecSerialization[TestResponse, String, TestSchema]: + class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, Nothing, TestSchema] + with ScodecSerialization[TestResponse, String, Nothing, TestSchema]: val codecs = summon[HMap.TypeclassMap[CombinedSchema, scodec.Codec]] @@ -33,7 +42,7 @@ object ServerRequestTargetingSpec extends ZIOSpecDefault: createdAt: Instant, sessionId: SessionId, cmd: TestCommand - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response & TestResponse] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, cmd.Response & TestResponse] = // Emit server requests to two different sessions for _ <- StateWriter.log(ServerRequestForSession[String](SessionId("s1"), "msg-s1")) @@ -44,14 +53,14 @@ object ServerRequestTargetingSpec extends ZIOSpecDefault: createdAt: Instant, sid: SessionId, caps: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) protected def handleSessionExpired( createdAt: Instant, sid: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = StateWriter.succeed(()) override def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = @@ -67,15 +76,15 @@ object ServerRequestTargetingSpec extends ZIOSpecDefault: val create1 = SessionCommand.CreateSession[String](now, s1, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(create1).run(state0) val create2 = SessionCommand.CreateSession[String](now, s2, Map.empty) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state2, _) = sm.apply(create2).run(state1) - val cmd: SessionCommand[TestCommand, String] = + val cmd: SessionCommand[TestCommand, String, Nothing] = SessionCommand.ClientRequest(now, s1, RequestId(1), RequestId(0), TestCommand.Noop) val (state3, _) = sm.apply(cmd).run(state2) diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SessionLifecycleSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SessionLifecycleSpec.scala index 3022a2eb..043411d0 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SessionLifecycleSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SessionLifecycleSpec.scala @@ -15,17 +15,26 @@ object SessionLifecycleSpec extends ZIOSpecDefault: type TestResponse = Unit type TestSchema = EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String], TestSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[TestResponse, String, Nothing], TestSchema] // Minimal codecs import scodec.codecs.* given scodec.Codec[Unit] = provide(()) + // Codec for Either[Nothing, TestResponse] to satisfy cache value type + given scodec.Codec[Either[Nothing, TestResponse]] = + summon[scodec.Codec[TestResponse]].exmap[Either[Nothing, TestResponse]]( + r => scodec.Attempt.successful(Right(r)), + (e: Either[Nothing, TestResponse]) => + e match + case Right(r) => scodec.Attempt.successful(r) + case Left(_) => scodec.Attempt.failure(scodec.Err("Left (Nothing) is not encodable/decodable")) + ) import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} given scodec.Codec[PendingServerRequest[?]] = summon[scodec.Codec[PendingServerRequest[String]]].asInstanceOf[scodec.Codec[PendingServerRequest[?]]] - class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, TestSchema] - with ScodecSerialization[TestResponse, String, TestSchema]: + class TestStateMachine extends SessionStateMachine[TestCommand, TestResponse, String, Nothing, TestSchema] + with ScodecSerialization[TestResponse, String, Nothing, TestSchema]: val codecs = summon[HMap.TypeclassMap[CombinedSchema, scodec.Codec]] @@ -33,14 +42,14 @@ object SessionLifecycleSpec extends ZIOSpecDefault: createdAt: Instant, sessionId: SessionId, cmd: TestCommand - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response & TestResponse] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, cmd.Response & TestResponse] = StateWriter.succeed(().asInstanceOf[cmd.Response & TestResponse]) protected def handleSessionCreated( createdAt: Instant, sid: SessionId, caps: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = // Emit a server request to another session (admin) upon session creation StateWriter.log(ServerRequestForSession[String](SessionId("admin"), s"created:${SessionId.unwrap(sid)}")) .as(()) @@ -49,7 +58,7 @@ object SessionLifecycleSpec extends ZIOSpecDefault: createdAt: Instant, sid: SessionId, capabilities: Map[String, String] - ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = + ): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Nothing, Unit] = // Emit a server request to another session (admin) upon session expiration StateWriter.log(ServerRequestForSession[String](SessionId("admin"), s"expired:${SessionId.unwrap(sid)}")) .as(()) @@ -67,7 +76,7 @@ object SessionLifecycleSpec extends ZIOSpecDefault: // Create session val create = SessionCommand.CreateSession[String](now, sid, Map("k" -> "v")) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state1, _) = sm.apply(create).run(state0) // Verify admin request exists @@ -80,7 +89,7 @@ object SessionLifecycleSpec extends ZIOSpecDefault: // Expire session val expire = SessionCommand.SessionExpired[String](now, sid) - .asInstanceOf[SessionCommand[TestCommand, String]] + .asInstanceOf[SessionCommand[TestCommand, String, Nothing]] val (state2, _) = sm.apply(expire).run(state1) // Verify session data cleaned diff --git a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SnapshotSpec.scala b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SnapshotSpec.scala index 2408604e..9eb99456 100644 --- a/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SnapshotSpec.scala +++ b/session-state-machine/src/test/scala/zio/raft/sessionstatemachine/SnapshotSpec.scala @@ -11,6 +11,9 @@ object SnapshotSpec extends ZIOSpecDefault: case object R0 extends R given Codec[R] = provide(R0) given Codec[Int] = int32 + // Codec for Either[Nothing, R] to satisfy cache value type + given Codec[Either[Nothing, R]] = + summon[Codec[R]].as[Right[Nothing, R]].upcast[Either[Nothing, R]] import zio.prelude.Newtype object UKey extends Newtype[String] @@ -21,7 +24,7 @@ object SnapshotSpec extends ZIOSpecDefault: import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec} type UserSchema = ("u", UKey, Int) *: EmptyTuple - type CombinedSchema = Tuple.Concat[SessionSchema[R, String], UserSchema] + type CombinedSchema = Tuple.Concat[SessionSchema[R, String, Nothing], UserSchema] // Minimal state machine to exercise ScodecSerialization import zio.raft.Command @@ -33,7 +36,7 @@ object SnapshotSpec extends ZIOSpecDefault: type Response = Unit class TestMachine extends zio.raft.StateMachine[HMap[CombinedSchema], DCmd] - with ScodecSerialization[R, String, UserSchema]: + with ScodecSerialization[R, String, Nothing, UserSchema]: override val codecs: HMap.TypeclassMap[Schema, Codec] = summon def emptyState: HMap[CombinedSchema] = HMap.empty def apply(command: DCmd): State[HMap[CombinedSchema], command.Response] =