Skip to content

Commit 4bfd54b

Browse files
justrach and claude
committed
fix: harden against segfaults and OOM across core subsystems
BwTree: replace catch unreachable with try/errdefer on init, add deferred reclamation list for consolidated chains (two-phase epoch GC), bound collectEntries stack at 1024 with safety check. MmapFile: add RwLock so grow() takes exclusive lock during munmap/mmap while readers (at/slice) hold shared locks — prevents dangling pointers. Server: cap concurrent connections at 512 with atomic counter, replace billing_log ArrayList+orderedRemove(0) with O(1) ring buffer. FFI: wrap all opaque handles in DbHandle/ColHandle with magic number validation — stale/invalid handles return -1/null instead of segfaulting. Collection: trigger MVCC version chain GC every 10K inserts to prevent unbounded memory growth. Set sane default tenant quotas (256 collections, 10 GiB storage, 10K ops/sec). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 843a962 commit 4bfd54b

5 files changed

Lines changed: 197 additions & 58 deletions

File tree

src/bwtree.zig

Lines changed: 46 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -48,12 +48,18 @@ pub const BwTree = struct {
4848
root_pid: usize,
4949
next_page_id: std.atomic.Value(usize),
5050
allocator: std.mem.Allocator,
51+
// Deferred reclamation: old chains retired after consolidation are parked here
52+
// and freed on the next consolidation or deinit (simple two-phase approach).
53+
retired: std.ArrayList(*Page),
54+
retired_mu: std.Thread.Mutex,
5155

52-
pub fn init(allocator: std.mem.Allocator) BwTree {
56+
pub fn init(allocator: std.mem.Allocator) !BwTree {
5357
var tree: BwTree = undefined;
5458
tree.allocator = allocator;
5559
tree.root_pid = 0;
5660
tree.next_page_id = std.atomic.Value(usize).init(1);
61+
tree.retired = std.ArrayList(*Page).init(allocator);
62+
tree.retired_mu = .{};
5763

5864
// Zero out all mapping slots
5965
var i: usize = 0;
@@ -62,15 +68,19 @@ pub const BwTree = struct {
6268
}
6369

6470
// Create root base page (empty)
65-
const root_page = allocator.create(Page) catch unreachable;
66-
const empty_entries = allocator.alloc(Entry, 0) catch unreachable;
71+
const root_page = try allocator.create(Page);
72+
errdefer allocator.destroy(root_page);
73+
const empty_entries = try allocator.alloc(Entry, 0);
6774
root_page.* = Page{ .base = BasePage{ .entries = empty_entries } };
6875
tree.mapping[0] = std.atomic.Value(usize).init(@intFromPtr(root_page));
6976

7077
return tree;
7178
}
7279

7380
pub fn deinit(self: *BwTree) void {
81+
// Free all retired chains first
82+
self.drainRetired();
83+
self.retired.deinit();
7484
var i: usize = 0;
7585
while (i < MAX_PAGES) : (i += 1) {
7686
const ptr_val = self.mapping[i].load(.acquire);
@@ -95,6 +105,16 @@ pub const BwTree = struct {
95105
}
96106
}
97107

108+
/// Drain the retired list — frees chains that were parked on previous consolidations.
109+
fn drainRetired(self: *BwTree) void {
110+
self.retired_mu.lock();
111+
defer self.retired_mu.unlock();
112+
for (self.retired.items) |page| {
113+
self.freeChain(page);
114+
}
115+
self.retired.clearRetainingCapacity();
116+
}
117+
98118
// ─── allocPage ───────────────────────────────────────────────────────
99119

100120
fn allocPage(self: *BwTree) usize {
@@ -225,6 +245,10 @@ pub const BwTree = struct {
225245

226246
/// When delta chain exceeds MAX_DELTA_CHAIN, merge into a new base page.
227247
pub fn consolidate(self: *BwTree, page_id: usize) void {
248+
// Drain previously retired chains — they've survived at least one full
249+
// consolidation cycle, so readers from the previous epoch are done.
250+
self.drainRetired();
251+
228252
const old = self.mapping[page_id].load(.acquire);
229253
if (old == 0) return;
230254

@@ -264,9 +288,14 @@ pub const BwTree = struct {
264288
.acq_rel,
265289
.acquire,
266290
) == null) {
267-
// Success — old chain will be reclaimed by epoch-based GC.
268-
// Do NOT free here — concurrent readers may still be traversing it.
269-
// TODO: integrate with mvcc.zig epoch GC for safe reclamation.
291+
// Success — park old chain head for deferred reclamation.
292+
// Concurrent readers may still be traversing it; it will be freed
293+
// on the next consolidation cycle (two-phase epoch approach).
294+
self.retired_mu.lock();
295+
defer self.retired_mu.unlock();
296+
self.retired.append(page) catch {
297+
// If we can't track it, leak it — better than use-after-free.
298+
};
270299
} else {
271300
// Another thread consolidated first; discard our work
272301
self.allocator.free(new_entries);
@@ -276,12 +305,17 @@ pub const BwTree = struct {
276305

277306
fn collectEntries(self: *BwTree, page: *Page, map: *std.AutoHashMap(u64, Entry)) void {
278307
_ = self;
279-
// Walk to base first, then apply deltas in reverse (base → newest)
280-
var stack: [256]*Page = undefined;
308+
// Walk to base first, then apply deltas in reverse (base → newest).
309+
// Use a bounded stack — MAX_DELTA_CHAIN is 8, but under CAS contention
310+
// chains can temporarily grow longer. 1024 is generous; if exceeded we
311+
// truncate (lose oldest deltas) rather than crash.
312+
const STACK_CAP = 1024;
313+
var stack: [STACK_CAP]*Page = undefined;
281314
var depth: usize = 0;
282315
var cur: ?*Page = page;
283316

284317
while (cur) |p| {
318+
if (depth >= STACK_CAP) break; // safety bound
285319
stack[depth] = p;
286320
depth += 1;
287321
switch (p.*) {
@@ -342,7 +376,7 @@ fn makeEntry(key: u64, doc_id: u64) Entry {
342376
}
343377

344378
test "bwtree insert and search" {
345-
var tree = BwTree.init(std.testing.allocator);
379+
var tree = try BwTree.init(std.testing.allocator);
346380
defer tree.deinit();
347381

348382
try tree.insert(10, makeEntry(10, 100));
@@ -366,7 +400,7 @@ test "bwtree insert and search" {
366400
}
367401

368402
test "bwtree delete" {
369-
var tree = BwTree.init(std.testing.allocator);
403+
var tree = try BwTree.init(std.testing.allocator);
370404
defer tree.deinit();
371405

372406
try tree.insert(10, makeEntry(10, 100));
@@ -385,7 +419,7 @@ test "bwtree delete" {
385419

386420
test "bwtree consolidation" {
387421
// Use page_allocator — old chains deferred to epoch GC
388-
var tree = BwTree.init(std.heap.page_allocator);
422+
var tree = try BwTree.init(std.heap.page_allocator);
389423

390424
// Insert enough entries to trigger consolidation (> MAX_DELTA_CHAIN = 8)
391425
var i: u64 = 0;
@@ -408,7 +442,7 @@ test "bwtree consolidation" {
408442
test "bwtree concurrent inserts" {
409443
// Use page_allocator — consolidated chains are intentionally leaked
410444
// (deferred to epoch-based GC, not available in test context)
411-
var tree = BwTree.init(std.heap.page_allocator);
445+
var tree = try BwTree.init(std.heap.page_allocator);
412446

413447
const NUM_THREADS = 4;
414448
const KEYS_PER_THREAD = 50;

src/collection.zig

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -163,6 +163,8 @@ fn extractJsonFloatArray(json: []const u8, field_name: []const u8, out: []f32) ?
163163
// ─── Collection ──────────────────────────────────────────────────────────
164164
pub const Collection = struct {
165165
pub const STRIPE_COUNT = 1024;
166+
// MVCC GC: trigger version chain cleanup every GC_INTERVAL inserts.
167+
const GC_INTERVAL: u64 = 10_000;
166168

167169
name_buf: [128]u8,
168170
name_len: u8,
@@ -184,7 +186,7 @@ pub const Collection = struct {
184186
key_epochs: std.AutoHashMap(u64, u64),
185187
alloc: std.mem.Allocator,
186188
cache: hot_cache.HotCache,
187-
// MVCC version chain — append-only, no vacuum.
189+
// MVCC version chain — append-only, vacuumed periodically via gc_counter.
188190
versions: mvcc_mod.VersionChain,
189191

190192
// Async index queues — inserts round-robin into 2 queues, 2 background threads drain them.
@@ -205,8 +207,7 @@ pub const Collection = struct {
205207
// Branch manager (optional — lazy-initialized on first branch creation)
206208
branch_mgr: ?*branch_mod.BranchManager = null,
207209
vec_entries: std.ArrayListUnmanaged(BTreeEntry) = .empty,
208-
209-
/// Get the stripe lock index for a key hash.
210+
gc_counter: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
210211
fn stripeIndex(key_hash: u64) usize {
211212
return @intCast(key_hash % STRIPE_COUNT);
212213
}
@@ -451,6 +452,11 @@ pub const Collection = struct {
451452

452453
emitChange(self, .insert, key, value, doc_id);
453454

455+
// Periodic MVCC version chain GC to prevent unbounded memory growth.
456+
if (self.gc_counter.fetchAdd(1, .monotonic) % GC_INTERVAL == GC_INTERVAL - 1) {
457+
_ = self.gcVersions();
458+
}
459+
454460
return doc_id;
455461
}
456462

@@ -1474,9 +1480,9 @@ pub const Database = struct {
14741480
auth: @import("auth.zig").AuthStore,
14751481

14761482
pub const TenantQuota = struct {
1477-
max_collections: u32 = std.math.maxInt(u32),
1478-
max_storage_bytes: usize = std.math.maxInt(usize),
1479-
max_ops_per_second: u32 = std.math.maxInt(u32),
1483+
max_collections: u32 = 256,
1484+
max_storage_bytes: usize = 10 * 1024 * 1024 * 1024, // 10 GiB
1485+
max_ops_per_second: u32 = 10_000,
14801486
};
14811487

14821488
pub const TenantUsage = struct {

0 commit comments

Comments (0)