|
| 1 | +package zio.raft.sessionstatemachine |
| 2 | + |
| 3 | +import zio.test.* |
| 4 | +import zio.test.Assertion.* |
| 5 | +import zio.{UIO, ZIO} |
| 6 | +import zio.raft.{Command, HMap, Index} |
| 7 | +import zio.raft.protocol.{SessionId, RequestId} |
| 8 | +import zio.stream.{Stream, ZStream} |
| 9 | +import java.time.Instant |
| 10 | + |
| 11 | +/** |
| 12 | + * Contract test for idempotency checking (PC-1). |
| 13 | + * |
| 14 | + * Tests that duplicate requests return cached responses without calling applyCommand. |
| 15 | + */ |
| 16 | +object IdempotencySpec extends ZIOSpecDefault: |
| 17 | + |
| 18 | + sealed trait TestCommand extends Command |
| 19 | + object TestCommand: |
| 20 | + case class Increment(by: Int) extends TestCommand: |
| 21 | + type Response = Int |
| 22 | + |
| 23 | + import zio.prelude.Newtype |
| 24 | + object CounterKey extends Newtype[String] |
| 25 | + type CounterKey = CounterKey.Type |
| 26 | + given HMap.KeyLike[CounterKey] = HMap.KeyLike.forNewtype(CounterKey) |
| 27 | + |
| 28 | + type TestSchema = ("counter", CounterKey, Int) *: EmptyTuple |
| 29 | + type CombinedSchema = Tuple.Concat[SessionSchema, TestSchema] |
| 30 | + |
| 31 | + val counterKey = CounterKey("value") |
| 32 | + |
| 33 | + // SR = String (the actual server request payload type) |
| 34 | + class TestStateMachine extends SessionStateMachine[TestCommand, String, TestSchema]: |
| 35 | + var callCount = 0 // Track how many times applyCommand is called |
| 36 | + |
| 37 | + protected def applyCommand(cmd: TestCommand, createdAt: Instant): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], cmd.Response] = |
| 38 | + callCount += 1 |
| 39 | + cmd match |
| 40 | + case TestCommand.Increment(by) => |
| 41 | + for { |
| 42 | + state <- StateWriter.get[HMap[CombinedSchema]] |
| 43 | + current = state.get["counter"](counterKey).getOrElse(0) |
| 44 | + newValue = current + by |
| 45 | + newState = state.updated["counter"](counterKey, newValue) |
| 46 | + _ <- StateWriter.set(newState) |
| 47 | + } yield newValue.asInstanceOf[cmd.Response] |
| 48 | + |
| 49 | + protected def handleSessionCreated(sid: SessionId, caps: Map[String, String], createdAt: Instant): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = |
| 50 | + StateWriter.succeed(()) |
| 51 | + |
| 52 | + protected def handleSessionExpired(sid: SessionId, capabilities: Map[String, String], createdAt: Instant): StateWriter[HMap[CombinedSchema], ServerRequestForSession[String], Unit] = |
| 53 | + StateWriter.succeed(()) |
| 54 | + |
| 55 | + def takeSnapshot(state: HMap[CombinedSchema]): Stream[Nothing, Byte] = |
| 56 | + ZStream.empty |
| 57 | + |
| 58 | + def restoreFromSnapshot(stream: Stream[Nothing, Byte]): UIO[HMap[CombinedSchema]] = |
| 59 | + ZIO.succeed(HMap.empty) |
| 60 | + |
| 61 | + def shouldTakeSnapshot(lastSnapshotIndex: Index, lastSnapshotSize: Long, commitIndex: Index): Boolean = |
| 62 | + false |
| 63 | + |
| 64 | + def spec = suite("Idempotency with Composite Keys")( |
| 65 | + |
| 66 | + test("PC-1: First request calls applyCommand, second request returns cached without calling") { |
| 67 | + val sm = new TestStateMachine() |
| 68 | + val state0 = HMap.empty[sm.Schema] |
| 69 | + val now = Instant.now() |
| 70 | + val sessionId = SessionId("s1") |
| 71 | + |
| 72 | + // Create session first (cast to match state machine type) |
| 73 | + val createCmd: SessionCommand[TestCommand, String] = |
| 74 | + SessionCommand.CreateSession[String](now, sessionId, Map.empty) |
| 75 | + .asInstanceOf[SessionCommand[TestCommand, String]] |
| 76 | + val (state1, _) = sm.apply(createCmd).run(state0) |
| 77 | + |
| 78 | + // First request - should call applyCommand |
| 79 | + val cmd1: SessionCommand[TestCommand, String] = |
| 80 | + SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(10)) |
| 81 | + val (state2, result1) = sm.apply(cmd1).run(state1) |
| 82 | + val response1 = result1.asInstanceOf[(Int, List[Any])]._1 |
| 83 | + |
| 84 | + assertTrue(sm.callCount == 1) && |
| 85 | + assertTrue(response1 == 10) |
| 86 | + |
| 87 | + // Second request with same ID - should NOT call applyCommand |
| 88 | + val cmd2: SessionCommand[TestCommand, String] = |
| 89 | + SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(999)) |
| 90 | + val (state3, result2) = sm.apply(cmd2).run(state2) |
| 91 | + val response2 = result2.asInstanceOf[(Int, List[Any])]._1 |
| 92 | + |
| 93 | + assertTrue( |
| 94 | + sm.callCount == 1 && // Still 1, not called again! |
| 95 | + response2 == 10 // Cached response, not 999! |
| 96 | + ) |
| 97 | + }, |
| 98 | + |
| 99 | + test("Different requestIds call applyCommand separately") { |
| 100 | + val sm = new TestStateMachine() |
| 101 | + val state0 = HMap.empty[sm.Schema] |
| 102 | + val now = Instant.now() |
| 103 | + val sessionId = SessionId("s1") |
| 104 | + |
| 105 | + val createCmd: SessionCommand[TestCommand, String] = |
| 106 | + SessionCommand.CreateSession[String](now, sessionId, Map.empty) |
| 107 | + .asInstanceOf[SessionCommand[TestCommand, String]] |
| 108 | + val (state1, _) = sm.apply(createCmd).run(state0) |
| 109 | + |
| 110 | + // First request |
| 111 | + val cmd1: SessionCommand[TestCommand, String] = |
| 112 | + SessionCommand.ClientRequest(now, sessionId, RequestId(1), RequestId(1), TestCommand.Increment(5)) |
| 113 | + val (state2, result1) = sm.apply(cmd1).run(state1) |
| 114 | + val response1 = result1.asInstanceOf[(Int, List[Any])]._1 |
| 115 | + |
| 116 | + // Second request with DIFFERENT ID |
| 117 | + val cmd2: SessionCommand[TestCommand, String] = |
| 118 | + SessionCommand.ClientRequest(now, sessionId, RequestId(2), RequestId(1), TestCommand.Increment(3)) |
| 119 | + val (state3, result2) = sm.apply(cmd2).run(state2) |
| 120 | + val response2 = result2.asInstanceOf[(Int, List[Any])]._1 |
| 121 | + |
| 122 | + assertTrue( |
| 123 | + sm.callCount == 2 && // Both requests processed |
| 124 | + response1 == 5 && |
| 125 | + response2 == 8 // 5 + 3 |
| 126 | + ) |
| 127 | + } |
| 128 | + ) |
0 commit comments