Skip to content

Commit e0904fa

Browse files
Synchronize Cayenne partition commits across partitions (spiceai#10125) (spiceai#10819)
* Add CayenneStagedAppend two-phase commit lifecycle (spiceai#10125) * Add commit_compaction_in_txn catalog primitive (spiceai#10125) * Tighten CayenneTableProvider scan() lock against barrier (spiceai#10125) * Extract PreparedOverwrite lifecycle from write_all_overwrite (spiceai#10125) * Add CayennePartitionedInsertStrategy for atomic overwrite (spiceai#10125) * Add top-level partitioned-commit WAL format (spiceai#10125) * Extend CayennePartitionedInsertStrategy to atomic append (spiceai#10125) * Address Copilot review comments — atomic WAL write, S3 fallback (spiceai#10125) - partitioned_wal: write WAL via tempfile + fsync + atomic rename + parent dir fsync so a crash mid-write can never leave an unparseable file that read_all_in would skip. - partitioned_insert_strategy: skip the top-level partitioned WAL for S3-backed table roots (tokio::fs would attempt local paths like `s3:/bucket/...` and fail). Per-partition staging WAL still anchors single-partition recovery on S3; the cross-partition set anchor is deferred until the WAL grows object-store IO. - partitioned_insert_strategy: module docs now describe the actual append coordination flow (CayennePartitionedAppendSink) instead of the stale "falls through to DefaultInsertStrategy" wording. - staging_wal / overwrite: rollback paths hold the per-table write guard for the full cleanup so a concurrent writer cannot observe leftover staging state mid-rollback. * Drop stringified-column shortcut in partition-expr compilation (spiceai#10125) create_partition_physical_exprs previously tried an unqualified column lookup using `other.to_string()` for any non-Column partition expression before falling back to compiling the real expression. If the input schema contained a column whose name happened to match the debug form of a derived partition expression, the shortcut would silently succeed and route partitioning through the wrong physical expression — incorrect partition keys, incorrect commit groupings. Compile bare `Expr::Column` partitions via unqualified-name lookup as before; all other expressions are compiled as-is. No string-form fallback. Addresses Copilot comment on PR spiceai#10819. Also restages fmt-only fixes left from the trunk merge. * Add backticks around DataFusion in builder.rs test doc clippy::doc_markdown caught this on the test-clippy step (the lib step clears with the same rule, but the test build picks up a different cfg set). Pre-existing on trunk; fixing here so the PR branch's lint is green. * Update stats edge-case test to assert post-merge aggregation persist_table_stats now merges the current write's accumulator with the existing metastore aggregate (current trunk behavior, see spiceai#10818). The test expectations were locked to the pre-merge "latest-write-only" path with a TBD note for when aggregation lands — refresh them to the merged shape: num_rows = 3 + 2 = 5, min/max span both batches. Renamed the test + doc to match.
1 parent 33e498d commit e0904fa

21 files changed

Lines changed: 3470 additions & 272 deletions

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cayenne/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ turso = ["dep:turso"]
7070
partition-table-provider = ["dep:runtime-table-partition"]
7171

7272
[dev-dependencies]
73-
criterion = { version = "0.7", features = ["html_reports"] }
73+
criterion = { version = "0.7", features = ["html_reports", "async_tokio"] }
7474
datafusion-federation.workspace = true
7575
datafusion-functions = { workspace = true }
7676
insta = { workspace = true, features = ["json"] }
@@ -80,6 +80,7 @@ tempfile = { workspace = true }
8080
test-framework = { path = "../test-framework" }
8181
test-log = { version = "0.2", features = ["trace"] }
8282
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "fs"] }
83+
tokio-stream = { workspace = true }
8384

