Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
num_cpus = { version = "1" }

[workspace.metadata.typos]
files.extend-exclude = ["CHANGELOG.md"]
files.extend-exclude = ["CHANGELOG.md", "crates/benchmarks/queries/tpcds/*.sql"]
default.extend-ignore-re = [
# Custom ignore regex patterns: https://github.com/crate-ci/typos/blob/master/docs/reference.md#example-configurations
"(?s)//\\s*spellchecker:ignore-next-line[^\\n]*\\n[^\\n]*",
Expand Down
10 changes: 9 additions & 1 deletion crates/core/src/delta_datafusion/cdf/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ pub struct DeltaCdfTableProvider {
impl DeltaCdfTableProvider {
/// Build a DeltaCDFTableProvider
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
let mut fields = cdf_builder.snapshot.input_schema().fields().to_vec();
let mut fields = cdf_builder
.snapshot
.as_ref()
.ok_or(DeltaTableError::generic(
"expected initialized snapshot for DeltaCdfTableProvider",
))?
.input_schema()
.fields()
.to_vec();
for f in ADD_PARTITION_SCHEMA.clone() {
fields.push(f.into());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<'a> DeltaScanBuilder<'a> {
let mut pruned_batches = Vec::new();
let mut mask_offset = 0;

for batch in &self.snapshot.files {
for batch in self.snapshot.files()? {
let batch_size = batch.num_rows();
let batch_mask = &mask[mask_offset..mask_offset + batch_size];
let batch_mask_array = BooleanArray::from(batch_mask.to_vec());
Expand Down
51 changes: 48 additions & 3 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,25 @@ impl Snapshot {
pub struct EagerSnapshot {
snapshot: Snapshot,
// logical files in the snapshot
pub(crate) files: Vec<RecordBatch>,
files: Vec<RecordBatch>,
}

pub(crate) async fn resolve_snapshot(
log_store: &dyn LogStore,
maybe_snapshot: Option<EagerSnapshot>,
require_files: bool,
) -> DeltaResult<EagerSnapshot> {
if let Some(snapshot) = maybe_snapshot {
if require_files {
snapshot.with_files(log_store).await
} else {
Ok(snapshot)
}
} else {
let mut config = DeltaTableConfig::default();
config.require_files = require_files;
EagerSnapshot::try_new(log_store, config, None).await
}
Comment on lines +469 to +484
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function, along with the new methods on `EagerSnapshot`, is the main piece of new logic.

Re-posting this comment because of a force push.

}

impl EagerSnapshot {
Expand All @@ -474,15 +492,36 @@ impl EagerSnapshot {
version: Option<i64>,
) -> DeltaResult<Self> {
let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;
Self::try_new_with_snapshot(log_store, snapshot).await
}

let files = match config.require_files {
/// Wrap an already-loaded [`Snapshot`] in an [`EagerSnapshot`].
///
/// The file batches are collected from `log_store` only when the snapshot's
/// load config has `require_files` set; otherwise the file list stays empty.
///
/// # Errors
/// Propagates any error produced while collecting the file stream.
pub(crate) async fn try_new_with_snapshot(
    log_store: &dyn LogStore,
    snapshot: Snapshot,
) -> DeltaResult<Self> {
    let files = if snapshot.load_config().require_files {
        snapshot.files(log_store, None).try_collect().await?
    } else {
        Vec::new()
    };

    Ok(Self { snapshot, files })
}

/// Return this snapshot with its file list loaded.
///
/// A snapshot whose config already has `require_files` set is handed back
/// unchanged. Otherwise the flag is switched on and the snapshot is rebuilt
/// through `try_new_with_snapshot`.
///
/// # Errors
/// Propagates any error returned by `try_new_with_snapshot`.
pub(crate) async fn with_files(mut self, log_store: &dyn LogStore) -> DeltaResult<Self> {
    if !self.snapshot.config.require_files {
        // Flip the flag first so the rebuild below actually collects the files.
        self.snapshot.config.require_files = true;
        return Self::try_new_with_snapshot(log_store, self.snapshot).await;
    }
    Ok(self)
}

/// Borrow the file batches held by this snapshot.
///
/// # Errors
/// Returns `DeltaTableError::NotInitializedWithFiles` when the snapshot's
/// config does not have `require_files` set.
pub(crate) fn files(&self) -> DeltaResult<&[RecordBatch]> {
    if !self.snapshot.config.require_files {
        return Err(DeltaTableError::NotInitializedWithFiles("files".into()));
    }
    Ok(&self.files)
}

/// Update the snapshot to the given version
pub(crate) async fn update(
&mut self,
Expand Down Expand Up @@ -588,6 +627,12 @@ impl EagerSnapshot {
log_store: &dyn LogStore,
predicate: Option<PredicateRef>,
) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
if !self.snapshot.load_config().require_files {
return Box::pin(once(ready(Err(DeltaTableError::NotInitializedWithFiles(
"file_views".into(),
)))));
}

self.snapshot
.files_from(
log_store,
Expand Down
20 changes: 12 additions & 8 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ use itertools::Itertools;
use super::{CustomExecuteHandler, Operation};
use crate::kernel::schema::merge_delta_struct;
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt};
use crate::kernel::{
resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt,
};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

/// Add new columns and/or nested fields to a table
pub struct AddColumnBuilder {
/// A snapshot of the table's state
snapshot: EagerSnapshot,
snapshot: Option<EagerSnapshot>,
/// Fields to add/merge into schema
fields: Option<Vec<StructField>>,
/// Delta object store for handling data files
Expand All @@ -27,7 +29,7 @@ pub struct AddColumnBuilder {
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl Operation<()> for AddColumnBuilder {
impl Operation for AddColumnBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
Expand All @@ -38,7 +40,7 @@ impl Operation<()> for AddColumnBuilder {

impl AddColumnBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
Self {
snapshot,
log_store,
Expand Down Expand Up @@ -75,7 +77,9 @@ impl std::future::IntoFuture for AddColumnBuilder {
let this = self;

Box::pin(async move {
let mut metadata = this.snapshot.metadata().clone();
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?;

let mut metadata = snapshot.metadata().clone();
let fields = match this.fields.clone() {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
Expand All @@ -95,10 +99,10 @@ impl std::future::IntoFuture for AddColumnBuilder {
));
}

let table_schema = this.snapshot.schema();
let table_schema = snapshot.schema();
let new_table_schema = merge_delta_struct(table_schema.as_ref(), fields_right)?;

let current_protocol = this.snapshot.protocol();
let current_protocol = snapshot.protocol();

let new_protocol = current_protocol
.clone()
Expand All @@ -121,7 +125,7 @@ impl std::future::IntoFuture for AddColumnBuilder {
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.get_custom_execute_handler())
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.build(Some(&snapshot), this.log_store.clone(), operation)
.await?;

this.post_execute(operation_id).await?;
Expand Down
14 changes: 8 additions & 6 deletions crates/core/src/operations/add_feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use itertools::Itertools;

use super::{CustomExecuteHandler, Operation};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{EagerSnapshot, ProtocolExt as _, TableFeatures};
use crate::kernel::{resolve_snapshot, EagerSnapshot, ProtocolExt as _, TableFeatures};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::DeltaTable;
Expand All @@ -17,7 +17,7 @@ use crate::{DeltaResult, DeltaTableError};
/// Enable table features for a table
pub struct AddTableFeatureBuilder {
/// A snapshot of the table's state
snapshot: EagerSnapshot,
snapshot: Option<EagerSnapshot>,
/// Name of the feature
name: Vec<TableFeatures>,
/// Allow protocol versions to be increased by setting features
Expand All @@ -29,7 +29,7 @@ pub struct AddTableFeatureBuilder {
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl super::Operation<()> for AddTableFeatureBuilder {
impl super::Operation for AddTableFeatureBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
Expand All @@ -40,7 +40,7 @@ impl super::Operation<()> for AddTableFeatureBuilder {

impl AddTableFeatureBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
Self {
name: vec![],
allow_protocol_versions_increase: false,
Expand Down Expand Up @@ -92,6 +92,8 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
let this = self;

Box::pin(async move {
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?;

let name = if this.name.is_empty() {
return Err(DeltaTableError::Generic("No features provided".to_string()));
} else {
Expand All @@ -107,7 +109,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
let reader_features = reader_features.into_iter().flatten().collect_vec();
let writer_features = writer_features.into_iter().flatten().collect_vec();

let mut protocol = this.snapshot.protocol().clone();
let mut protocol = snapshot.protocol().clone();

if !this.allow_protocol_versions_increase {
if !reader_features.is_empty()
Expand Down Expand Up @@ -135,7 +137,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.get_custom_execute_handler())
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.build(Some(&snapshot), this.log_store.clone(), operation)
.await?;

this.post_execute(operation_id).await?;
Expand Down
30 changes: 14 additions & 16 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use super::{CustomExecuteHandler, Operation};
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner};
use crate::kernel::{
resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner,
};
use crate::logstore::LogStoreRef;
use crate::operations::datafusion_utils::Expression;
use crate::protocol::DeltaOperation;
Expand All @@ -25,7 +27,7 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
/// Build a constraint to add to a table
pub struct ConstraintBuilder {
/// A snapshot of the table's state
snapshot: EagerSnapshot,
snapshot: Option<EagerSnapshot>,
/// Name of the constraint
name: Option<String>,
/// Constraint expression
Expand All @@ -39,7 +41,7 @@ pub struct ConstraintBuilder {
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl super::Operation<()> for ConstraintBuilder {
impl super::Operation for ConstraintBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
Expand All @@ -50,7 +52,7 @@ impl super::Operation<()> for ConstraintBuilder {

impl ConstraintBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
Self {
name: None,
expr: None,
Expand Down Expand Up @@ -101,11 +103,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
let this = self;

Box::pin(async move {
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles(
"ADD CONSTRAINTS".into(),
));
}
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?;

let operation_id = this.get_operation_id();
this.pre_execute(operation_id).await?;

Expand All @@ -118,7 +117,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
.expr
.ok_or_else(|| DeltaTableError::Generic("No Expression provided".to_string()))?;

let mut metadata = this.snapshot.metadata().clone();
let mut metadata = snapshot.metadata().clone();
let configuration_key = format!("delta.constraints.{name}");

if metadata.configuration().contains_key(&configuration_key) {
Expand All @@ -132,10 +131,9 @@ impl std::future::IntoFuture for ConstraintBuilder {
.unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
register_store(this.log_store.clone(), session.runtime_env().as_ref());

let scan =
DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref())
.build()
.await?;
let scan = DeltaScanBuilder::new(&snapshot, this.log_store.clone(), session.as_ref())
.build()
.await?;

let schema = scan.schema().to_dfschema()?;
let expr = into_expr(expr, &schema, session.as_ref())?;
Expand Down Expand Up @@ -175,7 +173,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
metadata =
metadata.add_config_key(format!("delta.constraints.{name}"), expr_str.clone())?;

let old_protocol = this.snapshot.protocol();
let old_protocol = snapshot.protocol();
let protocol = ProtocolInner {
min_reader_version: if old_protocol.min_reader_version() > 1 {
old_protocol.min_reader_version()
Expand Down Expand Up @@ -213,7 +211,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.custom_execute_handler.clone())
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.build(Some(&snapshot), this.log_store.clone(), operation)
.await?;

if let Some(handler) = this.custom_execute_handler {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Default for ConvertToDeltaBuilder {
}
}

impl super::Operation<()> for ConvertToDeltaBuilder {
impl super::Operation for ConvertToDeltaBuilder {
fn log_store(&self) -> &LogStoreRef {
self.log_store
.as_ref()
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct CreateBuilder {
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl super::Operation<()> for CreateBuilder {
impl super::Operation for CreateBuilder {
fn log_store(&self) -> &LogStoreRef {
self.log_store
.as_ref()
Expand Down
Loading
Loading