Skip to content

Commit 30a8608

Browse files
authored
[log] Fix the 'Destination buffer is too small' error while decompress data using ZSTD (#464)
1 parent 5d7f76b commit 30a8608

File tree

1 file changed

+40
-17
lines changed

1 file changed

+40
-17
lines changed

fluss-common/src/main/java/com/alibaba/fluss/compression/ZstdArrowCompressionCodec.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import com.github.luben.zstd.Zstd;
2626

27+
import java.nio.ByteBuffer;
28+
2729
/* This file is based on source code of Apache Arrow-java Project (https://github.com/apache/arrow-java), licensed by
2830
* the Apache Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with
2931
* this work for additional information regarding copyright ownership. */
@@ -34,7 +36,7 @@ public class ZstdArrowCompressionCodec extends AbstractCompressionCodec {
3436
private final int compressionLevel;
3537

3638
public ZstdArrowCompressionCodec() {
37-
this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
39+
this(DEFAULT_COMPRESSION_LEVEL);
3840
}
3941

4042
public ZstdArrowCompressionCodec(int compressionLevel) {
@@ -44,41 +46,62 @@ public ZstdArrowCompressionCodec(int compressionLevel) {
4446
@Override
4547
protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
4648
long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex());
47-
long dstSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize;
48-
ArrowBuf compressedBuffer = allocator.buffer(dstSize);
49+
ByteBuffer uncompressedDirectBuffer = uncompressedBuffer.nioBuffer();
50+
51+
long compressedSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize;
52+
ArrowBuf compressedBuffer = allocator.buffer(compressedSize);
53+
ByteBuffer compressedDirectBuffer =
54+
compressedBuffer.nioBuffer(
55+
CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, (int) maxSize);
56+
57+
// The reason why we use Zstd.compressDirectByteBuffer instead of Zstd.compressUnsafe used
58+
// in arrow-java here is that compressUnsafe() may encounter occasional data corruption
59+
// issues when dealing with large volumes of data, and the cause has not yet been
60+
// determined.
4961
long bytesWritten =
50-
Zstd.compressUnsafe(
51-
compressedBuffer.memoryAddress()
52-
+ CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
53-
dstSize,
54-
/*src*/ uncompressedBuffer.memoryAddress(),
55-
/*srcSize=*/ uncompressedBuffer.writerIndex(),
56-
/*level=*/ this.compressionLevel);
62+
Zstd.compressDirectByteBuffer(
63+
compressedDirectBuffer,
64+
0,
65+
(int) maxSize,
66+
uncompressedDirectBuffer,
67+
0,
68+
(int) uncompressedBuffer.writerIndex(),
69+
compressionLevel);
70+
5771
if (Zstd.isError(bytesWritten)) {
5872
compressedBuffer.close();
5973
throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten));
6074
}
75+
6176
compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + bytesWritten);
6277
return compressedBuffer;
6378
}
6479

6580
@Override
6681
protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
6782
long decompressedLength = readUncompressedLength(compressedBuffer);
83+
84+
ByteBuffer compressedDirectBuffer = compressedBuffer.nioBuffer();
6885
ArrowBuf uncompressedBuffer = allocator.buffer(decompressedLength);
86+
ByteBuffer uncompressedDirectBuffer =
87+
uncompressedBuffer.nioBuffer(0, (int) decompressedLength);
88+
6989
long decompressedSize =
70-
Zstd.decompressUnsafe(
71-
uncompressedBuffer.memoryAddress(),
72-
decompressedLength,
73-
/*src=*/ compressedBuffer.memoryAddress()
74-
+ CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
75-
compressedBuffer.writerIndex()
76-
- CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
90+
Zstd.decompressDirectByteBuffer(
91+
uncompressedDirectBuffer,
92+
0,
93+
(int) decompressedLength,
94+
compressedDirectBuffer,
95+
(int) CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
96+
(int)
97+
(compressedBuffer.writerIndex()
98+
- CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH));
7799
if (Zstd.isError(decompressedSize)) {
78100
uncompressedBuffer.close();
79101
throw new RuntimeException(
80102
"Error decompressing: " + Zstd.getErrorName(decompressedSize));
81103
}
104+
82105
if (decompressedLength != decompressedSize) {
83106
uncompressedBuffer.close();
84107
throw new RuntimeException(

0 commit comments

Comments
 (0)