diff --git a/README.md b/README.md index 221e62a..3a4ea8f 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,41 @@ var consumer_info = try js.addConsumer("ORDERS", consumer_config); defer consumer_info.deinit(); ``` +### JetStream Push Subscriptions + +```zig +// Push subscription with callback handler +fn orderHandler(js_msg: *nats.JetStreamMessage, count: *u32) !void { + defer js_msg.deinit(); + count.* += 1; + try js_msg.ack(); // Acknowledge message + std.debug.print("Order: {s}\n", .{js_msg.data}); +} + +var processed: u32 = 0; +var push_sub = try js.subscribe("orders.*", orderHandler, .{&processed}, .{ + .stream = "ORDERS", + .durable = "order_processor", +}); +defer push_sub.deinit(); +``` + +### JetStream Pull Subscriptions + +```zig +// Pull subscription (fetch messages manually) +var pull_sub = try js.pullSubscribe("orders.*", "batch_processor", .{ + .stream = "ORDERS", +}); +defer pull_sub.deinit(); + +var batch = try pull_sub.fetch(10, 5000); // Fetch up to 10 msgs, 5s timeout +defer batch.deinit(); +for (batch.messages) |js_msg| { + try js_msg.ack(); +} +``` + ## Building ```bash diff --git a/src/jetstream.zig b/src/jetstream.zig index b341c2f..65c857c 100644 --- a/src/jetstream.zig +++ b/src/jetstream.zig @@ -198,7 +198,7 @@ pub const ConsumerConfig = struct { /// For push consumers this will regularly send an empty mess with Status header 100 and a reply subject /// (This field was moved to the end to avoid duplication) /// The number of pulls that can be outstanding on a pull consumer - max_waiting: i64 = 512, + max_waiting: ?i64 = null, /// Delivers only the headers of messages in the stream and not the bodies headers_only: ?bool = null, /// The largest batch property that may be specified when doing a pull on a Pull Consumer @@ -548,6 +548,30 @@ pub const JetStreamSubscription = struct { } }; +/// Options for push subscriptions (subscribe, subscribeSync, queueSubscribe) +pub const SubscribeOptions = struct { + /// Stream name - auto-discovered from subject if null + stream: ?[]const u8 = null, + + /// Durable consumer name - creates durable consumer if provided + durable: ?[]const u8 = null, + + /// Full consumer configuration - merged with other options + config: ?ConsumerConfig = null, + + /// Manual acknowledgment mode (default: false = auto-ack like nats.c) + manual_ack: bool = false, +}; + +/// Options for pull subscriptions +pub const PullSubscribeOptions = struct { + /// Stream name - auto-discovered from subject if null + stream: ?[]const u8 = null, + + /// Full consumer configuration + config: ?ConsumerConfig = null, +}; + pub const JetStreamOptions = struct { request_timeout_ms: u64 = default_request_timeout_ms, // Add options here @@ -589,6 +613,89 @@ pub const JetStream = struct { return jetstream_errors.mapErrorCode(info.err_code); } + /// Lookup stream name by subject pattern + fn lookupStreamBySubject(self: JetStream, subject: []const u8) ![]const u8 { + try validation.validateSubject(subject); + + // Build the subject filter request + const request_payload = struct { + subject: []const u8, + }{ .subject = subject }; + + const request_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{ + .emit_null_optional_fields = false, + }); + defer self.nc.allocator.free(request_json); + + const msg = try self.sendRequest("STREAM.NAMES", request_json); + defer msg.deinit(); + + const names_result = try self.parseResponse(StreamNamesResponse, msg); + defer names_result.deinit(); + + const streams = names_result.value.streams orelse return error.NoStreamMatches; + + if (streams.len == 0) { + return error.NoStreamMatches; + } else if (streams.len > 1) { + return error.MultipleStreamMatches; + } + + // Return a copy of the stream name that we own + return try self.nc.allocator.dupe(u8, streams[0]); + } + + /// Get existing consumer or create new one based on configuration + fn getOrCreateConsumer(self: JetStream, stream_name: []const u8, subject: ?[]const u8, durable: ?[]const u8, base_config: ?ConsumerConfig, is_pull: bool, queue: ?[]const u8) !Result(ConsumerInfo) { + // Start with base config or empty config + var config = base_config orelse ConsumerConfig{}; + + // Set consumer name from durable parameter if provided + if (durable) |d| { + config.name = d; + config.durable_name = d; // For backward compatibility + } + + // Set subject filter if provided + if (subject) |s| { + config.filter_subject = s; + } + + // Set queue group for push consumers + if (queue) |q| { + config.deliver_group = q; + } + + // Configure for pull vs push consumer + if (is_pull) { + config.deliver_subject = null; + if (config.max_waiting == null) config.max_waiting = 512; + } else { + // Push consumer - remove pull-only fields + config.max_waiting = null; // Don't send max_waiting for push consumers + config.max_batch = null; + config.max_expires = null; + } + + // If durable is provided, try to get existing consumer first + if (durable) |consumer_name| { + // Try to get existing consumer info + if (self.getConsumerInfo(stream_name, consumer_name)) |existing_info| { + log.debug("Found existing consumer: {s}", .{consumer_name}); + return existing_info; + } else |err| switch (err) { + error.ConsumerNotFound => { + // Consumer doesn't exist, we'll create it below + log.debug("Consumer {s} not found, creating new one", .{consumer_name}); + }, + else => return err, // Other errors should be propagated + } + } + + // Create the consumer + return try self.addConsumer(stream_name, config); + } + /// Parse a response from the server, handling errors if present. fn parseResponse(self: JetStream, comptime T: type, msg: *Message) !Result(T) { try maybeParseErrorResponse(msg); @@ -794,7 +901,9 @@ pub const JetStream = struct { config: ConsumerConfig, }{ .stream_name = stream_name, .config = config }; - const config_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{}); + const config_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{ + .emit_null_optional_fields = false, + }); defer self.nc.allocator.free(config_json); const msg = try self.sendRequest(subject, config_json); @@ -1077,31 +1186,10 @@ pub const JetStream = struct { } } - pub fn subscribe(self: JetStream, stream_name: []const u8, consumer_config: ConsumerConfig, comptime handlerFn: anytype, args: anytype) !*JetStreamSubscription { - // Generate deliver_subject if not provided and create push consumer config - var push_config = consumer_config; - const generated_deliver_subject = if (consumer_config.deliver_subject == null) - try inbox.newInbox(self.nc.allocator) - else - null; - errdefer if (generated_deliver_subject) |ds| self.nc.allocator.free(ds); - - const deliver_subject = consumer_config.deliver_subject orelse generated_deliver_subject.?; - try validation.validateSubject(deliver_subject); - push_config.deliver_subject = deliver_subject; - - // Remove pull-only fields from push consumer config - push_config.max_waiting = 0; // Push consumers don't support max_waiting - push_config.max_batch = null; // Push consumers don't support max_batch - push_config.max_expires = null; // Push consumers don't support max_expires - - // Create the push consumer first - var consumer_info = try self.addConsumer(stream_name, push_config); - errdefer consumer_info.deinit(); - - // Define the handler inline to avoid the two-level context issue - const JSHandler = struct { - fn wrappedHandler(msg: *Message, nc: *Connection, user_args: @TypeOf(args)) anyerror!void { + /// Helper to generate a JetStream message handler wrapper + fn JetStreamHandler(comptime handlerFn: anytype) type { + return struct { + fn wrappedHandler(msg: *Message, nc: *Connection, user_args: anytype, manual_ack: bool) anyerror!void { // Check for status messages (heartbeats and flow control) if (msg.status_code == STATUS_CONTROL) { // Handle status message internally, don't pass to user callback @@ -1121,17 +1209,79 @@ pub const JetStream = struct { // Call user handler with JetStream message - handler owns cleanup responsibility // Support both void and fallible handlers + comptime { + const HandlerType = @TypeOf(handlerFn); + const type_info = @typeInfo(HandlerType); + if (type_info != .@"fn") { + @compileError("Handler must be a function, got: " ++ @typeName(HandlerType)); + } + } const ReturnType = @typeInfo(@TypeOf(handlerFn)).@"fn".return_type.?; + + // Only auto-ack if callback succeeds (like nats-py) + var callback_success = false; if (ReturnType == void) { @call(.auto, handlerFn, .{js_msg} ++ user_args); + callback_success = true; } else { - try @call(.auto, handlerFn, .{js_msg} ++ user_args); + @call(.auto, handlerFn, .{js_msg} ++ user_args) catch |err| { + log.err("User handler failed: {}", .{err}); + return; // Don't auto-ack on callback error + }; + callback_success = true; + } + + // Auto-acknowledge after successful user callback if not manual_ack + if (!manual_ack and callback_success) { + js_msg.ack() catch |err| switch (err) { + jetstream_message.AckError.AlreadyAcked => {}, // Ignore already acked (like nats-py) + else => log.err("Auto-ack failed: {}", .{err}), + }; } } }; + } + + /// Subscribe to a JetStream push consumer with callback handler + pub fn subscribe(self: JetStream, subject: ?[]const u8, comptime handlerFn: anytype, handler_args: anytype, options: SubscribeOptions) !*JetStreamSubscription { + // Resolve stream name + const stream_name = if (options.stream) |s| + s + else if (subject) |s| + try self.lookupStreamBySubject(s) + else + return error.StreamOrSubjectRequired; + + defer if (options.stream == null and subject != null) self.nc.allocator.free(stream_name); + + // Prepare consumer config, generating deliver_subject if needed + var config = options.config orelse ConsumerConfig{}; + const generated_deliver_subject = if (config.deliver_subject == null) + try inbox.newInbox(self.nc.allocator) + else + null; + errdefer if (generated_deliver_subject) |ds| self.nc.allocator.free(ds); + + // Validate user-provided deliver_subject (not generated ones) + if (config.deliver_subject != null) { + try validation.validateSubject(config.deliver_subject.?); + } + + if (generated_deliver_subject) |ds| { + config.deliver_subject = ds; + } + + // Create or get consumer with complete config + var consumer_info = try self.getOrCreateConsumer(stream_name, subject, options.durable, config, false, null); + errdefer consumer_info.deinit(); - // Subscribe to the delivery subject with simple arguments - const subscription = try self.nc.subscribe(deliver_subject, JSHandler.wrappedHandler, .{ self.nc, args }); + const deliver_subject = consumer_info.value.config.deliver_subject.?; + + // Use the reusable JetStream handler wrapper + const JSHandler = JetStreamHandler(handlerFn); + + // Subscribe to the delivery subject + const subscription = try self.nc.subscribe(deliver_subject, JSHandler.wrappedHandler, .{ self.nc, handler_args, options.manual_ack }); errdefer self.nc.unsubscribe(subscription); // Create JetStream subscription wrapper @@ -1147,28 +1297,42 @@ pub const JetStream = struct { } /// Create a synchronous push subscription for manual message consumption - pub fn subscribeSync(self: JetStream, stream_name: []const u8, consumer_config: ConsumerConfig) !*JetStreamSubscription { - // Generate deliver_subject if not provided and create push consumer config - var push_config = consumer_config; - const generated_deliver_subject = if (consumer_config.deliver_subject == null) + pub fn subscribeSync(self: JetStream, subject: ?[]const u8, options: SubscribeOptions) !*JetStreamSubscription { + // Resolve stream name + const stream_name = if (options.stream) |s| + s + else if (subject) |s| + try self.lookupStreamBySubject(s) + else + return error.StreamOrSubjectRequired; + + defer if (options.stream == null and subject != null) self.nc.allocator.free(stream_name); + + // Prepare config with deliver_subject if needed + var config = options.config orelse ConsumerConfig{}; + + // Validate user-provided deliver_subject (not generated ones) + if (config.deliver_subject != null) { + try validation.validateSubject(config.deliver_subject.?); + } + + // Generate deliver_subject if not provided in the config + const generated_deliver_subject = if (config.deliver_subject == null) try inbox.newInbox(self.nc.allocator) else null; errdefer if (generated_deliver_subject) |ds| self.nc.allocator.free(ds); - const deliver_subject = consumer_config.deliver_subject orelse generated_deliver_subject.?; - try validation.validateSubject(deliver_subject); - push_config.deliver_subject = deliver_subject; - - // Remove pull-only fields from push consumer config - push_config.max_waiting = 0; // Push consumers don't support max_waiting - push_config.max_batch = null; // Push consumers don't support max_batch - push_config.max_expires = null; // Push consumers don't support max_expires + if (generated_deliver_subject) |ds| { + config.deliver_subject = ds; + } - // Create the push consumer - var consumer_info = try self.addConsumer(stream_name, push_config); + // Create or get consumer with proper config + var consumer_info = try self.getOrCreateConsumer(stream_name, subject, options.durable, config, false, null); errdefer consumer_info.deinit(); + const deliver_subject = consumer_info.value.config.deliver_subject.?; + // Create synchronous subscription (no callback handler) const subscription = try self.nc.subscribeSync(deliver_subject); errdefer self.nc.unsubscribe(subscription); @@ -1184,21 +1348,135 @@ pub const JetStream = struct { return js_sub; } + /// Create a queue subscription for load balancing across multiple consumers + pub fn queueSubscribe(self: JetStream, subject: ?[]const u8, queue: []const u8, comptime handlerFn: anytype, handler_args: anytype, options: SubscribeOptions) !*JetStreamSubscription { + // Resolve stream name + const stream_name = if (options.stream) |s| + s + else if (subject) |s| + try self.lookupStreamBySubject(s) + else + return error.StreamOrSubjectRequired; + + defer if (options.stream == null and subject != null) self.nc.allocator.free(stream_name); + + // Prepare config with deliver_subject if needed + var config = options.config orelse ConsumerConfig{}; + + // Validate user-provided deliver_subject (not generated ones) + if (config.deliver_subject != null) { + try validation.validateSubject(config.deliver_subject.?); + } + + // Generate deliver_subject if not provided in the config + const generated_deliver_subject = if (config.deliver_subject == null) + try inbox.newInbox(self.nc.allocator) + else + null; + errdefer if (generated_deliver_subject) |ds| self.nc.allocator.free(ds); + + if (generated_deliver_subject) |ds| { + config.deliver_subject = ds; + } + + // Create or get consumer with queue group + var consumer_info = try self.getOrCreateConsumer(stream_name, subject, options.durable, config, false, queue); + errdefer consumer_info.deinit(); + + const deliver_subject = consumer_info.value.config.deliver_subject.?; + + // Use the reusable JetStream handler wrapper + const JSHandler = JetStreamHandler(handlerFn); + + // Subscribe to the delivery subject with queue group + const subscription = try self.nc.queueSubscribe(deliver_subject, queue, JSHandler.wrappedHandler, .{ self.nc, handler_args, options.manual_ack }); + errdefer self.nc.unsubscribe(subscription); + + // Create JetStream subscription wrapper + const js_sub = try self.nc.allocator.create(JetStreamSubscription); + js_sub.* = JetStreamSubscription{ + .subscription = subscription, + .js = self, + .consumer_info = consumer_info, + .deliver_subject_owned = generated_deliver_subject, + }; + + return js_sub; + } + + /// Create a synchronous queue subscription for load balancing across multiple consumers + pub fn queueSubscribeSync(self: JetStream, subject: ?[]const u8, queue: []const u8, options: SubscribeOptions) !*JetStreamSubscription { + // Resolve stream name + const stream_name = if (options.stream) |s| + s + else if (subject) |s| + try self.lookupStreamBySubject(s) + else + return error.StreamOrSubjectRequired; + + defer if (options.stream == null and subject != null) self.nc.allocator.free(stream_name); + + // Prepare config with deliver_subject if needed + var config = options.config orelse ConsumerConfig{}; + + // Validate user-provided deliver_subject (not generated ones) + if (config.deliver_subject != null) { + try validation.validateSubject(config.deliver_subject.?); + } + + // Generate deliver_subject if not provided in the config + const generated_deliver_subject = if (config.deliver_subject == null) + try inbox.newInbox(self.nc.allocator) + else + null; + errdefer if (generated_deliver_subject) |ds| self.nc.allocator.free(ds); + + if (generated_deliver_subject) |ds| { + config.deliver_subject = ds; + } + + // Create or get consumer with queue group + var consumer_info = try self.getOrCreateConsumer(stream_name, subject, options.durable, config, false, queue); + errdefer consumer_info.deinit(); + + const deliver_subject = consumer_info.value.config.deliver_subject.?; + + // Create synchronous subscription with queue group + const subscription = try self.nc.queueSubscribeSync(deliver_subject, queue); + errdefer self.nc.unsubscribe(subscription); + + // Create JetStream subscription wrapper + const js_sub = try self.nc.allocator.create(JetStreamSubscription); + js_sub.* = JetStreamSubscription{ + .subscription = subscription, + .js = self, + .consumer_info = consumer_info, + .deliver_subject_owned = generated_deliver_subject, + }; + + return js_sub; + } + /// Create a pull subscription for the specified stream - pub fn pullSubscribe(self: JetStream, stream_name: []const u8, consumer_config: ConsumerConfig) !*PullSubscription { - // Create pull consumer config with appropriate defaults - var pull_config = consumer_config; - pull_config.deliver_subject = null; // Force null for pull consumers - if (pull_config.max_waiting == 0) pull_config.max_waiting = 512; // Default max waiting pulls + pub fn pullSubscribe(self: JetStream, subject: ?[]const u8, durable: []const u8, options: PullSubscribeOptions) !*PullSubscription { + // Resolve stream name + const stream_name = if (options.stream) |s| + s + else if (subject) |s| + try self.lookupStreamBySubject(s) + else + return error.StreamOrSubjectRequired; - // Create the consumer - var consumer_info = try self.addConsumer(stream_name, pull_config); + defer if (options.stream == null and subject != null) self.nc.allocator.free(stream_name); + + // Create or get consumer (pull consumers require a name) + var consumer_info = try self.getOrCreateConsumer(stream_name, subject, durable, options.config, true, null); errdefer consumer_info.deinit(); - // Get the consumer name (use name first, then durable_name) + // Get the consumer name (should be set from durable parameter) const consumer_name = consumer_info.value.config.name orelse consumer_info.value.config.durable_name orelse - return error.MissingConsumerName; + durable; // Generate unique inbox prefix for this pull subscription const inbox_base = try inbox.newInbox(self.nc.allocator); diff --git a/src/jetstream_kv.zig b/src/jetstream_kv.zig index 614eee1..228397d 100644 --- a/src/jetstream_kv.zig +++ b/src/jetstream_kv.zig @@ -214,7 +214,10 @@ pub const KVWatcher = struct { .filter_subjects = subjects, }; - var sub = try kv.js.subscribeSync(kv.stream_name, consumer_config); + var sub = try kv.js.subscribeSync(null, .{ + .stream = kv.stream_name, + .config = consumer_config, + }); errdefer sub.deinit(); // Match C logic: only use num_pending to detect empty streams diff --git a/src/jetstream_objstore.zig b/src/jetstream_objstore.zig index c6b50fd..973e4a4 100644 --- a/src/jetstream_objstore.zig +++ b/src/jetstream_objstore.zig @@ -459,10 +459,12 @@ pub const ObjectStore = struct { .deliver_policy = .all, .ack_policy = .none, .max_ack_pending = 0, - .filter_subjects = &.{chunk_subject}, }; - const sub = try self.js.subscribeSync(self.stream_name, consumer_config); + const sub = try self.js.subscribeSync(chunk_subject, .{ + .stream = self.stream_name, + .config = consumer_config, + }); // Create result with subscription return ObjectResult.init(arena_allocator, obj_info, sub, arena); @@ -600,10 +602,12 @@ pub const ObjectStore = struct { .deliver_policy = .last_per_subject, .ack_policy = .none, .max_ack_pending = 0, - .filter_subjects = &.{meta_filter}, }; - const sub = try self.js.subscribeSync(self.stream_name, consumer_config); + const sub = try self.js.subscribeSync(meta_filter, .{ + .stream = self.stream_name, + .config = consumer_config, + }); defer sub.deinit(); const arena = try self.allocator.create(std.heap.ArenaAllocator); diff --git a/tests/jetstream_duplicate_ack_test.zig b/tests/jetstream_duplicate_ack_test.zig index 89422c4..9dc8be9 100644 --- a/tests/jetstream_duplicate_ack_test.zig +++ b/tests/jetstream_duplicate_ack_test.zig @@ -49,14 +49,13 @@ test "ack should succeed on first call" { } }; - // Create consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "dup_ack_consumer", - .deliver_subject = "push.dup.ack", - .ack_policy = .explicit, - }; - - var push_sub = try js.subscribe("TEST_DUP_ACK_STREAM", consumer_config, AckHandler.handle, .{&test_data}); + var push_sub = try js.subscribe("test.dup.ack.*", AckHandler.handle, .{&test_data}, .{ + .stream = "TEST_DUP_ACK_STREAM", + .durable = "dup_ack_consumer", + .config = .{ + .deliver_policy = .all, + }, + }); defer push_sub.deinit(); // Publish test message @@ -135,14 +134,13 @@ test "ack should fail on second call" { } }; - // Create consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "dup_ack2_consumer", - .deliver_subject = "push.dup.ack2", - .ack_policy = .explicit, - }; - - var push_sub = try js.subscribe("TEST_DUP_ACK2_STREAM", consumer_config, DoubleAckHandler.handle, .{&test_data}); + var push_sub = try js.subscribe("test.dup.ack2.*", DoubleAckHandler.handle, .{&test_data}, .{ + .stream = "TEST_DUP_ACK2_STREAM", + .durable = "dup_ack2_consumer", + .config = .{ + .deliver_policy = .all, + }, + }); defer push_sub.deinit(); // Publish test message @@ -222,14 +220,13 @@ test "nak should fail after ack" { } }; - // Create consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "ack_nak_consumer", - .deliver_subject = "push.ack.nak", - .ack_policy = .explicit, - }; - - var push_sub = try js.subscribe("TEST_ACK_NAK_STREAM", consumer_config, AckNakHandler.handle, .{&test_data}); + var push_sub = try js.subscribe("test.ack.nak.*", AckNakHandler.handle, .{&test_data}, .{ + .stream = "TEST_ACK_NAK_STREAM", + .durable = "ack_nak_consumer", + .config = .{ + .deliver_policy = .all, + }, + }); defer push_sub.deinit(); // Publish test message @@ -316,14 +313,13 @@ test "inProgress can be called multiple times" { } }; - // Create consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "progress_consumer", - .deliver_subject = "push.progress", - .ack_policy = .explicit, - }; - - var push_sub = try js.subscribe("TEST_PROGRESS_STREAM", consumer_config, ProgressHandler.handle, .{&test_data}); + var push_sub = try js.subscribe("test.progress.*", ProgressHandler.handle, .{&test_data}, .{ + .stream = "TEST_PROGRESS_STREAM", + .durable = "progress_consumer", + .config = .{ + .deliver_policy = .all, + }, + }); defer push_sub.deinit(); // Publish test message diff --git a/tests/jetstream_nak_test.zig b/tests/jetstream_nak_test.zig index b24ba02..8d449fc 100644 --- a/tests/jetstream_nak_test.zig +++ b/tests/jetstream_nak_test.zig @@ -88,15 +88,14 @@ test "NAK redelivery with delivery count verification" { } }; - // Create push consumer with limited redeliveries - const consumer_config = nats.ConsumerConfig{ - .durable_name = "nak_test_consumer", - .deliver_subject = "push.nak.test", - .ack_policy = .explicit, - .max_deliver = 3, // Allow up to 3 delivery attempts - }; - - var push_sub = try js.subscribe("TEST_NAK_STREAM", consumer_config, NakHandler.handle, .{&test_data}); + // Create push consumer with limited redeliveries (deliver_subject auto-generated, ack_policy defaults to .explicit) + var push_sub = try js.subscribe("test.nak.*", NakHandler.handle, .{&test_data}, .{ + .stream = "TEST_NAK_STREAM", + .durable = "nak_test_consumer", + .config = .{ + .max_deliver = 3, // Allow up to 3 delivery attempts + }, + }); defer push_sub.deinit(); // Publish a test message @@ -194,15 +193,14 @@ test "NAK with max delivery limit" { } }; - // Consumer with max_deliver = 2 (original + 1 redelivery) - const consumer_config = nats.ConsumerConfig{ - .durable_name = "nak_limit_consumer", - .deliver_subject = "push.nak.limit", - .ack_policy = .explicit, - .max_deliver = 2, - }; - - var push_sub = try js.subscribe("TEST_NAK_LIMIT_STREAM", consumer_config, AlwaysNakHandler.handle, .{&test_data}); + var push_sub = try js.subscribe("test.nak.limit.*", AlwaysNakHandler.handle, .{&test_data}, .{ + .stream = "TEST_NAK_LIMIT_STREAM", + .durable = "nak_limit_consumer", + .config = .{ + .deliver_policy = .all, + .max_deliver = 2, + }, + }); defer push_sub.deinit(); // Publish test message @@ -300,14 +298,13 @@ test "JetStream message metadata parsing" { } }; - // Create consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "metadata_consumer", - .deliver_subject = "push.metadata.test", - .ack_policy = .explicit, - }; - - var push_sub = try js.subscribe("TEST_METADATA_STREAM", consumer_config, MetadataHandler.handle, .{ &received_message, &metadata_verified, &mutex }); + var push_sub = try js.subscribe("test.metadata.*", MetadataHandler.handle, .{ &received_message, &metadata_verified, &mutex }, .{ + .stream = "TEST_METADATA_STREAM", + .durable = "metadata_consumer", + .config = .{ + .deliver_policy = .all, + }, + }); defer push_sub.deinit(); // Publish a test message @@ -409,15 +406,14 @@ test "NAK with delay redelivery timing" { } }; - // Create push consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "nak_delay_consumer", - .deliver_subject = "push.nak.delay.test", - .ack_policy = .explicit, - .max_deliver = 3, - }; - - var push_sub = try js.subscribe("TEST_NAK_DELAY_STREAM", consumer_config, DelayHandler.handle, .{&test_data}); + var push_sub = try js.subscribe("test.nak.delay.*", DelayHandler.handle, .{&test_data}, .{ + .stream = "TEST_NAK_DELAY_STREAM", + .durable = "nak_delay_consumer", + .config = .{ + .deliver_policy = .all, + .max_deliver = 3, + }, + }); defer push_sub.deinit(); // Publish a test message @@ -503,15 +499,14 @@ test "NAK with zero delay behaves like regular NAK" { } }; - // Create push consumer - const consumer_config = nats.ConsumerConfig{ - .durable_name = "nak_zero_delay_consumer", - .deliver_subject = "push.nak.zero.test", - .ack_policy = .explicit, - .max_deliver = 3, - }; - - var push_sub = try js.subscribe("TEST_NAK_ZERO_DELAY_STREAM", consumer_config, ZeroDelayHandler.handle, .{ &delivery_count, &mutex }); + var push_sub = try js.subscribe("test.nak.zero.*", ZeroDelayHandler.handle, .{ &delivery_count, &mutex }, .{ + .stream = "TEST_NAK_ZERO_DELAY_STREAM", + .durable = "nak_zero_delay_consumer", + .config = .{ + .deliver_policy = .all, + .max_deliver = 3, + }, + }); defer push_sub.deinit(); // Publish a test message diff --git a/tests/jetstream_pull_test.zig b/tests/jetstream_pull_test.zig index 7d5990e..9bda2a5 100644 --- a/tests/jetstream_pull_test.zig +++ b/tests/jetstream_pull_test.zig @@ -31,14 +31,10 @@ test "JetStream pull consumer basic fetch" { var stream_info = try js.addStream(stream_config); defer stream_info.deinit(); - // Create a pull consumer - const consumer_config = ConsumerConfig{ - .durable_name = consumer_name, - .ack_policy = .explicit, - .filter_subject = "test.pull.*", - }; - - var subscription = try js.pullSubscribe(stream_name, consumer_config); + // Create a pull consumer (ack_policy defaults to .explicit) + var subscription = try js.pullSubscribe("test.pull.*", consumer_name, .{ + .stream = stream_name, + }); defer subscription.deinit(); // Publish some test messages diff --git a/tests/jetstream_push_test.zig b/tests/jetstream_push_test.zig index 50462f9..fc959cd 100644 --- a/tests/jetstream_push_test.zig +++ b/tests/jetstream_push_test.zig @@ -38,17 +38,14 @@ test "basic push subscription" { } }; - // Create push consumer configuration - // Note: deliver_subject must not overlap with stream subjects to avoid cycles - const consumer_config = nats.ConsumerConfig{ - .durable_name = "test_push_consumer", - .deliver_subject = "push.orders.processed", // Key for push consumer - different from stream subjects - .ack_policy = .explicit, - .deliver_policy = .all, - }; - - // Subscribe to push consumer - var push_sub = try js.subscribe("TEST_PUSH_STREAM", consumer_config, MessageHandler.handle, .{&message_count}); + // Subscribe to push consumer (deliver_subject auto-generated, ack_policy defaults to .explicit) + var push_sub = try js.subscribe("orders.*", MessageHandler.handle, .{&message_count}, .{ + .stream = "TEST_PUSH_STREAM", + .durable = "test_push_consumer", + .config = .{ + .deliver_policy = .all, + }, + }); defer push_sub.deinit(); // Publish some test messages @@ -96,18 +93,17 @@ test "push subscription with flow control" { } }; - // Create push consumer with flow control enabled - const consumer_config = nats.ConsumerConfig{ - .durable_name = "task_processor", - .deliver_subject = "push.tasks.process", // Different from stream subjects - .ack_policy = .explicit, - .deliver_policy = .all, - .flow_control = true, // Enable flow control - .idle_heartbeat = 30_000_000_000, // 30s - required when flow_control=true - .max_ack_pending = 10, // Limit pending acknowledgments - }; - - var push_sub = try js.subscribe("TEST_PUSH_FC_STREAM", consumer_config, TaskHandler.handle, .{&processed_count}); + // Subscribe with flow control enabled (deliver_subject auto-generated) + var push_sub = try js.subscribe("tasks.*", TaskHandler.handle, .{&processed_count}, .{ + .stream = "TEST_PUSH_FC_STREAM", + .durable = "task_processor", + .config = .{ + .deliver_policy = .all, + .flow_control = true, // Enable flow control + .idle_heartbeat = 30_000_000_000, // 30s - required when flow_control=true + .max_ack_pending = 10, // Limit pending acknowledgments + }, + }); defer push_sub.deinit(); // Publish several tasks @@ -130,21 +126,16 @@ test "push subscription error handling" { var js = conn.jetstream(.{}); - // Try to create push subscription without deliver_subject - should auto-generate one - // but fail with stream not found error - const config_without_deliver_subject = nats.ConsumerConfig{ - .durable_name = "test_consumer", - .ack_policy = .explicit, - // Missing deliver_subject - should be auto-generated - }; - const DummyHandler = struct { fn handle(js_msg: *nats.JetStreamMessage) void { defer js_msg.deinit(); } }; - // This should fail with StreamNotFound error since auto-generated deliver_subject should work - const result = js.subscribe("NONEXISTENT_STREAM", config_without_deliver_subject, DummyHandler.handle, .{}); + // This should fail with StreamNotFound error since stream doesn't exist + const result = js.subscribe("nonexistent.*", DummyHandler.handle, .{}, .{ + .stream = "NONEXISTENT_STREAM", + .durable = "test_consumer", + }); try testing.expectError(error.StreamNotFound, result); } diff --git a/tests/jetstream_sync_test.zig b/tests/jetstream_sync_test.zig index 9c6e465..f615a05 100644 --- a/tests/jetstream_sync_test.zig +++ b/tests/jetstream_sync_test.zig @@ -21,13 +21,10 @@ test "JetStream synchronous subscription basic functionality" { defer stream_info.deinit(); // Create synchronous subscription - const consumer_config = nats.ConsumerConfig{ - .durable_name = "sync_test_consumer", - .deliver_subject = "push.sync.test", - .ack_policy = .explicit, - }; - - var sync_sub = try js.subscribeSync("TEST_SYNC_STREAM", consumer_config); + var sync_sub = try js.subscribeSync("test.sync.*", .{ + .stream = "TEST_SYNC_STREAM", + .durable = "sync_test_consumer", + }); defer sync_sub.deinit(); // Publish a test message @@ -63,13 +60,10 @@ test "JetStream synchronous subscription timeout" { defer stream_info.deinit(); // Create synchronous subscription - const consumer_config = nats.ConsumerConfig{ - .durable_name = "sync_timeout_consumer", - .deliver_subject = "push.sync.timeout", - .ack_policy = .explicit, - }; - - var sync_sub = try js.subscribeSync("TEST_SYNC_TIMEOUT_STREAM", consumer_config); + var sync_sub = try js.subscribeSync("test.sync.timeout.*", .{ + .stream = "TEST_SYNC_TIMEOUT_STREAM", + .durable = "sync_timeout_consumer", + }); defer sync_sub.deinit(); // Test timeout (should return error.Timeout after timeout) @@ -99,13 +93,10 @@ test "JetStream synchronous subscription multiple messages" { defer stream_info.deinit(); // Create synchronous subscription - const consumer_config = nats.ConsumerConfig{ - .durable_name = "sync_multi_consumer", - .deliver_subject = "push.sync.multi", - .ack_policy = .explicit, - }; - - var sync_sub = try js.subscribeSync("TEST_SYNC_MULTI_STREAM", consumer_config); + var sync_sub = try js.subscribeSync("test.sync.multi.*", .{ + .stream = "TEST_SYNC_MULTI_STREAM", + .durable = "sync_multi_consumer", + }); defer sync_sub.deinit(); // Publish multiple test messages @@ -127,3 +118,37 @@ test "JetStream synchronous subscription multiple messages" { log.info("Multiple message synchronous subscription test completed successfully", .{}); } + +test "JetStream synchronous queue subscription basic functionality" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + var js = conn.jetstream(.{}); + + // Create a test stream + const stream_config = nats.StreamConfig{ + .name = "TEST_QUEUE_SYNC_STREAM", + .subjects = &.{"test.queue.sync.*"}, + .max_msgs = 100, + }; + var stream_info = try js.addStream(stream_config); + defer stream_info.deinit(); + + // Create synchronous queue subscription + var queue_sub = try js.queueSubscribeSync("test.queue.sync.*", "test_queue", .{ + .stream = "TEST_QUEUE_SYNC_STREAM", + .durable = "sync_queue_consumer", + }); + defer queue_sub.deinit(); + + // Publish a test message + const test_message = "Queue sync test message"; + try conn.publish("test.queue.sync.message", test_message); + + // Wait for message using nextMsg + const js_msg = try queue_sub.nextMsg(5000); + defer js_msg.deinit(); + + // Verify message content + try testing.expectEqualStrings(test_message, js_msg.msg.data); +}