Skip to content
Merged
19 changes: 12 additions & 7 deletions src/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ const server_pool_mod = @import("server_pool.zig");
const ServerPool = server_pool_mod.ServerPool;
const Server = server_pool_mod.Server;
const net_utils = @import("net_utils.zig");
const jetstream_mod = @import("jetstream.zig");
const JetStream = jetstream_mod.JetStream;
const JetStreamOptions = jetstream_mod.JetStreamOptions;

const log = std.log.scoped(.connection);

Expand Down Expand Up @@ -146,6 +149,7 @@ pub const ConnectionOptions = struct {
send_asap: bool = false,
reconnect: ReconnectOptions = .{},
callbacks: ConnectionCallbacks = .{},
trace: bool = false,
};

pub const Connection = struct {
Expand Down Expand Up @@ -602,12 +606,8 @@ pub const Connection = struct {
}

pub fn request(self: *Self, subject: []const u8, data: []const u8, timeout_ms: u64) !?*Message {
// Lock immediately like C library
self.mutex.lock();
defer self.mutex.unlock();

if (self.status != .connected) {
return ConnectionError.ConnectionClosed;
if (self.options.trace) {
log.debug("Sending request to {s} with timeout {d}ms", .{ subject, timeout_ms });
}

// 1. Create unique inbox
Expand Down Expand Up @@ -1091,7 +1091,7 @@ pub const Connection = struct {
self.flusher_stop = false;
self.flusher_signaled = false;
self.mutex.unlock();

self.flusher_thread = std.Thread.spawn(.{}, flusherLoop, .{self}) catch |err| {
log.err("Failed to restart flusher thread: {}", .{err});
self.triggerReconnect(err);
Expand Down Expand Up @@ -1210,4 +1210,9 @@ pub const Connection = struct {
log.debug("Added discovered server: {s}", .{url});
}
}

// JetStream support
pub fn jetstream(self: *Self, options: JetStreamOptions) JetStream {
return JetStream.init(self.allocator, self, options);
}
};
120 changes: 120 additions & 0 deletions src/jetstream.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
const std = @import("std");
const Message = @import("message.zig").Message;
const Connection = @import("connection.zig").Connection;

const log = std.log.scoped(.jetstream);

const default_api_prefix = "$JS.API.";
const default_request_timeout_ms = 5000;

const ErrorResponse = struct {
@"error": ?struct {
/// HTTP like error code in the 300 to 500 range
code: u16,
/// A human friendly description of the error
description: []const u8 = "",
/// The NATS error code unique to each kind of error
err_code: u16 = 0,
},
};

const AccountInfoResponse = struct {
/// Memory Storage being used for Stream Message storage
memory: u64,
/// File Storage being used for Stream Message storage
storage: u64,
// Number of active Streams
streams: u32,
/// Number of active Consumers
consumers: u32,
};

/// Response from $JS.API.STREAM.NAMES
const StreamNamesResponse = struct {
total: u64,
offset: u64,
limit: u64,
streams: ?[]const []const u8,
};

pub const JetStreamOptions = struct {
request_timeout_ms: u64 = default_request_timeout_ms,
// Add options here
};

pub const Result = std.json.Parsed;

pub const JetStream = struct {
allocator: std.mem.Allocator,
nc: *Connection,
opts: JetStreamOptions,

pub fn init(allocator: std.mem.Allocator, nc: *Connection, options: JetStreamOptions) JetStream {
return .{
.allocator = allocator,
.nc = nc,
.opts = options,
};
}

pub fn deinit(self: *JetStream) void {
_ = self;
}

fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
return try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) orelse {
return error.NoResponse;
};
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Normalize timeout handling to use the new JetStreamError.NoResponse.

Currently returns error.NoResponse, which is anonymous and inconsistent with the public error set. Switch to the exported error for clearer matching and consistency.

Apply this diff:

-    fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
-        return try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) orelse {
-            return error.NoResponse;
-        };
-    }
+    fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
+        const msg = try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms)
+            orelse return JetStreamError.NoResponse;
+        return msg;
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
return try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) orelse {
return error.NoResponse;
};
}
fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
const msg = try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms)
orelse return JetStreamError.NoResponse;
return msg;
}
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 64 to 68, the function currently returns the
anonymous error.NoResponse; change it to return the exported
JetStreamError.NoResponse instead for consistent public error handling. Modify
the orelse branch to return JetStreamError.NoResponse (ensure JetStreamError is
in scope or fully qualify it) so callers can match against the public error set.


