Skip to content

Commit 52cf8b9

Browse files
committed
feat: schema evolution of equality delete file record batches
1 parent 563c9e9 commit 52cf8b9

File tree

2 files changed

+53
-12
lines changed

2 files changed

+53
-12
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

+45-7
Original file line number | Diff line number | Diff line change
@@ -25,13 +25,14 @@ use futures::channel::oneshot;
2525
use futures::future::join_all;
2626
use futures::{StreamExt, TryStreamExt};
2727

28+
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
2829
use crate::arrow::ArrowReader;
2930
use crate::delete_vector::DeleteVector;
3031
use crate::expr::Predicate::AlwaysTrue;
3132
use crate::expr::{Bind, BoundPredicate, Predicate};
3233
use crate::io::FileIO;
3334
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
34-
use crate::spec::DataContentType;
35+
use crate::spec::{DataContentType, Schema, SchemaRef};
3536
use crate::{Error, ErrorKind, Result};
3637

3738
#[allow(unused)]
@@ -164,7 +165,7 @@ impl CachingDeleteFileManager {
164165
/// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
165166
/// channel to store them in the right place in the delete file managers state.
166167
/// * The results of all of these futures are awaited on in parallel with the specified
167-
/// level of concurrency and collected into a vec. We then combine all of the delete
168+
/// level of concurrency and collected into a vec. We then combine all the delete
168169
/// vector maps that resulted from any positional delete or delete vector files into a
169170
/// single map and persist it in the state.
170171
///
@@ -206,19 +207,27 @@ impl CachingDeleteFileManager {
206207
pub(crate) async fn load_deletes(
207208
&self,
208209
delete_file_entries: &[FileScanTaskDeleteFile],
210+
schema: SchemaRef,
209211
) -> Result<()> {
210212
let stream_items = delete_file_entries
211213
.iter()
212-
.map(|t| (t.clone(), self.file_io.clone(), self.state.clone()))
214+
.map(|t| {
215+
(
216+
t.clone(),
217+
self.file_io.clone(),
218+
self.state.clone(),
219+
schema.clone(),
220+
)
221+
})
213222
.collect::<Vec<_>>();
214223
// NOTE: removing the collect and just passing the iterator to futures::stream::iter
215224
// results in an error 'implementation of `std::ops::FnOnce` is not general enough'
216225

217226
let task_stream = futures::stream::iter(stream_items.into_iter());
218227

219228
let results: Vec<ParsedDeleteFileContext> = task_stream
220-
.map(move |(task, file_io, state_ref)| async {
221-
Self::load_file_for_task(task, file_io, state_ref).await
229+
.map(move |(task, file_io, state_ref, schema)| async {
230+
Self::load_file_for_task(task, file_io, state_ref, schema).await
222231
})
223232
.map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await }))
224233
.try_buffer_unordered(self.concurrency_limit_data_files)
@@ -248,6 +257,7 @@ impl CachingDeleteFileManager {
248257
task: FileScanTaskDeleteFile,
249258
file_io: FileIO,
250259
state: StateRef,
260+
schema: SchemaRef,
251261
) -> Result<DeleteFileContext> {
252262
match task.file_type {
253263
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
@@ -271,7 +281,11 @@ impl CachingDeleteFileManager {
271281
};
272282

273283
Ok(DeleteFileContext::FreshEqDel {
274-
batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
284+
batch_stream: Self::evolve_schema(
285+
Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
286+
schema,
287+
)
288+
.await?,
275289
sender,
276290
})
277291
}
@@ -351,6 +365,30 @@ impl CachingDeleteFileManager {
351365
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
352366
}
353367

368+
/// Evolves the schema of the RecordBatches from an equality delete file
369+
async fn evolve_schema(
370+
record_batch_stream: ArrowRecordBatchStream,
371+
target_schema: Arc<Schema>,
372+
) -> Result<ArrowRecordBatchStream> {
373+
let eq_ids = target_schema
374+
.as_ref()
375+
.field_id_to_name_map()
376+
.keys()
377+
.cloned()
378+
.collect::<Vec<_>>();
379+
380+
let mut record_batch_transformer =
381+
RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
382+
383+
let record_batch_stream = record_batch_stream.map(move |record_batch| {
384+
record_batch.and_then(|record_batch| {
385+
record_batch_transformer.process_record_batch(record_batch)
386+
})
387+
});
388+
389+
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
390+
}
391+
354392
/// Parses a record batch stream coming from positional delete files
355393
///
356394
/// Returns a map of data file path to a delete vector
@@ -483,7 +521,7 @@ mod tests {
483521
let file_scan_tasks = setup(table_location);
484522

485523
let result = delete_file_manager
486-
.load_deletes(&file_scan_tasks[0].deletes)
524+
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
487525
.await;
488526

489527
assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));

crates/iceberg/src/arrow/reader.rs

+8-5
Original file line number | Diff line number | Diff line change
@@ -200,11 +200,14 @@ impl ArrowReader {
200200

201201
// concurrently retrieve delete files and create RecordBatchStreamBuilder
202202
let (_, mut record_batch_stream_builder) = try_join!(
203-
delete_file_manager.load_deletes(if delete_file_support_enabled {
204-
&task.deletes
205-
} else {
206-
&[]
207-
},),
203+
delete_file_manager.load_deletes(
204+
if delete_file_support_enabled {
205+
&task.deletes
206+
} else {
207+
&[]
208+
},
209+
task.schema.clone()
210+
),
208211
Self::create_parquet_record_batch_stream_builder(
209212
&task.data_file_path,
210213
file_io.clone(),

0 commit comments

Comments
 (0)