Skip to content

Commit f457c20

Browse files
committed
Support additional file systems
1 parent 8c03c5e commit f457c20

4 files changed

Lines changed: 36 additions & 15 deletions

File tree

c_src/libhdfs/exception.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ static const struct ExceptionInfo gExceptionInfo[] = {
8181
},
8282
{
8383
"java.lang.UnsupportedOperationException",
84-
0,
84+
NOPRINT_UNSUPPORTED_OPERATION,
8585
ENOTSUP,
8686
},
8787
{

c_src/libhdfs/exception.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
#define NOPRINT_EXC_UNRESOLVED_LINK 0x04
7070
#define NOPRINT_EXC_PARENT_NOT_DIRECTORY 0x08
7171
#define NOPRINT_EXC_ILLEGAL_ARGUMENT 0x10
72+
#define NOPRINT_UNSUPPORTED_OPERATION 0x20
7273

7374
#ifdef WIN32
7475
#ifdef LIBHDFS_DLL_EXPORT

c_src/libhdfs/hdfs.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
// Bit fields for hdfsFile_internal flags
5959
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
6060

61-
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
61+
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length, int noPrintFlag);
6262
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
6363

6464
/**
@@ -1174,7 +1174,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
11741174
if ((flags & O_WRONLY) == 0) {
11751175
// Try a test read to see if we can do direct reads
11761176
char buf;
1177-
if (readDirect(fs, file, &buf, 0) == 0) {
1177+
if (readDirect(fs, file, &buf, 0, NOPRINT_UNSUPPORTED_OPERATION) == 0) {
11781178
// Success - 0-byte read should return 0
11791179
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
11801180
} else if (errno != ENOTSUP) {
@@ -1409,7 +1409,7 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
14091409
return -1;
14101410
}
14111411
if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
1412-
return readDirect(fs, f, buffer, length);
1412+
return readDirect(fs, f, buffer, length, PRINT_EXC_ALL);
14131413
}
14141414

14151415
// JAVA EQUIVALENT:
@@ -1464,7 +1464,7 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
14641464
}
14651465

14661466
// Reads using the read(ByteBuffer) API, which does fewer copies
1467-
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
1467+
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length, int noPrintFlags)
14681468
{
14691469
// JAVA EQUIVALENT:
14701470
// ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
@@ -1498,7 +1498,7 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
14981498
HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb);
14991499
destroyLocalReference(env, bb);
15001500
if (jthr) {
1501-
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
1501+
errno = printExceptionAndFree(env, jthr, noPrintFlags,
15021502
"readDirect: FSDataInputStream#read");
15031503
return -1;
15041504
}

src/hdfs.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
//! it's a modified version of hdfs-rs
19-
use std::collections::HashMap;
19+
use std::collections::{HashMap, HashSet};
2020
use std::ffi::{CStr, CString};
2121
use std::fmt::Write;
2222
use std::fmt::{Debug, Formatter};
@@ -52,6 +52,14 @@ pub fn get_hdfs() -> Result<Arc<HdfsFs>, HdfsErr> {
5252
HDFS_MANAGER.get_hdfs_by_full_path("default")
5353
}
5454

55+
/// Register the name of a new scheme that this implementation should allow. No validation is done.
56+
pub fn register_scheme(scheme: & 'static str) {
57+
HDFS_MANAGER.register_scheme(scheme)
58+
}
59+
pub fn get_registered_schemes() -> HashSet<& 'static str> {
60+
HDFS_MANAGER.hdfs_schemes.read().unwrap().clone()
61+
}
62+
5563
/// Remove an instance of HdfsFs from the cache by a specified path with uri
5664
pub fn unload_hdfs_cache_by_full_path(
5765
path: &str,
@@ -64,16 +72,23 @@ pub fn unload_hdfs_cache(hdfs: Arc<HdfsFs>) -> Result<Option<Arc<HdfsFs>>, HdfsE
6472
HDFS_MANAGER.remove_hdfs(hdfs)
6573
}
6674

75+
76+
pub const LOCAL_FS_SCHEME: &str = "file";
77+
pub const HDFS_FS_SCHEME: &str = "hdfs";
78+
pub const VIEW_FS_SCHEME: &str = "viewfs";
79+
pub const initial_schemes: [&str; 3] = [LOCAL_FS_SCHEME, HDFS_FS_SCHEME, VIEW_FS_SCHEME];
6780
/// Hdfs manager
6881
/// All of the HdfsFs instances will be managed in a singleton HdfsManager
6982
struct HdfsManager {
7083
hdfs_cache: Arc<RwLock<HashMap<String, Arc<HdfsFs>>>>,
84+
hdfs_schemes: Arc<RwLock<HashSet<& 'static str>>>,
7185
}
7286

7387
impl HdfsManager {
7488
fn new() -> Self {
7589
Self {
7690
hdfs_cache: Arc::new(RwLock::new(HashMap::new())),
91+
hdfs_schemes: Arc::new(RwLock::new(initial_schemes.into_iter().collect()))
7792
}
7893
}
7994

@@ -140,6 +155,12 @@ impl HdfsManager {
140155
let mut cache = self.hdfs_cache.write().unwrap();
141156
Ok(cache.remove(hdfs_key))
142157
}
158+
fn register_scheme(&self, scheme: & 'static str) {
159+
let mut hdfs_schemes = self.hdfs_schemes.write().unwrap();
160+
if !hdfs_schemes.contains(scheme) {
161+
hdfs_schemes.insert(scheme);
162+
}
163+
}
143164
}
144165

145166
/// Hdfs Filesystem
@@ -838,20 +859,16 @@ impl Drop for BlockHosts {
838859
}
839860
}
840861

841-
pub const LOCAL_FS_SCHEME: &str = "file";
842-
pub const HDFS_FS_SCHEME: &str = "hdfs";
843-
pub const VIEW_FS_SCHEME: &str = "viewfs";
844-
845862
#[inline]
846863
fn get_namenode_uri(path: &str) -> Result<String, HdfsErr> {
864+
let schemes = HDFS_MANAGER.hdfs_schemes.read().unwrap();
847865
match Url::parse(path) {
848866
Ok(url) => match url.scheme() {
849867
LOCAL_FS_SCHEME => Ok("file:///".to_string()),
850-
HDFS_FS_SCHEME | VIEW_FS_SCHEME => {
868+
scheme if schemes.contains(&scheme) => {
851869
if let Some(host) = url.host() {
852870
let mut uri_builder = String::new();
853871
write!(&mut uri_builder, "{}://{}", url.scheme(), host).unwrap();
854-
855872
if let Some(port) = url.port() {
856873
write!(&mut uri_builder, ":{}", port).unwrap();
857874
}
@@ -860,22 +877,25 @@ fn get_namenode_uri(path: &str) -> Result<String, HdfsErr> {
860877
Err(HdfsErr::InvalidUrl(path.to_string()))
861878
}
862879
}
863-
_ => Err(HdfsErr::InvalidUrl(path.to_string())),
880+
_ => {
881+
Err(HdfsErr::InvalidUrl(path.to_string()))
882+
}
864883
},
865884
Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())),
866885
}
867886
}
868887

869888
#[inline]
870889
pub fn get_uri(path: &str) -> Result<String, HdfsErr> {
890+
let schemes = HDFS_MANAGER.hdfs_schemes.read().unwrap();
871891
let path = if path.starts_with('/') {
872892
format!("{}://{}", LOCAL_FS_SCHEME, path)
873893
} else {
874894
path.to_string()
875895
};
876896
match Url::parse(&path) {
877897
Ok(url) => match url.scheme() {
878-
LOCAL_FS_SCHEME | HDFS_FS_SCHEME | VIEW_FS_SCHEME => Ok(url.to_string()),
898+
scheme if schemes.contains(&scheme) => Ok(url.to_string()),
879899
_ => Err(HdfsErr::InvalidUrl(path.to_string())),
880900
},
881901
Err(_) => Err(HdfsErr::InvalidUrl(path.to_string())),

0 commit comments

Comments
 (0)