Skip to content

Commit 258503e

Browse files
committed
analyze
1 parent f8f46b0 commit 258503e

File tree

6 files changed

+671
-771
lines changed

6 files changed

+671
-771
lines changed

specs/002-session-state-machine/contracts/session-state-machine.md

Lines changed: 73 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,99 @@
1-
# Contract: Session State Machine
1+
# Contract: Session State Machine Template Method
22

33
**Feature**: 002-session-state-machine
4-
**Type**: Core Component Contract
4+
**Type**: Core Component Contract (Base Class)
55
**Status**: Design Complete
66

77
## Interface Definition
88

99
```scala
10-
package zio.raft.statemachine
10+
package zio.raft.sessionstatemachine
1111

1212
import zio.*
1313
import zio.raft.*
1414
import zio.raft.protocol.*
1515
import zio.prelude.State
16-
import scodec.Codec
1716
import java.time.Instant
1817

1918
/**
2019
* Session state machine handling idempotency, response caching,
21-
* and server-initiated request management.
20+
* and server-initiated request management (template pattern).
2221
*
23-
* Wraps user state machine (passed in constructor) to provide
24-
* linearizable semantics per Raft dissertation Chapter 6.3.
22+
* Abstract base class that users extend to add session management
23+
* to their business logic per Raft dissertation Chapter 6.3.
2524
*
2625
* Extends zio.raft.StateMachine to integrate with existing Raft infrastructure.
2726
*
28-
* State is Map[String, Any] with key prefixes:
29-
* - "session/" for session management data
30-
* - User keys for business logic data (no prefix required)
27+
* State is HMap[CombinedSchema[UserSchema]] with type-safe prefixes:
28+
* - SessionSchema prefixes: "metadata", "cache", "serverRequests", "lastServerRequestId"
29+
* - UserSchema prefixes: user-defined with their own types
3130
*
3231
* @tparam UC User command type (extends Command, has its own Response type)
3332
* @tparam SR Server request type (user-defined)
33+
* @tparam UserSchema User's schema (tuple of prefix-type pairs)
3434
*/
35-
class SessionStateMachine[UC <: Command, SR](
36-
userSM: UserStateMachine[UC, SR],
37-
stateCodec: Codec[(String, Any)] // ONE codec for all state serialization
38-
) extends StateMachine[Map[String, Any], SessionCommand[UC]]:
35+
abstract class SessionStateMachine[UC <: Command, SR, UserSchema <: Tuple]
36+
extends StateMachine[HMap[CombinedSchema[UserSchema]], SessionCommand[UC]]:
3937

4038
/**
41-
* Initial state is empty Map.
39+
* Initial state is empty HMap.
4240
*/
43-
def emptyState: Map[String, Any] = Map.empty
41+
def emptyState: HMap[CombinedSchema[UserSchema]] = HMap.empty
4442

4543
/**
46-
* Apply SessionCommand to state.
44+
* Apply SessionCommand to state (FINAL TEMPLATE METHOD).
4745
*
48-
* For ClientRequest: checks idempotency, delegates to user SM if needed, caches response.
46+
* For ClientRequest: checks idempotency, narrows state, calls applyCommand, caches response.
4947
* For ServerRequestAck: removes acknowledged requests (cumulative).
50-
* For SessionCreationConfirmed: adds new session metadata, forwards to user SM.
51-
* For SessionExpired: forwards to user SM, then removes all session data.
48+
* For SessionCreationConfirmed: adds session metadata, calls handleSessionCreated.
49+
* For SessionExpired: calls handleSessionExpired, then removes all session data.
5250
*
5351
* @param command Session-level command (UC is the user command type)
5452
* @return State monad with new state and command response
5553
*/
56-
def apply(command: SessionCommand[UC]): State[Map[String, Any], command.Response]
54+
final def apply(command: SessionCommand[UC]): State[HMap[CombinedSchema[UserSchema]], command.Response]
5755

5856
/**
59-
* Create snapshot of state.
57+
* Abstract methods - users MUST implement these.
58+
*/
59+
protected def applyCommand(command: UC): State[HMap[UserSchema], (command.Response, List[SR])]
60+
protected def handleSessionCreated(sessionId: SessionId, capabilities: Map[String, String]): State[HMap[UserSchema], List[SR]]
61+
protected def handleSessionExpired(sessionId: SessionId): State[HMap[UserSchema], List[SR]]
62+
63+
/**
64+
* Create snapshot of state (users implement with their chosen library).
6065
*
61-
* Serializes each Map entry using stateCodec.
62-
* Both session and user data are in the same Map (differentiated by key prefix).
66+
* State contains both session data (SessionSchema prefixes) and user data (UserSchema prefixes).
67+
* Users serialize HMap's internal Map[String, Any] using their chosen library.
6368
*
6469
* @param state State to snapshot
6570
* @return Stream of bytes
6671
*/
67-
def takeSnapshot(state: Map[String, Any]): Stream[Nothing, Byte]
72+
def takeSnapshot(state: HMap[CombinedSchema[UserSchema]]): Stream[Nothing, Byte]
6873

6974
/**
70-
* Restore state from snapshot stream.
75+
* Restore state from snapshot stream (users implement).
7176
*
72-
* Deserializes Map entries using stateCodec.
77+
* Users deserialize to Map[String, Any] then wrap in HMap.
7378
*
7479
* @param stream Snapshot byte stream
75-
* @return UIO with restored state
80+
* @return UIO with restored HMap state
7681
*/
77-
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[Map[String, Any]]
82+
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[CombinedSchema[UserSchema]]]
7883

7984
/**
8085
* Determine if snapshot should be taken.
8186
*
82-
* Default policy: every 1000 log entries.
87+
* Default implementation: every 1000 log entries.
88+
* Users can override for custom policy.
8389
*
8490
* @return true if snapshot should be taken
8591
*/
8692
def shouldTakeSnapshot(
8793
lastSnapshotIndex: Index,
8894
lastSnapshotSize: Long,
8995
commitIndex: Index
90-
): Boolean
96+
): Boolean = (commitIndex.value - lastSnapshotIndex.value) > 1000
9197

9298
/**
9399
* Check if there are any pending server requests that need retry.
@@ -98,11 +104,11 @@ class SessionStateMachine[UC <: Command, SR](
98104
*
99105
* Only returns true if there are requests with lastSentAt before the threshold.
100106
*
101-
* @param state The current state map
107+
* @param state The current HMap state
102108
* @param retryIfLastSentBefore Time threshold - only consider requests sent before this time
103109
* @return true if there are any pending server requests needing retry
104110
*/
105-
def hasPendingRequests(state: Map[String, Any], retryIfLastSentBefore: Instant): Boolean
111+
def hasPendingRequests(state: HMap[CombinedSchema[UserSchema]], retryIfLastSentBefore: Instant): Boolean
106112
```
107113

