@@ -37,6 +37,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
3737use datafusion:: physical_plan:: { ExecutionPlan , PhysicalExpr } ;
3838use datafusion:: scalar:: ScalarValue ;
3939use datafusion:: sql:: TableReference ;
40+ use delta_kernel:: ExpressionRef ;
4041use delta_kernel:: Table ;
4142use delta_kernel:: engine:: default:: DefaultEngine ;
4243use delta_kernel:: engine:: default:: executor:: tokio:: TokioBackgroundExecutor ;
@@ -144,13 +145,13 @@ impl DeltaTable {
144145 . map_err ( handle_delta_error) ?;
145146
146147 let arrow_schema = Self :: get_schema ( & snapshot) ;
147- let delta_schema = snapshot. schema ( ) . clone ( ) ;
148+ let delta_schema = snapshot. schema ( ) ;
148149
149150 Ok ( Self {
150151 table,
151152 engine,
152153 arrow_schema : Arc :: new ( arrow_schema) ,
153- delta_schema : Arc :: new ( delta_schema ) ,
154+ delta_schema,
154155 } )
155156 }
156157
@@ -359,18 +360,14 @@ impl TableProvider for DeltaTable {
359360 let mut scan_context = ScanContext :: new ( scan_state, Arc :: clone ( & engine) ) ;
360361
361362 let scan_iter = scan
362- . scan_data ( engine. as_ref ( ) )
363+ . scan_metadata ( engine. as_ref ( ) )
363364 . map_err ( map_delta_error_to_datafusion_err) ?;
364365
365366 for scan_result in scan_iter {
366- let data = scan_result. map_err ( map_delta_error_to_datafusion_err) ?;
367- scan_context = delta_kernel:: scan:: state:: visit_scan_files (
368- data. 0 . as_ref ( ) ,
369- data. 1 . as_ref ( ) ,
370- scan_context,
371- handle_scan_file,
372- )
373- . map_err ( map_delta_error_to_datafusion_err) ?;
367+ let scan = scan_result. map_err ( map_delta_error_to_datafusion_err) ?;
368+ scan_context = scan
369+ . visit_scan_files ( scan_context, handle_scan_file)
370+ . map_err ( map_delta_error_to_datafusion_err) ?;
374371 }
375372
376373 Ok :: < _ , datafusion:: error:: DataFusionError > ( (
@@ -546,6 +543,16 @@ struct PartitionFileContext {
546543 partitioned_file : PartitionedFile ,
547544 selection_vector : Option < Vec < bool > > ,
548545 partition_values : HashMap < String , String > ,
546+
547+ /// These are transforms that Delta wants to apply to the physical data read from the Parquet files.
548+ /// Currently this is only used for adding partition columns and mapping the columns read from the Parquet files
549+ /// into the correct place in the output schema.
550+ ///
551+ /// Both of these tasks are already handled for us by the `DataFusion` `ParquetExec`. However, we may need to
552+ /// revisit this if more complex transformations are required.
553+ ///
554+ /// See: <https://github.com/delta-io/delta-kernel-rs/blob/7e62d12def00f248eccef23e7672fd4db553274f/kernel/src/scan/mod.rs#L444>
555+ _transform : Option < ExpressionRef > ,
549556}
550557
551558#[ allow( clippy:: needless_pass_by_value) ]
@@ -557,6 +564,7 @@ fn handle_scan_file(
557564 size : i64 ,
558565 _stats : Option < Stats > ,
559566 dv_info : DvInfo ,
567+ transform : Option < ExpressionRef > ,
560568 partition_values : HashMap < String , String > ,
561569) {
562570 let root_url = match Url :: parse ( & scan_context. scan_state . table_root ) {
@@ -619,6 +627,7 @@ fn handle_scan_file(
619627 partitioned_file,
620628 selection_vector,
621629 partition_values,
630+ _transform : transform,
622631 } ) ;
623632}
624633
0 commit comments