Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,41 @@ var consumer_info = try js.addConsumer("ORDERS", consumer_config);
defer consumer_info.deinit();
```

### JetStream Push Subscriptions

```zig
// Push subscription with callback handler
fn orderHandler(js_msg: *nats.JetStreamMessage, count: *u32) !void {
defer js_msg.deinit();
count.* += 1;
try js_msg.ack(); // Acknowledge message
std.debug.print("Order: {s}\n", .{js_msg.data});
}

var processed: u32 = 0;
var push_sub = try js.subscribe("orders.*", orderHandler, .{&processed}, .{
.stream = "ORDERS",
.durable = "order_processor",
});
defer push_sub.deinit();
```

### JetStream Pull Subscriptions

```zig
// Pull subscription (fetch messages manually)
var pull_sub = try js.pullSubscribe("orders.*", "batch_processor", .{
.stream = "ORDERS",
});
defer pull_sub.deinit();

var batch = try pull_sub.fetch(10, 5000); // Fetch up to 10 msgs, 5s timeout
defer batch.deinit();
for (batch.messages) |js_msg| {
try js_msg.ack();
}
```

## Building

```bash
Expand Down
357 changes: 314 additions & 43 deletions src/jetstream.zig

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion src/jetstream_kv.zig
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ pub const KVWatcher = struct {
.filter_subjects = subjects,
};

var sub = try kv.js.subscribeSync(kv.stream_name, consumer_config);
var sub = try kv.js.subscribeSync(null, .{
.stream = kv.stream_name,
.config = consumer_config,
});
errdefer sub.deinit();

// Match C logic: only use num_pending to detect empty streams
Expand Down
12 changes: 8 additions & 4 deletions src/jetstream_objstore.zig
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,12 @@ pub const ObjectStore = struct {
.deliver_policy = .all,
.ack_policy = .none,
.max_ack_pending = 0,
.filter_subjects = &.{chunk_subject},
};

const sub = try self.js.subscribeSync(self.stream_name, consumer_config);
const sub = try self.js.subscribeSync(chunk_subject, .{
.stream = self.stream_name,
.config = consumer_config,
});

// Create result with subscription
return ObjectResult.init(arena_allocator, obj_info, sub, arena);
Expand Down Expand Up @@ -600,10 +602,12 @@ pub const ObjectStore = struct {
.deliver_policy = .last_per_subject,
.ack_policy = .none,
.max_ack_pending = 0,
.filter_subjects = &.{meta_filter},
};

const sub = try self.js.subscribeSync(self.stream_name, consumer_config);
const sub = try self.js.subscribeSync(meta_filter, .{
.stream = self.stream_name,
.config = consumer_config,
});
defer sub.deinit();

const arena = try self.allocator.create(std.heap.ArenaAllocator);
Expand Down
60 changes: 28 additions & 32 deletions tests/jetstream_duplicate_ack_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,13 @@ test "ack should succeed on first call" {
}
};

// Create consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "dup_ack_consumer",
.deliver_subject = "push.dup.ack",
.ack_policy = .explicit,
};

var push_sub = try js.subscribe("TEST_DUP_ACK_STREAM", consumer_config, AckHandler.handle, .{&test_data});
var push_sub = try js.subscribe("test.dup.ack.*", AckHandler.handle, .{&test_data}, .{
.stream = "TEST_DUP_ACK_STREAM",
.durable = "dup_ack_consumer",
.config = .{
.deliver_policy = .all,
},
});
defer push_sub.deinit();

// Publish test message
Expand Down Expand Up @@ -135,14 +134,13 @@ test "ack should fail on second call" {
}
};

// Create consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "dup_ack2_consumer",
.deliver_subject = "push.dup.ack2",
.ack_policy = .explicit,
};

var push_sub = try js.subscribe("TEST_DUP_ACK2_STREAM", consumer_config, DoubleAckHandler.handle, .{&test_data});
var push_sub = try js.subscribe("test.dup.ack2.*", DoubleAckHandler.handle, .{&test_data}, .{
.stream = "TEST_DUP_ACK2_STREAM",
.durable = "dup_ack2_consumer",
.config = .{
.deliver_policy = .all,
},
});
defer push_sub.deinit();

// Publish test message
Expand Down Expand Up @@ -222,14 +220,13 @@ test "nak should fail after ack" {
}
};

// Create consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "ack_nak_consumer",
.deliver_subject = "push.ack.nak",
.ack_policy = .explicit,
};

