Skip to content

Commit a38c77a

Browse files
committed
feat: allow iterating over logical files
Signed-off-by: Robert Pack <[email protected]>
1 parent 7886d97 commit a38c77a

File tree

2 files changed

+74
-57
lines changed

2 files changed

+74
-57
lines changed

crates/core/src/kernel/snapshot_next/iterators.rs

Lines changed: 47 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashSet;
2-
use std::sync::{Arc, LazyLock};
2+
use std::sync::Arc;
33

44
use arrow_array::cast::AsArray;
55
use arrow_array::types::Int64Type;
@@ -14,7 +14,6 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
1414
use delta_kernel::engine::arrow_expression::ProvidesColumnByName;
1515
use delta_kernel::engine_data::{GetData, RowVisitor};
1616
use delta_kernel::expressions::{Scalar, StructData};
17-
use delta_kernel::scan::scan_row_schema;
1817

1918
use crate::kernel::scalars::ScalarExt;
2019
use crate::{DeltaResult, DeltaTableError};
@@ -202,23 +201,6 @@ impl LogicalFileView {
202201
}
203202
}
204203

205-
impl Iterator for LogicalFileView {
206-
type Item = LogicalFileView;
207-
208-
fn next(&mut self) -> Option<Self::Item> {
209-
if self.index < self.files.num_rows() {
210-
let file = LogicalFileView {
211-
files: self.files.clone(),
212-
index: self.index,
213-
};
214-
self.index += 1;
215-
Some(file)
216-
} else {
217-
None
218-
}
219-
}
220-
}
221-
222204
pub struct LogicalFileViewIterator<I>
223205
where
224206
I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,
@@ -244,43 +226,43 @@ where
244226
}
245227
}
246228

247-
// impl<I> Iterator for LogicalFileViewIterator<I>
248-
// where
249-
// I: IntoIterator<Item = DeltaResult<RecordBatch>>,
250-
// {
251-
// type Item = DeltaResult<LogicalFileView>;
252-
//
253-
// fn next(&mut self) -> Option<Self::Item> {
254-
// if let Some(batch) = &self.batch {
255-
// if self.current < batch.num_rows() {
256-
// let item = LogicalFileView {
257-
// files: batch.clone(),
258-
// index: self.current,
259-
// };
260-
// self.current += 1;
261-
// return Some(Ok(item));
262-
// }
263-
// }
264-
// match self.inner.next() {
265-
// Some(Ok(batch)) => {
266-
// if validate_logical_file(&batch).is_err() {
267-
// return Some(Err(DeltaTableError::generic(
268-
// "Invalid logical file data encountered.",
269-
// )));
270-
// }
271-
// self.batch = Some(batch);
272-
// self.current = 0;
273-
// self.next()
274-
// }
275-
// Some(Err(e)) => Some(Err(e)),
276-
// None => None,
277-
// }
278-
// }
279-
//
280-
// fn size_hint(&self) -> (usize, Option<usize>) {
281-
// self.inner.size_hint()
282-
// }
283-
// }
229+
impl<I> Iterator for LogicalFileViewIterator<I>
230+
where
231+
I: IntoIterator<Item = DeltaResult<RecordBatch>>,
232+
{
233+
type Item = DeltaResult<LogicalFileView>;
234+
235+
fn next(&mut self) -> Option<Self::Item> {
236+
if let Some(batch) = &self.batch {
237+
if self.current < batch.num_rows() {
238+
let item = LogicalFileView {
239+
files: batch.clone(),
240+
index: self.current,
241+
};
242+
self.current += 1;
243+
return Some(Ok(item));
244+
}
245+
}
246+
match self.inner.next() {
247+
Some(Ok(batch)) => {
248+
if validate_logical_file(&batch).is_err() {
249+
return Some(Err(DeltaTableError::generic(
250+
"Invalid logical file data encountered.",
251+
)));
252+
}
253+
self.batch = Some(batch);
254+
self.current = 0;
255+
self.next()
256+
}
257+
Some(Err(e)) => Some(Err(e)),
258+
None => None,
259+
}
260+
}
261+
262+
fn size_hint(&self) -> (usize, Option<usize>) {
263+
self.inner.size_hint()
264+
}
265+
}
284266

285267
pub struct AddViewIterator<I>
286268
where
@@ -353,6 +335,15 @@ pub(crate) fn validate_add(batch: &RecordBatch) -> DeltaResult<()> {
353335
Ok(())
354336
}
355337

