diff --git a/src/hdfs.rs b/src/hdfs.rs
index d5b6a67..a7378b6 100644
--- a/src/hdfs.rs
+++ b/src/hdfs.rs
@@ -37,6 +37,45 @@ const O_RDONLY: c_int = 0;
 const O_WRONLY: c_int = 1;
 const O_APPEND: c_int = 8;
 
+/// Set a configuration key-value pair for the HDFS builder.
+///
+/// `builder` must point to a live builder obtained from `hdfsNewBuilder`. A null pointer
+/// is rejected with an error; any other invalid pointer is undefined behaviour.
+///
+/// libhdfs keeps the `key` and `val` pointers instead of copying the strings and requires
+/// them to remain valid until the builder is freed. The C strings are therefore
+/// intentionally leaked rather than dropped (and freed) when this function returns.
+///
+/// # Errors
+///
+/// Returns `HdfsErr::Generic` if `builder` is null, if `key` or `val` contains an interior
+/// NUL byte, or if libhdfs reports a failure. The value is never echoed in the error
+/// because options such as `fs.s3a.secret.key` carry credentials.
+pub fn set_builder_conf(builder: *mut hdfsBuilder, key: &str, val: &str) -> Result<(), HdfsErr> {
+    if builder.is_null() {
+        return Err(HdfsErr::Generic("HDFS builder is null".to_string()));
+    }
+    let key_c = CString::new(key)
+        .map_err(|_| HdfsErr::Generic("Invalid key for CString".to_string()))?;
+    let val_c = CString::new(val)
+        .map_err(|_| HdfsErr::Generic("Invalid value for CString".to_string()))?;
+
+    // Release ownership so the buffers outlive this call; libhdfs still points at them.
+    let key_ptr = key_c.into_raw();
+    let val_ptr = val_c.into_raw();
+
+    // SAFETY: `builder` is non-null (its validity is the caller's responsibility, see the
+    // doc comment) and both pointers are valid NUL-terminated C strings that are never freed.
+    let result = unsafe { hdfsBuilderConfSetStr(builder, key_ptr, val_ptr) };
+
+    if result != 0 {
+        Err(HdfsErr::Generic(format!(
+            "Failed to set builder conf: key = {key}"
+        )))
+    } else {
+        Ok(())
+    }
+}
+
 lazy_static! {
     static ref HDFS_MANAGER: HdfsManager = HdfsManager::new();
 }
@@ -47,6 +86,18 @@ pub fn get_hdfs_by_full_path(path: &str) -> Result<Arc<HdfsFs>, HdfsErr> {
     HDFS_MANAGER.get_hdfs_by_full_path(path)
 }
 
+/// Get the filesystem for `path`, applying each `config` entry to the HDFS builder
+/// before connecting. `None` behaves like `get_hdfs_by_full_path`.
+///
+/// NOTE(review): connections look to be cached per namenode URI, in which case `config`
+/// only takes effect for the first connection to a given URI — confirm.
+pub fn get_hdfs_with_config(
+    path: &str,
+    config: Option<&HashMap<String, String>>,
+) -> Result<Arc<HdfsFs>, HdfsErr> {
+    HDFS_MANAGER.get_hdfs_by_full_path_with_config(path, config)
+}
+
 /// The default NameNode configuration will be used (from the XML configuration files)
 pub fn get_hdfs() -> Result<Arc<HdfsFs>, HdfsErr> {
     HDFS_MANAGER.get_hdfs_by_full_path("default")
@@ -77,10 +128,47 @@ impl HdfsManager {
         }
     }
 
-    fn get_hdfs_by_full_path(&self, path: &str) -> Result<Arc<HdfsFs>, HdfsErr> {
+    fn get_hdfs_by_full_path(
+        &self,
+        path: &str,
+    ) -> Result<Arc<HdfsFs>, HdfsErr> {
+        self.get_hdfs_by_full_path_with_config(path, None)
+    }
+
+    /// Resolve `path` to a filesystem URI and connect to it.
+    ///
+    /// `"default"` is passed through unchanged; `s3`/`s3a` URLs are keyed by
+    /// `scheme://bucket`; `hdfs`, `viewfs` and `file` URLs go through `get_namenode_uri`.
+    /// Any other scheme, or a string that is not a URL, yields `HdfsErr::InvalidUrl`.
+    /// Each `config` entry is set on the builder before a new connection is made.
+    pub fn get_hdfs_by_full_path_with_config(
+        &self,
+        path: &str,
+        config: Option<&HashMap<String, String>>,
+    ) -> Result<Arc<HdfsFs>, HdfsErr> {
         let namenode_uri = match path {
             "default" => "default".to_owned(),
-            _ => get_namenode_uri(path)?,
+            _ => {
+                let parsed = Url::parse(path)
+                    .map_err(|e| HdfsErr::InvalidUrl(format!("{path}: {e}")))?;
+                match parsed.scheme() {
+                    "s3a" | "s3" => {
+                        let bucket = parsed.host_str().unwrap_or_default();
+                        if bucket.is_empty() {
+                            return Err(HdfsErr::InvalidUrl(format!(
+                                "S3 path missing bucket: {path}"
+                            )));
+                        }
+                        format!("{}://{}", parsed.scheme(), bucket)
+                    }
+                    "hdfs" | "viewfs" | "file" => get_namenode_uri(path)?,
+                    scheme => {
+                        return Err(HdfsErr::InvalidUrl(format!(
+                            "Unsupported scheme for get_hdfs_by_full_path: {scheme}"
+                        )))
+                    }
+                }
+            }
         };
 
         // Get if already exists
@@ -100,6 +188,16 @@ impl HdfsManager {
                 let hdfs_builder = hdfsNewBuilder();
                 let cstr_uri = CString::new(namenode_uri.as_bytes()).unwrap();
                 hdfsBuilderSetNameNode(hdfs_builder, cstr_uri.as_ptr());
+                if let Some(cfg) = config {
+                    for (k, v) in cfg {
+                        if let Err(e) = set_builder_conf(hdfs_builder, k, v) {
+                            // Only `hdfsBuilderConnect` consumes the builder, so release
+                            // it here before bailing out.
+                            hdfsFreeBuilder(hdfs_builder);
+                            return Err(e);
+                        }
+                    }
+                }
                 info!("Connecting to Namenode ({})", &namenode_uri);
                 hdfsBuilderConnect(hdfs_builder)
             };
@@ -887,6 +985,8 @@ mod test {
     use uuid::Uuid;
 
     use crate::minidfs::get_dfs;
+    use crate::hdfs::HDFS_MANAGER;
+    use std::collections::HashMap;
 
     #[cfg(feature = "use_existing_hdfs")]
     #[test]
@@ -1083,4 +1183,19 @@ mod test {
         // Clean up
         assert!(fs.delete(test_file, false).is_ok());
     }
+
+    #[test]
+    fn test_connect_with_config() {
+        let mut config = HashMap::new();
+        config.insert("dfs.client.read.shortcircuit".to_string(), "true".to_string());
+        config.insert("fs.s3a.access.key".to_string(), "xxx".to_string());
+        config.insert("fs.s3a.secret.key".to_string(), "yyy".to_string());
+
+        let result = HDFS_MANAGER.get_hdfs_by_full_path_with_config("default", Some(&config));
+
+        match result {
+            Ok(ref fs) => println!("Successfully connected to HDFS with config: {:?}", fs.url),
+            Err(e) => panic!("Failed to connect: {:?}", e),
+        }
+    }
 }