Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 70 additions & 119 deletions src/jetstream_objstore.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const ConsumerConfig = @import("jetstream.zig").ConsumerConfig;
const ConsumerInfo = @import("jetstream.zig").ConsumerInfo;
const PublishOptions = @import("jetstream.zig").PublishOptions;
const JetStreamSubscription = @import("jetstream.zig").JetStreamSubscription;
const JetStreamMessage = @import("jetstream.zig").JetStreamMessage;
const Result = @import("result.zig").Result;
const StoredMessage = @import("jetstream.zig").StoredMessage;
const Message = @import("message.zig").Message;
Expand Down Expand Up @@ -114,152 +115,113 @@ pub const ObjectStoreConfig = struct {
chunk_size: u32 = DEFAULT_CHUNK_SIZE,
};

/// Stream reader for object chunks - provides progressive chunk reading
pub const ObjectReader = struct {
store: ObjectStore,
/// ObjectResult provides streaming access to object data with integrated reading capabilities
pub const ObjectResult = struct {
info: ObjectInfo,
arena: *std.heap.ArenaAllocator,
// Reading state (zero-copy streaming)
subscription: ?*JetStreamSubscription,
buffer: std.ArrayList(u8),
buffer_pos: usize,
current_msg: ?*JetStreamMessage,
msg_pos: usize,
chunk_index: u32,
digest: std.crypto.hash.sha2.Sha256,
eof: bool,
verified: bool,
allocator: std.mem.Allocator,

pub fn init(allocator: std.mem.Allocator, store: ObjectStore, info: ObjectInfo, subscription: ?*JetStreamSubscription) ObjectReader {
return ObjectReader{
.store = store,
pub fn init(allocator: std.mem.Allocator, info: ObjectInfo, subscription: ?*JetStreamSubscription, arena: *std.heap.ArenaAllocator) ObjectResult {
return ObjectResult{
.info = info,
.arena = arena,
.subscription = subscription,
.buffer = std.ArrayList(u8).init(allocator),
.buffer_pos = 0,
.current_msg = null,
.msg_pos = 0,
.chunk_index = 0,
.digest = std.crypto.hash.sha2.Sha256.init(.{}),
.eof = false,
.verified = false,
.allocator = allocator,
};
}

pub fn deinit(self: *ObjectReader) void {
self.buffer.deinit();
pub fn deinit(self: *ObjectResult) void {
if (self.current_msg) |msg| {
msg.deinit();
}
if (self.subscription) |sub| {
sub.deinit();
}
const child_allocator = self.arena.child_allocator;
self.arena.deinit();
child_allocator.destroy(self.arena);
}

/// Read data from the object stream
pub fn read(self: *ObjectReader, dest: []u8) !usize {
pub fn read(self: *ObjectResult, dest: []u8) !usize {
if (self.eof) return 0;

var bytes_written: usize = 0;
// Early return if no subscription (empty objects)
const sub = self.subscription orelse return 0;

while (bytes_written < dest.len) {
// If buffer is empty, fetch next chunk
if (self.buffer_pos >= self.buffer.items.len) {
if (self.chunk_index >= self.info.chunks) {
self.eof = true;
break;
}
// If we don't have a current message, try to get one
if (self.current_msg == null) {
if (self.chunk_index >= self.info.chunks) {
self.eof = true;
return 0;
}

// Get next chunk from subscription
const timeout_ms = self.store.js.nc.options.timeout_ms;
const js_msg = self.subscription.?.nextMsg(timeout_ms) catch |err| {
// Only treat permanent errors as EOF, not temporary ones
switch (err) {
error.Timeout => {
// Temporary error - allow retry by returning partial data
break;
},
else => {
// Permanent error - propagate to caller
return err;
},
}
};
defer js_msg.deinit();
// Get next chunk from subscription
const timeout_ms = sub.js.nc.options.timeout_ms;
const js_msg = try sub.nextMsg(timeout_ms);

// Reset buffer with new chunk data
self.buffer.clearRetainingCapacity();
try self.buffer.appendSlice(js_msg.msg.data);
self.buffer_pos = 0;
self.chunk_index += 1;
// Store the message pointer and reset position
self.current_msg = js_msg;
self.msg_pos = 0;
self.chunk_index += 1;

// Update digest
self.digest.update(js_msg.msg.data);
// Update digest
self.digest.update(js_msg.msg.data);

// Validate we haven't received more chunks than expected
if (self.chunk_index > self.info.chunks) {
return ObjectStoreError.ChunkMismatch;
}
// Validate we haven't received more chunks than expected
if (self.chunk_index > self.info.chunks) {
return ObjectStoreError.ChunkMismatch;
}
}

// Copy from buffer to destination
const available = self.buffer.items.len - self.buffer_pos;
const to_copy = @min(available, dest.len - bytes_written);
// Copy from current message to destination
const msg = self.current_msg.?;
const available = msg.msg.data.len - self.msg_pos;
const to_copy = @min(available, dest.len);

@memcpy(
dest[bytes_written .. bytes_written + to_copy],
self.buffer.items[self.buffer_pos .. self.buffer_pos + to_copy],
);
@memcpy(
dest[0..to_copy],
msg.msg.data[self.msg_pos .. self.msg_pos + to_copy],
);

self.buffer_pos += to_copy;
bytes_written += to_copy;
}
self.msg_pos += to_copy;

// Check if we've completed reading all expected chunks
if (self.chunk_index == self.info.chunks and self.buffer_pos >= self.buffer.items.len) {
self.eof = true;
// Automatically verify digest when stream is complete
if (!self.verified) {
self.autoVerify() catch {
// Ignore verification error here - it can be checked later
// with verify() or isVerified()
};
// If we've consumed this entire message, clean it up
if (self.msg_pos >= msg.msg.data.len) {
msg.deinit();
self.current_msg = null;
self.msg_pos = 0;

// Check if we've completed reading all expected chunks
if (self.chunk_index >= self.info.chunks) {
self.eof = true;
}
}

return bytes_written;
return to_copy;
}

/// Internal automatic verification when stream completes
fn autoVerify(self: *ObjectReader) !void {
if (self.verified) return;

/// Verify the complete object integrity
pub fn verify(self: *ObjectResult) !void {
const calculated_digest = self.digest.finalResult();
var digest_hex: [64]u8 = undefined;
_ = std.fmt.bufPrint(&digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable;

if (!std.mem.eql(u8, &digest_hex, self.info.digest)) {
return ObjectStoreError.DigestMismatch;
}

self.verified = true;
}

/// Verify the complete object integrity (public API)
pub fn verify(self: *ObjectReader) !void {
try self.autoVerify();
}

/// Check if the object has been verified
pub fn isVerified(self: *ObjectReader) bool {
return self.verified;
}
};

/// ObjectResult provides streaming access to object data
pub const ObjectResult = struct {
info: ObjectInfo,
reader: ObjectReader,
arena: *std.heap.ArenaAllocator,

pub fn deinit(self: *ObjectResult) void {
self.reader.deinit();
const child_allocator = self.arena.child_allocator;
self.arena.deinit();
child_allocator.destroy(self.arena);
}
};
Comment on lines +216 to 226
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix: use slices (not pointers-to-arrays) in hex formatting and equality; also guard against partial verify

Two issues:

  • bufPrint(&digest_hex, ...) and eql(&digest_hex, ...) should use slices (digest_hex[0..]) for clarity and to avoid type pitfalls.
  • verify() doesn’t enforce that all chunks were read. Calling it mid-stream finalizes the hasher, after which hashing of the remaining chunks cannot be resumed. Guard against verifying a partially read object.

Apply:

     pub fn verify(self: *ObjectResult) !void {
+        // Ensure full object was processed before finalizing digest
+        if (!self.eof or self.current_msg != null or self.chunk_index != self.info.chunks) {
+            return ObjectStoreError.ChunkMismatch;
+        }
         const calculated_digest = self.digest.finalResult();
         var digest_hex: [64]u8 = undefined;
-        _ = std.fmt.bufPrint(&digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable;
+        _ = std.fmt.bufPrint(digest_hex[0..], "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable;

-        if (!std.mem.eql(u8, &digest_hex, self.info.digest)) {
+        if (!std.mem.eql(u8, digest_hex[0..], self.info.digest)) {
             return ObjectStoreError.DigestMismatch;
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Verify the complete object integrity
pub fn verify(self: *ObjectResult) !void {
const calculated_digest = self.digest.finalResult();
var digest_hex: [64]u8 = undefined;
_ = std.fmt.bufPrint(&digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable;
if (!std.mem.eql(u8, &digest_hex, self.info.digest)) {
return ObjectStoreError.DigestMismatch;
}
self.verified = true;
}
/// Verify the complete object integrity (public API)
pub fn verify(self: *ObjectReader) !void {
try self.autoVerify();
}
/// Check if the object has been verified
pub fn isVerified(self: *ObjectReader) bool {
return self.verified;
}
};
/// ObjectResult provides streaming access to object data
pub const ObjectResult = struct {
info: ObjectInfo,
reader: ObjectReader,
arena: *std.heap.ArenaAllocator,
pub fn deinit(self: *ObjectResult) void {
self.reader.deinit();
const child_allocator = self.arena.child_allocator;
self.arena.deinit();
child_allocator.destroy(self.arena);
}
};
/// Verify the complete object integrity
pub fn verify(self: *ObjectResult) !void {
// Ensure full object was processed before finalizing digest
if (!self.eof or self.current_msg != null or self.chunk_index != self.info.chunks) {
return ObjectStoreError.ChunkMismatch;
}
const calculated_digest = self.digest.finalResult();
var digest_hex: [64]u8 = undefined;
_ = std.fmt.bufPrint(digest_hex[0..], "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable;
if (!std.mem.eql(u8, digest_hex[0..], self.info.digest)) {
return ObjectStoreError.DigestMismatch;
}
}
};
🤖 Prompt for AI Agents
In src/jetstream_objstore.zig around lines 216-226, make two changes to
ObjectResult.verify(). First, pass explicit slices instead of pointers to the
fixed-size array when formatting and comparing the hex digest (use
digest_hex[0..] for both std.fmt.bufPrint and std.mem.eql). Second, add a guard
at the top of verify() that rejects calls made before the entire object has
been read: return ObjectStoreError.ChunkMismatch unless self.eof is set,
self.current_msg is null, and self.chunk_index equals self.info.chunks (as in
the suggested change above). The hasher must only be finalized once the object
has been fully received, so that mid-stream calls fail fast rather than
corrupting the hasher state.


Expand Down Expand Up @@ -479,14 +441,9 @@ pub const ObjectStore = struct {

// For empty objects, return immediately without subscription
if (obj_info.size == 0) {
var reader = ObjectReader.init(arena_allocator, self.*, obj_info, null);
reader.eof = true; // Mark as EOF immediately since there's no data

return ObjectResult{
.info = obj_info,
.reader = reader,
.arena = arena,
};
var result = ObjectResult.init(arena_allocator, obj_info, null, arena);
result.eof = true; // Mark as EOF immediately since there's no data
return result;
}

// Create subscription for chunks
Expand All @@ -507,14 +464,8 @@ pub const ObjectStore = struct {

const sub = try self.js.subscribeSync(self.stream_name, consumer_config);

// Create reader with subscription
const reader = ObjectReader.init(arena_allocator, self.*, obj_info, sub);

return ObjectResult{
.info = obj_info,
.reader = reader,
.arena = arena,
};
// Create result with subscription
return ObjectResult.init(arena_allocator, obj_info, sub, arena);
}

/// Get object data as bytes (convenience method)
Expand All @@ -534,10 +485,10 @@ pub const ObjectStore = struct {
// Allocate buffer for complete object based on size from info
const data = try arena_allocator.alloc(u8, result.info.size);

// Read all data from the streaming reader
// Read all data from the streaming result
var total_read: usize = 0;
while (total_read < result.info.size) {
const n = try result.reader.read(data[total_read..]);
const n = try result.read(data[total_read..]);
if (n == 0) break; // EOF
total_read += n;
}
Expand All @@ -547,7 +498,7 @@ pub const ObjectStore = struct {
}

// Verify the digest
try result.reader.verify();
try result.verify();

return Result([]u8){
.arena = arena,
Expand Down
1 change: 0 additions & 1 deletion src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub const ObjectMeta = @import("jetstream_objstore.zig").ObjectMeta;
pub const ObjectInfo = @import("jetstream_objstore.zig").ObjectInfo;
pub const ObjectStoreConfig = @import("jetstream_objstore.zig").ObjectStoreConfig;
pub const ObjectStoreError = @import("jetstream_objstore.zig").ObjectStoreError;
pub const ObjectReader = @import("jetstream_objstore.zig").ObjectReader;
pub const ObjectResult = @import("jetstream_objstore.zig").ObjectResult;

// Removed top-level connect functions - use Connection.init() and Connection.connect() directly
Expand Down
8 changes: 4 additions & 4 deletions tests/jetstream_objstore_streaming_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ test "ObjectStore streaming put and get" {
var buffer: [1024]u8 = undefined;
var total_read: usize = 0;
while (total_read < get_result.info.size) {
const n = try get_result.reader.read(buffer[total_read..]);
const n = try get_result.read(buffer[total_read..]);
if (n == 0) break;
total_read += n;
}

try testing.expectEqualStrings(test_data, buffer[0..total_read]);

// Verify digest
try get_result.reader.verify();
try get_result.verify();

// Test that getBytes still works (compatibility)
const bytes_result = try store.getBytes("streaming-test-object");
Expand Down Expand Up @@ -122,9 +122,9 @@ test "ObjectStore streaming empty object" {

// Read should immediately return 0 (EOF)
var buffer: [10]u8 = undefined;
const n = try get_result.reader.read(&buffer);
const n = try get_result.read(&buffer);
try testing.expect(n == 0);

// Verify should succeed for empty object
try get_result.reader.verify();
try get_result.verify();
}