Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions .github/workflows/cvmfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@ jobs:
arch: ${{ matrix.arch }}
- uses: julia-actions/cache@a45e8fa8be21c18a06b7177052533149e61e9b38 # v3.1.0
- uses: julia-actions/julia-buildpkg@e3eb439fad4f9aba7da2667e7510e4a46ebc46e1 # v1.7.0
- name: Generate root file
- name: Generate root files (no compression + LZ4 + ZSTD + ZLIB)
run: |
julia --code-coverage --project ./test/RNTupleWriting/output_sample.jl test1.root
for comp in 0 404 505 101; do
julia --code-coverage --project ./test/RNTupleWriting/output_sample.jl sample_$comp.root $comp
done
- uses: cvmfs-contrib/github-action-cvmfs@10197e000cc0add8e54ac4fb73d3ed44e2de72b4 # v5.5
- name: Read root file in C++
- name: Read and verify files in C++ ROOT
run: |
source /cvmfs/sft.cern.ch/lcg/views/dev3/latest/x86_64-ubuntu2204-gcc11-opt/setup.sh
python ./test/RNTupleWriting/test1.py test1.root
set -e
for comp in 0 404 505 101; do
echo "::group::Validate compression=$comp"
python ./test/RNTupleWriting/validate_rntuple.py sample_$comp.root
echo "::endgroup::"
done
- uses: julia-actions/julia-processcoverage@03114f09f119417c3242a9fb6e0b722676aedf38 # v1.2.2
- uses: codecov/codecov-action@e79a6962e0d4c0c17b229090214935d2e33f8354 # v6.0.1
env:
Expand Down
137 changes: 101 additions & 36 deletions src/RNTuple/Writing/TFileWriter.jl
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,26 @@ function rnt_write(io::IO, x::UnROOT.RNTuplePageInnerList)
write(io, temp_io)
end

function rnt_write(io::IO, x::UnROOT.PageLink; envelope=true)
# Writer-only inner page list: carries the per-column element offset and 32-bit
# compression settings that trail the inner list frame (see the "Page Locations"
# section of the RNTuple spec). The reader's `RNTuplePageInnerList` skips this
# trailer, so it does not need these fields.
struct InnerPageListWrite
pages::Vector{PageDescription}
element_offset::Int64
compression::UInt32
end

# Writer-only page-link envelope whose nested list carries `InnerPageListWrite`
# items (with element offset + compression settings). Field names match
# `UnROOT.PageLink` so the serialization below is shared.
struct PageLinkWrite
header_checksum::UInt64
cluster_summaries::Vector{ClusterSummary}
nested_page_locations::RNTuplePageTopList{RNTuplePageOuterList{InnerPageListWrite}}
end

function rnt_write(io::IO, x::Union{UnROOT.PageLink,PageLinkWrite}; envelope=true)
temp_io = IOBuffer()
rnt_write(temp_io, x.header_checksum)
rnt_write(temp_io, Write_RNTupleListFrame(x.cluster_summaries))
Expand Down Expand Up @@ -534,12 +553,32 @@ function schema_to_field_column_records(table)
return field_records, column_records
end

function generate_page_links(pages, page_positions)
outer_list = RNTuplePageOuterList{RNTuplePageInnerList{PageDescription}}([])
for (page, pos) in zip(pages, page_positions)
inner_list = RNTuplePageInnerList([
PageDescription(page.num_elements, Locator(length(page.data), pos))
])
function rnt_write(io::IO, x::InnerPageListWrite)
temp_io = IOBuffer()
for p in x.pages
rnt_write(temp_io, p)
end
write(temp_io, x.element_offset, x.compression)
size = position(temp_io) + sizeof(Int64) + sizeof(Int32)
write(io, Int64(-size))
write(io, Int32(length(x.pages)))
seekstart(temp_io)
write(io, temp_io)
end

"""
generate_page_links(page_locators, compression) -> RNTuplePageTopList

Build the nested page-location list for a single cluster. `page_locators` is a
vector of `(num_elements, nbytes, offset)` for each column's single page;
`compression` is the fCompress code recorded per column.
"""
function generate_page_links(page_locators, compression::Integer)
outer_list = RNTuplePageOuterList{InnerPageListWrite}([])
for (num_elements, nbytes, pos) in page_locators
inner_list = InnerPageListWrite(
[PageDescription(num_elements, Locator(nbytes, pos))],
0, UInt32(compression))
push!(outer_list, inner_list)
end
return RNTuplePageTopList([outer_list])
Expand All @@ -550,22 +589,27 @@ _tkey32_len(class, name, title) =
Int16(26 + 3 + ncodeunits(class) + ncodeunits(name) + ncodeunits(title))

