Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl DeltaOps {
/// Vacuum stale files from delta table
///
/// Consumes `self` and builds a [`VacuumBuilder`] from the table's log store
/// (`self.0.log_store`) and the value unwrapped from `self.0.state`.
///
/// # Panics
///
/// Panics through `unwrap()` when `self.0.state` holds no usable value
/// (presumably a table whose state has not been loaded — TODO confirm).
#[must_use]
pub fn vacuum(self) -> VacuumBuilder {
// NOTE(review): this text comes from a rendered diff that reports "1 addition & 1 deletion"
// for this file, so the two calls below look like the removed and the added form of a single
// line rather than two statements — confirm against the real file.
VacuumBuilder::new(self.0.log_store, self.0.state.unwrap())
VacuumBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
}

/// Audit active files with files present on the filesystem
Expand Down
174 changes: 99 additions & 75 deletions crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tracing::log::*;
use super::{CustomExecuteHandler, Operation};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::EagerSnapshot;
use crate::logstore::{LogStore, LogStoreRef};
use crate::protocol::DeltaOperation;
use crate::table::config::TablePropertiesExt as _;
Expand Down Expand Up @@ -93,7 +94,7 @@ pub enum VacuumMode {
/// See this module's documentation for more information
pub struct VacuumBuilder {
/// A snapshot of the to-be-vacuumed table's state
snapshot: DeltaTableState,
snapshot: EagerSnapshot,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Period of stale files allowed.
Expand Down Expand Up @@ -154,7 +155,7 @@ pub struct VacuumEndOperationMetrics {
/// Methods to specify various vacuum options and to execute the operation
impl VacuumBuilder {
/// Create a new [`VacuumBuilder`]
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
VacuumBuilder {
snapshot,
log_store,
Expand Down Expand Up @@ -228,7 +229,7 @@ impl VacuumBuilder {

let min_retention = Duration::milliseconds(
self.snapshot
.table_config()
.table_properties()
.deleted_file_retention_duration()
.as_millis() as i64,
);
Expand Down Expand Up @@ -280,7 +281,12 @@ impl VacuumBuilder {
&self.log_store,
)
.await?;
let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();
let valid_files: HashSet<_> = self
.snapshot
.files(self.log_store.as_ref(), None)
.map_ok(|f| f.object_store_path())
.try_collect()
.await?;

let mut files_to_delete = vec![];
let mut file_sizes = vec![];
Expand Down Expand Up @@ -346,7 +352,7 @@ impl std::future::IntoFuture for VacuumBuilder {
let plan = this.create_vacuum_plan().await?;
if this.dry_run {
return Ok((
DeltaTable::new_with_state(this.log_store, this.snapshot),
DeltaTable::new_with_state(this.log_store, DeltaTableState::new(this.snapshot)),
VacuumMetrics {
files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(),
dry_run: true,
Expand Down Expand Up @@ -374,7 +380,7 @@ impl std::future::IntoFuture for VacuumBuilder {
metrics,
),
None => (
DeltaTable::new_with_state(this.log_store, this.snapshot),
DeltaTable::new_with_state(this.log_store, DeltaTableState::new(this.snapshot)),
Default::default(),
),
})
Expand All @@ -401,7 +407,7 @@ impl VacuumPlan {
pub async fn execute(
self,
store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
mut commit_properties: CommitProperties,
operation_id: uuid::Uuid,
handle: Option<Arc<dyn CustomExecuteHandler>>,
Expand Down Expand Up @@ -499,14 +505,15 @@ fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool

/// List files that are no longer referenced by a Delta table and are older than the retention threshold.
async fn get_stale_files(
snapshot: &DeltaTableState,
snapshot: &EagerSnapshot,
retention_period: Duration,
now_timestamp_millis: i64,
store: &dyn LogStore,
) -> DeltaResult<HashSet<String>> {
let tombstone_retention_timestamp = now_timestamp_millis - retention_period.num_milliseconds();
snapshot
.all_tombstones(store)
.snapshot()
.tombstones(store)
.try_filter(|tombstone| {
// if the file has a creation time before the `tombstone_retention_timestamp`
// then it's considered as a stale file
Expand Down Expand Up @@ -534,22 +541,24 @@ mod tests {
Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
let table = open_table(table_uri).await?;

let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Lite)
.with_enforce_retention_duration(false)
.await?;
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Lite)
.with_enforce_retention_duration(false)
.await?;
// When running lite, this table with superfluous parquet files should not have anything to
// delete
assert!(result.files_deleted.is_empty());

let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
let mut files_deleted = result.files_deleted.clone();
files_deleted.sort();
// When running with full, these superfluous parquet files which are not actually
Expand Down Expand Up @@ -577,24 +586,26 @@ mod tests {
let versions_to_keep = vec![3];

// First, vacuum without keeping any particular versions
let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;

// Our simple_table has 32 data files in it which could be vacuumed.
assert_eq!(32, result.files_deleted.len());

// Next, vacuum with specific versions retained
let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
.with_retention_period(Duration::hours(0))
.with_keep_versions(&versions_to_keep)
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone())
.with_retention_period(Duration::hours(0))
.with_keep_versions(&versions_to_keep)
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
assert_ne!(
32,
result.files_deleted.len(),
Expand All @@ -614,24 +625,26 @@ mod tests {
let versions_to_keep = vec![2, 3];

// First, vacuum without keeping any particular versions
let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;

// Our simple_table has 32 data files in it which could be vacuumed.
assert_eq!(32, result.files_deleted.len());

// Next, vacuum with specific versions retained
let (_table, result) = VacuumBuilder::new(table.log_store(), table.snapshot()?.clone())
.with_retention_period(Duration::hours(0))
.with_keep_versions(&versions_to_keep)
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone())
.with_retention_period(Duration::hours(0))
.with_keep_versions(&versions_to_keep)
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await?;
assert_ne!(
32,
result.files_deleted.len(),
Expand Down Expand Up @@ -690,14 +703,16 @@ mod tests {
.unwrap();
table.load().await.unwrap();

let (mut table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(0))
.with_keep_versions(&[2, 3])
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await
.unwrap();
let (mut table, result) = VacuumBuilder::new(
table.log_store(),
table.snapshot().unwrap().snapshot.clone(),
)
.with_retention_period(Duration::hours(0))
.with_keep_versions(&[2, 3])
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.await
.unwrap();
// Our simple_table has 32 data files in it, and we shouldn't have deleted them all!
assert_ne!(32, result.files_deleted.len());

Expand All @@ -724,10 +739,13 @@ mod tests {
Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
let table = open_table(table_uri).await.unwrap();

let result = VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(1))
.with_dry_run(true)
.await;
let result = VacuumBuilder::new(
table.log_store(),
table.snapshot().unwrap().snapshot.clone(),
)
.with_retention_period(Duration::hours(1))
.with_dry_run(true)
.await;

assert!(result.is_err());

Expand All @@ -736,23 +754,27 @@ mod tests {
Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
let table = open_table(table_uri).await.unwrap();

let (table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_enforce_retention_duration(false)
.await?;
let (table, result) = VacuumBuilder::new(
table.log_store(),
table.snapshot().unwrap().snapshot.clone(),
)
.with_retention_period(Duration::hours(0))
.with_dry_run(true)
.with_enforce_retention_duration(false)
.await?;
// with the retention duration check disabled, a 0 hour retention period will purge all files
assert_eq!(
result.files_deleted,
vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
);

let (table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(169))
.with_dry_run(true)
.await?;
let (table, result) = VacuumBuilder::new(
table.log_store(),
table.snapshot().unwrap().snapshot.clone(),
)
.with_retention_period(Duration::hours(169))
.with_dry_run(true)
.await?;

assert_eq!(
result.files_deleted,
Expand All @@ -765,11 +787,13 @@ mod tests {
.as_secs()
/ 3600;
let empty: Vec<String> = Vec::new();
let (_table, result) =
VacuumBuilder::new(table.log_store(), table.snapshot().unwrap().clone())
.with_retention_period(Duration::hours(retention_hours as i64))
.with_dry_run(true)
.await?;
let (_table, result) = VacuumBuilder::new(
table.log_store(),
table.snapshot().unwrap().snapshot.clone(),
)
.with_retention_period(Duration::hours(retention_hours as i64))
.with_dry_run(true)
.await?;

assert_eq!(result.files_deleted, empty);
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ impl DeltaTableState {

/// Returns an iterator of file names present in the loaded state
///
/// Each item is the `Path` obtained by calling `object_store_path()` on an entry
/// yielded by `self.log_data().iter()`. The returned iterator borrows from `self`.
///
/// Deprecated since 0.30.0: per the attribute's note, simple object store paths
/// are not meaningful once full urls are supported.
#[inline]
#[deprecated(
since = "0.30.0",
note = "Simple object store paths are not meaningful once we support full urls."
)]
pub fn file_paths_iter(&self) -> impl Iterator<Item = Path> + '_ {
self.log_data().iter().map(|add| add.object_store_path())
}
Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ impl RawDeltaTable {
.map_err(PyErr::from),
Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
}?;
let mut cmd = VacuumBuilder::new(self.log_store()?, snapshot)
let mut cmd = VacuumBuilder::new(self.log_store()?, snapshot.snapshot().clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);

Expand Down
Loading