Skip to content

Commit da9c932

Browse files
committed
chore: upgrade to the latest arrow/datafusion/delta-rs
Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 3c9a95f commit da9c932

File tree

6 files changed

+17
-13
lines changed

6 files changed

+17
-13
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ url = "2.3"
3636
dashmap = "6.0.1"
3737

3838
# datafusion feature is required for writer version 2
39-
deltalake-core = { version = "0.25.0", features = ["json", "datafusion"]}
40-
deltalake-aws = { version = "0.8.0", optional = true }
41-
deltalake-azure = { version = "0.8.0", optional = true }
39+
deltalake-core = { version = "0.26.0", features = ["json", "datafusion"]}
40+
deltalake-aws = { version = "0.9.0", optional = true }
41+
deltalake-azure = { version = "0.9.0", optional = true }
4242

4343
# s3 feature enabled, helps for locking interactions with DLQ
4444
dynamodb_lock = { version = "0.6.0", optional = true }

src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ extern crate strum_macros;
1717
extern crate serde_json;
1818

1919
use coercions::CoercionTree;
20-
use deltalake_core::operations::transaction::TableReference;
20+
use deltalake_core::kernel::transaction::CommitBuilder;
21+
use deltalake_core::kernel::transaction::TableReference;
2122
use deltalake_core::protocol::DeltaOperation;
2223
use deltalake_core::protocol::OutputMode;
2324
use deltalake_core::{DeltaTable, DeltaTableError};
@@ -972,7 +973,7 @@ impl IngestProcessor {
972973
.duration_since(std::time::UNIX_EPOCH)
973974
.expect("Time went backwards")
974975
.as_millis() as i64;
975-
let commit = deltalake_core::operations::transaction::CommitBuilder::default()
976+
let commit = CommitBuilder::default()
976977
.with_actions(actions.clone())
977978
.build(
978979
self.table.state.as_ref().map(|s| s as &dyn TableReference),

src/offsets.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::delta_helpers::*;
22
use crate::{DataTypeOffset, DataTypePartition};
3+
use deltalake_core::kernel::transaction::CommitBuilder;
4+
use deltalake_core::kernel::transaction::TableReference;
35
use deltalake_core::kernel::Action;
4-
use deltalake_core::operations::transaction::TableReference;
56
use deltalake_core::protocol::DeltaOperation;
67
use deltalake_core::protocol::OutputMode;
78
use deltalake_core::{DeltaTable, DeltaTableError};
@@ -116,7 +117,7 @@ async fn commit_partition_offsets(
116117
.as_millis() as i64;
117118

118119
table.update().await?;
119-
let commit = deltalake_core::operations::transaction::CommitBuilder::default()
120+
let commit = CommitBuilder::default()
120121
.with_actions(actions)
121122
.build(
122123
table.state.as_ref().map(|s| s as &dyn TableReference),

src/writer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use deltalake_core::arrow::{
1212
json::reader::ReaderBuilder,
1313
record_batch::*,
1414
};
15+
use deltalake_core::kernel::transaction::CommitBuilder;
1516
use deltalake_core::parquet::{
1617
arrow::ArrowWriter,
1718
basic::{Compression, LogicalType},
@@ -22,13 +23,13 @@ use deltalake_core::parquet::{
2223
};
2324
use deltalake_core::protocol::DeltaOperation;
2425
use deltalake_core::protocol::SaveMode;
26+
use deltalake_core::{kernel::transaction::TableReference, parquet::format::FileMetaData};
2527
use deltalake_core::{
2628
kernel::{Action, Add, Schema},
29+
logstore::ObjectStoreRef,
2730
protocol::{ColumnCountStat, ColumnValueStat, Stats},
28-
storage::ObjectStoreRef,
2931
DeltaTable, DeltaTableError, ObjectStoreError,
3032
};
31-
use deltalake_core::{operations::transaction::TableReference, parquet::format::FileMetaData};
3233
use log::{error, info, warn};
3334
use serde_json::{Number, Value};
3435
use std::collections::HashMap;
@@ -583,7 +584,7 @@ impl DataWriter {
583584
self.write(values).await?;
584585
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
585586
let actions = adds.drain(..).map(Action::Add).collect();
586-
let commit = deltalake_core::operations::transaction::CommitBuilder::default()
587+
let commit = CommitBuilder::default()
587588
.with_max_retries(100) // We increase this from the default 15 times because (at least for Azure) this may fail in case of too frequent writes (which happen if many messages arrive in the dead letter queue)
588589
.with_actions(actions)
589590
.build(

tests/delta_partitions_tests.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#[allow(dead_code)]
22
mod helpers;
33

4+
use deltalake_core::kernel::transaction::CommitBuilder;
5+
use deltalake_core::kernel::transaction::TableReference;
46
use deltalake_core::kernel::{Action, Add};
5-
use deltalake_core::operations::transaction::TableReference;
67
use deltalake_core::protocol::{DeltaOperation, SaveMode};
78
use deltalake_core::DeltaTableError;
89
use kafka_delta_ingest::writer::*;
@@ -104,7 +105,7 @@ async fn test_delta_partitions() {
104105
predicate: None,
105106
};
106107

107-
let version = deltalake_core::operations::transaction::CommitBuilder::default()
108+
let version = CommitBuilder::default()
108109
.with_actions(result.iter().cloned().map(Action::Add).collect())
109110
.build(
110111
table.state.as_ref().map(|s| s as &dyn TableReference),

tests/helpers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ use std::time::Duration;
1010
use bytes::Buf;
1111
use chrono::prelude::*;
1212
use deltalake_core::kernel::{Action, Add, Metadata, Protocol, Remove, Transaction};
13+
use deltalake_core::logstore::ObjectStoreRef;
1314
use deltalake_core::parquet::{
1415
file::reader::{FileReader, SerializedFileReader},
1516
record::RowAccessor,
1617
};
17-
use deltalake_core::storage::ObjectStoreRef;
1818
use deltalake_core::{DeltaTable, Path};
1919
use kafka_delta_ingest::{start_ingest, IngestOptions};
2020
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};

0 commit comments

Comments (0)