diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index a7bd668a54..238b02d445 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -721,7 +721,8 @@ impl<'a> DeltaScanBuilder<'a> {
         // Should we update datafusion_table_statistics to optionally take the mask?
         let stats = if let Some(mask) = pruning_mask {
             let es = self.snapshot.snapshot();
-            let pruned_stats = prune_file_statistics(&es.files, mask);
+            let empty = vec![];
+            let pruned_stats = prune_file_statistics(es.files.as_ref().unwrap_or(&empty), mask);
             LogDataHandler::new(&pruned_stats, es.metadata(), es.schema()).statistics()
         } else {
             self.snapshot.datafusion_table_statistics()
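With `EagerSnapshot::files` now an `Option<Vec<RecordBatch>>`, this call site substitutes an empty `Vec` when files were never loaded, so the pruning path keeps working unchanged. A minimal standalone sketch of that fallback pattern (the generic `T` and the `batches_or_empty` name are illustrative stand-ins, not crate API):

```rust
/// Sketch: borrow an Option<Vec<T>> as a slice, falling back to an empty
/// slice so downstream code can iterate or index unconditionally.
fn batches_or_empty<T>(files: Option<&Vec<T>>) -> &[T] {
    match files {
        Some(v) => v.as_slice(),
        // No allocation needed for the "not loaded" case.
        None => &[],
    }
}

fn main() {
    let loaded = Some(vec![1, 2, 3]);
    let skipped: Option<Vec<i32>> = None;
    assert_eq!(batches_or_empty(loaded.as_ref()).len(), 3);
    assert!(batches_or_empty(skipped.as_ref()).is_empty());
}
```

The patch itself binds a local `empty` Vec instead, because `prune_file_statistics` presumably borrows a `Vec` rather than a slice; the idea is the same.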
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index 04e9c7cc37..5bc5a476b1 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -21,6 +21,7 @@ use std::io::{BufRead, BufReader, Cursor};
 use ::serde::{Deserialize, Serialize};
 use arrow_array::RecordBatch;
 use delta_kernel::path::{LogPathFileType, ParsedLogPath};
+use either::Either;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
@@ -342,8 +343,8 @@ pub struct EagerSnapshot {
     pub(crate) transactions: Option<HashMap<String, Transaction>>,
 
     // NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because
-    // we do not yet enforce a consistent schema across all batches we read from the log.
-    pub(crate) files: Vec<RecordBatch>,
+    // we do not yet enforce a consistent schema across all batches we read from the log. If `None`, that indicates this was created with `config.require_files` set to `false`.
+    pub(crate) files: Option<Vec<RecordBatch>>,
 }
 
 impl EagerSnapshot {
@@ -370,13 +371,13 @@
         let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;
 
         let files = match config.require_files {
-            true => {
+            true => Some(
                 snapshot
                     .files(log_store, &mut visitors)?
                     .try_collect()
-                    .await?
-            }
-            false => vec![],
+                    .await?,
+            ),
+            false => None,
         };
 
         let mut sn = Self {
@@ -424,20 +425,25 @@
             .collect::<Result<Vec<_>, _>>()?;
         Ok(Self {
             snapshot,
-            files,
+            files: Some(files),
             tracked_actions: Default::default(),
             transactions: None,
         })
     }
 
     /// Update the snapshot to the given version
+    ///
+    /// This will return `true` if the [LogStore] was read from. This can be helpful for
+    /// understanding whether the snapshot actually loaded data.
     pub async fn update(
         &mut self,
         log_store: &dyn LogStore,
         target_version: Option<i64>,
-    ) -> DeltaResult<()> {
+    ) -> DeltaResult<bool> {
+        // Whether or not data has been read by this function
+        let mut read_data = false;
         if Some(self.version()) == target_version {
-            return Ok(());
+            return Ok(read_data);
         }
 
         let new_slice = self
@@ -446,7 +452,7 @@ impl EagerSnapshot {
             .await?;
 
         if new_slice.is_none() {
-            return Ok(());
+            return Ok(read_data);
         }
 
         let new_slice = new_slice.unwrap();
@@ -456,14 +462,21 @@
             .flat_map(get_visitor)
             .collect::<Vec<_>>();
 
+        // If `files` is `None` then we can exit early, since the snapshot has intentionally been
+        // loaded _without_ files
+        if self.files.is_none() {
+            self.process_visitors(visitors)?;
+            return Ok(read_data);
+        }
+
         let mut schema_actions: HashSet<_> =
             visitors.iter().flat_map(|v| v.required_actions()).collect();
         let files = std::mem::take(&mut self.files);
 
         schema_actions.insert(ActionType::Add);
         let checkpoint_stream = if new_slice.checkpoint_files.is_empty() {
-            // NOTE: we don't need to add the visitor relevant data here, as it is repüresented in the state already
-            futures::stream::iter(files.into_iter().map(Ok)).boxed()
+            // NOTE: we don't need to add the visitor relevant data here, as it is represented in the state already
+            futures::stream::iter(files.unwrap_or_default().into_iter().map(Ok)).boxed()
         } else {
             let read_schema =
                 StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
@@ -477,17 +490,17 @@
         let log_stream = new_slice.commit_stream(log_store, &read_schema, &self.snapshot.config)?;
 
         let mapper = LogMapper::try_new(&self.snapshot, None)?;
-
         let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, &mut visitors)?
             .map(|batch| batch.and_then(|b| mapper.map_batch(b)))
            .try_collect()
            .await?;
 
-        self.files = files;
+        self.files = Some(files);
+        read_data = true;
 
         self.process_visitors(visitors)?;
 
-        Ok(())
+        Ok(read_data)
     }
 
     /// Get the underlying snapshot
@@ -540,17 +553,31 @@ impl EagerSnapshot {
 
     /// Get a [`LogDataHandler`] for the snapshot to inspect the currently loaded state of the log.
     pub fn log_data(&self) -> LogDataHandler<'_> {
-        LogDataHandler::new(&self.files, self.metadata(), self.schema())
+        static EMPTY: Vec<RecordBatch> = vec![];
+        LogDataHandler::new(
+            self.files.as_ref().unwrap_or(&EMPTY),
+            self.metadata(),
+            self.schema(),
+        )
+    }
+
+    /// Iterate over tracked `&RecordBatch`, if any.
+    fn files_iter(&self) -> impl Iterator<Item = &RecordBatch> {
+        if let Some(ref files) = self.files {
+            Either::Left(files.iter())
+        } else {
+            Either::Right(std::iter::empty())
+        }
     }
 
     /// Get the number of files in the snapshot
     pub fn files_count(&self) -> usize {
-        self.files.iter().map(|f| f.num_rows()).sum()
+        self.files_iter().map(|f| f.num_rows()).sum()
     }
 
     /// Get the files in the snapshot
     pub fn file_actions(&self) -> DeltaResult<impl Iterator<Item = Add> + '_> {
-        Ok(self.files.iter().flat_map(|b| read_adds(b)).flatten())
+        Ok(self.files_iter().flat_map(|b| read_adds(b)).flatten())
     }
 
     /// Get a file action iterator for the given version
@@ -560,7 +587,7 @@ impl EagerSnapshot {
 
     /// Get an iterator for the CDC files added in this version
     pub fn cdc_files(&self) -> DeltaResult<impl Iterator<Item = AddCDCFile> + '_> {
-        Ok(self.files.iter().flat_map(|b| read_cdf_adds(b)).flatten())
+        Ok(self.files_iter().flat_map(|b| read_cdf_adds(b)).flatten())
     }
 
     /// Iterate over all latest app transactions
@@ -634,15 +661,18 @@ impl EagerSnapshot {
             LogMapper::try_new(&self.snapshot, None)?
         };
 
-        self.files = files
-            .into_iter()
-            .chain(
-                self.files
-                    .iter()
-                    .flat_map(|batch| scanner.process_files_batch(batch, false)),
-            )
-            .map(|b| mapper.map_batch(b))
-            .collect::<Result<Vec<_>, _>>()?;
+        if self.files.is_some() {
+            self.files = Some(
+                files
+                    .into_iter()
+                    .chain(
+                        self.files_iter()
+                            .flat_map(|batch| scanner.process_files_batch(batch, false)),
+                    )
+                    .map(|b| mapper.map_batch(b))
+                    .collect::<Result<Vec<_>, _>>()?,
+            );
+        }
 
         if let Some(metadata) = metadata {
             self.snapshot.metadata = metadata;
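The new `files_iter` leans on `either::Either`, which implements `Iterator` whenever both arms do; that is what lets a single `impl Iterator` return type cover both the loaded and the not-loaded case without boxing or allocating. A standalone sketch of the same trick (generic `T` stands in for `RecordBatch`; the `either` crate dependency is real, the patch imports it above):

```rust
use either::Either;

/// Sketch: expose Option<Vec<T>> as one iterator type by selecting one
/// of two iterator "arms" at runtime.
fn files_iter<'a, T>(files: Option<&'a Vec<T>>) -> impl Iterator<Item = &'a T> + 'a {
    match files {
        // Either<L, R> is itself an Iterator when both L and R are.
        Some(files) => Either::Left(files.iter()),
        None => Either::Right(std::iter::empty()),
    }
}

fn main() {
    let loaded = Some(vec![10, 20]);
    let skipped: Option<Vec<i32>> = None;
    assert_eq!(files_iter(loaded.as_ref()).count(), 2);
    assert_eq!(files_iter(skipped.as_ref()).count(), 0);
}
```

Routing `files_count`, `file_actions`, `cdc_files`, and the serde path below through this one helper keeps the `Option` handling in a single place.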
diff --git a/crates/core/src/kernel/snapshot/serde.rs b/crates/core/src/kernel/snapshot/serde.rs
index 84a0f135fb..f4644b25dc 100644
--- a/crates/core/src/kernel/snapshot/serde.rs
+++ b/crates/core/src/kernel/snapshot/serde.rs
@@ -128,7 +128,7 @@ impl Serialize for EagerSnapshot {
         seq.serialize_element(&self.snapshot)?;
         seq.serialize_element(&self.tracked_actions)?;
         seq.serialize_element(&self.transactions)?;
-        for batch in self.files.iter() {
+        for batch in self.files_iter() {
             let mut buffer = vec![];
             let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref())
                 .map_err(serde::ser::Error::custom)?;
@@ -176,7 +176,7 @@ impl<'de> Visitor<'de> for EagerSnapshotVisitor {
         }
         Ok(EagerSnapshot {
             snapshot,
-            files,
+            files: Some(files),
             tracked_actions,
             transactions,
         })
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 2c189fa700..71cf32fad6 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -231,9 +231,11 @@
         let table_newest_version = crate::open_table(path).await.unwrap();
         let mut table_to_update = crate::open_table_with_version(path, 0).await.unwrap();
         // calling update several times should not produce any duplicates
-        table_to_update.update().await.unwrap();
-        table_to_update.update().await.unwrap();
-        table_to_update.update().await.unwrap();
+        // The first call should have read some data
+        assert_eq!(true, table_to_update.update().await.unwrap());
+        // Subsequent calls should not
+        assert_eq!(false, table_to_update.update().await.unwrap());
+        assert_eq!(false, table_to_update.update().await.unwrap());
 
         assert_eq!(
             table_newest_version.get_files_iter().unwrap().collect_vec(),
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 4ce5c6dddb..21c49082d0 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -290,7 +290,7 @@ impl DeltaTableBuilder {
             DeltaVersion::Newest => table.load().await?,
             DeltaVersion::Version(v) => table.load_version(v).await?,
             DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
-        }
+        };
         Ok(table)
     }
 }
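The one-character `builder.rs` change is easy to overlook: once the `load*` methods return `Result<bool, _>`, the `match` no longer evaluates to `()`, so it must become a statement whose trailing `;` discards the flag. A minimal sketch of why the semicolon becomes necessary (the `load` and `build` helpers here are hypothetical, for illustration only):

```rust
// Sketch: a fallible loader that now reports whether data was read.
fn load() -> Result<bool, String> {
    Ok(true)
}

fn build(newest: bool) -> Result<(), String> {
    // Each arm now yields `bool` instead of `()`. Without the trailing
    // `;` the match would be an expression statement of type `bool`,
    // which no longer typechecks in a block ending with Ok(()).
    match newest {
        true => load()?,
        false => load()?,
    };
    Ok(())
}

fn main() {
    build(true).unwrap();
}
```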
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9554ac5b98..4f15d86151 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -195,13 +195,13 @@ impl DeltaTable {
     }
 
     /// Load DeltaTable with data from latest checkpoint
-    pub async fn load(&mut self) -> Result<(), DeltaTableError> {
+    pub async fn load(&mut self) -> Result<bool, DeltaTableError> {
         self.update_incremental(None).await
     }
 
     /// Updates the DeltaTable to the most recent state committed to the transaction log by
     /// loading the last checkpoint and incrementally applying each version since.
-    pub async fn update(&mut self) -> Result<(), DeltaTableError> {
+    pub async fn update(&mut self) -> Result<bool, DeltaTableError> {
         self.update_incremental(None).await
     }
 
@@ -210,7 +210,7 @@
     pub async fn update_incremental(
         &mut self,
         max_version: Option<i64>,
-    ) -> Result<(), DeltaTableError> {
+    ) -> Result<bool, DeltaTableError> {
         match self.state.as_mut() {
             Some(state) => state.update(&self.log_store, max_version).await,
             _ => {
@@ -218,13 +218,13 @@
                 DeltaTableState::try_new(&self.log_store, self.config.clone(), max_version)
                     .await?;
                 self.state = Some(state);
-                Ok(())
+                Ok(true)
             }
         }
     }
 
     /// Loads the DeltaTable state for the given version.
-    pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> {
+    pub async fn load_version(&mut self, version: i64) -> Result<bool, DeltaTableError> {
         if let Some(snapshot) = &self.state {
             if snapshot.version() > version {
                 self.state = None;
@@ -379,7 +379,7 @@
     pub async fn load_with_datetime(
         &mut self,
         datetime: DateTime<Utc>,
-    ) -> Result<(), DeltaTableError> {
+    ) -> Result<bool, DeltaTableError> {
         let mut min_version: i64 = -1;
         let log_store = self.log_store();
         let prefix = log_store.log_path();
diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs
index 132280054e..ea87d61bfc 100644
--- a/crates/core/src/table/state.rs
+++ b/crates/core/src/table/state.rs
@@ -204,9 +204,8 @@
         &mut self,
         log_store: &dyn LogStore,
         version: Option<i64>,
-    ) -> Result<(), DeltaTableError> {
-        self.snapshot.update(log_store, version).await?;
-        Ok(())
+    ) -> Result<bool, DeltaTableError> {
+        self.snapshot.update(log_store, version).await
     }
 
     /// Obtain Add actions for files that match the filter
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 85deb8c426..5c987f5b43 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -306,7 +306,7 @@ impl RawDeltaTable {
     /// Load the internal [RawDeltaTable] with the table state from the specified `version`
     ///
    /// This will acquire the internal lock since it is a mutating operation!
-    pub fn load_version(&self, py: Python, version: i64) -> PyResult<()> {
+    pub fn load_version(&self, py: Python, version: i64) -> PyResult<bool> {
         py.allow_threads(|| {
             #[allow(clippy::await_holding_lock)]
             rt().block_on(async {
@@ -359,7 +359,7 @@ impl RawDeltaTable {
         })
     }
 
-    pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<()> {
+    pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<bool> {
         py.allow_threads(|| {
             let datetime =
                 DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
@@ -1014,7 +1014,7 @@ impl RawDeltaTable {
         .collect())
     }
 
-    pub fn update_incremental(&self) -> PyResult<()> {
+    pub fn update_incremental(&self) -> PyResult<bool> {
         #[allow(clippy::await_holding_lock)]
         #[allow(deprecated)]
         Ok(rt()
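For callers, the net effect is that a refresh can now report whether it was a no-op, mirroring the updated test in `crates/core/src/lib.rs` where the first `update()` returns `true` and later calls return `false`. A hedged usage sketch against the new API (assumes the `deltalake_core` crate's existing `open_table` entry point, `DeltaTable::version`, and the `DeltaTableError` re-export; the `refresh` function is hypothetical and would be driven by the caller's async runtime):

```rust
use deltalake_core::{open_table, DeltaTableError};

/// Sketch: only do follow-up work (cache invalidation, statistics
/// refresh, ...) when update() reports that log data was actually read.
async fn refresh(uri: &str) -> Result<(), DeltaTableError> {
    let mut table = open_table(uri).await?;
    if table.update().await? {
        println!("table advanced to version {}", table.version());
    } else {
        println!("already up to date; nothing was read");
    }
    Ok(())
}
```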