Commit 0839e19

Merge branch 'cxdb-resiliency-1902202601'
2 parents d8550d7 + 4ff4304 commit 0839e19

20 files changed

Lines changed: 454 additions & 177 deletions

deploy/docker-compose/.env.example

Lines changed: 10 additions & 0 deletions
@@ -66,3 +66,13 @@ PUBLIC_BASE_URL=http://localhost:8080
 # Zstd compression level 1-22 (default: 3)
 # Higher = better compression but slower
 # CXDB_COMPRESSION_LEVEL=3
+
+# Maximum concurrent binary protocol connections (default: 512, 0 = unlimited)
+# CXDB_MAX_CONNECTIONS=512
+
+# Read timeout for idle binary protocol connections in seconds (default: 300)
+# Clients that send no data for this long are disconnected
+# CXDB_CONNECTION_READ_TIMEOUT_SECS=300
+
+# Write timeout for binary protocol connections in seconds (default: 30)
+# CXDB_CONNECTION_WRITE_TIMEOUT_SECS=30
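
As a rough illustration of how the three variables in this hunk might be consumed, the sketch below parses them with the documented defaults (512, 300, 30) and treats `0` connections as unlimited. The `ConnectionSettings` struct, `settings_from`, and the fall-back-to-default behaviour for unparsable values are assumptions made for the example; the commit does not show the server's actual parsing code.

```rust
use std::time::Duration;

/// Hypothetical settings struct; field names are illustrative only.
#[derive(Debug, PartialEq)]
pub struct ConnectionSettings {
    /// `None` means unlimited (the documented meaning of 0).
    pub max_connections: Option<usize>,
    pub read_timeout: Duration,
    pub write_timeout: Duration,
}

/// Parse an optional raw value, falling back to `default` when the value
/// is unset or not a valid integer (assumed behaviour).
fn parse_or(raw: Option<&str>, default: u64) -> u64 {
    raw.and_then(|s| s.trim().parse().ok()).unwrap_or(default)
}

/// Build settings from a lookup function so the logic can be exercised
/// without touching the real process environment.
pub fn settings_from<F>(lookup: F) -> ConnectionSettings
where
    F: Fn(&str) -> Option<String>,
{
    let max = parse_or(lookup("CXDB_MAX_CONNECTIONS").as_deref(), 512);
    let read = parse_or(lookup("CXDB_CONNECTION_READ_TIMEOUT_SECS").as_deref(), 300);
    let write = parse_or(lookup("CXDB_CONNECTION_WRITE_TIMEOUT_SECS").as_deref(), 30);
    ConnectionSettings {
        max_connections: if max == 0 { None } else { Some(max as usize) },
        read_timeout: Duration::from_secs(read),
        write_timeout: Duration::from_secs(write),
    }
}

/// Convenience wrapper that reads the real process environment.
pub fn settings_from_env() -> ConnectionSettings {
    settings_from(|key| std::env::var(key).ok())
}
```

Taking the lookup as a closure keeps the defaults testable; `settings_from_env()` is the variant a server would call at startup.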

deploy/kubernetes/README.md

Lines changed: 2 additions & 0 deletions
@@ -101,6 +101,8 @@ Edit `configmap.yaml` and update:
 - `GOOGLE_ALLOWED_DOMAIN` - Email domain for access control (e.g., "example.com")
 - `CXDB_LOG_LEVEL` - Set to "debug" for verbose logging (optional)
 - `CXDB_ENABLE_METRICS` - Set to "true" to enable Prometheus (optional)
+- `CXDB_MAX_CONNECTIONS` - Max concurrent binary protocol connections (default: 512)
+- `CXDB_CONNECTION_READ_TIMEOUT_SECS` - Idle connection timeout in seconds (default: 300)
 
 Apply the ConfigMap:

deploy/kubernetes/configmap.yaml

Lines changed: 5 additions & 0 deletions
@@ -21,6 +21,11 @@ data:
   # Storage tuning
   CXDB_MAX_BLOB_SIZE: "10485760" # 10MB
   CXDB_COMPRESSION_LEVEL: "3" # Zstd level 1-22
+
+  # Connection limits
+  CXDB_MAX_CONNECTIONS: "512" # Max concurrent binary protocol connections
+  CXDB_CONNECTION_READ_TIMEOUT_SECS: "300" # Idle connection timeout (seconds)
+  CXDB_CONNECTION_WRITE_TIMEOUT_SECS: "30" # Write timeout (seconds)
 ---
 apiVersion: v1
 kind: ConfigMap

docs/architecture.md

Lines changed: 39 additions & 21 deletions
@@ -52,7 +52,7 @@ turn_1 (root, depth=1) → turn_2 (depth=2) → turn_3 (depth=3)
 ↘ turn_4 (depth=3, branch)
 ```
 
-**Turn Record** (fixed-size, 104 bytes):
+**Turn Record** (fixed-size, 80 bytes):
 
 ```rust
 TurnRecordV1 {
@@ -251,32 +251,50 @@ graph LR
 
 ## Concurrency Model
 
+**Store-Level RwLock:**
+
+- The `Store` is guarded by a `RwLock`, not a `Mutex`
+- Read operations (GetHead, GetLast, GetBlob, search, list contexts) acquire a shared read lock
+- Write operations (AppendTurn, CtxCreate, CtxFork, PutBlob) acquire an exclusive write lock
+- Multiple readers proceed concurrently; writers are serialized
+
 **TurnID Allocation:**
+
 - Single global atomic counter
 - Linearizable: each turn gets a unique, monotonic ID
 
-**Per-Context Head Updates:**
-- Per-context mutex guards head pointer updates
-- Append operations are serialized per context
-- Different contexts can append concurrently
+**Blob Reads (pread):**
+
+- `BlobStore::get()` uses `pread` (positional read) via a separate read-only file handle
+- This avoids seek/read mutation, allowing `&self` access under a shared read lock
+- Multiple threads can read different blobs concurrently without contention
 
 **Blob Deduplication:**
-- Index sharded by hash prefix (16 shards)
-- Double-checked locking: check index, lock shard, check again, write if missing
 
-**Example Concurrency:**
+- `put_if_absent()` checks the in-memory index first (O(1) hash lookup)
+- If missing, compresses and appends under the write lock
 
-```
-Thread 1: Append to context 1 → acquire lock(ctx1) → write turn → update head → release
-Thread 2: Append to context 2 → acquire lock(ctx2) → write turn → update head → release
-Thread 3: Append to context 1 → wait for lock(ctx1) → write turn → update head → release
-```
+**Connection Limits:**
+
+- Binary protocol connections are capped at `CXDB_MAX_CONNECTIONS` (default: 512)
+- Read/write timeouts prevent stalled clients from holding threads indefinitely
+- Connections beyond the limit are rejected immediately
+
+**SSE Event Bus:**
 
-Blobs are deduplicated safely under contention:
+- Event subscribers use bounded channels (4096 events max per subscriber)
+- `publish()` uses `try_send()`: slow subscribers have events dropped rather than accumulating unbounded memory
+- Disconnected subscribers are removed automatically on the next publish
+
+**Example Concurrency:**
 
 ```
-Thread A: hash=abc → check index (miss) → lock shard(abc) → check again → write → unlock
-Thread B: hash=abc → check index (hit from Thread A) → return existing entry
+Thread 1 (read):  get_last(ctx=1)      │ Concurrent (shared read lock)
+Thread 2 (read):  get_last(ctx=2)      │
+Thread 3 (read):  search("tag=foo")    │
+
+Thread 4 (write): append_turn(ctx=1)   │ Exclusive (waits for readers to drain)
+Thread 5 (read):  get_blob(hash)       │ Waits for write to finish
 ```
 
 ## Performance Characteristics
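
The locking model introduced in the hunk above (shared read lock for lookups, exclusive write lock for appends, one global atomic counter for turn IDs) can be sketched with the standard library alone. The `Store` type here is a hypothetical stand-in holding only a head map, and `demo` is an illustrative driver; neither is taken from the CXDB source.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for the store: context id -> head turn id.
#[derive(Default)]
pub struct Store {
    heads: HashMap<u64, u64>,
}

impl Store {
    /// Read path: takes `&self`, so it can run under a shared read lock.
    pub fn get_head(&self, ctx: u64) -> Option<u64> {
        self.heads.get(&ctx).copied()
    }

    /// Write path: takes `&mut self`, so it needs the exclusive write lock.
    pub fn append_turn(&mut self, ctx: u64, turn_id: u64) {
        self.heads.insert(ctx, turn_id);
    }
}

/// Single global counter: every caller gets a unique, increasing id.
static NEXT_TURN_ID: AtomicU64 = AtomicU64::new(1);

pub fn allocate_turn_id() -> u64 {
    NEXT_TURN_ID.fetch_add(1, Ordering::SeqCst)
}

/// Append one turn per context from separate threads (serialized by the
/// write lock), then read every head back from concurrent reader threads.
pub fn demo(contexts: u64) -> Vec<u64> {
    let store = Arc::new(RwLock::new(Store::default()));

    let writers: Vec<_> = (0..contexts)
        .map(|ctx| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                let id = allocate_turn_id();
                store.write().unwrap().append_turn(ctx, id);
            })
        })
        .collect();
    for w in writers {
        w.join().unwrap();
    }

    let readers: Vec<_> = (0..contexts)
        .map(|ctx| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                // The read guard is released at the end of this statement.
                let head = store.read().unwrap().get_head(ctx);
                head.unwrap()
            })
        })
        .collect();

    let mut ids: Vec<u64> = readers.into_iter().map(|r| r.join().unwrap()).collect();
    ids.sort_unstable();
    ids
}
```

Compared with the per-context mutexes this commit removes, a single `RwLock` gives up concurrent appends to different contexts in exchange for concurrent reads everywhere.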
@@ -302,7 +320,7 @@ Thread B: hash=abc → check index (hit from Thread A) → return existing entry
 - **Total: ~0.2ms per turn**
 
 **Storage Efficiency:**
-- Turn record: 104 bytes
+- Turn record: 80 bytes
 - Turn metadata: ~50 bytes (type_id + encoding)
 - Blob overhead: ~50 bytes (header + CRC)
 - Typical 10KB payload compresses to ~3KB (70% reduction)
@@ -339,10 +357,10 @@ Thread B: hash=abc → check index (hit from Thread A) → return existing entry
 - **Fast**: No seeks, pure sequential writes (~500 MB/s on SSD)
 - **Simple recovery**: Scan forward, truncate to last valid CRC
 
-### Why Per-Context Locks?
+### Why RwLock?
 
-- **Scalability**: N contexts can append concurrently
-- **Correctness**: Head updates are linearizable per context
+- **Read scalability**: Multiple readers proceed concurrently (reads dominate in practice)
+- **Correctness**: Writers get exclusive access for head updates and appends
 - **Simple**: No complex MVCC or optimistic concurrency
 
 ## Scalability Limits (v1)
@@ -355,7 +373,7 @@ Thread B: hash=abc → check index (hit from Thread A) → return existing entry
 
 **Not supported in v1:**
 - Distributed/replicated storage
-- Horizontal read scaling (single reader process)
+- Horizontal read scaling (concurrent reads within single process via RwLock, but single-node)
 - Sub-blob chunking for huge payloads (>1MB)
 
 See [Roadmap](https://github.com/strongdm/cxdb/blob/main/ROADMAP.md) for v2 features.
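
The SSE event bus behaviour described in the Concurrency Model changes of this file (bounded per-subscriber channels, `try_send()` on publish, removal of disconnected subscribers) can be approximated with `std::sync::mpsc::sync_channel`. This is a minimal sketch under that assumption: the `EventBus` type and its methods are hypothetical, and the real server may use a different channel implementation.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical bus: one bounded channel per subscriber.
pub struct EventBus<T> {
    capacity: usize,
    subscribers: Vec<SyncSender<T>>,
}

impl<T: Clone> EventBus<T> {
    /// `capacity` should be at least 1; a capacity of 0 makes
    /// `sync_channel` a rendezvous channel where `try_send` only succeeds
    /// while a receiver is blocked waiting.
    pub fn new(capacity: usize) -> Self {
        EventBus { capacity, subscribers: Vec::new() }
    }

    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.push(tx);
        rx
    }

    /// Deliver `event` without blocking; returns how many subscribers
    /// actually received it.
    pub fn publish(&mut self, event: T) -> usize {
        let mut delivered = 0;
        self.subscribers.retain(|tx| match tx.try_send(event.clone()) {
            Ok(()) => {
                delivered += 1;
                true
            }
            // Slow subscriber: drop this event but keep the subscription.
            Err(TrySendError::Full(_)) => true,
            // Receiver was dropped: remove the subscription.
            Err(TrySendError::Disconnected(_)) => false,
        });
        delivered
    }

    pub fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
```

With the documented limit, a bus would be created as `EventBus::new(4096)`; memory per subscriber is then bounded by the channel capacity rather than by how far a slow client falls behind.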

docs/deployment.md

Lines changed: 3 additions & 0 deletions
@@ -50,6 +50,9 @@ Internet
 | `CXDB_ENABLE_METRICS` | `false` | Enable Prometheus metrics on :9011 |
 | `CXDB_MAX_BLOB_SIZE` | `10485760` | Max blob size (10MB) |
 | `CXDB_COMPRESSION_LEVEL` | `3` | Zstd compression level (1-22) |
+| `CXDB_MAX_CONNECTIONS` | `512` | Max concurrent binary protocol connections (0 = unlimited) |
+| `CXDB_CONNECTION_READ_TIMEOUT_SECS` | `300` | Idle read timeout per connection (seconds) |
+| `CXDB_CONNECTION_WRITE_TIMEOUT_SECS` | `30` | Write timeout per connection (seconds) |
 
 **Gateway (Go):**
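
The three rows added to the table above describe a connection cap and per-connection timeouts. One way such a cap could be enforced is an atomic counter with an RAII permit that frees its slot on drop, as sketched below. `ConnectionLimiter`, `ConnectionPermit`, and `apply_timeouts` are hypothetical names for this example; the commit does not show the server's accept loop.

```rust
use std::io;
use std::net::TcpStream;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Hypothetical limiter: `max == 0` means unlimited, as documented.
pub struct ConnectionLimiter {
    active: Arc<AtomicUsize>,
    max: usize,
}

/// Held for the lifetime of one connection; frees the slot on drop.
pub struct ConnectionPermit {
    active: Arc<AtomicUsize>,
}

impl Drop for ConnectionPermit {
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::SeqCst);
    }
}

impl ConnectionLimiter {
    pub fn new(max: usize) -> Self {
        ConnectionLimiter { active: Arc::new(AtomicUsize::new(0)), max }
    }

    /// Returns `None` when the cap is reached, so the caller can reject
    /// the connection immediately instead of queueing it.
    pub fn try_acquire(&self) -> Option<ConnectionPermit> {
        let previous = self.active.fetch_add(1, Ordering::SeqCst);
        if self.max != 0 && previous >= self.max {
            self.active.fetch_sub(1, Ordering::SeqCst);
            return None;
        }
        Some(ConnectionPermit { active: Arc::clone(&self.active) })
    }

    pub fn active(&self) -> usize {
        self.active.load(Ordering::SeqCst)
    }
}

/// Apply read/write timeouts to an accepted stream. Note that the standard
/// library returns an error for a zero duration, so both values must be > 0.
pub fn apply_timeouts(stream: &TcpStream, read_secs: u64, write_secs: u64) -> io::Result<()> {
    stream.set_read_timeout(Some(Duration::from_secs(read_secs)))?;
    stream.set_write_timeout(Some(Duration::from_secs(write_secs)))?;
    Ok(())
}
```

In an accept loop, the permit would be moved into the thread handling the connection so the slot is released however that thread exits.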

docs/storage.md

Lines changed: 15 additions & 0 deletions
@@ -97,6 +97,21 @@ ContextHeadRecord {
 }
 ```
 
+## Durability
+
+All write operations use `sync_all()` (fsync) after each file write. This ensures
+data is flushed from the OS page cache to stable storage before the server
+acknowledges the write to the client. The sync order for an append is:
+
+1. `blobs.pack` + `blobs.idx` (blob data and index)
+2. `turns.log` (turn record)
+3. `turns.idx` (turn index)
+4. `turns.meta` (type metadata)
+5. `heads.tbl` (context head update)
+
+A crash at any point leaves files in a recoverable state: CRC checks on startup
+detect and truncate partial writes.
+
 ## Recovery
 
 On startup the store scans logs sequentially. If a trailing record fails CRC or is incomplete,
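
The write-then-`sync_all()` sequence from the Durability section added in this hunk can be sketched as follows. The file names and their order come from the list above; the helper names and the idea of writing one placeholder payload to every file are assumptions made to keep the example short, since each real file has its own record format.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// File order taken from the Durability list: blobs first, head pointer last.
pub const SYNC_ORDER: [&str; 6] = [
    "blobs.pack",
    "blobs.idx",
    "turns.log",
    "turns.idx",
    "turns.meta",
    "heads.tbl",
];

/// Append `bytes` to `path` and fsync before returning.
pub fn append_and_sync(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    file.write_all(bytes)?;
    // `sync_all` flushes data and metadata to stable storage.
    file.sync_all()
}

/// Write and sync each file in order. `payload` is a placeholder: a real
/// append would write a different record to each file.
pub fn durable_append(dir: &Path, payload: &[u8]) -> io::Result<()> {
    for name in SYNC_ORDER.iter() {
        append_and_sync(&dir.join(name), payload)?;
    }
    Ok(())
}
```

Syncing `heads.tbl` last means a crash midway can leave blob or turn data that no head points to, but never a head that points at data which did not reach disk.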

server/src/blob_store/README.md

Lines changed: 27 additions & 27 deletions
@@ -102,14 +102,9 @@ Creates or opens:
 let data = b"Hello, world!";
 let hash = blake3::hash(data);
 
-// Put will compress and deduplicate
-let was_new = store.put(&hash, data)?;
-
-if was_new {
-    println!("New blob stored");
-} else {
-    println!("Blob already exists (deduplicated)");
-}
+// put_if_absent will compress and deduplicate
+let entry = store.put_if_absent(*hash.as_bytes(), data)?;
+println!("Stored: offset={}, raw_len={}", entry.offset, entry.raw_len);
 ```
 
 **Compression:**
@@ -121,13 +116,13 @@ if was_new {
 ```rust
 let hash: [u8; 32] = /* ... */;
 
-match store.get(&hash)? {
-    Some(data) => println!("Found: {} bytes", data.len()),
-    None => println!("Blob not found"),
-}
+let data = store.get(&hash)?; // Returns Result<Vec<u8>>
+println!("Found: {} bytes", data.len());
 ```
 
-Returns decompressed bytes.
+Returns decompressed bytes or a `NotFound` error. Reads use `pread` (positional
+read) via a separate read-only file handle, so `get()` takes `&self` and
+multiple threads can read concurrently without contention.
 
 ### Checking Existence
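
The positional-read approach mentioned in this hunk can be illustrated with `std::os::unix::fs::FileExt::read_exact_at`, which wraps `pread(2)` on Unix and does not move a shared file cursor. `PackReader` and `read_range` are hypothetical names; the sketch reads raw bytes only and skips the header parsing, CRC check, and decompression that the real `get()` performs.

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt; // Unix-only extension trait
use std::path::Path;

/// Hypothetical read-only handle onto a pack file.
pub struct PackReader {
    file: File,
}

impl PackReader {
    pub fn open(path: &Path) -> io::Result<Self> {
        Ok(PackReader { file: File::open(path)? })
    }

    /// Read exactly `len` bytes starting at `offset`. Takes `&self`
    /// because no seek position is mutated, so many threads can call it
    /// on the same handle at once.
    pub fn read_range(&self, offset: u64, len: usize) -> io::Result<Vec<u8>> {
        let mut buf = vec![0u8; len];
        self.file.read_exact_at(&mut buf, offset)?;
        Ok(buf)
    }
}
```

A seek-then-read design would need `&mut self` (or a lock around the file), which is what forces readers to serialize.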

@@ -147,14 +142,15 @@ The blob store automatically deduplicates identical content:
 let data1 = b"foo";
 let hash1 = blake3::hash(data1);
 
-store.put(&hash1, data1)?; // Writes to pack file
-store.put(&hash1, data1)?; // No-op (already exists)
+store.put_if_absent(*hash1.as_bytes(), data1)?; // Writes to pack file
+store.put_if_absent(*hash1.as_bytes(), data1)?; // No-op (already exists)
 ```
 
 **Thread safety:**
-- Uses sharded locks (16 shards by hash prefix)
-- Double-checked locking: check index, acquire lock, check again, write if missing
-- Safe under concurrent writes
+
+- Reads (`get`, `contains`, `raw_len`) take `&self` and are safe under concurrent access
+- Writes (`put_if_absent`) take `&mut self` and are serialized by the caller's write lock
+- Deduplication check is O(1) in the in-memory hash index
 
 ## Compression
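
The `&self` read / `&mut self` write split and the O(1) deduplication check described in this hunk can be mimicked with an in-memory model. `MemBlobStore` and `Entry` are hypothetical; the sketch stores raw bytes in a `Vec` with no compression, CRC, or file I/O, and the caller supplies the 32-byte key instead of computing a BLAKE3 hash.

```rust
use std::collections::HashMap;

/// Hypothetical index entry, loosely modelled on the `offset` and
/// `raw_len` fields printed in the README example.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Entry {
    pub offset: u64,
    pub raw_len: u32,
}

/// In-memory stand-in for the pack file plus its hash index.
#[derive(Default)]
pub struct MemBlobStore {
    index: HashMap<[u8; 32], Entry>,
    pack: Vec<u8>,
}

impl MemBlobStore {
    /// Read-only lookup: `&self`, safe to share between reader threads.
    pub fn contains(&self, hash: &[u8; 32]) -> bool {
        self.index.contains_key(hash)
    }

    /// Append `data` only if `hash` is not indexed yet. `&mut self` means
    /// the caller must hold exclusive access, so no second check is needed.
    pub fn put_if_absent(&mut self, hash: [u8; 32], data: &[u8]) -> Entry {
        if let Some(existing) = self.index.get(&hash) {
            return *existing; // deduplicated: nothing is written
        }
        let entry = Entry {
            offset: self.pack.len() as u64,
            raw_len: data.len() as u32,
        };
        self.pack.extend_from_slice(data);
        self.index.insert(hash, entry);
        entry
    }

    pub fn pack_len(&self) -> usize {
        self.pack.len()
    }
}
```

Because exclusive access is enforced by the borrow checker (and by the caller's write lock at runtime), the double-checked locking of the removed sharded design is no longer needed.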

@@ -194,15 +190,19 @@ let hash = blake3::hash(data); // Hash before compression
 let compressed = zstd::encode(data, 3)?;
 
 // Store with original hash
-store.put(&hash, data)?; // Handles compression internally
+store.put_if_absent(*hash.as_bytes(), data)?; // Handles compression internally
 ```
 
 **Why BLAKE3?**
 - Fast: 3-4x faster than SHA-256
 - Secure: 256-bit output, collision-resistant
 - Deterministic: Same input always produces same hash
 
-## Crash Recovery
+## Durability and Crash Recovery
+
+All writes (`put_if_absent`) are followed by `sync_all()` (fsync) on both the
+pack file and the index file. This ensures data is on stable storage before the
+caller is notified of success.
 
 On startup, the store:

@@ -212,6 +212,7 @@ On startup, the store:
 4. Rebuilds if necessary
 
 **CRC verification:**
+
 - Each blob record has a CRC-32
 - On read, CRC is verified
 - Corrupted blobs return error (caller must handle)
@@ -247,7 +248,7 @@ use blob_store::{BlobStore, BlobCodec};
 use blake3;
 
 fn main() -> Result<(), Box<dyn std::error::Error>> {
-    // Open store
+    // Open store (mut needed for writes, reads only need &self)
     let mut store = BlobStore::open(Path::new("./data/blobs"))?;
 
     // Prepare data
@@ -260,13 +261,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Compute hash (before compression)
     let hash = blake3::hash(&msgpack_bytes);
 
-    // Store (will compress internally)
-    let was_new = store.put(&hash, &msgpack_bytes)?;
-    println!("Stored: was_new={}", was_new);
+    // Store (will compress internally, requires &mut self)
+    let _entry = store.put_if_absent(*hash.as_bytes(), &msgpack_bytes)?;
+    println!("Stored blob");
 
-    // Retrieve
-    let retrieved = store.get(&hash)?
-        .expect("Blob should exist");
+    // Retrieve (uses pread, only needs &self)
+    let retrieved = store.get(hash.as_bytes())?;
 
     assert_eq!(retrieved, msgpack_bytes);
     println!("Retrieved: {} bytes", retrieved.len());