"""
_write_rblob(file::IO, payload::AbstractVector{UInt8}, fdatime) -> Int64

Write an RBlob TKey followed by `payload` with correctly computed key sizes.
Returns the file position where the payload starts (for locators).
_write_rblob(file::IO, payload::AbstractVector{UInt8}, fdatime; compression=0) -> (pos, nbytes)

Write an RBlob TKey followed by `payload` with correctly computed key sizes,
optionally compressing the payload with ROOT's block framing for the given
`compression` (fCompress) code. The key's `fObjLen` records the uncompressed
size and `fNbytes` the on-disk size, so a reader can tell whether the blob is
compressed. Returns the file position where the (possibly compressed) payload
starts and its on-disk byte count (for locators).
"""
function _write_rblob(file::IO, payload::AbstractVector{UInt8}, fdatime)
function _write_rblob(file::IO, payload::AbstractVector{UInt8}, fdatime; compression::Integer=0)
ondisk = _root_compress(payload, compression)
klen = _tkey32_len("RBlob", "", "")
key = RBlob(; fNbytes = Int32(klen + length(payload)), fVersion = 4,
key = RBlob(; fNbytes = Int32(klen + length(ondisk)), fVersion = 4,
fObjLen = Int32(length(payload)), fDatime = fdatime,
fKeyLen = klen, fCycle = 1,
fSeekKey = Int32(position(file)), fSeekPdir = 100,
fClassName = "RBlob", fName = "", fTitle = "")
rnt_write(file, key)
pos = position(file)
write(file, payload)
return pos
write(file, ondisk)
return pos, length(ondisk)
end

_buffer_bytes(writer::Function) = (io = IOBuffer(); writer(io); take!(io))
Expand All @@ -584,8 +628,13 @@ function _root_datime(t::Base.Libc.TmStruct = Base.Libc.TmStruct(time()))
UInt32(t.min) << 6 | UInt32(t.sec)
end

# Default on-disk compression: LZ4 (algorithm 4) at level 4, i.e. ROOT fCompress
# code 404 — matching ROOT's RNTuple default for the LZ4 algorithm.
const RNT_DEFAULT_COMPRESSION = 100 * Const.kLZ4 + 4

"""
write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root", rntuple_name="myntuple")
write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root",
rntuple_name="myntuple", compression=$(100 * 4 + 4))

Write `table` (any Tables.jl-compatible table, e.g. a `NamedTuple` of vectors)
into `file` as an RNTuple inside a freshly created ROOT file structure. The
Expand All @@ -595,8 +644,13 @@ Supported column element types: `Bool` (bit column), `Int8`–`Int64`,
`UInt8`–`UInt64`, `Float16`/`Float32`/`Float64`, `String`, and (nested)
`Vector`s of these.

Current limitations: data is written uncompressed as a single cluster with one
page per column; struct/union columns are not supported.
`compression` is a ROOT `fCompress` code (`algorithm*100 + level`); pass `0` for
no compression. Supported algorithms: `4` LZ4 (default, level 4), `1` ZLIB,
`5` ZSTD. Each page and the header/footer/page-list envelopes are compressed
independently, and any block that fails to shrink is stored uncompressed.

Current limitations: data is written as a single cluster with one page per
column; struct/union columns are not supported.

# Example
```julia
Expand All @@ -607,7 +661,8 @@ julia> open("out.root", "w") do io
julia> LazyTree("out.root", "t")
```
"""
function write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root", rntuple_name="myntuple")
function write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root",
rntuple_name="myntuple", compression::Integer=RNT_DEFAULT_COMPRESSION)
if !istable(table)
error("RNTuple writing accepts object compatible with Tables.jl interface, got type $(typeof(table))")
end
Expand Down Expand Up @@ -648,7 +703,7 @@ function write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root", rn
0, 0, # fEND, fSeekFree (patched at the end)
fNbytesFree, 1, # fNbytesFree, nfree
fNbytesName, 0x04,
0, # fCompress
Int32(compression), # fCompress
0, fNbytesInfo, # fSeekInfo (patched), fNbytesInfo
zeros(SVector{18,UInt8}))
fileheader_obs = rnt_write_observe(file, fileheader)
Expand All @@ -675,42 +730,52 @@ function write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root", rn
field_records, col_records,
UnROOT.AliasRecord[], UnROOT.ExtraTypeInfo[])
header_bytes = _buffer_bytes(io -> rnt_write(io, rnt_header))
fSeekHeader = _write_rblob(file, header_bytes, fdatime)
fSeekHeader, header_nbytes = _write_rblob(file, header_bytes, fdatime; compression)

