Skip to content

Commit 6628b70

Browse files
authored
Optimize Cayenne catalog maintenance paths (spiceai#10904)
* Optimize Cayenne catalog maintenance paths * refactor(cayenne): replace `allow` with `expect` for Clippy lints and update function signatures to return `()` where applicable * fix(cayenne): satisfy lint after review updates * refactor(cayenne): update terminology from RPC to metastore call in benchmarks and documentation * Update vortex dependencies to revision 86cc337 for consistency across crates * feat(cayenne): add maintenance compaction triggers for protected snapshots and implement benchmark for cache invalidation * feat(cayenne): add compaction trigger snapshot age configuration and related logging * test(cayenne): add unit test for resolving compaction thresholds from acceleration parameters * fix(cayenne): update PARAMETERS array length to include additional S3 parameter * feat(cayenne): implement cache invalidation for scan listing tables and add integration test for snapshot publishing * refactor(cayenne): improve comments and update function signatures for clarity * feat(benchmarks): add benchmark for load_existing_keyset_cap_disabled * feat(runtime): add filter propagation option and related parsing function feat(telemetry): implement tracking for Cayenne write phase durations refactor(cayenne): streamline keyset handling and improve performance * Enhance VortexConfig and optimize staging file moves - Updated VortexConfig documentation to clarify runtime parameter effects on compaction and inline write settings. - Introduced a function to apply refresh mode defaults to VortexConfig based on the acceleration mode. - Adjusted default values for compaction and inline write parameters for append and changes refresh modes. - Improved performance of S3 staged-file moves by implementing concurrent processing, reducing the time readers are blocked during file operations. - Refactored benchmarks to reflect changes in the staging move concurrency and inline memtable read overhead. - Added tests to verify correct application of VortexConfig defaults based on refresh mode. * refactor(cayenne): update VortexConfig documentation and refresh mode handling * refactor(benches): improve documentation and clarify inline upsert benchmark details * refactor(cayenne): enhance VortexConfig documentation and improve small write handling * feat(cayenne): add compaction_trigger_protected_snapshots to VortexConfig and related functions * refactor: optimize max_sequence_number validation in DeletionIndex and KeyDeletionIndex * feat(cayenne): add scaling benchmark for upsert performance against DuckDB * refactor: improve comments and optimize keyset cache handling in Cayenne * refactor: enhance error handling and improve comments in Cayenne components * refactor: improve code clarity and consistency in catalog and deletion index implementations * refactor: enhance transaction handling and add query_row_values method for metastore * refactor: update comments to clarify byte-budget cap impact on keyset rebuilding * refactor: enhance comments and improve clarity in benchmark and catalog implementations * refactor: initialize raw field in CachedTableStatistics for first load/persist * refactor: update keyset cache handling to use byte budget and improve delete file validation * refactor: update keyset cache handling to use byte budget and improve delete file validation * refactor: simplify delete file validation and improve code clarity in multiple files * refactor: enhance keyset cache handling and improve parameter parsing with semantic hints * refactor: enhance parameter handling for snapshot age configuration in Cayenne * refactor: add benchmark for int64 primary key filter keep mask allocation and improve snapshot age parsing logic * refactor: add max coalesce age configuration to CDC and update documentation * refactor: implement batched multi-VALUES INSERT for delete-file rows with ON CONFLICT handling * refactor: clean up formatting and improve readability in delete file insertion logic * refactor: add benchmark for position-based deletion redundant walks * refactor: remove redundant parse_u64 function and streamline parsing logic * test: add unit test for batched on-conflict deletions in CayenneCatalog * refactor: improve comments and error handling in various modules * refactor: simplify visibility checks in deletion filter execution * refactor: enhance transaction handling in SqliteMetastore and update conflict deletion logic * fix: correct type in insert_pks vector creation for delete file tests * feat: add benchmark for apply_partial_deletion_filter to measure allocation efficiency * feat: add default on_schema_change handling in dataset configuration * feat: add benchmark for pk_lookup_file_group_fanout to evaluate partition sensitivity * refactor: simplify batch query in benchmark for target partitions * feat: add benchmark for pk_lookup_session_cache_warmup to evaluate cache performance * feat: add benchmarks for pk_in_list_vs_range_rewrite and get_max_delete_sequence_walk * refactor: streamline SQL query formatting and add async await to vortex config calls * perf: optimize get_max_delete_sequence by using cached max_sequence_number
1 parent 97009c3 commit 6628b70

48 files changed

Lines changed: 5897 additions & 826 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 165 additions & 165 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,12 @@ flatbuffers = "25.2.10"
347347
# Vortex tagged releases publish version 0.1.0 in their Cargo.toml, not the actual release version,
348348
# so we specify git deps here directly. The [patch.crates-io] entries below exist to override
349349
# transitive vortex dependencies pulled in by other patched crates.
350-
vortex = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
351-
vortex-array = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
352-
vortex-datafusion = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
353-
vortex-scan = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
354-
vortex-session = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
355-
vortex-utils = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
350+
vortex = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
351+
vortex-array = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
352+
vortex-datafusion = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
353+
vortex-scan = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
354+
vortex-session = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
355+
vortex-utils = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
356356

357357
x509-certificate = "0.25.0"
358358
spiceai = { git = "https://github.com/spiceai/spice-rs.git", rev = "4c63970cf1d78c71eb69e2f4346d5d4795631b47" } # branch: trunk
@@ -470,9 +470,9 @@ parquet = { git = "https://github.com/spiceai/arrow-rs.git", rev = "96190c47f7a1
470470

471471
object_store = { git = "https://github.com/apache/arrow-rs-object-store", rev = "f0a772cd49d2ebb1f19f487ccd93d705f48dc891" }
472472

473-
vortex = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
474-
vortex-array = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
475-
vortex-datafusion = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
476-
vortex-scan = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
477-
vortex-session = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
478-
vortex-utils = { git = "https://github.com/spiceai/vortex.git", rev = "c765c90de6e1adccb822d0bddfe67d6738cc0d39" } # spiceai-52.5
473+
vortex = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
474+
vortex-array = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
475+
vortex-datafusion = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
476+
vortex-scan = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
477+
vortex-session = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5
478+
vortex-utils = { git = "https://github.com/spiceai/vortex.git", rev = "86cc337cb133fc54ec45bd665e348453a84785a6" } # spiceai-52.5

crates/cayenne/Cargo.toml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ name = "checkpoint_fence_stall"
129129
harness = false
130130
name = "metastore_connection_contention"
131131

132+
[[bench]]
133+
harness = false
134+
name = "stats_persistence_rpc_ceiling"
135+
136+
[[bench]]
137+
harness = false
138+
name = "apply_partial_deletion_filter_per_scan"
139+
140+
[[bench]]
141+
harness = false
142+
name = "apply_on_conflict_rpc_ceiling"
143+
132144
[[bench]]
133145
harness = false
134146
name = "validate_on_conflict_buffering"
@@ -165,6 +177,10 @@ name = "deletion_vector_bitmap_to_treemap"
165177
harness = false
166178
name = "wide_table_key_probe_scan"
167179

180+
[[bench]]
181+
harness = false
182+
name = "load_existing_keyset_cap_disabled"
183+
168184
[[bench]]
169185
harness = false
170186
name = "vs_duckdb_in_list_delete"
@@ -200,6 +216,11 @@ harness = false
200216
name = "vs_duckdb_upsert"
201217
required-features = ["duckdb-bench"]
202218

219+
[[bench]]
220+
harness = false
221+
name = "vs_duckdb_upsert_scaling"
222+
required-features = ["duckdb-bench"]
223+
203224
[[bench]]
204225
harness = false
205226
name = "vs_duckdb_groupby"
@@ -218,3 +239,38 @@ required-features = ["duckdb-bench"]
218239
[[bench]]
219240
harness = false
220241
name = "compaction_picker"
242+
243+
[[bench]]
244+
harness = false
245+
name = "scan_listing_cache_invalidation"
246+
247+
[[bench]]
248+
harness = false
249+
name = "int64_pk_filter_keep_mask_alloc"
250+
251+
[[bench]]
252+
harness = false
253+
name = "position_delete_redundant_walks"
254+
255+
[[bench]]
256+
harness = false
257+
name = "apply_partial_filter_empty_alloc"
258+
259+
[[bench]]
260+
harness = false
261+
name = "pk_lookup_file_group_fanout"
262+
required-features = ["duckdb-bench"]
263+
264+
[[bench]]
265+
harness = false
266+
name = "pk_lookup_session_cache_warmup"
267+
required-features = ["duckdb-bench"]
268+
269+
[[bench]]
270+
harness = false
271+
name = "pk_in_list_vs_range_rewrite"
272+
required-features = ["duckdb-bench"]
273+
274+
[[bench]]
275+
harness = false
276+
name = "get_max_delete_sequence_walk"

crates/cayenne/README.md

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ pub trait MetadataCatalog: Send + Sync {
122122
async fn get_table(&self, table_name: &str) -> CatalogResult<TableMetadata>;
123123
async fn drop_table(&self, table_name: &str) -> CatalogResult<bool>;
124124

125-
// Sequence numbers
125+
// Sequence numbers (reserve reduces round-trips on serialized backends)
126126
async fn increment_sequence_number(&self, table_id: &str) -> CatalogResult<i64>;
127127
async fn get_sequence_number(&self, table_id: &str) -> CatalogResult<i64>;
128+
async fn reserve_sequence_numbers(&self, table_id: &str, count: u32) -> CatalogResult<i64>;
128129

129130
// Delete files (position- and key-based)
130131
async fn add_delete_file(&self, delete_file: DeleteFile) -> CatalogResult<String>;
@@ -186,7 +187,7 @@ pub trait MetadataCatalog: Send + Sync {
186187
- **`InlinedDataStats`**`{ total_rows, segment_count, total_bytes }` aggregated from `cayenne_inlined_data` for memtable-pressure decisions.
187188
- **`PartitionMetadata`** — composite partition key, partition path, record/byte counts.
188189
- **`TableStatistics`** — serialized `FileStatistics` blob plus `num_rows`; populated from Vortex file footers and read by the DataFusion planner.
189-
- **`VortexConfig`** — Vortex-side tuning. All fields configurable per dataset via `cayenne_*` runtime parameters:
190+
- **`VortexConfig`** — Vortex-side tuning. All fields configurable per dataset via `cayenne_*` runtime parameters. The runtime applies refresh-mode defaults before parsing explicit params: `refresh_mode: caching`, `changes`, and `append` with `refresh_check_interval <= 5m` favor small incremental writes, while manual/cron/long-interval append plus `refresh_mode: full`, `snapshot`, `disabled`, and unspecified refresh modes favor large Vortex writes by default. Append workloads can be small or large depending on caller batch size, so tune the inline and compaction parameters explicitly if refresh cadence does not reflect write size.
190191

191192
```rust
192193
pub struct VortexConfig {
@@ -204,20 +205,22 @@ pub struct VortexConfig {
204205
pub write_concurrency: Option<usize>, // None = session target_partitions; forced to 1 if sort_columns set
205206

206207
// Compaction
207-
pub compaction_trigger_files: usize, // default 8
208+
pub compaction_trigger_files: usize, // default caching/changes/short-append=4, otherwise=8
209+
pub compaction_trigger_protected_snapshots: usize, // default caching/changes/short-append=4, otherwise=8
210+
pub compaction_trigger_snapshot_age_ms: u64, // default caching/changes/short-append=60_000, otherwise=300_000; 0 disables age trigger
208211
pub compaction_max_levels: usize, // default 3
209212
pub compaction_max_files_per_pick: usize, // default 32
210-
pub compaction_background_interval_ms: u64, // default 30_000, 0 disables background loop
213+
pub compaction_background_interval_ms: u64, // default caching/changes/short-append=10_000, otherwise=30_000; 0 disables background loop
211214

212215
// Inline-write admission (per-call gate)
213-
pub inline_max_rows: usize, // default 1_024
214-
pub inline_max_bytes: usize, // default 1_048_576 (1 MiB serialized IPC)
215-
pub inline_max_buffer_bytes: usize, // default 4_194_304 (4 MiB pre-decode buffer)
216+
pub inline_max_rows: usize, // default caching/changes/short-append=1_024, otherwise=0
217+
pub inline_max_bytes: usize, // default caching/changes/short-append=1_048_576, otherwise=0
218+
pub inline_max_buffer_bytes: usize, // default caching/changes/short-append=4_194_304, otherwise=0
216219

217220
// Inline-memtable flush triggers (cumulative gate)
218-
pub inline_flush_max_rows: i64, // default 10_000
219-
pub inline_flush_max_segments: i64, // default 64
220-
pub inline_flush_max_bytes: i64, // default 8_388_608 (8 MiB total IPC)
221+
pub inline_flush_max_rows: i64, // default caching/changes/short-append=2_048, otherwise=10_000
222+
pub inline_flush_max_segments: i64, // default caching/changes/short-append=16, otherwise=64
223+
pub inline_flush_max_bytes: i64, // default caching/changes/short-append=2_097_152, otherwise=8_388_608
221224

222225
// PK conflict detection
223226
pub pk_conflict_detection: PkConflictDetection, // default Auto; None opts into blind append for CDC
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2026 The Spice.ai OSS Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
9+
//! Regression bench: per-upsert metastore call count in
10+
//! [`crate::provider::table::CayenneTableProvider::apply_on_conflict_deletions`].
11+
//!
12+
//! Older versions of the on-conflict path ran a non-atomic 3+ metastore-call
13+
//! sequence on every upsert that produces deletion vectors:
14+
//!
15+
//! 1. `catalog.increment_sequence_number(table_id)` — 1 call
16+
//! 2. `DeletionVectorWriter::write(specs)` — writes deletion-vector
17+
//! files to disk (NOT counted here; we measure metastore calls only)
18+
//! 3. For each `DeletionVectorWriteResult`:
19+
//! `catalog.add_delete_file(result.delete_file)` — 1 call per file
20+
//! 4. `catalog.add_insert_records_batch(...)` — 1 call per insert-record chunk
21+
//!
22+
//! For a typical PK-mode upsert that produces 1-2 delete files the cumulative
23+
//! cost was 3-4 metastore calls per upsert. None of those calls were wrapped
24+
//! in a transaction, so a crash between step 3 and step 4 could leave the
25+
//! catalog with delete-file records at `delete_sequence` but no insert-record
26+
//! at `insert_sequence`, permanently hiding the new row on restart.
27+
//!
28+
//! The production path now calls `commit_on_conflict_deletions`
29+
//! ([`crate::catalog::MetadataCatalog::commit_on_conflict_deletions`], wired in
30+
//! at `provider/table.rs:4506-4526`) which opens one transaction, INSERTs every
31+
//! `cayenne_delete_file` row, INSERTs every insert-record row (chunked under
32+
//! SQLite's 32 K-param cap, as `add_insert_records_batch_in_chunks` already
33+
//! does internally), and commits. Crash anywhere before commit → catalog state
34+
//! unchanged → the upsert is fully re-driveable from the calling write path.
35+
//!
36+
//! Counted metastore-call totals for the one-insert-record-chunk case:
37+
//!
38+
//! | path | calls per upsert | atomic? |
39+
//! |-------------------------------------------------|----------------------|---------|
40+
//! | older (`apply_on_conflict_deletions`, no txn) | `2 + delete_files` | no |
41+
//! | current (`commit_on_conflict_deletions` in txn) | `4 + delete_files` | yes |
42+
//!
43+
//! ## What this bench measures
44+
//!
45+
//! Pure shape — same `tokio::sync::Mutex<()>` + `tokio::time::sleep(call_latency)`
46+
//! pattern as `stats_persistence_rpc_ceiling.rs`. No real SQLite, no Cayenne
47+
//! setup. Two lanes per `(delete_files_per_upsert, upsert_count, call_latency)`:
48+
//!
49+
//! - `no_txn_baseline` — each upsert: 1 call (increment) + N calls
50+
//! (add_delete_file × N) + 1 call (add_insert_records_batch). Total =
51+
//! `(N + 2)` calls per upsert. Mirrors the older non-atomic path.
52+
//! - `atomic_txn_calls` — current behavior. Each upsert: 1 call (increment) +
53+
//! 1 call (begin transaction) + N calls (delete-file INSERTs) + 1 call
54+
//! (insert-record chunk INSERT) + 1 call (commit). Total = `(N + 4)` calls
55+
//! per upsert for this bench's single-chunk setup.
56+
//!
57+
//! The bench keeps the call-count tradeoff visible — atomicity costs a
58+
//! constant 2 extra calls per upsert in exchange for closing the crash window.
59+
//!
60+
//! `cargo bench --bench apply_on_conflict_rpc_ceiling -p cayenne`.
61+
62+
#![expect(clippy::expect_used)]
63+
64+
use std::hint::black_box;
65+
use std::sync::Arc;
66+
use std::time::Duration;
67+
68+
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
69+
use tokio::sync::Mutex;
70+
71+
/// Per-call simulated metastore latency — mirrors the other RPC-ceiling benches.
72+
const RTTS: &[(&str, Duration)] = &[
73+
("rtt_1ms", Duration::from_millis(1)),
74+
("rtt_10ms", Duration::from_millis(10)),
75+
];
76+
77+
/// Delete files per upsert. Typical PK-mode upsert produces 1-2 files
78+
/// (one per touched virtual file). Larger counts model partitioned or
79+
/// wide-fan-out upserts.
80+
const DELETE_FILES_PER_UPSERT: &[usize] = &[1, 2, 4];
81+
82+
/// Upsert count per iteration. 32 keeps the worst-case shape
83+
/// (`N=4`, `rtt_10ms`, current) under 2.5 s per iteration so Criterion
84+
/// `--quick` produces a multi-sample distribution. Pattern lifted from
85+
/// `stats_persistence_rpc_ceiling.rs:128`.
86+
const UPSERTS_PER_ITERATION: usize = 32;
87+
88+
/// One simulated metastore call. Same shape as other RPC-ceiling benches.
89+
async fn one_metastore_call(pool: &Mutex<()>, rtt: Duration) {
90+
let _guard = pool.lock().await;
91+
tokio::time::sleep(rtt).await;
92+
}
93+
94+
/// Lane A: mirrors the older non-atomic `apply_on_conflict_deletions`. Each
95+
/// upsert pays `2 + delete_files` separate metastore calls.
96+
async fn run_current(pool: &Arc<Mutex<()>>, upserts: usize, delete_files: usize, rtt: Duration) {
97+
for _ in 0..upserts {
98+
// 1. increment_sequence_number
99+
one_metastore_call(pool, rtt).await;
100+
// 3. add_delete_file × N
101+
for _ in 0..delete_files {
102+
one_metastore_call(pool, rtt).await;
103+
}
104+
// 4. add_insert_records_batch
105+
one_metastore_call(pool, rtt).await;
106+
}
107+
}
108+
109+
/// Lane B: current behavior — `commit_on_conflict_deletions` as implemented:
110+
/// `increment_sequence_number`, `BEGIN`, per-delete-file `INSERT`, one
111+
/// insert-record chunk `INSERT`, then `COMMIT`.
112+
async fn run_proposed(pool: &Arc<Mutex<()>>, upserts: usize, delete_files: usize, rtt: Duration) {
113+
for _ in 0..upserts {
114+
// 1. increment_sequence_number
115+
one_metastore_call(pool, rtt).await;
116+
// 2. begin_transaction
117+
one_metastore_call(pool, rtt).await;
118+
// 3. delete-file INSERT × N
119+
for _ in 0..delete_files {
120+
one_metastore_call(pool, rtt).await;
121+
}
122+
// 4. one insert-record chunk INSERT
123+
one_metastore_call(pool, rtt).await;
124+
// 5. commit
125+
one_metastore_call(pool, rtt).await;
126+
}
127+
}
128+
129+
fn bench_apply_on_conflict_rpc_ceiling(c: &mut Criterion) {
130+
let rt = tokio::runtime::Builder::new_multi_thread()
131+
.worker_threads(4)
132+
.enable_all()
133+
.build()
134+
.expect("tokio runtime");
135+
136+
let mut group = c.benchmark_group("apply_on_conflict_rpc_ceiling");
137+
group.sample_size(10);
138+
139+
for &(rtt_label, rtt) in RTTS {
140+
for &delete_files in DELETE_FILES_PER_UPSERT {
141+
let upserts_total = u64::try_from(UPSERTS_PER_ITERATION).unwrap_or(u64::MAX);
142+
group.throughput(Throughput::Elements(upserts_total));
143+
144+
let id = format!("delete_files={delete_files}/{rtt_label}");
145+
let pool_a = Arc::new(Mutex::new(()));
146+
group.bench_with_input(
147+
BenchmarkId::new("no_txn_baseline", &id),
148+
&delete_files,
149+
|b, &delete_files| {
150+
b.to_async(&rt).iter(|| {
151+
let pool = Arc::clone(&pool_a);
152+
async move {
153+
run_current(&pool, UPSERTS_PER_ITERATION, delete_files, rtt).await;
154+
black_box(pool);
155+
}
156+
});
157+
},
158+
);
159+
let pool_b = Arc::new(Mutex::new(()));
160+
group.bench_with_input(
161+
BenchmarkId::new("atomic_txn_calls", &id),
162+
&delete_files,
163+
|b, &delete_files| {
164+
b.to_async(&rt).iter(|| {
165+
let pool = Arc::clone(&pool_b);
166+
async move {
167+
run_proposed(&pool, UPSERTS_PER_ITERATION, delete_files, rtt).await;
168+
black_box(pool);
169+
}
170+
});
171+
},
172+
);
173+
}
174+
}
175+
group.finish();
176+
}
177+
178+
criterion_group!(benches, bench_apply_on_conflict_rpc_ceiling);
179+
criterion_main!(benches);

0 commit comments

Comments
 (0)