Skip to content

Commit 446830d

Browse files
committed
WIP: start migrating Pipeline class
1 parent dcc84e2 commit 446830d

File tree

6 files changed

+278
-4
lines changed

6 files changed

+278
-4
lines changed

packages/core/java/pom.xml

+24
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,29 @@
8383
<groupId>net.imglib2</groupId>
8484
<artifactId>imglib2</artifactId>
8585
</dependency>
86+
<dependency>
87+
<groupId>net.java.dev.jna</groupId>
88+
<artifactId>jna</artifactId>
89+
</dependency>
90+
<dependency>
91+
<groupId>net.java.dev.jna</groupId>
92+
<artifactId>jna-platform</artifactId>
93+
</dependency>
94+
<dependency>
95+
<groupId>com.google.code.gson</groupId>
96+
<artifactId>gson</artifactId>
97+
</dependency>
98+
99+
<!-- Test scope dependencies -->
100+
<dependency>
101+
<groupId>org.junit.jupiter</groupId>
102+
<artifactId>junit-jupiter-api</artifactId>
103+
<scope>test</scope>
104+
</dependency>
105+
<dependency>
106+
<groupId>org.junit.jupiter</groupId>
107+
<artifactId>junit-jupiter-engine</artifactId>
108+
<scope>test</scope>
109+
</dependency>
86110
</dependencies>
87111
</project>

packages/core/java/src/main/java/org/itk/wasm/BinaryStream.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
*/
2020
package org.itk.wasm;
2121

