From c983ae21a74f18526350fbff4f8aef6c094fd9fe Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Mon, 6 Oct 2025 13:43:10 +0200
Subject: [PATCH] refactor: shim kernel Scan and ScanBuilder

Signed-off-by: Robert Pack
---
 crates/core/src/kernel/arrow/engine_ext.rs |  60 +------
 crates/core/src/kernel/snapshot/mod.rs     | 187 ++++++++------
 crates/core/src/kernel/snapshot/scan.rs    | 192 +++++++++++++++
 crates/core/src/kernel/snapshot/stream.rs  |   3 -
 4 files changed, 270 insertions(+), 172 deletions(-)
 create mode 100644 crates/core/src/kernel/snapshot/scan.rs

diff --git a/crates/core/src/kernel/arrow/engine_ext.rs b/crates/core/src/kernel/arrow/engine_ext.rs
index 61498b9f1a..6614e5a8e4 100644
--- a/crates/core/src/kernel/arrow/engine_ext.rs
+++ b/crates/core/src/kernel/arrow/engine_ext.rs
@@ -12,17 +12,14 @@ use delta_kernel::arrow::record_batch::RecordBatch;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow;
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::expressions::{ColumnName, Scalar, StructData};
-use delta_kernel::scan::{Scan, ScanMetadata};
+use delta_kernel::scan::ScanMetadata;
 use delta_kernel::schema::{
     ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
     StructType,
 };
 use delta_kernel::snapshot::Snapshot;
 use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
-use delta_kernel::{
-    DeltaResult, Engine, EngineData, ExpressionEvaluator, ExpressionRef, PredicateRef, Version,
-};
-use itertools::Itertools;
+use delta_kernel::{DeltaResult, ExpressionEvaluator, ExpressionRef};
 
 use crate::errors::{DeltaResult as DeltaResultLocal, DeltaTableError};
 use crate::kernel::SCAN_ROW_ARROW_SCHEMA;
@@ -47,57 +44,6 @@ pub(crate) struct ScanMetadataArrow {
     pub scan_file_transforms: Vec<Option<ExpressionRef>>,
 }
 
-/// Internal extension trait to streamline working with Kernel scan objects.
-///
-/// THe trait mainly handles conversion between arrow `RecordBatch` and `ArrowEngineData`.
-/// The exposed methods are arrow-variants of methods already exposed on the kernel scan.
-pub(crate) trait ScanExt {
-    /// Get the metadata for a table scan.
-    ///
-    /// This method handles translation between `EngineData` and `RecordBatch`
-    /// and will already apply any selection vectors to the data.
-    /// See [`Scan::scan_metadata`] for details.
-    fn scan_metadata_arrow(
-        &self,
-        engine: &dyn Engine,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
-
-    fn scan_metadata_from_arrow(
-        &self,
-        engine: &dyn Engine,
-        existing_version: Version,
-        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
-        existing_predicate: Option<PredicateRef>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
-}
-
-impl ScanExt for Scan {
-    fn scan_metadata_arrow(
-        &self,
-        engine: &dyn Engine,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
-        Ok(self
-            .scan_metadata(engine)?
-            .map_ok(kernel_to_arrow)
-            .flatten())
-    }
-
-    fn scan_metadata_from_arrow(
-        &self,
-        engine: &dyn Engine,
-        existing_version: Version,
-        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
-        existing_predicate: Option<PredicateRef>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
-        let engine_iter =
-            existing_data.map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>);
-        Ok(self
-            .scan_metadata_from(engine, existing_version, engine_iter, existing_predicate)?
-            .map_ok(kernel_to_arrow)
-            .flatten())
-    }
-}
-
 /// Internal extension traits to the Kernel Snapshot.
 ///
 /// These traits provide additional convenience functionality for working with Kernel snapshots.
@@ -439,7 +385,7 @@ fn is_skipping_eligeble_datatype(data_type: &PrimitiveType) -> bool {
     )
 }
 
-fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
+pub(crate) fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
     let scan_file_transforms = metadata
         .scan_file_transforms
         .into_iter()
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index bd571a1c83..f16b4a23ec 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -39,9 +39,8 @@ use object_store::ObjectStore;
 use tokio::task::spawn_blocking;
 
 use super::{Action, CommitInfo, Metadata, Protocol};
-use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, ScanExt};
-#[cfg(test)]
-use crate::kernel::transaction::CommitData;
+use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt};
+use crate::kernel::snapshot::scan::ScanBuilder;
 use crate::kernel::{StructType, ARROW_HANDLER};
 use crate::logstore::{LogStore, LogStoreExt};
 use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};
@@ -52,6 +51,7 @@ pub use stream::*;
 
 mod iterators;
 mod log_data;
+mod scan;
 mod serde;
 mod stream;
 
@@ -117,41 +117,12 @@ impl Snapshot {
         })
     }
 
-    #[cfg(test)]
-    pub async fn new_test<'a>(
-        commits: impl IntoIterator<Item = &'a CommitData>,
-    ) -> DeltaResult<(Self, Arc<dyn LogStore>)> {
-        use crate::logstore::{commit_uri_from_version, default_logstore};
-        use object_store::memory::InMemory;
-        let store = Arc::new(InMemory::new());
-
-        for (idx, commit) in commits.into_iter().enumerate() {
-            let uri = commit_uri_from_version(idx as i64);
-            let data = commit.get_bytes()?;
-            store.put(&uri, data.into()).await?;
-        }
-
-        let table_url = url::Url::parse("memory:///").unwrap();
-
-        let log_store = default_logstore(
-            store.clone(),
-            store.clone(),
-            &table_url,
-            &Default::default(),
-        );
-
-        let engine = log_store.engine(None);
-        let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
-        let schema = snapshot.table_configuration().schema();
+    pub fn scan_builder(&self) -> ScanBuilder {
+        ScanBuilder::new(self.inner.clone())
+    }
 
-        Ok((
-            Self {
-                inner: snapshot,
-                config: Default::default(),
-                schema,
-            },
-            log_store,
-        ))
+    pub fn into_scan_builder(self) -> ScanBuilder {
+        ScanBuilder::new(self.inner)
     }
 
     /// Update the snapshot to the given version
@@ -249,34 +220,18 @@ impl Snapshot {
         log_store: &dyn LogStore,
         predicate: Option<PredicateRef>,
     ) -> SendableRBStream {
-        let scan = match self
-            .inner
-            .clone()
-            .scan_builder()
-            .with_predicate(predicate)
-            .build()
-        {
+        let scan = match self.scan_builder().with_predicate(predicate).build() {
             Ok(scan) => scan,
-            Err(err) => return Box::pin(once(ready(Err(DeltaTableError::KernelError(err))))),
+            Err(err) => return Box::pin(once(ready(Err(err)))),
         };
 
-        // TODO: which capacity to choose?
-        let mut builder = RecordBatchReceiverStreamBuilder::new(100);
-        let snapshot = scan.snapshot().clone();
-
         // TODO: bundle operation id with log store ...
         let engine = log_store.engine(None);
-        let tx = builder.tx();
-        builder.spawn_blocking(move || {
-            for res in scan.scan_metadata_arrow(engine.as_ref())? {
-                if tx.blocking_send(Ok(res?.scan_files)).is_err() {
-                    break;
-                }
-            }
-            Ok(())
-        });
+        let stream = scan
+            .scan_metadata(engine)
+            .map(|d| Ok(kernel_to_arrow(d?)?.scan_files));
 
-        ScanRowOutStream::new(snapshot, builder.build()).boxed()
+        ScanRowOutStream::new(self.inner.clone(), stream).boxed()
     }
 
     pub(crate) fn files_from<I: Iterator<Item = RecordBatch> + Send + 'static>(
         &self,
         log_store: &dyn LogStore,
         predicate: Option<PredicateRef>,
         existing_version: Version,
         existing_data: Box<I>,
         existing_predicate: Option<PredicateRef>,
     ) -> SendableRBStream {
-        let scan = match self
-            .inner
-            .clone()
-            .scan_builder()
-            .with_predicate(predicate)
-            .build()
-        {
+        let scan = match self.scan_builder().with_predicate(predicate).build() {
             Ok(scan) => scan,
-            Err(err) => return Box::pin(once(ready(Err(DeltaTableError::KernelError(err))))),
-        };
-
-        let snapshot = scan.snapshot().clone();
-
-        // process our stored / caed data to conform to the expected input for log replay
-        let evaluator = match scan_row_in_eval(&snapshot) {
-            Ok(scan_row_in_eval) => scan_row_in_eval,
             Err(err) => return Box::pin(once(ready(Err(err)))),
         };
-        let scan_row_iter = existing_data.map(move |b| {
-            evaluator
-                .evaluate_arrow(b)
-                .expect("Illegal stored log data")
-        });
-
-        // TODO: which capacity to choose?
-        let mut builder = RecordBatchReceiverStreamBuilder::new(100);
-
         // TODO: bundle operation id with log store ...
         let engine = log_store.engine(None);
-        let tx = builder.tx();
-        builder.spawn_blocking(move || {
-            for res in scan.scan_metadata_from_arrow(
-                engine.as_ref(),
-                existing_version,
-                Box::new(scan_row_iter),
-                existing_predicate,
-            )? {
-                if tx.blocking_send(Ok(res?.scan_files)).is_err() {
-                    break;
-                }
-            }
-            Ok(())
-        });
+        let stream = scan
+            .scan_metadata_from(engine, existing_version, existing_data, existing_predicate)
+            .map(|d| Ok(kernel_to_arrow(d?)?.scan_files));
 
-        ScanRowOutStream::new(snapshot, builder.build()).boxed()
+        ScanRowOutStream::new(self.inner.clone(), stream).boxed()
     }
 
     /// Get the commit infos in the snapshot
@@ -446,8 +368,7 @@ impl Snapshot {
 
         builder.spawn_blocking(move || {
             for res in remove_data {
-                let batch: RecordBatch =
-                    ArrowEngineData::try_from_engine_data(res?.actions)?.into();
+                let batch = ArrowEngineData::try_from_engine_data(res?.actions)?.into();
                 if tx.blocking_send(Ok(batch)).is_err() {
                     break;
                 }
@@ -532,18 +453,6 @@ impl EagerSnapshot {
         Ok(Self { snapshot, files })
     }
 
-    #[cfg(test)]
-    pub async fn new_test<'a>(
-        commits: impl IntoIterator<Item = &'a CommitData>,
-    ) -> DeltaResult<Self> {
-        let (snapshot, log_store) = Snapshot::new_test(commits).await?;
-        let files: Vec<_> = snapshot
-            .files(log_store.as_ref(), None)
-            .try_collect()
-            .await?;
-        Ok(Self { snapshot, files })
-    }
-
     /// Update the snapshot to the given version
     pub(crate) async fn update(
         &mut self,
@@ -726,7 +635,61 @@ mod tests {
     // use super::log_segment::tests::{concurrent_checkpoint};
     // use super::replay::tests::test_log_replay;
     use super::*;
-    use crate::test_utils::{assert_batches_sorted_eq, TestResult, TestTables};
+    use crate::{
+        kernel::transaction::CommitData,
+        test_utils::{assert_batches_sorted_eq, TestResult, TestTables},
+    };
+
+    impl Snapshot {
+        pub async fn new_test<'a>(
+            commits: impl IntoIterator<Item = &'a CommitData>,
+        ) -> DeltaResult<(Self, Arc<dyn LogStore>)> {
+            use crate::logstore::{commit_uri_from_version, default_logstore};
+            use object_store::memory::InMemory;
+            let store = Arc::new(InMemory::new());
+
+            for (idx, commit) in commits.into_iter().enumerate() {
+                let uri = commit_uri_from_version(idx as i64);
+                let data = commit.get_bytes()?;
+                store.put(&uri, data.into()).await?;
+            }
+
+            let table_url = url::Url::parse("memory:///").unwrap();
+
+            let log_store = default_logstore(
+                store.clone(),
+                store.clone(),
+                &table_url,
+                &Default::default(),
+            );
+
+            let engine = log_store.engine(None);
+            let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
+            let schema = snapshot.table_configuration().schema();
+
+            Ok((
+                Self {
+                    inner: snapshot,
+                    config: Default::default(),
+                    schema,
+                },
+                log_store,
+            ))
+        }
+    }
+
+    impl EagerSnapshot {
+        pub async fn new_test<'a>(
+            commits: impl IntoIterator<Item = &'a CommitData>,
+        ) -> DeltaResult<Self> {
+            let (snapshot, log_store) = Snapshot::new_test(commits).await?;
+            let files: Vec<_> = snapshot
+                .files(log_store.as_ref(), None)
+                .try_collect()
+                .await?;
+            Ok(Self { snapshot, files })
+        }
+    }
 
     #[tokio::test]
     async fn test_snapshots() -> TestResult {
diff --git a/crates/core/src/kernel/snapshot/scan.rs b/crates/core/src/kernel/snapshot/scan.rs
new file mode 100644
index 0000000000..82cad33190
--- /dev/null
+++ b/crates/core/src/kernel/snapshot/scan.rs
@@ -0,0 +1,192 @@
+use std::pin::Pin;
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use delta_kernel::engine::arrow_data::ArrowEngineData;
+use delta_kernel::scan::{Scan as KernelScan, ScanBuilder as KernelScanBuilder, ScanMetadata};
+use delta_kernel::schema::SchemaRef;
+use delta_kernel::snapshot::Snapshot as KernelSnapshot;
+use delta_kernel::{Engine, EngineData, PredicateRef, SnapshotRef, Version};
+use futures::future::ready;
+use futures::stream::once;
+use futures::Stream;
+use url::Url;
+
+use crate::kernel::{scan_row_in_eval, ReceiverStreamBuilder};
+use crate::DeltaResult;
+
+pub type SendableScanMetadataStream = Pin<Box<dyn Stream<Item = DeltaResult<ScanMetadata>> + Send>>;
+
+/// Builder to scan a snapshot of a table.
+#[derive(Debug)]
+pub struct ScanBuilder {
+    inner: KernelScanBuilder,
+}
+
+impl ScanBuilder {
+    /// Create a new [`ScanBuilder`] instance.
+    pub fn new(snapshot: impl Into<Arc<KernelSnapshot>>) -> Self {
+        Self {
+            inner: KernelScanBuilder::new(snapshot.into()),
+        }
+    }
+
+    /// Provide [`Schema`] for columns to select from the [`Snapshot`].
+    ///
+    /// A table with columns `[a, b, c]` could have a scan which reads only the first
+    /// two columns by using the schema `[a, b]`.
+    ///
+    /// [`Schema`]: crate::schema::Schema
+    /// [`Snapshot`]: crate::snapshot::Snapshot
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.inner = self.inner.with_schema(schema);
+        self
+    }
+
+    /// Optionally provide a [`SchemaRef`] for columns to select from the [`Snapshot`]. See
+    /// [`ScanBuilder::with_schema`] for details. If `schema_opt` is `None` this is a no-op.
+    ///
+    /// [`Snapshot`]: crate::Snapshot
+    pub fn with_schema_opt(mut self, schema_opt: Option<SchemaRef>) -> Self {
+        self.inner = self.inner.with_schema_opt(schema_opt);
+        self
+    }
+
+    /// Optionally provide an expression to filter rows. For example, using the predicate `x <
+    /// 4` to return a subset of the rows in the scan which satisfy the filter. If `predicate`
+    /// is `None`, this is a no-op.
+    ///
+    /// NOTE: The filtering is best-effort and can produce false positives (rows that should
+    /// have been filtered out but were kept).
+    pub fn with_predicate(mut self, predicate: impl Into<Option<PredicateRef>>) -> Self {
+        self.inner = self.inner.with_predicate(predicate);
+        self
+    }
+
+    pub fn build(self) -> DeltaResult<Scan> {
+        Ok(Scan::from(self.inner.build()?))
+    }
+}
+
+#[derive(Debug)]
+pub struct Scan {
+    inner: Arc<KernelScan>,
+}
+
+impl From<KernelScan> for Scan {
+    fn from(inner: KernelScan) -> Self {
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+}
+
+impl From<Arc<KernelScan>> for Scan {
+    fn from(inner: Arc<KernelScan>) -> Self {
+        Self { inner }
+    }
+}
+
+impl Scan {
+    /// The table's root URL. Any relative paths returned from `scan_data` (or in a callback from
+    /// [`ScanMetadata::visit_scan_files`]) must be resolved against this root to get the actual path to
+    /// the file.
+    ///
+    /// [`ScanMetadata::visit_scan_files`]: crate::scan::ScanMetadata::visit_scan_files
+    // NOTE: this is obviously included in the snapshot, just re-exposed here for convenience.
+    pub fn table_root(&self) -> &Url {
+        self.inner.table_root()
+    }
+
+    /// Get a shared reference to the [`Snapshot`] of this scan.
+    ///
+    /// [`Snapshot`]: crate::Snapshot
+    pub fn snapshot(&self) -> &SnapshotRef {
+        self.inner.snapshot()
+    }
+
+    /// Get a shared reference to the logical [`Schema`] of the scan (i.e. the output schema of the
+    /// scan). Note that the logical schema can differ from the physical schema due to e.g.
+    /// partition columns which are present in the logical schema but not in the physical schema.
+    ///
+    /// [`Schema`]: crate::schema::Schema
+    pub fn logical_schema(&self) -> &SchemaRef {
+        self.inner.logical_schema()
+    }
+
+    /// Get a shared reference to the physical [`Schema`] of the scan. This represents the schema
+    /// of the underlying data files which must be read from storage.
+    ///
+    /// [`Schema`]: crate::schema::Schema
+    pub fn physical_schema(&self) -> &SchemaRef {
+        self.inner.physical_schema()
+    }
+
+    /// Get the predicate [`PredicateRef`] of the scan.
+    pub fn physical_predicate(&self) -> Option<PredicateRef> {
+        self.inner.physical_predicate()
+    }
+
+    pub fn scan_metadata(&self, engine: Arc<dyn Engine>) -> SendableScanMetadataStream {
+        // TODO: which capacity to choose?
+        let mut builder = ReceiverStreamBuilder::<ScanMetadata>::new(100);
+        let tx = builder.tx();
+
+        let inner = self.inner.clone();
+        let blocking_iter = move || {
+            for res in inner.scan_metadata(engine.as_ref())? {
+                if tx.blocking_send(Ok(res?)).is_err() {
+                    break;
+                }
+            }
+            Ok(())
+        };
+
+        builder.spawn_blocking(blocking_iter);
+        builder.build()
+    }
+
+    pub fn scan_metadata_from<I: Iterator<Item = RecordBatch> + Send + 'static>(
+        &self,
+        engine: Arc<dyn Engine>,
+        existing_version: Version,
+        existing_data: Box<I>,
+        existing_predicate: Option<PredicateRef>,
+    ) -> SendableScanMetadataStream {
+        let inner = self.inner.clone();
+        let snapshot = self.inner.snapshot().clone();
+
+        // process our stored / cached data to conform to the expected input for log replay
+        let evaluator = match scan_row_in_eval(&snapshot) {
+            Ok(scan_row_in_eval) => scan_row_in_eval,
+            Err(err) => return Box::pin(once(ready(Err(err)))),
+        };
+        let scan_row_iter = existing_data
+            .map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>)
+            .map(move |b| {
+                evaluator
+                    .evaluate(b.as_ref())
+                    .expect("malformed cached log data")
+            });
+
+        // TODO: which capacity to choose?
+        let mut builder = ReceiverStreamBuilder::<ScanMetadata>::new(100);
+        let tx = builder.tx();
+        let scan_inner = move || {
+            for res in inner.scan_metadata_from(
+                engine.as_ref(),
+                existing_version,
+                Box::new(scan_row_iter),
+                existing_predicate,
+            )? {
+                if tx.blocking_send(Ok(res?)).is_err() {
+                    break;
+                }
+            }
+            Ok(())
+        };
+
+        builder.spawn_blocking(scan_inner);
+        builder.build()
+    }
+}
diff --git a/crates/core/src/kernel/snapshot/stream.rs b/crates/core/src/kernel/snapshot/stream.rs
index c350c6ac14..8e04300c52 100644
--- a/crates/core/src/kernel/snapshot/stream.rs
+++ b/crates/core/src/kernel/snapshot/stream.rs
@@ -9,7 +9,6 @@ use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task::JoinSet;
 
 use crate::errors::DeltaResult;
-use crate::kernel::Add;
 use crate::DeltaTableError;
 
 /// Trait for types that stream [RecordBatch]
@@ -53,8 +52,6 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream>>;
 
 pub type SendableRBStream = Pin<Box<dyn Stream<Item = DeltaResult<RecordBatch>> + Send>>;
 
-pub type SendableAddStream = Pin<Box<dyn Stream<Item = DeltaResult<Add>> + Send>>;
-
 /// Creates a stream from a collection of producing tasks, routing panics to the stream.
 ///
 /// Note that this is similar to [`ReceiverStream` from tokio-stream], with the differences being:
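
Usage sketch (illustrative only, not part of the diff): the shimmed builder mirrors the kernel API but hands back an async stream instead of a blocking iterator, so callers no longer wire up a receiver-stream builder themselves. The helper below is hypothetical and assumes a `Snapshot` and `LogStore` as used by `files()` above.

    // Hypothetical helper: drive the new shim end-to-end.
    use futures::TryStreamExt;

    async fn collect_scan_metadata(
        snapshot: &Snapshot,
        log_store: &dyn LogStore,
    ) -> DeltaResult<Vec<ScanMetadata>> {
        // Build a scan over the current snapshot; a kernel predicate could be
        // supplied via `.with_predicate(...)` for best-effort file skipping.
        let scan = snapshot.scan_builder().build()?;
        let engine = log_store.engine(None);
        // `scan_metadata` runs the blocking kernel iterator on a worker thread
        // and surfaces its results as an async stream.
        scan.scan_metadata(engine).try_collect().await
    }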
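The incremental path wrapped by `files_from` works the same way: previously materialized scan-row batches seed log replay, so only commits after `existing_version` are re-read from storage. A sketch with hypothetical names (`cached_batches`, `version`):

    fn incremental_scan_metadata(
        scan: &Scan,
        log_store: &dyn LogStore,
        version: Version,
        cached_batches: Vec<RecordBatch>, // scan-row batches from an earlier scan
    ) -> SendableScanMetadataStream {
        let engine = log_store.engine(None);
        // The cached batches stand in for the log below `version`;
        // `scan_metadata_from` converts them to `ArrowEngineData` and runs them
        // through `scan_row_in_eval` before replaying newer commits.
        scan.scan_metadata_from(engine, version, Box::new(cached_batches.into_iter()), None)
    }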