Skip to content

Commit 756c3a2

Browse files
authored
[log] Fix error where Lz4ArrowCompressionCodec gets stuck during compression and is unable to flush data (#465)
1 parent 30a8608 commit 756c3a2

File tree

4 files changed

+705
-17
lines changed

4 files changed

+705
-17
lines changed

fluss-common/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@
6262

6363
<!-- TODO: these two dependencies need to be shaded. -->
6464
<dependency>
65-
<groupId>org.apache.commons</groupId>
66-
<artifactId>commons-compress</artifactId>
67-
<version>1.20</version>
65+
<groupId>org.lz4</groupId>
66+
<artifactId>lz4-java</artifactId>
67+
<version>1.8.0</version>
6868
</dependency>
6969

7070
<dependency>
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.compression;
18+
19+
import com.alibaba.fluss.compression.FlussLZ4BlockOutputStream.FLG;
20+
21+
import net.jpountz.lz4.LZ4Compressor;
22+
import net.jpountz.lz4.LZ4Exception;
23+
import net.jpountz.lz4.LZ4Factory;
24+
import net.jpountz.lz4.LZ4FrameOutputStream.BD;
25+
import net.jpountz.lz4.LZ4SafeDecompressor;
26+
import net.jpountz.xxhash.XXHash32;
27+
import net.jpountz.xxhash.XXHashFactory;
28+
29+
import java.io.IOException;
30+
import java.io.InputStream;
31+
import java.nio.ByteBuffer;
32+
import java.nio.ByteOrder;
33+
34+
import static com.alibaba.fluss.compression.FlussLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
35+
import static com.alibaba.fluss.compression.FlussLZ4BlockOutputStream.MAGIC;
36+
37+
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
38+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
39+
* additional information regarding copyright ownership. */
40+
41+
/**
42+
* A partial implementation of the v1.5.1 LZ4 Frame format.
43+
*
44+
* <p>This class is not thread-safe.
45+
*/
46+
public class FlussLZ4BlocakInputStream extends InputStream {
47+
public static final String PREMATURE_EOS = "Stream ended prematurely";
48+
public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
49+
public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
50+
public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
51+
52+
private static final LZ4SafeDecompressor DECOMPRESSOR =
53+
LZ4Factory.fastestInstance().safeDecompressor();
54+
private static final XXHash32 CHECKSUM = XXHashFactory.fastestInstance().hash32();
55+
56+
private static final RuntimeException BROKEN_LZ4_EXCEPTION;
57+
58+
static {
59+
RuntimeException exception = null;
60+
try {
61+
detectBrokenLz4Version();
62+
} catch (RuntimeException e) {
63+
exception = e;
64+
}
65+
BROKEN_LZ4_EXCEPTION = exception;
66+
}
67+
68+
private final ByteBuffer in;
69+
private final ByteBuffer decompressionBuffer;
70+
// `flg` and `maxBlockSize` are effectively final, they are initialised in the `readHeader`
71+
// method that is only invoked from the constructor
72+
private FLG flg;
73+
private int maxBlockSize;
74+
75+
// If a block is compressed, this is the same as `decompressionBuffer`. If a block is not
76+
// compressed, this is a slice of `in` to avoid unnecessary copies.
77+
private ByteBuffer decompressedBuffer;
78+
private boolean finished;
79+
80+
/**
81+
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
82+
*
83+
* @param in The byte buffer to decompress
84+
*/
85+
public FlussLZ4BlocakInputStream(ByteBuffer in) throws IOException {
86+
if (BROKEN_LZ4_EXCEPTION != null) {
87+
throw BROKEN_LZ4_EXCEPTION;
88+
}
89+
this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
90+
readHeader();
91+
decompressionBuffer = ByteBuffer.allocate(maxBlockSize);
92+
decompressionBuffer.order(ByteOrder.LITTLE_ENDIAN);
93+
finished = false;
94+
}
95+
96+
/** Reads the magic number and frame descriptor from input buffer. */
97+
private void readHeader() throws IOException {
98+
// read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
99+
if (in.remaining() < 6) {
100+
throw new IOException(PREMATURE_EOS);
101+
}
102+
103+
if (MAGIC != in.getInt()) {
104+
throw new IOException(NOT_SUPPORTED);
105+
}
106+
// mark start of data to checksum
107+
in.mark();
108+
109+
flg = FlussLZ4BlockOutputStream.FLG.fromByte(in.get());
110+
maxBlockSize = BD.fromByte(in.get()).getBlockMaximumSize();
111+
112+
if (flg.isContentSizeSet()) {
113+
if (in.remaining() < 8) {
114+
throw new IOException(PREMATURE_EOS);
115+
}
116+
in.position(in.position() + 8);
117+
}
118+
119+
int len = in.position() - in.reset().position();
120+
121+
int hash = CHECKSUM.hash(in, in.position(), len, 0);
122+
in.position(in.position() + len);
123+
if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
124+
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
125+
}
126+
}
127+
128+
/**
129+
* Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32
130+
* checksum, and writes the result to a buffer.
131+
*/
132+
private void readBlock() throws IOException {
133+
if (in.remaining() < 4) {
134+
throw new IOException(PREMATURE_EOS);
135+
}
136+
137+
int blockSize = in.getInt();
138+
boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
139+
blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
140+
141+
// Check for EndMark
142+
if (blockSize == 0) {
143+
finished = true;
144+
if (flg.isContentChecksumSet()) {
145+
in.getInt(); // TODO: verify this content checksum
146+
}
147+
return;
148+
} else if (blockSize > maxBlockSize) {
149+
throw new IOException(
150+
String.format("Block size %d exceeded max: %d", blockSize, maxBlockSize));
151+
}
152+
153+
if (in.remaining() < blockSize) {
154+
throw new IOException(PREMATURE_EOS);
155+
}
156+
157+
if (compressed) {
158+
try {
159+
final int bufferSize =
160+
DECOMPRESSOR.decompress(
161+
in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
162+
decompressionBuffer.position(0);
163+
decompressionBuffer.limit(bufferSize);
164+
decompressedBuffer = decompressionBuffer;
165+
} catch (LZ4Exception e) {
166+
throw new IOException(e);
167+
}
168+
} else {
169+
decompressedBuffer = in.slice();
170+
decompressedBuffer.limit(blockSize);
171+
}
172+
173+
// verify checksum
174+
if (flg.isBlockChecksumSet()) {
175+
int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
176+
in.position(in.position() + blockSize);
177+
if (hash != in.getInt()) {
178+
throw new IOException(BLOCK_HASH_MISMATCH);
179+
}
180+
} else {
181+
in.position(in.position() + blockSize);
182+
}
183+
}
184+
185+
@Override
186+
public int read() throws IOException {
187+
if (finished) {
188+
return -1;
189+
}
190+
if (available() == 0) {
191+
readBlock();
192+
}
193+
if (finished) {
194+
return -1;
195+
}
196+
197+
return decompressedBuffer.get() & 0xFF;
198+
}
199+
200+
@Override
201+
public int read(byte[] b, int off, int len) throws IOException {
202+
net.jpountz.util.SafeUtils.checkRange(b, off, len);
203+
if (finished) {
204+
return -1;
205+
}
206+
if (available() == 0) {
207+
readBlock();
208+
}
209+
if (finished) {
210+
return -1;
211+
}
212+
len = Math.min(len, available());
213+
214+
decompressedBuffer.get(b, off, len);
215+
return len;
216+
}
217+
218+
@Override
219+
public long skip(long n) throws IOException {
220+
if (finished) {
221+
return 0;
222+
}
223+
if (available() == 0) {
224+
readBlock();
225+
}
226+
if (finished) {
227+
return 0;
228+
}
229+
int skipped = (int) Math.min(n, available());
230+
decompressedBuffer.position(decompressedBuffer.position() + skipped);
231+
return skipped;
232+
}
233+
234+
@Override
235+
public int available() {
236+
return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
237+
}
238+
239+
@Override
240+
public void close() {
241+
if (decompressionBuffer != null) {
242+
decompressionBuffer.clear();
243+
}
244+
}
245+
246+
@Override
247+
public void mark(int readlimit) {
248+
throw new RuntimeException("mark not supported");
249+
}
250+
251+
@Override
252+
public void reset() {
253+
throw new RuntimeException("reset not supported");
254+
}
255+
256+
@Override
257+
public boolean markSupported() {
258+
return false;
259+
}
260+
261+
/**
262+
* Checks whether the version of lz4 on the classpath has the fix for reading from ByteBuffers
263+
* with non-zero array offsets.
264+
*/
265+
static void detectBrokenLz4Version() {
266+
byte[] source = new byte[] {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3};
267+
final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
268+
269+
final byte[] compressed = new byte[compressor.maxCompressedLength(source.length)];
270+
final int compressedLength =
271+
compressor.compress(source, 0, source.length, compressed, 0, compressed.length);
272+
273+
// allocate an array-backed ByteBuffer with non-zero array-offset containing the compressed
274+
// data a buggy decompressor will read the data from the beginning of the underlying array
275+
// instead of the beginning of the ByteBuffer, failing to decompress the invalid data.
276+
final byte[] zeroes = {0, 0, 0, 0, 0};
277+
ByteBuffer nonZeroOffsetBuffer =
278+
ByteBuffer.allocate(
279+
zeroes.length
280+
+ compressed
281+
.length) // allocates the backing array with extra
282+
// space to offset the data
283+
.put(zeroes) // prepend invalid bytes (zeros) before the compressed data
284+
// in the array
285+
.slice() // create a new ByteBuffer sharing the underlying array, offset to
286+
// start on the compressed data
287+
.put(compressed); // write the compressed data at the beginning of this
288+
// new buffer
289+
290+
ByteBuffer dest = ByteBuffer.allocate(source.length);
291+
try {
292+
DECOMPRESSOR.decompress(
293+
nonZeroOffsetBuffer, 0, compressedLength, dest, 0, source.length);
294+
} catch (Exception e) {
295+
throw new RuntimeException(
296+
"Fluss has detected detected a buggy lz4-java library (< 1.4.x) on the classpath."
297+
+ " If you are using Fluss client libraries, make sure your application does not"
298+
+ " accidentally override the version provided by Fluss or include multiple versions"
299+
+ " of the library on the classpath. The lz4-java version on the classpath should"
300+
+ " match the version the Fluss client libraries depend on. Adding -verbose:class"
301+
+ " to your JVM arguments may help understand which lz4-java version is getting loaded.",
302+
e);
303+
}
304+
}
305+
}

0 commit comments

Comments
 (0)