Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/core/src/kernel/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,15 @@ pub struct FinalizedCommit {
pub metrics: Metrics,
}

// Manual `Debug` implementation: only `version` and `metrics` are rendered.
// The remaining fields of `FinalizedCommit` (only partially visible here —
// presumably including the post-commit table state) are intentionally
// omitted, likely because they are large or do not implement `Debug`;
// TODO(review): confirm against the full struct definition.
impl std::fmt::Debug for FinalizedCommit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FinalizedCommit")
.field("version", &self.version)
.field("metrics", &self.metrics)
// `finish_non_exhaustive()` prints a trailing `..`, signalling to
// readers of the debug output that some fields were skipped —
// the idiomatic choice when a manual impl omits fields.
.finish_non_exhaustive()
}
}

impl FinalizedCommit {
/// The new table state after a commit
pub fn snapshot(&self) -> DeltaTableState {
Expand Down
63 changes: 57 additions & 6 deletions crates/core/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use object_store::Error as ObjectStoreError;
use parquet::errors::ParquetError;
use serde_json::Value;
use tracing::log::*;

use crate::errors::DeltaTableError;
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
Expand Down Expand Up @@ -152,14 +153,15 @@ pub trait DeltaWriter<T> {
/// and commit the changes to the Delta log, creating a new table version.
async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result<i64, DeltaTableError> {
let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect();
flush_and_commit(adds, table).await
flush_and_commit(adds, table, None).await
}
}

/// Method for flushing to be used by writers
pub(crate) async fn flush_and_commit(
adds: Vec<Action>,
table: &mut DeltaTable,
commit_properties: Option<CommitProperties>,
) -> Result<i64, DeltaTableError> {
let snapshot = table.snapshot()?;
let partition_cols = snapshot.metadata().partition_columns().clone();
Expand All @@ -174,11 +176,60 @@ pub(crate) async fn flush_and_commit(
predicate: None,
};

let version = CommitBuilder::from(CommitProperties::default())
let finalized = CommitBuilder::from(commit_properties.unwrap_or_default())
.with_actions(adds)
.build(Some(snapshot), table.log_store.clone(), operation)
.await?
.version();
table.update().await?;
Ok(version)
.await?;
table.state = Some(finalized.snapshot());
Ok(finalized.version())
}

#[cfg(test)]
mod tests {
use delta_kernel::schema::DataType;

use super::*;
use crate::{DeltaOps, DeltaResult};
use pretty_assertions::assert_ne;

/// Verify that [flush_and_commit] threads caller-supplied [CommitProperties]
/// through to the commit.
///
/// This test doesn't have a great way to _validate_ that logs are not cleaned up as
/// part of the second commit. Instead, prints were temporarily added to the
/// [PostCommit] logic to confirm the property was pulled through correctly in the
/// non-default case.
///
/// The _ideal_ testing scenario would probably be to propagate metrics out of
/// [flush_and_commit], but that's an API change that isn't desirable at the moment.
#[tokio::test]
async fn test_flush_and_commit() -> DeltaResult<()> {
// Create an in-memory table with an aggressive log-retention setting so
// that log cleanup would be eligible to run on commit.
let mut table = DeltaOps::new_in_memory()
.create()
.with_table_name("my_table")
.with_column(
"id",
DataType::Primitive(delta_kernel::schema::PrimitiveType::Long),
true,
None,
)
.with_configuration_property(
crate::TableProperty::LogRetentionDuration,
Some("interval 0 days"),
)
.await?;

// First commit: default commit properties (the `None` path).
let add = Add::default();
let actions = vec![Action::Add(add)];
let first_version = flush_and_commit(actions, &mut table, None).await?;

let add = Add::default();
let actions = vec![Action::Add(add)];

// Second commit: explicitly disable expired-log cleanup via CommitProperties.
let properties = CommitProperties::default().with_cleanup_expired_logs(Some(false));
let second_version = flush_and_commit(actions, &mut table, Some(properties)).await?;
// Each call should have produced a distinct table version.
assert_ne!(
second_version, first_version,
"flush_and_commit did not create a version apparently?"
);
Ok(())
}
}
16 changes: 15 additions & 1 deletion crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use super::utils::{
use super::{DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::schema::merge_arrow_schema;
use crate::kernel::transaction::CommitProperties;
use crate::kernel::MetadataExt as _;
use crate::kernel::{scalars::ScalarExt, Action, Add, PartitionsExt};
use crate::logstore::ObjectStoreRetryExt;
Expand All @@ -49,6 +50,7 @@ pub struct RecordBatchWriter {
arrow_writers: HashMap<String, PartitionWriter>,
num_indexed_cols: DataSkippingNumIndexedCols,
stats_columns: Option<Vec<String>>,
commit_properties: Option<CommitProperties>,
}

impl std::fmt::Debug for RecordBatchWriter {
Expand Down Expand Up @@ -103,9 +105,20 @@ impl RecordBatchWriter {
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.map(|v| v.split(',').map(|s| s.to_string()).collect()),
commit_properties: None,
})
}

/// Add the [CommitProperties] to the [RecordBatchWriter] to be used when the writer flushes
/// the write into storage.
///
/// This can be useful for situations where slight modifications to the commit behavior are
/// required.
pub fn with_commit_properties(mut self, properties: CommitProperties) -> Self {
self.commit_properties = Some(properties);
self
}

/// Creates a [`RecordBatchWriter`] to write data to provided Delta Table
pub fn for_table(table: &DeltaTable) -> Result<Self, DeltaTableError> {
// Initialize an arrow schema ref from the delta table schema
Expand Down Expand Up @@ -142,6 +155,7 @@ impl RecordBatchWriter {
stats_columns: configuration
.get("delta.dataSkippingStatsColumns")
.map(|v| v.split(',').map(|s| s.to_string()).collect()),
commit_properties: None,
})
}

Expand Down Expand Up @@ -306,7 +320,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
let metadata = current_meta.with_schema(&schema)?;
adds.push(Action::Metadata(metadata));
}
super::flush_and_commit(adds, table).await
super::flush_and_commit(adds, table, self.commit_properties.clone()).await
}
}

Expand Down
Loading