Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use uuid::Uuid;

use crate::kernel::log_segment::PathExt;
use crate::kernel::transaction::TransactionError;
use crate::kernel::Action;
use crate::kernel::{Action, CommitInfo};
use crate::protocol::{get_last_checkpoint, ProtocolError};
use crate::{DeltaResult, DeltaTableError};

Expand Down Expand Up @@ -625,6 +625,73 @@ pub async fn get_earliest_version(
Ok(version)
}

/// Get all versions related to the delta table and return a tuple of a vector of versions and a
/// vector of commit infos. We guarantee the length of the two vectors is equal
pub async fn get_all_versions_from(
log_store: LogStoreRef,
start_version: i64,
) -> DeltaResult<(Vec<i64>, Vec<CommitInfo>)> {
if start_version < -1 {
return Err(DeltaTableError::NoStartingVersionOrTimestamp);
}
// list files to find max version
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(start_version);
let object_store = log_store.object_store(None);
let mut files = object_store.list_with_offset(prefix, &offset_path);
let mut empty_stream = true;

let mut versions = Vec::<i64>::new();
let mut commit_files = Vec::<CommitInfo>::new();

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if obj_meta.location.is_commit_file() {
let commit_log_bytes = object_store.get(&obj_meta.location).await?.bytes().await?;
let reader = BufReader::new(Cursor::new(commit_log_bytes));
for line in reader.lines() {
let action: Action = serde_json::from_str(line?.as_str())?;
if let Action::CommitInfo(commit_info) = action {
if let Some(log_version) =
extract_version_from_filename(obj_meta.location.as_ref())
{
versions.push(log_version);
commit_files.push(commit_info);
}
}
}
}
empty_stream = false;
}

// This implies no files were fetched during list_offset so either the starting_version is the latest
// or starting_version is invalid, so we try to get the first commit entry
if empty_stream {
let obj_meta = object_store
.head(&commit_uri_from_version(0))
.await
.map_err(|head_err| DeltaTableError::not_a_table(head_err.to_string()))?;
let commit_log_bytes = object_store.get(&obj_meta.location).await?.bytes().await?;
let reader = BufReader::new(Cursor::new(commit_log_bytes));
for line in reader.lines() {
let action: Action = serde_json::from_str(line?.as_str())?;
if let Action::CommitInfo(commit_info) = action {
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref())
{
versions.push(log_version);
commit_files.push(commit_info);
}
}
}
}
if versions.len() != commit_files.len() {
return Err(DeltaTableError::Generic(
"Length of versions not equal to length of commit files".to_string(),
));
}
Ok((versions, commit_files))
}

/// Read delta log for a specific version
pub async fn read_commit_entry(
storage: &dyn ObjectStore,
Expand Down Expand Up @@ -883,6 +950,55 @@ pub(crate) mod tests {
.try_collect::<Vec<Path>>()
.await
}

#[tokio::test]
async fn test_get_all_version_should_fail() {
use crate::protocol::SaveMode;
use crate::writer::test_utils::get_delta_schema;
use crate::DeltaOps;

let table_schema = get_delta_schema();

let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.with_save_mode(SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_schema().unwrap(), &table_schema);

let get_err = get_all_versions_from(table.log_store(), -2).await;
assert!(get_err.is_err());
}

#[tokio::test]
async fn test_get_all_version_from_minus_1() {
use crate::protocol::SaveMode;
use crate::writer::test_utils::get_delta_schema;
use crate::DeltaOps;

let table_schema = get_delta_schema();

let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.with_save_mode(SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_schema().unwrap(), &table_schema);

let res = get_all_versions_from(table.log_store(), -1).await;
assert!(res.is_ok());
let (versions, commit_infos) = res.unwrap();

assert_eq!(versions.len(), 1);
assert_eq!(commit_infos.len(), 1);

assert_eq!(versions[0], 0);
assert_eq!(commit_infos[0].operation, Some("CREATE TABLE".to_string()));
}
}

#[cfg(all(test, feature = "datafusion"))]
Expand Down
Loading