Skip to content

Commit 124bf0e

Browse files
authored
fix(java): fix bytebuffer no such method error (#1580)
## What does this PR do? This PR add a bytebuffer util and fix bytebuffer no such method error ## Related issues Closes #1577 #1579 ## Does this PR introduce any user-facing change? <!-- If any user-facing interface changes, please [open an issue](https://github.com/apache/incubator-fury/issues/new/choose) describing the need to do so and update the document if necessary. --> - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark <!-- When the PR has an impact on performance (if you don't know whether the PR will have an impact on performance, you can submit the PR first, and if it will have impact on performance, the code reviewer will explain it), be sure to attach a benchmark data here. -->
1 parent a003624 commit 124bf0e

File tree

14 files changed

+148
-145
lines changed

14 files changed

+148
-145
lines changed

java/benchmark/src/main/java/org/apache/fury/benchmark/UserTypeDeserializeSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.fury.benchmark.state.ObjectType;
3131
import org.apache.fury.benchmark.state.ProtoBuffersState;
3232
import org.apache.fury.benchmark.state.ProtostuffState;
33-
import org.apache.fury.memory.Platform;
33+
import org.apache.fury.memory.ByteBufferUtil;
3434
import org.openjdk.jmh.Main;
3535
import org.openjdk.jmh.annotations.Benchmark;
3636
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -125,7 +125,7 @@ public Object protobuffers_deserialize(ProtoBuffersState.ProtoBuffersUserTypeSta
125125

126126
@Benchmark
127127
public Object flatbuffers_deserialize(FlatBuffersState.FlatBuffersUserTypeState state) {
128-
Platform.clearBuffer(state.deserializedData);
128+
ByteBufferUtil.clearBuffer(state.deserializedData);
129129
if (state.objectType == ObjectType.SAMPLE) {
130130
return FlatBuffersState.deserializeSample(state.deserializedData);
131131
} else {

java/benchmark/src/main/java/org/apache/fury/benchmark/UserTypeSerializeSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.fury.benchmark.state.ObjectType;
3434
import org.apache.fury.benchmark.state.ProtoBuffersState;
3535
import org.apache.fury.benchmark.state.ProtostuffState;
36-
import org.apache.fury.memory.Platform;
36+
import org.apache.fury.memory.ByteBufferUtil;
3737
import org.openjdk.jmh.Main;
3838
import org.openjdk.jmh.annotations.Benchmark;
3939
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -133,7 +133,7 @@ public byte[] protobuffers_serialize(ProtoBuffersState.ProtoBuffersUserTypeState
133133

134134
@Benchmark
135135
public Object flatbuffers_serialize(FlatBuffersState.FlatBuffersUserTypeState state) {
136-
Platform.clearBuffer(state.directBuffer);
136+
ByteBufferUtil.clearBuffer(state.directBuffer);
137137
if (state.objectType == ObjectType.SAMPLE) {
138138
return FlatBuffersState.serializeSample((Sample) state.object, state.directBuffer);
139139
} else {

java/benchmark/src/main/java/org/apache/fury/benchmark/ZeroCopySuite.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
import org.apache.fury.benchmark.data.SerializableByteBuffer;
4242
import org.apache.fury.benchmark.state.BufferType;
4343
import org.apache.fury.config.Language;
44+
import org.apache.fury.memory.ByteBufferUtil;
4445
import org.apache.fury.memory.MemoryBuffer;
4546
import org.apache.fury.memory.MemoryUtils;
46-
import org.apache.fury.memory.Platform;
4747
import org.apache.fury.serializer.BufferObject;
4848
import org.apache.fury.test.bean.ArraysData;
4949
import org.apache.fury.util.Preconditions;
@@ -314,7 +314,7 @@ public Object jsonb_serialize(JsonBState state, Blackhole bh) {
314314
public static Object jsonbSerialize(JsonBState state, Blackhole bh) {
315315
byte[] bytes = JSONB.toBytes(state.data, state.jsonbWriteFeatures);
316316
if (state.bufferType == BufferType.directBuffer) {
317-
Platform.clearBuffer(state.directBuffer);
317+
ByteBufferUtil.clearBuffer(state.directBuffer);
318318
state.directBuffer.put(bytes);
319319
}
320320
if (bh != null) {
@@ -331,7 +331,7 @@ public Object jsonb_deserialize(JsonBState state, Blackhole bh) {
331331

332332
public static Object jsonbDeserialize(JsonBState state, Blackhole bh) {
333333
if (state.bufferType == BufferType.directBuffer) {
334-
Platform.rewind(state.directBuffer);
334+
ByteBufferUtil.rewind(state.directBuffer);
335335
byte[] bytes = new byte[state.buffer.length];
336336
state.directBuffer.get(bytes);
337337
Object newObj = JSONB.parseObject(bytes, Object.class, state.jsonbReaderFeatures);

java/benchmark/src/main/java/org/apache/fury/benchmark/state/FlatBuffersState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.fury.benchmark.state.generated.FBSMedia;
4040
import org.apache.fury.benchmark.state.generated.FBSMediaContent;
4141
import org.apache.fury.benchmark.state.generated.FBSSample;
42-
import org.apache.fury.memory.Platform;
42+
import org.apache.fury.memory.ByteBufferUtil;
4343
import org.checkerframework.checker.nullness.qual.Nullable;
4444
import org.openjdk.jmh.annotations.Level;
4545
import org.openjdk.jmh.annotations.Param;
@@ -439,7 +439,7 @@ public void setup() {
439439
} else {
440440
deserializedData = ByteBuffer.wrap(data);
441441
}
442-
Platform.clearBuffer(deserializedData);
442+
ByteBufferUtil.clearBuffer(deserializedData);
443443
Object newObj = deserializeFunc.apply(deserializedData);
444444
Preconditions.checkArgument(object.equals(newObj));
445445
}

java/benchmark/src/main/java/org/apache/fury/benchmark/state/FstState.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.fury.benchmark.data.Image;
3030
import org.apache.fury.benchmark.data.Media;
3131
import org.apache.fury.benchmark.data.MediaContent;
32-
import org.apache.fury.memory.Platform;
32+
import org.apache.fury.memory.ByteBufferUtil;
3333
import org.apache.fury.util.Preconditions;
3434
import org.nustaq.serialization.FSTConfiguration;
3535
import org.openjdk.jmh.annotations.CompilerControl;
@@ -93,7 +93,7 @@ public static byte[] serialize(
9393
Blackhole blackhole) {
9494
byte[] bytes = fst.asSharedByteArray(value, out);
9595
if (bufferType == BufferType.directBuffer) {
96-
Platform.clearBuffer(directBuffer);
96+
ByteBufferUtil.clearBuffer(directBuffer);
9797
directBuffer.put(bytes, 0, out[0]);
9898
}
9999
if (blackhole != null) {
@@ -116,7 +116,7 @@ public static Object deserialize(
116116
ByteBuffer directBuffer,
117117
Blackhole blackhole) {
118118
if (bufferType == BufferType.directBuffer) {
119-
Platform.rewind(directBuffer);
119+
ByteBufferUtil.rewind(directBuffer);
120120
byte[] bytes = new byte[out[0]];
121121
directBuffer.get(bytes);
122122
Object newObj = fst.asObject(bytes);

java/benchmark/src/main/java/org/apache/fury/benchmark/state/JsonbState.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.apache.fury.benchmark.data.CustomJDKSerialization;
3939
import org.apache.fury.logging.Logger;
4040
import org.apache.fury.logging.LoggerFactory;
41-
import org.apache.fury.memory.Platform;
41+
import org.apache.fury.memory.ByteBufferUtil;
4242
import org.apache.fury.util.Preconditions;
4343
import org.openjdk.jmh.annotations.CompilerControl;
4444
import org.openjdk.jmh.annotations.Fork;
@@ -90,7 +90,7 @@ public void setup() {
9090
"======> Jsonb | {} | {} | {} | {} |", objectType, references, bufferType, buffer.length);
9191
if (bufferType == BufferType.directBuffer) {
9292
directBuffer.put(buffer);
93-
Platform.clearBuffer(directBuffer);
93+
ByteBufferUtil.clearBuffer(directBuffer);
9494
}
9595
Preconditions.checkArgument(object.equals(deserialize(null, this)));
9696
}
@@ -127,7 +127,7 @@ public static JSONReader.Feature[] getJsonbReaderConfig(boolean refTracking) {
127127
public static byte[] serialize(Blackhole blackhole, JsonbBenchmarkState state, Object value) {
128128
byte[] bytes = JSONB.toBytes(value, state.jsonbWriteFeatures);
129129
if (state.bufferType == BufferType.directBuffer) {
130-
Platform.clearBuffer(state.directBuffer);
130+
ByteBufferUtil.clearBuffer(state.directBuffer);
131131
state.directBuffer.put(bytes);
132132
}
133133
if (blackhole != null) {
@@ -139,7 +139,7 @@ public static byte[] serialize(Blackhole blackhole, JsonbBenchmarkState state, O
139139

140140
public static Object deserialize(Blackhole blackhole, JsonbBenchmarkState state) {
141141
if (state.bufferType == BufferType.directBuffer) {
142-
Platform.rewind(state.directBuffer);
142+
ByteBufferUtil.rewind(state.directBuffer);
143143
byte[] bytes = new byte[state.buffer.length];
144144
state.directBuffer.get(bytes);
145145
Object newObj = JSONB.parseObject(bytes, Object.class, state.jsonbReaderFeatures);

java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.nio.channels.ReadableByteChannel;
2525
import javax.annotation.concurrent.NotThreadSafe;
2626
import org.apache.fury.exception.DeserializationException;
27+
import org.apache.fury.memory.ByteBufferUtil;
2728
import org.apache.fury.memory.MemoryBuffer;
2829
import org.apache.fury.memory.Platform;
2930
import org.apache.fury.util.Preconditions;
@@ -60,7 +61,7 @@ public int fillBuffer(int minFillSize) {
6061
byteBuf.position(0);
6162
newByteBuf.put(byteBuf);
6263
byteBuf = byteBuffer = newByteBuf;
63-
memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf);
64+
memoryBuf.initDirectBuffer(ByteBufferUtil.getAddress(byteBuf), position, byteBuf);
6465
}
6566
byteBuf.limit(newLimit);
6667
int readCount = channel.read(byteBuf);
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fury.memory;
21+
22+
import java.lang.reflect.Field;
23+
import java.nio.Buffer;
24+
import java.nio.ByteBuffer;
25+
import org.apache.fury.util.Preconditions;
26+
27+
public class ByteBufferUtil {
28+
private static final long BUFFER_ADDRESS_FIELD_OFFSET;
29+
private static final long BUFFER_CAPACITY_FIELD_OFFSET;
30+
31+
static {
32+
try {
33+
Field addressField = Buffer.class.getDeclaredField("address");
34+
BUFFER_ADDRESS_FIELD_OFFSET = Platform.objectFieldOffset(addressField);
35+
Preconditions.checkArgument(BUFFER_ADDRESS_FIELD_OFFSET != 0);
36+
Field capacityField = Buffer.class.getDeclaredField("capacity");
37+
BUFFER_CAPACITY_FIELD_OFFSET = Platform.objectFieldOffset(capacityField);
38+
Preconditions.checkArgument(BUFFER_CAPACITY_FIELD_OFFSET != 0);
39+
} catch (NoSuchFieldException e) {
40+
throw new IllegalStateException(e);
41+
}
42+
}
43+
44+
public static long getAddress(ByteBuffer buffer) {
45+
Preconditions.checkNotNull(buffer, "buffer is null");
46+
Preconditions.checkArgument(buffer.isDirect(), "Can't get address of a non-direct ByteBuffer.");
47+
long offHeapAddress;
48+
try {
49+
offHeapAddress = Platform.getLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET);
50+
} catch (Throwable t) {
51+
throw new Error("Could not access direct byte buffer address field.", t);
52+
}
53+
return offHeapAddress;
54+
}
55+
56+
private static final ByteBuffer localBuffer = ByteBuffer.allocateDirect(0);
57+
58+
/** Create a direct buffer from native memory represented by address [address, address + size). */
59+
public static ByteBuffer createDirectByteBufferFromNativeAddress(long address, int size) {
60+
try {
61+
// ByteBuffer.allocateDirect(0) is about 30x slower than `localBuffer.duplicate()`.
62+
ByteBuffer buffer = localBuffer.duplicate();
63+
Platform.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
64+
Platform.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
65+
buffer.clear();
66+
return buffer;
67+
} catch (Throwable t) {
68+
throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t);
69+
}
70+
}
71+
72+
/** Wrap a buffer [address, address + size) into provided <code>buffer</code>. */
73+
public static void wrapDirectByteBufferFromNativeAddress(
74+
ByteBuffer buffer, long address, int size) {
75+
Preconditions.checkArgument(
76+
buffer.isDirect(), "Can't wrap native memory into a non-direct ByteBuffer.");
77+
Platform.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
78+
Platform.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
79+
buffer.clear();
80+
}
81+
82+
public static ByteBuffer wrapDirectBuffer(long address, int size) {
83+
return createDirectByteBufferFromNativeAddress(address, size);
84+
}
85+
86+
/** Wrap a buffer [address, address + size) into provided <code>buffer</code>. */
87+
public static void wrapDirectBuffer(ByteBuffer buffer, long address, int size) {
88+
Platform.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
89+
Platform.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
90+
buffer.clear();
91+
}
92+
93+
public static void clearBuffer(Buffer buffer) {
94+
buffer.clear();
95+
}
96+
97+
public static void flipBuffer(Buffer buffer) {
98+
buffer.flip();
99+
}
100+
101+
public static void rewind(Buffer buffer) {
102+
buffer.rewind();
103+
}
104+
105+
public static void position(Buffer buffer, int pos) {
106+
buffer.position(pos);
107+
}
108+
}

java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ public void get(int offset, ByteBuffer target, int numBytes) {
329329
}
330330
final int targetPos = target.position();
331331
if (target.isDirect()) {
332-
final long targetAddr = Platform.getAddress(target) + targetPos;
332+
final long targetAddr = ByteBufferUtil.getAddress(target) + targetPos;
333333
final long sourceAddr = address + offset;
334334
if (sourceAddr <= addressLimit - numBytes) {
335335
Platform.copyMemory(heapMemory, sourceAddr, null, targetAddr, numBytes);
@@ -340,7 +340,7 @@ public void get(int offset, ByteBuffer target, int numBytes) {
340340
assert target.hasArray();
341341
get(offset, target.array(), targetPos + target.arrayOffset(), numBytes);
342342
}
343-
target.position(targetPos + numBytes);
343+
ByteBufferUtil.position(target, targetPos + numBytes);
344344
}
345345

346346
public void put(int offset, ByteBuffer source, int numBytes) {
@@ -350,7 +350,7 @@ public void put(int offset, ByteBuffer source, int numBytes) {
350350
}
351351
final int sourcePos = source.position();
352352
if (source.isDirect()) {
353-
final long sourceAddr = Platform.getAddress(source) + sourcePos;
353+
final long sourceAddr = ByteBufferUtil.getAddress(source) + sourcePos;
354354
final long targetAddr = address + offset;
355355
if (targetAddr <= addressLimit - numBytes) {
356356
Platform.copyMemory(null, sourceAddr, heapMemory, targetAddr, numBytes);
@@ -361,7 +361,7 @@ public void put(int offset, ByteBuffer source, int numBytes) {
361361
assert source.hasArray();
362362
put(offset, source.array(), sourcePos + source.arrayOffset(), numBytes);
363363
}
364-
source.position(sourcePos + numBytes);
364+
ByteBufferUtil.position(source, sourcePos + numBytes);
365365
}
366366

367367
public void put(int index, byte[] src) {
@@ -2465,12 +2465,12 @@ public ByteBuffer sliceAsByteBuffer(int offset, int length) {
24652465
ByteBuffer offHeapBuffer = this.offHeapBuffer;
24662466
if (offHeapBuffer != null) {
24672467
ByteBuffer duplicate = offHeapBuffer.duplicate();
2468-
int start = (int) (address - Platform.getAddress(duplicate));
2469-
duplicate.position(start + offset);
2468+
int start = (int) (address - ByteBufferUtil.getAddress(duplicate));
2469+
ByteBufferUtil.position(duplicate, start + offset);
24702470
duplicate.limit(start + offset + length);
24712471
return duplicate.slice();
24722472
} else {
2473-
return Platform.createDirectByteBufferFromNativeAddress(address + offset, length);
2473+
return ByteBufferUtil.createDirectByteBufferFromNativeAddress(address + offset, length);
24742474
}
24752475
}
24762476
}
@@ -2553,7 +2553,7 @@ public static MemoryBuffer fromByteArray(byte[] buffer) {
25532553
public static MemoryBuffer fromByteBuffer(ByteBuffer buffer) {
25542554
if (buffer.isDirect()) {
25552555
return new MemoryBuffer(
2556-
Platform.getAddress(buffer) + buffer.position(), buffer.remaining(), buffer);
2556+
ByteBufferUtil.getAddress(buffer) + buffer.position(), buffer.remaining(), buffer);
25572557
} else {
25582558
int offset = buffer.arrayOffset() + buffer.position();
25592559
return new MemoryBuffer(buffer.array(), offset, buffer.remaining());
@@ -2562,7 +2562,7 @@ public static MemoryBuffer fromByteBuffer(ByteBuffer buffer) {
25622562

25632563
public static MemoryBuffer fromDirectByteBuffer(
25642564
ByteBuffer buffer, int size, FuryStreamReader streamReader) {
2565-
long offHeapAddress = Platform.getAddress(buffer) + buffer.position();
2565+
long offHeapAddress = ByteBufferUtil.getAddress(buffer) + buffer.position();
25662566
return new MemoryBuffer(offHeapAddress, size, buffer, streamReader);
25672567
}
25682568

0 commit comments

Comments
 (0)