diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 1a8fb26be1..14131d4b8c 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -69,7 +69,7 @@ use uuid::Uuid; use crate::kernel::log_segment::PathExt; use crate::kernel::transaction::TransactionError; -use crate::kernel::Action; +use crate::kernel::{Action, CommitInfo}; use crate::protocol::{get_last_checkpoint, ProtocolError}; use crate::{DeltaResult, DeltaTableError}; @@ -625,6 +625,73 @@ pub async fn get_earliest_version( Ok(version) } +/// Get all versions related to the delta table and return a tuple of a vector of versions and a +/// vector of commit infos. We guarantee the length of the two vectors is equal +pub async fn get_all_versions_from( + log_store: LogStoreRef, + start_version: i64, +) -> DeltaResult<(Vec, Vec)> { + if start_version < -1 { + return Err(DeltaTableError::NoStartingVersionOrTimestamp); + } + // list files to find max version + let prefix = Some(log_store.log_path()); + let offset_path = commit_uri_from_version(start_version); + let object_store = log_store.object_store(None); + let mut files = object_store.list_with_offset(prefix, &offset_path); + let mut empty_stream = true; + + let mut versions = Vec::::new(); + let mut commit_files = Vec::::new(); + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if obj_meta.location.is_commit_file() { + let commit_log_bytes = object_store.get(&obj_meta.location).await?.bytes().await?; + let reader = BufReader::new(Cursor::new(commit_log_bytes)); + for line in reader.lines() { + let action: Action = serde_json::from_str(line?.as_str())?; + if let Action::CommitInfo(commit_info) = action { + if let Some(log_version) = + extract_version_from_filename(obj_meta.location.as_ref()) + { + versions.push(log_version); + commit_files.push(commit_info); + } + } + } + } + empty_stream = false; + } + + // This implies no files were fetched during list_offset so either the starting_version is the latest + // or starting_version is invalid, so we try to get the first commit entry + if empty_stream { + let obj_meta = object_store + .head(&commit_uri_from_version(0)) + .await + .map_err(|head_err| DeltaTableError::not_a_table(head_err.to_string()))?; + let commit_log_bytes = object_store.get(&obj_meta.location).await?.bytes().await?; + let reader = BufReader::new(Cursor::new(commit_log_bytes)); + for line in reader.lines() { + let action: Action = serde_json::from_str(line?.as_str())?; + if let Action::CommitInfo(commit_info) = action { + if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) + { + versions.push(log_version); + commit_files.push(commit_info); + } + } + } + } + if versions.len() != commit_files.len() { + return Err(DeltaTableError::Generic( + "Length of versions not equal to length of commit files".to_string(), + )); + } + Ok((versions, commit_files)) +} + /// Read delta log for a specific version pub async fn read_commit_entry( storage: &dyn ObjectStore, @@ -883,6 +950,55 @@ pub(crate) mod tests { .try_collect::>() .await } + + #[tokio::test] + async fn test_get_all_version_should_fail() { + use crate::protocol::SaveMode; + use crate::writer::test_utils::get_delta_schema; + use crate::DeltaOps; + + let table_schema = get_delta_schema(); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .with_save_mode(SaveMode::Ignore) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_schema().unwrap(), &table_schema); + + let get_err = get_all_versions_from(table.log_store(), -2).await; + assert!(get_err.is_err()); + } + + #[tokio::test] + async fn test_get_all_version_from_minus_1() { + use crate::protocol::SaveMode; + use crate::writer::test_utils::get_delta_schema; + use crate::DeltaOps; + + let table_schema = get_delta_schema(); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .with_save_mode(SaveMode::Ignore) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_schema().unwrap(), &table_schema); + + let res = get_all_versions_from(table.log_store(), -1).await; + assert!(res.is_ok()); + let (versions, commit_infos) = res.unwrap(); + + assert_eq!(versions.len(), 1); + assert_eq!(commit_infos.len(), 1); + + assert_eq!(versions[0], 0); + assert_eq!(commit_infos[0].operation, Some("CREATE TABLE".to_string())); + } } #[cfg(all(test, feature = "datafusion"))]