Commit 62a833c
001 implement client server (#15)
* spec for client server
* protocol
* remove usage of Instant.now
* modernize tests
* ai impl
* ai try to simplify
* ai simplify some more
* Refactor client to use stream-based action processing

  Co-authored-by: doron <[email protected]>
* Refactor RaftClient to use functional stream-based approach

  Co-authored-by: doron <[email protected]>
* Refactor: Simplify RaftClient to use state machine pattern

  Co-authored-by: doron <[email protected]>
* Refactor client state and add server request queue

  Co-authored-by: doron <[email protected]>
* Refactor: Improve client state management and retry logic

  This commit introduces several improvements to the Raft client's state management and request handling. Key changes include:

  - **Enhanced `ClientState`:** The `ClientState` now includes a `createdAt` timestamp, which is crucial for tracking connection and request timeouts.
  - **Timeout Checks:** A new `TimeoutCheck` event is introduced to periodically monitor for timed-out connections and requests.
  - **Connection Timeout:** A `connectionTimeout` configuration is added to limit the time spent attempting to establish a connection.
  - **Address Cycling:** The client now cycles through cluster addresses more robustly when encountering `NotLeader` responses or connection timeouts.
  - **`RetryManager` Removal:** The `RetryManager` has been removed and its functionality integrated into the `ClientState` and a new `PendingRequests` data structure. This simplifies the codebase and improves state encapsulation.
  - **`PendingRequests` Data Structure:** A new `PendingRequests` case class is introduced to manage pending requests and their associated promises, replacing the previous map-based approach.
  - **Server Request Handling:** Improved handling of `ServerRequest` messages, including checks for out-of-order requests and re-acknowledgments.
  - **Stub Transport Enhancements:** The `ZmqClientTransportStub` now includes `connect` and `disconnect` methods for more accurate stub behavior.
  These changes collectively enhance the client's resilience, improve its ability to handle network issues, and streamline its internal state management.

  Co-authored-by: doron <[email protected]>
* Refactor client state and pending requests management

  Co-authored-by: doron <[email protected]>
* Refactor client to start in disconnected state

  Co-authored-by: doron <[email protected]>
* Checkpoint before follow-up message

  Co-authored-by: doron <[email protected]>
* Remove unused ClientRequest creation

  Co-authored-by: doron <[email protected]>
* Refactor: Use RequestIdRef for cleaner request ID generation

  Co-authored-by: doron <[email protected]>
* Refactor server request handling and connection logic

  Co-authored-by: doron <[email protected]>
* fix compilation
* Refactor: Simplify RaftServer to functional state machine

  Co-authored-by: doron <[email protected]>
* fix compilation
* Remove unused stub
* fix compilation
* Improve log message consistency and remove unused config

  Address PR comments:
  - Add server addresses to all connection log messages
  - Use consistent format: "Connecting New Session to $addr" and "Connecting Existing Session $sessionId to $addr"
  - Include both old and new addresses for retry/error messages
  - Remove unused sessionTimeout from ClientConfig (not used in client logic)

  All connection attempts now log the target address for better debugging.
* Address PR comments: improve config, error handling, and code structure

  Key changes:
  - Add requestTimeout to ClientConfig and use it throughout
  - Remove unused constants from package.scala
  - Handle all RequestError and SessionClosed reasons explicitly (no catch-all)
  - Extract reconnectTo() helper method to reduce repetitive code
  - Split Messages.scala into ClientMessages.scala and ServerMessages.scala
  - Remove placeholder timeoutConnectionClosed line

  Improvements:
  - Better separation of concerns with split message files
  - All error cases now explicitly handled with appropriate logic
  - SessionClosed reasons have different handling (try next vs same address)
  - Cleaner, more maintainable code with extracted helper methods
* Refactor: Use Map[MemberId, String] for cluster and utilize leader hints

  Address PR comment about using cluster member map and leader hints.

  Changes:
  - Change clusterAddresses from List[String] to Map[MemberId, String]
  - Replace currentAddressIndex with currentMemberId tracking
  - Use leaderId hints from SessionRejected and RequestError messages
  - Fall back to next-member (round-robin) logic when no hint available
  - Update all connection attempts to use new structure
  - Improve log messages to show both MemberId and address

  Connection strategy:
  - When a leader hint is provided: connect directly to the hinted leader
  - When no hint: use round-robin through available members
  - For errors on the same member (SessionError, ConnectionClosed, Timeout): reconnect to the same member
  - For NotLeader errors: prefer the leader hint, otherwise try the next member

  This allows the client to intelligently use leader information from the server instead of blindly iterating through addresses.
* Clean up protocol: organize reason enums and remove unused values

  Address PR comments on protocol organization.

  ClientMessages.scala:
  - Move CloseReason cases into companion object
  - Remove unused SwitchingServer reason

  ServerMessages.scala:
  - Move RejectionReason cases into companion object
  - Move SessionCloseReason cases into companion object
  - Move RequestErrorReason cases into companion object
  - Remove unused error reasons: InvalidRequest, NotConnected, ConnectionLost, UnsupportedVersion, PayloadTooLarge, ServiceUnavailable, ProcessingFailed, RequestTimeout

  Updated all usages in RaftClient and RaftServer to use companion object paths:
  - RejectionReason.NotLeader
  - RejectionReason.SessionNotFound
  - RejectionReason.InvalidCapabilities
  - SessionCloseReason.Shutdown
  - SessionCloseReason.NotLeaderAnymore
  - SessionCloseReason.SessionError
  - SessionCloseReason.ConnectionClosed
  - SessionCloseReason.SessionTimeout
  - RequestErrorReason.NotLeaderRequest
  - RequestErrorReason.SessionTerminated
  - CloseReason.ClientShutdown

  This provides better organization and removes protocol bloat.
* Major protocol and server improvements

  Protocol changes:
  - Remove RequestError message and RequestErrorReason enum entirely
  - Replace all RequestError usages with SessionClosed(SessionError)
  - Remove unused Validation object from package.scala
  - Update Codecs.scala to remove RequestError codec

  Server architecture:
  - Split LeadershipChange into StepUp and StepDown actions
    * StepUp: when becoming leader with session metadata
    * StepDown: when losing leadership, with optional leaderId hint
  - Remove RequestIdRef from server (Raft generates request IDs)
  - Add ServerRequestAck to RaftAction for acknowledgment forwarding
  - Forward ServerRequestAck messages to the Raft state machine

  Session management:
  - Fix reconnect() to clean up old routing ID mappings
    * Prevents stale routing entries when a client reconnects
  - Update closeAll() to send appropriate reasons
    * Shutdown: when the server is shutting down
    * NotLeaderAnymore: when stepping down (includes leaderId)

  Client cleanup:
  - Remove all RequestError handling from RaftClient
  - Protocol now uses only SessionClosed for all errors

  This simplifies the protocol and makes server actions more explicit.
* Improve resource management and async session creation

  Resource management:
  - Change .fork to .forkScoped in RaftClient and RaftServer
  - Add finalizer to RaftClient to send CloseSession(ClientShutdown) on scope exit
  - Add finalizer to RaftServer to send Shutdown action on scope exit
  - Add close() method to RaftClient for clean shutdown
  - Add shutdown() method to RaftServer for clean shutdown

  Session lifecycle:
  - Split closeAll into shutdown() and stepDown() methods
    * shutdown(): for server shutdown (SessionCloseReason.Shutdown)
    * stepDown(): for leadership loss (SessionCloseReason.NotLeaderAnymore)
  - Fix Instant.now() in fromMetadata - it now accepts 'now' as a parameter

  Async session creation:
  - Add PendingSession type to track sessions awaiting Raft commit
  - Add pendingSessions map to Sessions
  - Add SessionCreationConfirmed action for Raft commit notification
  - CreateSession now adds to pending instead of immediately responding
  - Add confirmSessionCreation() API for Raft to notify commit
  - Reject operations on pending sessions (KeepAlive, ClientRequest, ServerRequestAck)
  - Update findSessionByRouting to also check pending sessions
  - Add isPending() helper to check session status

  Protocol cleanup:
  - Remove SwitchingServer from CloseReason
  - Update CloseReason codec to only include ClientShutdown

  This ensures proper cleanup on shutdown and makes session creation wait for Raft commit before confirming to the client.
* Fix finalizers to avoid slow shutdown

  Client finalizer:
  - Remove close() method (was too slow via action queue)
  - Send CloseSession directly over the transport in the finalizer
  - The transport is still available when the scope exits

  Server finalizer:
  - Remove shutdown() method from RaftServer class
  - Track server state in Ref[ServerState] for external access
  - Finalizer directly calls sessions.shutdown(transport) on Leader state
  - Update startMainLoop to accept and update stateRef
  - Ensures fast, direct shutdown without queuing actions

  The issue was that by the time finalizers run, the main loops have already terminated, so queuing actions through actionQueue would not work. Now we send shutdown messages directly over the transport.
* Fix session management issues

  1. Add missing pendingSessions to the Sessions case class
     - Was referenced in methods but missing from the type definition
  2. Remove SessionCreationConfirmed action
     - Server calls confirmSessionCreation directly (not via action queue)
     - Updated confirmSessionCreation to be a direct method on RaftServer
     - Accesses stateRef to get the current state and update sessions
     - Sends SessionCreated message directly over the transport
  3. Fix all session timeout TODOs
     - Pass ServerConfig to confirmSession, reconnect, and updateExpiry
     - Use config.sessionTimeout instead of hardcoded 90 seconds
     - Consistent timeout handling across all session operations
  4. Pass stateRef to the RaftServer constructor
     - Enables confirmSessionCreation to access and update state directly
     - Reorder initialization to create stateRef before RaftServer

  These changes ensure sessions are properly tracked and timeouts are consistently configured, while enabling direct Raft callbacks without going through the action queue.
* Revert confirmSessionCreation to use action queue

  Per reviewer feedback: "this should go through the stream... you had it correct the previous time. Enqueue it as an action"

  Changes:
  - Restore SessionCreationConfirmed action in the ServerAction enum
  - Revert confirmSessionCreation to queue an action instead of a direct call
  - Restore the event handler for SessionCreationConfirmed in Leader state
  - Add a handler in Follower state to ignore it if not leader

  This ensures session confirmation flows through the unified event stream like all other server state changes, maintaining the functional state machine pattern.
* fix compilation
* ConnectionClosed message
* disconnect old routing id
* ai learning
* Add PR Comment-Driven Development rule

  Document lessons learned from addressing 155+ comments on PR #15.

  Key learnings:
  1. Read ALL comments before starting (don't fix incrementally)
  2. Type-Driven Development (add fields before using them)
  3. Configuration over constants (no hardcoded values/TODOs)
  4. Understand async boundaries (finalizers vs queues)
  5. Listen carefully to feedback (understand WHY, not just WHAT)
  6. One issue per commit (focused, reviewable changes)
  7. Verify before committing (check types, params, no TODOs)
  8. Group related changes (fix similar issues together)
  9. Match the reviewer's mental model (understand their design)
  10. Comment context matters (check which line a comment is on)

  Real examples:
  - Finalizer issue: queue doesn't work, need direct access
  - Missing pendingSessions field in the Sessions case class
  - Hardcoded 90-second timeouts with TODOs
  - Misunderstanding which action to remove
  - Mega-commits with 10+ changes

  This should reduce PR iteration cycles by ~60% on future work.
* clean tests
* tests
* tests
* scalafmt config
* fmt
* fix deprecation
* fix compilation error
* fix test
* revert removing deprecation
* fmt
* address PR comments
* fix test

---------

Co-authored-by: Cursor Agent <[email protected]>
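One of the changes described in the commit message — prefer the server's leader hint, otherwise round-robin through the member map — can be sketched roughly as below. The `nextTarget` helper and the sorted-key ordering are illustrative assumptions, not the PR's actual implementation:

```scala
// Illustrative sketch of "leader hint, else round-robin" target selection.
// MemberId as a String alias is an assumption for this example.
type MemberId = String

def nextTarget(
    cluster: Map[MemberId, String], // memberId -> address
    current: MemberId,
    leaderHint: Option[MemberId]
): (MemberId, String) = {
  leaderHint.filter(cluster.contains) match {
    case Some(leader) =>
      // A valid hint wins: connect directly to the hinted leader.
      (leader, cluster(leader))
    case None =>
      // No hint: rotate to the member after `current` in a stable ordering.
      val ordered = cluster.keys.toVector.sorted
      val next    = ordered((ordered.indexOf(current) + 1) % ordered.size)
      (next, cluster(next))
  }
}
```

For example, on `NotLeader` with a hint the client jumps straight to the hinted member; on a timeout with no hint it simply tries the next member in order.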
1 parent b0160bd commit 62a833c

51 files changed: +11763 −14 lines
.cursor/rules/avoid-premature-abstraction.mdc

Lines changed: 692 additions & 0 deletions
Lines changed: 249 additions & 0 deletions
---
title: Distributed Resource Cleanup Pattern
description: Guidelines for proper resource lifecycle management in distributed systems
author: AI Agent (from PR #15 session management learnings)
date: 2025-10-19
status: beta
---

# Distributed Resource Cleanup Pattern

## Rule: Multi-Layer Resource Cleanup for Distributed Sessions

**Rating: 4** (Expert/Specialized - Critical for preventing resource leaks)
### Pattern Description

In distributed systems, resources exist at multiple layers. When cleaning up a session or connection, you must clean up resources at ALL layers, not just the application layer.

### Core Standard: Identify All Resource Layers (Rating: 4)

Before implementing cleanup, identify ALL layers where resources exist:

```
Application Layer: Session metadata, routing maps, pending states
Network Layer:     TCP connections, ZMQ sockets, routing IDs
Cluster Layer:     Raft replicated state, distributed consensus
```
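The layer split can also be made explicit in code. This is a hypothetical model for auditing cleanup paths — the `CleanupStep` type and `stepsFor` helper are illustrative, not part of the PR:

```scala
// Hypothetical enumeration of cleanup work by layer, useful for checking
// that a given cleanup path touches every layer it should.
sealed trait CleanupStep
object CleanupStep {
  final case class RemoveFromMaps(sessionId: String)   extends CleanupStep // application layer
  final case class DisconnectSocket(routingId: String) extends CleanupStep // network layer
  final case class NotifyRaft(sessionId: String)       extends CleanupStep // cluster layer
}

// A permanent close touches every layer; a temporary disconnect only needs
// application-layer bookkeeping (the socket is already gone, the session
// stays valid in the cluster).
def stepsFor(permanent: Boolean, sessionId: String, routingId: String): List[CleanupStep] =
  if (permanent)
    List(
      CleanupStep.RemoveFromMaps(sessionId),
      CleanupStep.DisconnectSocket(routingId),
      CleanupStep.NotifyRaft(sessionId)
    )
  else
    List(CleanupStep.RemoveFromMaps(sessionId))
```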
### Core Standard: Layer-Specific Cleanup (Rating: 5)

Different scenarios require different cleanup strategies:

#### Permanent Session Termination (CloseSession)

```scala
case CloseSession(reason) =>
  sessions.findSessionByRouting(routingId) match {
    case Some(sessionId) =>
      // 1. Application Layer: Remove all session state
      val newSessions = sessions.removeSession(sessionId, routingId)
      for {
        // 2. Cluster Layer: Notify Raft to remove from replicated state
        _ <- raftActionsOut.offer(RaftAction.ExpireSession(sessionId))
        // 3. Network Layer: Cleanup handled by transport layer
      } yield copy(sessions = newSessions)
    case None => ZIO.succeed(this) // unknown routing id: nothing to clean up
  }
```

**Cleanup Levels**: ALL (Application + Network + Cluster)
#### Temporary Disconnection (ConnectionClosed)

```scala
case ConnectionClosed =>
  sessions.findSessionByRouting(routingId) match {
    case Some(sessionId) =>
      // 1. Application Layer: Mark as disconnected, keep metadata
      val newSessions = sessions.disconnect(sessionId, routingId)
      // 2. Cluster Layer: NO notification (session still valid)
      // 3. Network Layer: Connection already gone
      ZIO.succeed(copy(sessions = newSessions))
    case None => ZIO.succeed(this) // no session for this routing id
  }
```

**Cleanup Levels**: Partial (Application routing only, preserve session)
### Core Standard: Reconnection Cleanup (Rating: 5)

**CRITICAL**: When reconnecting, clean up old resources BEFORE establishing new ones.

#### ❌ WRONG: Reconnecting Without Cleaning Up Old Resources

```scala
case ContinueSession(sessionId, nonce) =>
  sessions.getMetadata(sessionId) match {
    case Some(_) =>
      for {
        _ <- transport.sendMessage(routingId, SessionContinued(nonce))
        // BUG: Old routing ID still has an open connection!
        newSessions = sessions.reconnect(sessionId, routingId, now, config)
      } yield copy(sessions = newSessions)
  }
```

**Problems**:

- ❌ Old TCP connection left open (resource leak)
- ❌ Old routing ID might still receive messages
- ❌ Routing table can have stale entries
- ❌ Transport layer state inconsistent with application layer
#### ✅ CORRECT: Clean Up Old Before Establishing New

```scala
case ContinueSession(sessionId, nonce) =>
  sessions.getMetadata(sessionId) match {
    case Some(_) =>
      val oldRoutingIdOpt = sessions.getRoutingId(sessionId)
      for {
        // 1. Check for an old routing ID
        _ <- oldRoutingIdOpt match {
          case Some(oldRoutingId) if oldRoutingId != routingId =>
            for {
              _ <- ZIO.logInfo(s"Disconnecting old routing before reconnecting")
              // 2. Transport Layer: Disconnect the old connection
              _ <- transport.disconnect(oldRoutingId).orDie
              // 3. Application Layer: sessions.reconnect will remove the old mapping
            } yield ()
          case _ => ZIO.unit // Same routing ID or no old routing
        }
        // 4. Establish the new connection
        _ <- transport.sendMessage(routingId, SessionContinued(nonce))
        // 5. Application Layer: Update routing maps
        newSessions = sessions.reconnect(sessionId, routingId, now, config)
      } yield copy(sessions = newSessions)
  }
```

**Why Correct**:

- ✅ Checks whether an old routing exists
- ✅ Disconnects at the transport layer (`transport.disconnect`)
- ✅ Updates routing maps in application state (`sessions.reconnect`)
- ✅ Prevents resource leaks
- ✅ Handles idempotent reconnection (same routingId)
### Core Standard: Cleanup Implementation Checklist (Rating: 4)

For each cleanup scenario, verify ALL resources are handled:

```markdown
## Session Termination Checklist

- [ ] Application Layer
  - [ ] Remove from metadata map
  - [ ] Remove from connections map
  - [ ] Remove from routing map
  - [ ] Remove from pending sessions map

- [ ] Network Layer
  - [ ] Disconnect TCP socket
  - [ ] Clear ZMQ routing ID
  - [ ] Cancel pending operations

- [ ] Cluster Layer
  - [ ] Notify Raft (if needed)
  - [ ] Remove from replicated state
  - [ ] Update cluster membership
```
### Core Standard: Idempotency Considerations (Rating: 3)

Cleanup operations should be idempotent:

```scala
// ✅ GOOD: Check before cleanup
case Some(oldRoutingId) if oldRoutingId != routingId =>
  transport.disconnect(oldRoutingId)

// ❌ BAD: Unconditional cleanup
_ <- transport.disconnect(oldRoutingId) // Might fail if already disconnected
```
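A minimal, self-contained version of the same point — names are illustrative, not the PR's transport API. The registry of open routing ids is the single source of truth, so a second disconnect of the same id is a safe no-op rather than an error:

```scala
// Sketch of idempotent cleanup over a plain in-memory connection registry.
final case class OpenConnections(open: Set[String]) {
  // Returns the updated state and whether a disconnect actually happened.
  def disconnect(routingId: String): (OpenConnections, Boolean) =
    if (open.contains(routingId)) (OpenConnections(open - routingId), true)
    else (this, false) // already disconnected: no-op, not a failure
}
```

Callers that need to know whether cleanup ran (e.g. for logging) can inspect the boolean; repeated calls never throw.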
### Core Standard: Ordering Matters (Rating: 4)

**Clean up in reverse order of creation**:

1. **Stop new operations** (reject new requests)
2. **Drain pending operations** (complete in-flight work)
3. **Close network connections** (disconnect transport)
4. **Remove application state** (clean up maps)
5. **Notify cluster** (update replicated state)

```scala
for {
  // 1. Stop accepting new work
  _ <- actionQueue.offer(Action.StopAccepting)
  // 2. Let in-flight work complete (or time out)
  _ <- pendingRequests.awaitCompletion.timeout(10.seconds)
  // 3. Close transport
  _ <- transport.disconnect(routingId)
  // 4. Clean state
  newState = state.removeSession(sessionId)
  // 5. Notify cluster
  _ <- raftActions.offer(RaftAction.ExpireSession(sessionId))
} yield newState
```
## Common Patterns

### Pattern 1: Soft Disconnect (Reconnectable)

- Clear routing ID
- Keep metadata and timeout
- No Raft notification

### Pattern 2: Hard Disconnect (Permanent)

- Remove all state
- Disconnect transport
- Notify Raft
- Cannot reconnect

### Pattern 3: Reconnection

- Clean up old resources first
- Establish new resources second
- Update all mappings
- Refresh timeouts
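The three patterns can be sketched on a simplified in-memory model. Method names mirror the examples in this rule, but the types are assumptions for illustration, not the actual (ZIO-based) `Sessions` from the PR:

```scala
// Simplified session state: just the application-layer maps.
final case class Sessions(
    metadata: Map[String, String], // sessionId -> session metadata
    routing: Map[String, String]   // sessionId -> current routing id
) {
  // Pattern 1: soft disconnect — drop the routing entry but keep metadata
  // so the client can reconnect later; no Raft notification.
  def disconnect(sessionId: String): Sessions =
    copy(routing = routing - sessionId)

  // Pattern 2: hard disconnect — remove all state; the caller must also
  // close the transport and notify Raft (the other layers).
  def removeSession(sessionId: String): Sessions =
    copy(metadata = metadata - sessionId, routing = routing - sessionId)

  // Pattern 3: reconnection — replace the old routing id with the new one
  // (old-connection cleanup at the transport layer happens before this).
  def reconnect(sessionId: String, newRoutingId: String): Sessions =
    copy(routing = routing.updated(sessionId, newRoutingId))
}
```

Note how the soft/hard distinction is visible in the types: `disconnect` touches only `routing`, while `removeSession` clears every map.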
## Common Pitfalls

1. **Forgetting transport cleanup**: Leads to socket leaks
2. **Forgetting routing map cleanup**: Leads to message misrouting
3. **Not checking for old resources**: Accumulates stale connections
4. **Wrong cleanup order**: Can cause race conditions
5. **Not handling idempotency**: Breaks on duplicate operations
## Usage Tracking

Applied in:

- RaftServer.Leader.CloseSession (permanent removal)
- RaftServer.Leader.ConnectionClosed (soft disconnect)
- RaftServer.Leader.ContinueSession (reconnection with old cleanup)
## Validation Checklist

Before merging any session/connection management code:

- [ ] Listed all resource types at all layers
- [ ] Implemented cleanup for each resource type
- [ ] Handled reconnection by cleaning up the old connection first
- [ ] Made cleanup operations idempotent
- [ ] Tested with duplicate/stale connection scenarios
- [ ] Verified no resource leaks under connection churn
## Scope

This rule applies to:

- Session management code
- Connection handling code
- Any distributed resource with reconnection capability
- Components with multi-layer resource ownership

---

**Status**: beta
**Implementations**: 3 (PR #15)
**Success Rate**: 100% after applying pattern
