Skip to content

Commit 04ff40c

Browse files
authored
Upgrade arrow to 2023 Spring version (#76)
* upgrade jni and tokio * fix lint * add unit test and binding * fix javadoc * add err case
1 parent 1ff0b3b commit 04ff40c

File tree

16 files changed

+163
-81
lines changed

16 files changed

+163
-81
lines changed

datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,16 @@ public static void main(String[] args) throws Exception {
5555

5656
context
5757
.sql("select * from test_parquet limit 3")
58-
.thenComposeAsync(df -> df.registerTable(context, "test_parquet_limited"))
58+
.thenAccept(
59+
df -> {
60+
try {
61+
boolean previouslyRegistered =
62+
context.registerTable("test_parquet_limited", df.intoView()).isPresent();
63+
assert !previouslyRegistered;
64+
} catch (Exception e) {
65+
throw new RuntimeException(e);
66+
}
67+
})
5968
.join();
6069

6170
context.sql("select * from test_parquet_limited").thenComposeAsync(DataFrame::show).join();

datafusion-java/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ java {
3030
withSourcesJar()
3131

3232
compileJava {
33-
options.compilerArgs += ["-h", "${buildDir}/target/headers"]
33+
options.compilerArgs += ["-h", "${layout.buildDirectory.asFile.get()}/target/headers"]
3434
}
3535
}
3636

datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ public interface DataFrame extends NativeProxy {
4545
CompletableFuture<Void> writeCsv(Path path);
4646

4747
/**
48-
* Register this dataframe as a temporary table.
48+
* Converts this DataFrame into a TableProvider that can be registered as a table view using
49+
* {@link SessionContext#registerParquet(String, Path)}
4950
*
50-
* @param context SessionContext to register table to
51-
* @param name name of the tmp table
52-
* @return null
51+
* @return the table provider ready to be e.g. {@link SessionContext#registerTable(String,
52+
* TableProvider) registered}.
5353
*/
54-
CompletableFuture<Void> registerTable(SessionContext context, String name);
54+
TableProvider intoView();
5555
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,5 @@ static native void writeParquet(
2020

2121
static native void writeCsv(long runtime, long dataframe, String path, Consumer<String> callback);
2222

23-
static native void registerTable(
24-
long runtime, long dataframe, long context, String name, Consumer<String> callback);
23+
static native long intoView(long dataframe);
2524
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java

+4-18
Original file line numberDiff line numberDiff line change
@@ -104,25 +104,11 @@ public CompletableFuture<Void> writeCsv(Path path) {
104104
return future;
105105
}
106106

107-
public CompletableFuture<Void> registerTable(SessionContext ctx, String name) {
108-
Runtime runtime = context.getRuntime();
109-
long runtimePointer = runtime.getPointer();
107+
@Override
108+
public TableProvider intoView() {
110109
long dataframe = getPointer();
111-
long contextPointer = ctx.getPointer();
112-
CompletableFuture<Void> future = new CompletableFuture<>();
113-
DataFrames.registerTable(
114-
runtimePointer,
115-
dataframe,
116-
contextPointer,
117-
name,
118-
(String errString) -> {
119-
if (containsError(errString)) {
120-
future.completeExceptionally(new RuntimeException(errString));
121-
} else {
122-
future.complete(null);
123-
}
124-
});
125-
return future;
110+
long tableProviderPointer = DataFrames.intoView(dataframe);
111+
return new DefaultTableProvider(tableProviderPointer);
126112
}
127113

128114
@Override

datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java

+13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.arrow.datafusion;
22

33
import java.nio.file.Path;
4+
import java.util.Optional;
45
import java.util.concurrent.CompletableFuture;
56
import java.util.function.Consumer;
67
import org.slf4j.Logger;
@@ -19,6 +20,8 @@ static native void registerCsv(
1920
static native void registerParquet(
2021
long runtime, long context, String name, String path, Consumer<String> callback);
2122

23+
static native long registerTable(long context, String name, long tableProvider) throws Exception;
24+
2225
@Override
2326
public CompletableFuture<DataFrame> sql(String sql) {
2427
long runtime = getRuntime().getPointer();
@@ -64,6 +67,16 @@ public CompletableFuture<Void> registerParquet(String name, Path path) {
6467
return future;
6568
}
6669

70+
@Override
71+
public Optional<TableProvider> registerTable(String name, TableProvider tableProvider)
72+
throws Exception {
73+
long previouslyRegistered = registerTable(getPointer(), name, tableProvider.getPointer());
74+
if (previouslyRegistered == 0) {
75+
return Optional.empty();
76+
}
77+
return Optional.of(new DefaultTableProvider(previouslyRegistered));
78+
}
79+
6780
private void voidCallback(CompletableFuture<Void> future, String errMessage) {
6881
if (null != errMessage && !errMessage.isEmpty()) {
6982
future.completeExceptionally(new RuntimeException(errMessage));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.apache.arrow.datafusion;
2+
3+
class DefaultTableProvider extends AbstractProxy implements TableProvider {
4+
DefaultTableProvider(long pointer) {
5+
super(pointer);
6+
}
7+
8+
@Override
9+
void doClose(long pointer) throws Exception {
10+
TableProviders.destroyTableProvider(pointer);
11+
}
12+
}

datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java

+12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.arrow.datafusion;
22

33
import java.nio.file.Path;
4+
import java.util.Optional;
45
import java.util.concurrent.CompletableFuture;
56

67
/** A session context holds resources and is the entrance for obtaining {@link DataFrame} */
@@ -32,6 +33,17 @@ public interface SessionContext extends AutoCloseable, NativeProxy {
3233
*/
3334
CompletableFuture<Void> registerParquet(String name, Path path);
3435

36+
/**
37+
* Registers a TableProvider as a table that can be referenced from SQL statements executed
38+
* against this context.
39+
*
40+
* @param name table reference
41+
* @param tableProvider table provider
42+
* @return as of Arrow 22 this is only {@link Optional#empty()}
43+
* @throws Exception when the table is already registered
44+
*/
45+
Optional<TableProvider> registerTable(String name, TableProvider tableProvider) throws Exception;
46+
3547
/**
3648
* Get the runtime associated with this context
3749
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package org.apache.arrow.datafusion;
2+
3+
/** vague interface that maps to {@code Arc<dyn TableProvider>}. */
4+
public interface TableProvider extends NativeProxy {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.apache.arrow.datafusion;
2+
3+
class TableProviders {
4+
5+
private TableProviders() {}
6+
7+
static native void destroyTableProvider(long pointer);
8+
}

datafusion-java/src/test/java/org/apache/arrow/datafusion/TestQuery.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package org.apache.arrow.datafusion;
22

3-
import static org.junit.jupiter.api.Assertions.*;
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
47

58
import java.nio.file.Files;
69
import java.nio.file.Path;
@@ -15,6 +18,23 @@
1518
import org.junit.jupiter.api.io.TempDir;
1619

1720
public class TestQuery {
21+
22+
@Test
23+
public void testQueryInMemoryTable() throws Exception {
24+
try (SessionContext context = SessionContexts.create();
25+
BufferAllocator allocator = new RootAllocator()) {
26+
DataFrame df = context.sql("SELECT * FROM (VALUES (1, 2), (3, 4)) AS t (x, y)").join();
27+
assertFalse(
28+
context.registerTable("test", df.intoView()).isPresent(),
29+
"there should not be any duplicates");
30+
testQuery(context, allocator);
31+
assertThrows(
32+
Exception.class,
33+
() -> context.registerTable("test", df.intoView()),
34+
"now there should be duplicates");
35+
}
36+
}
37+
1838
@Test
1939
public void testQueryCsv(@TempDir Path tempDir) throws Exception {
2040
try (SessionContext context = SessionContexts.create();

datafusion-jni/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ edition = "2021"
1212
[dependencies]
1313
jni = "^0.21.0"
1414
tokio = "^1.32.0"
15-
arrow = "^22.0"
16-
datafusion = "^12.0"
15+
arrow = "^36.0"
16+
datafusion = "^22.0"
1717

1818
[lib]
1919
crate_type = ["cdylib"]

datafusion-jni/src/context.rs

+30
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use datafusion::datasource::TableProvider;
12
use datafusion::execution::context::SessionContext;
23
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
34
use jni::objects::{JClass, JObject, JString};
45
use jni::sys::jlong;
56
use jni::JNIEnv;
7+
use std::sync::Arc;
68
use tokio::runtime::Runtime;
79

810
#[no_mangle]
@@ -46,6 +48,34 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re
4648
});
4749
}
4850

51+
#[no_mangle]
52+
pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerTable(
53+
mut env: JNIEnv,
54+
_class: JClass,
55+
pointer: jlong,
56+
name: JString,
57+
table_provider: jlong,
58+
) -> jlong {
59+
let name: String = env
60+
.get_string(&name)
61+
.expect("Couldn't get name as string!")
62+
.into();
63+
let context = unsafe { &mut *(pointer as *mut SessionContext) };
64+
let table_provider = unsafe { &*(table_provider as *const Arc<dyn TableProvider>) };
65+
let result = context.register_table(&name, table_provider.clone());
66+
match result {
67+
// TODO this is to be fixed on datafusion side as duplicates will not be returned
68+
// and instead returned as err
69+
Ok(Some(v)) => Box::into_raw(Box::new(v)) as jlong,
70+
Ok(None) => 0,
71+
Err(err) => {
72+
env.throw_new("java/lang/Exception", err.to_string())
73+
.unwrap();
74+
0
75+
}
76+
}
77+
}
78+
4979
#[no_mangle]
5080
pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerParquet(
5181
mut env: JNIEnv,

0 commit comments

Comments
 (0)