-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathDefaultDataFrame.java
133 lines (119 loc) · 4.26 KB
/
DefaultDataFrame.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package org.apache.arrow.datafusion;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class DefaultDataFrame extends AbstractProxy implements DataFrame {
private static final Logger logger = LoggerFactory.getLogger(DefaultDataFrame.class);
private final SessionContext context;
DefaultDataFrame(SessionContext context, long pointer) {
super(pointer);
this.context = context;
}
@Override
public CompletableFuture<ArrowReader> collect(BufferAllocator allocator) {
CompletableFuture<ArrowReader> result = new CompletableFuture<>();
Runtime runtime = context.getRuntime();
long runtimePointer = runtime.getPointer();
long dataframe = getPointer();
DataFrames.collectDataframe(
runtimePointer,
dataframe,
(String errString, byte[] arr) -> {
if (ErrorUtil.containsError(errString)) {
result.completeExceptionally(new RuntimeException(errString));
} else {
logger.info("successfully completed with arr length={}", arr.length);
ByteArrayReadableSeekableByteChannel byteChannel =
new ByteArrayReadableSeekableByteChannel(arr);
result.complete(new ArrowFileReader(byteChannel, allocator));
}
});
return result;
}
@Override
public CompletableFuture<RecordBatchStream> executeStream(BufferAllocator allocator) {
CompletableFuture<RecordBatchStream> result = new CompletableFuture<>();
Runtime runtime = context.getRuntime();
long runtimePointer = runtime.getPointer();
long dataframe = getPointer();
DataFrames.executeStream(
runtimePointer,
dataframe,
(errString, streamId) -> {
if (ErrorUtil.containsError(errString)) {
result.completeExceptionally(new RuntimeException(errString));
} else {
result.complete(new DefaultRecordBatchStream(context, streamId, allocator));
}
});
return result;
}
@Override
public CompletableFuture<Void> show() {
Runtime runtime = context.getRuntime();
long runtimePointer = runtime.getPointer();
long dataframe = getPointer();
CompletableFuture<Void> future = new CompletableFuture<>();
DataFrames.showDataframe(
runtimePointer,
dataframe,
new RuntimeExceptionCallback(future));
return future;
}
@Override
public CompletableFuture<Void> writeParquet(Path path) {
Runtime runtime = context.getRuntime();
long runtimePointer = runtime.getPointer();
long dataframe = getPointer();
CompletableFuture<Void> future = new CompletableFuture<>();
DataFrames.writeParquet(
runtimePointer,
dataframe,
path.toAbsolutePath().toString(),
new RuntimeExceptionCallback(future));
return future;
}
@Override
public CompletableFuture<Void> writeCsv(Path path) {
Runtime runtime = context.getRuntime();
long runtimePointer = runtime.getPointer();
long dataframe = getPointer();
CompletableFuture<Void> future = new CompletableFuture<>();
DataFrames.writeCsv(
runtimePointer,
dataframe,
path.toAbsolutePath().toString(),
new RuntimeExceptionCallback(future));
return future;
}
@Override
public TableProvider intoView() {
long dataframe = getPointer();
long tableProviderPointer = DataFrames.intoView(dataframe);
return new DefaultTableProvider(tableProviderPointer);
}
@Override
void doClose(long pointer) {
DataFrames.destroyDataFrame(pointer);
}
private static class RuntimeExceptionCallback implements Consumer<String> {
private final CompletableFuture<?> future;
private RuntimeExceptionCallback(CompletableFuture<?> future) {
this.future = future;
}
@Override
public void accept(String errString) {
if (ErrorUtil.containsError(errString)) {
future.completeExceptionally(new RuntimeException(errString));
} else {
future.complete(null);
}
}
}
}