Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions src/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ const O_RDONLY: c_int = 0;
const O_WRONLY: c_int = 1;
const O_APPEND: c_int = 8;

/// Set a configuration key-value pair for the HDFS builder.
pub fn set_builder_conf(builder: *mut hdfsBuilder, key: &str, val: &str) -> Result<(), HdfsErr> {
let key_c = CString::new(key)
.map_err(|_| HdfsErr::Generic("Invalid key for CString".to_string()))?;
let val_c = CString::new(val)
.map_err(|_| HdfsErr::Generic("Invalid value for CString".to_string()))?;

let result = unsafe { hdfsBuilderConfSetStr(builder, key_c.as_ptr(), val_c.as_ptr()) };

if result != 0 {
Err(HdfsErr::Generic(format!(
"Failed to set builder conf: key = {}, val = {}",
key, val
)))
} else {
Ok(())
}
}

lazy_static! {
static ref HDFS_MANAGER: HdfsManager = HdfsManager::new();
}
Expand All @@ -47,6 +66,13 @@ pub fn get_hdfs_by_full_path(path: &str) -> Result<Arc<HdfsFs>, HdfsErr> {
HDFS_MANAGER.get_hdfs_by_full_path(path)
}

pub fn get_hdfs_with_config(
path: &str,
config: Option<&HashMap<String, String>>,
) -> Result<Arc<HdfsFs>, HdfsErr> {
HDFS_MANAGER.get_hdfs_by_full_path_with_config(path, config)
}

/// The default NameNode configuration will be used (from the XML configuration files)
pub fn get_hdfs() -> Result<Arc<HdfsFs>, HdfsErr> {
HDFS_MANAGER.get_hdfs_by_full_path("default")
Expand Down Expand Up @@ -77,10 +103,41 @@ impl HdfsManager {
}
}

fn get_hdfs_by_full_path(&self, path: &str) -> Result<Arc<HdfsFs>, HdfsErr> {
fn get_hdfs_by_full_path(
&self,
path: &str,
) -> Result<Arc<HdfsFs>, HdfsErr> {
self.get_hdfs_by_full_path_with_config(path, None)
}

pub fn get_hdfs_by_full_path_with_config(
&self,
path: &str,
config: Option<&HashMap<String, String>>,
) -> Result<Arc<HdfsFs>, HdfsErr> {
let namenode_uri = match path {
"default" => "default".to_owned(),
_ => get_namenode_uri(path)?,
_ => {
let parsed = Url::parse(path)
.map_err(|e| HdfsErr::InvalidUrl(format!("{path}: {e}")))?;
match parsed.scheme() {
"s3a" | "s3" => {
let bucket = parsed.host_str().unwrap_or_default();
if bucket.is_empty() {
return Err(HdfsErr::InvalidUrl(format!(
"S3 path missing bucket: {path}"
)));
}
format!("{}://{}", parsed.scheme(), bucket)
}
"hdfs" | "viewfs" | "file" => get_namenode_uri(path)?,
scheme => {
return Err(HdfsErr::InvalidUrl(format!(
"Unsupported scheme for get_hdfs_by_full_path: {scheme}"
)))
}
}
}
};

// Get if already exists
Expand All @@ -100,6 +157,11 @@ impl HdfsManager {
let hdfs_builder = hdfsNewBuilder();
let cstr_uri = CString::new(namenode_uri.as_bytes()).unwrap();
hdfsBuilderSetNameNode(hdfs_builder, cstr_uri.as_ptr());
if let Some(cfg) = config {
for (k, v) in cfg {
set_builder_conf(hdfs_builder, k, v)?;
}
}
info!("Connecting to Namenode ({})", &namenode_uri);
hdfsBuilderConnect(hdfs_builder)
};
Expand Down Expand Up @@ -887,6 +949,8 @@ mod test {
use uuid::Uuid;

use crate::minidfs::get_dfs;
use crate::hdfs::HDFS_MANAGER;
use std::collections::HashMap;

#[cfg(feature = "use_existing_hdfs")]
#[test]
Expand Down Expand Up @@ -1083,4 +1147,19 @@ mod test {
// Clean up
assert!(fs.delete(test_file, false).is_ok());
}

#[test]
fn test_connect_with_config() {
let mut config = HashMap::new();
config.insert("dfs.client.read.shortcircuit".to_string(), "true".to_string());
config.insert("fs.s3a.access.key".to_string(), "xxx".to_string());
config.insert("fs.s3a.secret.key".to_string(), "yyy".to_string());

let result = HDFS_MANAGER.get_hdfs_by_full_path_with_config("default", Some(&config));

match result {
Ok(ref fs) => println!("Successfully connected to HDFS with config: {:?}", fs.url),
Err(e) => panic!("Failed to connect: {:?}", e),
}
}
}