22+
import java.nio.ByteBuffer;
23+
2224
public class BinaryStream {
23-
public byte[] data;
25+
public ByteBuffer data;
2426

25-
public BinaryStream(byte[] data) {
27+
public BinaryStream(ByteBuffer data) {
2628
this.data = data;
2729
}
2830
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
import static io.github.kawamuray.wasmtime.WasmValType.I32;
2+
3+
import io.github.kawamuray.wasmtime.Config;
4+
import io.github.kawamuray.wasmtime.Engine;
5+
import io.github.kawamuray.wasmtime.Extern;
6+
import io.github.kawamuray.wasmtime.Linker;
7+
import io.github.kawamuray.wasmtime.Memory;
8+
import io.github.kawamuray.wasmtime.Module;
9+
import io.github.kawamuray.wasmtime.Store;
10+
import io.github.kawamuray.wasmtime.WasmFunctions;
11+
import io.github.kawamuray.wasmtime.WasmFunctions.Consumer0;
12+
import io.github.kawamuray.wasmtime.WasmFunctions.Consumer1;
13+
import io.github.kawamuray.wasmtime.WasmFunctions.Function0;
14+
import io.github.kawamuray.wasmtime.WasmFunctions.Function1;
15+
import io.github.kawamuray.wasmtime.WasmFunctions.Function2;
16+
import io.github.kawamuray.wasmtime.WasmFunctions.Function3;
17+
import io.github.kawamuray.wasmtime.WasmFunctions.Function4;
18+
import io.github.kawamuray.wasmtime.wasi.WasiCtx;
19+
import io.github.kawamuray.wasmtime.wasi.WasiCtxBuilder;
20+
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.FileInputStream;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.nio.ByteBuffer;
26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Path;
28+
import java.util.ArrayList;
29+
import java.util.HashSet;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
34+
public class Pipeline {
35+
private Config config;
36+
private Engine engine;
37+
private Linker linker;
38+
private Module module;
39+
private String moduleName;
40+
41+
/** TEMP */
42+
public Pipeline() throws IOException {
43+
this("../python/itkwasm/test/input/stdout-stderr-test.wasi.wasm");
44+
}
45+
46+
public Pipeline(String path) throws IOException {
47+
this(readBytes(path));
48+
}
49+
50+
public Pipeline(byte[] wasmBytes) throws IOException {
51+
config = new Config();
52+
config.wasmBulkMemory(true);
53+
config.wasmSimd(true);
54+
//config.wasmMemory64(true);
55+
engine = new Engine(config);
56+
57+
linker = new Linker(engine);
58+
//linker.allowShadowing(true);
59+
module = new Module(engine, wasmBytes);
60+
}
61+
62+
public List<PipelineOutput> run(List<String> args, List<PipelineOutput> outputs, List<PipelineInput> inputs) throws Exception {
63+
WasiCtx wasiConfig = new WasiCtxBuilder()
64+
.inheritEnv()
65+
.inheritStderr()
66+
.inheritStdin()
67+
.inheritStdout()
68+
.args(args).build();
69+
70+
Set<String> preopenDirectories = new HashSet<>();
71+
/*
72+
TODO -- ensure these paths are POSIX style, not Windows
73+
for (int index = 0; index < inputs.size(); index++) {
74+
PipelineInput<?> input = inputs.get(index);
75+
if (input.type == InterfaceTypes.TextFile || input.type == InterfaceTypes.BinaryFile) {
76+
Path parentPath = Path.of(input.data.getPath()).getParent();
77+
if (parentPath != null) {
78+
preopenDirectories.add(parentPath.toString());
79+
}
80+
}
81+
}
82+
for (int index = 0; index < outputs.size(); index++) {
83+
PipelineOutput output = outputs.get(index);
84+
if (output.type == InterfaceTypes.TextFile || output.type == InterfaceTypes.BinaryFile) {
85+
Path parentPath = Path.of(output.data.getPath()).getParent();
86+
if (parentPath != null) {
87+
preopenDirectories.add(parentPath.toString());
88+
}
89+
}
90+
}
91+
*/
92+
//
93+
for (String preopen : preopenDirectories) {
94+
wasiConfig.pushPreopenDir(Path.of(preopen), preopen);
95+
}
96+
97+
Store<Engine> store = new Store<>(engine, wasiConfig);
98+
99+
WasiCtx.addToLinker(linker);
100+
101+
// TODO: Decide how to name this more appropriately.
102+
moduleName = "instance1";
103+
104+
linker.module(store, moduleName, module);
105+
106+
Consumer0 main = consumer0(store, "");
107+
Consumer0 initialize = consumer0(store, "_initialize");
108+
Function0<Integer> delayedStart = func0(store, "itk_wasm_delayed_start");
109+
Consumer1<Integer> delayedExit = consumer1(store, "itk_wasm_delayed_exit");
110+
Function4<Integer, Integer, Integer, Integer, Integer> inputArrayAlloc = func4(store, "itk_wasm_input_array_alloc");
111+
Function3<Integer, Integer, Integer, Integer> inputJsonAlloc = func3(store, "itk_wasm_input_json_alloc");
112+
Function2<Integer, Integer, Integer> outputJsonAddress = func2(store, "itk_wasm_output_json_address");
113+
Function2<Integer, Integer, Integer> outputJsonSize = func2(store, "itk_wasm_output_json_size");
114+
Function3<Integer, Integer, Integer, Integer> outputArrayAddress = func3(store, "itk_wasm_output_array_address");
115+
Function3<Integer, Integer, Integer, Integer> outputArraySize = func3(store, "itk_wasm_output_array_size");
116+
Consumer0 freeAll = consumer0(store, "itk_wasm_free_all");
117+
Memory memory = extern(store, "memory").memory();
118+
119+
int returnCode = delayedStart.call();
120+
121+
List<PipelineOutput> populatedOutputs = new ArrayList<>();
122+
if (!outputs.isEmpty() && returnCode == 0) {
123+
for (int index = 0; index < outputs.size(); index++) {
124+
PipelineOutput output = outputs.get(index);
125+
if (output.type == InterfaceTypes.TextStream) {
126+
Pointer dataPtr = outputArrayAddress.invokeP(store, 0, index, 0);
127+
long dataLen = outputArraySize.invokeL(store, 0, index, 0);
128+
byte[] dataBytes = wasmTimeLift(dataPtr, dataLen);
129+
String dataString = new String(dataBytes, StandardCharsets.UTF_8);
130+
TextStream textStream = new TextStream(dataString);
131+
populatedOutputs.add(new PipelineOutput(InterfaceTypes.TextStream, textStream));
132+
} else if (output.type == InterfaceTypes.BinaryStream) {
133+
Pointer dataPtr = outputArrayAddress.invokeP(store, 0, index, 0);
134+
long dataLen = outputArraySize.invokeL(store, 0, index, 0);
135+
byte[] dataBytes = wasmTimeLift(dataPtr, dataLen);
136+
BinaryStream binaryStream = new BinaryStream(dataBytes);
137+
populatedOutputs.add(new PipelineOutput(InterfaceTypes.BinaryStream, binaryStream));
138+
} else {
139+
throw new IllegalArgumentException("Unexpected/not yet supported output.type " + output.type);
140+
}
141+
}
142+
}
143+
144+
DelayedExit delayedExit = instance.getExports(store).get("itk_wasm_delayed_exit");
145+
delayedExit.invokeV(store, returnCode);
146+
147+
return populatedOutputs;
148+
}
149+
150+
private Extern extern(Store<Engine> store, String name) {
151+
return linker.get(store, moduleName, name).get();
152+
}
153+
private Consumer0 consumer0(Store<Engine> store, String name) {
154+
return WasmFunctions.consumer(store, extern(store, name).func());
155+
}
156+
private Consumer1<Integer> consumer1(Store<Engine> store, String name) {
157+
return WasmFunctions.consumer(store, extern(store, name).func(), I32);
158+
}
159+
private Function0<Integer> func0(Store<Engine> store, String name) {
160+
return WasmFunctions.func(store, extern(store, name).func(), I32);
161+
}
162+
private Function1<Integer, Integer> func1(Store<Engine> store, String name) {
163+
return WasmFunctions.func(store, extern(store, name).func(), I32, I32);
164+
}
165+
private Function2<Integer, Integer, Integer> func2(Store<Engine> store, String name) {
166+
return WasmFunctions.func(store, extern(store, name).func(), I32, I32, I32);
167+
}
168+
private Function3<Integer, Integer, Integer, Integer> func3(Store<Engine> store, String name) {
169+
return WasmFunctions.func(store, extern(store, name).func(), I32, I32, I32, I32);
170+
}
171+
private Function4<Integer, Integer, Integer, Integer, Integer> func4(Store<Engine> store, String name) {
172+
return WasmFunctions.func(store, extern(store, name).func(), I32, I32, I32, I32, I32);
173+
}
174+
175+
private byte[] wasmTimeLift(Pointer ptr, long size) {
176+
long ptrValue = Pointer.nativeValue(ptr);
177+
if (ptrValue + size > memory.capacity()) {
178+
throw new IndexOutOfBoundsException("Attempting to lift out of bounds");
179+
}
180+
ByteBuffer byteBuffer = memory.getByteBuffer(ptrValue, size);
181+
byte[] data = new byte[byteBuffer.remaining()];
182+
byteBuffer.get(data);
183+
return data;
184+
}
185+
186+
private void wasmTimeLower(Pointer ptr, byte[] data) {
187+
long ptrValue = Pointer.nativeValue(ptr);
188+
long size = data.length;
189+
if (ptrValue + size > memory.capacity()) {
190+
throw new IndexOutOfBoundsException("Attempting to lower out of bounds");
191+
}
192+
ByteBuffer byteBuffer = memory.getByteBuffer(ptrValue, size);
193+
byteBuffer.put(data);
194+
}
195+
196+
private Pointer setInputArray(byte[] dataArray, int inputIndex, int subIndex) {
197+
Pointer dataPtr = new Memory(dataArray.length);
198+
dataPtr.write(0, dataArray, 0, dataArray.length);
199+
Pointer resultPtr = inputArrayAlloc.invokeP(store, 0, inputIndex, subIndex, dataArray.length);
200+
wasmTimeLower(resultPtr, dataArray);
201+
return resultPtr;
202+
}
203+
204+
private void setInputJson(Map<String, Object> dataObject, int inputIndex) throws JsonProcessingException {
205+
byte[] dataJson = objectMapper.writeValueAsBytes(dataObject);
206+
Pointer jsonPtr = inputJsonAlloc.invokeP(store, 0, inputIndex, dataJson.length);
207+
wasmTimeLower(jsonPtr, dataJson);
208+
}
209+
210+
private Map<String, Object> getOutputJson(int outputIndex) throws IOException {
211+
Pointer jsonPtr = outputJsonAddress.invokeP(store, 0, outputIndex);
212+
long jsonLen = outputJsonSize.invokeL(store, 0, outputIndex);
213+
byte[] jsonBytes = wasmTimeLift(jsonPtr, jsonLen);
214+
String jsonString = new String(jsonBytes, StandardCharsets.UTF_8);
215+
return objectMapper.readValue(jsonString, Map.class);
216+
}
217+
218+
private static byte[] readBytes(String filename) throws IOException {
219+
//try (InputStream is = Main.class.getResourceAsStream(filename)) {
220+
try (InputStream is = new FileInputStream(filename)) {
221+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
222+
int nRead;
223+
byte[] buf = new byte[16384];
224+
while ((nRead = is.read(buf, 0, buf.length)) != -1) {
225+
buffer.write(buf, 0, nRead);
226+
}
227+
return buffer.toByteArray();
228+
}
229+
}
230+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.itk.wasm;
2+
3+
public class PipelineInput<T> {
4+
public InterfaceTypes type;
5+
public T data;
6+
public String path;
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.itk.wasm;
2+
3+
public class PipelineOutput<T> {
4+
public InterfaceTypes type;
5+
public T data;
6+
public String path;
7+
}

packages/core/java/src/main/java/org/itk/wasm/TextStream.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919
*/
2020
package org.itk.wasm;
2121

22+
import java.nio.ByteBuffer;
23+
2224
public class TextStream {
23-
public String data;
25+
public ByteBuffer data;
2426

25-
public TextStream(String data) {
27+
public TextStream(ByteBuffer data) {
2628
this.data = data;
2729
}
30+
31+
// TODO: implement toString to copy data into a String ?
2832
}

0 commit comments

Comments
 (0)