Skip to content

Commit 436be83

Browse files
westgatewestgate
authored andcommitted
S203b: LD-04 persistent connections + LD-05 socket separation
Fix primalSpring audit findings: - LD-04: Evolve HTTP to keep-alive loop, NDJSON to skip blank lines. Multi-step dispatch sequences no longer get broken pipe. - LD-05: Separate JSON-RPC (compute.sock) and tarpc (compute-tarpc.sock) sockets. Resolves internal bind collision and clears namespace for barraCuda. +7 tests. All quality gates green. Made-with: Cursor
1 parent d8555eb commit 436be83

7 files changed

Lines changed: 435 additions & 55 deletions

File tree

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,36 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased] - April 12, 2026 (Sessions 43-203)
99

10+
### Session S203b (Apr 12, 2026) — primalSpring LD-04/LD-05: Persistent Connections + Socket Separation
11+
12+
#### LD-04: UDS/TCP Persistent Connection (Blocking)
13+
- EVOLVED: HTTP mode from single-shot (`Connection: close`, return) to HTTP/1.1 keep-alive loop
14+
- EVOLVED: NDJSON mode — empty lines between requests now skipped (previously broke connection)
15+
- FIXED: Multi-step dispatch sequences (submit → status → result) no longer get broken pipe
16+
- ADDED: `handle_http_keepalive_unix` / `handle_http_keepalive_tcp` — keep-alive loop respecting `Connection` header
17+
- ADDED: `handle_ndjson_unix` / `handle_ndjson_tcp` — extracted persistent NDJSON handlers
18+
- Files: `connection/unix.rs`, `connection/tcp.rs`
19+
20+
#### LD-05: Socket Namespace Separation
21+
- FIXED: JSON-RPC and tarpc no longer bind the same `compute.sock` (race condition: tarpc overwrote JSON-RPC socket)
22+
- SEPARATED: JSON-RPC primary → `compute.sock`, tarpc secondary → `compute-tarpc.sock`
23+
- ADDED: `tarpc_socket_filename_for_family()` in `unibin/format.rs` for family-scoped tarpc socket names
24+
- UPDATED: Shutdown cleanup handles both socket files
25+
- Files: `unibin/mod.rs`, `unibin/format.rs`
26+
27+
#### Tests
28+
- ADDED: `test_tcp_http_keepalive_multi_request` — two HTTP requests on one TCP connection
29+
- ADDED: `test_unix_http_keepalive_multi_request` — two HTTP requests on one UDS connection
30+
- ADDED: `test_ndjson_with_blank_lines_between_requests` — blank lines between NDJSON requests
31+
- ADDED: `test_ndjson_unix_persistent_multi_request` — three NDJSON requests on one UDS connection
32+
- ADDED: `tarpc_socket_filename_for_family_*` — 3 socket naming tests
33+
34+
#### Quality Gates
35+
- `cargo fmt`: PASS
36+
- `cargo clippy --workspace --all-targets`: PASS (0 warnings)
37+
- `cargo doc --workspace --no-deps`: PASS (0 warnings)
38+
- `cargo test --workspace`: PASS (0 failures)
39+
1040
### Session S203 (Apr 12, 2026) — Composition Elevation Sprint + Deep Debt Execution
1141

1242
#### Dispatch Wire Contract Standardization (Blocking Composition)

