2 changes: 1 addition & 1 deletion crates/aws/src/logstore/dynamodb_logstore.rs
@@ -148,7 +148,7 @@ impl S3DynamoDbLogStore {
),
}
}
unreachable!("for loop yields Ok or Err in body when retyr = MAX_REPAIR_RETRIES")
unreachable!("for loop yields Ok or Err in body when retry = MAX_REPAIR_RETRIES")
}

fn map_retry_result(
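Context for the message fixed above: the retry loop is written so that its final iteration always returns, which is what makes the trailing `unreachable!` sound. A minimal, self-contained sketch of that shape (names are hypothetical, not the crate's actual repair logic):

```rust
const MAX_REPAIR_RETRIES: u32 = 3;

fn try_repair() -> Result<(), String> {
    Ok(()) // stand-in for the real repair operation
}

fn repair_with_retries() -> Result<(), String> {
    for retry in 0..=MAX_REPAIR_RETRIES {
        match try_repair() {
            // Success always returns out of the loop.
            Ok(()) => return Ok(()),
            // On the final attempt the error is returned, not retried,
            // so the last iteration can never fall through.
            Err(e) if retry == MAX_REPAIR_RETRIES => return Err(e),
            Err(_) => continue, // transient failure: try again
        }
    }
    unreachable!("for loop yields Ok or Err in body when retry = MAX_REPAIR_RETRIES")
}
```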
2 changes: 1 addition & 1 deletion crates/benchmarks/src/bin/merge.rs
@@ -199,7 +199,7 @@ async fn benchmark_merge_tpcds(
let table = DeltaTableBuilder::from_uri(table_url)?.load().await?;

let provider = DeltaTableProvider::try_new(
-table.snapshot()?.clone(),
+table.snapshot()?.snapshot().clone(),
table.log_store(),
DeltaScanConfig {
file_column_name: Some("file_path".to_string()),
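The pattern above recurs throughout this PR: `DeltaTable::snapshot()` now yields the table state, and `DeltaTableProvider` takes the inner `EagerSnapshot`, reached via a second `.snapshot()` call. A hedged sketch of the full construction, built only from calls visible in this diff (module paths and the `table_url` parameter type are assumptions):

```rust
use deltalake::delta_datafusion::{DeltaScanConfigBuilder, DeltaTableProvider};
use deltalake::{DeltaResult, DeltaTableBuilder};

async fn load_provider(table_url: url::Url) -> DeltaResult<DeltaTableProvider> {
    let table = DeltaTableBuilder::from_uri(table_url)?.load().await?;
    // `snapshot()` returns the table state; the second `.snapshot()`
    // borrows the inner EagerSnapshot that the provider now expects.
    let snapshot = table.snapshot()?.snapshot().clone();
    let config = DeltaScanConfigBuilder::new().build(&snapshot)?;
    DeltaTableProvider::try_new(snapshot, table.log_store(), config)
}
```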
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/expr.rs
@@ -478,7 +478,7 @@ macro_rules! format_option {
}};
}

-/// Epoch days from ce calander until 1970-01-01
+/// Epoch days from ce calendar until 1970-01-01
pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;

struct ScalarValueFormat<'a> {
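For readers of the constant this comment documents: `EPOCH_DAYS_FROM_CE` is the day count from the Common Era epoch (0001-01-01) to the Unix epoch, which is how Arrow `Date32` values (days since 1970-01-01) get converted to calendar dates. A minimal sketch assuming the chrono crate (not code from this PR):

```rust
use chrono::{Datelike, NaiveDate};

const EPOCH_DAYS_FROM_CE: i32 = 719_163;

/// Convert an Arrow Date32 value (days since 1970-01-01) to a calendar date.
fn date32_to_naive(days_since_unix_epoch: i32) -> Option<NaiveDate> {
    NaiveDate::from_num_days_from_ce_opt(days_since_unix_epoch + EPOCH_DAYS_FROM_CE)
}

fn main() {
    // 1970-01-01 falls 719_163 days after the Common Era epoch.
    let unix_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
    assert_eq!(unix_epoch.num_days_from_ce(), EPOCH_DAYS_FROM_CE);
    assert_eq!(date32_to_naive(0), Some(unix_epoch));
}
```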
97 changes: 58 additions & 39 deletions crates/core/src/delta_datafusion/mod.rs
@@ -309,7 +309,7 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
/// columns must appear at the end of the schema. This is to align with how partition are handled
/// at the physical level
pub(crate) fn df_logical_schema(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
file_column_name: &Option<String>,
schema: Option<ArrowSchemaRef>,
) -> DeltaResult<SchemaRef> {
@@ -576,13 +576,13 @@ impl DeltaDataChecker {
}

/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
pub fn new(snapshot: &EagerSnapshot) -> Self {
let invariants = snapshot.schema().get_invariants().unwrap_or_default();
let generated_columns = snapshot
.schema()
.get_generated_columns()
.unwrap_or_default();
-let constraints = snapshot.table_config().get_constraints();
+let constraints = snapshot.table_properties().get_constraints();
let non_nullable_columns = snapshot
.schema()
.fields()
@@ -937,16 +937,17 @@ fn join_batches_with_add_actions(

/// Determine which files contain a record that satisfies the predicate
pub(crate) async fn find_files_scan(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
state: &SessionState,
expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
.file_actions_iter(&log_store)
.map_ok(|add| (add.path.clone(), add.to_owned()))
.try_collect()
.await?;
.log_data()
.iter()
.map(|f| f.add_action())
.map(|add| (add.path.clone(), add.to_owned()))
.collect();

let scan_config = DeltaScanConfigBuilder {
include_file_column: true,
@@ -997,10 +998,14 @@ pub(crate) async fn find_files_scan(

pub(crate) async fn scan_memory_table(
log_store: &dyn LogStore,
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
predicate: &Expr,
) -> DeltaResult<Vec<Add>> {
-let actions = snapshot.file_actions(log_store).await?;
+let actions = snapshot
+.log_data()
+.iter()
+.map(|f| f.add_action())
+.collect_vec();

let batch = snapshot.add_actions_table(true)?;
let mut arrays = Vec::new();
@@ -1052,7 +1057,7 @@ pub(crate) async fn scan_memory_table(

/// Finds files in a snapshot that match the provided predicate.
pub async fn find_files(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
state: &SessionState,
predicate: Option<Expr>,
@@ -1088,7 +1093,7 @@ pub async fn find_files(
}
}
None => Ok(FindFiles {
candidates: snapshot.file_actions(&log_store).await?,
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
partition_scan: true,
}),
}
@@ -1192,7 +1197,7 @@ impl From<DeltaColumn> for Column {
}
}

-/// Create a column, resuing the existing datafusion column
+/// Create a column, reusing the existing datafusion column
impl From<Column> for DeltaColumn {
fn from(c: Column) -> Self {
DeltaColumn { inner: c }
@@ -1517,13 +1522,16 @@ mod tests {
let table = crate::open_table(table_url).await.unwrap();
let config = DeltaScanConfigBuilder::new()
.with_file_column_name(&"file_source")
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();

let log_store = table.log_store();
-let provider =
-DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
-.unwrap();
+let provider = DeltaTableProvider::try_new(
+table.snapshot().unwrap().snapshot().clone(),
+log_store,
+config,
+)
+.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

@@ -1581,13 +1589,16 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();

let log_store = table.log_store();
-let provider =
-DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
-.unwrap();
+let provider = DeltaTableProvider::try_new(
+table.snapshot().unwrap().snapshot().clone(),
+log_store,
+config,
+)
+.unwrap();
let logical_schema = provider.schema();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();
@@ -1645,12 +1656,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
-DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
+.unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

@@ -1737,12 +1749,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
-DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
+.unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

@@ -1793,12 +1806,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
-DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
+.unwrap();

let mut cfg = SessionConfig::default();
cfg.options_mut().execution.parquet.pushdown_filters = true;
@@ -1889,12 +1903,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
-DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
+.unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

@@ -1972,11 +1987,15 @@ mod tests {

let ctx = SessionContext::new();
let state = ctx.state();
-let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state)
-.with_filter(Some(col("a").eq(lit("s"))))
-.build()
-.await
-.unwrap();
+let scan = DeltaScanBuilder::new(
+table.snapshot().unwrap().snapshot(),
+table.log_store(),
+&state,
+)
+.with_filter(Some(col("a").eq(lit("s"))))
+.build()
+.await
+.unwrap();

let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();
@@ -1997,12 +2016,12 @@ mod tests {
let snapshot = table.snapshot().unwrap();
let ctx = SessionContext::new();
let state = ctx.state();
-let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
+let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
.with_filter(Some(col("a").eq(lit("s"))))
.with_scan_config(
DeltaScanConfigBuilder::new()
.with_parquet_pushdown(false)
-.build(snapshot)
+.build(snapshot.snapshot())
.unwrap(),
)
.build()
@@ -2032,7 +2051,7 @@ mod tests {
let ctx = SessionContext::new_with_config(config);
let state = ctx.state();

-let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
+let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
.build()
.await
.unwrap();
@@ -2151,7 +2170,7 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
-.build(table.snapshot().unwrap())
+.build(table.snapshot().unwrap().snapshot())
.unwrap();

let (object_store, mut operations) =
@@ -2164,7 +2183,7 @@
table.log_store().config().clone(),
);
let provider = DeltaTableProvider::try_new(
-table.snapshot().unwrap().clone(),
+table.snapshot().unwrap().snapshot().clone(),
Arc::new(log_store),
config,
)
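The recurring change in this file replaces async reads through the log store (`file_actions`, `file_actions_iter`) with synchronous iteration over the snapshot's `log_data()`. A hedged sketch of the two shapes used above; the `EagerSnapshot`/`Add` import paths are assumptions:

```rust
use std::collections::HashMap;

use deltalake::kernel::{Add, EagerSnapshot};

/// Collect every Add action synchronously, with no log-store round trip.
fn collect_adds(snapshot: &EagerSnapshot) -> Vec<Add> {
    snapshot
        .log_data()
        .iter()
        .map(|f| f.add_action())
        .collect()
}

/// The same data keyed by file path, mirroring find_files_scan's
/// candidate map above.
fn adds_by_path(snapshot: &EagerSnapshot) -> HashMap<String, Add> {
    snapshot
        .log_data()
        .iter()
        .map(|f| f.add_action())
        .map(|add| (add.path.clone(), add))
        .collect()
}
```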
11 changes: 4 additions & 7 deletions crates/core/src/delta_datafusion/schema_adapter.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::kernel::schema::cast_record_batch;
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
-use datafusion::common::{not_impl_err, ColumnStatistics};
+use datafusion::common::{not_impl_err, ColumnStatistics, Result};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

/// A Schema Adapter Factory which provides casting record batches from parquet to meet
@@ -39,10 +39,7 @@ impl SchemaAdapter for DeltaSchemaAdapter {
Some(file_schema.fields.find(field.name())?.0)
}

-fn map_schema(
-&self,
-file_schema: &Schema,
-) -> datafusion::common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
@@ -71,15 +68,15 @@ pub(crate) struct SchemaMapping {
}

impl SchemaMapper for SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
Ok(record_batch)
}

fn map_column_statistics(
&self,
_file_col_statistics: &[ColumnStatistics],
-) -> datafusion::common::Result<Vec<ColumnStatistics>> {
+) -> Result<Vec<ColumnStatistics>> {
not_impl_err!("Mapping column statistics is not implemented for DeltaSchemaAdapter")
}
}