var push_sub = try js.subscribe("TEST_ACK_NAK_STREAM", consumer_config, AckNakHandler.handle, .{&test_data});
var push_sub = try js.subscribe("test.ack.nak.*", AckNakHandler.handle, .{&test_data}, .{
.stream = "TEST_ACK_NAK_STREAM",
.durable = "ack_nak_consumer",
.config = .{
.deliver_policy = .all,
},
});
defer push_sub.deinit();

// Publish test message
Expand Down Expand Up @@ -316,14 +313,13 @@ test "inProgress can be called multiple times" {
}
};

// Create consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "progress_consumer",
.deliver_subject = "push.progress",
.ack_policy = .explicit,
};

var push_sub = try js.subscribe("TEST_PROGRESS_STREAM", consumer_config, ProgressHandler.handle, .{&test_data});
var push_sub = try js.subscribe("test.progress.*", ProgressHandler.handle, .{&test_data}, .{
.stream = "TEST_PROGRESS_STREAM",
.durable = "progress_consumer",
.config = .{
.deliver_policy = .all,
},
});
defer push_sub.deinit();

// Publish test message
Expand Down
83 changes: 39 additions & 44 deletions tests/jetstream_nak_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,14 @@ test "NAK redelivery with delivery count verification" {
}
};

// Create push consumer with limited redeliveries
const consumer_config = nats.ConsumerConfig{
.durable_name = "nak_test_consumer",
.deliver_subject = "push.nak.test",
.ack_policy = .explicit,
.max_deliver = 3, // Allow up to 3 delivery attempts
};

var push_sub = try js.subscribe("TEST_NAK_STREAM", consumer_config, NakHandler.handle, .{&test_data});
// Create push consumer with limited redeliveries (deliver_subject auto-generated, ack_policy defaults to .explicit)
var push_sub = try js.subscribe("test.nak.*", NakHandler.handle, .{&test_data}, .{
.stream = "TEST_NAK_STREAM",
.durable = "nak_test_consumer",
.config = .{
.max_deliver = 3, // Allow up to 3 delivery attempts
},
});
defer push_sub.deinit();

// Publish a test message
Expand Down Expand Up @@ -194,15 +193,14 @@ test "NAK with max delivery limit" {
}
};

// Consumer with max_deliver = 2 (original + 1 redelivery)
const consumer_config = nats.ConsumerConfig{
.durable_name = "nak_limit_consumer",
.deliver_subject = "push.nak.limit",
.ack_policy = .explicit,
.max_deliver = 2,
};

var push_sub = try js.subscribe("TEST_NAK_LIMIT_STREAM", consumer_config, AlwaysNakHandler.handle, .{&test_data});
var push_sub = try js.subscribe("test.nak.limit.*", AlwaysNakHandler.handle, .{&test_data}, .{
.stream = "TEST_NAK_LIMIT_STREAM",
.durable = "nak_limit_consumer",
.config = .{
.deliver_policy = .all,
.max_deliver = 2,
},
});
defer push_sub.deinit();

// Publish test message
Expand Down Expand Up @@ -300,14 +298,13 @@ test "JetStream message metadata parsing" {
}
};

// Create consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "metadata_consumer",
.deliver_subject = "push.metadata.test",
.ack_policy = .explicit,
};

var push_sub = try js.subscribe("TEST_METADATA_STREAM", consumer_config, MetadataHandler.handle, .{ &received_message, &metadata_verified, &mutex });
var push_sub = try js.subscribe("test.metadata.*", MetadataHandler.handle, .{ &received_message, &metadata_verified, &mutex }, .{
.stream = "TEST_METADATA_STREAM",
.durable = "metadata_consumer",
.config = .{
.deliver_policy = .all,
},
});
defer push_sub.deinit();

// Publish a test message
Expand Down Expand Up @@ -409,15 +406,14 @@ test "NAK with delay redelivery timing" {
}
};

// Create push consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "nak_delay_consumer",
.deliver_subject = "push.nak.delay.test",
.ack_policy = .explicit,
.max_deliver = 3,
};

var push_sub = try js.subscribe("TEST_NAK_DELAY_STREAM", consumer_config, DelayHandler.handle, .{&test_data});
var push_sub = try js.subscribe("test.nak.delay.*", DelayHandler.handle, .{&test_data}, .{
.stream = "TEST_NAK_DELAY_STREAM",
.durable = "nak_delay_consumer",
.config = .{
.deliver_policy = .all,
.max_deliver = 3,
},
});
defer push_sub.deinit();

// Publish a test message
Expand Down Expand Up @@ -503,15 +499,14 @@ test "NAK with zero delay behaves like regular NAK" {
}
};

// Create push consumer
const consumer_config = nats.ConsumerConfig{
.durable_name = "nak_zero_delay_consumer",
.deliver_subject = "push.nak.zero.test",
.ack_policy = .explicit,
.max_deliver = 3,
};

