Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ const O_RDONLY: c_int = 0;
// Write-only open flag.
const O_WRONLY: c_int = 1;
// Append open flag.
// NOTE(review): presumably these mirror the platform's <fcntl.h> values that libhdfs
// tests against; 8 is the BSD/macOS value of O_APPEND, while Linux uses 1024 — confirm
// this constant is correct for every supported target.
const O_APPEND: c_int = 8;

/// Set a configuration key-value pair for the HDFS builder.
///
/// `key` and `val` are converted to NUL-terminated C strings and handed to
/// `hdfsBuilderConfSetStr`.
///
/// libhdfs only shallow-copies the pointers it is given and reads them later, when the
/// builder is connected. The C strings therefore must outlive this call, so their
/// ownership is intentionally released to the C side (`CString::into_raw`) and they are
/// never reclaimed here. This costs a few bytes per call but avoids leaving dangling
/// pointers inside the builder.
///
/// # Errors
///
/// Returns `HdfsErr::Generic` when `builder` is null, when `key` or `val` contains an
/// interior NUL byte, or when `hdfsBuilderConfSetStr` reports a non-zero status.
pub fn set_builder_conf(builder: *mut hdfsBuilder, key: &str, val: &str) -> Result<(), HdfsErr> {
    // This is a safe function taking a raw pointer: reject the one invalid value we can detect.
    if builder.is_null() {
        return Err(HdfsErr::Generic(
            "Failed to set builder conf: builder is null".to_string(),
        ));
    }

    let key_c = CString::new(key)
        .map_err(|_| HdfsErr::Generic("Invalid key for CString".to_string()))?;
    let val_c = CString::new(val)
        .map_err(|_| HdfsErr::Generic("Invalid value for CString".to_string()))?;

    // Give up ownership so the buffers stay valid for as long as the builder may read them.
    let key_ptr = key_c.into_raw();
    let val_ptr = val_c.into_raw();

    // SAFETY: `builder` is non-null (checked above; the caller guarantees it came from
    // `hdfsNewBuilder` and has not been consumed), and both pointers reference valid,
    // NUL-terminated buffers that are never freed.
    let result = unsafe { hdfsBuilderConfSetStr(builder, key_ptr, val_ptr) };

    if result != 0 {
        // The strings stay leaked on this path too: we cannot prove the C side did not
        // retain the pointers before failing.
        return Err(HdfsErr::Generic(format!(
            "Failed to set builder conf: key = {}, val = {}",
            key, val
        )));
    }

    Ok(())
}

// Shared `HdfsManager` instance, constructed with `HdfsManager::new()` the first time
// `HDFS_MANAGER` is dereferenced (lazy_static defers initialisation to first access).
lazy_static! {
static ref HDFS_MANAGER: HdfsManager = HdfsManager::new();
}
Expand Down Expand Up @@ -171,6 +190,39 @@ impl HdfsFs {
self.raw
}

pub fn connect_with_config(
nn: &str,
config: &HashMap<String, String>,
) -> Result<Arc<Self>, HdfsErr> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main entry point for obtaining an Arc<HdfsFs> is get_hdfs_by_full_path. It caches already-connected file system handles and reuses them when another HdfsFs is requested for a URI whose host is already cached. HdfsFs handles are never closed, since the Rust binding does not call hdfsDisconnect at all.

The caller of connect_with_config therefore has to handle caching and reuse of already-connected HdfsFs instances itself; otherwise we will leave many connected Hadoop FileSystem objects alive in the JVM.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your comment! I’ve updated the implementation to use get_hdfs_by_full_path_with_config, so it now follows the same caching logic as get_hdfs_by_full_path and reuses existing HdfsFs handles. Let me know if there’s anything else I should adjust.

// Convert every string before the builder exists. Two reasons:
//  * an invalid string can no longer cause an early return that leaks a live builder;
//  * libhdfs only shallow-copies the pointers handed to the builder and reads them in
//    `hdfsBuilderConnect`, so the owned `CString`s below must stay alive until that
//    call has returned. They are dropped at the end of this function.
let nn_c = CString::new(nn)
    .map_err(|e| HdfsErr::Generic(format!("Invalid namenode URI: {e}")))?;

let mut conf_c = Vec::with_capacity(config.len());
for (key, value) in config {
    let key_c = CString::new(key.as_str())
        .map_err(|_| HdfsErr::Generic("Invalid key for CString".to_string()))?;
    let val_c = CString::new(value.as_str())
        .map_err(|_| HdfsErr::Generic("Invalid value for CString".to_string()))?;
    conf_c.push((key, value, key_c, val_c));
}

// SAFETY: `builder` is checked for null before use, and every pointer passed to libhdfs
// refers to a NUL-terminated buffer owned by `nn_c` / `conf_c`, both of which outlive
// the `hdfsBuilderConnect` call.
unsafe {
    let builder = hdfsNewBuilder();
    if builder.is_null() {
        return Err(HdfsErr::Generic("Failed to create HDFS builder".to_string()));
    }

    hdfsBuilderSetNameNode(builder, nn_c.as_ptr());

    for (key, value, key_c, val_c) in &conf_c {
        if hdfsBuilderConfSetStr(builder, key_c.as_ptr(), val_c.as_ptr()) != 0 {
            // NOTE(review): the builder is not released on this path. If the native
            // binding exposes `hdfsFreeBuilder`, call it here before returning.
            return Err(HdfsErr::Generic(format!(
                "Failed to set builder conf: key = {}, val = {}",
                key, value
            )));
        }
    }

    // `hdfsBuilderConnect` consumes the builder, so no explicit cleanup follows.
    let fs = hdfsBuilderConnect(builder);
    if fs.is_null() {
        return Err(HdfsErr::CannotConnectToNameNode(nn.to_string()));
    }

    Ok(Arc::new(HdfsFs {
        url: nn.to_string(),
        raw: fs,
        _marker: std::marker::PhantomData,
    }))
}
}

/// Create HdfsFile from hdfsFile
fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result<HdfsFile, HdfsErr> {
if file.is_null() {
Expand Down Expand Up @@ -887,6 +939,8 @@ mod test {
use uuid::Uuid;

use crate::minidfs::get_dfs;
use crate::hdfs::HdfsFs;
use std::collections::HashMap;

#[cfg(feature = "use_existing_hdfs")]
#[test]
Expand Down Expand Up @@ -1083,4 +1137,18 @@ mod test {
// Clean up
assert!(fs.delete(test_file, false).is_ok());
}

#[test]
fn test_connect_with_config() {
    // Connection options forwarded to the HDFS builder as key/value pairs.
    let config: HashMap<String, String> = HashMap::from([
        ("dfs.client.read.shortcircuit".to_string(), "true".to_string()),
        ("fs.s3a.access.key".to_string(), "xxx".to_string()),
        ("fs.s3a.secret.key".to_string(), "yyy".to_string()),
    ]);

    // Any connection error fails the test with the error's debug representation.
    let fs = HdfsFs::connect_with_config("default", &config)
        .unwrap_or_else(|e| panic!("Failed to connect: {:?}", e));

    println!("Successfully connected to HDFS with config: {:?}", fs.url);
}
}