@@ -242,6 +242,13 @@ impl<'a> TableScanBuilder<'a> {
                     "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
                 ));
             }
+
+            if self.delete_file_processing_enabled {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "delete_file_processing_enabled should not be set for incremental scan",
+                ));
+            }
         }

         let snapshot = match self.snapshot_id {
@@ -419,84 +426,25 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

-        if let Some(to_snapshot_id) = self.plan_context.to_snapshot_id {
-            // Incremental scan mode
-            let added_files = added_files_between(
-                &self.plan_context.object_cache,
-                &self.plan_context.table_metadata,
-                to_snapshot_id,
-                self.plan_context.from_snapshot_id,
-            )
-            .await?;
-
-            for entry in added_files {
-                let manifest_entry_context = ManifestEntryContext {
-                    manifest_entry: entry,
-                    expression_evaluator_cache: self
-                        .plan_context
-                        .expression_evaluator_cache
-                        .clone(),
-                    field_ids: self.plan_context.field_ids.clone(),
-                    bound_predicates: None, // TODO: support predicates in incremental scan
-                    partition_spec_id: 0,   // TODO: get correct partition spec id
-                    // It's used to skip any data file whose partition data indicates that it can't contain
-                    // any data that matches this scan's filter
-                    snapshot_schema: self.plan_context.snapshot_schema.clone(),
-                    // delete is not supported in incremental scan
-                    delete_file_index: None,
-                };
-
-                manifest_entry_data_ctx_tx
-                    .clone()
-                    .send(manifest_entry_context)
-                    .await
-                    .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
-            }
-
-            let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
-
-            // Process the [`ManifestEntry`] stream in parallel
-            spawn(async move {
-                let result = manifest_entry_data_ctx_rx
-                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
-                    .try_for_each_concurrent(
-                        concurrency_limit_manifest_entries,
-                        |(manifest_entry_context, tx)| async move {
-                            spawn(async move {
-                                Self::process_data_manifest_entry(manifest_entry_context, tx).await
-                            })
-                            .await
-                        },
-                    )
-                    .await;
-
-                if let Err(error) = result {
-                    let _ = channel_for_manifest_entry_error.send(Err(error)).await;
-                }
-            });
-
-            return Ok(file_scan_task_rx.boxed());
-        }
-
         let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
             if self.delete_file_processing_enabled {
                 Some(DeleteFileIndex::new())
             } else {
                 None
             };

-        let manifest_list = self.plan_context.get_manifest_list().await?;
-
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
         // whose partitions cannot match this
         // scan's filter
-        let manifest_file_contexts = self.plan_context.build_manifest_file_contexts(
-            manifest_list,
-            manifest_entry_data_ctx_tx,
-            delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
-                (delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
-            }),
-        )?;
+        let manifest_file_contexts = self
+            .plan_context
+            .build_manifest_file_contexts(
+                manifest_entry_data_ctx_tx,
+                delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
+                    (delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
+                }),
+            )
+            .await?;

         let mut channel_for_manifest_error = file_scan_task_tx.clone();
@@ -513,8 +461,6 @@ impl TableScan {
             }
         });

-        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
-
         if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
             let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

@@ -543,6 +489,7 @@ impl TableScan {
             .await;
         }

