6 changes: 3 additions & 3 deletions Cargo.toml
@@ -36,9 +36,9 @@ url = "2.3"
dashmap = "6.0.1"

# datafusion feature is required for writer version 2
deltalake-core = { version = "0.25.0", features = ["json", "datafusion"]}
deltalake-aws = { version = "0.8.0", optional = true }
deltalake-azure = { version = "0.8.0", optional = true }
deltalake-core = { version = "0.26.0", features = ["json", "datafusion"]}
deltalake-aws = { version = "0.9.0", optional = true }
deltalake-azure = { version = "0.9.0", optional = true }

# s3 feature enabled, helps for locking interactions with DLQ
dynamodb_lock = { version = "0.6.0", optional = true }
5 changes: 3 additions & 2 deletions src/lib.rs
@@ -17,7 +17,8 @@ extern crate strum_macros;
extern crate serde_json;

use coercions::CoercionTree;
-use deltalake_core::operations::transaction::TableReference;
+use deltalake_core::kernel::transaction::CommitBuilder;
+use deltalake_core::kernel::transaction::TableReference;
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::OutputMode;
use deltalake_core::{DeltaTable, DeltaTableError};
@@ -972,7 +973,7 @@ impl IngestProcessor {
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64;
-let commit = deltalake_core::operations::transaction::CommitBuilder::default()
+let commit = CommitBuilder::default()
.with_actions(actions.clone())
.build(
self.table.state.as_ref().map(|s| s as &dyn TableReference),
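For context on the recurring change in this PR: deltalake-core 0.26 relocates the commit API from operations::transaction to kernel::transaction, which this diff tracks by updating import paths and call sites. A minimal sketch of the relocated commit path, assuming a loaded DeltaTable and a batch of Add actions; the helper name and the Write/Append operation are illustrative, mirroring the partition test later in this diff:

use deltalake_core::kernel::transaction::{CommitBuilder, TableReference};
use deltalake_core::kernel::Action;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::{DeltaTable, DeltaTableError};

// Illustrative helper: commit a batch of actions against a loaded table.
async fn commit_adds(table: &DeltaTable, actions: Vec<Action>) -> Result<i64, DeltaTableError> {
    // Describe the commit in the Delta log; fields mirror the test below.
    let operation = DeltaOperation::Write {
        mode: SaveMode::Append,
        partition_by: None,
        predicate: None,
    };
    // Build against the current table state and log store, then await the commit.
    let finalized = CommitBuilder::default()
        .with_actions(actions)
        .build(
            table.state.as_ref().map(|s| s as &dyn TableReference),
            table.log_store(),
            operation,
        )
        .await?;
    Ok(finalized.version())
}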
5 changes: 3 additions & 2 deletions src/offsets.rs
@@ -1,7 +1,8 @@
use crate::delta_helpers::*;
use crate::{DataTypeOffset, DataTypePartition};
+use deltalake_core::kernel::transaction::CommitBuilder;
+use deltalake_core::kernel::transaction::TableReference;
use deltalake_core::kernel::Action;
-use deltalake_core::operations::transaction::TableReference;
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::OutputMode;
use deltalake_core::{DeltaTable, DeltaTableError};
@@ -116,7 +117,7 @@ async fn commit_partition_offsets(
.as_millis() as i64;

table.update().await?;
-let commit = deltalake_core::operations::transaction::CommitBuilder::default()
+let commit = CommitBuilder::default()
.with_actions(actions)
.build(
table.state.as_ref().map(|s| s as &dyn TableReference),
7 changes: 4 additions & 3 deletions src/writer.rs
@@ -12,6 +12,7 @@ use deltalake_core::arrow::{
json::reader::ReaderBuilder,
record_batch::*,
};
+use deltalake_core::kernel::transaction::CommitBuilder;
use deltalake_core::parquet::{
arrow::ArrowWriter,
basic::{Compression, LogicalType},
@@ -22,13 +23,13 @@ use deltalake_core::parquet::{
};
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::SaveMode;
+use deltalake_core::{kernel::transaction::TableReference, parquet::format::FileMetaData};
use deltalake_core::{
kernel::{Action, Add, Schema},
+logstore::ObjectStoreRef,
protocol::{ColumnCountStat, ColumnValueStat, Stats},
-storage::ObjectStoreRef,
DeltaTable, DeltaTableError, ObjectStoreError,
};
-use deltalake_core::{operations::transaction::TableReference, parquet::format::FileMetaData};
use log::{error, info, warn};
use serde_json::{Number, Value};
use std::collections::HashMap;
@@ -583,7 +584,7 @@ impl DataWriter {
self.write(values).await?;
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
let actions = adds.drain(..).map(Action::Add).collect();
-let commit = deltalake_core::operations::transaction::CommitBuilder::default()
+let commit = CommitBuilder::default()
.with_max_retries(100) // We increase this from the default 15 because (at least for Azure) commits may fail under too frequent writes, which happen when many messages arrive in the dead letter queue
.with_actions(actions)
.build(
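A brief note on the retry budget retained above: CommitBuilder resolves commit conflicts by retrying, 15 times by default per the comment, and the dead-letter writer raises this to 100 because closely spaced commits (many messages landing in the dead letter queue in a short window) can exhaust the default, at least on Azure.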
5 changes: 3 additions & 2 deletions tests/delta_partitions_tests.rs
@@ -1,8 +1,9 @@
#[allow(dead_code)]
mod helpers;

+use deltalake_core::kernel::transaction::CommitBuilder;
+use deltalake_core::kernel::transaction::TableReference;
use deltalake_core::kernel::{Action, Add};
-use deltalake_core::operations::transaction::TableReference;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::DeltaTableError;
use kafka_delta_ingest::writer::*;
@@ -104,7 +105,7 @@ async fn test_delta_partitions() {
predicate: None,
};

-let version = deltalake_core::operations::transaction::CommitBuilder::default()
+let version = CommitBuilder::default()
.with_actions(result.iter().cloned().map(Action::Add).collect())
.build(
table.state.as_ref().map(|s| s as &dyn TableReference),
2 changes: 1 addition & 1 deletion tests/helpers/mod.rs
@@ -10,11 +10,11 @@ use std::time::Duration;
use bytes::Buf;
use chrono::prelude::*;
use deltalake_core::kernel::{Action, Add, Metadata, Protocol, Remove, Transaction};
+use deltalake_core::logstore::ObjectStoreRef;
use deltalake_core::parquet::{
file::reader::{FileReader, SerializedFileReader},
record::RowAccessor,
};
-use deltalake_core::storage::ObjectStoreRef;
use deltalake_core::{DeltaTable, Path};
use kafka_delta_ingest::{start_ingest, IngestOptions};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
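One more relocation in 0.26, visible here and in src/writer.rs: ObjectStoreRef moved from deltalake_core::storage to deltalake_core::logstore. A minimal sketch of reaching a table's object store under the new path, assuming DeltaTable still exposes object_store() as in prior releases; the function name is illustrative:

use deltalake_core::logstore::ObjectStoreRef;
use deltalake_core::DeltaTable;

// Illustrative helper: fetch the root object store handle via the new module path.
fn table_object_store(table: &DeltaTable) -> ObjectStoreRef {
    table.object_store()
}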