Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/cayenne/src/metastore/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn configure_sqlite_connection(
///
/// Pool size is `min(available_parallelism, 32)` (minimum 2). If
/// `available_parallelism()` fails (rare — e.g. seccomp-restricted
/// environments), K falls back to 4. SQLite WAL mode allows many
/// environments), K falls back to 4. `SQLite` WAL mode allows many
/// concurrent readers per database file (read-only operations don't take
/// the WAL write lock), so a larger pool lifts the read-side concurrency
/// ceiling for metadata-heavy workloads — e.g. 64-core deployments running
Expand Down
16 changes: 16 additions & 0 deletions crates/cayenne/src/provider/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,12 @@ impl DeletionSink for PkKeysetInvalidatingDeletionSink {
let deleted = self.inner.delete_from().await?;
if deleted > 0 {
self.table.clear_cached_pk_keyset();
// Drop the per-file stats `CayenneTableProvider::collect_scan_file_statistics`
// caches. Without this, a follow-up `COUNT(*)` (or any other stats-driven
// query) is served the row count we computed *before* this delete added
// its rows to the position-based deletion vector, so the count is stale —
// see `tests/position_based_deletion_test.rs::test_position_based_sequential_deletes`.
self.table.invalidate_scan_file_statistics();
}
Ok(deleted)
}
Expand Down Expand Up @@ -6178,6 +6184,16 @@ impl CayenneTableProvider {
self.refresh_listing_table_under_held_fence()
}

/// Drop every entry in [`Self::scan_file_statistics`].
///
/// Calls must follow any operation that adds, removes, or updates
/// position-based deletion vectors so the next stats-driven query (e.g.
/// `COUNT(*)`) reinvokes `infer_stats`, which in turn reapplies the
/// `VortexAccessPlanProvider` and observes the fresh deletion bitmap.
pub(crate) fn invalidate_scan_file_statistics(&self) {
self.scan_file_statistics.clear();
}

/// Refresh the listing table, ASSUMING the caller already holds
/// [`Self::listing_fence`] for write.
///
Expand Down
57 changes: 53 additions & 4 deletions crates/runtime/src/execution_plan/udtf_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,19 @@ impl ExecutionPlan for UdtfExec {
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![]
vec![Distribution::UnspecifiedDistribution]
}

fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
vec![]
vec![None]
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![]
vec![true]
}

fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![]
vec![false]
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
Expand Down Expand Up @@ -477,3 +477,52 @@ impl ExecutionPlan for PlaceholderExec {
Ok(SortOrderPushdownResult::Unsupported)
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::empty::EmptyExec;
use runtime_proto::ListUdfsArgs;
use runtime_proto::udtf_args::Args;

fn test_udtf_args() -> UdtfArgs {
UdtfArgs {
args: Some(Args::ListUdfs(ListUdfsArgs {})),
}
}

fn test_inner() -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
Arc::new(EmptyExec::new(schema))
}

/// Regression test for issue #10951.
///
/// `check_default_invariants` (invoked by optimizer rules like `EnforceSorting`
/// when `UdtfExec` is composed under another plan, e.g. `rrf(text_search(...),
/// vector_search(...))`) asserts that `maintains_input_order`,
/// `required_input_ordering`, `required_input_distribution`, and
/// `benefits_from_input_partitioning` each return a Vec with one entry per
/// child. `UdtfExec` reports one child (the inner plan), so each of these
/// must return a single-element Vec.
#[test]
fn invariant_vec_lengths_match_children_count() {
let exec = UdtfExec::new(test_udtf_args(), test_inner());
let children_len = exec.children().len();
assert_eq!(children_len, 1);
assert_eq!(exec.maintains_input_order().len(), children_len);
assert_eq!(exec.required_input_ordering().len(), children_len);
assert_eq!(exec.required_input_distribution().len(), children_len);
assert_eq!(exec.benefits_from_input_partitioning().len(), children_len);
}

#[test]
fn check_default_invariants_passes() {
let exec = UdtfExec::new(test_udtf_args(), test_inner());
exec.check_invariants(InvariantLevel::Always)
.expect("default invariants should pass for UdtfExec");
exec.check_invariants(InvariantLevel::Executable)
.expect("default invariants should pass for UdtfExec");
}
}
Loading