-
Notifications
You must be signed in to change notification settings - Fork 1
Add JetStream STREAM.PURGE API implementation #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add StreamPurgeRequest struct with filter, seq, and keep options - Add StreamPurgeResponse struct with success and purged count - Implement purgeStream method in JetStream client - Export StreamPurgeRequest in public API - Add comprehensive tests for basic purge, filtered purge, and sequence-based purge All tests pass successfully.
WalkthroughAdds a JetStream purge API: introduces Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App as Client
participant JS as JetStream
participant NATS as NATS_Server
App->>JS: purgeStream(stream_name, StreamPurgeRequest)
JS->>JS: build subject "STREAM.PURGE.{stream}"
JS->>JS: JSON-encode request (filter / seq / keep)
JS->>NATS: sendRequest(subject, body)
NATS-->>JS: JSON response {success, purged} or error
JS->>JS: maybeParseErrorResponse / parseResponse
JS-->>App: Result(StreamPurgeResponse)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
|
@claude add test for the keep parameter as well |
|
Claude finished @lalinsky's task —— View job Adding test for keep parameter ✅I've successfully added a test case for the Todo List:
Summary: Added a new test
The test follows the same pattern as existing purge tests and all tests pass successfully. The changes have been committed and pushed to the |
- Add test case that verifies keep parameter functionality - Test publishes 5 messages and uses keep=2 to ensure only 2 messages remain - Verifies that 3 messages are purged correctly Co-authored-by: Lukáš Lalinský <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
src/jetstream.zig (2)
191-199: Request shape matches schema; consider documenting mutual exclusivity and null-emission behavior
- Fields and types align with the JetStream STREAM.PURGE request schema (filter, seq, keep are all optional; seq/keep are u64). However, per the official schema, seq and keep are mutually exclusive, though each can be combined with filter. Please document that here (and enforce in purgeStream). Also, if your Zig version emits nulls for unset optionals by default, the server may receive {"filter":null,...}; better to omit null fields (see purgeStream comment below). (beta-docs.nats.io)
201-208: Fix nit: success comment wording; consider public export for consumers
- The field comment says “Indicates if this response is an error” but the field is success: bool. Please correct to “Indicates whether the purge operation succeeded”.
- Optional: If downstream users may want to work with the concrete type (e.g., in signatures), consider making StreamPurgeResponse public like other public response types, though tests can already use the parsed value without importing the type.
Apply this diff to fix the comment:
/// Response from $JS.API.STREAM.PURGE const StreamPurgeResponse = struct { - /// Indicates if this response is an error + /// Indicates whether the purge operation succeeded success: bool, /// The number of messages purged purged: u64, };tests/jetstream_test.zig (3)
451-485: Filtered purge path is covered; add a post-condition on remaining messagesGreat targeted assertions (expect == 2). Optionally, validate that the non-matching subject remains by fetching stream info after purge and asserting state.messages (or by republishing/reading). This strengthens the contract that only the filtered subject was purged.
486-520: Sequence-based purge semantics are correct; consider a negative test for invalid requestAssertions align with “purge up to but not including seq”. Recommend adding:
- A test that setting both seq and keep returns error.InvalidPurgeRequest (after the proposed guard).
- A test for keep (e.g., publish 5, keep=2 ⇒ purged==3).
I can draft these tests to fit your utils helpers.
418-449: Prefer flushing over sleeping for test reliabilityThe
Connectiontype includes aflush(self: *Self) !voidmethod (see src/connection.zig:548–550) that ensures all pending writes are sent to the server. Instead of using an arbitrary sleep:
- Replace
withstd.time.sleep(100 * std.time.ns_per_ms);(ortry conn.flush();try js.connection.flush();if flushing via the JetStream wrapper).This guarantees the messages are processed before the purge, reducing test flakiness by relying on protocol acknowledgments rather than fixed delays.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
src/jetstream.zig(2 hunks)src/root.zig(1 hunks)tests/jetstream_test.zig(1 hunks)
🔇 Additional comments (2)
src/jetstream.zig (1)
458-470: ValidateStreamPurgeRequestparameters and optimize the purge payload– Enforce mutual exclusivity of
seqandkeepat the top ofpurgeStreamto catch client‐side errors early.
– Send an empty payload ("") when no filter/seq/keep is set, instead of"{}", to trigger a full‐stream purge more cleanly.
– Omit null optionals when serializing (viaemit_null_optional_fields = false, if supported) so you don’t send{"filter":null,"seq":null,"keep":null}.Suggested diff:
pub fn purgeStream(self: *JetStream, stream_name: []const u8, request: StreamPurgeRequest) !Result(StreamPurgeResponse) { + // Fail fast if both seq and keep are provided + if (request.seq != null and request.keep != null) { + // Ensure `InvalidPurgeRequest` is defined in your error set, or replace with an existing error + return error.InvalidPurgeRequest; + } const subject = try std.fmt.allocPrint(self.allocator, "STREAM.PURGE.{s}", .{stream_name}); defer self.allocator.free(subject); - const request_json = try std.json.stringifyAlloc(self.allocator, request, .{}); - defer self.allocator.free(request_json); - const msg = try self.sendRequest(subject, request_json); + const has_payload = (request.filter != null) or (request.seq != null) or (request.keep != null); + const msg = if (!has_payload) blk: { + // Purge-all: empty payload preferred by the server + break :blk try self.sendRequest(subject, ""); + } else blk: { + // Exclude null fields if supported + const request_json = try std.json.stringifyAlloc(self.allocator, request, .{ + .emit_null_optional_fields = false, + }); + defer self.allocator.free(request_json); + break :blk try self.sendRequest(subject, request_json); + }; defer msg.deinit(); return try self.parseResponse(StreamPurgeResponse, msg); }Please verify:
- You have an
InvalidPurgeRequesterror in your error set (or swap in an existing error).- Your Zig version’s
std.json.stringifyAllocsupportsemit_null_optional_fields. If not, you can hand-construct a JSON object with only non-null fields.src/root.zig (1)
20-20: Good re-export for ergonomicsRe-exporting StreamPurgeRequest at the root keeps the public surface consistent with other JetStream types. LGTM.
|
@claude please address this: 418-449: Prefer flushing over sleeping for test reliability The Connection type includes a flush(self: *Self) !void method (see src/connection.zig:548–550) that ensures all pending writes are sent to the server. Instead of using an arbitrary sleep: Replace |
|
Claude finished @lalinsky's task —— View job Replacing sleep with flush for test reliability ✅Todo List:
Summary: Successfully improved test reliability by replacing all 4 Changes made:
This ensures messages are properly sent to the server before purge operations, making tests more reliable by relying on protocol acknowledgments rather than arbitrary delays. All tests continue to pass, confirming the changes work correctly. |
Improve test reliability by using connection flush instead of arbitrary sleep delays. This ensures messages are properly sent to the server before attempting purge operations, reducing test flakiness. Co-authored-by: Lukáš Lalinský <[email protected]>
|
@claude move these tests to jetstream_stream_purge_test.zig and make sure they are imported in jetstream.zig |
|
Claude finished @lalinsky's task —— View job Moving purge tests to dedicated file ✅Todo List:
Summary: Successfully refactored the JetStream purge tests into a dedicated test file: Changes made:
Test organization:
All tests continue to pass (26/26 successful), confirming the refactor maintains functionality while improving code organization. |
- Create tests/jetstream_stream_purge_test.zig with all purge-related tests - Remove purge tests from tests/jetstream_test.zig - Update tests/all_tests.zig to import the new test module - All existing tests continue to pass Co-authored-by: Lukáš Lalinský <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
tests/jetstream_test.zig (4)
418-449: Tighten the assertion and clean up the stream (nice use of conn.flush()).
- Since exactly 3 messages are published, assert the exact purged count for stronger guarantees.
- Add a best-effort stream cleanup to avoid cross-test contamination if the test runner reuses a server.
var stream_info = try js.addStream(stream_config); defer stream_info.deinit(); + // Ensure no state is left behind if the test server is reused + defer js.deleteStream("TEST_PURGE_STREAM") catch {}; // Publish some messages to the stream try conn.publish("test.purge.msg1", "First message"); try conn.publish("test.purge.msg2", "Second message"); try conn.publish("test.purge.msg3", "Third message"); @@ // Verify purge was successful and messages were removed try testing.expect(purge_result.value.success); - try testing.expect(purge_result.value.purged > 0); + try testing.expect(purge_result.value.purged == 3);
451-485: Add idempotence check and cleanup for the filtered purge.
- Add deleteStream() for hygiene.
- Verify idempotence by purging with the same filter again and asserting 0 purged.
var stream_info = try js.addStream(stream_config); defer stream_info.deinit(); + // Cleanup to prevent residue across test runs + defer js.deleteStream("TEST_PURGE_FILTER_STREAM") catch {}; @@ var purge_result = try js.purgeStream("TEST_PURGE_FILTER_STREAM", purge_request); defer purge_result.deinit(); @@ try testing.expect(purge_result.value.success); try testing.expect(purge_result.value.purged == 2); + + // A second purge with the same filter should be a no-op + const purge_again = nats.StreamPurgeRequest{ + .filter = "test.filter.purge", + }; + var purge_result2 = try js.purgeStream("TEST_PURGE_FILTER_STREAM", purge_again); + defer purge_result2.deinit(); + try testing.expect(purge_result2.value.success); + try testing.expect(purge_result2.value.purged == 0);
521-556: Strengthen the keep test with idempotence and add cleanup.
- Add deleteStream() to keep the test environment clean.
- After keeping 2, purge again with keep=2 to assert no-ops when already at the threshold.
var stream_info = try js.addStream(stream_config); defer stream_info.deinit(); + defer js.deleteStream("TEST_PURGE_KEEP_STREAM") catch {}; @@ var purge_result = try js.purgeStream("TEST_PURGE_KEEP_STREAM", purge_request); defer purge_result.deinit(); @@ try testing.expect(purge_result.value.success); try testing.expect(purge_result.value.purged == 3); + + // When only 2 messages remain, keep=2 should purge nothing + const purge_again = nats.StreamPurgeRequest{ .keep = 2 }; + var purge_result2 = try js.purgeStream("TEST_PURGE_KEEP_STREAM", purge_again); + defer purge_result2.deinit(); + try testing.expect(purge_result2.value.success); + try testing.expect(purge_result2.value.purged == 0);
486-520: Add cleanup and verify exclusive purge semantics
- After creating the stream, defer its deletion to prevent state leakage:
var stream_info = try js.addStream(stream_config); defer stream_info.deinit();
- defer js.deleteStream("TEST_PURGE_SEQ_STREAM") catch {};
- Confirm that
seqis exclusive (only messages with sequence < seq are purged) by issuing a second purge with the sameseqand expecting zero messages removed (beta-docs.nats.io, nats-io.github.io).- Updated diff:
var stream_info = try js.addStream(stream_config); defer stream_info.deinit(); + defer js.deleteStream("TEST_PURGE_SEQ_STREAM") catch {}; @@ var purge_result = try js.purgeStream("TEST_PURGE_SEQ_STREAM", purge_request); defer purge_result.deinit(); @@ try testing.expect(purge_result.value.success); try testing.expect(purge_result.value.purged == 2); + + // A second purge with the same sequence should remove nothing (< seq semantics) + const purge_again = nats.StreamPurgeRequest{ .seq = 3 }; + var purge_result2 = try js.purgeStream("TEST_PURGE_SEQ_STREAM", purge_again); + defer purge_result2.deinit(); + try testing.expect(purge_result2.value.success); + try testing.expect(purge_result2.value.purged == 0);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
tests/jetstream_test.zig(1 hunks)
|
@claude one more thing, in the tests, use the |
|
Claude finished @lalinsky's task —— View job Updating test syntax to use concise struct literals ✅Todo List:
Summary: Successfully updated all JetStream purge stream tests to use the more concise anonymous struct literal syntax as requested. Changes made:
Before: const purge_request = nats.StreamPurgeRequest{ .filter = "test.filter.purge" };
var purge_result = try js.purgeStream("STREAM_NAME", purge_request);After: var purge_result = try js.purgeStream("STREAM_NAME", .{ .filter = "test.filter.purge" });All tests continue to pass (26/26 successful), confirming the syntax change maintains functionality while making the code cleaner and more idiomatic. |
Replace nats.StreamPurgeRequest{} with .{} syntax for cleaner, more idiomatic test code.
Co-authored-by: Lukáš Lalinský <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (5)
tests/jetstream_stream_purge_test.zig (5)
34-37: Strengthen the assertion: check the exact purged count (deterministic).You publish exactly 3 messages before purging. As the suite’s
beforeEachclears streams, asserting== 3is more precise than> 0.Apply this diff:
- // Verify purge was successful and messages were removed - try testing.expect(purge_result.value.success); - try testing.expect(purge_result.value.purged > 0); + // Verify purge was successful and removed exactly 3 messages + try testing.expect(purge_result.value.success); + try testing.expectEqual(@as(@TypeOf(purge_result.value.purged), 3), purge_result.value.purged);
69-72: PreferexpectEqualfor clearer failures.You already assert equality;
expectEqualyields better diagnostics on mismatch.Apply this diff:
- try testing.expect(purge_result.value.success); - try testing.expect(purge_result.value.purged == 2); + try testing.expect(purge_result.value.success); + try testing.expectEqual(@as(@TypeOf(purge_result.value.purged), 2), purge_result.value.purged);
105-107: Nit: useexpectEqualhere as well.- try testing.expect(purge_result.value.success); - try testing.expect(purge_result.value.purged == 2); + try testing.expect(purge_result.value.success); + try testing.expectEqual(@as(@TypeOf(purge_result.value.purged), 2), purge_result.value.purged);
141-143: Nit: useexpectEqualfor consistency and better diffs.- try testing.expect(purge_result.value.success); - try testing.expect(purge_result.value.purged == 3); + try testing.expect(purge_result.value.success); + try testing.expectEqual(@as(@TypeOf(purge_result.value.purged), 3), purge_result.value.purged);
1-5: Optional: reduce duplication with a tiny publish helper.Each test repeats publish+flush. A small local helper keeps tests concise and consistent.
Add near the top of the file:
fn publishMany(conn: *nats.Connection, subject: []const u8, msgs: []const []const u8) !void { for (msgs) |m| try conn.publish(subject, m); try conn.flush(); }Then, for example in “purge stream”:
- // Publish some messages to the stream - try conn.publish("test.purge.msg1", "First message"); - try conn.publish("test.purge.msg2", "Second message"); - try conn.publish("test.purge.msg3", "Third message"); - - // Flush to ensure messages are sent to the server - try conn.flush(); + // Publish some messages to the stream + try publishMany(&conn, "test.purge.msg1", &.{ "First message" }); + try publishMany(&conn, "test.purge.msg2", &.{ "Second message" }); + try publishMany(&conn, "test.purge.msg3", &.{ "Third message" });Or, alternatively, group by subject:
try publishMany(&conn, "test.purge.msg1", &.{"First message"}); try publishMany(&conn, "test.purge.msg2", &.{"Second message"}); try publishMany(&conn, "test.purge.msg3", &.{"Third message"});Use the same pattern in the other tests.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
tests/all_tests.zig(1 hunks)tests/jetstream_stream_purge_test.zig(1 hunks)tests/jetstream_test.zig(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- tests/jetstream_test.zig
🔇 Additional comments (3)
tests/all_tests.zig (1)
11-11: Good: purge tests are now included in the test suite.Importing
jetstream_stream_purge_test.zighere ensures the new tests run viarefAllDecls. This also addresses the prior request to move these tests into their own file and wire them into the suite.tests/jetstream_stream_purge_test.zig (2)
26-27: Nice: replaced fixed sleeps with protocolflush()calls.Using
conn.flush()eliminates timing flakiness and aligns with the earlier review guidance.Also applies to: 59-61, 95-97, 131-133
109-143: Thanks for adding thekeeptest.This addresses the reviewer’s request. The expectations look correct for 5 messages with
keep = 2(expectingpurged == 3).
Summary
Changes Made
filter,seq, andkeepparameterssuccessandpurgedfields$JS.API.STREAM.PURGE.{stream}StreamPurgeRequestin root moduleAPI Usage
Test Plan
All tests pass successfully.
Summary by CodeRabbit
New Features
Tests