Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final int
IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;

/** ZStandard number of compression worker threads.
* A value of 0 (the default) disables worker threads and runs
* compression on the calling thread, matching the upstream zstd
* default. A positive value enables multi-threaded compression with
* the specified number of background workers. */
public static final String IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY =
"io.compression.codec.zstd.workers";

/** Default value for IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY (disabled). */
public static final int IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT = 0;

/** Internal buffer size for Lz4 compressor/decompressors */
public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
"io.compression.codec.lz4.buffersize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ public static int getCompressionLevel(Configuration conf) {
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
}

/**
 * Reads the configured number of zstd compression worker threads.
 * The default of 0 keeps compression on the calling thread (the
 * upstream zstd default); any positive value enables multi-threaded
 * compression with that many background workers. Negative values are
 * invalid.
 *
 * @param conf the configuration to read from
 * @return the configured number of zstd compression worker threads
 * @throws IllegalArgumentException if the configured value is negative
 */
public static int getCompressionWorkers(Configuration conf) {
  final int configured = conf.getInt(
      CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY,
      CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT);
  if (configured >= 0) {
    return configured;
  }
  // Fail fast on nonsense values rather than passing them to the
  // native zstd library.
  throw new IllegalArgumentException(
      CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY
      + " must be >= 0, got " + configured);
}

