Skip to content

Commit b935783

Browse files
Fix spiceai#10951: UdtfExec invariant Vec lengths must match children count (spiceai#10953)
* udtf: fix UdtfExec invariant vec lengths to match children count `UdtfExec::children()` reports one child (the inner plan), but `maintains_input_order()`, `required_input_ordering()`, `required_input_distribution()`, and `benefits_from_input_partitioning()` all returned empty Vecs, violating the DataFusion ExecutionPlan contract that these vectors must have one entry per child. When `UdtfExec` got composed under another plan (e.g. `rrf(text_search(...), vector_search(...))` without an explicit `join_key`), an optimizer pass invoked `check_default_invariants` and the query failed with: Internal error: Assertion failed: actual_len == children_len (left: 0, right: 1): UdtfExec::maintains_input_order returned Vec with incorrect size: 0 != 1. Since `UdtfExec::execute()` delegates straight to the inner plan, the correct values are: `maintains_input_order = [true]`, `benefits_from_input_partitioning = [false]`, `required_input_ordering = [None]`, `required_input_distribution = [UnspecifiedDistribution]`. Add a regression test that builds a UdtfExec and asserts both the per-method Vec lengths and `check_invariants` itself. Fixes spiceai#10951 * cayenne: invalidate scan_file_statistics cache after position-based delete The position-based delete path updates the per-file deletion bitmap in `cached_deleted_row_ids` but did not invalidate the per-file `scan_file_statistics` cache that `CayenneTableProvider::list_files_for_snapshot_scan` populates from `infer_stats`. Because `infer_stats` applies the `VortexAccessPlanProvider::adjust_statistics` hook at the time it runs, the cached entry froze the row count as of the *previous* delete. The next `COUNT(*)` (or any other stats-driven query) hit the cache and returned the stale count — even though the deletion bitmap itself was up to date. Also keep the `cayenne::metastore::sqlite` docblock backtick fix (`SQLite`) that the trunk lint failure pointed out — clippy::doc_markdown was failing on that line under `clippy::pedantic`. `PkKeysetInvalidatingDeletionSink` already wraps every `delete_using_deletion_vectors` sink, so dropping the cache there covers every position-based deletion path without touching the position-based sink internals. PK-strategy callers also flow through this sink — clearing for them is harmless because they don't populate the access-plan stats path.
1 parent d1ec339 commit b935783

3 files changed

Lines changed: 70 additions & 5 deletions

File tree

crates/cayenne/src/metastore/sqlite.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn configure_sqlite_connection(
8585
///
8686
/// Pool size is `min(available_parallelism, 32)` (minimum 2). If
8787
/// `available_parallelism()` fails (rare — e.g. seccomp-restricted
88-
/// environments), K falls back to 4. SQLite WAL mode allows many
88+
/// environments), K falls back to 4. `SQLite` WAL mode allows many
8989
/// concurrent readers per database file (read-only operations don't take
9090
/// the WAL write lock), so a larger pool lifts the read-side concurrency
9191
/// ceiling for metadata-heavy workloads — e.g. 64-core deployments running

crates/cayenne/src/provider/table.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,6 +1485,12 @@ impl DeletionSink for PkKeysetInvalidatingDeletionSink {
14851485
let deleted = self.inner.delete_from().await?;
14861486
if deleted > 0 {
14871487
self.table.clear_cached_pk_keyset();
1488+
// Drop the per-file stats `CayenneTableProvider::collect_scan_file_statistics`
1489+
// caches. Without this, a follow-up `COUNT(*)` (or any other stats-driven
1490+
// query) is served the row count we computed *before* this delete added
1491+
// its rows to the position-based deletion vector, so the count is stale —
1492+
// see `tests/position_based_deletion_test.rs::test_position_based_sequential_deletes`.
1493+
self.table.invalidate_scan_file_statistics();
14881494
}
14891495
Ok(deleted)
14901496
}
@@ -6178,6 +6184,16 @@ impl CayenneTableProvider {
61786184
self.refresh_listing_table_under_held_fence()
61796185
}
61806186

6187+
/// Drop every entry in [`Self::scan_file_statistics`].
6188+
///
6189+
/// Calls must follow any operation that adds, removes, or updates
6190+
/// position-based deletion vectors so the next stats-driven query (e.g.
6191+
/// `COUNT(*)`) reinvokes `infer_stats`, which in turn reapplies the
6192+
/// `VortexAccessPlanProvider` and observes the fresh deletion bitmap.
6193+
pub(crate) fn invalidate_scan_file_statistics(&self) {
6194+
self.scan_file_statistics.clear();
6195+
}
6196+
61816197
/// Refresh the listing table, ASSUMING the caller already holds
61826198
/// [`Self::listing_fence`] for write.
61836199
///

crates/runtime/src/execution_plan/udtf_exec.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,19 +168,19 @@ impl ExecutionPlan for UdtfExec {
168168
}
169169

170170
fn required_input_distribution(&self) -> Vec<Distribution> {
171-
vec![]
171+
vec![Distribution::UnspecifiedDistribution]
172172
}
173173

174174
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
175-
vec![]
175+
vec![None]
176176
}
177177

