Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ pub const Connection = struct {
self.flusher_stop = false;
self.flusher_signaled = false;
self.mutex.unlock();

self.flusher_thread = std.Thread.spawn(.{}, flusherLoop, .{self}) catch |err| {
log.err("Failed to restart flusher thread: {}", .{err});
self.triggerReconnect(err);
Expand Down
116 changes: 46 additions & 70 deletions src/server_pool.zig
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const std = @import("std");
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
const StringArrayHashMapUnmanaged = std.StringArrayHashMapUnmanaged;
const Url = @import("url.zig").Url;

pub const ConnectionError = error{
Expand All @@ -11,6 +12,7 @@ pub const ConnectionError = error{
// Server structure matching C library's natsSrv
pub const Server = struct {
parsed_url: Url, // Parsed URL containing all components
key: []const u8, // Host:port key for hash map lookups
did_connect: bool = false, // Has successfully connected
is_implicit: bool = false, // Discovered via INFO vs explicit
reconnects: u32 = 0, // Number of reconnection attempts
Expand All @@ -20,55 +22,52 @@ pub const Server = struct {

pub fn init(allocator: Allocator, url_str: []const u8, is_implicit: bool) !Server {
var parsed_url = Url.parse(allocator, url_str) catch return ConnectionError.InvalidUrl;
errdefer parsed_url.deinit();

if (!std.mem.eql(u8, parsed_url.scheme, "nats")) {
parsed_url.deinit();
return ConnectionError.InvalidUrl;
}

const key = try std.fmt.allocPrint(allocator, "{s}:{d}", .{ parsed_url.host, parsed_url.port });
errdefer allocator.free(key);

return Server{
.parsed_url = parsed_url,
.key = key,
.is_implicit = is_implicit,
};
}

pub fn deinit(self: *Server, allocator: Allocator) void {
self.parsed_url.deinit();
allocator.free(self.key);
if (self.tls_name) |t| allocator.free(t);
}
};

// Server pool matching C library's natsSrvPool
pub const ServerPool = struct {
servers: ArrayList(*Server), // Dynamic array of server pointers (like C)
urls: std.StringHashMap(void), // Hash map for O(1) duplicate detection
servers: StringArrayHashMapUnmanaged(*Server), // Combined hash map with insertion order (host:port -> *Server)
randomize: bool = false, // Whether to randomize order
default_user: ?[]const u8 = null, // Default username from first explicit URL
default_pwd: ?[]const u8 = null, // Default password from first explicit URL
allocator: Allocator,

pub fn init(allocator: Allocator) ServerPool {
return ServerPool{
.servers = ArrayList(*Server).init(allocator),
.urls = std.StringHashMap(void).init(allocator),
.servers = StringArrayHashMapUnmanaged(*Server){},
.allocator = allocator,
};
}

pub fn deinit(self: *ServerPool) void {
// Free all servers
for (self.servers.items) |server| {
server.deinit(self.allocator);
self.allocator.destroy(server);
}
self.servers.deinit();

// Free all URL strings stored in hash map
var iterator = self.urls.iterator();
var iterator = self.servers.iterator();
while (iterator.next()) |entry| {
self.allocator.free(entry.key_ptr.*);
entry.value_ptr.*.deinit(self.allocator);
self.allocator.destroy(entry.value_ptr.*);
}
self.urls.deinit();
self.servers.deinit(self.allocator);

if (self.default_user) |u| self.allocator.free(u);
if (self.default_pwd) |p| self.allocator.free(p);
Expand All @@ -82,18 +81,18 @@ pub const ServerPool = struct {
server.* = try Server.init(self.allocator, url_str, is_implicit);
errdefer server.deinit(self.allocator);

const bare_url = try std.fmt.allocPrint(self.allocator, "{s}:{d}", .{ server.parsed_url.host, server.parsed_url.port });
errdefer self.allocator.free(bare_url);

const result = try self.urls.getOrPut(bare_url);
// Use the key stored in the server
const result = try self.servers.getOrPut(self.allocator, server.key);
if (result.found_existing) {
// We already have a server with the same host:port - clean up duplicate
self.allocator.free(bare_url);
server.deinit(self.allocator);
self.allocator.destroy(server);
return;
}

// Store the server
result.value_ptr.* = server;

// Set default user/pwd from first explicit server (like C library)
if (!is_implicit and self.default_user == null) {
if (server.parsed_url.username) |u| {
Expand All @@ -103,84 +102,61 @@ pub const ServerPool = struct {
self.default_pwd = try self.allocator.dupe(u8, p);
}
}

try self.servers.append(server);
}

pub fn getCurrentServer(self: *ServerPool, current_server: ?*const Server) ?usize {
if (current_server == null) return null;

// Find current server's position in pool (like C library)
for (self.servers.items, 0..) |server, i| {
if (server == current_server) {
return i;
}
}

return null;
}

// Core server selection algorithm matching C library's natsSrvPool_GetNextServer
pub fn getNextServer(self: *ServerPool, max_reconnect: i32, current_server: ?*const Server) !?*Server {
// Get current server index (like C library line 88)
const current_index = self.getCurrentServer(current_server);
if (current_index == null) {
if (current_server == null) {
// Initial connection case - C library would return pool->srvrs[0] directly
if (self.servers.items.len == 0) return null;
return self.servers.items[0];
return self.getFirstServer();
}

const i = current_index.?;
const current_srv = self.servers.items[i];

// Shift left servers past current to the current's position (like C library lines 92-94)
var j = i;
while (j < self.servers.items.len - 1) : (j += 1) {
self.servers.items[j] = self.servers.items[j + 1];
}
const current_srv = current_server.?;

// Decide server fate based on reconnect attempts (like C library lines 96-107)
if (max_reconnect < 0 or current_srv.reconnects < max_reconnect) {
// Move the current server to the back of the list (like C library line 100)
self.servers.items[self.servers.items.len - 1] = current_srv;
// Remove current server and move it to back (like C library)
// Remove using the stored key and get the mutable server back
const removed_server = self.servers.fetchSwapRemove(current_srv.key).?;

// Add current server back at the end - capacity guaranteed since we just removed
self.servers.putAssumeCapacity(removed_server.key, removed_server.value);
} else {
// Remove the server from the list (like C library lines 104-106)
// Remove from hash map using host:port key with fixed buffer
var bare_url_buf: [256]u8 = undefined;
const bare_url = std.fmt.bufPrint(&bare_url_buf, "{s}:{d}", .{ current_srv.parsed_url.host, current_srv.parsed_url.port }) catch unreachable;

if (self.urls.fetchRemove(bare_url)) |kv| {
self.allocator.free(kv.key); // Free the duplicated host:port string
}
current_srv.deinit(self.allocator);
self.allocator.destroy(current_srv);
_ = self.servers.pop();
// Remove the server permanently (like C library lines 104-106)
const removed_server = self.servers.fetchSwapRemove(current_srv.key).?;
removed_server.value.deinit(self.allocator);
self.allocator.destroy(removed_server.value);
}
Comment on lines +125 to 129
Copy link
Contributor

@coderabbitai coderabbitai bot Aug 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Memory leak on permanent removal: free removed key slice

When a server exceeds max_reconnect, you destroy the *Server but don’t free the map-owned key returned by fetchSwapRemove. With StringArrayHashMapUnmanaged, the removed key’s memory becomes caller-owned and must be freed. Add a free to avoid leaking the key.

         } else {
             // Remove the server permanently (like C library lines 104-106)
-            const removed_server = self.servers.fetchSwapRemove(current_srv.key).?;
-            removed_server.value.deinit(self.allocator);
-            self.allocator.destroy(removed_server.value);
+            const removed_server = self.servers.fetchSwapRemove(current_srv.key).?;
+            // Free the key slice that was owned by the map
+            self.allocator.free(removed_server.key);
+            // Tear down the server
+            removed_server.value.deinit(self.allocator);
+            self.allocator.destroy(removed_server.value);
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Remove the server permanently (like C library lines 104-106)
const removed_server = self.servers.fetchSwapRemove(current_srv.key).?;
removed_server.value.deinit(self.allocator);
self.allocator.destroy(removed_server.value);
}
} else {
// Remove the server permanently (like C library lines 104-106)
const removed_server = self.servers.fetchSwapRemove(current_srv.key).?;
// Free the key slice that was owned by the map
self.allocator.free(removed_server.key);
// Tear down the server
removed_server.value.deinit(self.allocator);
self.allocator.destroy(removed_server.value);
}
🤖 Prompt for AI Agents
In src/server_pool.zig around lines 125 to 129, when permanently removing a
server you call self.servers.fetchSwapRemove(current_srv.key) and deinit/destroy
the removed_server.value but forget to free the map-owned key slice returned by
fetchSwapRemove; for StringArrayHashMapUnmanaged the removed key becomes
caller-owned and must be freed. After obtaining removed_server from
fetchSwapRemove and before leaving the block, call the allocator.free on the
removed_server.key (or appropriate slice free API) to release the key memory,
ensuring you free it only once and handle any null/empty cases safely.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    pub fn deinit(self: *Server, allocator: Allocator) void {
        self.parsed_url.deinit();
        allocator.free(self.key);
        if (self.tls_name) |t| allocator.free(t);
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalinsky You're absolutely right! I apologize for the confusion. Looking at the Server.deinit() method, I can see that allocator.free(self.key) is already being called, which means the key memory is properly freed when removed_server.value.deinit(self.allocator) executes.

The memory management is correct as implemented:

  1. fetchSwapRemove() returns the key-value pair where removed_server.key points to the same memory as removed_server.value.key
  2. When removed_server.value.deinit() is called, it frees the key via allocator.free(self.key)
  3. No additional free() call is needed on the removed key

Thank you for the clarification - the code handles memory cleanup properly through the Server's deinit method.


// Return first server in list if pool not empty (like C library lines 109-112)
if (self.servers.items.len == 0) return null;

return self.servers.items[0];
return self.getFirstServer();
}

pub fn getSize(self: *ServerPool) usize {
return self.servers.items.len;
return self.servers.count();
}

pub fn getFirstServer(self: *ServerPool) ?*Server {
const values = self.servers.values();
return if (values.len > 0) values[0] else null;
}

// Shuffle servers in pool (like C library's _shufflePool)
pub fn shuffle(self: *ServerPool, offset: usize) void {
if (self.servers.items.len <= offset + 1) return;
if (self.servers.count() <= offset + 1) return;

var prng = std.rand.DefaultPrng.init(@bitCast(std.time.nanoTimestamp()));
const random = prng.random();

// Shuffle the underlying entries directly
var i = offset;
while (i < self.servers.items.len) : (i += 1) {
while (i < self.servers.entries.len) : (i += 1) {
const j = offset + random.uintLessThan(usize, i + 1 - offset);
const tmp = self.servers.items[i];
self.servers.items[i] = self.servers.items[j];
self.servers.items[j] = tmp;
self.servers.entries.swap(i, j);
}

// Rebuild the hash index
self.servers.reIndex(self.allocator);
}
};

Expand Down