From d36c136f860cf0f73d1347202a121db2bd5e0589 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Sun, 13 Jul 2025 23:28:25 -0700 Subject: [PATCH 1/2] feat: Add connect_with_config for configurable HDFS connections --- src/hdfs.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/src/hdfs.rs b/src/hdfs.rs index d5b6a67..c097289 100644 --- a/src/hdfs.rs +++ b/src/hdfs.rs @@ -37,6 +37,25 @@ const O_RDONLY: c_int = 0; const O_WRONLY: c_int = 1; const O_APPEND: c_int = 8; +/// Set a configuration key-value pair for the HDFS builder. +pub fn set_builder_conf(builder: *mut hdfsBuilder, key: &str, val: &str) -> Result<(), HdfsErr> { + let key_c = CString::new(key) + .map_err(|_| HdfsErr::Generic("Invalid key for CString".to_string()))?; + let val_c = CString::new(val) + .map_err(|_| HdfsErr::Generic("Invalid value for CString".to_string()))?; + + let result = unsafe { hdfsBuilderConfSetStr(builder, key_c.as_ptr(), val_c.as_ptr()) }; + + if result != 0 { + Err(HdfsErr::Generic(format!( + "Failed to set builder conf: key = {}, val = {}", + key, val + ))) + } else { + Ok(()) + } +} + lazy_static! { static ref HDFS_MANAGER: HdfsManager = HdfsManager::new(); } @@ -171,6 +190,39 @@ impl HdfsFs { self.raw } + pub fn connect_with_config( + nn: &str, + config: &HashMap, + ) -> Result, HdfsErr> { + unsafe { + let builder = hdfsNewBuilder(); + if builder.is_null() { + return Err(HdfsErr::Generic("Failed to create HDFS builder".to_string())); + } + + let nn_c = CString::new(nn) + .map_err(|e| HdfsErr::Generic(format!("Invalid namenode URI: {e}")))?; + hdfsBuilderSetNameNode(builder, nn_c.as_ptr()); + + for (key, value) in config { + set_builder_conf(builder, key, value)?; + } + + let fs = hdfsBuilderConnect(builder); + if fs.is_null() { + Err(HdfsErr::CannotConnectToNameNode(nn.to_string())) + } else { + let hdfs_fs = Arc::new(HdfsFs { + url: nn.to_string(), + raw: fs, + _marker: std::marker::PhantomData, + }); + + Ok(hdfs_fs) + } + } + } + /// Create HdfsFile from hdfsFile fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result { if file.is_null() { @@ -887,6 +939,8 @@ mod test { use uuid::Uuid; use crate::minidfs::get_dfs; + use crate::hdfs::HdfsFs; + use std::collections::HashMap; #[cfg(feature = "use_existing_hdfs")] #[test] @@ -1083,4 +1137,18 @@ mod test { // Clean up assert!(fs.delete(test_file, false).is_ok()); } + + #[test] + fn test_connect_with_config() { + let mut config = HashMap::new(); + config.insert("dfs.client.read.shortcircuit".to_string(), "true".to_string()); + config.insert("fs.s3a.access.key".to_string(), "xxx".to_string()); + config.insert("fs.s3a.secret.key".to_string(), "yyy".to_string()); + + let result = HdfsFs::connect_with_config("default", &config); + match result { + Ok(ref fs) => println!("Successfully connected to HDFS with config: {:?}", fs.url), + Err(e) => panic!("Failed to connect: {:?}", e), + } + } } From 566cd6012337c0c0717809b2822721645abfb058 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Wed, 16 Jul 2025 12:13:34 -0700 Subject: [PATCH 2/2] address comments --- src/hdfs.rs | 85 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/src/hdfs.rs b/src/hdfs.rs index c097289..a7378b6 100644 --- a/src/hdfs.rs +++ b/src/hdfs.rs @@ -66,6 +66,13 @@ pub fn get_hdfs_by_full_path(path: &str) -> Result, HdfsErr> { HDFS_MANAGER.get_hdfs_by_full_path(path) } +pub fn get_hdfs_with_config( + path: &str, + config: Option<&HashMap>, +) -> Result, HdfsErr> { + HDFS_MANAGER.get_hdfs_by_full_path_with_config(path, config) +} + /// The default NameNode configuration will be used (from the XML configuration files) pub fn get_hdfs() -> Result, HdfsErr> { HDFS_MANAGER.get_hdfs_by_full_path("default") @@ -96,10 +103,41 @@ impl HdfsManager { } } - fn get_hdfs_by_full_path(&self, path: &str) -> Result, HdfsErr> { + fn get_hdfs_by_full_path( + &self, + path: &str, + ) -> Result, HdfsErr> { + self.get_hdfs_by_full_path_with_config(path, None) + } + + pub fn get_hdfs_by_full_path_with_config( + &self, + path: &str, + config: Option<&HashMap>, + ) -> Result, HdfsErr> { let namenode_uri = match path { "default" => "default".to_owned(), - _ => get_namenode_uri(path)?, + _ => { + let parsed = Url::parse(path) + .map_err(|e| HdfsErr::InvalidUrl(format!("{path}: {e}")))?; + match parsed.scheme() { + "s3a" | "s3" => { + let bucket = parsed.host_str().unwrap_or_default(); + if bucket.is_empty() { + return Err(HdfsErr::InvalidUrl(format!( + "S3 path missing bucket: {path}" + ))); + } + format!("{}://{}", parsed.scheme(), bucket) + } + "hdfs" | "viewfs" | "file" => get_namenode_uri(path)?, + scheme => { + return Err(HdfsErr::InvalidUrl(format!( + "Unsupported scheme for get_hdfs_by_full_path: {scheme}" + ))) + } + } + } }; // Get if already exists @@ -119,6 +157,11 @@ impl HdfsManager { let hdfs_builder = hdfsNewBuilder(); let cstr_uri = CString::new(namenode_uri.as_bytes()).unwrap(); hdfsBuilderSetNameNode(hdfs_builder, cstr_uri.as_ptr()); + if let Some(cfg) = config { + for (k, v) in cfg { + set_builder_conf(hdfs_builder, k, v)?; + } + } info!("Connecting to Namenode ({})", &namenode_uri); hdfsBuilderConnect(hdfs_builder) }; @@ -190,39 +233,6 @@ impl HdfsFs { self.raw } - pub fn connect_with_config( - nn: &str, - config: &HashMap, - ) -> Result, HdfsErr> { - unsafe { - let builder = hdfsNewBuilder(); - if builder.is_null() { - return Err(HdfsErr::Generic("Failed to create HDFS builder".to_string())); - } - - let nn_c = CString::new(nn) - .map_err(|e| HdfsErr::Generic(format!("Invalid namenode URI: {e}")))?; - hdfsBuilderSetNameNode(builder, nn_c.as_ptr()); - - for (key, value) in config { - set_builder_conf(builder, key, value)?; - } - - let fs = hdfsBuilderConnect(builder); - if fs.is_null() { - Err(HdfsErr::CannotConnectToNameNode(nn.to_string())) - } else { - let hdfs_fs = Arc::new(HdfsFs { - url: nn.to_string(), - raw: fs, - _marker: std::marker::PhantomData, - }); - - Ok(hdfs_fs) - } - } - } - /// Create HdfsFile from hdfsFile fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result { if file.is_null() { @@ -939,7 +949,7 @@ mod test { use uuid::Uuid; use crate::minidfs::get_dfs; - use crate::hdfs::HdfsFs; + use crate::hdfs::HDFS_MANAGER; use std::collections::HashMap; #[cfg(feature = "use_existing_hdfs")] @@ -1145,7 +1155,8 @@ mod test { config.insert("fs.s3a.access.key".to_string(), "xxx".to_string()); config.insert("fs.s3a.secret.key".to_string(), "yyy".to_string()); - let result = HdfsFs::connect_with_config("default", &config); + let result = HDFS_MANAGER.get_hdfs_by_full_path_with_config("default", Some(&config)); + match result { Ok(ref fs) => println!("Successfully connected to HDFS with config: {:?}", fs.url), Err(e) => panic!("Failed to connect: {:?}", e),