Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl S3DynamoDbLogStore {
),
}
}
unreachable!("for loop yields Ok or Err in body when retyr = MAX_REPAIR_RETRIES")
unreachable!("for loop yields Ok or Err in body when retry = MAX_REPAIR_RETRIES")
}

fn map_retry_result(
Expand Down
2 changes: 1 addition & 1 deletion crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async fn benchmark_merge_tpcds(
let table = DeltaTableBuilder::from_uri(table_url)?.load().await?;

let provider = DeltaTableProvider::try_new(
table.snapshot()?.clone(),
table.snapshot()?.snapshot().clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆

table.log_store(),
DeltaScanConfig {
file_column_name: Some("file_path".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ macro_rules! format_option {
}};
}

/// Epoch days from ce calander until 1970-01-01
/// Epoch days from ce calendar until 1970-01-01
pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;

struct ScalarValueFormat<'a> {
Expand Down
97 changes: 58 additions & 39 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
/// columns must appear at the end of the schema. This is to align with how partitions are handled
/// at the physical level
pub(crate) fn df_logical_schema(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
file_column_name: &Option<String>,
schema: Option<ArrowSchemaRef>,
) -> DeltaResult<SchemaRef> {
Expand Down Expand Up @@ -576,13 +576,13 @@ impl DeltaDataChecker {
}

/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
pub fn new(snapshot: &EagerSnapshot) -> Self {
let invariants = snapshot.schema().get_invariants().unwrap_or_default();
let generated_columns = snapshot
.schema()
.get_generated_columns()
.unwrap_or_default();
let constraints = snapshot.table_config().get_constraints();
let constraints = snapshot.table_properties().get_constraints();
let non_nullable_columns = snapshot
.schema()
.fields()
Expand Down Expand Up @@ -937,16 +937,17 @@ fn join_batches_with_add_actions(

/// Determine which files contain a record that satisfies the predicate
pub(crate) async fn find_files_scan(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
state: &SessionState,
expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
.file_actions_iter(&log_store)
.map_ok(|add| (add.path.clone(), add.to_owned()))
.try_collect()
.await?;
.log_data()
.iter()
.map(|f| f.add_action())
.map(|add| (add.path.clone(), add.to_owned()))
.collect();

let scan_config = DeltaScanConfigBuilder {
include_file_column: true,
Expand Down Expand Up @@ -997,10 +998,14 @@ pub(crate) async fn find_files_scan(

pub(crate) async fn scan_memory_table(
log_store: &dyn LogStore,
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
predicate: &Expr,
) -> DeltaResult<Vec<Add>> {
let actions = snapshot.file_actions(log_store).await?;
let actions = snapshot
.log_data()
.iter()
.map(|f| f.add_action())
.collect_vec();

let batch = snapshot.add_actions_table(true)?;
let mut arrays = Vec::new();
Expand Down Expand Up @@ -1052,7 +1057,7 @@ pub(crate) async fn scan_memory_table(

/// Finds files in a snapshot that match the provided predicate.
pub async fn find_files(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
state: &SessionState,
predicate: Option<Expr>,
Expand Down Expand Up @@ -1088,7 +1093,7 @@ pub async fn find_files(
}
}
None => Ok(FindFiles {
candidates: snapshot.file_actions(&log_store).await?,
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
partition_scan: true,
}),
}
Expand Down Expand Up @@ -1192,7 +1197,7 @@ impl From<DeltaColumn> for Column {
}
}

/// Create a column, resuing the existing datafusion column
/// Create a column, reusing the existing datafusion column
impl From<Column> for DeltaColumn {
fn from(c: Column) -> Self {
DeltaColumn { inner: c }
Expand Down Expand Up @@ -1517,13 +1522,16 @@ mod tests {
let table = crate::open_table(table_url).await.unwrap();
let config = DeltaScanConfigBuilder::new()
.with_file_column_name(&"file_source")
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();

let log_store = table.log_store();
let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
.unwrap();
let provider = DeltaTableProvider::try_new(
table.snapshot().unwrap().snapshot.clone(),
log_store,
config,
)
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1581,13 +1589,16 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();

let log_store = table.log_store();
let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
.unwrap();
let provider = DeltaTableProvider::try_new(
table.snapshot().unwrap().snapshot.clone(),
log_store,
config,
)
.unwrap();
let logical_schema = provider.schema();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();
Expand Down Expand Up @@ -1645,12 +1656,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot.clone(), log, config)
.unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1737,12 +1749,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot.clone(), log, config)
.unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1793,12 +1806,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot.clone(), log, config)
.unwrap();

let mut cfg = SessionConfig::default();
cfg.options_mut().execution.parquet.pushdown_filters = true;
Expand Down Expand Up @@ -1889,12 +1903,13 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();
let log = table.log_store();

let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot.clone(), log, config)
.unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1972,11 +1987,15 @@ mod tests {

let ctx = SessionContext::new();
let state = ctx.state();
let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state)
.with_filter(Some(col("a").eq(lit("s"))))
.build()
.await
.unwrap();
let scan = DeltaScanBuilder::new(
table.snapshot().unwrap().snapshot(),
table.log_store(),
&state,
)
.with_filter(Some(col("a").eq(lit("s"))))
.build()
.await
.unwrap();

let mut visitor = ParquetVisitor::default();
visit_execution_plan(&scan, &mut visitor).unwrap();
Expand All @@ -1997,12 +2016,12 @@ mod tests {
let snapshot = table.snapshot().unwrap();
let ctx = SessionContext::new();
let state = ctx.state();
let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
.with_filter(Some(col("a").eq(lit("s"))))
.with_scan_config(
DeltaScanConfigBuilder::new()
.with_parquet_pushdown(false)
.build(snapshot)
.build(snapshot.snapshot())
.unwrap(),
)
.build()
Expand Down Expand Up @@ -2032,7 +2051,7 @@ mod tests {
let ctx = SessionContext::new_with_config(config);
let state = ctx.state();

let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
.build()
.await
.unwrap();
Expand Down Expand Up @@ -2151,7 +2170,7 @@ mod tests {
.unwrap();

let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.build(table.snapshot().unwrap().snapshot())
.unwrap();

let (object_store, mut operations) =
Expand All @@ -2164,7 +2183,7 @@ mod tests {
table.log_store().config().clone(),
);
let provider = DeltaTableProvider::try_new(
table.snapshot().unwrap().clone(),
table.snapshot().unwrap().snapshot.clone(),
Arc::new(log_store),
config,
)
Expand Down
11 changes: 4 additions & 7 deletions crates/core/src/delta_datafusion/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::kernel::schema::cast_record_batch;
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use datafusion::common::{not_impl_err, ColumnStatistics};
use datafusion::common::{not_impl_err, ColumnStatistics, Result};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

/// A Schema Adapter Factory which provides casting record batches from parquet to meet
Expand Down Expand Up @@ -39,10 +39,7 @@ impl SchemaAdapter for DeltaSchemaAdapter {
Some(file_schema.fields.find(field.name())?.0)
}

fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion::common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
Expand Down Expand Up @@ -71,15 +68,15 @@ pub(crate) struct SchemaMapping {
}

impl SchemaMapper for SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
Ok(record_batch)
}

fn map_column_statistics(
&self,
_file_col_statistics: &[ColumnStatistics],
) -> datafusion::common::Result<Vec<ColumnStatistics>> {
) -> Result<Vec<ColumnStatistics>> {
not_impl_err!("Mapping column statistics is not implemented for DeltaSchemaAdapter")
}
}
Loading
Loading