338+
/// Checks that `batch` has the columns required to build logical file views.
///
/// Runs `validate_column` for `path` (as `StringArray`), then `size` and
/// `modificationTime` (both as `Int64Array`), stopping at the first failure.
fn validate_logical_file(batch: &RecordBatch) -> DeltaResult<()> {
    // TODO: the struct columns are not validated yet:
    // validate_column::<StructArray>(batch, &["deletionVector"])?;
    // validate_column::<StructArray>(batch, &["fileConstantValues"])?;
    validate_column::<StringArray>(batch, &["path"])
        .and_then(|_| validate_column::<Int64Array>(batch, &["size"]))
        .and_then(|_| validate_column::<Int64Array>(batch, &["modificationTime"]))
        .map(|_| ())
}
346+
356347
fn validate_column<'a, T: Array + 'static>(
357348
actions: &'a RecordBatch,
358349
col: &'a [impl AsRef<str>],

crates/core/src/kernel/snapshot_next/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use delta_kernel::scan::scan_row_schema;
1717
use delta_kernel::schema::{DataType, Schema};
1818
use delta_kernel::table_properties::TableProperties;
1919
use delta_kernel::{EngineData, ExpressionRef, Version};
20-
use iterators::{AddView, AddViewIterator};
20+
use iterators::{AddView, AddViewIterator, LogicalFileView, LogicalFileViewIterator};
2121
use itertools::Itertools;
2222
use url::Url;
2323

@@ -109,6 +109,16 @@ pub trait Snapshot {
109109
predicate: Option<ExpressionRef>,
110110
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>>;
111111

112+
fn logical_files_view(
113+
&self,
114+
predicate: Option<ExpressionRef>,
115+
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<LogicalFileView>>>> {
116+
#[allow(deprecated)]
117+
Ok(Box::new(LogicalFileViewIterator::new(
118+
self.logical_files(predicate)?,
119+
)))
120+
}
121+
112122
/// Get all currently active files in the table.
113123
///
114124
/// # Parameters
@@ -125,6 +135,10 @@ pub trait Snapshot {
125135
predicate: Option<ExpressionRef>,
126136
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>>;
127137

138+
#[deprecated(
139+
since = "0.25.0",
140+
note = "Use `logical_files_view` instead, which returns a more focussed dataset and avoids computational overhead."
141+
)]
128142
fn files_view(
129143
&self,
130144
predicate: Option<ExpressionRef>,
@@ -423,6 +437,7 @@ mod tests {
423437
test_files_view(snapshot.as_ref())?;
424438
test_commit_infos(snapshot.as_ref())?;
425439
test_logical_files(snapshot.as_ref())?;
440+
test_logical_files_view(snapshot.as_ref())?;
426441
}
427442

428443
let mut snapshot = get_snapshot(ctx, TestTables::Checkpoints, Some(0))?.await?;
@@ -434,6 +449,7 @@ mod tests {
434449
test_files_view(snapshot.as_ref())?;
435450
test_commit_infos(snapshot.as_ref())?;
436451
test_logical_files(snapshot.as_ref())?;
452+
test_logical_files_view(snapshot.as_ref())?;
437453
}
438454

439455
Ok(())
@@ -451,6 +467,15 @@ mod tests {
451467
Ok(())
452468
}
453469

470+
/// Checks that `logical_files_view` yields as many entries as the snapshot version.
///
/// NOTE(review): this presumably relies on the test tables adding one file per
/// commit - confirm against the fixtures.
fn test_logical_files_view(snapshot: &dyn Snapshot) -> TestResult<()> {
    // Collect into a `Result` so a failing item is reported through `?` instead of
    // panicking, and so every item is visibly materialised before counting.
    let paths = snapshot
        .logical_files_view(None)?
        .map(|view| view.map(|file| file.path().to_string()))
        .collect::<Result<Vec<_>, _>>()?;
    assert_eq!(paths.len() as u64, snapshot.version());
    Ok(())
}
478+
454479
fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> {
455480
#[allow(deprecated)]
456481
let batches = snapshot.files(None)?.collect::<Result<Vec<_>, _>>()?;
@@ -460,6 +485,7 @@ mod tests {
460485
}
461486

462487
fn test_files_view(snapshot: &dyn Snapshot) -> TestResult<()> {
488+
#[allow(deprecated)]
463489
let num_files_view = snapshot
464490
.files_view(None)?
465491
.map(|f| f.unwrap().path().to_string())

0 commit comments

Comments
 (0)