Skip to content

Commit 983e8ca

Browse files
committed
Remove external mode, fix server/client roles
- SDK now connects as client (hegeld is the server) - Removed HegelMode enum and mode-related functions - Renamed set_embedded_connection to set_connection - SDK waits for hegeld to start and initiates version negotiation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 57d308f commit 983e8ca

File tree

3 files changed

+55
-165
lines changed

3 files changed

+55
-165
lines changed

.claude/CLAUDE.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,16 @@ hegel-rust/
5555

5656
### How It Works
5757

58-
The SDK spawns the `hegel` CLI as a subprocess with `--client-mode`. The test binary creates a Unix socket server, and hegel connects for each test case. The build script (`build.rs`) automatically installs Python and hegel into cargo's `OUT_DIR/hegel` via uv if not found on PATH.
58+
The SDK creates a socket path and spawns the `hegel` CLI as a subprocess. Hegeld binds to the socket and listens for connections. The SDK then connects as a client, and a single persistent connection is maintained for the program run. Multiple tests can be executed over this connection. The build script (`build.rs`) automatically installs Python and hegel into cargo's `OUT_DIR/hegel` via uv if not found on PATH.
5959

6060
### Protocol
6161

62-
Each test case follows this handshake:
63-
1. Hegel connects to the SDK's socket
64-
2. Hegel sends: `{"is_last_run": bool}` (is_last_run=true on final replay for output)
65-
3. SDK responds: `{"type": "handshake_ack"}`
66-
4. SDK runs test, communicating via `generate`/`start_span`/`stop_span` commands
67-
5. SDK sends result: `{"type": "test_result", "result": "pass"|"fail"|"reject", ...}`
62+
The protocol uses CBOR encoding over multiplexed channels. For each test:
63+
1. SDK sends `run_test` request on control channel
64+
2. Hegeld sends `test_case` events with channel IDs for each test case
65+
3. SDK runs test function, sending `generate`/`start_span`/`stop_span` requests on the test channel
66+
4. SDK sends `mark_complete` with status (VALID, INVALID, or INTERESTING)
67+
5. After all test cases, hegeld sends `test_done` with results`
6868

6969
### Thread-Local State
7070

src/embedded.rs

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::gen::{
2-
clear_embedded_connection, set_embedded_connection, set_is_last_run, set_mode,
3-
take_generated_values, HegelMode,
2+
clear_connection, set_connection, set_is_last_run, take_generated_values,
43
};
54
use crate::protocol::{
65
cbor_to_json, json_to_cbor, Channel, Connection,
@@ -9,11 +8,12 @@ use crate::protocol::{
98
use ciborium::Value as CborValue;
109
use serde_json::{json, Value};
1110
use std::cell::RefCell;
12-
use std::os::unix::net::UnixListener;
11+
use std::os::unix::net::UnixStream;
1312
use std::panic::{self, catch_unwind, AssertUnwindSafe};
1413
use std::process::{Command, Stdio};
1514
use std::sync::atomic::{AtomicBool, Ordering};
1615
use std::sync::{Arc, Once};
16+
use std::time::Duration;
1717
use tempfile::TempDir;
1818

1919
static PANIC_HOOK_INIT: Once = Once::new();
@@ -79,7 +79,7 @@ impl Verbosity {
7979
/// Special marker used to identify assume(false) panics.
8080
const REJECT_MARKER: &str = "HEGEL_REJECT";
8181

82-
/// Run property-based tests using Hegel in embedded mode with default options.
82+
/// Run property-based tests using Hegel with default options.
8383
///
8484
/// This is a convenience function for simple cases. For configuration options,
8585
/// use [`Hegel::new`] with the builder pattern.
@@ -178,18 +178,14 @@ where
178178
pub fn run(self) {
179179
init_panic_hook();
180180

181-
// Create temp directory with socket
181+
// Create temp directory with socket path
182182
let temp_dir = TempDir::new().expect("Failed to create temp directory");
183183
let socket_path = temp_dir.path().join("hegel.sock");
184184

185-
// Create Unix socket server
186-
let listener = UnixListener::bind(&socket_path).expect("Failed to bind socket");
187-
188-
// Build hegel command
185+
// Build hegel command - hegeld will bind to the socket and listen
189186
let hegel_path = self.hegel_path.as_deref().unwrap_or(HEGEL_BINARY_PATH);
190187
let mut cmd = Command::new(hegel_path);
191-
cmd.arg("--client-mode")
192-
.arg(&socket_path)
188+
cmd.arg(&socket_path)
193189
.arg("--verbosity")
194190
.arg(self.verbosity.as_str());
195191

@@ -204,29 +200,47 @@ where
204200

205201
let mut child = cmd.spawn().expect("Failed to spawn hegel");
206202

207-
// Accept the single connection from hegeld
208-
let stream = match listener.accept() {
209-
Ok((stream, _)) => stream,
210-
Err(e) => {
203+
// Wait for hegeld to create the socket and start listening
204+
let mut attempts = 0;
205+
let stream = loop {
206+
if socket_path.exists() {
207+
match UnixStream::connect(&socket_path) {
208+
Ok(stream) => break stream,
209+
Err(e) if attempts < 50 => {
210+
// Socket exists but not yet listening
211+
std::thread::sleep(Duration::from_millis(100));
212+
attempts += 1;
213+
continue;
214+
}
215+
Err(e) => {
216+
let _ = child.kill();
217+
panic!("Failed to connect to hegeld socket: {}", e);
218+
}
219+
}
220+
}
221+
if attempts >= 50 {
211222
let _ = child.kill();
212-
panic!("Failed to accept connection: {}", e);
223+
panic!("Timeout waiting for hegeld to create socket");
213224
}
225+
std::thread::sleep(Duration::from_millis(100));
226+
attempts += 1;
214227
};
215228

216229
// Create connection and perform version negotiation
217230
let connection = Connection::new(stream);
218231

219-
// Handle version negotiation (as server accepting from hegeld)
232+
// Initiate version negotiation (SDK is the client)
220233
let control = connection.control_channel();
221-
let (req_id, payload) = control.receive_request().expect("Failed to receive handshake");
222-
223-
if payload != VERSION_NEGOTIATION_MESSAGE {
224-
panic!("Invalid version negotiation message: {:?}", String::from_utf8_lossy(&payload));
234+
let req_id = control.send_request(VERSION_NEGOTIATION_MESSAGE.to_vec())
235+
.expect("Failed to send version negotiation");
236+
let response = control.receive_response(req_id)
237+
.expect("Failed to receive version response");
238+
239+
if response != VERSION_NEGOTIATION_OK {
240+
let _ = child.kill();
241+
panic!("Version negotiation failed: {:?}", String::from_utf8_lossy(&response));
225242
}
226243

227-
control.send_response(req_id, VERSION_NEGOTIATION_OK.to_vec())
228-
.expect("Failed to send version ack");
229-
230244
if self.verbosity == Verbosity::Debug {
231245
eprintln!("Version negotiation complete");
232246
}
@@ -346,17 +360,15 @@ fn run_test_case<F: FnMut()>(
346360
_verbosity: Verbosity,
347361
got_interesting: &Arc<AtomicBool>,
348362
) -> (String, Option<Value>) {
349-
// Set thread-local state
350-
set_mode(HegelMode::Embedded);
363+
// Set thread-local state for this test case
351364
set_is_last_run(is_final);
352-
set_embedded_connection(Arc::clone(connection), test_channel.clone_for_embedded());
365+
set_connection(Arc::clone(connection), test_channel.clone_for_embedded());
353366

354367
// Run test in catch_unwind
355368
let result = catch_unwind(AssertUnwindSafe(test_fn));
356369

357370
// Clear connection before returning (test is done generating)
358-
clear_embedded_connection();
359-
set_mode(HegelMode::External);
371+
clear_connection();
360372

361373
match result {
362374
Ok(()) => ("VALID".to_string(), None),

src/gen/mod.rs

Lines changed: 7 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -43,38 +43,9 @@ pub(crate) mod exit_codes {
4343
}
4444
use std::cell::{Cell, RefCell};
4545
use std::marker::PhantomData;
46-
use std::os::unix::net::UnixStream;
4746
use std::sync::Arc;
4847

49-
use crate::protocol::{cbor_to_json, json_to_cbor, negotiate_version, Channel, Connection};
50-
51-
// ============================================================================
52-
// Mode Management
53-
// ============================================================================
54-
55-
/// Operating mode for the SDK.
56-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
57-
pub enum HegelMode {
58-
/// SDK connects to a running hegel server (external process runs tests)
59-
#[default]
60-
External,
61-
/// SDK accepts connections from hegel (embedded in test binary)
62-
Embedded,
63-
}
64-
65-
thread_local! {
66-
static MODE: Cell<HegelMode> = const { Cell::new(HegelMode::External) };
67-
}
68-
69-
/// Get the current operating mode.
70-
pub fn current_mode() -> HegelMode {
71-
MODE.with(|m| m.get())
72-
}
73-
74-
/// Set the operating mode (used by embedded module).
75-
pub(crate) fn set_mode(mode: HegelMode) {
76-
MODE.with(|m| m.set(mode));
77-
}
48+
use crate::protocol::{cbor_to_json, json_to_cbor, Channel, Connection};
7849

7950
// ============================================================================
8051
// State Management (Thread-Local)
@@ -92,7 +63,7 @@ pub(crate) fn is_last_run() -> bool {
9263
IS_LAST_RUN.with(|r| r.get())
9364
}
9465

95-
/// Set the is_last_run flag (used by embedded module).
66+
/// Set the is_last_run flag.
9667
pub(crate) fn set_is_last_run(is_last: bool) {
9768
IS_LAST_RUN.with(|r| r.set(is_last));
9869
}
@@ -137,90 +108,14 @@ fn is_debug() -> bool {
137108
std::env::var("HEGEL_DEBUG").is_ok()
138109
}
139110

140-
fn get_socket_path() -> String {
141-
std::env::var("HEGEL_SOCKET").expect("HEGEL_SOCKET environment variable not set")
142-
}
143-
144-
/// Check if we have an active connection.
145-
fn is_connected() -> bool {
146-
CONNECTION.with(|conn| conn.borrow().is_some())
147-
}
148-
149-
/// Get the current span depth.
150-
fn get_span_depth() -> usize {
151-
CONNECTION.with(|conn| {
152-
conn.borrow()
153-
.as_ref()
154-
.map(|s| s.span_depth)
155-
.unwrap_or(0)
156-
})
157-
}
158-
159-
/// Open a connection. Panics if already connected.
160-
pub(crate) fn open_connection() {
161-
CONNECTION.with(|conn| {
162-
let mut conn = conn.borrow_mut();
163-
assert!(
164-
conn.is_none(),
165-
"open_connection called while already connected"
166-
);
167-
168-
let socket_path = get_socket_path();
169-
let stream = match UnixStream::connect(&socket_path) {
170-
Ok(s) => s,
171-
Err(e) => {
172-
eprintln!(
173-
"Failed to connect to Hegel socket at {}: {}",
174-
socket_path, e
175-
);
176-
std::process::exit(exit_codes::SOCKET_ERROR);
177-
}
178-
};
179-
180-
let connection = Connection::new(stream);
181-
182-
// Perform version negotiation
183-
if let Err(e) = negotiate_version(&connection) {
184-
eprintln!("Version negotiation failed: {}", e);
185-
std::process::exit(exit_codes::SOCKET_ERROR);
186-
}
187-
188-
// Get the control channel for sending requests
189-
let channel = connection.control_channel();
190-
191-
*conn = Some(ConnectionState {
192-
connection,
193-
channel,
194-
span_depth: 0,
195-
});
196-
});
197-
}
198-
199-
/// Close the connection. Panics if not connected or if spans are still open.
200-
pub(crate) fn close_connection() {
201-
CONNECTION.with(|conn| {
202-
let mut conn = conn.borrow_mut();
203-
let state = conn
204-
.as_ref()
205-
.expect("close_connection called while not connected");
206-
assert_eq!(
207-
state.span_depth, 0,
208-
"close_connection called with {} unclosed span(s)",
209-
state.span_depth
210-
);
211-
*conn = None;
212-
});
213-
}
214-
215-
/// Set the connection from an already-connected stream (used by embedded module).
216-
/// This is used when the SDK creates a server and accepts connections from hegel.
111+
/// Set the connection for the current test case.
217112
/// The channel parameter is the test case channel assigned by the server.
218-
pub(crate) fn set_embedded_connection(connection: Arc<Connection>, channel: Channel) {
113+
pub(crate) fn set_connection(connection: Arc<Connection>, channel: Channel) {
219114
CONNECTION.with(|conn| {
220115
let mut conn = conn.borrow_mut();
221116
assert!(
222117
conn.is_none(),
223-
"set_embedded_connection called while already connected"
118+
"set_connection called while already connected"
224119
);
225120

226121
*conn = Some(ConnectionState {
@@ -231,8 +126,8 @@ pub(crate) fn set_embedded_connection(connection: Arc<Connection>, channel: Chan
231126
});
232127
}
233128

234-
/// Clear the embedded connection (used by embedded module).
235-
pub(crate) fn clear_embedded_connection() {
129+
/// Clear the connection after a test case completes.
130+
pub(crate) fn clear_connection() {
236131
CONNECTION.with(|conn| {
237132
*conn.borrow_mut() = None;
238133
});
@@ -332,19 +227,10 @@ pub(crate) fn request_from_schema(schema: &Value) -> Result<Value, StopTestError
332227

333228
/// Generate a value from a schema.
334229
pub fn generate_from_schema<T: serde::de::DeserializeOwned>(schema: &Value) -> T {
335-
// In embedded mode, connection is already set - don't try to open/close
336-
let need_connection = !is_connected() && current_mode() == HegelMode::External;
337-
if need_connection {
338-
open_connection();
339-
}
340-
341230
let result = match request_from_schema(schema) {
342231
Ok(v) => v,
343232
Err(StopTestError) => {
344233
// Server ran out of data - reject this test case
345-
if need_connection {
346-
close_connection();
347-
}
348234
crate::assume(false);
349235
unreachable!("assume(false) should not return")
350236
}
@@ -372,9 +258,6 @@ pub fn start_span(label: u64) {
372258
increment_span_depth();
373259
if let Err(StopTestError) = send_request("start_span", &json!({"label": label})) {
374260
decrement_span_depth();
375-
if get_span_depth() == 0 && current_mode() == HegelMode::External {
376-
close_connection();
377-
}
378261
crate::assume(false);
379262
}
380263
}
@@ -387,11 +270,6 @@ pub fn stop_span(discard: bool) {
387270
decrement_span_depth();
388271
// Ignore StopTest errors from stop_span - we're already closing
389272
let _ = send_request("stop_span", &json!({"discard": discard}));
390-
// Only close connection in external mode - in embedded mode, the
391-
// connection is managed by the embedded module
392-
if get_span_depth() == 0 && current_mode() == HegelMode::External {
393-
close_connection();
394-
}
395273
}
396274

397275
// ============================================================================

0 commit comments

Comments
 (0)