Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,8 @@ impl<'a> DeltaScanBuilder<'a> {
// Should we update datafusion_table_statistics to optionally take the mask?
let stats = if let Some(mask) = pruning_mask {
let es = self.snapshot.snapshot();
let pruned_stats = prune_file_statistics(&es.files, mask);
let empty = vec![];
let pruned_stats = prune_file_statistics(es.files.as_ref().unwrap_or(&empty), mask);
LogDataHandler::new(&pruned_stats, es.metadata(), es.schema()).statistics()
} else {
self.snapshot.datafusion_table_statistics()
Expand Down
86 changes: 58 additions & 28 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::io::{BufRead, BufReader, Cursor};
use ::serde::{Deserialize, Serialize};
use arrow_array::RecordBatch;
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
use either::Either;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
Expand Down Expand Up @@ -342,8 +343,8 @@ pub struct EagerSnapshot {
pub(crate) transactions: Option<HashMap<String, Transaction>>,

// NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because
// we do not yet enforce a consistent schema across all batches we read from the log.
pub(crate) files: Vec<RecordBatch>,
// we do not yet enforce a consistent schema across all batches we read from the log. If `None`, that indicates this was created with `config.require_files` set to `false`.
pub(crate) files: Option<Vec<RecordBatch>>,
}

impl EagerSnapshot {
Expand All @@ -370,13 +371,13 @@ impl EagerSnapshot {
let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;

let files = match config.require_files {
true => {
true => Some(
snapshot
.files(log_store, &mut visitors)?
.try_collect()
.await?
}
false => vec![],
.await?,
),
false => None,
};

let mut sn = Self {
Expand Down Expand Up @@ -424,20 +425,25 @@ impl EagerSnapshot {
.collect::<DeltaResult<Vec<_>>>()?;
Ok(Self {
snapshot,
files,
files: Some(files),
tracked_actions: Default::default(),
transactions: None,
})
}

/// Update the snapshot to the given version
///
/// This will return a true value if the [LogStore] was read from. This can be helpful for
/// understanding whether the snapshot loaded data or not
pub async fn update(
&mut self,
log_store: &dyn LogStore,
target_version: Option<i64>,
) -> DeltaResult<()> {
) -> DeltaResult<bool> {
// Whether or not data has been read by this function
let mut read_data = false;
if Some(self.version()) == target_version {
return Ok(());
return Ok(read_data);
}

let new_slice = self
Expand All @@ -446,7 +452,7 @@ impl EagerSnapshot {
.await?;

if new_slice.is_none() {
return Ok(());
return Ok(read_data);
}
let new_slice = new_slice.unwrap();

Expand All @@ -456,14 +462,21 @@ impl EagerSnapshot {
.flat_map(get_visitor)
.collect::<Vec<_>>();

// If files is `None` then this can exit early because the snapshot has intentionally been
// loaded _without_ files
if self.files.is_none() {
self.process_visitors(visitors)?;
return Ok(read_data);
}
Comment on lines +467 to +470
Copy link
Collaborator

@roeap roeap Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may actually break things in unexpected ways. Specifically, `process_visitors` does nothing without doing a replay. Its main use case right now is idempotent writes (Txn actions ...).

However, I'm also not sure how this would work with `require_files` right now; we may just have an existing bug ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roeap anything is possible so sure it might break things but we have no tests to prove otherwise 🤷


let mut schema_actions: HashSet<_> =
visitors.iter().flat_map(|v| v.required_actions()).collect();
let files = std::mem::take(&mut self.files);

schema_actions.insert(ActionType::Add);
let checkpoint_stream = if new_slice.checkpoint_files.is_empty() {
// NOTE: we don't need to add the visitor relevant data here, as it is repüresented in the state already
futures::stream::iter(files.into_iter().map(Ok)).boxed()
// NOTE: we don't need to add the visitor relevant data here, as it is represented in the state already
futures::stream::iter(files.unwrap_or_default().into_iter().map(Ok)).boxed()
} else {
let read_schema =
StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
Expand All @@ -477,17 +490,17 @@ impl EagerSnapshot {
let log_stream = new_slice.commit_stream(log_store, &read_schema, &self.snapshot.config)?;

let mapper = LogMapper::try_new(&self.snapshot, None)?;

let files =
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, &mut visitors)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;

self.files = files;
self.files = Some(files);
read_data = true;
self.process_visitors(visitors)?;

Ok(())
Ok(read_data)
}

/// Get the underlying snapshot
Expand Down Expand Up @@ -540,17 +553,31 @@ impl EagerSnapshot {

/// Get a [`LogDataHandler`] for the snapshot to inspect the currently loaded state of the log.
pub fn log_data(&self) -> LogDataHandler<'_> {
LogDataHandler::new(&self.files, self.metadata(), self.schema())
static EMPTY: Vec<RecordBatch> = vec![];
LogDataHandler::new(
self.files.as_ref().unwrap_or(&EMPTY),
self.metadata(),
self.schema(),
)
}

/// Iterate over tracked `&RecordBatch`, if any.
fn files_iter(&self) -> impl Iterator<Item = &RecordBatch> {
if let Some(ref files) = self.files {
Either::Left(files.iter())
} else {
Either::Right(std::iter::empty())
}
}

/// Get the number of files in the snapshot
pub fn files_count(&self) -> usize {
self.files.iter().map(|f| f.num_rows()).sum()
self.files_iter().map(|f| f.num_rows()).sum()
}

/// Get the files in the snapshot
pub fn file_actions(&self) -> DeltaResult<impl Iterator<Item = Add> + '_> {
Ok(self.files.iter().flat_map(|b| read_adds(b)).flatten())
Ok(self.files_iter().flat_map(|b| read_adds(b)).flatten())
}

/// Get a file action iterator for the given version
Expand All @@ -560,7 +587,7 @@ impl EagerSnapshot {

/// Get an iterator for the CDC files added in this version
pub fn cdc_files(&self) -> DeltaResult<impl Iterator<Item = AddCDCFile> + '_> {
Ok(self.files.iter().flat_map(|b| read_cdf_adds(b)).flatten())
Ok(self.files_iter().flat_map(|b| read_cdf_adds(b)).flatten())
}

/// Iterate over all latest app transactions
Expand Down Expand Up @@ -634,15 +661,18 @@ impl EagerSnapshot {
LogMapper::try_new(&self.snapshot, None)?
};

self.files = files
.into_iter()
.chain(
self.files
.iter()
.flat_map(|batch| scanner.process_files_batch(batch, false)),
)
.map(|b| mapper.map_batch(b))
.collect::<DeltaResult<Vec<_>>>()?;
if self.files.is_some() {
self.files = Some(
files
.into_iter()
.chain(
self.files_iter()
.flat_map(|batch| scanner.process_files_batch(batch, false)),
)
.map(|b| mapper.map_batch(b))
.collect::<DeltaResult<Vec<_>>>()?,
);
}

if let Some(metadata) = metadata {
self.snapshot.metadata = metadata;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/kernel/snapshot/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Serialize for EagerSnapshot {
seq.serialize_element(&self.snapshot)?;
seq.serialize_element(&self.tracked_actions)?;
seq.serialize_element(&self.transactions)?;
for batch in self.files.iter() {
for batch in self.files_iter() {
let mut buffer = vec![];
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref())
.map_err(serde::ser::Error::custom)?;
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<'de> Visitor<'de> for EagerSnapshotVisitor {
}
Ok(EagerSnapshot {
snapshot,
files,
files: Some(files),
tracked_actions,
transactions,
})
Expand Down
8 changes: 5 additions & 3 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ mod tests {
let table_newest_version = crate::open_table(path).await.unwrap();
let mut table_to_update = crate::open_table_with_version(path, 0).await.unwrap();
// calling update several times should not produce any duplicates
table_to_update.update().await.unwrap();
table_to_update.update().await.unwrap();
table_to_update.update().await.unwrap();
// The first call should have read some data
assert_eq!(true, table_to_update.update().await.unwrap());
// Subsequent calls should not
assert_eq!(false, table_to_update.update().await.unwrap());
assert_eq!(false, table_to_update.update().await.unwrap());

assert_eq!(
table_newest_version.get_files_iter().unwrap().collect_vec(),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl DeltaTableBuilder {
DeltaVersion::Newest => table.load().await?,
DeltaVersion::Version(v) => table.load_version(v).await?,
DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
}
};
Ok(table)
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ impl DeltaTable {
}

/// Load DeltaTable with data from latest checkpoint
pub async fn load(&mut self) -> Result<(), DeltaTableError> {
pub async fn load(&mut self) -> Result<bool, DeltaTableError> {
self.update_incremental(None).await
}

/// Updates the DeltaTable to the most recent state committed to the transaction log by
/// loading the last checkpoint and incrementally applying each version since.
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
pub async fn update(&mut self) -> Result<bool, DeltaTableError> {
self.update_incremental(None).await
}

Expand All @@ -210,21 +210,21 @@ impl DeltaTable {
pub async fn update_incremental(
&mut self,
max_version: Option<i64>,
) -> Result<(), DeltaTableError> {
) -> Result<bool, DeltaTableError> {
match self.state.as_mut() {
Some(state) => state.update(&self.log_store, max_version).await,
_ => {
let state =
DeltaTableState::try_new(&self.log_store, self.config.clone(), max_version)
.await?;
self.state = Some(state);
Ok(())
Ok(true)
}
}
}

/// Loads the DeltaTable state for the given version.
pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> {
pub async fn load_version(&mut self, version: i64) -> Result<bool, DeltaTableError> {
if let Some(snapshot) = &self.state {
if snapshot.version() > version {
self.state = None;
Expand Down Expand Up @@ -379,7 +379,7 @@ impl DeltaTable {
pub async fn load_with_datetime(
&mut self,
datetime: DateTime<Utc>,
) -> Result<(), DeltaTableError> {
) -> Result<bool, DeltaTableError> {
let mut min_version: i64 = -1;
let log_store = self.log_store();
let prefix = log_store.log_path();
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,8 @@ impl DeltaTableState {
&mut self,
log_store: &dyn LogStore,
version: Option<i64>,
) -> Result<(), DeltaTableError> {
self.snapshot.update(log_store, version).await?;
Ok(())
) -> Result<bool, DeltaTableError> {
self.snapshot.update(log_store, version).await
}

/// Obtain Add actions for files that match the filter
Expand Down
6 changes: 3 additions & 3 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl RawDeltaTable {
/// Load the internal [RawDeltaTable] with the table state from the specified `version`
///
/// This will acquire the internal lock since it is a mutating operation!
pub fn load_version(&self, py: Python, version: i64) -> PyResult<()> {
pub fn load_version(&self, py: Python, version: i64) -> PyResult<bool> {
py.allow_threads(|| {
#[allow(clippy::await_holding_lock)]
rt().block_on(async {
Expand Down Expand Up @@ -359,7 +359,7 @@ impl RawDeltaTable {
})
}

pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<()> {
pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<bool> {
py.allow_threads(|| {
let datetime =
DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
Expand Down Expand Up @@ -1014,7 +1014,7 @@ impl RawDeltaTable {
.collect())
}

pub fn update_incremental(&self) -> PyResult<()> {
pub fn update_incremental(&self) -> PyResult<bool> {
#[allow(clippy::await_holding_lock)]
#[allow(deprecated)]
Ok(rt()
Expand Down
Loading