/// Parse an error response from the server, if present.
fn maybeParseErrorResponse(self: *JetStream, msg: *Message) !void {
if (std.mem.indexOfPos(u8, msg.data, 0, "error") != null) {
// this should not allocate any memory, so we don't need to clean up
const response = std.json.parseFromSliceLeaky(ErrorResponse, self.allocator, msg.data, .{
.allocate = .alloc_if_needed,
.ignore_unknown_fields = true,
}) catch return;
log.err("JetStream error: {any}", .{response.@"error"});
// TODO: Handle specific error cases
return error.JetStreamError;
}
}

/// Parse a response from the server, handling errors if present.
fn parseResponse(self: *JetStream, comptime T: type, msg: *Message) !Result(T) {
try self.maybeParseErrorResponse(msg);

return try std.json.parseFromSlice(T, self.allocator, msg.data, .{
.allocate = .alloc_always,
.ignore_unknown_fields = true,
});
}

// Retrieves stats and limits for the connected user's account.
pub fn getAccountInfo(self: *JetStream) !Result(AccountInfoResponse) {
const msg = try self.sendRequest("INFO", "");
defer msg.deinit();

return try self.parseResponse(AccountInfoResponse, msg);
}

/// Retrieves a list of stream names.
pub fn listStreamNames(self: *JetStream) !Result([]const []const u8) {
const msg = try self.sendRequest("STREAM.NAMES", "");
defer msg.deinit();

const page_result = try self.parseResponse(StreamNamesResponse, msg);
errdefer page_result.deinit();

// TODO: handle pagination
const streams = page_result.value.streams orelse &[_][]const u8{};
std.debug.assert(page_result.value.total == streams.len);

Comment on lines +273 to +276
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Assertion is incorrect for paginated responses; will fail with valid server data.

total is the total number of streams across all pages, whereas streams.len is the count on the current page. On page 1 of N, total != streams.len.

Change the assertion to reflect the documented invariant: offset + streams.len <= total. Or drop the assert until pagination is implemented.

Apply this diff:

-        std.debug.assert(page_result.value.total == streams.len);
+        std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TODO: handle pagination
const streams = page_result.value.streams orelse &[_][]const u8{};
std.debug.assert(page_result.value.total == streams.len);
// TODO: handle pagination
const streams = page_result.value.streams orelse &[_][]const u8{};
std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total);
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 110 to 113, the assertion compares
page_result.value.total to streams.len which is wrong for paginated responses;
replace it with an assertion that enforces the documented invariant using the
page offset (e.g. page_result.value.offset + streams.len <=
page_result.value.total) or simply remove the assertion until pagination is
implemented so valid multi-page responses won't fail.

const result: Result([]const []const u8) = .{
.arena = page_result.arena,
.value = streams,
};
return result;
}
};
10 changes: 10 additions & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ pub const ServerPool = @import("server_pool.zig").ServerPool;
pub const Server = @import("server_pool.zig").Server;
pub const inbox = @import("inbox.zig");

// JetStream types
pub const JetStream = @import("jetstream.zig").JetStream;
pub const JetStreamOptions = @import("jetstream.zig").JetStreamOptions;
pub const StreamConfig = @import("jetstream.zig").StreamConfig;
pub const StreamInfo = @import("jetstream.zig").StreamInfo;
pub const Stream = @import("jetstream.zig").Stream;
pub const PubAck = @import("jetstream.zig").PubAck;
pub const AccountInfo = @import("jetstream.zig").AccountInfo;
pub const JetStreamError = @import("jetstream.zig").JetStreamError;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Broken re-export: JetStreamError is not defined in jetstream.zig.

