Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Setup Zig
uses: mlugg/setup-zig@v2
with:
version: 0.14.1
version: 0.15.1

- name: Install libnats
run: sudo apt-get update && sudo apt-get install libnats-dev
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/claude.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
- name: Setup Zig
uses: mlugg/setup-zig@v2
with:
version: 0.14.1
version: 0.15.1

- name: Install libnats
run: sudo apt-get update && sudo apt-get install libnats-dev
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/bench_util.zig
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub const BenchStats = struct {
}
};

fn benchSignalHandler(sig_num: c_int) callconv(.C) void {
fn benchSignalHandler(sig_num: i32) callconv(.c) void {
_ = sig_num;
keep_running = false;
}
Expand All @@ -81,7 +81,7 @@ fn benchSignalHandler(sig_num: c_int) callconv(.C) void {
pub fn setupSignals() !void {
const sa = std.posix.Sigaction{
.handler = .{ .handler = benchSignalHandler },
.mask = std.posix.empty_sigset,
.mask = std.posix.sigemptyset(),
.flags = 0,
};
std.posix.sigaction(std.posix.SIG.INT, &sa, null);
Expand Down
56 changes: 35 additions & 21 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ pub fn build(b: *std.Build) void {

// Integration tests (require Docker and NATS server)
const integration_tests = b.addTest(.{
.root_source_file = b.path("tests/all_tests.zig"),
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = b.path("tests/all_tests.zig"),
.target = target,
.optimize = optimize,
}),
.test_runner = .{ .path = b.path("test_runner.zig"), .mode = .simple },
});
integration_tests.root_module.addImport("nats", lib_mod);
Expand Down Expand Up @@ -118,9 +120,11 @@ pub fn build(b: *std.Build) void {
for (example_files) |example_info| {
const exe = b.addExecutable(.{
.name = example_info.name,
.root_source_file = b.path(example_info.file),
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = b.path(example_info.file),
.target = target,
.optimize = optimize,
}),
});
exe.root_module.addImport("nats", lib_mod);

Expand All @@ -142,9 +146,11 @@ pub fn build(b: *std.Build) void {
for (benchmark_files) |benchmark_info| {
const exe = b.addExecutable(.{
.name = benchmark_info.name,
.root_source_file = b.path(benchmark_info.file),
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = b.path(benchmark_info.file),
.target = target,
.optimize = optimize,
}),
});
exe.root_module.addImport("nats", lib_mod);

Expand All @@ -156,9 +162,11 @@ pub fn build(b: *std.Build) void {
// C benchmarks (require libnats)
const c_echo_server = b.addExecutable(.{
.name = "echo_server_c",
.root_source_file = null,
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = null,
.target = target,
.optimize = optimize,
}),
});
c_echo_server.addCSourceFile(.{ .file = b.path("benchmarks/echo_server.c"), .flags = &.{} });
c_echo_server.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
Expand All @@ -167,9 +175,11 @@ pub fn build(b: *std.Build) void {

const c_echo_client = b.addExecutable(.{
.name = "echo_client_c",
.root_source_file = null,
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = null,
.target = target,
.optimize = optimize,
}),
});
c_echo_client.addCSourceFile(.{ .file = b.path("benchmarks/echo_client.c"), .flags = &.{} });
c_echo_client.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
Expand All @@ -178,9 +188,11 @@ pub fn build(b: *std.Build) void {

const c_publisher = b.addExecutable(.{
.name = "publisher_c",
.root_source_file = null,
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = null,
.target = target,
.optimize = optimize,
}),
});
c_publisher.addCSourceFile(.{ .file = b.path("benchmarks/publisher.c"), .flags = &.{} });
c_publisher.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
Expand All @@ -189,9 +201,11 @@ pub fn build(b: *std.Build) void {

const c_subscriber = b.addExecutable(.{
.name = "subscriber_c",
.root_source_file = null,
.target = target,
.optimize = optimize,
.root_module = b.createModule(.{
.root_source_file = null,
.target = target,
.optimize = optimize,
}),
});
c_subscriber.addCSourceFile(.{ .file = b.path("benchmarks/subscriber.c"), .flags = &.{} });
c_subscriber.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
Expand Down
2 changes: 1 addition & 1 deletion build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

// Tracks the earliest Zig version that the package considers to be a
// supported use case.
.minimum_zig_version = "0.14.1",
.minimum_zig_version = "0.15.1",

// This field is optional.
// Each dependency must either provide a `url` and `hash`, or a `path`.
Expand Down
2 changes: 1 addition & 1 deletion examples/sub_async.zig
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn main() !void {
std.log.info("Subscribed with callback handler. Waiting for messages (10 seconds)...", .{});

// Keep the program running to receive messages
std.time.sleep(10 * std.time.ns_per_s);
std.Thread.sleep(10 * std.time.ns_per_s);

std.log.info("Shutting down after receiving {} messages", .{counter});
}
39 changes: 21 additions & 18 deletions src/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,10 @@ pub const Connection = struct {
defer self.resetScratch();

// TODO pre-allocate headers_buffer
var headers_buffer = ArrayList(u8).init(allocator);
defer headers_buffer.deinit();
var headers_buffer = ArrayList(u8){};
defer headers_buffer.deinit(allocator);

try msg.encodeHeaders(headers_buffer.writer());
try msg.encodeHeaders(headers_buffer.writer(allocator));
const headers_len = headers_buffer.items.len;

const total_payload = headers_len + msg.data.len;
Expand Down Expand Up @@ -711,11 +711,12 @@ pub const Connection = struct {
const allocator = self.scratch.allocator();
defer self.resetScratch();

var buffer = ArrayList(u8).init(allocator);
var buffer = ArrayList(u8){};
defer buffer.deinit(allocator);
if (sub.queue) |group| {
try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
try buffer.writer(allocator).print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
} else {
try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
try buffer.writer(allocator).print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
}
try self.write_buffer.append(buffer.items);
}
Expand Down Expand Up @@ -1226,7 +1227,8 @@ pub const Connection = struct {
defer self.resetScratch();

// Build CONNECT message with all options
var buffer = ArrayList(u8).init(allocator);
var buffer = ArrayList(u8){};
defer buffer.deinit(allocator);

// Calculate effective no_responders: enable if server supports headers
const no_responders = self.options.no_responders and self.server_info.headers;
Expand All @@ -1253,10 +1255,10 @@ pub const Connection = struct {
.auth_token = auth_token,
};

try buffer.writer().writeAll("CONNECT ");
try std.json.stringify(connect_obj, .{}, buffer.writer());
try buffer.writer().writeAll("\r\n");
try buffer.writer().writeAll("PING\r\n");
try buffer.writer(allocator).writeAll("CONNECT ");
try std.fmt.format(buffer.writer(allocator), "{f}", .{std.json.fmt(connect_obj, .{})});
try buffer.writer(allocator).writeAll("\r\n");
try buffer.writer(allocator).writeAll("PING\r\n");

// Send via buffer (mutex already held)
try self.write_buffer.append(buffer.items);
Expand Down Expand Up @@ -1738,8 +1740,8 @@ pub const Connection = struct {
log.debug("Re-establishing subscriptions", .{});

// Track SIDs that shouldn't be re-subscribed and must be removed
var to_remove = ArrayList(u64).init(self.allocator);
defer to_remove.deinit();
var to_remove = ArrayList(u64){};
defer to_remove.deinit(self.allocator);

{
self.subs_mutex.lock();
Expand All @@ -1748,7 +1750,8 @@ pub const Connection = struct {
const allocator = self.scratch.allocator();
defer self.resetScratch();

var buffer = ArrayList(u8).init(allocator);
var buffer = ArrayList(u8){};
defer buffer.deinit(allocator);

var iter = self.subscriptions.iterator();
while (iter.next()) |entry| {
Expand All @@ -1765,21 +1768,21 @@ pub const Connection = struct {
} else {
// Already reached limit - don't re-subscribe; remove after unlock
log.debug("Subscription {d} ({s}) already reached limit; will remove during reconnect", .{ sub.sid, sub.subject });
try to_remove.append(sub.sid);
try to_remove.append(self.allocator, sub.sid);
continue;
}
}

// Send SUB command
if (sub.queue) |queue| {
try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
try buffer.writer(allocator).print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
} else {
try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
try buffer.writer(allocator).print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
}

// Send UNSUB with remaining limit if needed
if (adjusted_max) |remaining| {
try buffer.writer().print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
try buffer.writer(allocator).print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
log.debug("Re-subscribed to {s} with sid {d} and autounsubscribe limit {d} (delivered: {d})", .{ sub.subject, sub.sid, remaining, delivered });
} else {
log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
Expand Down
35 changes: 22 additions & 13 deletions src/jetstream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ const Result = @import("result.zig").Result;

const log = @import("log.zig").log;

// Helper function to replace jsonStringifyAlloc
fn jsonStringifyAlloc(allocator: std.mem.Allocator, value: anytype, options: std.json.Stringify.Options) ![]u8 {
var buffer = std.ArrayList(u8){};
defer buffer.deinit(allocator);

try std.fmt.format(buffer.writer(allocator), "{f}", .{std.json.fmt(value, options)});
return buffer.toOwnedSlice(allocator);
}

// Re-export JetStream message types
pub const JetStreamMessage = jetstream_message.JetStreamMessage;
pub const MsgMetadata = jetstream_message.MsgMetadata;
Expand Down Expand Up @@ -438,7 +447,7 @@ pub const PullSubscription = struct {
};

// Serialize the fetch request to JSON
const request_json = try std.json.stringifyAlloc(self.js.nc.allocator, request, .{
const request_json = try jsonStringifyAlloc(self.js.nc.allocator, request, .{
.emit_null_optional_fields = false,
});
defer self.js.nc.allocator.free(request_json);
Expand All @@ -451,8 +460,8 @@ pub const PullSubscription = struct {
try self.js.nc.publishRequest(api_subject, reply_subject, request_json);

// Collect messages
var messages = std.ArrayList(*JetStreamMessage).init(self.js.nc.allocator);
defer messages.deinit();
var messages = std.ArrayList(*JetStreamMessage){};
defer messages.deinit(self.js.nc.allocator);

var batch_complete = false;
var fetch_error: ?anyerror = null;
Expand Down Expand Up @@ -496,7 +505,7 @@ pub const PullSubscription = struct {
const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js.nc, raw_msg);
errdefer js_msg_ptr.deinit();

try messages.append(js_msg_ptr);
try messages.append(self.js.nc.allocator, js_msg_ptr);
}
} else |err| switch (err) {
error.Timeout => {
Expand All @@ -507,7 +516,7 @@ pub const PullSubscription = struct {
}

// Convert ArrayList to owned slice
const messages_slice = try messages.toOwnedSlice();
const messages_slice = try messages.toOwnedSlice(self.js.nc.allocator);

return MessageBatch{
.messages = messages_slice,
Expand Down Expand Up @@ -622,7 +631,7 @@ pub const JetStream = struct {
subject: []const u8,
}{ .subject = subject };

const request_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{
const request_json = try jsonStringifyAlloc(self.nc.allocator, request_payload, .{
.emit_null_optional_fields = false,
});
defer self.nc.allocator.free(request_json);
Expand Down Expand Up @@ -774,7 +783,7 @@ pub const JetStream = struct {
defer self.nc.allocator.free(subject);

// Serialize the config to JSON
const config_json = try std.json.stringifyAlloc(self.nc.allocator, config, .{});
const config_json = try jsonStringifyAlloc(self.nc.allocator, config, .{});
defer self.nc.allocator.free(config_json);

const msg = try self.sendRequest(subject, config_json);
Expand All @@ -795,7 +804,7 @@ pub const JetStream = struct {
defer self.nc.allocator.free(subject);

// Serialize the config to JSON
const config_json = try std.json.stringifyAlloc(self.nc.allocator, config, .{});
const config_json = try jsonStringifyAlloc(self.nc.allocator, config, .{});
defer self.nc.allocator.free(config_json);

const msg = try self.sendRequest(subject, config_json);
Expand Down Expand Up @@ -901,7 +910,7 @@ pub const JetStream = struct {
config: ConsumerConfig,
}{ .stream_name = stream_name, .config = config };

const config_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{
const config_json = try jsonStringifyAlloc(self.nc.allocator, request_payload, .{
.emit_null_optional_fields = false,
});
defer self.nc.allocator.free(config_json);
Expand Down Expand Up @@ -949,7 +958,7 @@ pub const JetStream = struct {
const subject = try std.fmt.allocPrint(self.nc.allocator, "STREAM.PURGE.{s}", .{stream_name});
defer self.nc.allocator.free(subject);

const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{});
const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{});
defer self.nc.allocator.free(request_json);

const msg = try self.sendRequest(subject, request_json);
Expand All @@ -976,7 +985,7 @@ pub const JetStream = struct {
};

// Serialize the request to JSON, omitting null fields
const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{
const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{
.emit_null_optional_fields = false,
});
defer self.nc.allocator.free(request_json);
Expand Down Expand Up @@ -1080,7 +1089,7 @@ pub const JetStream = struct {
};

// Serialize the request to JSON, omitting null fields
const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{
const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{
.emit_null_optional_fields = false,
});
defer self.nc.allocator.free(request_json);
Expand Down Expand Up @@ -1118,7 +1127,7 @@ pub const JetStream = struct {
defer self.nc.allocator.free(subject);

// Serialize the request to JSON
const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{});
const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{});
defer self.nc.allocator.free(request_json);

const msg = try self.sendRequest(subject, request_json);
Expand Down
2 changes: 1 addition & 1 deletion src/jetstream_kv.zig
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ pub const KV = struct {
for (filter_subjects.items) |subject| {
self.js.nc.allocator.free(subject);
}
filter_subjects.deinit();
filter_subjects.deinit(self.js.nc.allocator);
}

// Convert filters to full subjects
Expand Down
Loading