var push_sub = try js.subscribe("TEST_NAK_ZERO_DELAY_STREAM", consumer_config, ZeroDelayHandler.handle, .{ &delivery_count, &mutex });
var push_sub = try js.subscribe("test.nak.zero.*", ZeroDelayHandler.handle, .{ &delivery_count, &mutex }, .{
.stream = "TEST_NAK_ZERO_DELAY_STREAM",
.durable = "nak_zero_delay_consumer",
.config = .{
.deliver_policy = .all,
.max_deliver = 3,
},
});
defer push_sub.deinit();

// Publish a test message
Expand Down
12 changes: 4 additions & 8 deletions tests/jetstream_pull_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,10 @@ test "JetStream pull consumer basic fetch" {
var stream_info = try js.addStream(stream_config);
defer stream_info.deinit();

// Create a pull consumer
const consumer_config = ConsumerConfig{
.durable_name = consumer_name,
.ack_policy = .explicit,
.filter_subject = "test.pull.*",
};

var subscription = try js.pullSubscribe(stream_name, consumer_config);
// Create a pull consumer (ack_policy defaults to .explicit)
var subscription = try js.pullSubscribe("test.pull.*", consumer_name, .{
.stream = stream_name,
});
defer subscription.deinit();

// Publish some test messages
Expand Down
57 changes: 24 additions & 33 deletions tests/jetstream_push_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,14 @@ test "basic push subscription" {
}
};

// Create push consumer configuration
// Note: deliver_subject must not overlap with stream subjects to avoid cycles
const consumer_config = nats.ConsumerConfig{
.durable_name = "test_push_consumer",
.deliver_subject = "push.orders.processed", // Key for push consumer - different from stream subjects
.ack_policy = .explicit,
.deliver_policy = .all,
};

// Subscribe to push consumer
var push_sub = try js.subscribe("TEST_PUSH_STREAM", consumer_config, MessageHandler.handle, .{&message_count});
// Subscribe to push consumer (deliver_subject auto-generated, ack_policy defaults to .explicit)
var push_sub = try js.subscribe("orders.*", MessageHandler.handle, .{&message_count}, .{
.stream = "TEST_PUSH_STREAM",
.durable = "test_push_consumer",
.config = .{
.deliver_policy = .all,
},
});
defer push_sub.deinit();

// Publish some test messages
Expand Down Expand Up @@ -96,18 +93,17 @@ test "push subscription with flow control" {
}
};

// Create push consumer with flow control enabled
const consumer_config = nats.ConsumerConfig{
.durable_name = "task_processor",
.deliver_subject = "push.tasks.process", // Different from stream subjects
.ack_policy = .explicit,
.deliver_policy = .all,
.flow_control = true, // Enable flow control
.idle_heartbeat = 30_000_000_000, // 30s - required when flow_control=true
.max_ack_pending = 10, // Limit pending acknowledgments
};

var push_sub = try js.subscribe("TEST_PUSH_FC_STREAM", consumer_config, TaskHandler.handle, .{&processed_count});
// Subscribe with flow control enabled (deliver_subject auto-generated)
var push_sub = try js.subscribe("tasks.*", TaskHandler.handle, .{&processed_count}, .{
.stream = "TEST_PUSH_FC_STREAM",
.durable = "task_processor",
.config = .{
.deliver_policy = .all,
.flow_control = true, // Enable flow control
.idle_heartbeat = 30_000_000_000, // 30s - required when flow_control=true
.max_ack_pending = 10, // Limit pending acknowledgments
},
});
defer push_sub.deinit();

// Publish several tasks
Expand All @@ -130,21 +126,16 @@ test "push subscription error handling" {

var js = conn.jetstream(.{});

// Try to create push subscription without deliver_subject - should auto-generate one
// but fail with stream not found error
const config_without_deliver_subject = nats.ConsumerConfig{
.durable_name = "test_consumer",
.ack_policy = .explicit,
// Missing deliver_subject - should be auto-generated
};

const DummyHandler = struct {
fn handle(js_msg: *nats.JetStreamMessage) void {
defer js_msg.deinit();
}
};

// This should fail with StreamNotFound error since auto-generated deliver_subject should work
const result = js.subscribe("NONEXISTENT_STREAM", config_without_deliver_subject, DummyHandler.handle, .{});
// This should fail with StreamNotFound error since stream doesn't exist
const result = js.subscribe("nonexistent.*", DummyHandler.handle, .{}, .{
.stream = "NONEXISTENT_STREAM",
.durable = "test_consumer",
});
try testing.expectError(error.StreamNotFound, result);
}
Loading