# pages (one page per column, one cluster), all in one RBlob
# pages (one page per column, one cluster), all in one RBlob. Each page is
# compressed independently and carries an XxHash-3 checksum of its on-disk
# (compressed) bytes, so the per-column locators can point inside the blob.
pages_arys = mapreduce(rnt_col_to_ary, vcat, input_cols)
@assert length(pages_arys) == length(col_records)
pages = [rnt_ary_to_page(ary, cr) for (ary, cr) in zip(pages_arys, col_records)]
pages_payload = _buffer_bytes(io -> foreach(p -> rnt_write(io, p), pages))
pages_begin = _write_rblob(file, pages_payload, fdatime)
page_positions = Vector{Int64}(undef, length(pages))
page_ondisk = [_root_compress(p.data, compression) for p in pages]
pages_payload = _buffer_bytes() do io
for od in page_ondisk
write(io, od)
write(io, xxh3_64(od)) # checksum over the on-disk (compressed) bytes
end
end
pages_begin, _ = _write_rblob(file, pages_payload, fdatime) # container itself not re-compressed
page_locators = Vector{Tuple{Int32,Int64,Int64}}(undef, length(pages))
let pos = pages_begin
for (i, p) in enumerate(pages)
page_positions[i] = pos
pos += length(p.data) + 8 # data + xxh3 checksum
for i in eachindex(pages)
nbytes = length(page_ondisk[i])
page_locators[i] = (pages[i].num_elements, nbytes, pos)
pos += nbytes + 8 # on-disk data + xxh3 checksum
end
end

# page list envelope
header_checksum = _checksum(rnt_header)
cluster_summary = Write_RNTupleListFrame([ClusterSummary(0, input_length)])
nested_page_locations = generate_page_links(pages, page_positions)
pagelink = UnROOT.PageLink(header_checksum, cluster_summary.payload, nested_page_locations)
nested_page_locations = generate_page_links(page_locators, compression)
pagelink = PageLinkWrite(header_checksum, cluster_summary.payload, nested_page_locations)
pagelink_bytes = _buffer_bytes(io -> rnt_write(io, pagelink))
pagelink_pos = _write_rblob(file, pagelink_bytes, fdatime)
pagelink_pos, pagelink_nbytes = _write_rblob(file, pagelink_bytes, fdatime; compression)

# footer envelope
rnt_footer = UnROOT.RNTupleFooter(0, header_checksum, UnROOT.RNTupleSchemaExtension([], [], [], []), [
UnROOT.ClusterGroupRecord(0, input_length, 1,
UnROOT.EnvLink(length(pagelink_bytes), UnROOT.Locator(length(pagelink_bytes), pagelink_pos))),
UnROOT.EnvLink(length(pagelink_bytes), UnROOT.Locator(pagelink_nbytes, pagelink_pos))),
])
footer_bytes = _buffer_bytes(io -> rnt_write(io, rnt_footer))
fSeekFooter = _write_rblob(file, footer_bytes, fdatime)
fSeekFooter, footer_nbytes = _write_rblob(file, footer_bytes, fdatime; compression)

# anchor: all locator values are known by now, no patching needed
# anchor: all locator values are known by now, no patching needed.
# fNBytes* is the on-disk (compressed) size; fLen* is the uncompressed size.
rnt_anchor = UnROOT.ROOT_3a3a_RNTuple(1, 0, 0, 0,
fSeekHeader, length(header_bytes), length(header_bytes),
fSeekFooter, length(footer_bytes), length(footer_bytes),
fSeekHeader, header_nbytes, length(header_bytes),
fSeekFooter, footer_nbytes, length(footer_bytes),
0x0000000040000000, 0) # checksum computed in rnt_write
tkey32_anchor = UnROOT.TKey32(klen_anchor + anchor_objlen, 4, anchor_objlen, fdatime,
klen_anchor, 1, position(file), 100, "ROOT::RNTuple", rntuple_name, "")
Expand Down
99 changes: 99 additions & 0 deletions src/RNTuple/Writing/compression.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using XXHashNative: xxh64

# ROOT compression block framing for the RNTuple writer.
#
# ROOT stores a compressed buffer as a sequence of blocks. Each block starts
# with a 9-byte header
#
# [2-byte algorithm tag][1-byte method][3-byte LE compressed size][3-byte LE uncompressed size]
#
# For LZ4 an 8-byte big-endian XxHash-64 checksum of the compressed bytes
# follows the header and is *included* in the compressed size. A single block
# can describe at most 2^24-1 bytes, so larger payloads are split.
#
# This mirrors `uproot.compression.compress` and is the inverse of the read
# path in `decompress_bytes!` / `decompress_datastreambytes`.