108114
---
@@ -432,53 +438,58 @@ object SessionStateMachineSpec extends ZIOSpecDefault:
432438

433439
## Integration Examples
434440

435-
### With User State Machine
441+
### Extending SessionStateMachine (Template Pattern)
436442

437443
```scala
438-
// Step 1: User creates their state machine
439-
class CounterStateMachine extends UserStateMachine[Int, CounterCommand, CounterResponse]:
440-
def emptyState: Int = 0
444+
// Step 1: Define schema
445+
type KVSchema = ("kv", Map[String, String]) *: EmptyTuple
446+
447+
// Step 2: Extend SessionStateMachine
448+
class KVStateMachine extends SessionStateMachine[KVCommand, ServerReq, KVSchema]:
441449

442-
def apply(command: CounterCommand): State[Int, CounterResponse] =
443-
State.modify { counter =>
444-
command match
445-
case CounterCommand.Increment(by) =>
446-
(counter + by, CounterResponse.Success(counter + by))
447-
case CounterCommand.GetValue =>
448-
(counter, CounterResponse.Success(counter))
450+
// Step 3: Implement 3 abstract methods
451+
protected def applyCommand(cmd: KVCommand): State[HMap[KVSchema], (cmd.Response, List[ServerReq])] =
452+
State.modify { state =>
453+
cmd match
454+
case Set(k, v) =>
455+
val kvMap = state.get["kv"]("store").getOrElse(Map.empty)
456+
val newState = state.updated["kv"]("store", kvMap.updated(k, v))
457+
(newState, ((), Nil))
458+
case Get(k) =>
459+
val kvMap = state.get["kv"]("store").getOrElse(Map.empty)
460+
(state, (kvMap.getOrElse(k, ""), Nil))
449461
}
450462

451-
def stateCodec: Codec[Int] = int32
452-
def commandCodec: Codec[CounterCommand] = CounterCommand.codec
453-
def responseCodec: Codec[CounterResponse] = CounterResponse.codec
454-
455-
// Step 2: Pass to SessionStateMachine constructor
456-
val counterSM = new CounterStateMachine()
457-
val sessionSM = new SessionStateMachine(counterSM)
463+
protected def handleSessionCreated(...) = State.succeed(Nil)
464+
protected def handleSessionExpired(...) = State.succeed(Nil)
465+
466+
// Step 4: Implement serialization
467+
def takeSnapshot(state: HMap[CombinedSchema[KVSchema]]): Stream[Nothing, Byte] = ???
468+
def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[...]] = ???
458469

459-
// sessionSM now extends zio.raft.StateMachine and can be used with Raft
470+
// Step 5: Use it
471+
val kvSM = new KVStateMachine() // Ready for Raft
460472

461-
// Step 3: Use in application
473+
// Step 6: Use in application
462474
for
463475
currentState <- stateRef.get
464476

465-
// Apply session command
466-
(newState, response) = sessionSM.apply(
467-
SessionCommand.ClientRequest(sessionId, requestId, payload)
477+
// Apply session command (template method handles everything)
478+
(newState, response) = kvSM.apply(
479+
SessionCommand.ClientRequest(sessionId, requestId, command)
468480
).run(currentState)
469481

470482
_ <- stateRef.set(newState)
471-
472-
// Send response to client
473483
_ <- sendToClient(response)
474484
yield ()
475485
```
476486

477487
**Key Points**:
478-
- ✅ SessionStateMachine takes userSM in constructor
479-
- ✅ Manages CombinedState[UserState] internally
480-
- ✅ Handles idempotency, caching, snapshots automatically
481-
- ✅ User just implements simple UserStateMachine trait
488+
- ✅ Users extend SessionStateMachine (template pattern)
489+
- ✅ Implement 3 methods + serialization
490+
- ✅ Base class manages HMap[CombinedSchema[UserSchema]] internally
491+
- ✅ Handles idempotency, caching, state narrowing/merging automatically
492+
- ✅ Type-safe state access with compile-time checking
482493

483494
---
484495

0 commit comments

Comments
 (0)