Add tests for reconnect callbacks #71
Conversation
This commit adds tests for all supported connection lifecycle callbacks:
- disconnected_cb: Called when connection is lost
- reconnected_cb: Called when reconnection succeeds
- closed_cb: Called when connection is explicitly closed
- error_cb: Called when server sends an error message

Tests cover individual callbacks as well as full lifecycle scenarios. Also includes reconnection_test.zig in the e2e test suite.

Fixes #68

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-authored-by: Lukáš Lalinský <[email protected]>
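For illustration, a minimal sketch of how a test might register these callbacks, assuming the option and helper names discussed later in this thread (ConnectionOptions with a `.callbacks` group, and the tests/utils.zig `createConnection(opts)` helper); the exact signatures may differ in the final code:

```zig
const std = @import("std");
const nats = @import("nats");
const utils = @import("utils.zig");

// Hypothetical no-op handlers; signatures are assumed from the review diffs below.
fn onDisconnected(conn: *nats.Connection) void { _ = conn; }
fn onReconnected(conn: *nats.Connection) void { _ = conn; }
fn onClosed(conn: *nats.Connection) void { _ = conn; }
fn onError(conn: *nats.Connection, msg: []const u8) void { _ = conn; _ = msg; }

test "lifecycle callbacks can be registered" {
    const conn = try utils.createConnection(.{
        .callbacks = .{
            .disconnected_cb = onDisconnected,
            .reconnected_cb = onReconnected,
            .closed_cb = onClosed,
            .error_cb = onError,
        },
        .reconnect = .{ .allow_reconnect = true },
    });
    defer conn.deinit();
}
```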
Walkthrough
Registers a new reconnection test and callback tracker, adds a close-callback test, changes the test utility createConnection signature to accept ConnectionOptions, and performs a large refactor of connection lifecycle, handshake, and reconnection logic in production code.

Changes
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes

Assessment against linked issues
Out-of-scope changes
Possibly related PRs
Actionable comments posted: 3
🧹 Nitpick comments (5)
tests/reconnection_test.zig (5)
7-7: Guard the global tracker against parallel test execution.
If tests run in parallel, this shared mutable state will race. Either ensure the test runner is serial, or add a simple global test guard mutex around tests that use global_tracker.

Example guard (apply to each test that uses global_tracker):

```diff
+var tracker_guard: std.Thread.Mutex = .{};
 ...
 test "disconnected callback on connection loss" {
-    global_tracker.reset();
+    tracker_guard.lock();
+    defer tracker_guard.unlock();
+    global_tracker.reset();
```
17-25: Reset under lock or after connect for stricter ordering.
Today it's fine (you call reset before connecting), but if usage drifts, clearing error_message while callbacks could fire would be racy. Consider holding the lock for the whole reset or documenting the pre-connect requirement.

Minimal change:

```diff
 fn reset(self: *@This()) void {
-    self.disconnected_called.store(false, .release);
-    self.reconnected_called.store(false, .release);
-    self.closed_called.store(false, .release);
-    self.error_called.store(false, .release);
     self.mutex.lock();
     defer self.mutex.unlock();
+    self.disconnected_called.store(false, .release);
+    self.reconnected_called.store(false, .release);
+    self.closed_called.store(false, .release);
+    self.error_called.store(false, .release);
     self.error_message = null;
 }
```
81-106: Restart alone may not guarantee a disconnect event quickly; consider a short publish to force I/O.
To tighten this test, you can publish once after restarting to force socket activity, then wait for the callback. This reduces flakes if TCP keeps the connection “alive” briefly.

```diff
 // Force disconnection by restarting server
 try utils.runDockerCompose(std.testing.allocator, &.{ "restart", "nats-1" });

 // Wait for disconnected callback with timeout
-const callback_called = global_tracker.waitForCallback(&global_tracker.disconnected_called, 10000);
+// Nudge the connection to observe the disconnect faster
+_ = conn.publish("test.nudge", "nudge") catch {};
+const callback_called = global_tracker.waitForCallback(&global_tracker.disconnected_called, 10000);
```
144-166: Closed callback timing: 1s can be tight on loaded CI.
Bump to 5s to deflake slow CI hosts.

```diff
-const callback_called = global_tracker.waitForCallback(&global_tracker.closed_called, 1000);
+const callback_called = global_tracker.waitForCallback(&global_tracker.closed_called, 5000);
```
200-246: End-to-end lifecycle looks solid. Two minor robustness tweaks: wait for health after restart, and add a small timeout cushion.
After docker-compose restart, add a brief health wait so reconnection isn’t racing a booting server. Also, a 20s reconnected timeout can be tight on slow CI.

```diff
-try utils.runDockerCompose(std.testing.allocator, &.{ "restart", "nats-1" });
+try utils.runDockerCompose(std.testing.allocator, &.{ "restart", "nats-1" });
+// Ensure the restarted instance is healthy before asserting reconnection
+_ = utils.waitForHealthyServices(std.testing.allocator, 10_000) catch {};
@@
-const reconnected_called = global_tracker.waitForCallback(&global_tracker.reconnected_called, 20000);
+const reconnected_called = global_tracker.waitForCallback(&global_tracker.reconnected_called, 30000);
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- tests/all_tests.zig (1 hunks)
- tests/reconnection_test.zig (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using zig fmt before commit
Files:
- tests/all_tests.zig
- tests/reconnection_test.zig
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Learnt from: lalinsky
PR: lalinsky/nats.zig#67
File: src/dispatcher.zig:20-20
Timestamp: 2025-08-30T21:03:43.986Z
Learning: In the nats.zig project, test_runner.zig and files in the examples/ directory are fine to keep their std.debug.print calls - these don't need to be replaced with the centralized logging system.
Learnt from: lalinsky
PR: lalinsky/nats.zig#66
File: src/connection.zig:317-323
Timestamp: 2025-08-30T18:48:28.310Z
Learning: In src/connection.zig, the establishConnection() call in connectToServer() is during initial connection setup where no race conditions exist - the connection starts disconnected and other threads would see the connecting status and wait appropriately.
📚 Learning: 2025-08-29T10:22:22.247Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-29T10:22:22.247Z
Learning: Run `zig build test` to confirm your changes are working
Applied to files:
tests/all_tests.zig
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
- tests/all_tests.zig
- tests/reconnection_test.zig
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (4)
tests/reconnection_test.zig (3)
80-80: No comment needed.
108-143: Good coverage: both disconnected and reconnected paths, plus post-reconnect traffic.
168-198: No changes needed: processErr is already public.
processErr is declared as pub fn in src/connection.zig (line 1156) and src/parser.zig (line 611), so invoking it in tests doesn’t break encapsulation.

tests/all_tests.zig (1)
11-11: LGTM: reconnection tests are now part of the main suite.
Order is fine after core request/reply.
- Fix thread safety by adding global tracker guard mutex
- Fix memory ownership in error callback by copying to owned buffer
- Fix timing issues in waitForCallback with proper signed integer math
- Enable reconnection in disconnected callback test with limited retries
- Increase timeouts and add service health checks for robustness
- Add connection nudges to force disconnect detection

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-authored-by: Lukáš Lalinský <[email protected]>
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/reconnection_test.zig (1)
19-27: Nit: clear the error buffer on reset for cleaner state.
Not required for correctness, but it helps debugging and avoids stale bytes during future assertions.

Apply:

```diff
 fn reset(self: *@This()) void {
     self.mutex.lock();
     defer self.mutex.unlock();
     self.disconnected_called.store(false, .release);
     self.reconnected_called.store(false, .release);
     self.closed_called.store(false, .release);
     self.error_called.store(false, .release);
     self.error_message = null;
+    std.mem.set(u8, self.error_buf[0..], 0);
 }
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- tests/reconnection_test.zig (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using zig fmt before commit
Files:
tests/reconnection_test.zig
🧠 Learnings (2)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/reconnection_test.zig
📚 Learning: 2025-08-30T07:40:28.186Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#60
File: src/queue.zig:390-393
Timestamp: 2025-08-30T07:40:28.186Z
Learning: In the nats.zig project, the maintainer prefers to rely on Zig's built-in integer overflow detection rather than implementing manual saturated conversions for timeout calculations (timeout_ms * std.time.ns_per_ms). Zig automatically fails on integer overflow in Debug and ReleaseSafe modes.
Applied to files:
tests/reconnection_test.zig
🪛 GitHub Actions: CI
tests/reconnection_test.zig
[error] 99-99: zig test Debug native failed with compile error: no field named 'wait_ms' in struct 'connection.ReconnectOptions' (tests/reconnection_test.zig:99). Note: 'ReconnectOptions' is defined in src/connection.zig:156.
🔇 Additional comments (9)
tests/reconnection_test.zig (9)
7-9: Global tracker + guard: fine for serializing callback tests.
Using a single global tracker with a per-test guard avoids cross-test races in the callback assertions. Keep it.
10-17: Atomics + owned error buffer: good baseline.
The tracker fields are sane; using atomics for flags and a fixed buffer for the error message is the right call.
29-42: Callbacks only flip atomics (release): looks good.
Simple, non-blocking callbacks with release stores are appropriate here.
44-52: Error callback ordering and ownership fixed.
Copying into error_buf first, then setting the flag with release, prevents readers from seeing a half-initialized message. Thanks for addressing earlier feedback.
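For reference, the copy-then-release ordering being praised here, as a standalone sketch (names are illustrative, not the exact test code):

```zig
const std = @import("std");

var error_buf: [256]u8 = undefined;
var error_len: usize = 0;
var error_called = std.atomic.Value(bool).init(false);

fn onError(msg: []const u8) void {
    // Copy into memory we own first...
    const n = @min(msg.len, error_buf.len);
    @memcpy(error_buf[0..n], msg[0..n]);
    error_len = n;
    // ...then publish the flag with .release, so a reader that pairs this with
    // an .acquire load also observes the completed copy.
    error_called.store(true, .release);
}
```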
54-65: Deadline-based wait avoids signed/unsigned pitfalls.
This resolves the i64 vs u64 delta issue and keeps the loop simple.
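A sketch of the deadline-based polling shape referenced here, assuming millisecond timestamps (the helper in the test may differ in detail):

```zig
const std = @import("std");

fn waitForFlag(flag: *std.atomic.Value(bool), timeout_ms: i64) bool {
    // Deadline arithmetic stays in i64, matching std.time.milliTimestamp(),
    // so there is no signed/unsigned mixing to get wrong.
    const deadline = std.time.milliTimestamp() + timeout_ms;
    while (!flag.load(.acquire)) {
        if (std.time.milliTimestamp() >= deadline) return false;
        std.time.sleep(10 * std.time.ns_per_ms);
    }
    return true;
}
```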
120-158: Reconnected path looks reasonable.
Adding additional servers and waiting for health reduces flakiness. Timeouts are generous.
160-184: Closed callback test is fine.
Direct close followed by a bounded wait is straightforward.
220-270: End-to-end lifecycle test: solid coverage.
Covers disconnect, reconnect, post-reconnect IO, and close. No changes needed.
186-218: Keep try on processErr; it returns !void and is public.
processErr is declared as pub fn processErr(...) !void, so try conn.processErr(...) is correct and required, and the function is public for testing.

Likely an incorrect or invalid review comment.
@claude we need to re-add the reconnect thread, as we need readLoop to run while we are in doReconnect. Think and prepare a plan.
Claude finished @lalinsky's task — View job

Reconnect Thread Implementation Plan

Current Problem:

Why Reader Thread Must Continue:
Implementation Plan:

1. Restore Reconnect Thread Infrastructure

```zig
// Add back to Connection struct:
reconnect_thread: ?std.Thread = null,
in_reconnect: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
reconnect_condition: std.Thread.Condition = .{},
```

2. Modify Reader Loop Behavior

```zig
// In readerLoop() - change from:
if (self.getStatus() == .disconnected) {
self.doReconnect(); // BLOCKS reader thread
continue;
}
// To:
if (self.getStatus() == .disconnected) {
continue; // Keep processing, let reconnect thread handle reconnection
}
```

3. Update markNeedsReconnect()

```zig
// Spawn reconnect thread instead of just marking status:
self.status = .disconnected;
if (!self.in_reconnect.swap(true, .acq_rel)) {
self.reconnect_thread = try std.Thread.spawn(.{}, doReconnectThread, .{self});
}
```

4. Socket Management During Reconnection
5. Thread Cleanup in deinit()

```zig
// Join reconnect thread in deinit():
if (self.reconnect_thread) |thread| {
thread.join();
self.reconnect_thread = null;
}
```

This restores concurrent operation: the reader processes server messages while the reconnect thread handles connection restoration in parallel.

Ready to implement? This will require careful threading coordination but solves the fundamental blocking issue.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/connection.zig (1)
549-561: Critical: PUB can be sent before CONNECT during initial connect.
Allowing publish in .connecting and appending to write_buffer risks sending HPUB/PUB before CONNECT+PING (protocol violation). Buffer publishes during both .connecting and .reconnecting, then flush after handshake success.
Apply this diff:
```diff
@@
-    log.debug("status = {}", .{self.status});
+    if (self.options.trace) {
+        log.debug("status = {}", .{self.status});
+    }
@@
-    switch (self.status) {
-        .connected, .connecting, .reconnecting => {},
+    switch (self.status) {
+        .connected, .connecting, .reconnecting => {},
         else => {
             return ConnectionError.ConnectionClosed;
         },
     }
@@
-    // Published messages go to pending_buffer during reconnection, otherwise write_buffer
-    if (self.status == .reconnecting and self.options.reconnect.allow_reconnect) {
-        try self.pending_buffer.append(buffer.items);
-    } else {
-        try self.write_buffer.append(buffer.items);
-    }
+    // During both reconnecting and initial connecting, buffer to pending.
+    // Only write directly when fully connected.
+    const status = self.status;
+    if (status == .connected) {
+        try self.write_buffer.append(buffer.items);
+    } else {
+        try self.pending_buffer.append(buffer.items);
+    }
```
@@ - // Handshake completed successfully - self.status = .connected; + // Handshake completed successfully + self.status = .connected; + // Flush any messages queued during initial connect + self.pending_buffer.moveToBuffer(&self.write_buffer) catch |err| { + log.warn("Failed to flush pending messages after initial connect: {}", .{err}); + };Also applies to: 383-387, 544-546
♻️ Duplicate comments (1)
tests/reconnection_test.zig (1)
52-60: If you plan to assert on error text later, own the message buffer.
Currently you ignore msg. If future tests inspect it, copy into a small fixed buffer owned by the tracker to avoid dangling slices.

```diff
 error_called: u32 = 0,
+error_message: ?[]const u8 = null,
+error_buf: [256]u8 = undefined,
@@
 fn errorCallback(conn: *nats.Connection, msg: []const u8) void {
     var self = &tracker;
     self.mutex.lock();
     defer self.mutex.unlock();
     self.error_called += 1;
+    const n = @min(msg.len, self.error_buf.len);
+    std.mem.copy(u8, self.error_buf[0..n], msg[0..n]);
+    self.error_message = self.error_buf[0..n];
     self.cond.signal();
     _ = conn;
-    _ = msg;
 }
```
🧹 Nitpick comments (4)
src/connection.zig (2)
323-334: Guard connect() status check with the mutex-backed getter.
Use getStatus() (which locks) to avoid a racy raw read of self.status.
Apply this diff:
```diff
-    if (self.status != .closed) {
+    if (self.getStatus() != .closed) {
         return ConnectionError.ConnectionFailed;
     }
```

Follow-up: If concurrent connect() calls are possible, consider promoting the state transition to .connecting inside connect() under the same lock (or add a one-shot guard) to prevent double inits.
1313-1391: Reconnection holds the main mutex across blocking ops; consider reducing lock scope.
doReconnect() keeps self.mutex locked across socket connect and handshake wait, blocking publish() and other callers unnecessarily. Consider the following (a rough sketch follows this list):
- Lock only to check/transition state and to mutate shared fields.
- Perform establishConnection() and network sleeps outside the lock.
- Reacquire the lock to update state and move buffers.
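Roughly the shape being suggested, as a sketch only (it reuses field and method names from this PR's discussion and is not the actual implementation):

```zig
fn doReconnect(self: *Self) void {
    // 1) Short critical section: validate and transition state.
    {
        self.mutex.lock();
        defer self.mutex.unlock();
        if (self.status != .reconnecting) return;
    }

    // 2) Blocking work (dial, handshake wait, backoff sleeps) without the lock,
    //    so publish() and other callers are not stalled.
    self.establishConnection() catch return;

    // 3) Reacquire the lock only to publish the new state and move buffers.
    {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.status = .connected;
        // move pending_buffer into write_buffer here
    }
}
```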
tests/reconnection_test.zig (2)
107-107: Synchronize the final read of the counter to avoid a data race.
Counters are incremented under mutex; read them under the same lock (or make them atomics). Low risk here, but it keeps the access pattern consistent.

```diff
-try testing.expectEqual(1, tracker.reconnected_called);
+tracker.mutex.lock();
+defer tracker.mutex.unlock();
+try testing.expectEqual(@as(u32, 1), tracker.reconnected_called);
```
62-67: timedWait exists but isn’t used; either remove it or use it to await callbacks.
Right now you busy-wait on isConnected(). Given you already signal the condition in callbacks, prefer waiting on the predicate to avoid polling. Alternatively, drop cond/timedWait from this test.

Example (outside this hunk) to wait for the reconnected callback:

```zig
// Add to CallbackTracker:
fn waitForReconnected(self: *@This(), timeout_ms: u32) !void {
    const start = std.time.milliTimestamp();
    self.mutex.lock();
    defer self.mutex.unlock();
    while (self.reconnected_called == 0) {
        const now = std.time.milliTimestamp();
        const deadline = start + @as(i64, @intCast(timeout_ms));
        if (now >= deadline) return error.StillNotConnected;
        const remaining_ms = @max(@as(i64, 1), deadline - now);
        try self.cond.timedWait(&self.mutex, @as(u64, @intCast(remaining_ms)) * std.time.ns_per_ms);
    }
}
```

Then in the test, replace the timer loop with:

```zig
try tracker.waitForReconnected(10_000);
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- src/connection.zig (13 hunks)
- tests/callbacks_test.zig (1 hunks)
- tests/reconnection_test.zig (1 hunks)
- tests/utils.zig (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using zig fmt before commit
Files:
- tests/utils.zig
- tests/callbacks_test.zig
- tests/reconnection_test.zig
- src/connection.zig
🧠 Learnings (3)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
- tests/callbacks_test.zig
- tests/reconnection_test.zig
📚 Learning: 2025-08-30T07:40:28.186Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#60
File: src/queue.zig:390-393
Timestamp: 2025-08-30T07:40:28.186Z
Learning: In the nats.zig project, the maintainer prefers to rely on Zig's built-in integer overflow detection rather than implementing manual saturated conversions for timeout calculations (timeout_ms * std.time.ns_per_ms). Zig automatically fails on integer overflow in Debug and ReleaseSafe modes.
Applied to files:
tests/reconnection_test.zig
📚 Learning: 2025-08-30T18:48:28.333Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#66
File: src/connection.zig:317-323
Timestamp: 2025-08-30T18:48:28.333Z
Learning: In src/connection.zig, the establishConnection() call in connectToServer() is during initial connection setup where no race conditions exist - the connection starts disconnected and other threads would see the connecting status and wait appropriately.
Applied to files:
src/connection.zig
🔇 Additional comments (11)
src/connection.zig (7)
115-120: Status model changes look sane.
The closed/disconnected/reconnecting split is clear and matches callback semantics.
187-187: Good default to .closed.
Starting closed avoids “half-initialized” edge cases.
450-458: Helpers are a nice touch.
isConnected/isConnecting improve call-site clarity.
834-837: Handshake trace log is useful.
No issues.
864-867: Reader loop reconnection trigger looks correct.
Transitions to doReconnect only from .disconnected.
1296-1304: Disconnected callback deferral is correct.
State change under lock; callback fired outside. Good.
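The pattern being approved, as a small self-contained sketch (names are generic, not the library's actual types):

```zig
const std = @import("std");

const Conn = struct {
    mutex: std.Thread.Mutex = .{},
    status: enum { connected, reconnecting } = .connected,
    disconnected_cb: ?*const fn () void = null,

    fn markNeedsReconnect(self: *Conn) void {
        var callback: ?*const fn () void = null;
        {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.status = .reconnecting;
            callback = self.disconnected_cb; // capture while holding the lock
        }
        // Fire the callback outside the lock so user code can call back into
        // the connection without deadlocking.
        if (callback) |cb| cb();
    }
};
```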
1317-1322: Reconnect gating is correct.
Only proceeds from .disconnected; transitions to .reconnecting under lock.
tests/utils.zig (2)
13-27: API change to pass ConnectionOptions through init is good.
createConnection now accepts opts and wires them into Connection.init. Error handling and cleanup look fine.
29-35: Call sites updated correctly.
Defaults use .{}; the wrong-port helper matches the new signature.
tests/callbacks_test.zig (1)
17-33: Test intent is solid.
Asserts exactly one closed_cb invocation on deinit. Verified no other utils.createConnectionOption references.

tests/reconnection_test.zig (1)
72-83: Callback wiring looks correct.
All four callbacks are registered; options are minimal and clear. Good.
@claude update the plan:
- markNeedsReconnect/doReconnect run under mutex, we don't need atomics
- in_reconnect can be replaced by checking if the reconnect thread is non-null
- when doReconnect exits, it should detach the thread and mark it as null
- readerLoop should just call readerIteration in a loop, don't busy-loop on status
- no need to change acquireSocket
Claude finished @lalinsky's task — View job

Updated Reconnect Thread Implementation Plan

Todo List:
Updated Plan Based on Feedback

Current Problem:

Revised Implementation Plan:

1. Add Reconnect Thread to Connection Struct

```zig
// Add to Connection struct:
reconnect_thread: ?std.Thread = null,
```

2. Update markNeedsReconnect() - Spawn Thread

```zig
fn markNeedsReconnect(self: *Self, err: anyerror) void {
// ... existing mutex lock and status checks ...
self.status = .disconnected;
self.cleanupFailedConnection(err, false);
// Spawn reconnect thread if not already running
if (self.reconnect_thread == null) {
self.reconnect_thread = std.Thread.spawn(.{}, doReconnectThread, .{self}) catch |spawn_err| {
log.err("Failed to spawn reconnect thread: {}", .{spawn_err});
// Fall back to closing connection
needs_close = true;
return;
};
}
// Fire disconnected callback
if (self.options.callbacks.disconnected_cb) |cb| {
callback = cb;
}
}
```

3. New doReconnectThread() Function

```zig
fn doReconnectThread(self: *Self) void {
defer {
self.mutex.lock();
defer self.mutex.unlock();
// Detach thread and mark as null
if (self.reconnect_thread) |thread| {
thread.detach();
self.reconnect_thread = null;
}
}
// Run existing doReconnect logic but without holding mutex across blocking operations
self.doReconnect();
}
```

4. Simplify Reader Loop

```zig
fn readerLoop(self: *Self) void {
log.debug("Reader loop started", .{});
while (!self.should_stop.load(.acquire)) {
self.readerIteration() catch |err| {
switch (err) {
error.ShouldStop => break,
else => {
self.markNeedsReconnect(err);
continue;
},
}
};
}
log.debug("Reader loop exited", .{});
}
```

5. Update doReconnect() - Reduce Mutex Scope

```zig
fn doReconnect(self: *Self) void {
// Check if we should reconnect (with mutex)
{
self.mutex.lock();
defer self.mutex.unlock();
if (self.status != .disconnected) return;
self.status = .reconnecting;
}
// Perform reconnection logic WITHOUT holding mutex
// (establishConnection, handshake waits, sleep delays)
// Update state and flush buffers (with mutex)
{
self.mutex.lock();
defer self.mutex.unlock();
self.status = .connected;
// ... flush pending buffer, fire callbacks ...
}
}
```

6. Thread Cleanup in deinit()

```zig
// Join reconnect thread in deinit():
if (self.reconnect_thread) |thread| {
thread.join();
self.reconnect_thread = null;
}
```

Key Benefits:
Ready to implement this revised approach?
@claude do NOT change doReconnect, the entire function should be under mutex, as it currently is. Other than that, your plan is good. Implement it.
Claude finished @lalinsky's task — View job

Implementing Reconnect Thread ✅

Plan: Add a reconnect thread to prevent the reader thread blocking during reconnection, keeping the doReconnect function entirely under mutex as requested.

Todo List:
Implementation Summary

Successfully implemented the reconnect thread infrastructure:
Key Changes:
Commit:
- Add reconnect_thread field to Connection struct
- Spawn reconnect thread in markNeedsReconnect instead of blocking reader
- Add doReconnectThread wrapper that handles thread cleanup
- Simplify readerLoop to just call readerIteration
- Fix test compilation issues (createConnection vs createConnectionOption)
- Fix publish timing in reconnection test to wait for connection first
- Update deinit to cleanup reconnect thread

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-authored-by: Lukáš Lalinský <[email protected]>
@claude one more change: We no longer need the disconnected state. We go from connected to either reconnecting or closed. Publishing messages should be allowed in either the connected or reconnecting state.
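A sketch of the requested model (illustrative only; the enum name mirrors the one discussed in this PR):

```zig
const ConnectionStatus = enum { closed, connecting, connected, reconnecting };

fn canPublish(status: ConnectionStatus) bool {
    // connected: write directly; reconnecting: buffer until the reconnect
    // finishes; publishes in the other states are rejected by the caller.
    return status == .connected or status == .reconnecting;
}
```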
Claude finished @lalinsky's task — View job

Removing Disconnected State ✅

Task: Remove the .disconnected connection state, per the request above.

Todo List:
Changes Implemented

State Model Simplified:
Key Changes:
Testing: ✅ All 79 tests pass, including reconnection test (203ms)

Committed:
- Remove .disconnected from ConnectionStatus enum
- State transitions: connected → reconnecting → connected (or closed)
- Allow publishing in both .connected and .reconnecting states
- Update markNeedsReconnect to transition directly to .reconnecting
- Update doReconnect to expect .reconnecting state

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-authored-by: Lukáš Lalinský <[email protected]>
Actionable comments posted: 3
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- src/connection.zig (14 hunks)
- tests/callbacks_test.zig (1 hunks)
- tests/reconnection_test.zig (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using zig fmt before commit
Files:
- tests/callbacks_test.zig
- tests/reconnection_test.zig
- src/connection.zig
🧠 Learnings (3)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
- tests/callbacks_test.zig
- tests/reconnection_test.zig
📚 Learning: 2025-08-30T07:40:28.186Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#60
File: src/queue.zig:390-393
Timestamp: 2025-08-30T07:40:28.186Z
Learning: In the nats.zig project, the maintainer prefers to rely on Zig's built-in integer overflow detection rather than implementing manual saturated conversions for timeout calculations (timeout_ms * std.time.ns_per_ms). Zig automatically fails on integer overflow in Debug and ReleaseSafe modes.
Applied to files:
tests/reconnection_test.zig
📚 Learning: 2025-08-30T18:48:28.333Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#66
File: src/connection.zig:317-323
Timestamp: 2025-08-30T18:48:28.333Z
Learning: In src/connection.zig, the establishConnection() call in connectToServer() is during initial connection setup where no race conditions exist - the connection starts disconnected and other threads would see the connecting status and wait appropriately.
Applied to files:
- tests/reconnection_test.zig
- src/connection.zig
🔇 Additional comments (17)
src/connection.zig (11)
115-120: Good enum reordering.
Moving closed to the front and disconnected after connected makes the state machine flow clearer. The removal of draining states simplifies the logic.
187-187: Correct default status.
Starting with .closed instead of .disconnected is the right choice - connections begin closed until explicitly opened.
212-212: Proper thread cleanup for reconnect_thread.
Adding the reconnect thread field and joining it in deinit ensures clean shutdown without thread leaks.
Also applies to: 273-276
365-369: Good error handling pattern.
Properly setting status to closed and cleaning up on connection failure prevents resource leaks and ensures consistent state.
455-462: Clean helper methods.
The isConnected and isConnecting helpers provide a clear API for status checks. Good abstraction.
1045-1079: Well-structured handshake implementation.
The sendConnectAndPing method cleanly encapsulates the handshake protocol. Sending both CONNECT and PING together is efficient.
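For context, the handshake it encapsulates: the client writes CONNECT (a JSON options object) immediately followed by PING, then treats the server's PONG as handshake completion. A simplified sketch, not the actual sendConnectAndPing body:

```zig
fn writeConnectAndPing(writer: anytype, verbose: bool) !void {
    // CONNECT carries client options as JSON; the trailing PING lets the client
    // detect acceptance, since the server answers PONG (or -ERR on failure).
    try writer.print("CONNECT {{\"verbose\":{},\"protocol\":1}}\r\n", .{verbose});
    try writer.writeAll("PING\r\n");
}
```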
1297-1298: Good state transition for reconnection.
Setting status to .disconnected before spawning the reconnect thread prevents race conditions.
1300-1309: Thread spawn error handling looks good.
Falling back to close on thread spawn failure is the right recovery strategy.
1316-1331: Thread detach pattern is correct.
The deferred cleanup with thread detach and null assignment prevents dangling thread references.
1343-1348: Proper state machine transition.
Only proceeding with reconnection when status is .disconnected and then immediately transitioning to .reconnecting prevents concurrent reconnection attempts.
1401-1413: Good reconnection flow.
Connection establishment followed by handshake completion, with proper error cleanup on each failure, is solid.
tests/callbacks_test.zig (1)
20-29: Test implementation looks correct.
Good use of the updated createConnection API with the options parameter. The callback structure and test flow are proper.

tests/reconnection_test.zig (5)
10-17: Thread-safe callback tracking implementation.
Good use of mutex protection for the counters. The condition variable for signaling is a nice touch for synchronization.
25-60: Callback implementations are correct.
All four callbacks properly lock the mutex, increment their counter, and signal the condition. The pattern is consistent across all of them.
62-66: Potential integer overflow in timeout calculation.
The multiplication timeout_ms * std.time.ns_per_ms could overflow for large timeout values.

Based on the learnings, the project prefers to rely on Zig's built-in overflow detection rather than manual checks. Since this is test code with reasonable timeout values, this is acceptable.
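Concretely, the conversion in question is roughly the following; in Debug and ReleaseSafe builds Zig traps on the overflow instead of silently wrapping, which is the behavior the project relies on:

```zig
const std = @import("std");

fn timeoutNs(timeout_ms: u64) u64 {
    // Overflows only for absurd inputs (hundreds of years expressed in
    // milliseconds); Debug/ReleaseSafe builds panic rather than wrap.
    return timeout_ms * std.time.ns_per_ms;
}
```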
93-102: Good fix for the race condition.
Waiting for the connection to be established before publishing prevents the flaky test issue. The timeout check prevents infinite hangs.
108-108: Test assertion validates the callback.
Good verification that the reconnected callback was invoked exactly once. This confirms the reconnection mechanism works as expected.
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/connection.zig (2)
328-330: Fix misleading error condition comment.
The error check verifies status != .closed, but any error message or documentation would imply it's about the connected state. This creates confusion about the actual precondition.
552-557: Allowing publishes during the connecting state is risky.
The switch statement allows the .connecting state for publishes, but the TCP connection and NATS handshake might not be complete yet. Publishing during .connecting could fail or result in lost messages since the server hasn't acknowledged the connection yet.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- src/connection.zig (13 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using zig fmt before commit
Files:
src/connection.zig
🧠 Learnings (1)
📚 Learning: 2025-08-30T18:48:28.333Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#66
File: src/connection.zig:317-323
Timestamp: 2025-08-30T18:48:28.333Z
Learning: In src/connection.zig, the establishConnection() call in connectToServer() is during initial connection setup where no race conditions exist - the connection starts disconnected and other threads would see the connecting status and wait appropriately.
Applied to files:
src/connection.zig
🔇 Additional comments (13)
src/connection.zig (13)
115-119: LGTM! Status enum simplified correctly.
The ConnectionStatus now contains only the essential states (closed, connecting, connected, reconnecting) as requested in the PR objectives. Removing the disconnected state eliminates the intermediate transition and follows the updated plan.
186-186: Default status correctly set to closed.
This aligns with the new lifecycle model where connections start in the .closed state and transition directly to .connecting when connect() is called.
211-211: Reconnect thread field added appropriately.
The private reconnect_thread: ?std.Thread = null field supports the new dedicated reconnection architecture as outlined in the PR objectives.
272-275: Proper thread cleanup in deinit.
The reconnect thread is properly joined and nullified during cleanup, ensuring no thread leaks.
354-357: Proper cleanup on server selection failure.
When no server is available, the connection status is correctly reset to .closed before returning an error.
364-368: Connection establishment failure handling looks good.
The function properly cleans up failed connections, sets status to closed, and propagates the error. This provides consistent state management during connection failures.
380-385: Consistent handshake failure cleanup.
Handshake failures are handled consistently with connection establishment failures - status is set to closed and cleanup is performed before returning the error.
454-461: New status helper methods implemented correctly.
The isConnected() and isConnecting() methods provide a clean public API for checking connection state. isConnecting() correctly includes both the .connecting and .reconnecting states.
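Presumably something close to this sketch, based on the states named in this review (not verified against the exact source):

```zig
pub fn isConnected(self: *Self) bool {
    return self.getStatus() == .connected;
}

pub fn isConnecting(self: *Self) bool {
    const status = self.getStatus();
    return status == .connecting or status == .reconnecting;
}
```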
548-549: Debug logging added for publish status.
This trace logging helps with debugging publish behavior across different connection states.
838-840: Trace logging for connection establishment.
Good debugging trace for when connections are established and the handshake begins.
1299-1307: Thread spawning logic implemented correctly.
The reconnect thread is spawned only when reconnect_thread == null, preventing multiple reconnect threads from running concurrently. Proper error handling falls back to closing the connection if thread spawn fails.
1398-1403: Connection establishment during reconnect.
The reconnect logic correctly calls establishConnection within the loop and handles failures by cleaning up and continuing to the next attempt.
1405-1410: Handshake handling during reconnect.
The handshake completion wait is properly handled during reconnect attempts, with appropriate cleanup on failure.
@claude add checks for disconnected/reconnected callbacks.
Claude finished @lalinsky's task — View job

Adding Checks for Disconnected/Reconnected Callbacks ✅

Task: Add proper checks for disconnected/reconnected callbacks in the test suite.

Todo List:
Changes Implemented

Enhanced Callback Validation:
Test Results: ✅ All 79 tests pass, including:
Key Improvements:
Commit:
- Add disconnected callback check in reconnection test
- Make callbacks_test.zig thread-safe with atomic counters
- Verify both disconnected and reconnected callbacks fire during server restart
- Use proper type casting and mutex protection for counter reads

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-authored-by: Lukáš Lalinský <[email protected]>
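A minimal sketch of the atomic-counter pattern this commit describes for callbacks_test.zig (illustrative only; the real test wires the callback through ConnectionOptions):

```zig
const std = @import("std");

var closed_calls = std.atomic.Value(u32).init(0);

// Callback body: just bump an atomic counter; no locking needed on this path.
fn onClosed() void {
    _ = closed_calls.fetchAdd(1, .monotonic);
}

// After conn.deinit(), the test can assert the callback fired exactly once:
//     try std.testing.expectEqual(@as(u32, 1), closed_calls.load(.acquire));
```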
Actionable comments posted: 0
🧹 Nitpick comments (5)
tests/reconnection_test.zig (5)
8-8: Global tracker is fine for this test.
A single, file-scoped tracker is acceptable here. If you later parallelize tests, consider per-test instances to avoid cross-test interference.
52-60: Ignoring the error message is acceptable for this test.
Given you aren’t asserting on error content, not storing msg avoids ownership hassles. If you later validate error payloads, switch to an owned buffer.
62-67: timedWait helper isn’t used and is fragile against spurious wakeups.
Either remove it or turn it into a predicate-based wait that loops until a condition is met (or a timeout). Right now it’s a single wait with no predicate.
108-113: Avoid a race on reading counters; wait for callbacks, then assert >= 1.
There’s a small chance you read counts before the callbacks fire. Poll under the mutex (short sleep) or use the condition variable. Also assert closed/error remain 0 during the test.
Apply:
```diff
-// Verify both disconnected and reconnected callbacks were called
-tracker.mutex.lock();
-defer tracker.mutex.unlock();
-try testing.expectEqual(@as(u32, 1), tracker.disconnected_called);
-try testing.expectEqual(@as(u32, 1), tracker.reconnected_called);
+// Wait up to 2s for both callbacks to fire
+var cb_timer = try std.time.Timer.start();
+while (true) {
+    tracker.mutex.lock();
+    const d = tracker.disconnected_called;
+    const r = tracker.reconnected_called;
+    tracker.mutex.unlock();
+    if (d >= 1 and r >= 1) break;
+    if (cb_timer.read() >= 2000 * std.time.ns_per_ms) {
+        return error.CallbacksNotObserved;
+    }
+    std.time.sleep(10 * std.time.ns_per_ms);
+}
+tracker.mutex.lock();
+defer tracker.mutex.unlock();
+try testing.expect(tracker.disconnected_called >= 1);
+try testing.expect(tracker.reconnected_called >= 1);
+try testing.expectEqual(@as(u32, 0), tracker.closed_called);
+try testing.expectEqual(@as(u32, 0), tracker.error_called);
```
72-83: Make reconnection timing test-friendly (short wait, bounded attempts).
Add to the reconnect block to avoid long stalls in tests:

```diff
 .reconnect = .{
     .allow_reconnect = true,
+    .max_reconnect = 50, // keep trying for a bit
+    .reconnect_wait_ms = 200, // 200ms between attempts
 },
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- tests/callbacks_test.zig (1 hunks)
- tests/reconnection_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/callbacks_test.zig
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using zig fmt before commit
Files:
tests/reconnection_test.zig
🧠 Learnings (3)
📚 Learning: 2025-08-30T07:40:28.186Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#60
File: src/queue.zig:390-393
Timestamp: 2025-08-30T07:40:28.186Z
Learning: In the nats.zig project, the maintainer prefers to rely on Zig's built-in integer overflow detection rather than implementing manual saturated conversions for timeout calculations (timeout_ms * std.time.ns_per_ms). Zig automatically fails on integer overflow in Debug and ReleaseSafe modes.
Applied to files:
tests/reconnection_test.zig
📚 Learning: 2025-08-30T18:48:28.333Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#66
File: src/connection.zig:317-323
Timestamp: 2025-08-30T18:48:28.333Z
Learning: In src/connection.zig, the establishConnection() call in connectToServer() is during initial connection setup where no race conditions exist - the connection starts disconnected and other threads would see the connecting status and wait appropriately.
Applied to files:
tests/reconnection_test.zig
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/reconnection_test.zig
🔇 Additional comments (4)
tests/reconnection_test.zig (4)
6-7: OK to alias std.testing.
The import alias looks fine and keeps call sites concise.
10-23: Tracker layout/reset looks solid.
Counters + mutex/cond with a simple reset is all you need here.
25-41: Signal-after-increment under the lock is correct.
You increment then signal while still holding the mutex, which guarantees observers don’t miss the wakeup.
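The ordering being endorsed, as a tiny standalone sketch:

```zig
const std = @import("std");

var mutex: std.Thread.Mutex = .{};
var cond: std.Thread.Condition = .{};
var reconnected_called: u32 = 0;

fn onReconnected() void {
    mutex.lock();
    defer mutex.unlock();
    reconnected_called += 1;
    // Signalling while still holding the mutex means a waiter that re-checks
    // the counter under the same mutex cannot miss this wakeup.
    cond.signal();
}
```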
93-103: Reconnection wait loop: good.
Timer + sleep with a clear 10s cap is reasonable here.
This PR adds comprehensive tests for all supported connection lifecycle callbacks in the NATS.zig library.
Changes
Fixes #68
Generated with Claude Code