diff --git a/Cargo.toml b/Cargo.toml index d0570b0..ca3276b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,9 +36,9 @@ url = "2.3" dashmap = "6.0.1" # datafusion feature is required for writer version 2 -deltalake-core = { version = "0.25.0", features = ["json", "datafusion"]} -deltalake-aws = { version = "0.8.0", optional = true } -deltalake-azure = { version = "0.8.0", optional = true } +deltalake-core = { version = "0.26.0", features = ["json", "datafusion"]} +deltalake-aws = { version = "0.9.0", optional = true } +deltalake-azure = { version = "0.9.0", optional = true } # s3 feature enabled, helps for locking interactions with DLQ dynamodb_lock = { version = "0.6.0", optional = true } diff --git a/src/lib.rs b/src/lib.rs index 7491ddf..7296e04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,8 @@ extern crate strum_macros; extern crate serde_json; use coercions::CoercionTree; -use deltalake_core::operations::transaction::TableReference; +use deltalake_core::kernel::transaction::CommitBuilder; +use deltalake_core::kernel::transaction::TableReference; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::OutputMode; use deltalake_core::{DeltaTable, DeltaTableError}; @@ -972,7 +973,7 @@ impl IngestProcessor { .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_millis() as i64; - let commit = deltalake_core::operations::transaction::CommitBuilder::default() + let commit = CommitBuilder::default() .with_actions(actions.clone()) .build( self.table.state.as_ref().map(|s| s as &dyn TableReference), diff --git a/src/offsets.rs b/src/offsets.rs index e280e49..00ae9ac 100644 --- a/src/offsets.rs +++ b/src/offsets.rs @@ -1,7 +1,8 @@ use crate::delta_helpers::*; use crate::{DataTypeOffset, DataTypePartition}; +use deltalake_core::kernel::transaction::CommitBuilder; +use deltalake_core::kernel::transaction::TableReference; use deltalake_core::kernel::Action; -use deltalake_core::operations::transaction::TableReference; use 
deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::OutputMode; use deltalake_core::{DeltaTable, DeltaTableError}; @@ -116,7 +117,7 @@ async fn commit_partition_offsets( .as_millis() as i64; table.update().await?; - let commit = deltalake_core::operations::transaction::CommitBuilder::default() + let commit = CommitBuilder::default() .with_actions(actions) .build( table.state.as_ref().map(|s| s as &dyn TableReference), diff --git a/src/writer.rs b/src/writer.rs index df7bc69..5ba1fab 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -12,6 +12,7 @@ use deltalake_core::arrow::{ json::reader::ReaderBuilder, record_batch::*, }; +use deltalake_core::kernel::transaction::CommitBuilder; use deltalake_core::parquet::{ arrow::ArrowWriter, basic::{Compression, LogicalType}, @@ -22,13 +23,13 @@ use deltalake_core::parquet::{ }; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::SaveMode; +use deltalake_core::{kernel::transaction::TableReference, parquet::format::FileMetaData}; use deltalake_core::{ kernel::{Action, Add, Schema}, + logstore::ObjectStoreRef, protocol::{ColumnCountStat, ColumnValueStat, Stats}, - storage::ObjectStoreRef, DeltaTable, DeltaTableError, ObjectStoreError, }; -use deltalake_core::{operations::transaction::TableReference, parquet::format::FileMetaData}; use log::{error, info, warn}; use serde_json::{Number, Value}; use std::collections::HashMap; @@ -583,7 +584,7 @@ impl DataWriter { self.write(values).await?; let mut adds = self.write_parquet_files(&table.table_uri()).await?; let actions = adds.drain(..).map(Action::Add).collect(); - let commit = deltalake_core::operations::transaction::CommitBuilder::default() + let commit = CommitBuilder::default() .with_max_retries(100) //We increase this from the default 15 times because (at least for Azure) this may fail in case of too frequent writes (which happen if many messages arrive in the dead letter queue) .with_actions(actions) .build( diff --git 
a/tests/delta_partitions_tests.rs b/tests/delta_partitions_tests.rs index 5058897..efa4326 100644 --- a/tests/delta_partitions_tests.rs +++ b/tests/delta_partitions_tests.rs @@ -1,8 +1,9 @@ #[allow(dead_code)] mod helpers; +use deltalake_core::kernel::transaction::CommitBuilder; +use deltalake_core::kernel::transaction::TableReference; use deltalake_core::kernel::{Action, Add}; -use deltalake_core::operations::transaction::TableReference; use deltalake_core::protocol::{DeltaOperation, SaveMode}; use deltalake_core::DeltaTableError; use kafka_delta_ingest::writer::*; @@ -104,7 +105,7 @@ async fn test_delta_partitions() { predicate: None, }; - let version = deltalake_core::operations::transaction::CommitBuilder::default() + let version = CommitBuilder::default() .with_actions(result.iter().cloned().map(Action::Add).collect()) .build( table.state.as_ref().map(|s| s as &dyn TableReference), diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index a07ebbb..c14a9ae 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -10,11 +10,11 @@ use std::time::Duration; use bytes::Buf; use chrono::prelude::*; use deltalake_core::kernel::{Action, Add, Metadata, Protocol, Remove, Transaction}; +use deltalake_core::logstore::ObjectStoreRef; use deltalake_core::parquet::{ file::reader::{FileReader, SerializedFileReader}, record::RowAccessor, }; -use deltalake_core::storage::ObjectStoreRef; use deltalake_core::{DeltaTable, Path}; use kafka_delta_ingest::{start_ingest, IngestOptions}; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};