Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ path = "src/lib.rs"
default = ["test_util"]
test_util = []
use_existing_hdfs = []
no_jvm_invocation = []

[build-dependencies]
cc = "1"
Expand Down
58 changes: 57 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,60 @@ match fs.mkdir("/data") {
Ok(_) => { println!("/data has been created") },
Err(_) => { panic!("/data creation has failed") }
};
```
```

## JNI Context Support

fs-hdfs3 supports a special `no_jvm_invocation` feature for use cases where the library runs within JNI native functions and JVM invocation APIs are not needed. This is useful when implementing Java libraries with native JNI modules that use fs-hdfs3 to call Hadoop FileSystem APIs.

### Using the no_jvm_invocation Feature

To use this feature, add it to your Cargo.toml:

```toml
[dependencies.fs-hdfs3]
version = "0.1.12"
features = ["no_jvm_invocation"]
```

When this feature is enabled:
- The library does not link to `libjvm.so`
- JVM invocation APIs are disabled
- You must provide the JavaVM using the `jni_context` module functions

### JNI Native Function Example

```rust
use fs_hdfs3::jni_context::set_java_vm;
use fs_hdfs3::hdfs::get_hdfs;

// Call this once at application startup, typically in JNI_OnLoad
#[no_mangle]
pub extern "C" fn JNI_OnLoad(vm: *mut std::ffi::c_void, _reserved: *mut std::ffi::c_void) -> i32 {
// Set the JavaVM for fs-hdfs3 to use
if unsafe { set_java_vm(vm) }.is_err() {
return -1; // JNI_ERR
}
0x00010008 // JNI_VERSION_1_8
}

// Later, in your JNI native functions, just use fs-hdfs3 normally
#[no_mangle]
pub extern "C" fn Java_com_example_MyClass_myNativeMethod(
_env: *mut std::ffi::c_void,
_class: *mut std::ffi::c_void,
) -> i32 {
// No need to manage JNIEnv - fs-hdfs3 handles it automatically
match get_hdfs() {
Ok(fs) => {
// Use the filesystem...
println!("Successfully connected to HDFS");
}
Err(e) => {
eprintln!("Failed to connect to HDFS: {:?}", e);
return -1;
}
}
0
}
```
33 changes: 22 additions & 11 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,22 @@ fn build_ffi(flags: &[String]) {
);

// To avoid the order issue of dependent dynamic libraries
#[cfg(not(feature = "no_jvm_invocation"))]
println!("cargo:rustc-link-lib=jvm");

let bindings = bindgen::Builder::default()
let builder = bindgen::Builder::default()
.header(header)
.allowlist_function("nmd.*")
.allowlist_function("hdfs.*")
.allowlist_function("hadoop.*")
.clang_args(flags)
.rustified_enum("tObjectKind")
.generate()
.expect("Unable to generate bindings");
.rustified_enum("tObjectKind");

// Add the conditional functions for no_jvm_invocation
#[cfg(feature = "no_jvm_invocation")]
let builder = builder.allowlist_function("hdfsSetJavaVM");

let bindings = builder.generate().expect("Unable to generate bindings");

let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
Expand Down Expand Up @@ -114,6 +119,9 @@ fn get_build_flags() -> Vec<String> {
result.extend(get_java_dependency());
result.push(String::from("-Wno-incompatible-pointer-types"));

#[cfg(feature = "no_jvm_invocation")]
result.push(String::from("-DLIBHDFS_NO_JVM_INVOCATION"));

result
}

Expand All @@ -129,14 +137,17 @@ fn get_java_dependency() -> Vec<String> {
#[cfg(target_os = "macos")]
result.push(format!("-I{java_home}/include/darwin"));

// libjvm link
let jvm_lib_location = java_locator::locate_jvm_dyn_library().unwrap();
println!("cargo:rustc-link-search=native={}", jvm_lib_location);
println!("cargo:rustc-link-lib=jvm");
#[cfg(not(feature = "no_jvm_invocation"))]
{
// libjvm link
let jvm_lib_location = java_locator::locate_jvm_dyn_library().unwrap();
println!("cargo:rustc-link-search=native={}", jvm_lib_location);
println!("cargo:rustc-link-lib=jvm");

// For tests, add libjvm path to rpath, this does not propagate upwards,
// unless building an .so, as per Cargo specs, so is only used when testing
println!("cargo:rustc-link-arg=-Wl,-rpath,{jvm_lib_location}");
// For tests, add libjvm path to rpath, this does not propagate upwards,
// unless building an .so, as per Cargo specs, so is only used when testing
println!("cargo:rustc-link-arg=-Wl,-rpath,{jvm_lib_location}");
}

result
}
Expand Down
7 changes: 7 additions & 0 deletions c_src/libhdfs/hdfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -3600,6 +3600,13 @@ char* hdfsGetLastExceptionStackTrace()
return getLastTLSExceptionStackTrace();
}

