Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 16 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/data_components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ datafusion-federation = { workspace = true }
datafusion-federation-sql = { workspace = true }
datafusion-table-providers = { workspace = true }
db_connection_pool = { path = "../db_connection_pool" }
delta_kernel = { version = "0.6.1", features = [
delta_kernel = { version = "0.9", features = [
"default-engine",
"cloud",
"arrow_54",
], optional = true }
document_parse = { path = "../document_parse" }
duckdb = { workspace = true, features = [
Expand Down
31 changes: 20 additions & 11 deletions crates/data_components/src/delta_lake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use delta_kernel::ExpressionRef;
use delta_kernel::Table;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
Expand Down Expand Up @@ -144,13 +145,13 @@ impl DeltaTable {
.map_err(handle_delta_error)?;

let arrow_schema = Self::get_schema(&snapshot);
let delta_schema = snapshot.schema().clone();
let delta_schema = snapshot.schema();

Ok(Self {
table,
engine,
arrow_schema: Arc::new(arrow_schema),
delta_schema: Arc::new(delta_schema),
delta_schema,
})
}

Expand Down Expand Up @@ -359,18 +360,14 @@ impl TableProvider for DeltaTable {
let mut scan_context = ScanContext::new(scan_state, Arc::clone(&engine));

let scan_iter = scan
.scan_data(engine.as_ref())
.scan_metadata(engine.as_ref())
.map_err(map_delta_error_to_datafusion_err)?;

for scan_result in scan_iter {
let data = scan_result.map_err(map_delta_error_to_datafusion_err)?;
scan_context = delta_kernel::scan::state::visit_scan_files(
data.0.as_ref(),
data.1.as_ref(),
scan_context,
handle_scan_file,
)
.map_err(map_delta_error_to_datafusion_err)?;
let scan = scan_result.map_err(map_delta_error_to_datafusion_err)?;
scan_context = scan
.visit_scan_files(scan_context, handle_scan_file)
.map_err(map_delta_error_to_datafusion_err)?;
}

Ok::<_, datafusion::error::DataFusionError>((
Expand Down Expand Up @@ -546,6 +543,16 @@ struct PartitionFileContext {
partitioned_file: PartitionedFile,
selection_vector: Option<Vec<bool>>,
partition_values: HashMap<String, String>,

/// The transform, if any, that Delta wants to apply to the physical data read from the Parquet file.
/// Currently this is only used for two operations: adding partition columns, and mapping the columns read
/// from the Parquet file into the correct place in the output schema.
///
/// Both of these operations are already handled for us by the `DataFusion` `ParquetExec`. However, we may need to
/// revisit this if more complex transformations are required.
///
/// See: <https://github.com/delta-io/delta-kernel-rs/blob/7e62d12def00f248eccef23e7672fd4db553274f/kernel/src/scan/mod.rs#L444>
_transform: Option<ExpressionRef>,
}

#[allow(clippy::needless_pass_by_value)]
Expand All @@ -557,6 +564,7 @@ fn handle_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let root_url = match Url::parse(&scan_context.scan_state.table_root) {
Expand Down Expand Up @@ -619,6 +627,7 @@ fn handle_scan_file(
partitioned_file,
selection_vector,
partition_values,
_transform: transform,
});
}

Expand Down
Loading