Skip to content
35 changes: 28 additions & 7 deletions .specify/templates/plan-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ specs/[###-feature]/
├── plan.md # This file (/plan command output)
├── research.md # Phase 0 output (/plan command)
├── data-model.md # Phase 1 output (/plan command)
├── tests.md # Phase 1 output (/plan command)
├── design.md # Phase 1 output (/plan command)
├── quickstart.md # Phase 1 output (/plan command)
└── tasks.md # Phase 2 output (/tasks command - NOT created by /plan)
```
Expand Down Expand Up @@ -114,11 +116,30 @@ specs/[###-feature]/
- Validation rules from requirements
- State transitions if applicable

2. **Extract test scenarios** from user stories:
2. **Update plan with high-level design and architecture** → `design.md`
- Design high-level solution, based on the `research.md`, `data-model.md`, `plan.md`, and `spec.md`.
- Read the relevant chapter from the Raft Paper, you can find it over in memory folder.
- What new projects are you going to add?
- What are the new dependencies?
- What files are going to be changed?
- What new entities or classes you need to add?
- What areas are affected? Where do we need to add more tests to cover the new functionality?
- What procotol changes are required, if any?
- Do we need a new version of a codec for the protocol change?
- Do we need a new protocol?
- In general, give high-level overview of the solution in plain english and drawing as well.

3. **Extract test scenarios from user stories** → `tests.md`:
- Each story → integration test scenario
- For each functional requirement, evaluate if a test case is required → test case
- Each new entity that requires codec → codec test case
- For each edge case, prompt the user if a test is required → test case
- Based on the `design.md`, what additional test cases we need to add?
- Collect the different test cases in the `tests.md` in plain english
- We are NOT doing Test Driven Development. Only collect the tests to the `tests.md` file.
- Quickstart test = story validation steps

3. **Update agent file incrementally** (O(1) operation):
4. **Update agent file incrementally** (O(1) operation):
- Run `.specify/scripts/bash/update-agent-context.sh cursor`
**IMPORTANT**: Execute it exactly as specified above. Do not add or remove any arguments.
- If exists: Add only NEW tech from current plan
Expand All @@ -127,20 +148,20 @@ specs/[###-feature]/
- Keep under 150 lines for token efficiency
- Output to repository root

**Output**: data-model.md, failing tests, quickstart.md, agent-specific file
**Output**: data-model.md, tests.md, design.md, quickstart.md, agent-specific file

## Phase 2: Task Planning Approach
*This section describes what the /tasks command will do - DO NOT execute during /plan*

**Task Generation Strategy**:
- Load `.specify/templates/tasks-template.md` as base
- Generate tasks from Phase 1 design docs (data model, quickstart)
- Generate tasks from Phase 1 design docs (data model, design, tests, quickstart)
- Each entity → model creation task [P]
- Each user storyintegration test task
- Implementation tasks to make tests pass
- Each new projectmodel creation task
- Implementation tasks to the tests in `tests.md`, no TDD, implement each test to completion.

**Ordering Strategy**:
- TDD order: Tests before implementation
- No TDD order: implementations tasks before tests tasks
- Dependency order: Models before services before UI
- Mark [P] for parallel execution (independent files)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ object Codecs {
/** Codec for SessionId (UUID string).
*/
implicit val sessionIdCodec: Codec[SessionId] = {
variableSizeBytes(uint16, utf8).xmap(
variableSizeBytes(uint8, utf8).xmap(
str => SessionId(str),
sessionId => SessionId.unwrap(sessionId)
sessionId => sessionId.value
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ package object protocol {
require(id.nonEmpty, "Session ID cannot be empty")
SessionId(id)
}

implicit class SessionIdSyntax(private val sessionId: SessionId) extends AnyVal {

/** Get the value of the session ID.
*/
def value: String = SessionId.unwrap(sessionId)
}
}

type SessionId = SessionId.Type
Expand Down
7 changes: 6 additions & 1 deletion kvstore/src/main/scala/zio/kvstore/Codecs.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zio.kvstore

import scodec.Codec
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32}
import scodec.codecs.{ascii, discriminated, fixedSizeBytes, utf8_32, uint8, provide}

object Codecs:
// Response codecs
Expand All @@ -19,6 +19,11 @@ object Codecs:
.typecase("W", summon[Codec[KVResponse.WatchDone.type]])
.typecase("G", summon[Codec[KVResponse.GetResult]])

given Codec[Either[Nothing, KVResponse]] =
discriminated[Either[Nothing, KVResponse]].by(uint8)
.typecase(0, scodec.codecs.fail(scodec.Err("Cannot decode Left[Nothing]")))
.typecase(1, summon[Codec[KVResponse]].xmap(Right(_), _.value))

// Value codec for KV schema values
given Codec[String] = utf8_32
private val sessionIdCodec = zio.raft.protocol.Codecs.sessionIdCodec
Expand Down
6 changes: 5 additions & 1 deletion kvstore/src/main/scala/zio/kvstore/KVServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ class KVServer(server: RaftServer):
val bytes = summon[Codec[A]].encode(response).require.bytes
server.sendQueryResponse(sessionId, QueryResponse(correlationId, bytes))

def requestError(sessionId: SessionId, requestId: RequestId, reason: zio.raft.sessionstatemachine.RequestError) =
def requestError(
sessionId: SessionId,
requestId: RequestId,
reason: zio.raft.sessionstatemachine.RequestError[Nothing]
) =
val serverReason = reason match
case zio.raft.sessionstatemachine.RequestError.ResponseEvicted =>
zio.raft.protocol.RequestErrorReason.ResponseEvicted
Expand Down
10 changes: 6 additions & 4 deletions kvstore/src/main/scala/zio/kvstore/KVStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import zio.kvstore.Codecs.given
import zio.raft.sessionstatemachine.given
import zio.raft.sessionstatemachine.Codecs.{sessionMetadataCodec, requestIdCodec, pendingServerRequestCodec}

class KVStateMachine extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]
with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, KVSchema]:
class KVStateMachine
extends SessionStateMachine[KVCommand, KVResponse, zio.kvstore.protocol.KVServerRequest, Nothing, KVSchema]
with ScodecSerialization[KVResponse, zio.kvstore.protocol.KVServerRequest, Nothing, KVSchema]:

// Local alias to aid match-type reduction bug in scala 3.3
type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]
private type SW[A] = StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], A]
type Schema = Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest, Nothing], KVSchema]
private type SW[A] =
StateWriter[HMap[Schema], ServerRequestForSession[zio.kvstore.protocol.KVServerRequest], Nothing, A]

// Helpers
private def putValue(key: KVKey, value: String): SW[Unit] =
Expand Down
47 changes: 29 additions & 18 deletions kvstore/src/main/scala/zio/kvstore/node/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import zio.raft.zmq.ZmqRpc
final case class Node(
kvServer: KVServer,
raft: zio.raft.Raft[
zio.raft.HMap[Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]],
SessionCommand[KVCommand, KVServerRequest]
zio.raft.HMap[Tuple.Concat[
zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest, Nothing],
KVSchema
]],
SessionCommand[KVCommand, KVServerRequest, Nothing]
]
):

Expand Down Expand Up @@ -115,7 +118,10 @@ final case class Node(
for
state <- raft.readStateDirty
hasPending =
SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, KVSchema](state, lastSentBefore)
SessionStateMachine.hasPendingRequests[KVResponse, KVServerRequest, Nothing, KVSchema](
state,
lastSentBefore
)
_ <-
if hasPending then
val cmd = SessionCommand.GetRequestsForRetry[KVServerRequest](now, lastSentBefore)
Expand All @@ -135,13 +141,14 @@ final case class Node(
ZIO.logInfo("Node stepped up") *>
raft.readState.either.flatMap {
case Right(state) =>
val sessions = SessionStateMachine.getSessions[KVResponse, KVServerRequest, KVSchema](state).map {
case (sessionId: SessionId, metadata) =>
(
sessionId,
zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt)
)
}
val sessions =
SessionStateMachine.getSessions[KVResponse, KVServerRequest, Nothing, KVSchema](state).map {
case (sessionId: SessionId, metadata) =>
(
sessionId,
zio.raft.sessionstatemachine.SessionMetadata(metadata.capabilities, metadata.createdAt)
)
}
kvServer.stepUp(sessions)
case Left(_) => ZIO.unit
}
Expand Down Expand Up @@ -169,7 +176,7 @@ final case class Node(
): UIO[Option[command.Response]] =
for
now <- Clock.instant
cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest](
cmd = SessionCommand.ClientRequest[KVCommand, KVServerRequest, Nothing](
now,
sessionId,
requestId,
Expand All @@ -178,16 +185,19 @@ final case class Node(
)
either <- raft.sendCommand(cmd).either
result <- either match
case Right(Right((resp, envelopes))) =>
case Right(envelopes, Right(resp)) =>
for
_ <- ZIO.foreachDiscard(envelopes) { env =>
kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload)
}
yield Some(resp.asInstanceOf[command.Response])
case Right(Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) =>
kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted).as(
None
)
case Right(envelopes, Left(zio.raft.sessionstatemachine.RequestError.ResponseEvicted)) =>
for
_ <- ZIO.foreachDiscard(envelopes) { env =>
kvServer.sendServerRequest(now, env.sessionId, env.requestId, env.payload)
}
_ <- kvServer.requestError(sessionId, requestId, zio.raft.sessionstatemachine.RequestError.ResponseEvicted)
yield None
case Left(_: zio.raft.NotALeaderError) =>
// Ignore not leader error, server will handle it eventually
ZIO.none
Expand All @@ -212,9 +222,10 @@ object Node:
for
stable <- LmdbStable.make.debug("LmdbStable.make")

logStore <- SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest]](logDirectory).debug("SegmentedLog.make")
logStore <-
SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest, Nothing]](logDirectory).debug("SegmentedLog.make")
snapshotStore <- FileSnapshotStore.make(zio.nio.file.Path(snapshotDirectory)).debug("FileSnapshotStore.make")
rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest]](
rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest, Nothing]](
nodeAddress,
peers
).debug("ZmqRpc.make")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ object Codecs:
/** Codec for SessionCommand, parameterized by UC (user command) and SR (server request payload). Requires codecs for
* UC and SR in scope.
*/
given sessionCommandCodec[UC <: zio.raft.Command, SR](using
given sessionCommandCodec[UC <: zio.raft.Command, SR, E](using
ucCodec: Codec[UC],
srCodec: Codec[SR]
): Codec[SessionCommand[UC, SR]] =
val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR]] =
): Codec[SessionCommand[UC, SR, E]] =
val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR, E]] =
(instantCodec :: sessionIdCodec :: requestIdCodec :: requestIdCodec :: ucCodec)
.as[SessionCommand.ClientRequest[UC, SR]]
val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR]] =
.as[SessionCommand.ClientRequest[UC, SR, E]]
val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR, E]] =
(uint8 :: clientRequestV0).xmap(
{ case (_, cmd) => cmd },
cmd => (0, cmd)
Expand Down Expand Up @@ -105,7 +105,7 @@ object Codecs:
cmd => (0, cmd)
)

discriminated[SessionCommand[UC, SR]]
discriminated[SessionCommand[UC, SR, E]]
.by(uint8)
.typecase(0, clientRequestCodec)
.typecase(1, serverRequestAckCodec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import scodec.Codec
* // takeSnapshot and restoreFromSnapshot provided!
* }}}
*/
trait ScodecSerialization[R, SR, UserSchema <: Tuple]:
this: StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]], ?] =>
trait ScodecSerialization[R, SR, E, UserSchema <: Tuple]:
this: StateMachine[HMap[Tuple.Concat[SessionSchema[R, SR, E], UserSchema]], ?] =>

// Type alias for convenience
type Schema = Tuple.Concat[SessionSchema[R, SR], UserSchema]
type Schema = Tuple.Concat[SessionSchema[R, SR, E], UserSchema]

/** TypeclassMap providing codecs for all value types in the schema.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import java.time.Instant
* @tparam SR
* Server-initiated request payload type
*/
sealed trait SessionCommand[+UC <: Command, SR] extends Command
sealed trait SessionCommand[+UC <: Command, SR, +E] extends Command

object SessionCommand:

Expand Down Expand Up @@ -47,15 +47,15 @@ object SessionCommand:
* @note
* Requests can arrive out of order. Eviction detection uses lowestRequestId, not requestId ordering
*/
case class ClientRequest[UC <: Command, SR](
case class ClientRequest[UC <: Command, SR, E](
createdAt: Instant,
sessionId: SessionId,
requestId: RequestId,
lowestPendingRequestId: RequestId,
command: UC
) extends SessionCommand[UC, SR]:
) extends SessionCommand[UC, SR, E]:
// Response type can be an error or the user command's response with server request envelopes
type Response = Either[RequestError, (command.Response, List[ServerRequestEnvelope[SR]])]
type Response = (List[ServerRequestEnvelope[SR]], Either[RequestError[E], command.Response])

/** Acknowledgment from a client for a server-initiated request.
*
Expand All @@ -71,7 +71,7 @@ object SessionCommand:
createdAt: Instant,
sessionId: SessionId,
requestId: RequestId
) extends SessionCommand[Nothing, SR]:
) extends SessionCommand[Nothing, SR, Nothing]:
type Response = Unit

/** Create a new session.
Expand All @@ -89,7 +89,7 @@ object SessionCommand:
createdAt: Instant,
sessionId: SessionId,
capabilities: Map[String, String]
) extends SessionCommand[Nothing, SR]:
) extends SessionCommand[Nothing, SR, Nothing]:
type Response = List[ServerRequestEnvelope[SR]] // server request envelopes

/** Notification that a session has expired.
Expand All @@ -104,7 +104,7 @@ object SessionCommand:
case class SessionExpired[SR](
createdAt: Instant,
sessionId: SessionId
) extends SessionCommand[Nothing, SR]:
) extends SessionCommand[Nothing, SR, Nothing]:
type Response = List[ServerRequestEnvelope[SR]] // server request envelopes

/** Command to atomically retrieve requests needing retry and update lastSentAt.
Expand All @@ -122,6 +122,6 @@ object SessionCommand:
case class GetRequestsForRetry[SR](
createdAt: Instant,
lastSentBefore: Instant
) extends SessionCommand[Nothing, SR]:
) extends SessionCommand[Nothing, SR, Nothing]:
type Response = List[ServerRequestEnvelope[SR]]
end SessionCommand
Loading