#ifdef LIBHDFS_NO_JVM_INVOCATION
int hdfsSetJavaVM(void *vm)
{
return setJavaVM(vm);
}
#endif /* LIBHDFS_NO_JVM_INVOCATION */

/**
* vim: ts=4: sw=4: et:
*/
5 changes: 5 additions & 0 deletions c_src/libhdfs/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,11 @@ extern "C" {
LIBHDFS_EXTERNAL
char* hdfsGetLastExceptionStackTrace();

#ifdef LIBHDFS_NO_JVM_INVOCATION
LIBHDFS_EXTERNAL
int hdfsSetJavaVM(void *vm);
#endif /* LIBHDFS_NO_JVM_INVOCATION */

#ifdef __cplusplus
}
#endif
Expand Down
142 changes: 140 additions & 2 deletions c_src/libhdfs/jni_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <dirent.h>
#include <stdio.h>
#include <string.h>
#include <stdatomic.h>

static struct htable *gClassRefHTable = NULL;

Expand Down Expand Up @@ -629,6 +630,7 @@ static char* getClassPath()
return expandedClasspath;
}

#ifndef LIBHDFS_NO_JVM_INVOCATION

/**
* Get the global JNI environemnt.
Expand Down Expand Up @@ -701,7 +703,7 @@ static JNIEnv* getGlobalJNIEnv(void)
}
options[0].optionString = optHadoopClassPath;
hadoopJvmArgs = getenv("LIBHDFS_OPTS");
if (hadoopJvmArgs != NULL) {
if (hadoopJvmArgs != NULL) {
hadoopJvmArgs = strdup(hadoopJvmArgs);
for (noArgs = 1, str = hadoopJvmArgs; ; noArgs++, str = NULL) {
token = strtok_r(str, jvmArgDelims, &savePtr);
Expand Down Expand Up @@ -820,6 +822,143 @@ JNIEnv* getJNIEnv(void)
return NULL;
}

#else

/* Global JavaVM for no_jvm_invocation mode - using atomic for efficiency
*
* Memory ordering explanation:
* - setJavaVM uses memory_order_release: ensures all prior writes are visible
* before the JavaVM pointer becomes visible to other threads
* - getJNIEnvNoInvocation uses memory_order_acquire: ensures the JavaVM pointer
* load happens before any subsequent operations that depend on it
* - This provides safe publication of the JavaVM without expensive mutex locking
*/
static _Atomic(JavaVM*) g_cachedJavaVM = NULL;

int setJavaVM(void *vm)
{
JavaVM *javaVM = (JavaVM *)vm;
JavaVM *expected = NULL;

if (!vm) {
fprintf(stderr, "setJavaVM: vm parameter cannot be NULL\n");
return -1;
}

/* Atomically set the JavaVM if it's not already set (compare-and-swap) */
if (!atomic_compare_exchange_strong(&g_cachedJavaVM, &expected, javaVM)) {
if (atomic_load(&g_cachedJavaVM) == javaVM) {
/* The same JavaVM is already set, so we ignore this call. */
return 0;
} else {
fprintf(stderr, "setJavaVM: JavaVM already set. Cannot call setJavaVM() with a different JavaVM.\n");
return -1;
}
}

return 0;
}

