diff --git a/src/jetstream.zig b/src/jetstream.zig index 6ed0ac1..b341c2f 100644 --- a/src/jetstream.zig +++ b/src/jetstream.zig @@ -1312,4 +1312,10 @@ pub const JetStream = struct { var manager = self.kvManager(); return try manager.openBucket(bucket_name); } + + /// Create an object store manager + pub fn objectStoreManager(self: JetStream) @import("jetstream_objstore.zig").ObjectStoreManager { + const jetstream_objstore = @import("jetstream_objstore.zig"); + return jetstream_objstore.ObjectStoreManager.init(self); + } }; diff --git a/src/jetstream_objstore.zig b/src/jetstream_objstore.zig new file mode 100644 index 0000000..41549f5 --- /dev/null +++ b/src/jetstream_objstore.zig @@ -0,0 +1,817 @@ +// Copyright 2025 Lukas Lalinsky +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +const std = @import("std"); +const JetStream = @import("jetstream.zig").JetStream; +const StreamConfig = @import("jetstream.zig").StreamConfig; +const StreamInfo = @import("jetstream.zig").StreamInfo; +const ConsumerConfig = @import("jetstream.zig").ConsumerConfig; +const ConsumerInfo = @import("jetstream.zig").ConsumerInfo; +const PublishOptions = @import("jetstream.zig").PublishOptions; +const JetStreamSubscription = @import("jetstream.zig").JetStreamSubscription; +const Result = @import("result.zig").Result; +const StoredMessage = @import("jetstream.zig").StoredMessage; +const Message = @import("message.zig").Message; +const timestamp = @import("timestamp.zig"); +const newInbox = @import("inbox.zig").newInbox; +const nuid = @import("nuid.zig"); +const validation = @import("validation.zig"); + +const log = @import("log.zig").log; + +// Default chunk size (128KB) +const DEFAULT_CHUNK_SIZE: u32 = 128 * 1024; + +// Object Store-specific errors +pub const ObjectStoreError = error{ + StoreNotFound, + ObjectNotFound, + ChunkMismatch, + DigestMismatch, + BadRequest, +}; + +/// Object metadata options +pub const ObjectMetaOptions = struct { + /// Custom chunk size for this object + chunk_size: ?u32 = null, +}; + +/// Object metadata structure +pub const ObjectMeta = struct { + /// Object name + name: []const u8, + /// Optional description + description: ?[]const u8 = null, + /// Optional additional options + opts: ?ObjectMetaOptions = null, +}; + +/// Object info contains metadata plus instance information (what gets stored as JSON) +pub const ObjectInfo = struct { + /// Object name + name: []const u8, + /// Optional description + description: ?[]const u8 = null, + /// Optional additional options + opts: ?ObjectMetaOptions = null, + /// Store/bucket name + bucket: []const u8, + /// Unique object identifier (NUID) + nuid: []const u8, + /// Total object size in bytes + size: u64, + /// Number of chunks + chunks: u32, + /// Last modified time (from message timestamp, not 
stored in JSON) + mtime: u64 = 0, + /// SHA-256 digest hex string + digest: []const u8, + /// True if object is deleted + deleted: bool = false, +}; + +/// ObjectInfo for JSON serialization (excludes mtime) +const ObjectInfoJson = struct { + name: []const u8, + description: ?[]const u8 = null, + opts: ?ObjectMetaOptions = null, + bucket: []const u8, + nuid: []const u8, + size: u64, + chunks: u32, + digest: []const u8, + deleted: bool = false, +}; + +/// Configuration for creating object stores +pub const ObjectStoreConfig = struct { + /// Store name (required) + store_name: []const u8, + /// Description of the store + description: ?[]const u8 = null, + /// Maximum object size in bytes (-1 = unlimited) + max_object_size: i64 = -1, + /// Maximum store size in bytes (-1 = unlimited) + max_bytes: i64 = -1, + /// Storage type + storage: enum { file, memory } = .file, + /// Number of replicas + replicas: u8 = 1, + /// Enable compression + compression: bool = false, + /// Chunk size + chunk_size: u32 = DEFAULT_CHUNK_SIZE, +}; + +/// Stream reader for object chunks - provides progressive chunk reading +pub const ObjectReader = struct { + store: ObjectStore, + info: ObjectInfo, + subscription: ?*JetStreamSubscription, + buffer: std.ArrayList(u8), + buffer_pos: usize, + chunk_index: u32, + digest: std.crypto.hash.sha2.Sha256, + eof: bool, + verified: bool, + allocator: std.mem.Allocator, + + pub fn init(allocator: std.mem.Allocator, store: ObjectStore, info: ObjectInfo, subscription: ?*JetStreamSubscription) ObjectReader { + return ObjectReader{ + .store = store, + .info = info, + .subscription = subscription, + .buffer = std.ArrayList(u8).init(allocator), + .buffer_pos = 0, + .chunk_index = 0, + .digest = std.crypto.hash.sha2.Sha256.init(.{}), + .eof = false, + .verified = false, + .allocator = allocator, + }; + } + + pub fn deinit(self: *ObjectReader) void { + self.buffer.deinit(); + if (self.subscription) |sub| { + sub.deinit(); + } + } + + /// Read data from the object 
stream + pub fn read(self: *ObjectReader, dest: []u8) !usize { + if (self.eof) return 0; + + var bytes_written: usize = 0; + + while (bytes_written < dest.len) { + // If buffer is empty, fetch next chunk + if (self.buffer_pos >= self.buffer.items.len) { + if (self.chunk_index >= self.info.chunks) { + self.eof = true; + break; + } + + // Get next chunk from subscription + const timeout_ms = self.store.js.nc.options.timeout_ms; + const js_msg = self.subscription.?.nextMsg(timeout_ms) catch |err| { + // Only treat permanent errors as EOF, not temporary ones + switch (err) { + error.Timeout => { + // Temporary error - allow retry by returning partial data + break; + }, + else => { + // Permanent error - propagate to caller + return err; + }, + } + }; + defer js_msg.deinit(); + + // Reset buffer with new chunk data + self.buffer.clearRetainingCapacity(); + try self.buffer.appendSlice(js_msg.msg.data); + self.buffer_pos = 0; + self.chunk_index += 1; + + // Update digest + self.digest.update(js_msg.msg.data); + + // Validate we haven't received more chunks than expected + if (self.chunk_index > self.info.chunks) { + return ObjectStoreError.ChunkMismatch; + } + } + + // Copy from buffer to destination + const available = self.buffer.items.len - self.buffer_pos; + const to_copy = @min(available, dest.len - bytes_written); + + @memcpy( + dest[bytes_written .. bytes_written + to_copy], + self.buffer.items[self.buffer_pos .. 
self.buffer_pos + to_copy], + ); + + self.buffer_pos += to_copy; + bytes_written += to_copy; + } + + // Check if we've completed reading all expected chunks + if (self.chunk_index == self.info.chunks and self.buffer_pos >= self.buffer.items.len) { + self.eof = true; + // Automatically verify digest when stream is complete + if (!self.verified) { + self.autoVerify() catch { + // Ignore verification error here - it can be checked later + // with verify() or isVerified() + }; + } + } + + return bytes_written; + } + + /// Internal automatic verification when stream completes + fn autoVerify(self: *ObjectReader) !void { + if (self.verified) return; + + const calculated_digest = self.digest.finalResult(); + var digest_hex: [64]u8 = undefined; + _ = std.fmt.bufPrint(&digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&calculated_digest)}) catch unreachable; + + if (!std.mem.eql(u8, &digest_hex, self.info.digest)) { + return ObjectStoreError.DigestMismatch; + } + + self.verified = true; + } + + /// Verify the complete object integrity (public API) + pub fn verify(self: *ObjectReader) !void { + try self.autoVerify(); + } + + /// Check if the object has been verified + pub fn isVerified(self: *ObjectReader) bool { + return self.verified; + } +}; + +/// ObjectResult provides streaming access to object data +pub const ObjectResult = struct { + info: ObjectInfo, + reader: ObjectReader, + arena: *std.heap.ArenaAllocator, + + pub fn deinit(self: *ObjectResult) void { + self.reader.deinit(); + const child_allocator = self.arena.child_allocator; + self.arena.deinit(); + child_allocator.destroy(self.arena); + } +}; + +/// Object Store implementation +pub const ObjectStore = struct { + /// JetStream context + js: JetStream, + /// Store name + store_name: []const u8, + /// Stream name (OBJ_) + stream_name: []const u8, + /// Chunk subject prefix ($O..C.) + chunk_subject_prefix: []const u8, + /// Meta subject prefix ($O..M.) 
+ meta_subject_prefix: []const u8, + /// Allocator for memory management + allocator: std.mem.Allocator, + /// Default chunk size + chunk_size: u32, + + const Self = @This(); + + /// Initialize ObjectStore handle + pub fn init(allocator: std.mem.Allocator, js: JetStream, store_name: []const u8, chunk_size: u32) !ObjectStore { + try validation.validateOSBucketName(store_name); + + // Create owned copies of names + const owned_store_name = try allocator.dupe(u8, store_name); + errdefer allocator.free(owned_store_name); + + const stream_name = try std.fmt.allocPrint(allocator, "OBJ_{s}", .{store_name}); + errdefer allocator.free(stream_name); + + const chunk_subject_prefix = try std.fmt.allocPrint(allocator, "$O.{s}.C.", .{store_name}); + errdefer allocator.free(chunk_subject_prefix); + + const meta_subject_prefix = try std.fmt.allocPrint(allocator, "$O.{s}.M.", .{store_name}); + errdefer allocator.free(meta_subject_prefix); + + return ObjectStore{ + .js = js, + .store_name = owned_store_name, + .stream_name = stream_name, + .chunk_subject_prefix = chunk_subject_prefix, + .meta_subject_prefix = meta_subject_prefix, + .allocator = allocator, + .chunk_size = chunk_size, + }; + } + + pub fn deinit(self: *ObjectStore) void { + self.allocator.free(self.store_name); + self.allocator.free(self.stream_name); + self.allocator.free(self.chunk_subject_prefix); + self.allocator.free(self.meta_subject_prefix); + } + + /// Get the meta subject for an object name + fn getMetaSubject(self: *ObjectStore, object_name: []const u8) ![]u8 { + return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ self.meta_subject_prefix, object_name }); + } + + /// Get the chunk subject for an object NUID + fn getChunkSubject(self: *ObjectStore, object_nuid: []const u8) ![]u8 { + return std.fmt.allocPrint(self.allocator, "{s}{s}", .{ self.chunk_subject_prefix, object_nuid }); + } + + /// Primary put method that accepts any reader type for efficient streaming + pub fn put(self: *ObjectStore, meta: 
ObjectMeta, reader: anytype) !Result(ObjectInfo) { + try validation.validateOSObjectName(meta.name); + + // Create arena for return value + const arena = try self.allocator.create(std.heap.ArenaAllocator); + errdefer self.allocator.destroy(arena); + arena.* = std.heap.ArenaAllocator.init(self.allocator); + errdefer arena.deinit(); + + const arena_allocator = arena.allocator(); + + // Generate unique identifier for this object - owned by arena + const object_nuid = try nuid.nextString(arena_allocator); + + // Initialize digest calculation + var hasher = std.crypto.hash.sha2.Sha256.init(.{}); + + // Determine chunk size (use meta.opts.chunk_size or store default) + const chunk_size = if (meta.opts) |opts| + opts.chunk_size orelse self.chunk_size + else + self.chunk_size; + + // Allocate chunk buffer using temporary allocator + const chunk_buffer = try self.allocator.alloc(u8, chunk_size); + defer self.allocator.free(chunk_buffer); + + // Stream and publish chunks + var total_size: u64 = 0; + var chunk_count: u32 = 0; + const chunk_subject = try self.getChunkSubject(object_nuid); + defer self.allocator.free(chunk_subject); + + while (true) { + const bytes_read = reader.read(chunk_buffer) catch |err| switch (err) { + error.EndOfStream => break, + else => return err, + }; + if (bytes_read == 0) break; + + // Update digest + hasher.update(chunk_buffer[0..bytes_read]); + + // Publish chunk + const ack = try self.js.publish(chunk_subject, chunk_buffer[0..bytes_read], .{}); + defer ack.deinit(); + + total_size += bytes_read; + chunk_count += 1; + } + + // Create digest string - owned by arena + const digest_bytes = hasher.finalResult(); + const digest_hex = try arena_allocator.alloc(u8, 64); + _ = std.fmt.bufPrint(digest_hex, "{s}", .{std.fmt.fmtSliceHexLower(&digest_bytes)}) catch unreachable; + + const obj_info = ObjectInfo{ + .name = try arena_allocator.dupe(u8, meta.name), + .description = if (meta.description) |desc| try arena_allocator.dupe(u8, desc) else null, + 
.opts = meta.opts, + .bucket = try arena_allocator.dupe(u8, self.store_name), + .nuid = object_nuid, + .size = total_size, + .chunks = chunk_count, + .digest = digest_hex, + .deleted = false, + }; + + // Store metadata + const info_json = try self.serializeObjectInfo(obj_info); + defer self.allocator.free(info_json); + + const meta_subject = try self.getMetaSubject(meta.name); + defer self.allocator.free(meta_subject); + + const meta_ack = try self.js.publish(meta_subject, info_json, .{}); + defer meta_ack.deinit(); + + return Result(ObjectInfo){ + .arena = arena, + .value = obj_info, + }; + } + + /// Put bytes as an object into the store (convenience method) + pub fn putBytes(self: *ObjectStore, object_name: []const u8, data: []const u8) !Result(ObjectInfo) { + // Create a fixed buffer stream from the data + var stream = std.io.fixedBufferStream(data); + + // Create ObjectMeta with store's default chunk size + const meta = ObjectMeta{ + .name = object_name, + .description = null, + .opts = ObjectMetaOptions{ + .chunk_size = self.chunk_size, + }, + }; + + // Use the primary put() method with the stream reader + return self.put(meta, stream.reader()); + } + + /// Primary get method that returns a streaming result + pub fn get(self: *ObjectStore, object_name: []const u8) !ObjectResult { + try validation.validateOSObjectName(object_name); + + // First get metadata + const meta_subject = try self.getMetaSubject(object_name); + defer self.allocator.free(meta_subject); + + const meta_msg = self.js.getMsg(self.stream_name, .{ .last_by_subj = meta_subject, .direct = true }) catch |err| { + return if (err == error.MessageNotFound) ObjectStoreError.ObjectNotFound else err; + }; + errdefer meta_msg.deinit(); + + // Create arena for the result + const arena = try self.allocator.create(std.heap.ArenaAllocator); + errdefer self.allocator.destroy(arena); + arena.* = std.heap.ArenaAllocator.init(self.allocator); + errdefer arena.deinit(); + + const arena_allocator = 
arena.allocator(); + + // Parse object info JSON using arena allocator + const parsed = try std.json.parseFromSlice(ObjectInfoJson, arena_allocator, meta_msg.data, .{}); + + // Convert from JSON version to full ObjectInfo with proper string ownership + const obj_info = ObjectInfo{ + .name = parsed.value.name, + .description = parsed.value.description, + .opts = parsed.value.opts, + .bucket = parsed.value.bucket, + .nuid = parsed.value.nuid, + .size = parsed.value.size, + .chunks = parsed.value.chunks, + .mtime = meta_msg.time, // Set from message timestamp + .digest = parsed.value.digest, + .deleted = parsed.value.deleted, + }; + + // We can safely deinit the message now since all strings are owned by arena + meta_msg.deinit(); + + if (obj_info.deleted) { + return ObjectStoreError.ObjectNotFound; + } + + // Links are not supported in the current server implementation + + // For empty objects, return immediately without subscription + if (obj_info.size == 0) { + var reader = ObjectReader.init(arena_allocator, self.*, obj_info, null); + reader.eof = true; // Mark as EOF immediately since there's no data + + return ObjectResult{ + .info = obj_info, + .reader = reader, + .arena = arena, + }; + } + + // Create subscription for chunks + const chunk_subject = try self.getChunkSubject(obj_info.nuid); + defer self.allocator.free(chunk_subject); + + const inbox = try newInbox(self.allocator); + defer self.allocator.free(inbox); + + const consumer_config = ConsumerConfig{ + .description = "Object chunk retrieval", + .deliver_subject = inbox, + .deliver_policy = .all, + .ack_policy = .none, + .max_ack_pending = 0, + .filter_subjects = &.{chunk_subject}, + }; + + const sub = try self.js.subscribeSync(self.stream_name, consumer_config); + + // Create reader with subscription + const reader = ObjectReader.init(arena_allocator, self.*, obj_info, sub); + + return ObjectResult{ + .info = obj_info, + .reader = reader, + .arena = arena, + }; + } + + /// Get object data as bytes 
(convenience method) + pub fn getBytes(self: *ObjectStore, object_name: []const u8) !Result([]u8) { + // Get the streaming result + var result = try self.get(object_name); + defer result.deinit(); + + // Create arena for the result + const arena = try self.allocator.create(std.heap.ArenaAllocator); + errdefer self.allocator.destroy(arena); + arena.* = std.heap.ArenaAllocator.init(self.allocator); + errdefer arena.deinit(); + + const arena_allocator = arena.allocator(); + + // Allocate buffer for complete object based on size from info + const data = try arena_allocator.alloc(u8, result.info.size); + + // Read all data from the streaming reader + var total_read: usize = 0; + while (total_read < result.info.size) { + const n = try result.reader.read(data[total_read..]); + if (n == 0) break; // EOF + total_read += n; + } + + if (total_read != result.info.size) { + return ObjectStoreError.ChunkMismatch; + } + + // Verify the digest + try result.reader.verify(); + + return Result([]u8){ + .arena = arena, + .value = data, + }; + } + + /// Get object metadata + pub fn info(self: *ObjectStore, object_name: []const u8) !Result(ObjectInfo) { + try validation.validateOSObjectName(object_name); + + const meta_subject = try self.getMetaSubject(object_name); + defer self.allocator.free(meta_subject); + + const meta_msg = self.js.getMsg(self.stream_name, .{ .last_by_subj = meta_subject, .direct = true }) catch |err| { + return if (err == error.MessageNotFound) ObjectStoreError.ObjectNotFound else err; + }; + errdefer meta_msg.deinit(); + + const arena = try self.allocator.create(std.heap.ArenaAllocator); + errdefer self.allocator.destroy(arena); + arena.* = std.heap.ArenaAllocator.init(self.allocator); + errdefer arena.deinit(); + + const arena_allocator = arena.allocator(); + + // Parse object info JSON using arena allocator + const parsed = try std.json.parseFromSlice(ObjectInfoJson, arena_allocator, meta_msg.data, .{}); + + // Convert from JSON version to full ObjectInfo with 
proper string ownership + const obj_info = ObjectInfo{ + .name = parsed.value.name, + .description = parsed.value.description, + .opts = parsed.value.opts, + .bucket = parsed.value.bucket, + .nuid = parsed.value.nuid, + .size = parsed.value.size, + .chunks = parsed.value.chunks, + .mtime = meta_msg.time, // Set from message timestamp + .digest = parsed.value.digest, + .deleted = parsed.value.deleted, + }; + + // We can safely deinit the message now since all strings are owned by arena + meta_msg.deinit(); + + return Result(ObjectInfo){ + .arena = arena, + .value = obj_info, + }; + } + + /// Delete an object (marks as deleted) + pub fn delete(self: *ObjectStore, object_name: []const u8) !void { + // Get current metadata + const info_result = try self.info(object_name); + defer info_result.deinit(); + const obj_info = info_result.value; + + if (obj_info.deleted) { + return ObjectStoreError.ObjectNotFound; + } + + // Create updated object info with deleted flag + const updated_info = ObjectInfo{ + .name = object_name, + .description = obj_info.description, + .bucket = self.store_name, + .nuid = obj_info.nuid, + .size = obj_info.size, + .chunks = obj_info.chunks, + .digest = obj_info.digest, + .deleted = true, + }; + + // Serialize and store updated object info + const info_json = try self.serializeObjectInfo(updated_info); + defer self.allocator.free(info_json); + + const meta_subject = try self.getMetaSubject(object_name); + defer self.allocator.free(meta_subject); + + const result = try self.js.publish(meta_subject, info_json, .{}); + defer result.deinit(); + } + + /// List all objects in the store + pub fn list(self: *ObjectStore) !Result([]ObjectInfo) { + // Use JetStream consumer to iterate through all metadata messages + const meta_filter = try std.fmt.allocPrint(self.allocator, "{s}>", .{self.meta_subject_prefix}); + defer self.allocator.free(meta_filter); + + const inbox = try newInbox(self.allocator); + defer self.allocator.free(inbox); + + const 
consumer_config = ConsumerConfig{ + .description = "Object store list", + .deliver_subject = inbox, + .deliver_policy = .last_per_subject, + .ack_policy = .none, + .max_ack_pending = 0, + .filter_subjects = &.{meta_filter}, + }; + + const sub = try self.js.subscribeSync(self.stream_name, consumer_config); + defer sub.deinit(); + + const arena = try self.allocator.create(std.heap.ArenaAllocator); + errdefer self.allocator.destroy(arena); + arena.* = std.heap.ArenaAllocator.init(self.allocator); + errdefer arena.deinit(); + + const arena_allocator = arena.allocator(); + var objects = try std.ArrayList(ObjectInfo).initCapacity(arena_allocator, 64); + + // Collect all objects (including deleted ones, to be filtered later) + const timeout_ms = self.js.nc.options.timeout_ms; + while (true) { + const js_msg = sub.nextMsg(timeout_ms) catch |err| { + if (err == error.Timeout) { + break; + } + return err; + }; + defer js_msg.deinit(); + + // Parse metadata using temporary allocator + const parsed = try std.json.parseFromSlice(ObjectInfoJson, self.allocator, js_msg.msg.data, .{}); + defer parsed.deinit(); + const meta = parsed.value; + + // Only include non-deleted objects + if (!meta.deleted) { + const obj_info = ObjectInfo{ + .name = try arena_allocator.dupe(u8, meta.name), + .description = if (meta.description) |desc| try arena_allocator.dupe(u8, desc) else null, + .opts = meta.opts, + .bucket = try arena_allocator.dupe(u8, meta.bucket), + .nuid = try arena_allocator.dupe(u8, meta.nuid), + .size = meta.size, + .chunks = meta.chunks, + .mtime = js_msg.msg.time, + .digest = try arena_allocator.dupe(u8, meta.digest), + .deleted = meta.deleted, + }; + + try objects.append(obj_info); + } + + // Stop when there are no more pending messages + if (js_msg.metadata.num_pending == 0) { + break; + } + } + + return Result([]ObjectInfo){ + .arena = arena, + .value = try objects.toOwnedSlice(), + }; + } + + /// Serialize ObjectInfo to JSON string + fn serializeObjectInfo(self: 
*ObjectStore, obj_info: ObjectInfo) ![]u8 { + // Convert to JSON-only version (excludes mtime) + const json_info = ObjectInfoJson{ + .name = obj_info.name, + .description = obj_info.description, + .opts = obj_info.opts, + .bucket = obj_info.bucket, + .nuid = obj_info.nuid, + .size = obj_info.size, + .chunks = obj_info.chunks, + .digest = obj_info.digest, + .deleted = obj_info.deleted, + }; + return std.json.stringifyAlloc(self.allocator, json_info, .{}); + } +}; + +/// Object Store Manager handles store-level operations +pub const ObjectStoreManager = struct { + js: JetStream, + + const Self = @This(); + + pub fn init(js: JetStream) ObjectStoreManager { + return ObjectStoreManager{ + .js = js, + }; + } + + /// Create a new object store + pub fn createStore(self: *ObjectStoreManager, config: ObjectStoreConfig) !ObjectStore { + try validation.validateOSBucketName(config.store_name); + + const stream_name = try std.fmt.allocPrint(self.js.nc.allocator, "OBJ_{s}", .{config.store_name}); + defer self.js.nc.allocator.free(stream_name); + + const chunk_subject = try std.fmt.allocPrint(self.js.nc.allocator, "$O.{s}.C.>", .{config.store_name}); + defer self.js.nc.allocator.free(chunk_subject); + + const meta_subject = try std.fmt.allocPrint(self.js.nc.allocator, "$O.{s}.M.>", .{config.store_name}); + defer self.js.nc.allocator.free(meta_subject); + + const stream_config = StreamConfig{ + .name = stream_name, + .description = config.description, + .subjects = &.{ chunk_subject, meta_subject }, + .retention = .limits, + .max_msg_size = @intCast(config.max_object_size), + .max_bytes = config.max_bytes, + .storage = switch (config.storage) { + .file => .file, + .memory => .memory, + }, + .compression = if (config.compression) .s2 else .none, + .num_replicas = config.replicas, + .discard = .new, + .allow_direct = true, + .allow_rollup_hdrs = true, + }; + + const result = try self.js.addStream(stream_config); + defer result.deinit(); + + return try 
ObjectStore.init(self.js.nc.allocator, self.js, config.store_name, config.chunk_size); + } + + /// Open an existing object store + pub fn openStore(self: *ObjectStoreManager, store_name: []const u8) !ObjectStore { + // Verify store exists by getting stream info + const stream_name = try std.fmt.allocPrint(self.js.nc.allocator, "OBJ_{s}", .{store_name}); + defer self.js.nc.allocator.free(stream_name); + + const stream_info = self.js.getStreamInfo(stream_name) catch |err| { + return if (err == error.JetStreamError) ObjectStoreError.StoreNotFound else err; + }; + defer stream_info.deinit(); + + return try ObjectStore.init(self.js.nc.allocator, self.js, store_name, DEFAULT_CHUNK_SIZE); + } + + /// Delete an object store + pub fn deleteStore(self: *ObjectStoreManager, store_name: []const u8) !void { + try validation.validateOSBucketName(store_name); + + const stream_name = try std.fmt.allocPrint(self.js.nc.allocator, "OBJ_{s}", .{store_name}); + defer self.js.nc.allocator.free(stream_name); + + try self.js.deleteStream(stream_name); + } +}; + +test "validation delegates to validation.zig" { + // Test that we properly delegate to centralized validation + try validation.validateOSBucketName("valid-store_name123"); + try std.testing.expectError(error.InvalidOSBucketName, validation.validateOSBucketName("")); + try std.testing.expectError(error.InvalidOSBucketName, validation.validateOSBucketName("foo bar")); + try std.testing.expectError(error.InvalidOSBucketName, validation.validateOSBucketName("foo.bar")); + + try validation.validateOSObjectName("valid-object/name_123.txt"); + try std.testing.expectError(error.InvalidOSObjectName, validation.validateOSObjectName("")); + try std.testing.expectError(error.InvalidOSObjectName, validation.validateOSObjectName("/starts-with-slash")); + try std.testing.expectError(error.InvalidOSObjectName, validation.validateOSObjectName("ends-with-slash/")); + try std.testing.expectError(error.InvalidOSObjectName, 
validation.validateOSObjectName(".starts-with-dot")); + try std.testing.expectError(error.InvalidOSObjectName, validation.validateOSObjectName("ends-with-dot.")); +} diff --git a/src/root.zig b/src/root.zig index 92c9208..f1800d8 100644 --- a/src/root.zig +++ b/src/root.zig @@ -70,10 +70,22 @@ pub const validateSubject = @import("validation.zig").validateSubject; pub const validateQueueName = @import("validation.zig").validateQueueName; pub const validateKVBucketName = @import("validation.zig").validateKVBucketName; pub const validateKVKeyName = @import("validation.zig").validateKVKeyName; +pub const validateOSBucketName = @import("validation.zig").validateOSBucketName; +pub const validateOSObjectName = @import("validation.zig").validateOSObjectName; // Utility types pub const Result = @import("result.zig").Result; +// JetStream Object Store types +pub const ObjectStore = @import("jetstream_objstore.zig").ObjectStore; +pub const ObjectStoreManager = @import("jetstream_objstore.zig").ObjectStoreManager; +pub const ObjectMeta = @import("jetstream_objstore.zig").ObjectMeta; +pub const ObjectInfo = @import("jetstream_objstore.zig").ObjectInfo; +pub const ObjectStoreConfig = @import("jetstream_objstore.zig").ObjectStoreConfig; +pub const ObjectStoreError = @import("jetstream_objstore.zig").ObjectStoreError; +pub const ObjectReader = @import("jetstream_objstore.zig").ObjectReader; +pub const ObjectResult = @import("jetstream_objstore.zig").ObjectResult; + // Removed top-level connect functions - use Connection.init() and Connection.connect() directly test { @@ -81,4 +93,5 @@ test { _ = @import("jetstream.zig"); _ = @import("jetstream_message.zig"); _ = @import("jetstream_kv.zig"); + _ = @import("jetstream_objstore.zig"); } diff --git a/src/validation.zig b/src/validation.zig index d84b859..762a132 100644 --- a/src/validation.zig +++ b/src/validation.zig @@ -220,6 +220,32 @@ pub fn validateOSBucketName(name: []const u8) !void { } } +/// Validate Object Store object name 
according to ADR-6: limited-term with dots, no leading/trailing slashes or dots +pub fn validateOSObjectName(name: []const u8) !void { + if (name.len == 0) { + return error.InvalidOSObjectName; + } + + if (name.len > 255) { + return error.InvalidOSObjectName; + } + + // Check for leading or trailing slashes/dots + if (name[0] == '/' or name[name.len - 1] == '/' or + name[0] == '.' or name[name.len - 1] == '.') + { + return error.InvalidOSObjectName; + } + + // Validate each character - allow limited-term chars plus dots + for (name) |c| { + const valid = isLimitedTermChar(c) or c == '.'; + if (!valid) { + return error.InvalidOSObjectName; + } + } +} + // Tests test "filename-safe characters" { @@ -376,3 +402,37 @@ test "validateKVKeyName" { try std.testing.expectError(error.InvalidKVKeyName, validateKVKeyName("key\\withbackslash")); try std.testing.expectError(error.InvalidKVKeyName, validateKVKeyName("key with space")); } + +test "validateOSBucketName" { + // Valid bucket names (restricted-term) + try validateOSBucketName("valid_bucket-name"); + try validateOSBucketName("Bucket123"); + try validateOSBucketName("UPPERCASE"); + + // Invalid bucket names + try std.testing.expectError(error.InvalidOSBucketName, validateOSBucketName("")); + try std.testing.expectError(error.InvalidOSBucketName, validateOSBucketName("bucket.with.dots")); + try std.testing.expectError(error.InvalidOSBucketName, validateOSBucketName("bucket/with/slash")); + try std.testing.expectError(error.InvalidOSBucketName, validateOSBucketName("bucket=equals")); + try std.testing.expectError(error.InvalidOSBucketName, validateOSBucketName("bucket with space")); +} + +test "validateOSObjectName" { + // Valid object names + try validateOSObjectName("valid-object/name_123.txt"); + try validateOSObjectName("object_with-dashes"); + try validateOSObjectName("object/with/slashes"); + try validateOSObjectName("object=with=equals"); + try validateOSObjectName("object.with.dots"); + + // Invalid object names + 
try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("/starts-with-slash")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("ends-with-slash/")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName(".starts-with-dot")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("ends-with-dot.")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("object*with*asterisk")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("object>with>gt")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("object\\with\\backslash")); + try std.testing.expectError(error.InvalidOSObjectName, validateOSObjectName("object with space")); +} diff --git a/tests/all_tests.zig b/tests/all_tests.zig index bd8c7ad..2a52376 100644 --- a/tests/all_tests.zig +++ b/tests/all_tests.zig @@ -27,6 +27,8 @@ test { _ = @import("jetstream_kv_simple_test.zig"); _ = @import("jetstream_kv_test.zig"); _ = @import("jetstream_kv_history_test.zig"); + _ = @import("jetstream_objstore_test.zig"); + _ = @import("jetstream_objstore_streaming_test.zig"); } test "tests:beforeEach" { diff --git a/tests/jetstream_objstore_streaming_test.zig b/tests/jetstream_objstore_streaming_test.zig new file mode 100644 index 0000000..9cc14c4 --- /dev/null +++ b/tests/jetstream_objstore_streaming_test.zig @@ -0,0 +1,130 @@ +const std = @import("std"); +const testing = std.testing; +const nats = @import("nats"); +const utils = @import("utils.zig"); + +test "ObjectStore streaming put and get" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + // Create JetStream context + const js = conn.jetstream(.{}); + + // Create ObjectStore manager + var osm = js.objectStoreManager(); + + // Generate unique store name + const store_name = 
try utils.generateUniqueName(testing.allocator, "teststore");
+    defer testing.allocator.free(store_name);
+
+    // Create test object store
+    const store_config = nats.ObjectStoreConfig{
+        .store_name = store_name,
+        .description = "Test store for streaming",
+    };
+
+    var store = try osm.createStore(store_config);
+    defer store.deinit();
+    defer osm.deleteStore(store_name) catch {};
+
+    // Test data
+    const test_data = "Hello, streaming world! This is a test of the new streaming put/get functionality.";
+
+    // Test streaming put with ObjectMeta
+    const meta = nats.ObjectMeta{
+        .name = "streaming-test-object",
+        .description = "Test object for streaming",
+        .opts = null,
+    };
+
+    var stream = std.io.fixedBufferStream(test_data);
+    const put_result = try store.put(meta, stream.reader());
+    defer put_result.deinit();
+
+    try testing.expect(put_result.value.size == test_data.len);
+    try testing.expect(put_result.value.chunks > 0);
+    try testing.expectEqualStrings("streaming-test-object", put_result.value.name);
+
+    // Test streaming get
+    var get_result = try store.get("streaming-test-object");
+    defer get_result.deinit();
+
+    try testing.expectEqualStrings("streaming-test-object", get_result.info.name);
+    try testing.expect(get_result.info.size == test_data.len);
+
+    // Read data from stream
+    var buffer: [1024]u8 = undefined;
+    var total_read: usize = 0;
+    while (total_read < get_result.info.size) {
+        const n = try get_result.reader.read(buffer[total_read..]);
+        if (n == 0) break;
+        total_read += n;
+    }
+
+    try testing.expectEqualStrings(test_data, buffer[0..total_read]);
+
+    // Verify digest
+    try get_result.reader.verify();
+
+    // Test that getBytes still works (compatibility)
+    const bytes_result = try store.getBytes("streaming-test-object");
+    defer bytes_result.deinit();
+
+    try testing.expectEqualStrings(test_data, bytes_result.value);
+}
+
+test "ObjectStore streaming empty object" {
+    const conn = try utils.createDefaultConnection();
+    defer 
utils.closeConnection(conn);
+
+    // Create JetStream context
+    const js = conn.jetstream(.{});
+
+    // Create ObjectStore manager
+    var osm = js.objectStoreManager();
+
+    // Generate unique store name
+    const store_name = try utils.generateUniqueName(testing.allocator, "testempty");
+    defer testing.allocator.free(store_name);
+
+    // Create test object store
+    const store_config = nats.ObjectStoreConfig{
+        .store_name = store_name,
+        .description = "Test store for empty objects",
+    };
+
+    var store = try osm.createStore(store_config);
+    defer store.deinit();
+    defer osm.deleteStore(store_name) catch {};
+
+    // Test empty data
+    const empty_data = "";
+
+    // Test streaming put with empty data
+    const meta = nats.ObjectMeta{
+        .name = "empty-object",
+        .description = "Empty test object",
+        .opts = null,
+    };
+
+    var stream = std.io.fixedBufferStream(empty_data);
+    const put_result = try store.put(meta, stream.reader());
+    defer put_result.deinit();
+
+    try testing.expect(put_result.value.size == 0);
+    try testing.expect(put_result.value.chunks == 0);
+
+    // Test streaming get for empty object
+    var get_result = try store.get("empty-object");
+    defer get_result.deinit();
+
+    try testing.expect(get_result.info.size == 0);
+
+    // Read should immediately return 0 (EOF)
+    var buffer: [10]u8 = undefined;
+    const n = try get_result.reader.read(&buffer);
+    try testing.expect(n == 0);
+
+    // Verify should succeed for empty object
+    try get_result.reader.verify();
+}
diff --git a/tests/jetstream_objstore_test.zig b/tests/jetstream_objstore_test.zig
new file mode 100644
index 0000000..5b041a1
--- /dev/null
+++ b/tests/jetstream_objstore_test.zig
@@ -0,0 +1,291 @@
+const std = @import("std");
+const testing = std.testing;
+const nats = @import("nats");
+const utils = @import("utils.zig");
+
+const log = std.log.default;
+
+test "ObjectStore basic create store" {
+    const conn = try utils.createDefaultConnection();
+    defer utils.closeConnection(conn);
+
+    const js = 
conn.jetstream(.{}); + + // Generate unique store name + const store_name = try utils.generateUniqueName(testing.allocator, "teststore"); + defer testing.allocator.free(store_name); + + // Create ObjectStore manager + var objstore_manager = js.objectStoreManager(); + + // Create store + const config = nats.ObjectStoreConfig{ + .store_name = store_name, + .description = "Test object store", + }; + + var objstore = try objstore_manager.createStore(config); + defer objstore.deinit(); + + // Clean up + try objstore_manager.deleteStore(store_name); +} + +test "ObjectStore put and get operations" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + const js = conn.jetstream(.{}); + + // Generate unique store name + const store_name = try utils.generateUniqueName(testing.allocator, "teststore"); + defer testing.allocator.free(store_name); + + // Create ObjectStore manager and store + var objstore_manager = js.objectStoreManager(); + const config = nats.ObjectStoreConfig{ + .store_name = store_name, + .description = "Test object store", + }; + + var objstore = try objstore_manager.createStore(config); + defer objstore.deinit(); + defer objstore_manager.deleteStore(store_name) catch {}; + + // Test data + const test_data = "Hello, ObjectStore World!"; + const object_name = "test-object.txt"; + + // Put object + const put_result = try objstore.putBytes(object_name, test_data); + defer put_result.deinit(); + try testing.expectEqualStrings(object_name, put_result.value.name); + try testing.expectEqualStrings(store_name, put_result.value.bucket); + try testing.expect(put_result.value.size == test_data.len); + try testing.expect(put_result.value.chunks > 0); + try testing.expect(!put_result.value.deleted); + + // Get object + const get_result = try objstore.getBytes(object_name); + defer get_result.deinit(); + try testing.expectEqualStrings(test_data, get_result.value); + + // Get object info + const info_result = try 
objstore.info(object_name); + defer info_result.deinit(); + try testing.expectEqualStrings(object_name, info_result.value.name); + try testing.expectEqualStrings(store_name, info_result.value.bucket); + try testing.expect(info_result.value.size == test_data.len); + try testing.expect(!info_result.value.deleted); +} + +test "ObjectStore chunked operations" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + const js = conn.jetstream(.{}); + + // Generate unique store name + const store_name = try utils.generateUniqueName(testing.allocator, "teststore"); + defer testing.allocator.free(store_name); + + // Create ObjectStore manager and store + var objstore_manager = js.objectStoreManager(); + const config = nats.ObjectStoreConfig{ + .store_name = store_name, + .description = "Test object store for chunked data", + .chunk_size = 1024, + }; + + var objstore = try objstore_manager.createStore(config); + defer objstore.deinit(); + defer objstore_manager.deleteStore(store_name) catch {}; + + // Create test data larger than default chunk size + const chunk_size = 1024; // 1KB chunks + const data_size = chunk_size * 3 + 512; // 3.5 chunks worth + const large_data = try testing.allocator.alloc(u8, data_size); + defer testing.allocator.free(large_data); + + // Fill with pattern + for (large_data, 0..) 
|*byte, i| { + byte.* = @intCast(i % 256); + } + + const object_name = "large-object.bin"; + + // Put large object with custom chunk size + const put_result = try objstore.putBytes(object_name, large_data); + defer put_result.deinit(); + try testing.expectEqualStrings(object_name, put_result.value.name); + try testing.expect(put_result.value.size == large_data.len); + try testing.expect(put_result.value.chunks == 4); // Should be 4 chunks + + // Get large object + const get_result = try objstore.getBytes(object_name); + defer get_result.deinit(); + try testing.expectEqualSlices(u8, large_data, get_result.value); +} + +test "ObjectStore delete operations" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + const js = conn.jetstream(.{}); + + // Generate unique store name + const store_name = try utils.generateUniqueName(testing.allocator, "teststore"); + defer testing.allocator.free(store_name); + + // Create ObjectStore manager and store + var objstore_manager = js.objectStoreManager(); + const config = nats.ObjectStoreConfig{ + .store_name = store_name, + .description = "Test object store for delete operations", + }; + + var objstore = try objstore_manager.createStore(config); + defer objstore.deinit(); + defer objstore_manager.deleteStore(store_name) catch {}; + + const test_data = "Data to be deleted"; + const object_name = "doomed-object.txt"; + + // Put object + const put_result = try objstore.putBytes(object_name, test_data); + defer put_result.deinit(); + + // Verify object exists + const get_result = try objstore.getBytes(object_name); + defer get_result.deinit(); + try testing.expectEqualStrings(test_data, get_result.value); + + // Delete object + try objstore.delete(object_name); + + // Verify object is deleted + try testing.expectError(nats.ObjectStoreError.ObjectNotFound, objstore.getBytes(object_name)); + + // Info should show deleted status + const info_result = try objstore.info(object_name); + defer 
info_result.deinit(); + try testing.expect(info_result.value.deleted); +} + +test "ObjectStore list operations" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + const js = conn.jetstream(.{}); + + // Generate unique store name + const store_name = try utils.generateUniqueName(testing.allocator, "teststore"); + defer testing.allocator.free(store_name); + + // Create ObjectStore manager and store + var objstore_manager = js.objectStoreManager(); + const config = nats.ObjectStoreConfig{ + .store_name = store_name, + .description = "Test object store for list operations", + }; + + var objstore = try objstore_manager.createStore(config); + defer objstore.deinit(); + defer objstore_manager.deleteStore(store_name) catch {}; + + // Put multiple objects + const objects = [_]struct { name: []const u8, data: []const u8 }{ + .{ .name = "file1.txt", .data = "Content of file 1" }, + .{ .name = "file2.txt", .data = "Content of file 2" }, + .{ .name = "subdir/file3.txt", .data = "Content of file 3" }, + }; + + for (objects) |obj| { + const put_result = try objstore.putBytes(obj.name, obj.data); + defer put_result.deinit(); + } + + // List all objects + const list_result = try objstore.list(); + defer list_result.deinit(); + + try testing.expect(list_result.value.len >= objects.len); + + // Verify all objects are in the list + for (objects) |expected| { + var found = false; + for (list_result.value) |info| { + if (std.mem.eql(u8, info.name, expected.name)) { + found = true; + try testing.expect(!info.deleted); + try testing.expect(info.size == expected.data.len); + break; + } + } + try testing.expect(found); + } + + // Delete one object and verify it's no longer in the list + try objstore.delete("file1.txt"); + const list_result2 = try objstore.list(); + defer list_result2.deinit(); + + var found_deleted = false; + for (list_result2.value) |info| { + if (std.mem.eql(u8, info.name, "file1.txt")) { + found_deleted = true; + break; + } + } 
+ try testing.expect(!found_deleted); // Should not be in the list since it's deleted +} + +test "ObjectStore validation" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + // Test store name validation + try testing.expectError(error.InvalidOSBucketName, nats.validateOSBucketName("")); + try testing.expectError(error.InvalidOSBucketName, nats.validateOSBucketName("invalid space")); + try testing.expectError(error.InvalidOSBucketName, nats.validateOSBucketName("invalid.dot")); + + // Test object name validation + try testing.expectError(error.InvalidOSObjectName, nats.validateOSObjectName("")); + try testing.expectError(error.InvalidOSObjectName, nats.validateOSObjectName("/starts-with-slash")); + try testing.expectError(error.InvalidOSObjectName, nats.validateOSObjectName("ends-with-slash/")); + try testing.expectError(error.InvalidOSObjectName, nats.validateOSObjectName(".starts-with-dot")); + try testing.expectError(error.InvalidOSObjectName, nats.validateOSObjectName("ends-with-dot.")); + + // Valid names should pass + try nats.validateOSBucketName("valid-store_name123"); + try nats.validateOSObjectName("valid-object/name_123.txt"); +} + +test "ObjectStore error handling" { + const conn = try utils.createDefaultConnection(); + defer utils.closeConnection(conn); + + const js = conn.jetstream(.{}); + + // Generate unique store name + const store_name = try utils.generateUniqueName(testing.allocator, "teststore"); + defer testing.allocator.free(store_name); + + // Create ObjectStore manager and store + var objstore_manager = js.objectStoreManager(); + const config = nats.ObjectStoreConfig{ + .store_name = store_name, + .description = "Test object store for error handling", + }; + + var objstore = try objstore_manager.createStore(config); + defer objstore.deinit(); + defer objstore_manager.deleteStore(store_name) catch {}; + + // Try to get non-existent object + try 
testing.expectError(nats.ObjectStoreError.ObjectNotFound, objstore.getBytes("nonexistent.txt")); + try testing.expectError(nats.ObjectStoreError.ObjectNotFound, objstore.info("nonexistent.txt")); + + // Try to delete non-existent object + try testing.expectError(nats.ObjectStoreError.ObjectNotFound, objstore.delete("nonexistent.txt")); +}