This will not compile as @import("jetstream.zig").JetStreamError doesn’t currently exist. Either remove this re-export or define and export an error set from jetstream.zig.

Apply this minimal fix (option A: remove the re-export for now):

- pub const JetStreamError = @import("jetstream.zig").JetStreamError;

Alternatively (option B: preferred), add and export a concrete error set from src/jetstream.zig and keep this re-export. See my comment in src/jetstream.zig (Lines 7-10) for the exact addition.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub const JetStreamError = @import("jetstream.zig").JetStreamError;


// Removed top-level connect functions - use Connection.init() and Connection.connect() directly

// Test basic functionality
Expand Down
35 changes: 20 additions & 15 deletions test_runner.zig
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ const LogCapture = struct {
args: anytype,
) void {
_ = level; // Suppress unused parameter warning

const scope_prefix = "(" ++ switch (scope) {
std.log.default_log_scope => @tagName(scope),
else => @tagName(scope),
} ++ "): ";

if (self.captured_log_buffer) |buf| {
// Capture to buffer during test execution
buf.writer().print(scope_prefix ++ format ++ "\n", args) catch return;
Expand All @@ -43,11 +43,11 @@ const LogCapture = struct {
stderr.print(scope_prefix ++ format ++ "\n", args) catch return;
}
}

pub fn startCapture(self: *@This(), buffer: *std.ArrayList(u8)) void {
self.captured_log_buffer = buffer;
}

pub fn stopCapture(self: *@This()) void {
self.captured_log_buffer = null;
}
Expand Down Expand Up @@ -99,7 +99,6 @@ pub fn main() !void {
var log_buffer = std.ArrayList(u8).init(allocator);
defer log_buffer.deinit();


for (builtin.test_functions) |t| {
if (isSetup(t)) {
t.func() catch |err| {
Expand All @@ -126,17 +125,21 @@ pub fn main() !void {

const friendly_name = t.name;

// Clear log buffer and start capturing logs for this test
log_buffer.clearRetainingCapacity();
log_capture.startCapture(&log_buffer);

if (env.do_log_capture) {
// Clear log buffer and start capturing logs for this test
log_buffer.clearRetainingCapacity();
log_capture.startCapture(&log_buffer);
}

current_test = friendly_name;
std.testing.allocator_instance = .{};
const result = t.func();
current_test = null;

// Stop capturing logs
log_capture.stopCapture();

if (env.do_log_capture) {
// Stop capturing logs
log_capture.stopCapture();
}

const ns_taken = slowest.endTiming(friendly_name);

Expand All @@ -156,14 +159,14 @@ pub fn main() !void {
else => {
status = .fail;
fail += 1;

printer.status(.fail, "\n{s}\n\"{s}\" - {s}\n", .{ BORDER, friendly_name, @errorName(err) });

// Print captured logs for failed tests
if (log_buffer.items.len > 0) {
printer.fmt("Test output:\n{s}", .{log_buffer.items});
}

printer.fmt("{s}\n", .{BORDER});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
Expand Down Expand Up @@ -319,12 +322,14 @@ const Env = struct {
verbose: bool,
fail_first: bool,
filter: ?[]const u8,
do_log_capture: bool,

fn init(allocator: Allocator) Env {
return .{
.verbose = readEnvBool(allocator, "TEST_VERBOSE", true),
.fail_first = readEnvBool(allocator, "TEST_FAIL_FIRST", false),
.filter = readEnv(allocator, "TEST_FILTER"),
.do_log_capture = readEnvBool(allocator, "TEST_LOG_CAPTURE", true),
};
}

Expand Down
3 changes: 2 additions & 1 deletion tests/all_tests.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const time = std.time;
// Import all test modules
pub const minimal_tests = @import("minimal_test.zig");
pub const headers_tests = @import("headers_test.zig");
pub const reconnection_tests = @import("reconnection_test.zig");
// pub const reconnection_tests = @import("reconnection_test.zig");
pub const jetstream_tests = @import("jetstream_test.zig");

const utils = @import("utils.zig");

Expand Down
Loading