DEBT.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,31 @@ stable in Rust. Cannot resolve until Rust stabilizes this feature. The `#[async_
6262
dependency is pure Rust (proc-macro) and zero-overhead at runtime for non-dyn paths.
6363
**Not actionable** — resolves when Rust stabilizes the feature. Markers are accurate documentation.
6464

65+
### D-UDS-SINGLE-SHOT — RESOLVED S203
66+
**Crate**: `server` | **Audit**: LD-04 (primalSpring downstream)
67+
HTTP mode in `pure_jsonrpc/connection/unix.rs` and `tcp.rs` was single-shot:
68+
processed one request, wrote `Connection: close`, and returned. Multi-step
69+
dispatch sequences (submit → status → result) got broken pipe on second call.
70+
**Fix**: Evolved to HTTP/1.1 keep-alive loop — server reads subsequent HTTP
71+
requests on the same connection until client sends `Connection: close` or EOF.
72+
NDJSON mode also fixed: empty lines between requests now skipped (previously
73+
broke the connection). +7 tests covering keep-alive and NDJSON persistence.
74+
Files: `connection/unix.rs`, `connection/tcp.rs`, `connection/tests.rs`.
75+
76+
### D-SOCKET-NAMESPACE-COLLISION — RESOLVED S203
77+
**Crate**: `server` | **Audit**: LD-05 (primalSpring downstream)
78+
JSON-RPC and tarpc servers both bound the same `compute.sock` — tarpc's
79+
`serve_unix` removed JSON-RPC's socket file and re-bound, orphaning the
80+
JSON-RPC listener. Clients connecting to `compute.sock` would reach the
81+
tarpc binary framing, not JSON-RPC.
82+
**Fix**: Separated socket paths: JSON-RPC primary on `compute.sock`,
83+
tarpc secondary on `compute-tarpc.sock`. New `tarpc_socket_filename_for_family`
84+
helper generates family-scoped tarpc socket names. Cleanup at shutdown handles
85+
both sockets. This also resolves the barraCuda namespace conflict: toadStool
86+
claims `compute.sock` / `compute-tarpc.sock`, leaving `compute-math.sock`
87+
available for barraCuda.
88+
Files: `unibin/mod.rs`, `unibin/format.rs`.
89+
6590
## S203 Resolved Debt (Deep Audit & Evolution Execution)
6691

6792
### D-RUSTIX-DISPLAY-038 — RESOLVED S203

crates/server/src/pure_jsonrpc/connection/tcp.rs

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ pub async fn serve_tcp(handler: Arc<JsonRpcHandler>, listener: TcpListener) -> S
4141
}
4242
}
4343

44-
/// Handle a single TCP connection.
44+
/// Handle a single TCP connection with persistent keep-alive.
4545
///
46-
/// Supports both HTTP (single request-response) and persistent NDJSON sessions
47-
/// per `PRIMAL_IPC_PROTOCOL.md`.
46+
/// Supports both HTTP/1.1 keep-alive and persistent NDJSON sessions per
47+
/// `PRIMAL_IPC_PROTOCOL.md`. Multi-step dispatch sequences (submit → status →
48+
/// result) and health checks reuse the same connection without reconnecting.
4849
pub(crate) async fn handle_tcp_connection(
4950
handler: Arc<JsonRpcHandler>,
5051
stream: TcpStream,
@@ -53,42 +54,93 @@ pub(crate) async fn handle_tcp_connection(
5354
let mut reader = BufReader::new(reader);
5455

5556
let mut first_line = String::new();
56-
reader
57+
let n = reader
5758
.read_line(&mut first_line)
5859
.await
5960
.map_err(|e| ServerError::Network(e.to_string()))?;
61+
if n == 0 {
62+
return Ok(());
63+
}
6064

6165
if first_line.starts_with("POST")
6266
|| first_line.starts_with("GET")
6367
|| first_line.starts_with("HTTP")
6468
{
65-
let (_headers, body) = read_http_request_continuation_tcp(&mut reader).await?;
66-
let response_body = process_request(&handler, &body).await?;
67-
write_http_response_tcp(&mut writer, &response_body).await?;
68-
return Ok(());
69+
return handle_http_keepalive_tcp(handler, &mut reader, &mut writer, first_line).await;
6970
}
7071

71-
// NDJSON session: process first line, then loop for subsequent lines
72-
let mut line = first_line;
72+
handle_ndjson_tcp(handler, &mut reader, &mut writer, first_line).await
73+
}
74+
75+
/// HTTP/1.1 keep-alive loop for TCP connections.
76+
async fn handle_http_keepalive_tcp(
77+
handler: Arc<JsonRpcHandler>,
78+
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
79+
writer: &mut tokio::net::tcp::OwnedWriteHalf,
80+
first_request_line: String,
81+
) -> ServerResult<()> {
82+
let mut request_line = first_request_line;
7383
loop {
74-
let trimmed = line.trim();
75-
if trimmed.is_empty() {
84+
let (headers, body) = read_http_request_continuation_tcp(reader).await?;
85+
let response_body = process_request(&handler, &body).await?;
86+
87+
let client_wants_close = headers
88+
.get("connection")
89+
.is_some_and(|v| v.eq_ignore_ascii_case("close"));
90+
91+
write_http_response_tcp(writer, &response_body, client_wants_close).await?;
92+
93+
if client_wants_close {
7694
break;
7795
}
7896

79-
let response_body = process_request(&handler, trimmed.as_bytes()).await?;
80-
writer
81-
.write_all(&response_body)
82-
.await
83-
.map_err(|e| ServerError::Network(e.to_string()))?;
84-
writer
85-
.write_all(b"\n")
86-
.await
87-
.map_err(|e| ServerError::Network(e.to_string()))?;
88-
writer
89-
.flush()
97+
request_line.clear();
98+
let n = reader
99+
.read_line(&mut request_line)
90100
.await
91101
.map_err(|e| ServerError::Network(e.to_string()))?;
102+
if n == 0 {
103+
break;
104+
}
105+
let trimmed = request_line.trim();
106+
if trimmed.is_empty() {
107+
continue;
108+
}
109+
if !trimmed.starts_with("POST")
110+
&& !trimmed.starts_with("GET")
111+
&& !trimmed.starts_with("HTTP")
112+
{
113+
break;
114+
}
115+
}
116+
Ok(())
117+
}
118+
119+
/// NDJSON persistent session for TCP connections.
120+
async fn handle_ndjson_tcp(
121+
handler: Arc<JsonRpcHandler>,
122+
reader: &mut BufReader<tokio::net::tcp::OwnedReadHalf>,
123+
writer: &mut tokio::net::tcp::OwnedWriteHalf,
124+
first_line: String,
125+
) -> ServerResult<()> {
126+
let mut line = first_line;
127+
loop {
128+
let trimmed = line.trim();
129+
if !trimmed.is_empty() {
130+
let response_body = process_request(&handler, trimmed.as_bytes()).await?;
131+
writer
132+
.write_all(&response_body)
133+
.await
134+
.map_err(|e| ServerError::Network(e.to_string()))?;
135+
writer
136+
.write_all(b"\n")
137+
.await
138+
.map_err(|e| ServerError::Network(e.to_string()))?;
139+
writer
140+
.flush()
141+
.await
142+
.map_err(|e| ServerError::Network(e.to_string()))?;
143+
}
92144

93145
line.clear();
94146
let n = reader
@@ -99,7 +151,6 @@ pub(crate) async fn handle_tcp_connection(
99151
break;
100152
}
101153
}
102-
103154
Ok(())
104155
}
105156

@@ -140,12 +191,18 @@ async fn read_http_request_continuation_tcp(
140191
async fn write_http_response_tcp(
141192
writer: &mut tokio::net::tcp::OwnedWriteHalf,
142193
body: &[u8],
194+
closing: bool,
143195
) -> ServerResult<()> {
196+
let conn_header = if closing {
197+
"Connection: close"
198+
} else {
199+
"Connection: keep-alive"
200+
};
144201
let header = format!(
145202
"HTTP/1.1 200 OK\r\n\
146203
Content-Type: application/json\r\n\
147204
Content-Length: {}\r\n\
148-
Connection: close\r\n\
205+
{conn_header}\r\n\
149206
\r\n",
150207
body.len()
151208
);

crates/server/src/pure_jsonrpc/connection/tests.rs

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,170 @@ async fn test_serve_unix_accepts_http_post() {
221221
assert!(response.contains("test-conn-1.0.0"));
222222
}
223223

224+
#[tokio::test]
225+
async fn test_tcp_http_keepalive_multi_request() {
226+
let handler = Arc::new(test_handler());
227+
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
228+
let addr = listener.local_addr().expect("addr");
229+
230+
let server_handler = Arc::clone(&handler);
231+
let _server = tokio::spawn(async move {
232+
let (stream, _) = listener.accept().await.expect("accept");
233+
handle_tcp_connection(server_handler, stream).await.expect("ok");
234+
});
235+
236+
let mut client = TcpStream::connect(addr).await.expect("connect");
237+
238+
// First request (keep-alive default)
239+
let body1 = r#"{"jsonrpc":"2.0","method":"toadstool.health","id":1}"#;
240+
let http1 = format!(
241+
"POST /rpc HTTP/1.1\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
242+
body1.len(),
243+
body1
244+
);
245+
client.write_all(http1.as_bytes()).await.expect("write1");
246+
247+
let mut buf = vec![0u8; 4096];
248+
let n = client.read(&mut buf).await.expect("read1");
249+
let resp1 = String::from_utf8_lossy(&buf[..n]);
250+
assert!(resp1.contains("HTTP/1.1 200 OK"), "first response ok");
251+
assert!(
252+
resp1.contains("Connection: keep-alive"),
253+
"keep-alive header present"
254+
);
255+
assert!(resp1.contains("healthy"), "first response has health data");
256+
257+
// Second request on same connection (Connection: close to end)
258+
let body2 = r#"{"jsonrpc":"2.0","method":"toadstool.version","id":2}"#;
259+
let http2 = format!(
260+
"POST /rpc HTTP/1.1\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
261+
body2.len(),
262+
body2
263+
);
264+
client.write_all(http2.as_bytes()).await.expect("write2");
265+
266+
let mut buf2 = Vec::new();
267+
client.read_to_end(&mut buf2).await.expect("read2");
268+
let resp2 = String::from_utf8_lossy(&buf2);
269+
assert!(resp2.contains("HTTP/1.1 200 OK"), "second response ok");
270+
assert!(
271+
resp2.contains("Connection: close"),
272+
"close header on final response"
273+
);
274+
}
275+
276+
#[tokio::test]
277+
async fn test_unix_http_keepalive_multi_request() {
278+
let handler = Arc::new(test_handler());
279+
let dir = tempfile::tempdir().expect("tempdir");
280+
let socket_path = dir.path().join("keepalive.sock");
281+
282+
let server_handler = Arc::clone(&handler);
283+
let sock_path = socket_path.clone();
284+
let _server = tokio::spawn(async move {
285+
serve_unix(server_handler, sock_path).await.expect("serve");
286+
});
287+
288+
let mut stream = await_unix_socket(&socket_path).await;
289+
290+
// First request (keep-alive)
291+
let body1 = r#"{"jsonrpc":"2.0","method":"toadstool.health","id":1}"#;
292+
let http1 = format!(
293+
"POST /rpc HTTP/1.1\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
294+
body1.len(),
295+
body1
296+
);
297+
stream.write_all(http1.as_bytes()).await.expect("write1");
298+
299+
let mut buf = vec![0u8; 4096];
300+
let n = stream.read(&mut buf).await.expect("read1");
301+
let resp1 = String::from_utf8_lossy(&buf[..n]);
302+
assert!(resp1.contains("Connection: keep-alive"));
303+
304+
// Second request on same connection
305+
let body2 = r#"{"jsonrpc":"2.0","method":"toadstool.version","id":2}"#;
306+
let http2 = format!(
307+
"POST /rpc HTTP/1.1\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
308+
body2.len(),
309+
body2
310+
);
311+
stream.write_all(http2.as_bytes()).await.expect("write2");
312+
313+
let mut buf2 = Vec::new();
314+
stream.read_to_end(&mut buf2).await.expect("read2");
315+
let resp2 = String::from_utf8_lossy(&buf2);
316+
assert!(resp2.contains("HTTP/1.1 200 OK"));
317+
assert!(resp2.contains("Connection: close"));
318+
}
319+
320+
#[tokio::test]
321+
async fn test_ndjson_with_blank_lines_between_requests() {
322+
let handler = Arc::new(test_handler());
323+
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
324+
let addr = listener.local_addr().expect("addr");
325+
326+
let server_handler = Arc::clone(&handler);
327+
let _server = tokio::spawn(async move {
328+
let (stream, _) = listener.accept().await.expect("accept");
329+
handle_tcp_connection(server_handler, stream).await.expect("ok");
330+
});
331+
332+
let mut client = TcpStream::connect(addr).await.expect("connect");
333+
334+
// Send two NDJSON requests with a blank line between them
335+
let requests = concat!(
336+
"{\"jsonrpc\":\"2.0\",\"method\":\"toadstool.health\",\"id\":1}\n",
337+
"\n",
338+
"{\"jsonrpc\":\"2.0\",\"method\":\"toadstool.version\",\"id\":2}\n",
339+
);
340+
client.write_all(requests.as_bytes()).await.expect("write");
341+
client.shutdown().await.ok();
342+
343+
let mut buf = Vec::new();
344+
client.read_to_end(&mut buf).await.expect("read");
345+
let text = String::from_utf8_lossy(&buf);
346+
let responses: Vec<&str> = text.lines().collect();
347+
assert!(
348+
responses.len() >= 2,
349+
"expected 2 responses, got {}: {text}",
350+
responses.len()
351+
);
352+
353+
let r1: serde_json::Value = serde_json::from_str(responses[0]).expect("json1");
354+
assert!(r1["result"]["healthy"].as_bool().is_some());
355+
let r2: serde_json::Value = serde_json::from_str(responses[1]).expect("json2");
356+
assert!(r2["result"]["version"].as_str().is_some());
357+
}
358+
359+
#[tokio::test]
360+
async fn test_ndjson_unix_persistent_multi_request() {
361+
let handler = Arc::new(test_handler());
362+
let dir = tempfile::tempdir().expect("tempdir");
363+
let socket_path = dir.path().join("ndjson-multi.sock");
364+
365+
let server_handler = Arc::clone(&handler);
366+
let sock_path = socket_path.clone();
367+
let _server = tokio::spawn(async move {
368+
serve_unix(server_handler, sock_path).await.expect("serve");
369+
});
370+
371+
let mut stream = await_unix_socket(&socket_path).await;
372+
373+
// Send three requests on the same connection
374+
let r1 = b"{\"jsonrpc\":\"2.0\",\"method\":\"toadstool.health\",\"id\":1}\n";
375+
let r2 = b"{\"jsonrpc\":\"2.0\",\"method\":\"toadstool.version\",\"id\":2}\n";
376+
let r3 = b"{\"jsonrpc\":\"2.0\",\"method\":\"toadstool.health\",\"id\":3}\n";
377+
stream.write_all(r1).await.expect("w1");
378+
stream.write_all(r2).await.expect("w2");
379+
stream.write_all(r3).await.expect("w3");
380+
stream.shutdown().await.ok();
381+
382+
let mut buf = Vec::new();
383+
stream.read_to_end(&mut buf).await.expect("read");
384+
let text = String::from_utf8_lossy(&buf);
385+
assert_eq!(text.lines().count(), 3, "expected 3 responses: {text}");
386+
}
387+
224388
#[tokio::test]
225389
async fn test_process_request_partial_json() {
226390
let handler = test_handler();

0 commit comments

Comments
 (0)