diff --git a/Cargo.toml b/Cargo.toml index b7cf9cf..0bf139e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,14 +38,11 @@ bytes = "1.4" chrono = { version = "0.4.37", default-features = false, features = ["std"] } chrono-tz = "0.9" fallible-streaming-iterator = { version = "0.1" } -flate2 = "1" -lz4_flex = "0.11" lzokay-native = "0.1" num = "0.4.1" prost = { version = "0.12" } snafu = "0.8" -snap = "1.1" -zstd = "0.12" +libcramjam = { version = "*", default-features = false, features = ["snappy", "zstd", "lz4", "zlib", "deflate"] } # async support async-trait = { version = "0.1.77", optional = true } diff --git a/src/compression.rs b/src/compression.rs index ffd6038..1ec3a50 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -18,8 +18,6 @@ // Modified from https://github.com/DataEngineeringLabs/orc-format/blob/416490db0214fc51d53289253c0ee91f7fc9bc17/src/read/decompress/mod.rs //! Related code for handling decompression of ORC files. -use std::io::Read; - use bytes::{Bytes, BytesMut}; use fallible_streaming_iterator::FallibleStreamingIterator; use snafu::ResultExt; @@ -141,32 +139,28 @@ struct Lz4 { impl DecompressorVariant for Zlib { fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { - let mut gz = flate2::read::DeflateDecoder::new(compressed_bytes); scratch.clear(); - gz.read_to_end(scratch).context(error::IoSnafu)?; + libcramjam::deflate::decompress(compressed_bytes, scratch).context(error::IoSnafu)?; Ok(()) } } impl DecompressorVariant for Zstd { fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { - let mut reader = - zstd::Decoder::new(compressed_bytes).context(error::BuildZstdDecoderSnafu)?; scratch.clear(); - reader.read_to_end(scratch).context(error::IoSnafu)?; + libcramjam::zstd::decompress(compressed_bytes, scratch).context(error::IoSnafu)?; Ok(()) } } impl DecompressorVariant for Snappy { fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { - let len = - snap::raw::decompress_len(compressed_bytes).context(error::BuildSnappyDecoderSnafu)?; - scratch.resize(len, 0); - let mut decoder = snap::raw::Decoder::new(); - decoder - .decompress(compressed_bytes, scratch) + let len = libcramjam::snappy::snap::raw::decompress_len(compressed_bytes) .context(error::BuildSnappyDecoderSnafu)?; + scratch.resize(len, 0); + let n = libcramjam::snappy::raw::decompress(compressed_bytes, scratch.as_mut_slice()) + .context(error::IoSnafu)?; + scratch.truncate(n); Ok(()) } } @@ -184,12 +178,10 @@ impl DecompressorVariant for Lzo { impl DecompressorVariant for Lz4 { fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { - let decompressed = - lz4_flex::block::decompress(compressed_bytes, self.max_decompressed_block_size) - .context(error::BuildLz4DecoderSnafu)?; - // TODO: better way to utilize scratch here - scratch.clear(); - scratch.extend(decompressed); + scratch.resize(self.max_decompressed_block_size, 0); + let n = libcramjam::lz4::block::decompress_into(compressed_bytes, scratch, None) + .context(error::IoSnafu)?; + scratch.truncate(n); Ok(()) } } diff --git a/src/error.rs b/src/error.rs index 02713e2..31ee549 100644 --- a/src/error.rs +++ b/src/error.rs @@ -142,7 +142,7 @@ pub enum OrcError { BuildSnappyDecoder { #[snafu(implicit)] location: Location, - source: snap::Error, + source: libcramjam::snappy::snap::Error, }, #[snafu(display("Failed to build lzo decoder: {}", source))] @@ -156,7 +156,7 @@ pub enum OrcError { BuildLz4Decoder { #[snafu(implicit)] location: Location, - source: lz4_flex::block::DecompressError, + source: io::Error, }, #[snafu(display("Arrow error: {}", source))]