Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 1 addition & 53 deletions crates/core/src/kernel/arrow/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

use std::sync::Arc;

use arrow_array::{
Array, ArrowNativeTypeOp, ArrowNumericType, BooleanArray, ListArray, MapArray, PrimitiveArray,
RecordBatch, StringArray, StructArray,
};
use arrow_array::{Array, ListArray, MapArray, RecordBatch, StructArray};
use arrow_schema::{ArrowError, DataType};

use crate::{DeltaResult, DeltaTableError};
Expand Down Expand Up @@ -74,17 +71,6 @@ pub(crate) fn extract_column<'a>(
extract_column(maparr.entries(), next_path, remaining_path_steps)
} else {
Ok(child)
// if maparr.entries().num_columns() != 2 {
// return Err(ArrowError::SchemaError(format!(
// "Map {path_step} has {} columns, expected 2",
// maparr.entries().num_columns()
// )));
// }
// if next_path_step == *maparr.entries().column_names().first().unwrap() {
// Ok(maparr.entries().column(0))
// } else {
// Ok(maparr.entries().column(1))
// }
}
}
DataType::List(_) => {
Expand Down Expand Up @@ -122,41 +108,3 @@ fn cast_column_as<'a, T: Array + 'static>(
"{name} is not of expected type."
)))
}

#[inline]
pub(crate) fn read_str(arr: &StringArray, idx: usize) -> DeltaResult<&str> {
read_str_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
}

#[inline]
pub(crate) fn read_str_opt(arr: &StringArray, idx: usize) -> Option<&str> {
arr.is_valid(idx).then(|| arr.value(idx))
}

#[inline]
pub(crate) fn read_primitive<T>(arr: &PrimitiveArray<T>, idx: usize) -> DeltaResult<T::Native>
where
T: ArrowNumericType,
T::Native: ArrowNativeTypeOp,
{
read_primitive_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
}

#[inline]
pub(crate) fn read_primitive_opt<T>(arr: &PrimitiveArray<T>, idx: usize) -> Option<T::Native>
where
T: ArrowNumericType,
T::Native: ArrowNativeTypeOp,
{
arr.is_valid(idx).then(|| arr.value(idx))
}

#[inline]
pub(crate) fn read_bool(arr: &BooleanArray, idx: usize) -> DeltaResult<bool> {
read_bool_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
}

#[inline]
pub(crate) fn read_bool_opt(arr: &BooleanArray, idx: usize) -> Option<bool> {
arr.is_valid(idx).then(|| arr.value(idx))
}
47 changes: 0 additions & 47 deletions crates/core/src/kernel/models/fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,6 @@ use std::sync::LazyLock;

use delta_kernel::schema::{ArrayType, DataType, MapType, StructField, StructType};

use super::ActionType;

impl ActionType {
/// Returns the type of the corresponding field in the delta log schema
pub(crate) fn schema_field(&self) -> &StructField {
match self {
Self::Metadata => &METADATA_FIELD,
Self::Protocol => &PROTOCOL_FIELD,
Self::CommitInfo => &COMMIT_INFO_FIELD,
Self::Add => &ADD_FIELD,
Self::Remove => &REMOVE_FIELD,
Self::Cdc => &CDC_FIELD,
Self::Txn => &TXN_FIELD,
Self::DomainMetadata => &DOMAIN_METADATA_FIELD,
Self::CheckpointMetadata => &CHECKPOINT_METADATA_FIELD,
Self::Sidecar => &SIDECAR_FIELD,
}
}
}

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata
static METADATA_FIELD: LazyLock<StructField> = LazyLock::new(|| {
StructField::new(
Expand Down Expand Up @@ -210,33 +190,6 @@ static DOMAIN_METADATA_FIELD: LazyLock<StructField> = LazyLock::new(|| {
true,
)
});
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata
static CHECKPOINT_METADATA_FIELD: LazyLock<StructField> = LazyLock::new(|| {
StructField::new(
"checkpointMetadata",
StructType::try_new(vec![
StructField::new("flavor", DataType::STRING, false),
tags_field(),
])
.expect("Failed to construct StructType for CHECKPOINT_METADATA_FIELD"),
true,
)
});
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
static SIDECAR_FIELD: LazyLock<StructField> = LazyLock::new(|| {
StructField::new(
"sidecar",
StructType::try_new(vec![
StructField::new("path", DataType::STRING, false),
StructField::new("sizeInBytes", DataType::LONG, true),
StructField::new("modificationTime", DataType::LONG, false),
StructField::new("type", DataType::STRING, false),
tags_field(),
])
.expect("Failed to construct StructType for SIDECAR_FIELD"),
true,
)
});

#[allow(unused)]
static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
Expand Down
41 changes: 0 additions & 41 deletions crates/core/src/kernel/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,6 @@ pub(crate) mod fields;

pub use actions::*;

#[derive(Debug, Hash, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// The type of action that was performed on the table
pub enum ActionType {
/// modify the data in a table by adding individual logical files
Add,
/// add a file containing only the data that was changed as part of the transaction
Cdc,
/// additional provenance information about what higher-level operation was being performed
CommitInfo,
/// contains a configuration (string-string map) for a named metadata domain
DomainMetadata,
/// changes the current metadata of the table
Metadata,
/// increase the version of the Delta protocol that is required to read or write a given table
Protocol,
/// modify the data in a table by removing individual logical files
Remove,
/// Transactional information
Txn,
/// Checkpoint metadata
CheckpointMetadata,
/// Sidecar
Sidecar,
}

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(missing_docs)]
Expand Down Expand Up @@ -109,19 +84,3 @@ impl From<DomainMetadata> for Action {
Self::DomainMetadata(a)
}
}

