Skip to content

Commit 6bc61b1

Browse files
committed
lowest sequence number protocol
1 parent 62a833c commit 6bc61b1

File tree

20 files changed

+885
-153
lines changed

20 files changed

+885
-153
lines changed

.specify/templates/plan-template.md

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
→ Update Progress Tracking: Initial Constitution Check
1919
5. Execute Phase 0 → research.md
2020
→ If NEEDS CLARIFICATION remain: ERROR "Resolve unknowns"
21-
6. Execute Phase 1 → contracts, data-model.md, quickstart.md, agent-specific template file (e.g., `CLAUDE.md` for Claude Code, `.github/copilot-instructions.md` for GitHub Copilot, `GEMINI.md` for Gemini CLI, `QWEN.md` for Qwen Code or `AGENTS.md` for opencode).
21+
6. Execute Phase 1 → data-model.md, quickstart.md, agent-specific template file (e.g., `CLAUDE.md` for Claude Code, `.github/copilot-instructions.md` for GitHub Copilot, `GEMINI.md` for Gemini CLI, `QWEN.md` for Qwen Code or `AGENTS.md` for opencode).
2222
7. Re-evaluate Constitution Check section
2323
→ If new violations: Refactor design, return to Phase 1
2424
→ Update Progress Tracking: Post-Design Constitution Check
@@ -66,6 +66,7 @@
6666
- [ ] ZIO primitives used for all concurrent operations
6767
- [ ] ZStream used for streaming, no external streaming libraries
6868
- [ ] Resource management follows ZIO Scope patterns
69+
- [ ] `suiteAll` is used instead of `suite`
6970