/**
* getJNIEnv: A helper function to get the JNIEnv* for the given thread.
* Uses a JavaVM that was previously set via setJavaVM(). If the current thread
* is already attached by the caller, returns JNIEnv directly. If not attached,
* attaches the thread and stores JNIEnv in TLS for proper cleanup.
*
* Implementation note: we use POSIX thread-local storage (TLS) ONLY for threads
* that WE attach (not caller-attached threads). This allows us to associate a
* destructor function with each thread, that will detach the thread from the Java VM
* when the thread terminates. If we fail to do this, it will cause a memory leak.
* The contract is: if state->env is not NULL, then WE attached this thread.
*
* However, POSIX TLS is not the most efficient way to do things. It requires a
* key to be initialized before it can be used. Since we don't know if this key
* is initialized at the start of this function, we have to lock a mutex first
* and check. Luckily, most operating systems support the more efficient
* __thread construct, which is initialized by the linker.
*
* @param: None.
* @return The JNIEnv* corresponding to the thread.
*/
JNIEnv* getJNIEnv(void)
{
struct ThreadLocalState *state = NULL;
JNIEnv *env = NULL;
jint rv;

JavaVM *vm = atomic_load(&g_cachedJavaVM);
if (vm == NULL) {
fprintf(stderr, "getJNIEnv: JavaVM not set. Call setJavaVM() first.\n");
return NULL;
}

/* Check thread local storage first - if we have TLS with an env, we must have
* attached this thread */
THREAD_LOCAL_STORAGE_GET_QUICK(&state);
if (state && state->env) return state->env;

mutexLock(&jvmMutex);
if (threadLocalStorageGet(&state)) {
mutexUnlock(&jvmMutex);
return NULL;
}
if (state) {
if (state->env) {
/* We have attached this thread before, so we can return the JNIEnv directly. */
mutexUnlock(&jvmMutex);

/* Free any stale exception strings */
free(state->lastExceptionRootCause);
free(state->lastExceptionStackTrace);
state->lastExceptionRootCause = NULL;
state->lastExceptionStackTrace = NULL;

return state->env;
}
} else {
/* Create a ThreadLocalState for this thread */
state = threadLocalStorageCreate();
if (!state) {
mutexUnlock(&jvmMutex);
fprintf(stderr, "getJNIEnv: Unable to create ThreadLocalState\n");
return NULL;
}
if (threadLocalStorageSet(state)) {
mutexUnlock(&jvmMutex);
fprintf(stderr, "getJNIEnv: Unable to set ThreadLocalState\n");
return NULL;
}
THREAD_LOCAL_STORAGE_SET_QUICK(state);
mutexUnlock(&jvmMutex);
}

/* Try to get JNIEnv from the JavaVM - this will succeed if the thread is already attached */
rv = (*vm)->GetEnv(vm, (void**)&env, JNI_VERSION_1_2);

if (rv == JNI_OK && env != NULL) {
/* Thread is already attached by caller - return JNIEnv directly without storing in TLS */
return env;
} else if (rv == JNI_EDETACHED) {
/* Thread is not attached - we need to attach it ourselves and store in TLS */
rv = (*vm)->AttachCurrentThread(vm, (void**)&env, NULL);
if (rv != JNI_OK || env == NULL) {
fprintf(stderr, "getJNIEnv: AttachCurrentThread failed with error: %d\n", rv);
return NULL;
}

/* Store the JNIEnv we attached in the thread local state. The contract is that if
* state->env is not NULL, then it MUST BE US who attached this thread. */
state->env = env;
return env;
} else {
/* Some other error occurred */
fprintf(stderr, "getJNIEnv: JavaVM->GetEnv failed with error: %d\n", rv);
return NULL;
}
}

#endif /* LIBHDFS_NO_JVM_INVOCATION */

char* getLastTLSExceptionRootCause()
{
struct ThreadLocalState *state = NULL;
Expand Down Expand Up @@ -938,4 +1077,3 @@ jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
*out = jEnum;
return NULL;
}

25 changes: 21 additions & 4 deletions c_src/libhdfs/jni_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,14 @@ LIBHDFS_EXTERNAL
jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);

/** getJNIEnv: A helper function to get the JNIEnv* for the given thread.
* It gets this from the ThreadLocalState if it exists. If a ThreadLocalState
* does not exist, one will be created.
* If no JVM exists, then one will be created. JVM command line arguments
* are obtained from the LIBHDFS_OPTS environment variable.
*
* In regular mode: Gets JNIEnv from ThreadLocalState if it exists, otherwise
* creates one. If no JVM exists, creates one using LIBHDFS_OPTS environment variable.
*
* In no_jvm_invocation mode: Uses JavaVM set via setJavaVM(). If the current thread
* is already attached by the caller, returns JNIEnv directly. If not attached,
* attaches the thread and stores JNIEnv in ThreadLocalState for proper cleanup.
*
* @param: None.
* @return The JNIEnv* corresponding to the thread.
* */
Expand Down Expand Up @@ -184,6 +188,19 @@ char* getLastTLSExceptionStackTrace();
LIBHDFS_EXTERNAL
void setTLSExceptionStrings(const char *rootCause, const char *stackTrace);

#ifdef LIBHDFS_NO_JVM_INVOCATION
/** setJavaVM: Set the JavaVM for use in JNI context.
* This function should be called once at the beginning of the application
* to provide the JavaVM that was passed to the JNI library.
* The JavaVM will be cached and used to obtain JNIEnv for each thread.
*
* @param vm The JavaVM pointer from JNI context.
* @return 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int setJavaVM(void *vm);
#endif /* LIBHDFS_NO_JVM_INVOCATION */

/**
* Figure out if a Java object is an instance of a particular class.
*
Expand Down
3 changes: 2 additions & 1 deletion c_src/libhdfs/os/posix/thread_local_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void hdfsThreadDestructor(void *v)
JNIEnv *env = state->env;;
jint ret;

/* Detach the current thread from the JVM */
/* Detach the current thread from the JVM if env is present (means WE attached it) */
if (env) {
ret = (*env)->GetJavaVM(env, &vm);
if (ret) {
Expand Down Expand Up @@ -71,6 +71,7 @@ struct ThreadLocalState* threadLocalStorageCreate()
"threadLocalStorageSet: OOM - Unable to allocate thread local state\n");
return NULL;
}
state->env = NULL;
state->lastExceptionStackTrace = NULL;
state->lastExceptionRootCause = NULL;
return state;
Expand Down
Loading