impl Action {
/// Get the action type
pub fn action_type(&self) -> ActionType {
match self {
Self::Add(_) => ActionType::Add,
Self::Remove(_) => ActionType::Remove,
Self::Cdc(_) => ActionType::Cdc,
Self::Metadata(_) => ActionType::Metadata,
Self::Protocol(_) => ActionType::Protocol,
Self::Txn(_) => ActionType::Txn,
Self::CommitInfo(_) => ActionType::CommitInfo,
Self::DomainMetadata(_) => ActionType::DomainMetadata,
}
}
}
4 changes: 4 additions & 0 deletions crates/core/src/kernel/snapshot/iterators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use crate::kernel::scalars::ScalarExt;
use crate::kernel::{Add, DeletionVectorDescriptor, Remove};
use crate::{DeltaResult, DeltaTableError};

pub use self::tombstones::TombstoneView;

mod tombstones;

const FIELD_NAME_PATH: &str = "path";
const FIELD_NAME_SIZE: &str = "size";
const FIELD_NAME_MODIFICATION_TIME: &str = "modificationTime";
Expand Down
58 changes: 58 additions & 0 deletions crates/core/src/kernel/snapshot/iterators/tombstones.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::{borrow::Cow, sync::LazyLock};

use arrow::{
array::{AsArray, RecordBatch},
datatypes::Int64Type,
};
use delta_kernel::{actions::Remove, schema::ToSchema};
use percent_encoding::percent_decode_str;

use crate::kernel::snapshot::iterators::get_string_value;

#[derive(Clone)]
pub struct TombstoneView {
data: RecordBatch,
index: usize,
}

impl TombstoneView {
/// Creates a new view into the specified file entry.
pub(crate) fn new(data: RecordBatch, index: usize) -> Self {
Self { data, index }
}

/// Returns the file path with URL decoding applied.
pub fn path(&self) -> Cow<'_, str> {
static FIELD_INDEX: LazyLock<usize> =
LazyLock::new(|| Remove::to_schema().field_with_index("path").unwrap().0);
let raw = get_string_value(self.data.column(*FIELD_INDEX), self.index)
.expect("valid string field");
percent_decode_str(raw).decode_utf8_lossy()
}

pub fn deletion_timestamp(&self) -> Option<i64> {
static FIELD_INDEX: LazyLock<usize> = LazyLock::new(|| {
Remove::to_schema()
.field_with_index("deletionTimestamp")
.unwrap()
.0
});
self.data
.column(*FIELD_INDEX)
.as_primitive_opt::<Int64Type>()
.map(|a| a.value(self.index))
}

pub fn data_change(&self) -> bool {
static FIELD_INDEX: LazyLock<usize> = LazyLock::new(|| {
Remove::to_schema()
.field_with_index("dataChange")
.unwrap()
.0
});
self.data
.column(*FIELD_INDEX)
.as_boolean()
.value(self.index)
}
}
13 changes: 0 additions & 13 deletions crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ pub(crate) trait PartitionsExt {
fn hive_partition_path(&self) -> String;
}

