diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index f58331baa81a1..59019feb112d0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -168,6 +168,17 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final int IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
 
+  /** ZStandard number of compression worker threads.
+   * A value of 0 (the default) disables worker threads and runs
+   * compression on the calling thread, matching the upstream zstd
+   * default. A positive value enables multi-threaded compression with
+   * the specified number of background workers. */
+  public static final String IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY =
+      "io.compression.codec.zstd.workers";
+
+  /** Default value for IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY (disabled). */
+  public static final int IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT = 0;
+
   /** Internal buffer size for Lz4 compressor/decompressors */
   public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
       "io.compression.codec.lz4.buffersize";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
index 7b7ad69014caa..109fd27a93b23 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
@@ -68,6 +68,28 @@ public static int getCompressionLevel(Configuration conf) {
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
   }
 
+  /**
+   * Returns the number of compression worker threads to be used by the
+   * ZStandard compressor. A value of 0 (the default) disables worker
+   * threads, matching the upstream zstd default. Negative values are
+   * rejected.
+   *
+   * @param conf the configuration to read from
+   * @return the configured number of zstd compression worker threads
+   * @throws IllegalArgumentException if the configured value is negative
+   */
+  public static int getCompressionWorkers(Configuration conf) {
+    int workers = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT);
+    if (workers < 0) {
+      throw new IllegalArgumentException(
+          CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY
+              + " must be >= 0, got " + workers);
+    }
+    return workers;
+  }
+
   public static int getCompressionBufferSize(Configuration conf) {
     int bufferSize = getBufferSize(conf);
     return bufferSize == 0 ?
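A minimal usage sketch of the new key (the ZstdWorkersExample class name is illustrative; assumes this patch is applied and the zstd-jni-backed codec is available at runtime): set io.compression.codec.zstd.workers on the Configuration, validate it through getCompressionWorkers(), and obtain a compressor that compresses with two background workers.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.ZStandardCodec;

    public class ZstdWorkersExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt(
            CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_KEY, 2);

        ZStandardCodec codec = new ZStandardCodec();
        codec.setConf(conf);

        // Throws IllegalArgumentException for negative values.
        int workers = ZStandardCodec.getCompressionWorkers(conf);
        System.out.println("zstd compression workers = " + workers);

        // createCompressor() passes the workers value to ZStandardCompressor.
        Compressor compressor = codec.createCompressor();
        compressor.end();
      }
    }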
@@ -135,7 +157,9 @@ public Class getCompressorType() {
   @Override
   public Compressor createCompressor() {
     return new ZStandardCompressor(
-        getCompressionLevel(conf), getCompressionBufferSize(conf));
+        getCompressionLevel(conf),
+        getCompressionWorkers(conf),
+        getCompressionBufferSize(conf));
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
index 4fe16c87f1615..8d989ec96db55 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
@@ -43,6 +43,7 @@ public class ZStandardCompressor implements Compressor {
       LoggerFactory.getLogger(ZStandardCompressor.class);
 
   private int level;
+  private int workers;
   private int directBufferSize;
   private byte[] userBuf = null;
   private int userBufOff = 0, userBufLen = 0;
@@ -74,12 +75,30 @@ public static int getRecommendedBufferSize() {
    * @param bufferSize bufferSize.
    */
   public ZStandardCompressor(int level, int bufferSize) {
-    this(level, bufferSize, bufferSize);
+    this(level,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT,
+        bufferSize, bufferSize);
+  }
+
+  /**
+   * Creates a new compressor with the supplied compression level and number
+   * of compression worker threads. Compressed data will be generated in
+   * ZStandard format.
+   *
+   * @param level the zstd compression level
+   * @param workers number of zstd compression worker threads (0 disables
+   *                multi-threaded compression)
+   * @param bufferSize the input/output direct buffer size
+   */
+  public ZStandardCompressor(int level, int workers, int bufferSize) {
+    this(level, workers, bufferSize, bufferSize);
   }
 
   @VisibleForTesting
-  ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
+  ZStandardCompressor(int level, int workers, int inputBufferSize,
+      int outputBufferSize) {
     this.level = level;
+    this.workers = workers;
     zstdJniCtx = new ZstdCompressCtx();
     uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
     directBufferSize = outputBufferSize;
@@ -101,6 +120,7 @@ public void reinit(Configuration conf) {
       return;
     }
     level = ZStandardCodec.getCompressionLevel(conf);
+    workers = ZStandardCodec.getCompressionWorkers(conf);
     reset();
     LOG.debug("Reinit compressor with new compression configuration");
   }
@@ -196,11 +216,9 @@ public int compress(byte[] b, int off, int len) throws IOException {
       return n;
     }
 
-    // Always invoke the streaming API — even with empty input — so internally
-    // buffered bytes continue to be drained, matching native ZSTD_flushStream.
-    // Use END only when finish=true, no more user data, and all direct-buffer
-    // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
-    // ZSTD_compressStream + ZSTD_flushStream).
+    // Always invoke the streaming API - even with empty input - so internally
+    // buffered bytes continue to be drained. Use END only when finish=true, no
+    // more user data, and all direct-buffer data consumed; otherwise CONTINUE.
     boolean allConsumed =
         (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0);
     boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
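The rewritten comment above pins down the drain contract: once finish() has been called, compress() must keep being invoked, even when a call returns zero bytes, until finished() reports true. A sketch of that loop (the DrainHelper class is illustrative, not part of the patch; it is the same pattern the updated tests below use):

    import java.io.IOException;
    import org.apache.hadoop.io.compress.Compressor;

    final class DrainHelper {
      private DrainHelper() {
      }

      // Drains a compressor that has received all input and had finish()
      // called; returns the number of compressed bytes written to out.
      static int drain(Compressor compressor, byte[] out) throws IOException {
        int total = 0;
        while (!compressor.finished() && total < out.length) {
          total += compressor.compress(out, total, out.length - total);
        }
        return total;
      }
    }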
@@ -209,7 +227,16 @@
     compressedDirectBuf.position(0);
     compressedDirectBuf.limit(directBufferSize);
 
-    EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH;
+    // CONTINUE should be used for the non-end case to enable multi-threading:
+    // 1. CONTINUE + workers >= 1: non-blocking. The call copies as much input
+    //    as it can into a job, dispatches it to the workers, drains whatever
+    //    output is ready, and returns. Multiple jobs can be in flight in
+    //    parallel.
+    // 2. FLUSH + workers >= 1: multi-threaded compression blocks to flush as
+    //    much output as possible. The call does not return until every queued
+    //    job has finished and its output has been drained to the dst buffer.
+    // 3. END + workers >= 1: same as FLUSH but also closes the frame, with
+    //    the same blocking behavior.
+    EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
     boolean done = zstdJniCtx.compressDirectByteBufferStream(
         compressedDirectBuf, uncompressedDirectBuf, endOp);
@@ -269,6 +296,7 @@ public void reset() {
     checkStream();
     zstdJniCtx.reset();
     zstdJniCtx.setLevel(level);
+    zstdJniCtx.setWorkers(workers);
     finish = false;
     finished = false;
     bytesRead = 0;
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 21446d521385d..a07460dc308cb 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -981,6 +981,21 @@
   </description>
 </property>
 
+<property>
+  <name>io.compression.codec.zstd.workers</name>
+  <value>0</value>
+  <description>
+    Number of worker threads used by the ZStandard compressor for
+    multi-threaded compression. The default value 0 disables worker
+    threads and runs compression on the calling thread, matching the
+    upstream zstd default. Setting this to a positive value enables
+    parallel compression with the specified number of background
+    workers; the value is capped internally by the zstd library.
+    Negative values are rejected. This setting only affects compression;
+    decompression is unaffected.
+  </description>
+</property>
+
 <property>
   <name>io.serializations</name>
   <value>org.apache.hadoop.io.serializer.WritableSerialization, org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization, org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
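For the EndDirective semantics spelled out in the compress() hunk above, a standalone zstd-jni sketch may help (the class name, buffer sizes, and worker count are illustrative; ZstdCompressCtx, setLevel, setWorkers, compressDirectByteBufferStream, and EndDirective are the same calls the compressor itself uses):

    import com.github.luben.zstd.EndDirective;
    import com.github.luben.zstd.ZstdCompressCtx;
    import java.nio.ByteBuffer;

    public class ZstdJniStreamSketch {
      public static void main(String[] args) {
        ZstdCompressCtx ctx = new ZstdCompressCtx();
        ctx.setLevel(3);
        ctx.setWorkers(2); // 0 keeps compression on the calling thread

        ByteBuffer src = ByteBuffer.allocateDirect(64 * 1024);
        ByteBuffer dst = ByteBuffer.allocateDirect(64 * 1024);
        src.put(new byte[32 * 1024]);
        src.flip();

        // CONTINUE may hand the input to a worker job and return right away,
        // draining only whatever output happens to be ready.
        ctx.compressDirectByteBufferStream(dst, src, EndDirective.CONTINUE);

        // END blocks until the queued jobs finish and then closes the frame;
        // it returns true once everything has been flushed into dst.
        while (!ctx.compressDirectByteBufferStream(dst, src, EndDirective.END)) {
          // A real caller would drain dst here whenever it fills up.
        }
        ctx.close();
      }
    }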
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
index a141a974e591f..e1546cdbbfa23 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
@@ -198,8 +198,16 @@ public void testSetInputWithBytesSizeMoreThenDefaultZStandardBufferSize()
     byte[] bytes = generate(bytesSize);
     assertTrue(compressor.needsInput(), "needsInput error !!!");
     compressor.setInput(bytes, 0, bytes.length);
+    compressor.finish();
     byte[] emptyBytes = new byte[bytesSize];
-    int cSize = compressor.compress(emptyBytes, 0, bytes.length);
+    // Drive compress() in a loop until the compressor reports finished(),
+    // mirroring how CompressorStream drains the compressor.
+    int cSize = 0;
+    while (!compressor.finished() && cSize < emptyBytes.length) {
+      compressor.needsInput();
+      cSize += compressor.compress(emptyBytes, cSize,
+          emptyBytes.length - cSize);
+    }
     assertTrue(cSize > 0);
   }
 
@@ -330,13 +338,27 @@ public void testCompressDecompress() throws Exception {
     assertEquals(0, compressor.getBytesRead());
     compressor.finish();
 
+    // Drive compress() in a loop until the compressor reports finished(),
+    // mirroring how CompressorStream drains the compressor.
     byte[] compressedResult = new byte[rawDataSize];
-    int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+    int cSize = 0;
+    while (!compressor.finished() && cSize < compressedResult.length) {
+      cSize += compressor.compress(compressedResult, cSize,
+          compressedResult.length - cSize);
+    }
+    assertTrue(compressor.finished());
     assertEquals(rawDataSize, compressor.getBytesRead());
     assertTrue(cSize < rawDataSize);
     decompressor.setInput(compressedResult, 0, cSize);
+    // Drive decompress() in a loop until the decompressor reports finished()
+    // (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
     byte[] decompressedBytes = new byte[rawDataSize];
-    decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+    int dSize = 0;
+    while (!decompressor.finished() && dSize < decompressedBytes.length) {
+      dSize += decompressor.decompress(decompressedBytes, dSize,
+          decompressedBytes.length - dSize);
+    }
+    assertEquals(rawDataSize, dSize);
     assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
     compressor.reset();
     decompressor.reset();
@@ -354,7 +376,7 @@ public void testCompressingWithOneByteOutputBuffer() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
     Compressor compressor =
-        new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
+        new ZStandardCompressor(3, 0, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
     CompressionOutputStream outputStream =
         codec.createOutputStream(baos, compressor);
 
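The same drain discipline the test comments describe applies on the decompress side: call decompress() until finished() reports true or the destination is full. A companion to the compressor helper sketched earlier (DecompressDrain is illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.io.compress.Decompressor;

    final class DecompressDrain {
      private DecompressDrain() {
      }

      // Collects decompressed bytes until the decompressor reports
      // finished() or out is full; returns the number of bytes produced.
      static int drain(Decompressor decompressor, byte[] out)
          throws IOException {
        int total = 0;
        while (!decompressor.finished() && total < out.length) {
          total += decompressor.decompress(out, total, out.length - total);
        }
        return total;
      }
    }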
@@ -397,14 +419,28 @@ public void testZStandardCompressDecompress() throws Exception {
     compressor.setInput(rawData, 0, rawData.length);
     compressor.finish();
 
+    // Drive compress() in a loop until the compressor reports finished(),
+    // mirroring how CompressorStream drains the compressor.
     byte[] compressedResult = new byte[rawDataSize];
-    int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+    int cSize = 0;
+    while (!compressor.finished() && cSize < compressedResult.length) {
+      cSize += compressor.compress(compressedResult, cSize,
+          compressedResult.length - cSize);
+    }
+    assertTrue(compressor.finished());
     assertEquals(rawDataSize, compressor.getBytesRead());
     assertTrue(cSize < rawDataSize,
         "compressed size no less then original size");
     decompressor.setInput(compressedResult, 0, cSize);
+    // Drive decompress() in a loop until the decompressor reports finished()
+    // (see CompressDecompressTester#COMPRESS_DECOMPRESS_BLOCK).
     byte[] decompressedBytes = new byte[rawDataSize];
-    decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+    int dSize = 0;
+    while (!decompressor.finished() && dSize < decompressedBytes.length) {
+      dSize += decompressor.decompress(decompressedBytes, dSize,
+          decompressedBytes.length - dSize);
+    }
+    assertEquals(rawDataSize, dSize);
     String decompressed = bytesToHex(decompressedBytes);
     String original = bytesToHex(rawData);
     assertEquals(original, decompressed);
@@ -521,6 +557,107 @@ public void testDecompressReturnsWhenNothingToDecompress() throws Exception {
     assertEquals(0, result);
   }
 
+  // workers > 0 should produce data that round-trips correctly through the
+  // decompressor, matching the bytes produced with the default workers=0.
+  @Test
+  public void testCompressionWithWorkers() throws Exception {
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+
+    Configuration conf = new Configuration();
+    conf.setInt("io.compression.codec.zstd.workers", 2);
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Compressor compressor = codec.createCompressor();
+    try (CompressionOutputStream outputStream =
+        codec.createOutputStream(baos, compressor)) {
+      outputStream.write(bytes);
+      outputStream.finish();
+    }
+    assertTrue(compressor.finished());
+    assertEquals(bytes.length, compressor.getBytesRead());
+
+    // Round-trip through the decompressor.
+    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Decompressor decompressor = codec.createDecompressor();
+    try (CompressionInputStream inputStream =
+        codec.createInputStream(bais, decompressor)) {
+      byte[] buf = new byte[4096];
+      int n;
+      while ((n = inputStream.read(buf, 0, buf.length)) != -1) {
+        decompressed.write(buf, 0, n);
+      }
+    }
+    assertArrayEquals(bytes, decompressed.toByteArray());
+  }
+
+  // A negative workers value must be rejected up-front by ZStandardCodec.
+  @Test
+  public void testNegativeWorkersIsRejected() {
+    Configuration conf = new Configuration();
+    conf.setInt("io.compression.codec.zstd.workers", -1);
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+    assertThrows(IllegalArgumentException.class, codec::createCompressor);
+  }
+
+  // The default value (workers=0) must keep behaviour identical to before.
+  @Test
+  public void testDefaultWorkersIsZero() throws Exception {
+    Configuration conf = new Configuration();
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+    assertEquals(0, ZStandardCodec.getCompressionWorkers(conf));
+
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Compressor compressor = codec.createCompressor();
+    try (CompressionOutputStream outputStream =
+        codec.createOutputStream(baos, compressor)) {
+      outputStream.write(bytes);
+      outputStream.finish();
+    }
+    assertTrue(compressor.finished());
+    assertEquals(bytes.length, compressor.getBytesRead());
+  }
+
+  // reinit() should pick up an updated workers value for pooled compressors.
+  @Test
+  public void testReinitUpdatesWorkers() throws Exception {
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(new Configuration());
+    Compressor compressor = codec.createCompressor();
+
+    Configuration newConf = new Configuration();
+    newConf.setInt("io.compression.codec.zstd.workers", 2);
+    compressor.reinit(newConf);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (CompressionOutputStream outputStream =
+        codec.createOutputStream(baos, compressor)) {
+      outputStream.write(bytes);
+      outputStream.finish();
+    }
+
+    // Round-trip to confirm the output is still valid zstd data.
+    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Decompressor decompressor = codec.createDecompressor();
+    try (CompressionInputStream inputStream =
+        codec.createInputStream(bais, decompressor)) {
+      byte[] buf = new byte[4096];
+      int n;
+      while ((n = inputStream.read(buf, 0, buf.length)) != -1) {
+        decompressed.write(buf, 0, n);
+      }
+    }
+    assertArrayEquals(bytes, decompressed.toByteArray());
+  }
+
   public static byte[] generate(int size) {
     byte[] data = new byte[size];
     for (int i = 0; i < size; i++) {