Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions .github/workflows/rust-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ jobs:

- uses: Swatinem/rust-cache@v2

- name: Install native libs
run: sudo apt-get install -y libkrb5-dev

- name: build and lint with clippy
run: cargo clippy --all-targets --features integration-test -- -D warnings

Expand Down Expand Up @@ -84,14 +81,5 @@ jobs:
distribution: "temurin"
java-version: "17"

- name: Install native libs
run: sudo apt-get install -y libkrb5-dev krb5-user

- name: Download Hadoop
run: |
wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
tar -xf hadoop-3.4.0.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE/hadoop-3.4.0/bin" >> $GITHUB_PATH

- name: Run tests
run: cargo test --features integration-test
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1"
bytes = "1"
chrono = "0.4"
futures = "0.3"
hdfs-native = "0.12"
hdfs-native = "0.12.2"
object_store = "0.12.2"
thiserror = "2"
tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "sync", "time"] }
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ Each release supports a certain minor release of both the `object_store` crate a
|0.12.x|>=0.10, <0.12|0.10|
|0.13.x|>=0.10, <0.12|0.11|
|0.14.x|0.12|0.11|
|0.15.x|0.12|0.12|
|0.15.x|>=0.12.2, <0.13|0.12|

# Usage
```rust
use hdfs_native_object_store::HdfsObjectStore;
let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
use hdfs_native_object_store::HdfsObjectStoreBuilder;
let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
```

# Documentation
Expand Down
138 changes: 103 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
//! # Usage
//!
//! ```rust
//! use hdfs_native_object_store::HdfsObjectStore;
//! # use object_store::Result;
//! # fn main() -> Result<()> {
//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
//! # Ok(())
//! # }
//! use hdfs_native_object_store::HdfsObjectStoreBuilder;
//! let store = HdfsObjectStoreBuilder::new()
//! .with_url("hdfs://localhost:9000")
//! .build()
//! .unwrap();
//! ```
//!
use std::{
collections::HashMap,
fmt::{Display, Formatter},
future,
path::PathBuf,
sync::Arc,
};

use async_trait::async_trait;
Expand All @@ -34,6 +32,7 @@ use object_store::{
ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot},
task::{self, JoinHandle},
};
Expand All @@ -51,23 +50,64 @@ fn generic_error(
}
}

/// Builder for creating an [HdfsObjectStore]
#[derive(Default)]
pub struct HdfsObjectStoreBuilder {
inner: ClientBuilder,
}

impl HdfsObjectStoreBuilder {
/// Create a new [HdfsObjectStoreBuilder]
pub fn new() -> Self {
Self::default()
}

/// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
pub fn with_url(mut self, url: impl Into<String>) -> Self {
self.inner = self.inner.with_url(url);
self
}

/// Set configs to use for the client. The provided configs will override any found in the default config files loaded
pub fn with_config(
mut self,
config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
self.inner = self.inner.with_config(config);
self
}

// Use a dedicated tokio runtime for spawned tasks and IO operations
pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
self.inner = self.inner.with_io_runtime(runtime);
self
}

/// Create the [HdfsObjectStore]] instance from the provided settings
pub fn build(self) -> Result<HdfsObjectStore> {
let client = self.inner.build().to_object_store_err()?;

Ok(HdfsObjectStore { client })
}
}

/// Interface for [Hadoop Distributed File System](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html).
#[derive(Debug, Clone)]
pub struct HdfsObjectStore {
client: Arc<Client>,
client: Client,
}

impl HdfsObjectStore {
/// Creates a new HdfsObjectStore from an existing [Client]
/// Creates a new HdfsObjectStore from an existing `hdfs-native` [Client]
///
/// ```rust
/// # use std::sync::Arc;
/// use hdfs_native::ClientBuilder;
/// # use hdfs_native_object_store::HdfsObjectStore;
/// let client = ClientBuilder::new().with_url("hdfs://127.0.0.1:9000").build().unwrap();
/// let store = HdfsObjectStore::new(Arc::new(client));
/// let store = HdfsObjectStore::new(client);
/// ```
pub fn new(client: Arc<Client>) -> Self {
pub fn new(client: Client) -> Self {
Self { client }
}

Expand All @@ -81,13 +121,14 @@ impl HdfsObjectStore {
/// # Ok(())
/// # }
/// ```
#[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
pub fn with_url(url: &str) -> Result<Self> {
Ok(Self::new(Arc::new(
ClientBuilder::new()
.with_url(url)
.build()
.to_object_store_err()?,
)))
let client = ClientBuilder::new()
.with_url(url)
.build()
.to_object_store_err()?;

Ok(Self { client })
}

/// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.
Expand All @@ -106,14 +147,15 @@ impl HdfsObjectStore {
/// # Ok(())
/// # }
/// ```
#[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
Ok(Self::new(Arc::new(
ClientBuilder::new()
.with_url(url)
.with_config(config)
.build()
.to_object_store_err()?,
)))
let client = ClientBuilder::new()
.with_url(url)
.with_config(config)
.build()
.to_object_store_err()?;

Ok(Self { client })
}

async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
Expand Down Expand Up @@ -190,7 +232,7 @@ impl Display for HdfsObjectStore {

impl From<Client> for HdfsObjectStore {
fn from(value: Client) -> Self {
Self::new(Arc::new(value))
Self { client: value }
}
}

Expand Down Expand Up @@ -257,7 +299,7 @@ impl ObjectStore for HdfsObjectStore {
let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

Ok(Box::new(HdfsMultipartWriter::new(
Arc::clone(&self.client),
self.client.clone(),
tmp_file,
&tmp_file_path,
&final_file_path,
Expand Down Expand Up @@ -491,19 +533,14 @@ type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload
// A once cell is used to track whether a part has finished writing or not.
// On completing, rename the file to the actual target.
struct HdfsMultipartWriter {
client: Arc<Client>,
client: Client,
sender: Option<(JoinHandle<Result<()>>, PartSender)>,
tmp_filename: String,
final_filename: String,
}

impl HdfsMultipartWriter {
fn new(
client: Arc<Client>,
writer: FileWriter,
tmp_filename: &str,
final_filename: &str,
) -> Self {
fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
let (sender, receiver) = mpsc::unbounded_channel();

Self {
Expand Down Expand Up @@ -659,16 +696,22 @@ mod test {
use std::collections::HashSet;

use object_store::integration::*;
use serial_test::serial;
use tokio::runtime::Runtime;

use crate::HdfsObjectStore;
use crate::HdfsObjectStoreBuilder;

#[tokio::test]
#[serial]
async fn hdfs_test() {
let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
hdfs_native::minidfs::DfsFeatures::HA,
]));

let integration = HdfsObjectStore::with_url(&dfs.url).unwrap();
let integration = HdfsObjectStoreBuilder::new()
.with_url(&dfs.url)
.build()
.unwrap();

put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
Expand All @@ -680,4 +723,29 @@ mod test {
get_opts(&integration).await;
put_opts(&integration, false).await;
}

#[test]
#[serial]
fn test_no_tokio() {
let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
hdfs_native::minidfs::DfsFeatures::HA,
]));

let integration = HdfsObjectStoreBuilder::new()
.with_url(&dfs.url)
.build()
.unwrap();

futures::executor::block_on(get_opts(&integration));

let rt = Runtime::new().unwrap();

let integration = HdfsObjectStoreBuilder::new()
.with_url(&dfs.url)
.with_io_runtime(rt.handle().clone())
.build()
.unwrap();

futures::executor::block_on(get_opts(&integration));
}
}