diff --git a/c_src/libhdfs/exception.c b/c_src/libhdfs/exception.c index a0a60a6..5efe3fa 100644 --- a/c_src/libhdfs/exception.c +++ b/c_src/libhdfs/exception.c @@ -81,7 +81,7 @@ static const struct ExceptionInfo gExceptionInfo[] = { }, { "java.lang.UnsupportedOperationException", - 0, + NOPRINT_UNSUPPORTED_OPERATION, ENOTSUP, }, { diff --git a/c_src/libhdfs/exception.h b/c_src/libhdfs/exception.h index da4ec61..c64e54b 100644 --- a/c_src/libhdfs/exception.h +++ b/c_src/libhdfs/exception.h @@ -69,6 +69,7 @@ #define NOPRINT_EXC_UNRESOLVED_LINK 0x04 #define NOPRINT_EXC_PARENT_NOT_DIRECTORY 0x08 #define NOPRINT_EXC_ILLEGAL_ARGUMENT 0x10 +#define NOPRINT_UNSUPPORTED_OPERATION 0x20 #ifdef WIN32 #ifdef LIBHDFS_DLL_EXPORT diff --git a/c_src/libhdfs/hdfs.c b/c_src/libhdfs/hdfs.c index 34ae654..e765a37 100644 --- a/c_src/libhdfs/hdfs.c +++ b/c_src/libhdfs/hdfs.c @@ -58,7 +58,7 @@ // Bit fields for hdfsFile_internal flags #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) -tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); +tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length, int noPrintFlag); static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo); /** @@ -1174,7 +1174,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, if ((flags & O_WRONLY) == 0) { // Try a test read to see if we can do direct reads char buf; - if (readDirect(fs, file, &buf, 0) == 0) { + if (readDirect(fs, file, &buf, 0, NOPRINT_UNSUPPORTED_OPERATION) == 0) { // Success - 0-byte read should return 0 file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; } else if (errno != ENOTSUP) { @@ -1409,7 +1409,7 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) return -1; } if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { - return readDirect(fs, f, buffer, length); + return readDirect(fs, f, buffer, length, PRINT_EXC_ALL); } // JAVA EQUIVALENT: @@ -1464,7 +1464,7 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) } // Reads using the read(ByteBuffer) API, which does fewer copies -tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) +tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length, int noPrintFlags) { // JAVA EQUIVALENT: // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer @@ -1498,7 +1498,7 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb); destroyLocalReference(env, bb); if (jthr) { - errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + errno = printExceptionAndFree(env, jthr, noPrintFlags, "readDirect: FSDataInputStream#read"); return -1; } diff --git a/src/hdfs.rs b/src/hdfs.rs index d5b6a67..6dfc97a 100644 --- a/src/hdfs.rs +++ b/src/hdfs.rs @@ -16,7 +16,7 @@ // under the License. //! it's a modified version of hdfs-rs -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ffi::{CStr, CString}; use std::fmt::Write; use std::fmt::{Debug, Formatter}; @@ -52,6 +52,14 @@ pub fn get_hdfs() -> Result, HdfsErr> { HDFS_MANAGER.get_hdfs_by_full_path("default") } +/// Register the name of a new scheme that this implementation should allow. No validation is done. +pub fn register_scheme(scheme: & 'static str) { + HDFS_MANAGER.register_scheme(scheme) +} +pub fn get_registered_schemes() -> HashSet<& 'static str> { + HDFS_MANAGER.hdfs_schemes.read().unwrap().clone() +} + /// Remove an instance of HdfsFs from the cache by a specified path with uri pub fn unload_hdfs_cache_by_full_path( path: &str, @@ -64,16 +72,23 @@ pub fn unload_hdfs_cache(hdfs: Arc) -> Result>, HdfsE HDFS_MANAGER.remove_hdfs(hdfs) } + +pub const LOCAL_FS_SCHEME: &str = "file"; +pub const HDFS_FS_SCHEME: &str = "hdfs"; +pub const VIEW_FS_SCHEME: &str = "viewfs"; +pub const initial_schemes: [&str; 3] = [LOCAL_FS_SCHEME, HDFS_FS_SCHEME, VIEW_FS_SCHEME]; /// Hdfs manager /// All of the HdfsFs instances will be managed in a singleton HdfsManager struct HdfsManager { hdfs_cache: Arc>>>, + hdfs_schemes: Arc>>, } impl HdfsManager { fn new() -> Self { Self { hdfs_cache: Arc::new(RwLock::new(HashMap::new())), + hdfs_schemes: Arc::new(RwLock::new(initial_schemes.into_iter().collect())) } } @@ -140,6 +155,12 @@ impl HdfsManager { let mut cache = self.hdfs_cache.write().unwrap(); Ok(cache.remove(hdfs_key)) } + fn register_scheme(&self, scheme: & 'static str) { + let mut hdfs_schemes = self.hdfs_schemes.write().unwrap(); + if !hdfs_schemes.contains(scheme) { + hdfs_schemes.insert(scheme); + } + } } /// Hdfs Filesystem @@ -838,20 +859,16 @@ impl Drop for BlockHosts { } } -pub const LOCAL_FS_SCHEME: &str = "file"; -pub const HDFS_FS_SCHEME: &str = "hdfs"; -pub const VIEW_FS_SCHEME: &str = "viewfs"; - #[inline] fn get_namenode_uri(path: &str) -> Result { + let schemes = HDFS_MANAGER.hdfs_schemes.read().unwrap(); match Url::parse(path) { Ok(url) => match url.scheme() { LOCAL_FS_SCHEME => Ok("file:///".to_string()), - HDFS_FS_SCHEME | VIEW_FS_SCHEME => { + scheme if schemes.contains(&scheme) => { if let Some(host) = url.host() { let mut uri_builder = String::new(); write!(&mut uri_builder, "{}://{}", url.scheme(), host).unwrap(); - if let Some(port) = url.port() { write!(&mut uri_builder, ":{}", port).unwrap(); } @@ -860,7 +877,9 @@ fn get_namenode_uri(path: &str) -> Result { Err(HdfsErr::InvalidUrl(path.to_string())) } } - _ => Err(HdfsErr::InvalidUrl(path.to_string())), + _ => { + Err(HdfsErr::InvalidUrl(path.to_string())) + } }, Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())), } @@ -868,6 +887,7 @@ fn get_namenode_uri(path: &str) -> Result { #[inline] pub fn get_uri(path: &str) -> Result { + let schemes = HDFS_MANAGER.hdfs_schemes.read().unwrap(); let path = if path.starts_with('/') { format!("{}://{}", LOCAL_FS_SCHEME, path) } else { @@ -875,7 +895,7 @@ pub fn get_uri(path: &str) -> Result { }; match Url::parse(&path) { Ok(url) => match url.scheme() { - LOCAL_FS_SCHEME | HDFS_FS_SCHEME | VIEW_FS_SCHEME => Ok(url.to_string()), + scheme if schemes.contains(&scheme) => Ok(url.to_string()), _ => Err(HdfsErr::InvalidUrl(path.to_string())), }, Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())),