Merged
19 changes: 19 additions & 0 deletions src/connection.zig
@@ -1079,6 +1079,25 @@ pub const Connection = struct {

self.mutex.unlock(); // Release before operations that don't need mutex

// Restart reader thread for the new connection
self.reader_thread = std.Thread.spawn(.{}, readerLoop, .{self}) catch |err| {
    log.err("Failed to restart reader thread: {}", .{err});
    self.triggerReconnect(err);
Owner Author

@claude Can we safely do this? Wouldn't it trigger infinite recursion?

Contributor

@claude claude bot Aug 23, 2025

Claude finished @lalinsky's task


Analyzing Infinite Recursion Concern

Good question! Let me analyze the code flow to determine if calling self.triggerReconnect(err) could cause infinite recursion.

Todo List:

  • Read src/connection.zig to understand the reconnection flow
  • Examine the triggerReconnect method implementation
  • Trace through the call stack to identify potential recursion
  • Provide technical analysis and recommendation

Analysis of Call Stack:

Looking at line 1085, the concern is about this call:

self.reader_thread = std.Thread.spawn(.{}, readerLoop, .{self}) catch |err| {
    log.err("Failed to restart reader thread: {}", .{err});
    self.triggerReconnect(err);  // ← Line 1085
    continue; // Try next server
};

Potential recursion path:

  1. doReconnect() (line 983) spawns reader thread
  2. Reader thread spawn fails → calls triggerReconnect(err) (line 1085)
  3. triggerReconnect() (line 932) spawns new reconnect thread
  4. New doReconnect() tries to spawn reader thread again
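  5. If spawn keeps failing → every failure would start yet another reconnect thread, without bound (strictly a runaway chain of threads rather than stack recursion, since step 3 runs on a new thread)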

Key Safety Mechanisms:

✅ SAFE - No infinite recursion possible due to:

  1. in_reconnect guard (lines 939-942):

    if (!self.options.reconnect.allow_reconnect or
        self.status != .connected or
        self.in_reconnect > 0)  // ← Prevents multiple reconnection threads
    {
        return; // Already reconnecting
    }
  2. State protection: When triggerReconnect() is called from line 1085:

    • We're already IN the doReconnect() function
    • self.in_reconnect is already > 0 (set at line 980)
    • The second triggerReconnect() call will immediately return at line 941
  3. Control flow: the existing reconnection thread reaches the continue statement and moves on to the next server

Flow Analysis:

doReconnect() starts → in_reconnect = 1
├─ Try to spawn reader_thread
├─ FAIL → triggerReconnect(err)
│   └─ Checks in_reconnect > 0 → RETURNS IMMEDIATELY ✅
└─ continue → try next server

Conclusion:

The code is safe - infinite recursion is prevented by the in_reconnect counter. When triggerReconnect() is called from within doReconnect(), it will detect that a reconnection is already in progress and return immediately.

The current approach correctly handles thread spawn failures by:

  1. Logging the error
  2. Calling triggerReconnect(), which is a safe no-op at this point because in_reconnect > 0
  3. Continuing to try the next server in the pool
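
The guard behavior described above can be illustrated with a minimal, single-threaded Zig sketch. This is not the code in src/connection.zig: the Conn struct, the reconnects_started and spawn_failures_left fields, and the three-server loop are hypothetical stand-ins, and the place where in_reconnect is incremented is an assumption. Only the shape of the three-part guard is taken from the snippet quoted in this comment.

```zig
const std = @import("std");

// Hypothetical model of the reconnect guard. Names mirror the review
// comment; the bodies are assumptions made for illustration only.
const Status = enum { connected, reconnecting };

const Conn = struct {
    allow_reconnect: bool = true,
    status: Status = .connected,
    in_reconnect: u32 = 0,
    reconnects_started: u32 = 0, // counts reconnects that passed the guard
    spawn_failures_left: u32 = 0, // simulated reader-thread spawn failures

    fn triggerReconnect(self: *Conn) void {
        // Same three-part guard as quoted in the comment above.
        if (!self.allow_reconnect or
            self.status != .connected or
            self.in_reconnect > 0)
        {
            return; // reconnect disabled or already in progress
        }
        self.status = .reconnecting;
        self.in_reconnect += 1; // assumption: set before doReconnect runs
        self.reconnects_started += 1;
        self.doReconnect(); // the real code runs this on a separate thread
    }

    fn doReconnect(self: *Conn) void {
        defer self.in_reconnect -= 1;
        var server: u32 = 0;
        while (server < 3) : (server += 1) { // hypothetical pool of 3 servers
            if (self.spawn_failures_left > 0) {
                self.spawn_failures_left -= 1;
                self.triggerReconnect(); // returns at the guard: in_reconnect > 0
                continue; // try next server
            }
            self.status = .connected;
            return;
        }
    }
};

test "nested triggerReconnect is a no-op while a reconnect is in progress" {
    var c = Conn{ .spawn_failures_left = 2 };
    c.triggerReconnect();
    // Two simulated spawn failures, but only one reconnect ever started.
    try std.testing.expectEqual(@as(u32, 1), c.reconnects_started);
    try std.testing.expectEqual(Status.connected, c.status);
    try std.testing.expectEqual(@as(u32, 0), c.in_reconnect);
}
```

Saved to a file, this should run with zig test. The sketch also makes the redundancy visible: inside doReconnect() the nested triggerReconnect() call never gets past the guard, so the continue statement alone drives the retry.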

    continue; // Try next server
};

// Restart flusher thread for the new connection
self.mutex.lock();
self.flusher_stop = false;
self.flusher_signaled = false;
self.mutex.unlock();

self.flusher_thread = std.Thread.spawn(.{}, flusherLoop, .{self}) catch |err| {
    log.err("Failed to restart flusher thread: {}", .{err});
    self.triggerReconnect(err);
    continue; // Try next server
};

// Re-establish subscriptions (outside mutex like C library)
self.resendSubscriptions() catch |err| {
log.err("Failed to re-establish subscriptions: {}", .{err});
6 changes: 3 additions & 3 deletions tests/reconnection_test.zig
@@ -17,7 +17,7 @@ test "basic reconnection when server stops" {
try utils.runDockerCompose(std.testing.allocator, &.{ "restart", "nats-1" });

// Verify connection works after reconnection
- // log.debug("Trying to publish after reconnection", .{});
- // try nc.publish("test.after", "hello after reconnection");
- // try nc.flush();
+ log.debug("Trying to publish after reconnection", .{});
+ try nc.publish("test.after", "hello after reconnection");
+ try nc.flush();
}