8485
[[bench]]
8586
harness = false
@@ -96,3 +97,7 @@ name = "deletion_index_probe"
9697
[[bench]]
9798
harness = false
9899
name = "mutation_writer"
100+
101+
[[bench]]
102+
harness = false
103+
name = "listing_fence_overhead"
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2026 The Spice.ai OSS Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
9+
//! Microbenchmark for the `listing_table` synchronization pattern in
10+
//! `CayenneTableProvider::scan()`.
11+
//!
12+
//! Step 3 of issue #10125 replaced `Arc<std::sync::RwLock<Arc<ListingTable>>>`
13+
//! with `Arc<ArcSwap<ListingTable>>` + a separate
14+
//! `Arc<tokio::sync::RwLock<()>>` fence. The previous "grab Arc under brief
15+
//! sync guard, drop guard, then `.scan().await`" pattern is replaced with a
16+
//! fence read held across the `.scan().await` so that concurrent writer
17+
//! barriers cannot interleave with the listing operation.
18+
//!
19+
//! This bench isolates the per-scan synchronization cost of each pattern
20+
//! (no DataFusion listing, no I/O, no filesystem state — just the lock
21+
//! primitives) and runs both inside the same tokio runtime, so the `.await`
22+
//! that the new pattern requires does not double-count `Runtime::block_on`
23+
//! overhead.
24+
25+
#![allow(clippy::expect_used)]
26+
27+
use std::hint::black_box;
28+
use std::sync::Arc;
29+
30+
use arc_swap::ArcSwap;
31+
use criterion::{Criterion, criterion_group, criterion_main};
32+
use tokio::runtime::Runtime;
33+
34+
/// Stand-in for `Arc<ListingTable>` so the comparison focuses on lock cost
35+
/// rather than the size of the inner type.
36+
type Inner = String;
37+
38+
fn make_inner() -> Arc<Inner> {
39+
Arc::new("listing_table_placeholder".to_string())
40+
}
41+
42+
// ----------------------------------------------------------------------------
43+
// Uncontended single-task access.
44+
// ----------------------------------------------------------------------------
45+
//
46+
// Steady-state read path: one task calling scan() with no concurrent reader
47+
// or writer. Measures the per-call overhead of acquiring the synchronization
48+
// primitive + loading the Arc. Both patterns run inside the same tokio
49+
// runtime via `to_async` so the async pattern doesn't pay block_on overhead.
50+
51+
fn bench_uncontended_old_pattern(c: &mut Criterion) {
52+
let rt = Runtime::new().expect("runtime");
53+
let lock: Arc<std::sync::RwLock<Arc<Inner>>> = Arc::new(std::sync::RwLock::new(make_inner()));
54+
55+
c.bench_function("uncontended/old_sync_rwlock_then_arc_clone", |b| {
56+
b.to_async(&rt).iter(|| async {
57+
let guard = lock.read().expect("read");
58+
let snapshot = Arc::clone(&guard);
59+
drop(guard);
60+
black_box(snapshot);
61+
});
62+
});
63+
}
64+
65+
fn bench_uncontended_new_pattern(c: &mut Criterion) {
66+
let rt = Runtime::new().expect("runtime");
67+
let arc_swap: Arc<ArcSwap<Inner>> = Arc::new(ArcSwap::new(make_inner()));
68+
let fence: Arc<tokio::sync::RwLock<()>> = Arc::new(tokio::sync::RwLock::new(()));
69+
70+
c.bench_function("uncontended/new_fence_read_then_arcswap_load", |b| {
71+
b.to_async(&rt).iter(|| async {
72+
let _fence_guard = fence.read().await;
73+
let snapshot = arc_swap.load_full();
74+
black_box(snapshot);
75+
});
76+
});
77+
}
78+
79+
// ----------------------------------------------------------------------------
80+
// Concurrent-reader access, no writer.
81+
// ----------------------------------------------------------------------------
82+
//
83+
// Multi-tenant steady state: several scans share the same partition. Both
84+
// std::sync::RwLock and tokio::sync::RwLock allow parallel readers, but each
85+
// adds different atomic-counter overhead per acquisition.
86+
87+
fn bench_concurrent_readers_old_pattern(c: &mut Criterion) {
88+
let rt = Runtime::new().expect("runtime");
89+
let lock: Arc<std::sync::RwLock<Arc<Inner>>> = Arc::new(std::sync::RwLock::new(make_inner()));
90+
91+
// Background reader that keeps a read guard outstanding most of the time.
92+
let bg_lock = Arc::clone(&lock);
93+
let bg = std::thread::spawn(move || {
94+
loop {
95+
let _guard = bg_lock.read().expect("bg read");
96+
std::thread::yield_now();
97+
if Arc::strong_count(&bg_lock) == 1 {
98+
break;
99+
}
100+
}
101+
});
102+
103+
c.bench_function("concurrent_readers/old_sync_rwlock", |b| {
104+
b.to_async(&rt).iter(|| async {
105+
let guard = lock.read().expect("read");
106+
let snapshot = Arc::clone(&guard);
107+
drop(guard);
108+
black_box(snapshot);
109+
});
110+
});
111+
112+
drop(lock);
113+
let _ = bg.join();
114+
}
115+
116+
fn bench_concurrent_readers_new_pattern(c: &mut Criterion) {
117+
let rt = Runtime::new().expect("runtime");
118+
let arc_swap: Arc<ArcSwap<Inner>> = Arc::new(ArcSwap::new(make_inner()));
119+
let fence: Arc<tokio::sync::RwLock<()>> = Arc::new(tokio::sync::RwLock::new(()));
120+
121+
let bg_fence = Arc::clone(&fence);
122+
let bg = std::thread::spawn(move || {
123+
let rt = Runtime::new().expect("bg runtime");
124+
rt.block_on(async {
125+
loop {
126+
let _fence_guard = bg_fence.read().await;
127+
tokio::task::yield_now().await;
128+
if Arc::strong_count(&bg_fence) == 1 {
129+
break;
130+
}
131+
}
132+
});
133+
});
134+
135+
c.bench_function("concurrent_readers/new_fence_arcswap", |b| {
136+
b.to_async(&rt).iter(|| async {
137+
let _fence_guard = fence.read().await;
138+
let snapshot = arc_swap.load_full();
139+
black_box(snapshot);
140+
});
141+
});
142+
143+
drop(fence);
144+
let _ = bg.join();
145+
}
146+
147+
criterion_group!(
148+
benches,
149+
bench_uncontended_old_pattern,
150+
bench_uncontended_new_pattern,
151+
bench_concurrent_readers_old_pattern,
152+
bench_concurrent_readers_new_pattern,
153+
);
154+
criterion_main!(benches);

0 commit comments

Comments
 (0)