From c72000a2df1545ecf7bee53ffa8a2cc5d7d3a595 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 16 Aug 2025 14:20:08 -0400 Subject: [PATCH 1/4] Add builder API and remove unnecessary Arcs --- Cargo.lock | 4 +- Cargo.toml | 2 +- README.md | 6 +-- src/lib.rs | 136 +++++++++++++++++++++++++++++++++++++++-------------- 4 files changed, 108 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba27b0c..5841ddf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,9 +535,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "hdfs-native" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23709a4e6b07f231c28608b9c84a873157221cae9674478ca7569ec5ac505517" +checksum = "0f2446941d466dabd56a44f77ee435e4501a68dfc5022ed18b7626606ce95570" dependencies = [ "aes", "base64", diff --git a/Cargo.toml b/Cargo.toml index 943b68d..d6480b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1" bytes = "1" chrono = "0.4" futures = "0.3" -hdfs-native = "0.12" +hdfs-native = "0.12.1" object_store = "0.12.2" thiserror = "2" tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "sync", "time"] } diff --git a/README.md b/README.md index 47dfb89..992d595 100644 --- a/README.md +++ b/README.md @@ -12,12 +12,12 @@ Each release supports a certain minor release of both the `object_store` crate a |0.12.x|>=0.10, <0.12|0.10| |0.13.x|>=0.10, <0.12|0.11| |0.14.x|0.12|0.11| -|0.15.x|0.12|0.12| +|0.15.x|>=0.12.2, <0.13|0.12| # Usage ```rust -use hdfs_native_object_store::HdfsObjectStore; -let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?; +use hdfs_native_object_store::HdfsObjectStoreBuilder; +let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?; ``` # Documentation diff --git a/src/lib.rs b/src/lib.rs index 711689e..1ee5f00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,12 +3,11 @@ //! 
# Usage //! //! ```rust -//! use hdfs_native_object_store::HdfsObjectStore; -//! # use object_store::Result; -//! # fn main() -> Result<()> { -//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?; -//! # Ok(()) -//! # } +//! use hdfs_native_object_store::HdfsObjectStoreBuilder; +//! let store = HdfsObjectStoreBuilder::new() +//! .with_url("hdfs://localhost:9000") +//! .build() +//! .unwrap(); //! ``` //! use std::{ @@ -16,7 +15,6 @@ use std::{ fmt::{Display, Formatter}, future, path::PathBuf, - sync::Arc, }; use async_trait::async_trait; @@ -34,6 +32,7 @@ use object_store::{ ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use tokio::{ + runtime::Handle, sync::{mpsc, oneshot}, task::{self, JoinHandle}, }; @@ -51,14 +50,55 @@ fn generic_error( } } +/// Builder for creating an [HdfsObjectStore] +#[derive(Default)] +pub struct HdfsObjectStoreBuilder { + inner: ClientBuilder, +} + +impl HdfsObjectStoreBuilder { + /// Create a new [HdfsObjectStoreBuilder] + pub fn new() -> Self { + Self::default() + } + + /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService + pub fn with_url(mut self, url: impl Into<String>) -> Self { + self.inner = self.inner.with_url(url); + self + } + + /// Set configs to use for the client. 
The provided configs will override any found in the default config files loaded + pub fn with_config( + mut self, + config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>, + ) -> Self { + self.inner = self.inner.with_config(config); + self + } + + /// Use a dedicated tokio runtime for spawned tasks and IO operations + pub fn with_io_runtime(mut self, runtime: Handle) -> Self { + self.inner = self.inner.with_io_runtime(runtime); + self + } + + /// Create the [HdfsObjectStore] instance from the provided settings + pub fn build(self) -> Result<HdfsObjectStore> { + let client = self.inner.build().to_object_store_err()?; + + Ok(HdfsObjectStore { client }) + } +} + /// Interface for [Hadoop Distributed File System](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html). #[derive(Debug, Clone)] pub struct HdfsObjectStore { - client: Arc<Client>, + client: Client, } impl HdfsObjectStore { - /// Creates a new HdfsObjectStore from an existing [Client] + /// Creates a new HdfsObjectStore from an existing `hdfs-native` [Client] /// /// ```rust /// # use std::sync::Arc; @@ -67,7 +107,7 @@ impl HdfsObjectStore { /// let client = ClientBuilder::new().with_url("hdfs://127.0.0.1:9000").build().unwrap(); /// let store = HdfsObjectStore::new(Arc::new(client)); /// ``` - pub fn new(client: Arc<Client>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } @@ -81,13 +121,14 @@ impl HdfsObjectStore { /// # Ok(()) /// # } /// ``` + #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")] pub fn with_url(url: &str) -> Result<Self> { - Ok(Self::new(Arc::new( - ClientBuilder::new() - .with_url(url) - .build() - .to_object_store_err()?, - ))) + let client = ClientBuilder::new() + .with_url(url) + .build() + .to_object_store_err()?; + + Ok(Self { client }) } /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs. 
@@ -106,14 +147,15 @@ impl HdfsObjectStore { /// # Ok(()) /// # } /// ``` + #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")] pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> { - Ok(Self::new(Arc::new( - ClientBuilder::new() - .with_url(url) - .with_config(config) - .build() - .to_object_store_err()?, - ))) + let client = ClientBuilder::new() + .with_url(url) + .with_config(config) + .build() + .to_object_store_err()?; + + Ok(Self { client }) } async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { @@ -190,7 +232,7 @@ impl Display for HdfsObjectStore { impl From<Client> for HdfsObjectStore { fn from(value: Client) -> Self { - Self::new(Arc::new(value)) + Self { client: value } } } @@ -257,7 +299,7 @@ impl ObjectStore for HdfsObjectStore { let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?; Ok(Box::new(HdfsMultipartWriter::new( - Arc::clone(&self.client), + self.client.clone(), tmp_file, &tmp_file_path, &final_file_path, @@ -491,19 +533,14 @@ type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload // A once cell is used to track whether a part has finished writing or not. // On completing, rename the file to the actual target. 
struct HdfsMultipartWriter { - client: Arc<Client>, + client: Client, sender: Option<(JoinHandle<Result<()>>, PartSender)>, tmp_filename: String, final_filename: String, } impl HdfsMultipartWriter { - fn new( - client: Arc<Client>, - writer: FileWriter, - tmp_filename: &str, - final_filename: &str, - ) -> Self { + fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self { let (sender, receiver) = mpsc::unbounded_channel(); Self { @@ -659,16 +696,22 @@ mod test { use std::collections::HashSet; use object_store::integration::*; + use serial_test::serial; + use tokio::runtime::Runtime; - use crate::HdfsObjectStore; + use crate::HdfsObjectStoreBuilder; #[tokio::test] + #[serial] async fn hdfs_test() { let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([ hdfs_native::minidfs::DfsFeatures::HA, ])); - let integration = HdfsObjectStore::with_url(&dfs.url).unwrap(); + let integration = HdfsObjectStoreBuilder::new() + .with_url(&dfs.url) + .build() + .unwrap(); put_get_delete_list(&integration).await; list_uses_directories_correctly(&integration).await; @@ -680,4 +723,29 @@ mod test { get_opts(&integration).await; put_opts(&integration, false).await; } + + #[test] + #[serial] + fn test_no_tokio() { + let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([ + hdfs_native::minidfs::DfsFeatures::HA, + ])); + + let integration = HdfsObjectStoreBuilder::new() + .with_url(&dfs.url) + .build() + .unwrap(); + + futures::executor::block_on(get_opts(&integration)); + + let rt = Runtime::new().unwrap(); + + let integration = HdfsObjectStoreBuilder::new() + .with_url(&dfs.url) + .with_io_runtime(rt.handle().clone()) + .build() + .unwrap(); + + futures::executor::block_on(get_opts(&integration)); + } } From 461b1da718bfcd7e0f49e65a220543806adad05e Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 16 Aug 2025 14:29:32 -0400 Subject: [PATCH 2/4] Fix doctest and simplify workflow --- .github/workflows/rust-test.yml | 12 ------------ src/lib.rs 
| 2 +- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/.github/workflows/rust-test.yml b/.github/workflows/rust-test.yml index 36b9c8e..18f5b0c 100644 --- a/.github/workflows/rust-test.yml +++ b/.github/workflows/rust-test.yml @@ -41,9 +41,6 @@ jobs: - uses: Swatinem/rust-cache@v2 - - name: Install native libs - run: sudo apt-get install -y libkrb5-dev - - name: build and lint with clippy run: cargo clippy --all-targets --features integration-test -- -D warnings @@ -84,14 +81,5 @@ jobs: distribution: "temurin" java-version: "17" - - name: Install native libs - run: sudo apt-get install -y libkrb5-dev krb5-user - - - name: Download Hadoop - run: | - wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz - tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE - echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH - - name: Run tests run: cargo test --features integration-test diff --git a/src/lib.rs b/src/lib.rs index 1ee5f00..7823365 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,7 +105,7 @@ impl HdfsObjectStore { /// use hdfs_native::ClientBuilder; /// # use hdfs_native_object_store::HdfsObjectStore; /// let client = ClientBuilder::new().with_url("hdfs://127.0.0.1:9000").build().unwrap(); - /// let store = HdfsObjectStore::new(Arc::new(client)); + /// let store = HdfsObjectStore::new(client); /// ``` pub fn new(client: Client) -> Self { Self { client } From aa475cf0e6d029c1ff9aa27e570634a8a4d96f1a Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 16 Aug 2025 14:34:16 -0400 Subject: [PATCH 3/4] Try disable a test --- src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7823365..06b9a80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -731,12 +731,12 @@ mod test { hdfs_native::minidfs::DfsFeatures::HA, ])); - let integration = HdfsObjectStoreBuilder::new() - .with_url(&dfs.url) - .build() - .unwrap(); + // let integration = HdfsObjectStoreBuilder::new() + // 
.with_url(&dfs.url) + // .build() + // .unwrap(); - futures::executor::block_on(get_opts(&integration)); + // futures::executor::block_on(get_opts(&integration)); let rt = Runtime::new().unwrap(); From 2335ef1d44e94b27ba32d77b01c89116d6b96861 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 17 Aug 2025 19:08:15 -0400 Subject: [PATCH 4/4] Bump hdfs-native to 0.12.2 --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/lib.rs | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5841ddf..8ad5288 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,9 +535,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "hdfs-native" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f2446941d466dabd56a44f77ee435e4501a68dfc5022ed18b7626606ce95570" +checksum = "411cba6a8e2c07a9476729690d05bd445c1425771eca41d2f5c3e41edebf7900" dependencies = [ "aes", "base64", diff --git a/Cargo.toml b/Cargo.toml index d6480b9..d70c586 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1" bytes = "1" chrono = "0.4" futures = "0.3" -hdfs-native = "0.12.1" +hdfs-native = "0.12.2" object_store = "0.12.2" thiserror = "2" tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "sync", "time"] } diff --git a/src/lib.rs b/src/lib.rs index 06b9a80..7823365 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -731,12 +731,12 @@ mod test { hdfs_native::minidfs::DfsFeatures::HA, ])); - // let integration = HdfsObjectStoreBuilder::new() - // .with_url(&dfs.url) - // .build() - // .unwrap(); + let integration = HdfsObjectStoreBuilder::new() + .with_url(&dfs.url) + .build() + .unwrap(); - // futures::executor::block_on(get_opts(&integration)); + futures::executor::block_on(get_opts(&integration)); let rt = Runtime::new().unwrap();