60 changes: 3 additions & 57 deletions crates/core/src/kernel/arrow/engine_ext.rs
@@ -12,17 +12,14 @@ use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::{ColumnName, Scalar, StructData};
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::scan::ScanMetadata;
use delta_kernel::schema::{
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
StructType,
};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
use delta_kernel::{
DeltaResult, Engine, EngineData, ExpressionEvaluator, ExpressionRef, PredicateRef, Version,
};
use itertools::Itertools;
use delta_kernel::{DeltaResult, ExpressionEvaluator, ExpressionRef};

use crate::errors::{DeltaResult as DeltaResultLocal, DeltaTableError};
use crate::kernel::SCAN_ROW_ARROW_SCHEMA;
@@ -47,57 +44,6 @@ pub(crate) struct ScanMetadataArrow {
pub scan_file_transforms: Vec<Option<ExpressionRef>>,
}

/// Internal extension trait to streamline working with Kernel scan objects.
///
/// The trait mainly handles conversion between arrow `RecordBatch` and `ArrowEngineData`.
/// The exposed methods are arrow-variants of methods already exposed on the kernel scan.
pub(crate) trait ScanExt {
/// Get the metadata for a table scan.
///
/// This method handles translation between `EngineData` and `RecordBatch`
/// and will already apply any selection vectors to the data.
/// See [`Scan::scan_metadata`] for details.
fn scan_metadata_arrow(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;

fn scan_metadata_from_arrow(
&self,
engine: &dyn Engine,
existing_version: Version,
existing_data: Box<dyn Iterator<Item = RecordBatch>>,
existing_predicate: Option<PredicateRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
}

impl ScanExt for Scan {
fn scan_metadata_arrow(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
Ok(self
.scan_metadata(engine)?
.map_ok(kernel_to_arrow)
.flatten())
}

fn scan_metadata_from_arrow(
&self,
engine: &dyn Engine,
existing_version: Version,
existing_data: Box<dyn Iterator<Item = RecordBatch>>,
existing_predicate: Option<PredicateRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
let engine_iter =
existing_data.map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>);
Ok(self
.scan_metadata_from(engine, existing_version, engine_iter, existing_predicate)?
.map_ok(kernel_to_arrow)
.flatten())
}
}

/// Internal extension traits to the Kernel Snapshot.
///
/// These traits provide additional convenience functionality for working with Kernel snapshots.
@@ -439,7 +385,7 @@ fn is_skipping_eligeble_datatype(data_type: &PrimitiveType) -> bool {
)
}

fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
pub(crate) fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
let scan_file_transforms = metadata
.scan_file_transforms
.into_iter()
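
With the `ScanExt` trait removed, call sites now perform the `ScanMetadata`-to-Arrow conversion themselves via the now-`pub(crate)` `kernel_to_arrow`. A minimal sketch of such an adapter, based only on the signatures visible in this diff; the helper name `to_arrow_metadata` is hypothetical:

// Hypothetical adapter mirroring the removed `ScanExt::scan_metadata_arrow`:
// callers map the kernel's `ScanMetadata` items through `kernel_to_arrow`
// themselves. Name and placement are assumptions for illustration only.
use delta_kernel::scan::ScanMetadata;
use delta_kernel::DeltaResult;

use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ScanMetadataArrow};

fn to_arrow_metadata(
    kernel_iter: impl Iterator<Item = DeltaResult<ScanMetadata>>,
) -> impl Iterator<Item = DeltaResult<ScanMetadataArrow>> {
    // Convert Ok items to their Arrow form; errors pass through unchanged.
    kernel_iter.map(|res| res.and_then(kernel_to_arrow))
}
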
187 changes: 75 additions & 112 deletions crates/core/src/kernel/snapshot/mod.rs
@@ -39,9 +39,8 @@ use object_store::ObjectStore;
use tokio::task::spawn_blocking;

use super::{Action, CommitInfo, Metadata, Protocol};
use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, ScanExt};
#[cfg(test)]
use crate::kernel::transaction::CommitData;
use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt};
use crate::kernel::snapshot::scan::ScanBuilder;
use crate::kernel::{StructType, ARROW_HANDLER};
use crate::logstore::{LogStore, LogStoreExt};
use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};
@@ -52,6 +51,7 @@ pub use stream::*;

mod iterators;
mod log_data;
mod scan;
mod serde;
mod stream;

@@ -117,41 +117,12 @@ impl Snapshot {
})
}

#[cfg(test)]
pub async fn new_test<'a>(
commits: impl IntoIterator<Item = &'a CommitData>,
) -> DeltaResult<(Self, Arc<dyn LogStore>)> {
use crate::logstore::{commit_uri_from_version, default_logstore};
use object_store::memory::InMemory;
let store = Arc::new(InMemory::new());

for (idx, commit) in commits.into_iter().enumerate() {
let uri = commit_uri_from_version(idx as i64);
let data = commit.get_bytes()?;
store.put(&uri, data.into()).await?;
}

let table_url = url::Url::parse("memory:///").unwrap();

let log_store = default_logstore(
store.clone(),
store.clone(),
&table_url,
&Default::default(),
);

let engine = log_store.engine(None);
let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let schema = snapshot.table_configuration().schema();
pub fn scan_builder(&self) -> ScanBuilder {
ScanBuilder::new(self.inner.clone())
}

Ok((
Self {
inner: snapshot,
config: Default::default(),
schema,
},
log_store,
))
pub fn into_scan_builder(self) -> ScanBuilder {
ScanBuilder::new(self.inner)
}

/// Update the snapshot to the given version
@@ -249,34 +220,18 @@ impl Snapshot {
log_store: &dyn LogStore,
predicate: Option<PredicateRef>,
) -> SendableRBStream {
let scan = match self
.inner
.clone()
.scan_builder()
.with_predicate(predicate)
.build()
{
let scan = match self.scan_builder().with_predicate(predicate).build() {
Ok(scan) => scan,
Err(err) => return Box::pin(once(ready(Err(DeltaTableError::KernelError(err))))),
Err(err) => return Box::pin(once(ready(Err(err)))),
};

// TODO: which capacity to choose?
let mut builder = RecordBatchReceiverStreamBuilder::new(100);
let snapshot = scan.snapshot().clone();

// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let tx = builder.tx();
builder.spawn_blocking(move || {
for res in scan.scan_metadata_arrow(engine.as_ref())? {
if tx.blocking_send(Ok(res?.scan_files)).is_err() {
break;
}
}
Ok(())
});
let stream = scan
.scan_metadata(engine)
.map(|d| Ok(kernel_to_arrow(d?)?.scan_files));

ScanRowOutStream::new(snapshot, builder.build()).boxed()
ScanRowOutStream::new(self.inner.clone(), stream).boxed()
}

pub(crate) fn files_from<T: Iterator<Item = RecordBatch> + Send + 'static>(
@@ -287,50 +242,17 @@
existing_data: Box<T>,
existing_predicate: Option<PredicateRef>,
) -> SendableRBStream {
let scan = match self
.inner
.clone()
.scan_builder()
.with_predicate(predicate)
.build()
{
let scan = match self.scan_builder().with_predicate(predicate).build() {
Ok(scan) => scan,
Err(err) => return Box::pin(once(ready(Err(DeltaTableError::KernelError(err))))),
};

let snapshot = scan.snapshot().clone();

// process our stored / caed data to conform to the expected input for log replay
let evaluator = match scan_row_in_eval(&snapshot) {
Ok(scan_row_in_eval) => scan_row_in_eval,
Err(err) => return Box::pin(once(ready(Err(err)))),
};
let scan_row_iter = existing_data.map(move |b| {
evaluator
.evaluate_arrow(b)
.expect("Illegal stored log data")
});

// TODO: which capacity to choose?
let mut builder = RecordBatchReceiverStreamBuilder::new(100);
// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let tx = builder.tx();
builder.spawn_blocking(move || {
for res in scan.scan_metadata_from_arrow(
engine.as_ref(),
existing_version,
Box::new(scan_row_iter),
existing_predicate,
)? {
if tx.blocking_send(Ok(res?.scan_files)).is_err() {
break;
}
}
Ok(())
});
let stream = scan
.scan_metadata_from(engine, existing_version, existing_data, existing_predicate)
.map(|d| Ok(kernel_to_arrow(d?)?.scan_files));

ScanRowOutStream::new(snapshot, builder.build()).boxed()
ScanRowOutStream::new(self.inner.clone(), stream).boxed()
}

/// Get the commit infos in the snapshot
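
For readability, the reworked `files` path from the hunks above condensed into one sketch. This is illustrative only: the real code surfaces `build()` errors as stream items via `match` rather than `?`, the method name `files_sketch` is hypothetical, and the exact types produced by the new `scan` module are not shown in this diff:

// Condensed paraphrase of the new `files` body (assumes the surrounding
// module's imports; not the actual implementation).
impl Snapshot {
    fn files_sketch(
        &self,
        log_store: &dyn LogStore,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<SendableRBStream> {
        // Build the scan from the snapshot's own builder, then stream kernel
        // scan metadata, converting each item to Arrow via `kernel_to_arrow`.
        let scan = self.scan_builder().with_predicate(predicate).build()?;
        let engine = log_store.engine(None);
        let stream = scan
            .scan_metadata(engine)
            .map(|d| Ok(kernel_to_arrow(d?)?.scan_files));
        Ok(ScanRowOutStream::new(self.inner.clone(), stream).boxed())
    }
}
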
@@ -446,8 +368,7 @@ impl Snapshot {

builder.spawn_blocking(move || {
for res in remove_data {
let batch: RecordBatch =
ArrowEngineData::try_from_engine_data(res?.actions)?.into();
let batch = ArrowEngineData::try_from_engine_data(res?.actions)?.into();
if tx.blocking_send(Ok(batch)).is_err() {
break;
}
@@ -532,18 +453,6 @@ impl EagerSnapshot {
Ok(Self { snapshot, files })
}

#[cfg(test)]
pub async fn new_test<'a>(
commits: impl IntoIterator<Item = &'a CommitData>,
) -> DeltaResult<Self> {
let (snapshot, log_store) = Snapshot::new_test(commits).await?;
let files: Vec<_> = snapshot
.files(log_store.as_ref(), None)
.try_collect()
.await?;
Ok(Self { snapshot, files })
}

/// Update the snapshot to the given version
pub(crate) async fn update(
&mut self,
@@ -726,7 +635,61 @@ mod tests {
// use super::log_segment::tests::{concurrent_checkpoint};
// use super::replay::tests::test_log_replay;
use super::*;
use crate::test_utils::{assert_batches_sorted_eq, TestResult, TestTables};
use crate::{
kernel::transaction::CommitData,
test_utils::{assert_batches_sorted_eq, TestResult, TestTables},
};

impl Snapshot {
pub async fn new_test<'a>(
commits: impl IntoIterator<Item = &'a CommitData>,
) -> DeltaResult<(Self, Arc<dyn LogStore>)> {
use crate::logstore::{commit_uri_from_version, default_logstore};
use object_store::memory::InMemory;
let store = Arc::new(InMemory::new());

for (idx, commit) in commits.into_iter().enumerate() {
let uri = commit_uri_from_version(idx as i64);
let data = commit.get_bytes()?;
store.put(&uri, data.into()).await?;
}

let table_url = url::Url::parse("memory:///").unwrap();

let log_store = default_logstore(
store.clone(),
store.clone(),
&table_url,
&Default::default(),
);

let engine = log_store.engine(None);
let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let schema = snapshot.table_configuration().schema();

Ok((
Self {
inner: snapshot,
config: Default::default(),
schema,
},
log_store,
))
}
}

impl EagerSnapshot {
pub async fn new_test<'a>(
commits: impl IntoIterator<Item = &'a CommitData>,
) -> DeltaResult<Self> {
let (snapshot, log_store) = Snapshot::new_test(commits).await?;
let files: Vec<_> = snapshot
.files(log_store.as_ref(), None)
.try_collect()
.await?;
Ok(Self { snapshot, files })
}
}

#[tokio::test]
async fn test_snapshots() -> TestResult {
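
To illustrate the test-only constructors this PR relocates into the `tests` module, a hypothetical test using them; building real `CommitData` values is elided since their construction is not part of this diff:

// Hypothetical test exercising the relocated `new_test` helpers; the
// `commits` contents are intentionally left empty here.
#[tokio::test]
async fn builds_test_snapshots() -> TestResult {
    // Real tests populate this with `CommitData` values.
    let commits: Vec<CommitData> = vec![];
    let (snapshot, log_store) = Snapshot::new_test(commits.iter()).await?;
    let _eager = EagerSnapshot::new_test(commits.iter()).await?;
    // Stream the file actions the same way `EagerSnapshot::new_test` does.
    let _files: Vec<_> = snapshot
        .files(log_store.as_ref(), None)
        .try_collect()
        .await?;
    Ok(())
}
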