Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion c_src/libhdfs/exception.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static const struct ExceptionInfo gExceptionInfo[] = {
},
{
"java.lang.UnsupportedOperationException",
0,
NOPRINT_UNSUPPORTED_OPERATION,
ENOTSUP,
},
{
Expand Down
1 change: 1 addition & 0 deletions c_src/libhdfs/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#define NOPRINT_EXC_UNRESOLVED_LINK 0x04
#define NOPRINT_EXC_PARENT_NOT_DIRECTORY 0x08
#define NOPRINT_EXC_ILLEGAL_ARGUMENT 0x10
#define NOPRINT_UNSUPPORTED_OPERATION 0x20

#ifdef WIN32
#ifdef LIBHDFS_DLL_EXPORT
Expand Down
10 changes: 5 additions & 5 deletions c_src/libhdfs/hdfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)

tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length, int noPrintFlag);
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);

/**
Expand Down Expand Up @@ -1174,7 +1174,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
if ((flags & O_WRONLY) == 0) {
// Try a test read to see if we can do direct reads
char buf;
if (readDirect(fs, file, &buf, 0) == 0) {
if (readDirect(fs, file, &buf, 0, NOPRINT_UNSUPPORTED_OPERATION) == 0) {
// Success - 0-byte read should return 0
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
} else if (errno != ENOTSUP) {
Expand Down Expand Up @@ -1409,7 +1409,7 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
return -1;
}
if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
return readDirect(fs, f, buffer, length);
return readDirect(fs, f, buffer, length, PRINT_EXC_ALL);
}

// JAVA EQUIVALENT:
Expand Down Expand Up @@ -1464,7 +1464,7 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
}

// Reads using the read(ByteBuffer) API, which does fewer copies
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length, int noPrintFlags)
{
// JAVA EQUIVALENT:
// ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
Expand Down Expand Up @@ -1498,7 +1498,7 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb);
destroyLocalReference(env, bb);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
errno = printExceptionAndFree(env, jthr, noPrintFlags,
"readDirect: FSDataInputStream#read");
return -1;
}
Expand Down
38 changes: 29 additions & 9 deletions src/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

//! it's a modified version of hdfs-rs
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ffi::{CStr, CString};
use std::fmt::Write;
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -52,6 +52,14 @@ pub fn get_hdfs() -> Result<Arc<HdfsFs>, HdfsErr> {
HDFS_MANAGER.get_hdfs_by_full_path("default")
}

/// Register the name of a new scheme that this implementation should allow. No validation is done.
pub fn register_scheme(scheme: & 'static str) {
HDFS_MANAGER.register_scheme(scheme)
}
pub fn get_registered_schemes() -> HashSet<& 'static str> {
HDFS_MANAGER.hdfs_schemes.read().unwrap().clone()
}

/// Remove an instance of HdfsFs from the cache by a specified path with uri
pub fn unload_hdfs_cache_by_full_path(
path: &str,
Expand All @@ -64,16 +72,23 @@ pub fn unload_hdfs_cache(hdfs: Arc<HdfsFs>) -> Result<Option<Arc<HdfsFs>>, HdfsE
HDFS_MANAGER.remove_hdfs(hdfs)
}


pub const LOCAL_FS_SCHEME: &str = "file";
pub const HDFS_FS_SCHEME: &str = "hdfs";
pub const VIEW_FS_SCHEME: &str = "viewfs";
pub const initial_schemes: [&str; 3] = [LOCAL_FS_SCHEME, HDFS_FS_SCHEME, VIEW_FS_SCHEME];
/// Hdfs manager
/// All of the HdfsFs instances will be managed in a singleton HdfsManager
struct HdfsManager {
hdfs_cache: Arc<RwLock<HashMap<String, Arc<HdfsFs>>>>,
hdfs_schemes: Arc<RwLock<HashSet<& 'static str>>>,
}

impl HdfsManager {
fn new() -> Self {
Self {
hdfs_cache: Arc::new(RwLock::new(HashMap::new())),
hdfs_schemes: Arc::new(RwLock::new(initial_schemes.into_iter().collect()))
}
}

Expand Down Expand Up @@ -140,6 +155,12 @@ impl HdfsManager {
let mut cache = self.hdfs_cache.write().unwrap();
Ok(cache.remove(hdfs_key))
}
fn register_scheme(&self, scheme: & 'static str) {
let mut hdfs_schemes = self.hdfs_schemes.write().unwrap();
if !hdfs_schemes.contains(scheme) {
hdfs_schemes.insert(scheme);
}
}
}

/// Hdfs Filesystem
Expand Down Expand Up @@ -838,20 +859,16 @@ impl Drop for BlockHosts {
}
}

pub const LOCAL_FS_SCHEME: &str = "file";
pub const HDFS_FS_SCHEME: &str = "hdfs";
pub const VIEW_FS_SCHEME: &str = "viewfs";

#[inline]
fn get_namenode_uri(path: &str) -> Result<String, HdfsErr> {
let schemes = HDFS_MANAGER.hdfs_schemes.read().unwrap();
match Url::parse(path) {
Ok(url) => match url.scheme() {
LOCAL_FS_SCHEME => Ok("file:///".to_string()),
HDFS_FS_SCHEME | VIEW_FS_SCHEME => {
scheme if schemes.contains(&scheme) => {
if let Some(host) = url.host() {
let mut uri_builder = String::new();
write!(&mut uri_builder, "{}://{}", url.scheme(), host).unwrap();

if let Some(port) = url.port() {
write!(&mut uri_builder, ":{}", port).unwrap();
}
Expand All @@ -860,22 +877,25 @@ fn get_namenode_uri(path: &str) -> Result<String, HdfsErr> {
Err(HdfsErr::InvalidUrl(path.to_string()))
}
}
_ => Err(HdfsErr::InvalidUrl(path.to_string())),
_ => {
Err(HdfsErr::InvalidUrl(path.to_string()))
}
},
Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())),
}
}

#[inline]
pub fn get_uri(path: &str) -> Result<String, HdfsErr> {
let schemes = HDFS_MANAGER.hdfs_schemes.read().unwrap();
let path = if path.starts_with('/') {
format!("{}://{}", LOCAL_FS_SCHEME, path)
} else {
path.to_string()
};
match Url::parse(&path) {
Ok(url) => match url.scheme() {
LOCAL_FS_SCHEME | HDFS_FS_SCHEME | VIEW_FS_SCHEME => Ok(url.to_string()),
scheme if schemes.contains(&scheme) => Ok(url.to_string()),
_ => Err(HdfsErr::InvalidUrl(path.to_string())),
},
Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())),
Expand Down