public static int getCompressionBufferSize(Configuration conf) {
int bufferSize = getBufferSize(conf);
return bufferSize == 0 ?
Expand Down Expand Up @@ -135,7 +157,9 @@ public Class<? extends Compressor> getCompressorType() {
@Override
public Compressor createCompressor() {
return new ZStandardCompressor(
getCompressionLevel(conf), getCompressionBufferSize(conf));
getCompressionLevel(conf),
getCompressionWorkers(conf),
getCompressionBufferSize(conf));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ZStandardCompressor implements Compressor {
LoggerFactory.getLogger(ZStandardCompressor.class);

private int level;
private int workers;
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
Expand Down Expand Up @@ -74,12 +75,30 @@ public static int getRecommendedBufferSize() {
* @param bufferSize bufferSize.
*/
public ZStandardCompressor(int level, int bufferSize) {
// Delegate to the workers-aware constructor with the default worker
// count (0), so existing callers keep single-threaded compression
// and identical behavior to before the workers option existed.
this(level,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT,
bufferSize, bufferSize);
}

/**
 * Creates a new compressor with the supplied compression level and number
 * of compression worker threads. Compressed data will be generated in
 * ZStandard format.
 *
 * @param level the zstd compression level
 * @param workers number of zstd compression worker threads (0 disables
 * multi-threaded compression and runs on the calling thread)
 * @param bufferSize the direct buffer size, used for both the input and
 * output buffers
 */
public ZStandardCompressor(int level, int workers, int bufferSize) {
// Use the same size for both the input and output direct buffers.
this(level, workers, bufferSize, bufferSize);
}

@VisibleForTesting
ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
ZStandardCompressor(int level, int workers, int inputBufferSize,
int outputBufferSize) {
this.level = level;
this.workers = workers;
zstdJniCtx = new ZstdCompressCtx();
uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
directBufferSize = outputBufferSize;
Expand All @@ -101,6 +120,7 @@ public void reinit(Configuration conf) {
return;
}
level = ZStandardCodec.getCompressionLevel(conf);
workers = ZStandardCodec.getCompressionWorkers(conf);
reset();
LOG.debug("Reinit compressor with new compression configuration");
}
Expand Down Expand Up @@ -196,11 +216,9 @@ public int compress(byte[] b, int off, int len) throws IOException {
return n;
}

// Always invoke the streaming API — even with empty input — so internally
// buffered bytes continue to be drained, matching native ZSTD_flushStream.
// Use END only when finish=true, no more user data, and all direct-buffer
// data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
// ZSTD_compressStream + ZSTD_flushStream).
// Always invoke the streaming API - even with empty input - so internally
// buffered bytes continue to be drained. Use END only when finish=true, no
// more user data, and all direct-buffer data consumed; otherwise CONTINUE.
boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0);
boolean shouldEnd = finish && userBufLen == 0 && allConsumed;

Expand All @@ -209,7 +227,16 @@ public int compress(byte[] b, int off, int len) throws IOException {
compressedDirectBuf.position(0);
compressedDirectBuf.limit(directBufferSize);

EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH;
// CONTINUE should be used for non-end case, to support multi-threaded:
// 1. CONTINUE + workers >= 1: non-blocking. The call copies as much input
// as it can into a job, dispatches to workers, drains whatever output
// is ready, and returns. Multiple jobs can be in flight in parallel.
// 2. FLUSH + workers >= 1: multi-threaded compression will block to flush
// as much output as possible. The call won't return until every queued
// job has finished and its output has been drained to the dst buffer.
// 3. END + workers >= 1: same as FLUSH but also closes the frame. Same
// blocking behavior.
EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
Copy link
Copy Markdown
Member Author

@pan3793 pan3793 Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my spark integration test shows it has no effect when using FLUSH - setting workers to 4 has the same cpu usage and wall-clock time as the default workers 0. while after the change to CONTINUE, the cpu average usage takes ~3.5x, wall-clock time cost has also decreased significantly

boolean done = zstdJniCtx.compressDirectByteBufferStream(
compressedDirectBuf, uncompressedDirectBuf, endOp);

Expand Down Expand Up @@ -269,6 +296,7 @@ public void reset() {
checkStream();
zstdJniCtx.reset();
zstdJniCtx.setLevel(level);
zstdJniCtx.setWorkers(workers);
finish = false;
finished = false;
bytesRead = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,21 @@
</description>
</property>

<property>
<name>io.compression.codec.zstd.workers</name>
<value>0</value>
<description>
Number of worker threads used by the ZStandard compressor for
multi-threaded compression. The default value 0 disables worker
threads and runs compression on the calling thread, matching the
upstream zstd default. Setting this to a positive value enables
parallel compression with the specified number of background
workers; the value is capped internally by the zstd library.
Negative values are rejected. This setting only affects compression;
decompression is unaffected.
</description>
</property>

<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization, org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization, org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,16 @@ public void testSetInputWithBytesSizeMoreThenDefaultZStandardBufferSize()
byte[] bytes = generate(bytesSize);
assertTrue(compressor.needsInput(), "needsInput error !!!");
compressor.setInput(bytes, 0, bytes.length);
compressor.finish();
byte[] emptyBytes = new byte[bytesSize];
int cSize = compressor.compress(emptyBytes, 0, bytes.length);
// Drive compress() in a loop until the compressor reports finished(),
// mirroring how CompressorStream drains the compressor.
int cSize = 0;
while (!compressor.finished() && cSize < emptyBytes.length) {
compressor.needsInput();
cSize += compressor.compress(emptyBytes, cSize,
emptyBytes.length - cSize);
}
assertTrue(cSize > 0);
}

Expand Down Expand Up @@ -330,13 +338,27 @@ public void testCompressDecompress() throws Exception {
assertEquals(0, compressor.getBytesRead());
compressor.finish();

// Drive compress() in a loop until the compressor reports finished(),
// mirroring how CompressorStream drains the compressor.
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
int cSize = 0;
while (!compressor.finished() && cSize < compressedResult.length) {
cSize += compressor.compress(compressedResult, cSize,
compressedResult.length - cSize);
}
assertTrue(compressor.finished());
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue(cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
// Drive decompress() in a loop until the decompressor reports finished()
// (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
int dSize = 0;
while (!decompressor.finished() && dSize < decompressedBytes.length) {
dSize += decompressor.decompress(decompressedBytes, dSize,
decompressedBytes.length - dSize);
}
assertEquals(rawDataSize, dSize);
assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
compressor.reset();
decompressor.reset();
Expand All @@ -354,7 +376,7 @@ public void testCompressingWithOneByteOutputBuffer() throws Exception {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
Compressor compressor =
new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
new ZStandardCompressor(3, 0, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
CompressionOutputStream outputStream =
codec.createOutputStream(baos, compressor);

Expand Down Expand Up @@ -397,14 +419,28 @@ public void testZStandardCompressDecompress() throws Exception {
compressor.setInput(rawData, 0, rawData.length);
compressor.finish();

// Drive compress() in a loop until the compressor reports finished(),
// mirroring how CompressorStream drains the compressor.
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
int cSize = 0;
while (!compressor.finished() && cSize < compressedResult.length) {
cSize += compressor.compress(compressedResult, cSize,
compressedResult.length - cSize);
}
assertTrue(compressor.finished());
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue(cSize < rawDataSize,
"compressed size no less then original size");
decompressor.setInput(compressedResult, 0, cSize);
// Drive decompress() in a loop until the decompressor reports finished()
// (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
int dSize = 0;
while (!decompressor.finished() && dSize < decompressedBytes.length) {
dSize += decompressor.decompress(decompressedBytes, dSize,
decompressedBytes.length - dSize);
}
assertEquals(rawDataSize, dSize);
String decompressed = bytesToHex(decompressedBytes);
String original = bytesToHex(rawData);
assertEquals(original, decompressed);
Expand Down Expand Up @@ -521,6 +557,107 @@ public void testDecompressReturnsWhenNothingToDecompress() throws Exception {
assertEquals(0, result);
}

// Compressing with workers > 0 must yield valid zstd data that
// round-trips through the decompressor back to the original bytes.
@Test
public void testCompressionWithWorkers() throws Exception {
  byte[] original = FileUtils.readFileToByteArray(uncompressedFile);

  // Configure the codec with two background compression workers.
  Configuration conf = new Configuration();
  conf.setInt("io.compression.codec.zstd.workers", 2);
  ZStandardCodec codec = new ZStandardCodec();
  codec.setConf(conf);

  // Compress the whole payload through the codec's output stream.
  ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
  Compressor compressor = codec.createCompressor();
  try (CompressionOutputStream cos =
      codec.createOutputStream(compressedOut, compressor)) {
    cos.write(original);
    cos.finish();
  }
  assertTrue(compressor.finished());
  assertEquals(original.length, compressor.getBytesRead());

  // Decompress the result and verify it matches the input exactly.
  ByteArrayOutputStream roundTripped = new ByteArrayOutputStream();
  ByteArrayInputStream compressedIn =
      new ByteArrayInputStream(compressedOut.toByteArray());
  Decompressor decompressor = codec.createDecompressor();
  try (CompressionInputStream cis =
      codec.createInputStream(compressedIn, decompressor)) {
    byte[] chunk = new byte[4096];
    for (int read; (read = cis.read(chunk, 0, chunk.length)) != -1;) {
      roundTripped.write(chunk, 0, read);
    }
  }
  assertArrayEquals(original, roundTripped.toByteArray());
}

// A negative workers setting must be rejected up-front, before any
// native compressor state is created.
@Test
public void testNegativeWorkersIsRejected() {
  Configuration badConf = new Configuration();
  badConf.setInt("io.compression.codec.zstd.workers", -1);
  ZStandardCodec codec = new ZStandardCodec();
  codec.setConf(badConf);
  // createCompressor() reads the workers setting and must fail fast.
  assertThrows(IllegalArgumentException.class,
      () -> codec.createCompressor());
}

// With no explicit setting, workers must default to 0 and compression
// must behave exactly as it did before the option was introduced.
@Test
public void testDefaultWorkersIsZero() throws Exception {
  Configuration defaults = new Configuration();
  ZStandardCodec codec = new ZStandardCodec();
  codec.setConf(defaults);
  // An untouched configuration must report the disabled (0) count.
  assertEquals(0, ZStandardCodec.getCompressionWorkers(defaults));

  // Compression with the default configuration still completes cleanly.
  byte[] payload = FileUtils.readFileToByteArray(uncompressedFile);
  ByteArrayOutputStream sink = new ByteArrayOutputStream();
  Compressor compressor = codec.createCompressor();
  try (CompressionOutputStream cos =
      codec.createOutputStream(sink, compressor)) {
    cos.write(payload);
    cos.finish();
  }
  assertTrue(compressor.finished());
  assertEquals(payload.length, compressor.getBytesRead());
}

// reinit() must pick up an updated workers value, as happens when a
// pooled compressor is handed a new configuration.
@Test
public void testReinitUpdatesWorkers() throws Exception {
  byte[] payload = FileUtils.readFileToByteArray(uncompressedFile);

  // Start with a codec/compressor pair built from default settings.
  ZStandardCodec codec = new ZStandardCodec();
  codec.setConf(new Configuration());
  Compressor compressor = codec.createCompressor();

  // Re-initialise the compressor with two worker threads enabled.
  Configuration updatedConf = new Configuration();
  updatedConf.setInt("io.compression.codec.zstd.workers", 2);
  compressor.reinit(updatedConf);

  ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
  try (CompressionOutputStream cos =
      codec.createOutputStream(compressedOut, compressor)) {
    cos.write(payload);
    cos.finish();
  }

  // Round-trip to confirm the output is still valid zstd data.
  ByteArrayOutputStream restored = new ByteArrayOutputStream();
  ByteArrayInputStream compressedIn =
      new ByteArrayInputStream(compressedOut.toByteArray());
  Decompressor decompressor = codec.createDecompressor();
  try (CompressionInputStream cis =
      codec.createInputStream(compressedIn, decompressor)) {
    byte[] chunk = new byte[4096];
    for (int read; (read = cis.read(chunk, 0, chunk.length)) != -1;) {
      restored.write(chunk, 0, read);
    }
  }
  assertArrayEquals(payload, restored.toByteArray());
}

public static byte[] generate(int size) {
byte[] data = new byte[size];
for (int i = 0; i < size; i++) {
Expand Down
Loading