const _RNT_3BYTE_MAX = (1 << 24) - 1

# ROOT's fCompress code: algorithm * 100 + level.
_rnt_compression_algo(fCompress::Integer) = Int(fCompress) ÷ 100
_rnt_compression_level(fCompress::Integer) = Int(fCompress) % 100

function _write_3byte_le!(io::IO, n::Integer)
write(io, UInt8(n & 0xff))
write(io, UInt8((n >> 8) & 0xff))
write(io, UInt8((n >> 16) & 0xff))
return nothing
end

# Compress one block (already <= 2^24-1 bytes) and emit its framed bytes.
function _write_compressed_block!(io::IO, algo::Int, level::Int, block::Vector{UInt8})
if algo == Const.kLZ4
comp = lz4_hc_compress(block, level)
write(io, UInt8('L'), UInt8('4'), 0x01)
_write_3byte_le!(io, length(comp) + 8) # compressed size counts the checksum
_write_3byte_le!(io, length(block))
write(io, hton(xxh64(comp))) # 8-byte big-endian checksum
write(io, comp)
elseif algo == Const.kZLIB
comp = _zlib_compress(block)
write(io, UInt8('Z'), UInt8('L'), 0x08)
_write_3byte_le!(io, length(comp))
_write_3byte_le!(io, length(block))
write(io, comp)
elseif algo == Const.kZSTD
comp = transcode(ZstdCompressor, block)
write(io, UInt8('Z'), UInt8('S'), 0x01)
_write_3byte_le!(io, length(comp))
_write_3byte_le!(io, length(block))
write(io, comp)
else
error("Unsupported RNTuple write-compression algorithm code $algo " *
"(supported: $(Const.kLZ4)=LZ4, $(Const.kZLIB)=ZLIB, $(Const.kZSTD)=ZSTD)")
end
return nothing
end

function _zlib_compress(block::Vector{UInt8})
# For incompressible input libdeflate emits *stored* deflate blocks, which
# are slightly larger than the input (5 bytes per 65535-byte block, plus the
# 6-byte zlib header/adler). Size the buffer with a safe upper bound so the
# call can never fail for lack of space (which would otherwise return a
# LibDeflateError, not bytes).
bound = length(block) + 5 * cld(length(block), 65535) + 64
out = Vector{UInt8}(undef, bound)
n = zlib_compress!(Compressor(), out, block)
n isa Integer || error("zlib compression failed: $n")
resize!(out, n)
return out
end

"""
_root_compress(payload, fCompress) -> Vector{UInt8}

Compress `payload` using ROOT's block framing for the `fCompress` setting
(`algorithm*100 + level`; `0` means no compression). Returns the on-disk bytes.
As ROOT does, if compression does not shrink the data the original `payload` is
returned unchanged, so a reader detects the absence of compression by comparing
the on-disk size to the uncompressed size.
"""
function _root_compress(payload::AbstractVector{UInt8}, fCompress::Integer)
(fCompress == 0 || isempty(payload)) && return payload
algo = _rnt_compression_algo(fCompress)
level = _rnt_compression_level(fCompress)
level == 0 && return payload

n = length(payload)
io = IOBuffer()
pos = 1
while pos <= n
stop = min(pos + _RNT_3BYTE_MAX - 1, n)
block = payload isa Vector{UInt8} && pos == 1 && stop == n ?
payload : collect(@view payload[pos:stop])
_write_compressed_block!(io, algo, level, block)
pos = stop + 1
end
out = take!(io)
return length(out) < n ? out : convert(Vector{UInt8}, payload)
end
3 changes: 2 additions & 1 deletion src/UnROOT.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ using CodecLz4, CodecXz, CodecZstd, StaticArrays, LorentzVectors, ArraysOfArrays
using LRUCache
import IterTools: groupby

using LibDeflate: zlib_decompress!, Decompressor, crc32
using LibDeflate: zlib_decompress!, zlib_compress!, Decompressor, Compressor, crc32
using BitIntegers: @define_integers

import Tables, PrettyTables
Expand Down Expand Up @@ -67,6 +67,7 @@ include("RNTuple/fieldcolumn_reading.jl")
include("RNTuple/displays.jl")

include("RNTuple/Writing/page_writing.jl")
include("RNTuple/Writing/compression.jl")
include("RNTuple/Writing/TFileWriter.jl")
include("RNTuple/Writing/Stubs.jl")

Expand Down
Loading