impl PartitionsExt for IndexMap<&str, Scalar> {
fn hive_partition_path(&self) -> String {
let fields = self
.iter()
.map(|(k, v)| {
let encoded = v.serialize_encoded();
format!("{k}={encoded}")
})
.collect::<Vec<_>>();
fields.join("/")
}
}

impl PartitionsExt for IndexMap<String, Scalar> {
fn hive_partition_path(&self) -> String {
let fields = self
Expand Down
46 changes: 32 additions & 14 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@
use std::io::{BufRead, BufReader, Cursor};
use std::sync::{Arc, LazyLock};

use arrow::compute::concat_batches;
use arrow::compute::{concat_batches, filter_record_batch, is_not_null};
use arrow_array::RecordBatch;
use delta_kernel::actions::{Remove, Sidecar};
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
use delta_kernel::scan::scan_row_schema;
use delta_kernel::schema::SchemaRef;
use delta_kernel::schema::derive_macro_utils::ToDataType;
use delta_kernel::schema::{SchemaRef, StructField, ToSchema};
use delta_kernel::snapshot::Snapshot as KernelSnapshot;
use delta_kernel::table_configuration::TableConfiguration;
use delta_kernel::table_properties::TableProperties;
use delta_kernel::{PredicateRef, Version};
use delta_kernel::{EvaluationHandler, Expression, ExpressionEvaluator, PredicateRef, Version};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::task::spawn_blocking;

use super::{Action, CommitInfo, Metadata, Protocol, Remove};
use crate::kernel::arrow::engine_ext::{ScanExt, SnapshotExt};
use crate::kernel::parse::read_removes;
use super::{Action, CommitInfo, Metadata, Protocol};
use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, ScanExt, SnapshotExt};
#[cfg(test)]
use crate::kernel::transaction::CommitData;
use crate::kernel::{ActionType, StructType};
use crate::kernel::{StructType, ARROW_HANDLER};
use crate::logstore::{LogStore, LogStoreExt};
use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};

Expand All @@ -51,7 +52,6 @@ pub use stream::*;

mod iterators;
mod log_data;
pub(crate) mod parse;
pub(crate) mod replay;
mod serde;
mod stream;
Expand Down Expand Up @@ -355,16 +355,29 @@ impl Snapshot {
pub(crate) fn tombstones(
&self,
log_store: &dyn LogStore,
) -> BoxStream<'_, DeltaResult<Remove>> {
) -> BoxStream<'_, DeltaResult<TombstoneView>> {
static TOMBSTONE_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
Arc::new(
StructType::try_new(vec![
ActionType::Remove.schema_field().clone(),
ActionType::Sidecar.schema_field().clone(),
StructField::nullable("remove", Remove::to_data_type()),
StructField::nullable("sidecar", Sidecar::to_data_type()),
])
.expect("Failed to create a StructType somehow"),
)
});
static TOMBSTONE_EVALUATOR: LazyLock<Arc<dyn ExpressionEvaluator>> = LazyLock::new(|| {
let expression = Expression::struct_from(
Remove::to_schema()
.fields()
.map(|field| Expression::column(["remove", field.name()])),
)
.into();
ARROW_HANDLER.new_expression_evaluator(
TOMBSTONE_SCHEMA.clone(),
expression,
Remove::to_data_type(),
)
});

// TODO: which capacity to choose
let mut builder = RecordBatchReceiverStreamBuilder::new(100);
Expand Down Expand Up @@ -400,10 +413,15 @@ impl Snapshot {

builder
.build()
.map(|maybe_batch| maybe_batch.and_then(|batch| read_removes(&batch)))
.map_ok(|removes| {
futures::stream::iter(removes.into_iter().map(Ok::<_, DeltaTableError>))
.map(|maybe_batch| {
maybe_batch.and_then(|batch| {
let filtered = filter_record_batch(&batch, &is_not_null(batch.column(0))?)?;
let tombstones = TOMBSTONE_EVALUATOR.evaluate_arrow(filtered)?;
Ok((0..tombstones.num_rows())
.map(move |idx| TombstoneView::new(tombstones.clone(), idx)))
})
})
.map_ok(|removes| futures::stream::iter(removes.map(Ok::<_, DeltaTableError>)))
.try_flatten()
.boxed()
}
Expand Down
Loading
Loading