diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 47fbbb3..965e384 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
       - name: Setup Zig
         uses: mlugg/setup-zig@v2
         with:
-          version: 0.14.1
+          version: 0.15.1
       - name: Install libnats
         run: sudo apt-get update && sudo apt-get install libnats-dev
diff --git a/.github/workflows/claude.yml b/.github/workflows/claude.yml
index 7d78a30..5fbf828 100644
--- a/.github/workflows/claude.yml
+++ b/.github/workflows/claude.yml
@@ -33,7 +33,7 @@ jobs:
       - name: Setup Zig
         uses: mlugg/setup-zig@v2
         with:
-          version: 0.14.1
+          version: 0.15.1
       - name: Install libnats
         run: sudo apt-get update && sudo apt-get install libnats-dev
diff --git a/benchmarks/bench_util.zig b/benchmarks/bench_util.zig
index 819ecbe..0a6b5d1 100644
--- a/benchmarks/bench_util.zig
+++ b/benchmarks/bench_util.zig
@@ -72,7 +72,7 @@ pub const BenchStats = struct {
     }
 };

-fn benchSignalHandler(sig_num: c_int) callconv(.C) void {
+fn benchSignalHandler(sig_num: i32) callconv(.c) void {
     _ = sig_num;
     keep_running = false;
 }
@@ -81,7 +81,7 @@ fn benchSignalHandler(sig_num: c_int) callconv(.C) void {
 pub fn setupSignals() !void {
     const sa = std.posix.Sigaction{
         .handler = .{ .handler = benchSignalHandler },
-        .mask = std.posix.empty_sigset,
+        .mask = std.posix.sigemptyset(),
         .flags = 0,
     };
     std.posix.sigaction(std.posix.SIG.INT, &sa, null);
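Two std.posix renames land in bench_util.zig: the C calling convention tag is now lowercase (callconv(.c), was .C), and the empty_sigset constant became the function std.posix.sigemptyset(). A minimal standalone sketch of the 0.15 signal-handler shape (handleSigint and the busy-wait loop are illustrative, not from this repo):

    const std = @import("std");

    var keep_running = true;

    // 0.15 spells the C calling convention as .c and accepts i32 for the
    // signal number in handlers installed via std.posix.sigaction.
    fn handleSigint(sig_num: i32) callconv(.c) void {
        _ = sig_num;
        keep_running = false;
    }

    pub fn main() void {
        const sa = std.posix.Sigaction{
            .handler = .{ .handler = handleSigint },
            .mask = std.posix.sigemptyset(), // was: std.posix.empty_sigset
            .flags = 0,
        };
        std.posix.sigaction(std.posix.SIG.INT, &sa, null);
        while (keep_running) std.Thread.sleep(50 * std.time.ns_per_ms);
    }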
diff --git a/build.zig b/build.zig
index 9bf1534..4d8af8f 100644
--- a/build.zig
+++ b/build.zig
@@ -87,9 +87,11 @@ pub fn build(b: *std.Build) void {
     // Integration tests (require Docker and NATS server)
     const integration_tests = b.addTest(.{
-        .root_source_file = b.path("tests/all_tests.zig"),
-        .target = target,
-        .optimize = optimize,
+        .root_module = b.createModule(.{
+            .root_source_file = b.path("tests/all_tests.zig"),
+            .target = target,
+            .optimize = optimize,
+        }),
         .test_runner = .{ .path = b.path("test_runner.zig"), .mode = .simple },
     });
     integration_tests.root_module.addImport("nats", lib_mod);
@@ -118,9 +120,11 @@ pub fn build(b: *std.Build) void {
     for (example_files) |example_info| {
         const exe = b.addExecutable(.{
             .name = example_info.name,
-            .root_source_file = b.path(example_info.file),
-            .target = target,
-            .optimize = optimize,
+            .root_module = b.createModule(.{
+                .root_source_file = b.path(example_info.file),
+                .target = target,
+                .optimize = optimize,
+            }),
         });
         exe.root_module.addImport("nats", lib_mod);
@@ -142,9 +146,11 @@ pub fn build(b: *std.Build) void {
     for (benchmark_files) |benchmark_info| {
         const exe = b.addExecutable(.{
             .name = benchmark_info.name,
-            .root_source_file = b.path(benchmark_info.file),
-            .target = target,
-            .optimize = optimize,
+            .root_module = b.createModule(.{
+                .root_source_file = b.path(benchmark_info.file),
+                .target = target,
+                .optimize = optimize,
+            }),
         });
         exe.root_module.addImport("nats", lib_mod);
@@ -156,9 +162,11 @@ pub fn build(b: *std.Build) void {
     // C benchmarks (require libnats)
     const c_echo_server = b.addExecutable(.{
         .name = "echo_server_c",
-        .root_source_file = null,
-        .target = target,
-        .optimize = optimize,
+        .root_module = b.createModule(.{
+            .root_source_file = null,
+            .target = target,
+            .optimize = optimize,
+        }),
     });
     c_echo_server.addCSourceFile(.{ .file = b.path("benchmarks/echo_server.c"), .flags = &.{} });
     c_echo_server.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
@@ -167,9 +175,11 @@ pub fn build(b: *std.Build) void {
     const c_echo_client = b.addExecutable(.{
         .name = "echo_client_c",
-        .root_source_file = null,
-        .target = target,
-        .optimize = optimize,
+        .root_module = b.createModule(.{
+            .root_source_file = null,
+            .target = target,
+            .optimize = optimize,
+        }),
     });
     c_echo_client.addCSourceFile(.{ .file = b.path("benchmarks/echo_client.c"), .flags = &.{} });
     c_echo_client.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
@@ -178,9 +188,11 @@ pub fn build(b: *std.Build) void {
     const c_publisher = b.addExecutable(.{
         .name = "publisher_c",
-        .root_source_file = null,
-        .target = target,
-        .optimize = optimize,
+        .root_module = b.createModule(.{
+            .root_source_file = null,
+            .target = target,
+            .optimize = optimize,
+        }),
     });
     c_publisher.addCSourceFile(.{ .file = b.path("benchmarks/publisher.c"), .flags = &.{} });
     c_publisher.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
@@ -189,9 +201,11 @@ pub fn build(b: *std.Build) void {
     const c_subscriber = b.addExecutable(.{
         .name = "subscriber_c",
-        .root_source_file = null,
-        .target = target,
-        .optimize = optimize,
+        .root_module = b.createModule(.{
+            .root_source_file = null,
+            .target = target,
+            .optimize = optimize,
+        }),
     });
     c_subscriber.addCSourceFile(.{ .file = b.path("benchmarks/subscriber.c"), .flags = &.{} });
     c_subscriber.addCSourceFile(.{ .file = b.path("benchmarks/bench_util.c"), .flags = &.{} });
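Every addTest/addExecutable call above moves .root_source_file, .target, and .optimize into an explicitly created module: 0.15 drops those shorthand fields from the compile-step options, so the root module must be built with b.createModule (or b.addModule for exported modules). A minimal build.zig sketch, assuming a hypothetical src/main.zig:

    const std = @import("std");

    pub fn build(b: *std.Build) void {
        const target = b.standardTargetOptions(.{});
        const optimize = b.standardOptimizeOption(.{});

        const exe = b.addExecutable(.{
            .name = "demo",
            // The compile step now receives a *Module; target and optimize
            // travel with the module rather than with the artifact.
            .root_module = b.createModule(.{
                .root_source_file = b.path("src/main.zig"),
                .target = target,
                .optimize = optimize,
            }),
        });
        b.installArtifact(exe);
    }

For the C-only benchmark executables the pattern is the same with .root_source_file = null, since all sources arrive via addCSourceFile.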
diff --git a/build.zig.zon b/build.zig.zon
index 6c0fd3e..1f6615a 100644
--- a/build.zig.zon
+++ b/build.zig.zon
@@ -28,7 +28,7 @@
     // Tracks the earliest Zig version that the package considers to be a
     // supported use case.
-    .minimum_zig_version = "0.14.1",
+    .minimum_zig_version = "0.15.1",

     // This field is optional.
     // Each dependency must either provide a `url` and `hash`, or a `path`.
diff --git a/examples/sub_async.zig b/examples/sub_async.zig
index c34475a..e19d64a 100644
--- a/examples/sub_async.zig
+++ b/examples/sub_async.zig
@@ -33,7 +33,7 @@ pub fn main() !void {
     std.log.info("Subscribed with callback handler. Waiting for messages (10 seconds)...", .{});

     // Keep the program running to receive messages
-    std.time.sleep(10 * std.time.ns_per_s);
+    std.Thread.sleep(10 * std.time.ns_per_s);

     std.log.info("Shutting down after receiving {} messages", .{counter});
 }
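This sleep call is the first instance of a rename that recurs through the rest of the patch: std.time.sleep was removed in 0.15 and the blocking sleep now lives on std.Thread. A one-liner for orientation:

    const std = @import("std");

    pub fn main() void {
        // was: std.time.sleep(250 * std.time.ns_per_ms);
        std.Thread.sleep(250 * std.time.ns_per_ms); // unit is still nanoseconds
    }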
diff --git a/src/connection.zig b/src/connection.zig
index 8cc2ae3..6aff0a7 100644
--- a/src/connection.zig
+++ b/src/connection.zig
@@ -630,10 +630,10 @@ pub const Connection = struct {
         defer self.resetScratch();

         // TODO pre-allocate headers_buffer
-        var headers_buffer = ArrayList(u8).init(allocator);
-        defer headers_buffer.deinit();
+        var headers_buffer = ArrayList(u8){};
+        defer headers_buffer.deinit(allocator);

-        try msg.encodeHeaders(headers_buffer.writer());
+        try msg.encodeHeaders(headers_buffer.writer(allocator));

         const headers_len = headers_buffer.items.len;
         const total_payload = headers_len + msg.data.len;
@@ -711,11 +711,12 @@ pub const Connection = struct {
         const allocator = self.scratch.allocator();
         defer self.resetScratch();

-        var buffer = ArrayList(u8).init(allocator);
+        var buffer = ArrayList(u8){};
+        defer buffer.deinit(allocator);
         if (sub.queue) |group| {
-            try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
+            try buffer.writer(allocator).print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
         } else {
-            try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
+            try buffer.writer(allocator).print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
         }
         try self.write_buffer.append(buffer.items);
     }
@@ -1226,7 +1227,8 @@ pub const Connection = struct {
         defer self.resetScratch();

         // Build CONNECT message with all options
-        var buffer = ArrayList(u8).init(allocator);
+        var buffer = ArrayList(u8){};
+        defer buffer.deinit(allocator);

         // Calculate effective no_responders: enable if server supports headers
         const no_responders = self.options.no_responders and self.server_info.headers;
@@ -1253,10 +1255,10 @@ pub const Connection = struct {
             .auth_token = auth_token,
         };

-        try buffer.writer().writeAll("CONNECT ");
-        try std.json.stringify(connect_obj, .{}, buffer.writer());
-        try buffer.writer().writeAll("\r\n");
-        try buffer.writer().writeAll("PING\r\n");
+        try buffer.writer(allocator).writeAll("CONNECT ");
+        try std.fmt.format(buffer.writer(allocator), "{f}", .{std.json.fmt(connect_obj, .{})});
+        try buffer.writer(allocator).writeAll("\r\n");
+        try buffer.writer(allocator).writeAll("PING\r\n");

         // Send via buffer (mutex already held)
         try self.write_buffer.append(buffer.items);
@@ -1738,8 +1740,8 @@ pub const Connection = struct {
         log.debug("Re-establishing subscriptions", .{});

         // Track SIDs that shouldn't be re-subscribed and must be removed
-        var to_remove = ArrayList(u64).init(self.allocator);
-        defer to_remove.deinit();
+        var to_remove = ArrayList(u64){};
+        defer to_remove.deinit(self.allocator);

         {
             self.subs_mutex.lock();
@@ -1748,7 +1750,8 @@ pub const Connection = struct {
             const allocator = self.scratch.allocator();
             defer self.resetScratch();

-            var buffer = ArrayList(u8).init(allocator);
+            var buffer = ArrayList(u8){};
+            defer buffer.deinit(allocator);

             var iter = self.subscriptions.iterator();
             while (iter.next()) |entry| {
@@ -1765,21 +1768,21 @@ pub const Connection = struct {
                     } else {
                         // Already reached limit - don't re-subscribe; remove after unlock
                         log.debug("Subscription {d} ({s}) already reached limit; will remove during reconnect", .{ sub.sid, sub.subject });
-                        try to_remove.append(sub.sid);
+                        try to_remove.append(self.allocator, sub.sid);
                         continue;
                     }
                 }

                 // Send SUB command
                 if (sub.queue) |queue| {
-                    try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
+                    try buffer.writer(allocator).print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
                 } else {
-                    try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
+                    try buffer.writer(allocator).print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
                 }

                 // Send UNSUB with remaining limit if needed
                 if (adjusted_max) |remaining| {
-                    try buffer.writer().print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
+                    try buffer.writer(allocator).print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
                     log.debug("Re-subscribed to {s} with sid {d} and autounsubscribe limit {d} (delivered: {d})", .{ sub.subject, sub.sid, remaining, delivered });
                 } else {
                     log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
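All the connection.zig churn above is one idiom: std.ArrayList is unmanaged in 0.15, so lists are created with an empty literal and every allocating or freeing call (writer, append, deinit, toOwnedSlice) takes the allocator explicitly. A self-contained sketch of the pattern (buildSubCommand is a hypothetical helper, not this repo's API):

    const std = @import("std");

    fn buildSubCommand(allocator: std.mem.Allocator, subject: []const u8, sid: u64) ![]u8 {
        // Empty literal replaces ArrayList(u8).init(allocator); the list no
        // longer remembers its allocator.
        var buffer = std.ArrayList(u8){};
        defer buffer.deinit(allocator);

        try buffer.writer(allocator).print("SUB {s} {d}\r\n", .{ subject, sid });

        // toOwnedSlice(allocator) resets the list, so the deferred deinit
        // only matters on the error path.
        return buffer.toOwnedSlice(allocator);
    }

    test buildSubCommand {
        const cmd = try buildSubCommand(std.testing.allocator, "orders.created", 7);
        defer std.testing.allocator.free(cmd);
        try std.testing.expectEqualStrings("SUB orders.created 7\r\n", cmd);
    }

Note that the subscribe and sendConnect paths also gained a `defer buffer.deinit(allocator)` the 0.14 code did not have; the old code relied solely on the scratch arena's reset to reclaim that memory.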
diff --git a/src/jetstream.zig b/src/jetstream.zig
index 65c857c..5403c12 100644
--- a/src/jetstream.zig
+++ b/src/jetstream.zig
@@ -29,6 +29,15 @@ const Result = @import("result.zig").Result;

 const log = @import("log.zig").log;

+// Helper function to replace std.json.stringifyAlloc
+fn jsonStringifyAlloc(allocator: std.mem.Allocator, value: anytype, options: std.json.Stringify.Options) ![]u8 {
+    var buffer = std.ArrayList(u8){};
+    defer buffer.deinit(allocator);
+
+    try std.fmt.format(buffer.writer(allocator), "{f}", .{std.json.fmt(value, options)});
+    return buffer.toOwnedSlice(allocator);
+}
+
 // Re-export JetStream message types
 pub const JetStreamMessage = jetstream_message.JetStreamMessage;
 pub const MsgMetadata = jetstream_message.MsgMetadata;
@@ -438,7 +447,7 @@ pub const PullSubscription = struct {
         };

         // Serialize the fetch request to JSON
-        const request_json = try std.json.stringifyAlloc(self.js.nc.allocator, request, .{
+        const request_json = try jsonStringifyAlloc(self.js.nc.allocator, request, .{
             .emit_null_optional_fields = false,
         });
         defer self.js.nc.allocator.free(request_json);
@@ -451,8 +460,8 @@ pub const PullSubscription = struct {
         try self.js.nc.publishRequest(api_subject, reply_subject, request_json);

         // Collect messages
-        var messages = std.ArrayList(*JetStreamMessage).init(self.js.nc.allocator);
-        defer messages.deinit();
+        var messages = std.ArrayList(*JetStreamMessage){};
+        defer messages.deinit(self.js.nc.allocator);

         var batch_complete = false;
         var fetch_error: ?anyerror = null;
@@ -496,7 +505,7 @@ pub const PullSubscription = struct {
                     const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js.nc, raw_msg);
                     errdefer js_msg_ptr.deinit();

-                    try messages.append(js_msg_ptr);
+                    try messages.append(self.js.nc.allocator, js_msg_ptr);
                 }
             } else |err| switch (err) {
                 error.Timeout => {
@@ -507,7 +516,7 @@ pub const PullSubscription = struct {
         }

         // Convert ArrayList to owned slice
-        const messages_slice = try messages.toOwnedSlice();
+        const messages_slice = try messages.toOwnedSlice(self.js.nc.allocator);

         return MessageBatch{
             .messages = messages_slice,
@@ -622,7 +631,7 @@ pub const JetStream = struct {
             subject: []const u8,
         }{ .subject = subject };

-        const request_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{
+        const request_json = try jsonStringifyAlloc(self.nc.allocator, request_payload, .{
             .emit_null_optional_fields = false,
         });
         defer self.nc.allocator.free(request_json);
@@ -774,7 +783,7 @@ pub const JetStream = struct {
         defer self.nc.allocator.free(subject);

         // Serialize the config to JSON
-        const config_json = try std.json.stringifyAlloc(self.nc.allocator, config, .{});
+        const config_json = try jsonStringifyAlloc(self.nc.allocator, config, .{});
         defer self.nc.allocator.free(config_json);

         const msg = try self.sendRequest(subject, config_json);
@@ -795,7 +804,7 @@ pub const JetStream = struct {
         defer self.nc.allocator.free(subject);

         // Serialize the config to JSON
-        const config_json = try std.json.stringifyAlloc(self.nc.allocator, config, .{});
+        const config_json = try jsonStringifyAlloc(self.nc.allocator, config, .{});
         defer self.nc.allocator.free(config_json);

         const msg = try self.sendRequest(subject, config_json);
@@ -901,7 +910,7 @@ pub const JetStream = struct {
             config: ConsumerConfig,
         }{ .stream_name = stream_name, .config = config };

-        const config_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{
+        const config_json = try jsonStringifyAlloc(self.nc.allocator, request_payload, .{
             .emit_null_optional_fields = false,
         });
         defer self.nc.allocator.free(config_json);
@@ -949,7 +958,7 @@ pub const JetStream = struct {
         const subject = try std.fmt.allocPrint(self.nc.allocator, "STREAM.PURGE.{s}", .{stream_name});
         defer self.nc.allocator.free(subject);

-        const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{});
+        const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{});
         defer self.nc.allocator.free(request_json);

         const msg = try self.sendRequest(subject, request_json);
@@ -976,7 +985,7 @@ pub const JetStream = struct {
         };

         // Serialize the request to JSON, omitting null fields
-        const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{
+        const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{
             .emit_null_optional_fields = false,
         });
         defer self.nc.allocator.free(request_json);
@@ -1080,7 +1089,7 @@ pub const JetStream = struct {
         };

         // Serialize the request to JSON, omitting null fields
-        const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{
+        const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{
             .emit_null_optional_fields = false,
         });
         defer self.nc.allocator.free(request_json);
@@ -1118,7 +1127,7 @@ pub const JetStream = struct {
         defer self.nc.allocator.free(subject);

         // Serialize the request to JSON
-        const request_json = try std.json.stringifyAlloc(self.nc.allocator, request, .{});
+        const request_json = try jsonStringifyAlloc(self.nc.allocator, request, .{});
         defer self.nc.allocator.free(request_json);

         const msg = try self.sendRequest(subject, request_json);
diff --git a/src/jetstream_kv.zig b/src/jetstream_kv.zig
index 228397d..d0beb46 100644
--- a/src/jetstream_kv.zig
+++ b/src/jetstream_kv.zig
@@ -736,7 +736,7 @@ pub const KV = struct {
         for (filter_subjects.items) |subject| {
             self.js.nc.allocator.free(subject);
         }
-        filter_subjects.deinit();
+        filter_subjects.deinit(self.js.nc.allocator);
     }

     // Convert filters to full subjects
diff --git a/src/jetstream_message.zig b/src/jetstream_message.zig
index 5cde019..2c0db42 100644
--- a/src/jetstream_message.zig
+++ b/src/jetstream_message.zig
@@ -109,7 +109,7 @@ pub const JetStreamMessage = struct {
         if (delay > 0) {
             // Convert milliseconds to nanoseconds for the protocol message
             const delay_ns = delay * std.time.ns_per_ms;
-            const formatted = try std.fmt.bufPrint(&ack_message, "{s} {{\"delay\": {}}}", .{ ack_type.toString(), delay_ns });
+            const formatted = try std.fmt.bufPrint(&ack_message, "{s} {{\"delay\": {d}}}", .{ ack_type.toString(), delay_ns });
             break :blk formatted;
         } else {
             break :blk ack_type.toString();
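std.json.stringifyAlloc is gone in 0.15; the private jsonStringifyAlloc helper above rebuilds it from std.json.fmt plus the new {f} format specifier (which invokes a value's format method). A standalone copy with a usage check, assuming 0.15.1's std.json.Stringify.Options:

    const std = @import("std");

    fn jsonStringifyAlloc(allocator: std.mem.Allocator, value: anytype, options: std.json.Stringify.Options) ![]u8 {
        var buffer = std.ArrayList(u8){};
        // No-op once toOwnedSlice succeeds; frees the buffer on error paths.
        defer buffer.deinit(allocator);

        try std.fmt.format(buffer.writer(allocator), "{f}", .{std.json.fmt(value, options)});
        return buffer.toOwnedSlice(allocator);
    }

    test jsonStringifyAlloc {
        const Req = struct { subject: []const u8, batch: ?u32 = null };
        const json = try jsonStringifyAlloc(std.testing.allocator, Req{ .subject = "orders.>" }, .{
            .emit_null_optional_fields = false,
        });
        defer std.testing.allocator.free(json);
        try std.testing.expectEqualStrings("{\"subject\":\"orders.>\"}", json);
    }

Defining the helper once and importing it might beat duplicating it in jetstream.zig and jetstream_objstore.zig, but the duplication keeps both files free of a new internal dependency.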
diff --git a/src/jetstream_objstore.zig b/src/jetstream_objstore.zig
index 973e4a4..b9f6ece 100644
--- a/src/jetstream_objstore.zig
+++ b/src/jetstream_objstore.zig
@@ -30,6 +30,15 @@ const validation = @import("validation.zig");

 const log = @import("log.zig").log;

+// Helper function to replace std.json.stringifyAlloc
+fn jsonStringifyAlloc(allocator: std.mem.Allocator, value: anytype, options: std.json.Stringify.Options) ![]u8 {
+    var buffer = std.ArrayList(u8){};
+    defer buffer.deinit(allocator);
+
+    try std.fmt.format(buffer.writer(allocator), "{f}", .{std.json.fmt(value, options)});
+    return buffer.toOwnedSlice(allocator);
+}
+
 // Default chunk size (128KB)
 const DEFAULT_CHUNK_SIZE: u32 = 128 * 1024;
@@ -217,7 +226,8 @@ pub const ObjectResult = struct {
     pub fn verify(self: *ObjectResult) !void {
         const calculated_digest = self.digest.finalResult();
         var digest_hex: [64]u8 = undefined;
-        _ = std.fmt.bufPrint(&digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable;
+        const hex_digest = std.fmt.bytesToHex(calculated_digest, .lower);
+        @memcpy(&digest_hex, &hex_digest);

         if (!std.mem.eql(u8, &digest_hex, self.info.digest)) {
             return ObjectStoreError.DigestMismatch;
@@ -343,8 +353,8 @@ pub const ObjectStore = struct {
         // Create digest string - owned by arena
         const digest_bytes = hasher.finalResult();
-        const digest_hex = try arena_allocator.alloc(u8, 64);
-        _ = std.fmt.bufPrint(digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&digest_bytes)}) catch unreachable;
+        const hex_digest = std.fmt.bytesToHex(digest_bytes, .lower);
+        const digest_hex = try arena_allocator.dupe(u8, &hex_digest);

         const obj_info = ObjectInfo{
             .name = try arena_allocator.dupe(u8, meta.name),
@@ -649,7 +659,7 @@ pub const ObjectStore = struct {
                     .deleted = meta.deleted,
                 };

-                try objects.append(obj_info);
+                try objects.append(arena_allocator, obj_info);
             }

             // Stop when there are no more pending messages
@@ -660,7 +670,7 @@ pub const ObjectStore = struct {

         return Result([]ObjectInfo){
             .arena = arena,
-            .value = try objects.toOwnedSlice(),
+            .value = try objects.toOwnedSlice(arena_allocator),
         };
     }
@@ -678,7 +688,7 @@ pub const ObjectStore = struct {
             .digest = obj_info.digest,
             .deleted = obj_info.deleted,
         };
-        return std.json.stringifyAlloc(self.allocator, json_info, .{});
+        return jsonStringifyAlloc(self.allocator, json_info, .{});
     }
 };
diff --git a/src/message_test.zig b/src/message_test.zig
index a7ed31a..2ae992b 100644
--- a/src/message_test.zig
+++ b/src/message_test.zig
@@ -147,10 +147,10 @@ test "Message header encoding" {
     try msg.headerSet("Content-Type", "application/json");
     try msg.headerSet("X-Custom", "value");

-    var buf = std.ArrayList(u8).init(allocator);
-    defer buf.deinit();
+    var buf = std.ArrayList(u8){};
+    defer buf.deinit(allocator);

-    try msg.encodeHeaders(buf.writer());
+    try msg.encodeHeaders(buf.writer(allocator));

     // Should start with NATS/1.0
     try testing.expect(std.mem.startsWith(u8, buf.items, "NATS/1.0\r\n"));
@@ -196,10 +196,10 @@ test "Message status field and header parsing" {
     try testing.expectEqualStrings("test", custom_header.?);

     // Test encoding - Status header should be first line
-    var buf = std.ArrayList(u8).init(allocator);
-    defer buf.deinit();
+    var buf = std.ArrayList(u8){};
+    defer buf.deinit(allocator);

-    try msg.encodeHeaders(buf.writer());
+    try msg.encodeHeaders(buf.writer(allocator));

     // Should start with the full status line
     try testing.expect(std.mem.startsWith(u8, buf.items, "NATS/1.0 503 No Responders\r\n"));
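std.fmt.fmtSliceHexLower also fell to the 0.15 formatter rework; std.fmt.bytesToHex takes a fixed-size byte array and returns the hex as a fixed-size array, no writer involved. A quick check against the well-known SHA-256 digest of the empty string:

    const std = @import("std");

    test "bytesToHex replaces fmtSliceHexLower" {
        var digest: [32]u8 = undefined;
        std.crypto.hash.sha2.Sha256.hash("", &digest, .{});

        // Output length is computed at comptime: [32]u8 in, [64]u8 out.
        const hex = std.fmt.bytesToHex(digest, .lower);
        try std.testing.expectEqualStrings(
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
            &hex,
        );
    }

In verify() the result is copied into the existing digest_hex buffer with @memcpy; the putObject path simply dupes it into the arena, which reads more directly.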
diff --git a/src/parser.zig b/src/parser.zig
index 2570f0d..e2e03b0 100644
--- a/src/parser.zig
+++ b/src/parser.zig
@@ -79,13 +79,13 @@ pub const Parser = struct {
     pub fn init(allocator: std.mem.Allocator) Self {
         return Self{
             .allocator = allocator,
-            .arg_buf_rec = std.ArrayList(u8).init(allocator),
+            .arg_buf_rec = std.ArrayList(u8){},
             .msg_pool = MessagePool.init(allocator),
         };
     }

     pub fn deinit(self: *Self) void {
-        self.arg_buf_rec.deinit();
+        self.arg_buf_rec.deinit(self.allocator);
         if (self.ma.msg) |msg| {
             msg.deinit();
         }
@@ -196,7 +196,7 @@ pub const Parser = struct {
                 else => {
                     // Only accumulate if we're in split buffer mode
                     if (self.arg_buf) |arg_buf| {
-                        try arg_buf.append(b);
+                        try arg_buf.append(self.allocator, b);
                     }
                 },
             }
@@ -393,7 +393,7 @@ pub const Parser = struct {
                 else => {
                     // Only accumulate if we're in split buffer mode
                     if (self.arg_buf) |arg_buf| {
-                        try arg_buf.append(b);
+                        try arg_buf.append(self.allocator, b);
                     }
                 },
             }
@@ -466,7 +466,7 @@ pub const Parser = struct {
                 else => {
                     // Only accumulate if we're in split buffer mode
                     if (self.arg_buf) |arg_buf| {
-                        try arg_buf.append(b);
+                        try arg_buf.append(self.allocator, b);
                     }
                 },
             }
@@ -486,7 +486,7 @@ pub const Parser = struct {
             // Set up arg_buf for next parse() call
             try self.setupArgBuf();
             const remaining_args = buf[self.after_space .. i - self.drop];
-            try self.arg_buf.?.appendSlice(remaining_args);
+            try self.arg_buf.?.appendSlice(self.allocator, remaining_args);
         }
     }
diff --git a/src/queue.zig b/src/queue.zig
index 58a9dc4..780aeff 100644
--- a/src/queue.zig
+++ b/src/queue.zig
@@ -132,8 +132,9 @@ fn ChunkPool(comptime T: type, comptime chunk_size: usize) type {
         const Self = @This();

         fn init(allocator: Allocator, max_size: usize) Self {
+            _ = allocator;
             return .{
-                .chunks = std.ArrayList(*Chunk).init(allocator),
+                .chunks = std.ArrayList(*Chunk){},
                 .max_size = max_size,
             };
         }
@@ -142,7 +143,7 @@ fn ChunkPool(comptime T: type, comptime chunk_size: usize) type {
             for (self.chunks.items) |chunk| {
                 allocator.destroy(chunk);
             }
-            self.chunks.deinit();
+            self.chunks.deinit(allocator);
         }

         fn get(self: *Self) ?*Chunk {
@@ -150,12 +151,12 @@ fn ChunkPool(comptime T: type, comptime chunk_size: usize) type {
             return self.chunks.pop();
         }

-        fn put(self: *Self, chunk: *Chunk) bool {
+        fn put(self: *Self, allocator: Allocator, chunk: *Chunk) bool {
             if (self.chunks.items.len >= self.max_size) {
                 return false;
             }
             chunk.reset();
-            self.chunks.append(chunk) catch return false;
+            self.chunks.append(allocator, chunk) catch return false;
             return true;
         }
     };
@@ -570,7 +571,7 @@ pub fn ConcurrentQueue(comptime T: type, comptime chunk_size: usize) type {
         }

         fn recycleChunk(self: *Self, chunk: *Chunk) void {
-            if (!self.chunk_pool.put(chunk)) {
+            if (!self.chunk_pool.put(self.allocator, chunk)) {
                 self.allocator.destroy(chunk);
                 self.total_chunks -= 1;
             }
@@ -1021,7 +1022,7 @@ test "blocking pop handles queue closure" {
     // Start a thread that will close the queue after a delay
     const Closer = struct {
         fn run(q: *Queue) !void {
-            std.time.sleep(10 * std.time.ns_per_ms);
+            std.Thread.sleep(10 * std.time.ns_per_ms);
             q.close();
         }
     };
@@ -1045,7 +1046,7 @@ test "getSlice handles queue closure with indefinite wait" {
     // Start a thread that will close the queue after a delay
     const Closer = struct {
         fn run(q: *Queue) !void {
-            std.time.sleep(10 * std.time.ns_per_ms);
+            std.Thread.sleep(10 * std.time.ns_per_ms);
             q.close();
         }
     };
@@ -1149,13 +1150,13 @@ test "buffer moveToBuffer with multiple chunks" {
     try std.testing.expectEqual(total_bytes, dest.getBytesAvailable());

     // Read and verify the moved data by consuming all chunks
-    var result = std.ArrayList(u8).init(allocator);
-    defer result.deinit();
+    var result = std.ArrayList(u8){};
+    defer result.deinit(allocator);

     while (dest.getBytesAvailable() > 0) {
         var view_opt = dest.tryGetSlice();
         if (view_opt) |*view| {
-            try result.appendSlice(view.data);
+            try result.appendSlice(allocator, view.data);
             view.consume(view.data.len);
         } else {
             break;
@@ -1286,7 +1287,7 @@ test "blocking operations during freeze" {
     // Start a thread that will resume after a delay
     const Resumer = struct {
         fn run(q: *Queue) !void {
-            std.time.sleep(10 * std.time.ns_per_ms);
+            std.Thread.sleep(10 * std.time.ns_per_ms);
             q.unfreeze();
             try q.push(99);
         }
    };
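ChunkPool shows the structural choice the unmanaged containers force: either a struct stores an allocator or its methods take one. The pool's methods now take the allocator (put gained a parameter), matching how the surrounding queue already threads self.allocator. A reduced sketch of that shape (Pool is illustrative, not the repo's ChunkPool):

    const std = @import("std");

    fn Pool(comptime T: type) type {
        return struct {
            items: std.ArrayList(*T) = .{},
            max_size: usize,

            const Self = @This();

            fn deinit(self: *Self, allocator: std.mem.Allocator) void {
                for (self.items.items) |item| allocator.destroy(item);
                self.items.deinit(allocator);
            }

            // pop() returns an optional in 0.15 (popOrNull was folded into it).
            fn get(self: *Self) ?*T {
                return self.items.pop();
            }

            fn put(self: *Self, allocator: std.mem.Allocator, item: *T) bool {
                if (self.items.items.len >= self.max_size) return false;
                self.items.append(allocator, item) catch return false;
                return true;
            }
        };
    }

    test Pool {
        const allocator = std.testing.allocator;
        var pool = Pool(u64){ .max_size = 4 };
        defer pool.deinit(allocator);

        const chunk = try allocator.create(u64);
        try std.testing.expect(pool.put(allocator, chunk));
        try std.testing.expectEqual(@as(?*u64, chunk), pool.get());
        allocator.destroy(chunk);
    }

With the allocator now unused, ChunkPool.init could drop the parameter entirely instead of discarding it; keeping it preserves call sites, a fair trade during a mechanical migration.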
diff --git a/test_runner.zig b/test_runner.zig
index 7c1e7cc..dc34f78 100644
--- a/test_runner.zig
+++ b/test_runner.zig
@@ -19,6 +19,7 @@ const Allocator = std.mem.Allocator;
 // Log capture context
 const LogCapture = struct {
     captured_log_buffer: ?*std.ArrayList(u8) = null,
+    allocator: ?Allocator = null,
     mutex: std.Thread.Mutex = .{},

     pub fn logFn(
@@ -38,21 +39,23 @@ const LogCapture = struct {

         if (self.captured_log_buffer) |buf| {
             // Capture to buffer during test execution
-            buf.writer().print(scope_prefix ++ format ++ "\n", args) catch unreachable;
+            if (self.allocator) |alloc| {
+                buf.writer(alloc).print(scope_prefix ++ format ++ "\n", args) catch unreachable;
+            }
         } else {
             // Normal logging to stderr when not capturing
             std.debug.lockStdErr();
             defer std.debug.unlockStdErr();
-            const stderr = std.io.getStdErr().writer();
-            stderr.print(scope_prefix ++ format ++ "\n", args) catch unreachable;
+            std.debug.print(scope_prefix ++ format ++ "\n", args);
         }
     }

-    pub fn startCapture(self: *@This(), buffer: *std.ArrayList(u8)) void {
+    pub fn startCapture(self: *@This(), buffer: *std.ArrayList(u8), allocator: Allocator) void {
         self.mutex.lock();
         defer self.mutex.unlock();

         self.captured_log_buffer = buffer;
+        self.allocator = allocator;
     }

     pub fn stopCapture(self: *@This()) void {
@@ -60,6 +63,7 @@ const LogCapture = struct {
         defer self.mutex.unlock();

         self.captured_log_buffer = null;
+        self.allocator = null;
     }
 };
@@ -107,8 +111,8 @@ pub fn main() !void {
     printer.fmt("\r\x1b[0K", .{}); // beginning of line and clear to end of line

     // Initialize log buffer for capturing test output
-    var log_buffer = std.ArrayList(u8).init(allocator);
-    defer log_buffer.deinit();
+    var log_buffer = std.ArrayList(u8){};
+    defer log_buffer.deinit(allocator);

     for (builtin.test_functions) |t| {
         if (isSetup(t)) {
@@ -142,7 +146,7 @@ pub fn main() !void {
         if (env.do_log_capture) {
             // Clear log buffer and start capturing logs for this test
             log_buffer.clearRetainingCapacity();
-            log_capture.startCapture(&log_buffer);
+            log_capture.startCapture(&log_buffer, allocator);
         }

         // Run per-test setup functions
@@ -231,29 +235,25 @@ pub fn main() !void {
 const Printer = struct {
-    out: std.fs.File.Writer,
-
     fn init() Printer {
-        return .{
-            .out = std.io.getStdErr().writer(),
-        };
+        return .{};
     }

     fn fmt(self: Printer, comptime format: []const u8, args: anytype) void {
-        std.fmt.format(self.out, format, args) catch unreachable;
+        _ = self;
+        std.debug.print(format, args);
     }

     fn status(self: Printer, s: Status, comptime format: []const u8, args: anytype) void {
-        const color = switch (s) {
-            .pass => "\x1b[32m",
-            .fail => "\x1b[31m",
-            .skip => "\x1b[33m",
-            else => "",
-        };
-        const out = self.out;
-        out.writeAll(color) catch @panic("writeAll failed?!");
-        std.fmt.format(out, format, args) catch @panic("std.fmt.format failed?!");
-        self.fmt("\x1b[0m", .{});
+        _ = self;
+        switch (s) {
+            .pass => std.debug.print("\x1b[32m", .{}),
+            .fail => std.debug.print("\x1b[31m", .{}),
+            .skip => std.debug.print("\x1b[33m", .{}),
+            else => {},
+        }
+        std.debug.print(format, args);
+        std.debug.print("\x1b[0m", .{});
     }
 };
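std.io.getStdErr() no longer exists after 0.15's I/O rework, so the runner's Printer collapses to std.debug.print, which locks stderr internally. Where an actual writer object is still wanted, the replacement (a sketch assuming 0.15.1's std.Io.Writer interface) is the buffered file writer:

    const std = @import("std");

    pub fn main() void {
        // Simple case: std.debug.print is the drop-in for ad-hoc stderr output.
        std.debug.print("\x1b[32m{s}\x1b[0m\n", .{"PASS"});

        // Writer case: files expose writer(&buffer), returning a struct whose
        // .interface field is the generic std.Io.Writer.
        var buf: [256]u8 = undefined;
        var file_writer = std.fs.File.stderr().writer(&buf);
        const w = &file_writer.interface;
        w.print("{d} tests run\n", .{42}) catch {};
        w.flush() catch {}; // buffered: flush before exiting
    }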
diff --git a/tests/autounsubscribe_test.zig b/tests/autounsubscribe_test.zig
index f6bf11e..ca9daae 100644
--- a/tests/autounsubscribe_test.zig
+++ b/tests/autounsubscribe_test.zig
@@ -40,26 +40,27 @@ test "autounsubscribe async basic functionality" {
     var conn = try utils.createDefaultConnection();
     defer utils.closeConnection(conn);

-    var messages_received = std.ArrayList(*Message).init(std.testing.allocator);
+    var messages_received = std.ArrayList(*Message){};
     defer {
         for (messages_received.items) |msg| {
             msg.deinit();
         }
-        messages_received.deinit();
+        messages_received.deinit(std.testing.allocator);
     }

     const TestContext = struct {
         messages: *std.ArrayList(*Message),
+        allocator: std.mem.Allocator,
         mutex: std.Thread.Mutex = .{},

         pub fn handleMessage(msg: *Message, self: *@This()) !void {
             self.mutex.lock();
             defer self.mutex.unlock();
-            try self.messages.append(msg);
+            try self.messages.append(self.allocator, msg);
         }
     };

-    var ctx = TestContext{ .messages = &messages_received };
+    var ctx = TestContext{ .messages = &messages_received, .allocator = std.testing.allocator };

     const sub = try conn.subscribe("auto.async.test", TestContext.handleMessage, .{&ctx});
     defer sub.deinit();
@@ -84,7 +85,7 @@ test "autounsubscribe async basic functionality" {
         count = messages_received.items.len;
         ctx.mutex.unlock();
         if (count >= 2) break;
-        std.time.sleep(10 * std.time.ns_per_ms);
+        std.Thread.sleep(10 * std.time.ns_per_ms);
     }

     try std.testing.expectEqual(@as(usize, 2), count);
@@ -210,7 +211,7 @@ test "autounsubscribe with reconnection" {
         if (timer.read() >= 5000 * std.time.ns_per_ms) {
             return error.ReconnectionTimeout;
         }
-        std.time.sleep(10 * std.time.ns_per_ms);
+        std.Thread.sleep(10 * std.time.ns_per_ms);
     }

     // Publish more messages after reconnection (should only receive 2 more to reach limit of 5)
diff --git a/tests/core_request_reply_test.zig b/tests/core_request_reply_test.zig
index 57b2213..bd515fd 100644
--- a/tests/core_request_reply_test.zig
+++ b/tests/core_request_reply_test.zig
@@ -28,7 +28,7 @@ fn slowEchoHandler(msg: *nats.Message, connection: *nats.Connection) void {
     defer msg.deinit();

     // Sleep for 200ms to simulate slow processing
-    std.time.sleep(200_000_000); // 200ms
+    std.Thread.sleep(200_000_000); // 200ms

     if (msg.reply) |reply_subject| {
         const response = std.fmt.allocPrint(std.testing.allocator, "slow: {s}", .{msg.data}) catch return;
@@ -46,7 +46,7 @@ test "basic request reply" {
     defer replier_sub.deinit();

     // Give the subscription time to register
-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     // Send a request
     var msg = try conn.request("test.echo", "hello world", 1000);
@@ -65,7 +65,7 @@ test "simple request reply functionality" {
     defer replier_sub.deinit();

     // Give the subscription time to register
-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     // Send a request
     var msg = try conn.request("test.simple.echo", "hello world", 1000);
@@ -82,7 +82,7 @@ test "concurrent request reply" {
     const replier_sub = try conn.subscribe("test.concurrent", echoHandler, .{conn});
     defer replier_sub.deinit();

-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     // Send multiple requests concurrently
     var requests: [5]*nats.Message = undefined;
@@ -123,7 +123,7 @@ test "request timeout with slow responder" {
     const slow_sub = try conn.subscribe("test.slow", slowEchoHandler, .{conn});
     defer slow_sub.deinit();

-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     // Send request with 100ms timeout (less than the 200ms handler delay)
     const response = conn.request("test.slow", "timeout test", 100);
@@ -143,7 +143,7 @@ test "request with different subjects" {
     const replier2 = try conn.subscribe("test.subject2", echoHandler, .{conn});
     defer replier2.deinit();

-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     // Test requests to different subjects
     const response1 = try conn.request("test.subject1", "message1", 1000);
@@ -163,7 +163,7 @@ test "requestMsg basic functionality" {
     const replier_sub = try conn.subscribe("test.requestmsg", simpleEchoHandler, .{conn});
     defer replier_sub.deinit();

-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     var request_msg = try conn.newMsg();
     defer request_msg.deinit();
@@ -187,7 +187,7 @@ test "requestMsg with headers" {
     const replier_sub = try conn.subscribe("test.requestmsg.headers", echoHandler, .{conn});
     defer replier_sub.deinit();

-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     var request_msg = try conn.newMsg();
     defer request_msg.deinit();
@@ -301,7 +301,7 @@ test "requestMany with sentinel function" {
     const replier_sub = try conn.subscribe("test.sentinel", sentinelResponder, .{conn});
     defer replier_sub.deinit();

-    std.time.sleep(10_000_000); // 10ms
+    std.Thread.sleep(10_000_000); // 10ms

     // Sentinel function that stops when it sees "END" message (ADR-47: false = stop)
     const sentinel = struct {
@@ -355,11 +355,11 @@ test "requestMany with stall timeout" {
             connection.publish(reply_subject, "response-1") catch return;

             // Send second response after 10ms (within stall timeout)
-            std.time.sleep(10_000_000); // 10ms
+            std.Thread.sleep(10_000_000); // 10ms
             connection.publish(reply_subject, "response-2") catch return;

             // Wait 150ms (longer than 100ms stall timeout) then try to send third response
-            std.time.sleep(150_000_000); // 150ms
+            std.Thread.sleep(150_000_000); // 150ms
             connection.publish(reply_subject, "response-3") catch return;
         }
     };
@@ -368,7 +368,7 @@ test "requestMany with stall timeout" {
     const responder_thread = try std.Thread.spawn(.{}, ResponderThread.run, .{ conn, replier_sub });
     defer responder_thread.join();

-    std.time.sleep(10_000_000); // 10ms to ensure subscription is ready
+    std.Thread.sleep(10_000_000); // 10ms to ensure subscription is ready

     // Request with 100ms stall timeout - should get only first 2 responses
     var messages = try conn.requestMany("test.stall", "get with stall", 1000, .{ .stall_ms = 100 });
diff --git a/tests/drain_test.zig b/tests/drain_test.zig
index b67e7f4..1d08c2d 100644
--- a/tests/drain_test.zig
+++ b/tests/drain_test.zig
@@ -43,7 +43,7 @@ test "subscription drain sync - with pending messages" {
     // Wait (up to 1s) for both messages to be counted as pending
     var waited: u64 = 0;
     while (sub.pending_msgs.load(.acquire) < 2 and waited < 1000) : (waited += 5) {
-        std.time.sleep(5 * std.time.ns_per_ms);
+        std.Thread.sleep(5 * std.time.ns_per_ms);
     }

     // Should have pending messages
@@ -97,7 +97,7 @@ test "subscription drain async - with callback processing" {
             defer msg.deinit();

             // Simulate some processing time
-            std.time.sleep(5 * std.time.ns_per_ms);
+            std.Thread.sleep(5 * std.time.ns_per_ms);

             ctx.processed_count_ptr.* += 1;
             if (ctx.processed_count_ptr.* == 3) {
@@ -120,7 +120,7 @@ test "subscription drain async - with callback processing" {
     try conn.flush();

     // Give messages time to arrive but not necessarily process
-    std.time.sleep(10 * std.time.ns_per_ms);
+    std.Thread.sleep(10 * std.time.ns_per_ms);

     // Messages should have arrived (they may be processing or queued)
     // Note: pending count may be 0 if already processed, so we'll skip this check
@@ -155,7 +155,7 @@ test "subscription drain blocks new messages" {
     // Publish initial message
     try conn.publish("test.drain.block", "before drain");
     try conn.flush();
-    std.time.sleep(10 * std.time.ns_per_ms);
+    std.Thread.sleep(10 * std.time.ns_per_ms);

     // Should have 1 pending message
     try std.testing.expect(sub.pending_msgs.load(.acquire) == 1);
@@ -167,7 +167,7 @@ test "subscription drain blocks new messages" {
     try conn.publish("test.drain.block", "after drain 1");
     try conn.publish("test.drain.block", "after drain 2");
     try conn.flush();
-    std.time.sleep(10 * std.time.ns_per_ms);
+    std.Thread.sleep(10 * std.time.ns_per_ms);

     // Should still have only 1 pending message (new ones dropped)
     try std.testing.expect(sub.pending_msgs.load(.acquire) == 1);
@@ -198,7 +198,7 @@ test "subscription drain timeout" {
     // Wait briefly for arrival
     var waited: u64 = 0;
     while (sub.pending_msgs.load(.acquire) < 1 and waited < 200) : (waited += 5) {
-        std.time.sleep(5 * std.time.ns_per_ms);
+        std.Thread.sleep(5 * std.time.ns_per_ms);
     }

     sub.drain();
@@ -249,7 +249,7 @@ test "connection drain - single subscription" {
     // Wait for messages to arrive
     var waited: u64 = 0;
     while (sub.pending_msgs.load(.acquire) < 2 and waited < 1000) : (waited += 5) {
-        std.time.sleep(5 * std.time.ns_per_ms);
+        std.Thread.sleep(5 * std.time.ns_per_ms);
     }
     try std.testing.expect(sub.pending_msgs.load(.acquire) == 2);
@@ -289,7 +289,7 @@ test "connection drain - multiple subscriptions" {
     // Wait for messages to arrive
     var waited: u64 = 0;
     while ((sub1.pending_msgs.load(.acquire) < 1 or sub2.pending_msgs.load(.acquire) < 1) and waited < 1000) : (waited += 5) {
-        std.time.sleep(5 * std.time.ns_per_ms);
+        std.Thread.sleep(5 * std.time.ns_per_ms);
     }
     try std.testing.expect(sub1.pending_msgs.load(.acquire) == 1);
     try std.testing.expect(sub2.pending_msgs.load(.acquire) == 1);
diff --git a/tests/jetstream_duplicate_ack_test.zig b/tests/jetstream_duplicate_ack_test.zig
index 9dc8be9..336a3ea 100644
--- a/tests/jetstream_duplicate_ack_test.zig
+++ b/tests/jetstream_duplicate_ack_test.zig
@@ -64,7 +64,7 @@ test "ack should succeed on first call" {
     // Wait for message processing
     var attempts: u32 = 0;
     while (attempts < 30) {
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         test_data.mutex.lock();
@@ -149,7 +149,7 @@ test "ack should fail on second call" {
     // Wait for message processing
     var attempts: u32 = 0;
     while (attempts < 30) {
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
        attempts += 1;

         test_data.mutex.lock();
@@ -235,7 +235,7 @@ test "nak should fail after ack" {
     // Wait for message processing
     var attempts: u32 = 0;
     while (attempts < 30) {
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         test_data.mutex.lock();
@@ -328,7 +328,7 @@ test "inProgress can be called multiple times" {
     // Wait for message processing
     var attempts: u32 = 0;
     while (attempts < 30) {
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         test_data.mutex.lock();
diff --git a/tests/jetstream_nak_test.zig b/tests/jetstream_nak_test.zig
index 8d449fc..396048b 100644
--- a/tests/jetstream_nak_test.zig
+++ b/tests/jetstream_nak_test.zig
@@ -31,8 +31,8 @@ test "NAK redelivery with delivery count verification" {
         fn init(allocator: std.mem.Allocator) @This() {
             return .{
-                .messages = std.ArrayList([]const u8).init(allocator),
-                .delivery_counts = std.ArrayList(u64).init(allocator),
+                .messages = std.ArrayList([]const u8){},
+                .delivery_counts = std.ArrayList(u64){},
                 .allocator = allocator,
             };
         }
@@ -41,8 +41,8 @@ test "NAK redelivery with delivery count verification" {
             for (self.messages.items) |msg| {
                 self.allocator.free(msg);
             }
-            self.messages.deinit();
-            self.delivery_counts.deinit();
+            self.messages.deinit(self.allocator);
+            self.delivery_counts.deinit(self.allocator);
             if (self.first_delivery_data) |data| {
                 self.allocator.free(data);
             }
@@ -62,11 +62,11 @@ test "NAK redelivery with delivery count verification" {

             // Store message data copy for comparison
             const msg_copy = data.allocator.dupe(u8, js_msg.msg.data) catch return;
-            data.messages.append(msg_copy) catch return;
+            data.messages.append(data.allocator, msg_copy) catch return;

             // Get delivery count from JetStream message metadata
             const delivery_count = js_msg.metadata.num_delivered;
-            data.delivery_counts.append(delivery_count) catch return;
+            data.delivery_counts.append(data.allocator, delivery_count) catch return;

             log.info("Received message (delivery #{}): {s}", .{ delivery_count, js_msg.msg.data });
@@ -105,7 +105,7 @@ test "NAK redelivery with delivery count verification" {
     // Wait for message processing (should get delivered, NAK'd, then redelivered)
     var attempts: u32 = 0;
     while (attempts < 50) { // Wait up to 5 seconds
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         test_data.mutex.lock();
@@ -209,7 +209,7 @@ test "NAK with max delivery limit" {
     // Wait for all deliveries (should stop at max_deliver = 2)
     var wait_attempts: u32 = 0;
     while (wait_attempts < 30) { // Wait up to 3 seconds
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         wait_attempts += 1;

         test_data.mutex.lock();
@@ -219,7 +219,7 @@ test "NAK with max delivery limit" {

         // Should stop at max_deliver limit
         if (count >= 2) {
             // Give a bit more time to ensure no additional deliveries
-            std.time.sleep(200 * std.time.ns_per_ms);
+            std.Thread.sleep(200 * std.time.ns_per_ms);
             break;
         }
     }
@@ -273,10 +273,10 @@ test "JetStream message metadata parsing" {
         log.info("JetStream message metadata:", .{});
         log.info("- Stream: {s}", .{js_msg.metadata.stream});
         log.info("- Consumer: {s}", .{js_msg.metadata.consumer});
-        log.info("- Consumer sequence: {?}", .{js_msg.metadata.sequence.consumer});
-        log.info("- Stream sequence: {?}", .{js_msg.metadata.sequence.stream});
-        log.info("- Delivered count: {}", .{js_msg.metadata.num_delivered});
-        log.info("- Pending count: {?}", .{js_msg.metadata.num_pending});
+        log.info("- Consumer sequence: {d}", .{js_msg.metadata.sequence.consumer});
+        log.info("- Stream sequence: {d}", .{js_msg.metadata.sequence.stream});
+        log.info("- Delivered count: {d}", .{js_msg.metadata.num_delivered});
+        log.info("- Pending count: {d}", .{js_msg.metadata.num_pending});

         // Verify metadata is populated correctly
         const stream_name = js_msg.metadata.stream;
@@ -313,7 +313,7 @@ test "JetStream message metadata parsing" {
     // Wait for message processing
     var attempts: u32 = 0;
     while (attempts < 30) {
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         mutex.lock();
@@ -359,13 +359,13 @@ test "NAK with delay redelivery timing" {
         fn init(allocator: std.mem.Allocator) @This() {
             return .{
-                .delivery_times = std.ArrayList(i64).init(allocator),
+                .delivery_times = std.ArrayList(i64){},
                 .allocator = allocator,
             };
         }

         fn deinit(self: *@This()) void {
-            self.delivery_times.deinit();
+            self.delivery_times.deinit(self.allocator);
         }
     };
@@ -381,7 +381,7 @@ test "NAK with delay redelivery timing" {
             defer data.mutex.unlock();

             const current_time = std.time.milliTimestamp();
-            data.delivery_times.append(current_time) catch return;
+            data.delivery_times.append(data.allocator, current_time) catch return;

             const delivery_count = js_msg.metadata.num_delivered;
             data.delivery_count += 1;
@@ -423,7 +423,7 @@ test "NAK with delay redelivery timing" {
     // Wait for both deliveries (original + redelivery after delay)
     var attempts: u32 = 0;
     while (attempts < 100) { // Wait up to 10 seconds
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         test_data.mutex.lock();
@@ -515,7 +515,7 @@ test "NAK with zero delay behaves like regular NAK" {
     // Wait for both deliveries
     var attempts: u32 = 0;
     while (attempts < 30) { // Wait up to 3 seconds
-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
         attempts += 1;

         mutex.lock();
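Beyond the mechanical renames, the metadata logging above swaps {} and {?} for explicit {d}. 0.15's std.fmt is stricter about defaulted placeholders (and {?} only fits optional values), so spelling out the numeric specifier keeps the log lines compiling as field types evolve:

    const std = @import("std");

    test "explicit numeric specifiers" {
        var buf: [64]u8 = undefined;
        const consumer_seq: u64 = 17;
        // {d} names the intent (decimal integer); a bare {} leans on default
        // formatting rules that 0.15 tightened.
        const line = try std.fmt.bufPrint(&buf, "consumer seq: {d}", .{consumer_seq});
        try std.testing.expectEqualStrings("consumer seq: 17", line);
    }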
diff --git a/tests/jetstream_push_test.zig b/tests/jetstream_push_test.zig
index fc959cd..e86d764 100644
--- a/tests/jetstream_push_test.zig
+++ b/tests/jetstream_push_test.zig
@@ -54,7 +54,7 @@ test "basic push subscription" {
     try conn.publish("orders.update", "Order Update");

     // Wait a bit for messages to be processed
-    std.time.sleep(100 * std.time.ns_per_ms);
+    std.Thread.sleep(100 * std.time.ns_per_ms);

     // Verify messages were received
     try testing.expect(message_count > 0);
@@ -84,7 +84,7 @@ test "push subscription with flow control" {
             counter.* += 1;

             // Simulate some processing time
-            std.time.sleep(10 * std.time.ns_per_ms);
+            std.Thread.sleep(10 * std.time.ns_per_ms);

             // Acknowledge successful processing
             js_msg.ack() catch |err| {
@@ -114,7 +114,7 @@ test "push subscription with flow control" {
     }

     // Allow time for processing
-    std.time.sleep(200 * std.time.ns_per_ms);
+    std.Thread.sleep(200 * std.time.ns_per_ms);

     try testing.expect(processed_count > 0);
     log.info("Processed {d} tasks with flow control", .{processed_count});
diff --git a/tests/pending_msgs_test.zig b/tests/pending_msgs_test.zig
index d550c3f..ef0a3d5 100644
--- a/tests/pending_msgs_test.zig
+++ b/tests/pending_msgs_test.zig
@@ -24,7 +24,7 @@ test "pending_msgs counter sync subscription" {
     while (waited_ms < 1000) : (waited_ms += 10) {
         if (sub.pending_msgs.load(.acquire) == 1 and
             sub.pending_bytes.load(.acquire) == msg1_data.len) break;
-        std.time.sleep(10 * std.time.ns_per_ms);
+        std.Thread.sleep(10 * std.time.ns_per_ms);
     }

     // Should have 1 pending message and correct bytes
@@ -40,7 +40,7 @@ test "pending_msgs counter sync subscription" {
     while (waited_ms < 1000) : (waited_ms += 10) {
         if (sub.pending_msgs.load(.acquire) == 2 and
             sub.pending_bytes.load(.acquire) == msg1_data.len + msg2_data.len) break;
-        std.time.sleep(10 * std.time.ns_per_ms);
+        std.Thread.sleep(10 * std.time.ns_per_ms);
     }

     // Should have 2 pending messages and correct total bytes
@@ -84,7 +84,7 @@ test "pending_msgs counter async subscription" {
             _ = ctx.message_count_ptr.fetchAdd(1, .acq_rel);
             _ = ctx.total_bytes_ptr.fetchAdd(@intCast(msg.data.len), .acq_rel);
             // Add a small delay to simulate processing
-            std.time.sleep(5 * std.time.ns_per_ms);
+            std.Thread.sleep(5 * std.time.ns_per_ms);
             _ = ctx.processed_count_ptr.fetchAdd(1, .acq_rel);
         }
     }.handle;
@@ -111,7 +111,7 @@ test "pending_msgs counter async subscription" {
     try conn.flush();

     // Give a moment for messages to arrive but not fully process
-    std.time.sleep(20 * std.time.ns_per_ms);
+    std.Thread.sleep(20 * std.time.ns_per_ms);

     // Should have some pending messages (might be processing)
     // Note: We can't assert an exact number here since processing might start immediately
@@ -119,7 +119,7 @@ test "pending_msgs counter async subscription" {
     // Wait for all messages to be processed
     var attempts: u32 = 0;
     while (processed_count.load(.acquire) < 3 and attempts < 200) {
-        std.time.sleep(10 * std.time.ns_per_ms);
+        std.Thread.sleep(10 * std.time.ns_per_ms);
         attempts += 1;
     }
diff --git a/tests/reconnection_test.zig b/tests/reconnection_test.zig
index 3053925..61647ad 100644
--- a/tests/reconnection_test.zig
+++ b/tests/reconnection_test.zig
@@ -98,7 +98,7 @@ test "basic reconnection when server stops" {
             return error.StillNotConnected;
         }

-        std.time.sleep(10 * std.time.ns_per_ms);
+        std.Thread.sleep(10 * std.time.ns_per_ms);
     }

     // Verify connection works after reconnection
diff --git a/tests/utils.zig b/tests/utils.zig
index 19cc828..1592fa0 100644
--- a/tests/utils.zig
+++ b/tests/utils.zig
@@ -85,7 +85,7 @@ pub fn waitForHealthyServices(allocator: std.mem.Allocator, timeout_ms: i64) !void {
             return;
         }

-        std.time.sleep(100 * std.time.ns_per_ms);
+        std.Thread.sleep(100 * std.time.ns_per_ms);
     }
 }
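Taken together, the patch is almost entirely four mechanical 0.15 moves: unmanaged ArrayList, the std.json.fmt/{f} serialization path, std.Thread.sleep, and the std.debug output story. A compact recap that should compile on 0.15.1 (all names illustrative):

    const std = @import("std");

    test "zig 0.15 migration recap" {
        const allocator = std.testing.allocator;

        // 1. Unmanaged ArrayList: zero-value init, allocator on every call.
        var list = std.ArrayList(u8){};
        defer list.deinit(allocator);
        try list.writer(allocator).print("SUB {s} {d}\r\n", .{ "demo", 1 });
        try std.testing.expect(std.mem.startsWith(u8, list.items, "SUB demo"));

        // 2. JSON via std.json.fmt and the {f} specifier.
        var json = std.ArrayList(u8){};
        defer json.deinit(allocator);
        try std.fmt.format(json.writer(allocator), "{f}", .{std.json.fmt(.{ .ok = true }, .{})});
        try std.testing.expectEqualStrings("{\"ok\":true}", json.items);

        // 3. Sleeping moved from std.time to std.Thread (still nanoseconds).
        std.Thread.sleep(1 * std.time.ns_per_ms);
    }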