 //! Table scan api.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};

 use arrow_array::RecordBatch;
 use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};

 use crate::arrow::ArrowReaderBuilder;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
     DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
-    ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
+    ManifestList, ManifestStatus, Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::utils::available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
     // Defaults to none which means select all columns
     column_names: Option<Vec<String>>,
     snapshot_id: Option<i64>,
+    /// Exclusive. Used for incremental scan.
+    from_snapshot_id: Option<i64>,
+    /// Inclusive. Used for incremental scan.
+    to_snapshot_id: Option<i64>,
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: None,
             snapshot_id: None,
+            from_snapshot_id: None,
+            to_snapshot_id: None,
             batch_size: None,
             case_sensitive: true,
             filter: None,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
         self
     }

+    /// Set the starting snapshot id (exclusive) for incremental scan.
+    pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
+        self.from_snapshot_id = Some(from_snapshot_id);
+        self
+    }
+
+    /// Set the ending snapshot id (inclusive) for incremental scan.
+    pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
+        self.to_snapshot_id = Some(to_snapshot_id);
+        self
+    }
+
     /// Sets the concurrency limit for both manifest files and manifest
     /// entries for this scan
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
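
A minimal usage sketch of the two builder methods above (the `table`, `from_id`, and `to_id` values are hypothetical; `scan()` and `build()` are the existing entry points in this module):

    // Incremental scan: data files added after `from_id` (exclusive),
    // up to and including `to_id`.
    let scan = table
        .scan()
        .from_snapshot_id(from_id) // optional; omit to start from the root snapshot
        .to_snapshot_id(to_id)     // required for an incremental scan
        .build()?;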
@@ -206,6 +225,25 @@ impl<'a> TableScanBuilder<'a> {

     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
+        // Validate that we have either a snapshot scan or an incremental scan configuration.
+        if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
+            // An incremental scan requires `to_snapshot_id`; `from_snapshot_id` is optional.
+            if self.to_snapshot_id.is_none() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Incremental scan requires to_snapshot_id to be set",
+                ));
+            }
+
+            // `snapshot_id` must not be set for an incremental scan.
+            if self.snapshot_id.is_some() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
+                ));
+            }
+        }
+
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
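
The validation above rejects two configurations; a hedged sketch of what callers would observe (assuming `Error::kind()` and the builder methods as used elsewhere in this crate):

    // from_snapshot_id alone is invalid: to_snapshot_id is required.
    let err = table.scan().from_snapshot_id(from_id).build().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    // snapshot_id cannot be combined with an incremental range.
    let err = table
        .scan()
        .snapshot_id(snap_id)
        .to_snapshot_id(to_id)
        .build()
        .unwrap_err();
    assert_eq!(err.kind(), ErrorKind::DataInvalid);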
@@ -227,7 +265,6 @@ impl<'a> TableScanBuilder<'a> {
             })?
             .clone(),
         };
-
         let schema = snapshot.schema(self.table.metadata())?;

         // Check that all column names exist in the schema.
@@ -297,6 +334,8 @@ impl<'a> TableScanBuilder<'a> {
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
+            from_snapshot_id: self.from_snapshot_id,
+            to_snapshot_id: self.to_snapshot_id,
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -358,6 +397,11 @@ struct PlanContext {
     partition_filter_cache: Arc<PartitionFilterCache>,
     manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
+
+    // For incremental scan: if `to_snapshot_id` is set, this is an incremental
+    // scan; `from_snapshot_id` may be `None`.
+    from_snapshot_id: Option<i64>,
+    to_snapshot_id: Option<i64>,
 }

 impl TableScan {
@@ -375,6 +419,65 @@ impl TableScan {
         // used to stream the results back to the caller
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

+        if let Some(to_snapshot_id) = self.plan_context.to_snapshot_id {
+            // Incremental scan mode.
+            let added_files = added_files_between(
+                &self.plan_context.object_cache,
+                &self.plan_context.table_metadata,
+                to_snapshot_id,
+                self.plan_context.from_snapshot_id,
+            )
+            .await?;
+
+            for entry in added_files {
+                let manifest_entry_context = ManifestEntryContext {
+                    manifest_entry: entry,
+                    expression_evaluator_cache: self
+                        .plan_context
+                        .expression_evaluator_cache
+                        .clone(),
+                    field_ids: self.plan_context.field_ids.clone(),
+                    bound_predicates: None, // TODO: support predicates in incremental scan
+                    // Used to skip any data file whose partition data indicates that it
+                    // can't contain any data that matches this scan's filter.
+                    partition_spec_id: 0, // TODO: get correct partition spec id
+                    snapshot_schema: self.plan_context.snapshot_schema.clone(),
+                    // Deletes are not supported in incremental scan.
+                    delete_file_index: None,
+                };
+
+                manifest_entry_data_ctx_tx
+                    .clone()
+                    .send(manifest_entry_context)
+                    .await
+                    .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
+            }
+
+            let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
+
+            // Process the [`ManifestEntry`] stream in parallel.
+            spawn(async move {
+                let result = manifest_entry_data_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| async move {
+                            spawn(async move {
+                                Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                            })
+                            .await
+                        },
+                    )
+                    .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_manifest_entry_error.send(Err(error)).await;
+                }
+            });
+
+            return Ok(file_scan_task_rx.boxed());
+        }
+
         let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
             if self.delete_file_processing_enabled {
                 Some(DeleteFileIndex::new())
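
End to end, the incremental branch above short-circuits the normal planning path and streams only added entries; a hedged consumption sketch (assuming `plan_files()` and `FileScanTask::data_file_path()` as defined elsewhere in this module):

    use futures::TryStreamExt;

    let scan = table
        .scan()
        .from_snapshot_id(from_id)
        .to_snapshot_id(to_id)
        .build()?;
    let mut tasks = scan.plan_files().await?;
    while let Some(task) = tasks.try_next().await? {
        // Each task points at a data file added in the (from, to] snapshot range.
        println!("added data file: {}", task.data_file_path());
    }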
@@ -1146,6 +1249,113 @@ impl FileScanTask {
     }
 }

+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
+fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    let Some(oldest_snapshot_id) = oldest_snapshot_id else {
+        return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
+    };
+
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
+
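To make the inclusive/exclusive bounds concrete, a hedged sketch over a hypothetical lineage `s1 <- s2 <- s3` (the snapshot ids and `metadata` value are assumptions, not fixtures from this diff):

    // Walk from the latest id down to, but not including, the oldest id.
    let ids: Vec<i64> = ancestors_between(&metadata, s3, Some(s1))
        .map(|snap| snap.snapshot_id())
        .collect();
    assert_eq!(ids, vec![s3, s2]); // s1 is excluded (exclusive lower bound)

    // Without a lower bound, the walk continues to the root snapshot.
    let all: Vec<i64> = ancestors_between(&metadata, s3, None)
        .map(|snap| snap.snapshot_id())
        .collect();
    assert_eq!(all, vec![s3, s2, s1]);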
+/// Get all added files between two snapshots.
+/// - Data files in `Append` and `Overwrite` snapshots are included.
+/// - Delete files are ignored.
+/// - `Replace` snapshots (e.g., compaction) are ignored.
+///
+/// `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
+async fn added_files_between(
+    object_cache: &ObjectCache,
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Result<Vec<ManifestEntryRef>> {
+    let mut added_files = vec![];
+
+    let snapshots = ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
+        .filter(|snapshot| {
+            matches!(
+                snapshot.summary().operation,
+                Operation::Append | Operation::Overwrite
+            )
+        })
+        .collect_vec();
+    let snapshot_ids: HashSet<i64> = snapshots
+        .iter()
+        .map(|snapshot| snapshot.snapshot_id())
+        .collect();
+
+    for snapshot in snapshots {
+        let manifest_list = object_cache
+            .get_manifest_list(&snapshot, table_metadata)
+            .await?;
+
+        for manifest_file in manifest_list.entries() {
+            if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
+                continue;
+            }
+            let manifest = object_cache.get_manifest(manifest_file).await?;
+            let entries = manifest.entries().iter().filter(|entry| {
+                matches!(entry.status(), ManifestStatus::Added)
+                    && matches!(entry.data_file().content_type(), DataContentType::Data)
+                    && (
+                        // Is it possible that the snapshot id here is not contained?
+                        entry.snapshot_id().is_none()
+                            || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    )
+            });
+            added_files.extend(entries.cloned());
+        }
+    }
+
+    Ok(added_files)
+}
+
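A hedged sketch of calling the helper above (the `object_cache`, `metadata`, and snapshot id values are hypothetical; `data_file().file_path()` follows the accessors used in this crate):

    // Collect the paths of data files added strictly after `from`, up to `to`.
    let added = added_files_between(&object_cache, &metadata, to, Some(from)).await?;
    let paths: Vec<&str> = added
        .iter()
        .map(|entry| entry.data_file().file_path())
        .collect();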
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;