178178
fn maintains_input_order(&self) -> Vec<bool> {
179-
vec![]
179+
vec![true]
180180
}
181181

182182
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
183-
vec![]
183+
vec![false]
184184
}
185185

186186
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -477,3 +477,52 @@ impl ExecutionPlan for PlaceholderExec {
477477
Ok(SortOrderPushdownResult::Unsupported)
478478
}
479479
}
480+
481+
#[cfg(test)]
482+
mod tests {
483+
use super::*;
484+
use arrow_schema::{DataType, Field, Schema};
485+
use datafusion::physical_plan::empty::EmptyExec;
486+
use runtime_proto::ListUdfsArgs;
487+
use runtime_proto::udtf_args::Args;
488+
489+
fn test_udtf_args() -> UdtfArgs {
490+
UdtfArgs {
491+
args: Some(Args::ListUdfs(ListUdfsArgs {})),
492+
}
493+
}
494+
495+
fn test_inner() -> Arc<dyn ExecutionPlan> {
496+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
497+
Arc::new(EmptyExec::new(schema))
498+
}
499+
500+
/// Regression test for issue #10951.
501+
///
502+
/// `check_default_invariants` (invoked by optimizer rules like `EnforceSorting`
503+
/// when `UdtfExec` is composed under another plan, e.g. `rrf(text_search(...),
504+
/// vector_search(...))`) asserts that `maintains_input_order`,
505+
/// `required_input_ordering`, `required_input_distribution`, and
506+
/// `benefits_from_input_partitioning` each return a Vec with one entry per
507+
/// child. `UdtfExec` reports one child (the inner plan), so each of these
508+
/// must return a single-element Vec.
509+
#[test]
510+
fn invariant_vec_lengths_match_children_count() {
511+
let exec = UdtfExec::new(test_udtf_args(), test_inner());
512+
let children_len = exec.children().len();
513+
assert_eq!(children_len, 1);
514+
assert_eq!(exec.maintains_input_order().len(), children_len);
515+
assert_eq!(exec.required_input_ordering().len(), children_len);
516+
assert_eq!(exec.required_input_distribution().len(), children_len);
517+
assert_eq!(exec.benefits_from_input_partitioning().len(), children_len);
518+
}
519+
520+
#[test]
521+
fn check_default_invariants_passes() {
522+
let exec = UdtfExec::new(test_udtf_args(), test_inner());
523+
exec.check_invariants(InvariantLevel::Always)
524+
.expect("default invariants should pass for UdtfExec");
525+
exec.check_invariants(InvariantLevel::Executable)
526+
.expect("default invariants should pass for UdtfExec");
527+
}
528+
}

0 commit comments

Comments
 (0)