+        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
         // Process the data file [`ManifestEntry`] stream in parallel
         spawn(async move {
             let result = manifest_entry_data_ctx_rx
@@ -701,6 +648,7 @@ struct BoundPredicates {
     snapshot_bound_predicate: BoundPredicate,
 }

+type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
 /// Wraps a [`ManifestFile`] alongside the objects that are needed
 /// to process it in a thread-safe manner
 struct ManifestFileContext {
@@ -714,6 +662,10 @@ struct ManifestFileContext {
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
     delete_file_index: Option<DeleteFileIndex>,
+
+    /// Filters manifest entries.
+    /// Used for different kinds of scans, e.g., scanning only newly added files without delete files.
+    filter_fn: Option<Arc<ManifestEntryFilterFn>>,
 }

 /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -742,12 +694,13 @@ impl ManifestFileContext {
             mut sender,
             expression_evaluator_cache,
             delete_file_index,
-            ..
+            filter_fn,
         } = self;
+        let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true));

         let manifest = object_cache.get_manifest(&manifest_file).await?;

-        for manifest_entry in manifest.entries() {
+        for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) {
             let manifest_entry_context = ManifestEntryContext {
                 // TODO: refactor to avoid the expensive ManifestEntry clone
                 manifest_entry: manifest_entry.clone(),
@@ -835,18 +788,77 @@ impl PlanContext {
         Ok(partition_filter)
     }

-    fn build_manifest_file_contexts(
+    async fn build_manifest_file_contexts(
         &self,
-        manifest_list: Arc<ManifestList>,
         tx_data: Sender<ManifestEntryContext>,
         delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
-        let manifest_files = manifest_list.entries().iter();
+        let mut filter_fn: Option<Arc<ManifestEntryFilterFn>> = None;
+        let manifest_files = {
+            if let Some(to_snapshot_id) = self.to_snapshot_id {
+                // Incremental scan mode:
+                // Get all added files between two snapshots.
+                // - data files in `Append` and `Overwrite` snapshots are included.
+                // - delete files are ignored.
+                // - `Replace` snapshots (e.g., compaction) are ignored.
+                //
+                // `to_snapshot_id` is inclusive, `from_snapshot_id` is exclusive.
+
+                // prevent misuse
+                assert!(
+                    delete_file_idx_and_tx.is_none(),
+                    "delete file is not supported in incremental scan mode"
+                );
+
+                let snapshots =
+                    ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id)
+                        .filter(|snapshot| {
+                            matches!(
+                                snapshot.summary().operation,
+                                Operation::Append | Operation::Overwrite
+                            )
+                        })
+                        .collect_vec();
+                let snapshot_ids: HashSet<i64> = snapshots
+                    .iter()
+                    .map(|snapshot| snapshot.snapshot_id())
+                    .collect();
+
+                let mut manifest_files = vec![];
+                for snapshot in snapshots {
+                    let manifest_list = self
+                        .object_cache
+                        .get_manifest_list(&snapshot, &self.table_metadata)
+                        .await?;
+                    for entry in manifest_list.entries() {
+                        if !snapshot_ids.contains(&entry.added_snapshot_id) {
+                            continue;
+                        }
+                        manifest_files.push(entry.clone());
+                    }
+                }
+
+                filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| {
+                    matches!(entry.status(), ManifestStatus::Added)
+                        && matches!(entry.data_file().content_type(), DataContentType::Data)
+                        && (
+                            // Is it possible that the snapshot id here is not contained?
+                            entry.snapshot_id().is_none()
+                                || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                        )
+                }));
+
+                manifest_files
+            } else {
+                let manifest_list = self.get_manifest_list().await?;
+                manifest_list.entries().to_vec()
+            }
+        };

         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];

-        for manifest_file in manifest_files {
+        for manifest_file in &manifest_files {
             let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
                 let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
                     continue;
@@ -885,6 +897,7 @@ impl PlanContext {
                 partition_bound_predicate,
                 tx,
                 delete_file_idx,
+                filter_fn.clone(),
             );

             filtered_mfcs.push(Ok(mfc));
@@ -899,6 +912,7 @@ impl PlanContext {
         partition_filter: Option<Arc<BoundPredicate>>,
         sender: Sender<ManifestEntryContext>,
         delete_file_index: Option<DeleteFileIndex>,
+        filter_fn: Option<Arc<ManifestEntryFilterFn>>,
     ) -> ManifestFileContext {
         let bound_predicates =
             if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -921,6 +935,7 @@ impl PlanContext {
             field_ids: self.field_ids.clone(),
             expression_evaluator_cache: self.expression_evaluator_cache.clone(),
             delete_file_index,
+            filter_fn,
         }
     }
 }
@@ -1303,59 +1318,6 @@ fn ancestors_between(
     )
 }

-/// Get all added files between two snapshots.
-/// - data files in `Append` and `Overwrite` snapshots are included.
-/// - delete files are ignored
-/// - `Replace` snapshots (e.g., compaction) are ignored.
-///
-/// `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
-async fn added_files_between(
-    object_cache: &ObjectCache,
-    table_metadata: &TableMetadataRef,
-    latest_snapshot_id: i64,
-    oldest_snapshot_id: Option<i64>,
-) -> Result<Vec<ManifestEntryRef>> {
-    let mut added_files = vec![];
-
-    let snapshots = ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
-        .filter(|snapshot| {
-            matches!(
-                snapshot.summary().operation,
-                Operation::Append | Operation::Overwrite
-            )
-        })
-        .collect_vec();
-    let snapshot_ids: HashSet<i64> = snapshots
-        .iter()
-        .map(|snapshot| snapshot.snapshot_id())
-        .collect();
-
-    for snapshot in snapshots {
-        let manifest_list = object_cache
-            .get_manifest_list(&snapshot, table_metadata)
-            .await?;
-
-        for manifest_file in manifest_list.entries() {
-            if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
-                continue;
-            }
-            let manifest = object_cache.get_manifest(manifest_file).await?;
-            let entries = manifest.entries().iter().filter(|entry| {
-                matches!(entry.status(), ManifestStatus::Added)
-                    && matches!(entry.data_file().content_type(), DataContentType::Data)
-                    && (
-                        // Is it possible that the snapshot id here is not contained?
-                        entry.snapshot_id().is_none()
-                            || snapshot_ids.contains(&entry.snapshot_id().unwrap())
-                    )
-            });
-            added_files.extend(entries.cloned());
-        }
-    }
-
-    Ok(added_files)
-}
-
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;