-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathcompression.jl
More file actions
99 lines (90 loc) · 3.9 KB
/
Copy pathcompression.jl
File metadata and controls
99 lines (90 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
using XXHashNative: xxh64
# ROOT compression block framing for the RNTuple writer.
#
# ROOT stores a compressed buffer as a sequence of blocks. Each block starts
# with a 9-byte header
#
# [2-byte algorithm tag][1-byte method][3-byte LE compressed size][3-byte LE uncompressed size]
#
# For LZ4 an 8-byte big-endian XxHash-64 checksum of the compressed bytes
# follows the header and is *included* in the compressed size. A single block
# can describe at most 2^24-1 bytes, so larger payloads are split.
#
# This mirrors `uproot.compression.compress` and is the inverse of the read
# path in `decompress_bytes!` / `decompress_datastreambytes`.
# Largest value that fits in a 3-byte (24-bit) size field; the payload is also
# split into blocks of at most this many bytes.
const _RNT_3BYTE_MAX = 2^24 - 1
# ROOT packs its compression setting as `fCompress = algorithm * 100 + level`;
# the algorithm code is the integer quotient by 100.
function _rnt_compression_algo(fCompress::Integer)
    return div(Int(fCompress), 100)
end
# Level part of an `fCompress` code (`algorithm * 100 + level`): the remainder
# after dividing by 100.
function _rnt_compression_level(fCompress::Integer)
    return rem(Int(fCompress), 100)
end
# Append the low 24 bits of `n` to `io` as three bytes, least-significant byte
# first. Bits above the 24th are dropped without any error. Returns `nothing`.
function _write_3byte_le!(io::IO, n::Integer)
    lo = UInt8(n & 0xff)
    mid = UInt8((n >> 8) & 0xff)
    hi = UInt8((n >> 16) & 0xff)
    write(io, lo, mid, hi)
    return nothing
end
# Compress a single block and append its framed form to `io`.
#
# `block` is expected to fit in one frame already (at most 2^24-1 bytes). A frame
# is a 9-byte header -- two tag bytes, one method byte, then the compressed and
# the uncompressed size as 3-byte little-endian integers -- followed by the
# compressed bytes. For LZ4 an 8-byte checksum of the compressed bytes sits
# between the header and the data and is counted in the compressed size.
#
# `level` is only consumed by the LZ4 branch; the ZLIB and ZSTD branches are
# invoked without it. Any other `algo` raises an error before anything is
# written. Returns `nothing`.
function _write_compressed_block!(io::IO, algo::Int, level::Int, block::Vector{UInt8})
    rawlen = length(block)

    if algo == Const.kLZ4
        packed = lz4_hc_compress(block, level)
        write(io, UInt8('L'), UInt8('4'), 0x01)
        _write_3byte_le!(io, length(packed) + 8)  # size field includes the checksum
        _write_3byte_le!(io, rawlen)
        write(io, hton(xxh64(packed)))            # checksum is stored big-endian
        write(io, packed)
        return nothing
    end

    if algo == Const.kZLIB
        packed = _zlib_compress(block)
        write(io, UInt8('Z'), UInt8('L'), 0x08)
        _write_3byte_le!(io, length(packed))
        _write_3byte_le!(io, rawlen)
        write(io, packed)
        return nothing
    end

    if algo == Const.kZSTD
        packed = transcode(ZstdCompressor, block)
        write(io, UInt8('Z'), UInt8('S'), 0x01)
        _write_3byte_le!(io, length(packed))
        _write_3byte_le!(io, rawlen)
        write(io, packed)
        return nothing
    end

    error("Unsupported RNTuple write-compression algorithm code $algo " *
          "(supported: $(Const.kLZ4)=LZ4, $(Const.kZLIB)=ZLIB, $(Const.kZSTD)=ZSTD)")
end
# zlib-compress `block` into a freshly allocated `Vector{UInt8}` trimmed to the
# number of bytes produced. Raises an error if `zlib_compress!` returns anything
# other than an integer byte count.
function _zlib_compress(block::Vector{UInt8})
    rawlen = length(block)
    # Incompressible input comes back as *stored* deflate blocks, which are a bit
    # larger than the input: 5 bytes per 65535-byte block plus the 6-byte zlib
    # header/adler. Over-allocate (with 64 bytes of slack) so the call cannot
    # fail merely for lack of output space.
    stored_overhead = 5 * cld(rawlen, 65535)
    out = Vector{UInt8}(undef, rawlen + stored_overhead + 64)
    n = zlib_compress!(Compressor(), out, block)
    if !(n isa Integer)
        error("zlib compression failed: $n")
    end
    return resize!(out, n)
end
"""
_root_compress(payload, fCompress) -> Vector{UInt8}
Compress `payload` using ROOT's block framing for the `fCompress` setting
(`algorithm*100 + level`; `0` means no compression). Returns the on-disk bytes.
As ROOT does, if compression does not shrink the data the original `payload` is
returned unchanged, so a reader detects the absence of compression by comparing
the on-disk size to the uncompressed size.
"""
function _root_compress(payload::AbstractVector{UInt8}, fCompress::Integer)
    # Every "store uncompressed" exit goes through here so the result is always a
    # Vector{UInt8}. `convert` hands back `payload` itself (no copy) when it
    # already is a Vector{UInt8}; other vector types (e.g. views) are copied.
    stored() = convert(Vector{UInt8}, payload)

    # Nothing to do: compression disabled or no data.
    (fCompress == 0 || isempty(payload)) && return stored()
    algo = _rnt_compression_algo(fCompress)
    level = _rnt_compression_level(fCompress)
    level == 0 && return stored()

    # Block header: tag (2) + method (1) + compressed size (3) + uncompressed size (3).
    header_len = 9
    n = length(payload)
    io = IOBuffer()
    # One frame per chunk of at most _RNT_3BYTE_MAX payload bytes.
    for pos in 1:_RNT_3BYTE_MAX:n
        stop = min(pos + _RNT_3BYTE_MAX - 1, n)
        # Reuse the caller's vector when it is the whole (single) block; otherwise
        # materialize the slice, since the block writer needs a Vector{UInt8}.
        block = payload isa Vector{UInt8} && pos == 1 && stop == n ?
            payload : collect(@view payload[pos:stop])
        before = position(io)
        _write_compressed_block!(io, algo, level, block)
        # Bytes after the header are exactly what the header's compressed-size
        # field has to describe. If they exceed 24 bits the field was truncated
        # and the frame is unreadable, so store the payload uncompressed instead
        # of returning a corrupt stream (a later, well-compressing block could
        # otherwise still make the total smaller than the input).
        if position(io) - before - header_len > _RNT_3BYTE_MAX
            return stored()
        end
    end
    out = take!(io)
    # Keep the framed bytes only when they are strictly smaller than the input.
    return length(out) < n ? out : stored()
end