7071
### V. Test-Driven Maintenance
7172
- [ ] Bug fixes include reproducing test cases
@@ -81,49 +82,9 @@ specs/[###-feature]/
8182
├── research.md # Phase 0 output (/plan command)
8283
├── data-model.md # Phase 1 output (/plan command)
8384
├── quickstart.md # Phase 1 output (/plan command)
84-
├── contracts/ # Phase 1 output (/plan command)
8585
└── tasks.md # Phase 2 output (/tasks command - NOT created by /plan)
8686
```
8787

88-
### Source Code (repository root)
89-
```
90-
# Option 1: Single project (DEFAULT)
91-
src/
92-
├── models/
93-
├── services/
94-
├── cli/
95-
└── lib/
96-
97-
tests/
98-
├── contract/
99-
├── integration/
100-
└── unit/
101-
102-
# Option 2: Web application (when "frontend" + "backend" detected)
103-
backend/
104-
├── src/
105-
│ ├── models/
106-
│ ├── services/
107-
│ └── api/
108-
└── tests/
109-
110-
frontend/
111-
├── src/
112-
│ ├── components/
113-
│ ├── pages/
114-
│ └── services/
115-
└── tests/
116-
117-
# Option 3: Mobile + API (when "iOS/Android" detected)
118-
api/
119-
└── [same as backend above]
120-
121-
ios/ or android/
122-
└── [platform-specific structure]
123-
```
124-
125-
**Structure Decision**: [DEFAULT to Option 1 unless Technical Context indicates web/mobile app]
126-
12788
## Phase 0: Outline & Research
12889
1. **Extract unknowns from Technical Context** above:
12990
- For each NEEDS CLARIFICATION → research task
@@ -145,29 +106,19 @@ ios/ or android/
145106

146107
**Output**: research.md with all NEEDS CLARIFICATION resolved
147108

148-
## Phase 1: Design & Contracts
109+
## Phase 1: Design
149110
*Prerequisites: research.md complete*
150111

151112
1. **Extract entities from feature spec**`data-model.md`:
152113
- Entity name, fields, relationships
153114
- Validation rules from requirements
154115
- State transitions if applicable
155116

156-
2. **Generate API contracts** from functional requirements:
157-
- For each user action → endpoint
158-
- Use standard REST/GraphQL patterns
159-
- Output OpenAPI/GraphQL schema to `/contracts/`
160-
161-
3. **Generate contract tests** from contracts:
162-
- One test file per endpoint
163-
- Assert request/response schemas
164-
- Tests must fail (no implementation yet)
165-
166-
4. **Extract test scenarios** from user stories:
117+
2. **Extract test scenarios** from user stories:
167118
- Each story → integration test scenario
168119
- Quickstart test = story validation steps
169120

170-
5. **Update agent file incrementally** (O(1) operation):
121+
3. **Update agent file incrementally** (O(1) operation):
171122
- Run `.specify/scripts/bash/update-agent-context.sh cursor`
172123
**IMPORTANT**: Execute it exactly as specified above. Do not add or remove any arguments.
173124
- If exists: Add only NEW tech from current plan
@@ -176,15 +127,14 @@ ios/ or android/
176127
- Keep under 150 lines for token efficiency
177128
- Output to repository root
178129

179-
**Output**: data-model.md, /contracts/*, failing tests, quickstart.md, agent-specific file
130+
**Output**: data-model.md, failing tests, quickstart.md, agent-specific file
180131

181132
## Phase 2: Task Planning Approach
182133
*This section describes what the /tasks command will do - DO NOT execute during /plan*
183134

184135
**Task Generation Strategy**:
185136
- Load `.specify/templates/tasks-template.md` as base
186-
- Generate tasks from Phase 1 design docs (contracts, data model, quickstart)
187-
- Each contract → contract test task [P]
137+
- Generate tasks from Phase 1 design docs (data model, quickstart)
188138
- Each entity → model creation task [P]
189139
- Each user story → integration test task
190140
- Implementation tasks to make tests pass

.specify/templates/tasks-template.md

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
→ Extract: tech stack, libraries, structure
1111
2. Load optional design documents:
1212
→ data-model.md: Extract entities → model tasks
13-
→ contracts/: Each file → contract test task
1413
→ research.md: Extract decisions → setup tasks
1514
3. Generate tasks by category:
1615
→ Setup: project init, dependencies, linting
@@ -49,38 +48,37 @@
4948

5049
## Phase 3.2: Tests First (TDD) ⚠️ MUST COMPLETE BEFORE 3.3
5150
**CRITICAL: These tests MUST be written and MUST FAIL before ANY implementation**
52-
- [ ] T004 [P] Contract test for new functionality using ZIO Test framework
53-
- [ ] T005 [P] Property-based tests for complex logic using ZIO Test Gen
54-
- [ ] T006 [P] Integration tests for external interactions (RPC, storage)
55-
- [ ] T007 [P] Performance benchmark tests for critical paths
51+
- [ ] T004 [P] Property-based tests for complex logic using ZIO Test Gen
52+
- [ ] T005 [P] Integration tests for external interactions (RPC, storage)
53+
- [ ] T006 [P] Performance benchmark tests for critical paths
5654

5755
## Phase 3.3: Core Implementation (ONLY after tests are failing)
58-
- [ ] T008 [P] Immutable data models with proper type safety
59-
- [ ] T009 [P] Service layer using ZIO effects and explicit error types
60-
- [ ] T010 [P] Extend existing abstractions (StateMachine, RPC, LogStore) without breaking changes
61-
- [ ] T011 Implement functionality using ZIO ecosystem (ZStream, ZLayer, etc.)
62-
- [ ] T012 Add explicit error handling for all external interactions
63-
- [ ] T013 Input validation using ZIO Prelude or similar type-safe validation
64-
- [ ] T014 Resource management using ZIO Scope patterns
56+
- [ ] T007 [P] Immutable data models with proper type safety
57+
- [ ] T008 [P] Service layer using ZIO effects and explicit error types
58+
- [ ] T009 [P] Extend existing abstractions (StateMachine, RPC, LogStore) without breaking changes
59+
- [ ] T010 Implement functionality using ZIO ecosystem (ZStream, ZLayer, etc.)
60+
- [ ] T011 Add explicit error handling for all external interactions
61+
- [ ] T012 Input validation using ZIO Prelude or similar type-safe validation
62+
- [ ] T013 Resource management using ZIO Scope patterns
6563

6664
## Phase 3.4: Integration
67-
- [ ] T015 Integrate with existing storage abstractions using ZIO patterns
68-
- [ ] T016 Add distributed system error handling (timeouts, retries)
69-
- [ ] T017 Implement observability using ZIO logging and metrics
70-
- [ ] T018 Ensure backward compatibility and performance preservation
65+
- [ ] T014 Integrate with existing storage abstractions using ZIO patterns
66+
- [ ] T015 Add distributed system error handling (timeouts, retries)
67+
- [ ] T016 Implement observability using ZIO logging and metrics
68+
- [ ] T017 Ensure backward compatibility and performance preservation
7169

7270
## Phase 3.5: Polish
73-
- [ ] T019 [P] Unit tests covering edge cases and error scenarios
74-
- [ ] T020 Performance validation and benchmark comparison with baseline
75-
- [ ] T021 [P] Update documentation and API specifications
76-
- [ ] T022 Code review for ZIO ecosystem consistency and functional purity
77-
- [ ] T023 Constitution compliance verification checklist
71+
- [ ] T018 [P] Unit tests covering edge cases and error scenarios
72+
- [ ] T019 Performance validation and benchmark comparison with baseline
73+
- [ ] T020 [P] Update documentation and API specifications
74+
- [ ] T021 Code review for ZIO ecosystem consistency and functional purity
75+
- [ ] T022 Constitution compliance verification checklist
7876

7977
## Dependencies
80-
- Tests (T004-T007) before implementation (T008-T014)
81-
- T008 blocks T009, T015
82-
- T016 blocks T018
83-
- Implementation before polish (T019-T023)
78+
- Tests (T004-T006) before implementation (T008-T014)
79+
- T007 blocks T008, T014
80+
- T015 blocks T017
81+
- Implementation before polish (T018-T022)
8482

8583
## Parallel Example
8684
```

client-server-client/src/main/scala/zio/raft/client/PendingRequests.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package zio.raft.client
22

33
import zio.*
44
import zio.raft.protocol.*
5+
import scala.math.Ordering
56
import scodec.bits.ByteVector
67
import java.time.Instant
78

@@ -10,6 +11,10 @@ import java.time.Instant
1011
case class PendingRequests(
1112
requests: Map[RequestId, PendingRequests.PendingRequestData]
1213
) {
14+
def contains(requestId: RequestId): Boolean = requests.contains(requestId)
15+
def lowestPendingRequestIdOr(default: RequestId): RequestId =
16+
if (requests.isEmpty) default else requests.keys.min(using Ordering.by[RequestId, Long](_.value))
17+
1318
def add(
1419
requestId: RequestId,
1520
payload: ByteVector,
@@ -33,7 +38,8 @@ case class PendingRequests(
3338
ZIO.foldLeft(requests.toList)(this) { case (pending, (requestId, data)) =>
3439
for {
3540
now <- Clock.instant
36-
request = ClientRequest(requestId, data.payload, now)
41+
lowestPendingRequestId = pending.lowestPendingRequestIdOr(requestId)
42+
request = ClientRequest(requestId, lowestPendingRequestId, data.payload, now)
3743
_ <- transport.sendMessage(request).orDie
3844
_ <- ZIO.logDebug(s"Resending pending request: $requestId")
3945
updatedData = data.copy(lastSentAt = now)
@@ -47,7 +53,8 @@ case class PendingRequests(
4753
ZIO.foldLeft(requests.toList)(this) { case (pending, (requestId, data)) =>
4854
val elapsed = Duration.fromInterval(data.lastSentAt, currentTime)
4955
if (elapsed > timeout) {
50-
val request = ClientRequest(requestId, data.payload, currentTime)
56+
val lowestPendingRequestId = pending.lowestPendingRequestIdOr(requestId)
57+
val request = ClientRequest(requestId, lowestPendingRequestId, data.payload, currentTime)
5158
for {
5259
_ <- transport.sendMessage(request).orDie
5360
_ <- ZIO.logDebug(s"Resending timed out request: $requestId")
@@ -58,6 +65,12 @@ case class PendingRequests(
5865
}
5966
}
6067
}
68+
69+
def die(requestId: RequestId, error: Throwable): UIO[PendingRequests] =
70+
requests.get(requestId) match {
71+
case Some(data) => data.promise.die(error).ignore.as(copy(requests = requests.removed(requestId)))
72+
case None => ZIO.succeed(this)
73+
}
6174
}
6275

6376
object PendingRequests {

client-server-client/src/main/scala/zio/raft/client/RaftClient.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,9 @@ object RaftClient {
268268
case StreamEvent.ServerMsg(ClientResponse(_, _)) =>
269269
ZIO.logWarning("Received ClientResponse while connecting, ignoring").as(this)
270270

271+
case StreamEvent.ServerMsg(_: RequestError) =>
272+
ZIO.logWarning("Received RequestError while connecting, ignoring").as(this)
273+
271274
case StreamEvent.ServerMsg(KeepAliveResponse(_)) =>
272275
ZIO.succeed(this)
273276

@@ -405,6 +408,9 @@ object RaftClient {
405408
case StreamEvent.ServerMsg(KeepAliveResponse(_)) =>
406409
ZIO.succeed(this)
407410

411+
case StreamEvent.ServerMsg(_: RequestError) =>
412+
ZIO.logWarning("Received RequestError while connecting existing session, ignoring").as(this)
413+
408414
case StreamEvent.ServerMsg(_: ServerRequest) =>
409415
ZIO.logWarning("Received ServerRequest while connecting, ignoring").as(this)
410416

@@ -490,14 +496,23 @@ object RaftClient {
490496
for {
491497
requestId <- nextRequestId.next
492498
now <- Clock.instant
493-
request = ClientRequest(requestId, payload, now)
499+
lowestPendingRequestId = pendingRequests.lowestPendingRequestIdOr(requestId)
500+
request = ClientRequest(requestId, lowestPendingRequestId, payload, now)
494501
_ <- transport.sendMessage(request).orDie
495502
newPending = pendingRequests.add(requestId, payload, promise, now)
496503
} yield copy(pendingRequests = newPending)
497504

498505
case StreamEvent.ServerMsg(ClientResponse(requestId, result)) =>
499506
pendingRequests.complete(requestId, result).map(newPending => copy(pendingRequests = newPending))
500507

508+
case StreamEvent.ServerMsg(RequestError(requestId, RequestErrorReason.ResponseEvicted)) =>
509+
if (pendingRequests.contains(requestId))
510+
ZIO.logError(s"RequestError: ResponseEvicted for request $requestId, terminating client") *>
511+
pendingRequests.die(requestId, new RuntimeException("ResponseEvicted")) *>
512+
ZIO.dieMessage("ResponseEvicted")
513+
else
514+
ZIO.logWarning(s"RequestError for non-pending request $requestId, ignoring").as(this)
515+
501516
case StreamEvent.ServerMsg(KeepAliveResponse(_)) =>
502517
ZIO.succeed(this)
503518

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package zio.raft.client
2+
3+
import _root_.zio.*
4+
import _root_.zio.test.*
5+
import _root_.zio.test.Assertion.*
6+
import _root_.zio.raft.protocol.*
7+
import scodec.bits.ByteVector
8+
import java.time.Instant
9+
10+
object PendingRequestsSpec extends ZIOSpecDefault {
11+
12+
private class FakeTransport(ref: Ref[List[ClientMessage]]) extends ClientTransport {
13+
override def connect(address: String): ZIO[Any, Throwable, Unit] = ZIO.unit
14+
override def disconnect(): ZIO[Any, Throwable, Unit] = ZIO.unit
15+
override def sendMessage(message: ClientMessage): ZIO[Any, Throwable, Unit] = ref.update(message :: _).unit
16+
override def incomingMessages: _root_.zio.stream.ZStream[Any, Throwable, ServerMessage] =
17+
_root_.zio.stream.ZStream.empty
18+
}
19+
20+
def spec = suiteAll("PendingRequests") {
21+
22+
test("lowestPendingRequestIdOr returns min or default") {
23+
val rid1 = RequestId.fromLong(5L)
24+
val rid2 = RequestId.fromLong(2L)
25+
val rid3 = RequestId.fromLong(9L)
26+
for {
27+
p <- Promise.make[Throwable, ByteVector]
28+
now <- Clock.instant
29+
pending = PendingRequests.empty
30+
.add(rid1, ByteVector.empty, p, now)
31+
.add(rid2, ByteVector.empty, p, now)
32+
.add(rid3, ByteVector.empty, p, now)
33+
} yield assertTrue(pending.lowestPendingRequestIdOr(rid1) == rid2)
34+
}
35+
36+
test("resendAll includes lowestPendingRequestId = min(pending)") {
37+
val rid1 = RequestId.fromLong(1L)
38+
val rid2 = RequestId.fromLong(3L)
39+
for {
40+
sentRef <- Ref.make(List.empty[ClientMessage])
41+
transport = new FakeTransport(sentRef)
42+
p <- Promise.make[Throwable, ByteVector]
43+
now <- Clock.instant
44+
pending = PendingRequests.empty
45+
.add(rid2, ByteVector.fromValidHex("02"), p, now)
46+
.add(rid1, ByteVector.fromValidHex("01"), p, now)
47+
_ <- pending.resendAll(transport)
48+
sent <- sentRef.get
49+
msgs = sent.collect { case m: ClientRequest => m }
50+
} yield assertTrue(msgs.nonEmpty) && assertTrue(msgs.forall(_.lowestPendingRequestId == rid1))
51+
}
52+
53+
test("resendExpired includes lowestPendingRequestId = min(pending)") {
54+
val rid1 = RequestId.fromLong(2L)
55+
val rid2 = RequestId.fromLong(4L)
56+
for {
57+
sentRef <- Ref.make(List.empty[ClientMessage])
58+
transport = new FakeTransport(sentRef)
59+
p <- Promise.make[Throwable, ByteVector]
60+
createdAt = Instant.parse("2023-01-01T00:00:00Z")
61+
pending = PendingRequests(Map(
62+
rid1 -> PendingRequests.PendingRequestData(ByteVector.fromValidHex("aa"), p, createdAt, createdAt),
63+
rid2 -> PendingRequests.PendingRequestData(ByteVector.fromValidHex("bb"), p, createdAt, createdAt)
64+
))
65+
current <- ZIO.succeed(Instant.parse("2023-01-01T00:10:00Z"))
66+
_ <- pending.resendExpired(transport, current, 1.minute)
67+
sent <- sentRef.get
68+
msgs = sent.collect { case m: ClientRequest => m }
69+
} yield assertTrue(msgs.nonEmpty) && assertTrue(msgs.forall(_.lowestPendingRequestId == rid1))
70+
}
71+
72+
test("die removes request and completes promise with death") {
73+
val rid = RequestId.fromLong(7L)
74+
for {
75+
p <- Promise.make[Throwable, ByteVector]
76+
now <- Clock.instant
77+
pending0 = PendingRequests.empty.add(rid, ByteVector.fromValidHex("aa"), p, now)
78+
pending1 <- pending0.die(rid, new RuntimeException("boom"))
79+
fiber <- p.await.fork
80+
exit <- fiber.await
81+
} yield assertTrue(!pending1.contains(rid)) && assertTrue(exit.isFailure)
82+
}
83+
84+
test("ignore non-pending die invocation leaves state unchanged") {
85+
val rid = RequestId.fromLong(7L)
86+
for {
87+
pending <- ZIO.succeed(PendingRequests.empty)
88+
pending1 <- pending.die(rid, new RuntimeException("boom"))
89+
} yield assertTrue(pending1 == pending)
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)