Skip to content

Commit 10c3cbf

Browse files
committed
initial implementation
1 parent 5a58108 commit 10c3cbf

37 files changed

Lines changed: 2959 additions & 153 deletions

src/RNTuple/Writing/TFileWriter.jl

Lines changed: 167 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ function color_diff(ary1, ary2)
3131
end
3232

3333
function rnt_write(io::IO, x::AbstractString; legacy=false)
34-
L = length(x)
34+
# byte count, not character count: we write codeunits below
35+
L = ncodeunits(x)
3536
if legacy
3637
if L > typemax(UInt8)
3738
error("String longer than 255 not implemented")
@@ -58,6 +59,7 @@ end
5859

5960
struct Page_write{T <: AbstractVector{UInt8}}
6061
data::T
62+
num_elements::Int32
6163
end
6264

6365
function rnt_write(io::IO, x::Page_write; checksum=true)
@@ -187,13 +189,15 @@ function rnt_write(io::IO, x::UnROOT.ColumnRecord)
187189
rnt_write(io, x.nbits)
188190
rnt_write(io, x.field_id)
189191
rnt_write(io, x.flags)
192+
# final (1.0) spec: representation index first, then the optional first
193+
# element index gated on the deferred-column flag 0x01
194+
rnt_write(io, x.representation_idx)
190195
if !iszero(x.first_ele_idx)
191-
if x.flags != 0x08
192-
error("First element index is set but flags is not 0x08")
196+
if iszero(x.flags & 0x01)
197+
error("First element index is set but the deferred-column flag (0x01) is not")
193198
end
194199
rnt_write(io, x.first_ele_idx)
195200
end
196-
rnt_write(io, x.representation_idx)
197201
end
198202

199203
function rnt_write(io::IO, x::RNTupleFrame{T}) where T
@@ -497,10 +501,14 @@ function add_field_column_record!(field_records, column_records, input_T::Type{<
497501
nothing
498502
end
499503

504+
_rnt_cpp_typename(T::Type{<:Real}) = RNT_WRITE_CPP_TYPE_NAME_DICT[T]
505+
_rnt_cpp_typename(::Type{<:AbstractString}) = "std::string"
506+
_rnt_cpp_typename(T::Type{<:AbstractVector}) = "std::vector<" * _rnt_cpp_typename(eltype(T)) * ">"
507+
500508
# vector case
501509
function add_field_column_record!(field_records, column_records, input_T::Type{<:AbstractVector}, NAME; parent_field_id, col_field_id = parent_field_id)
502510
implicit_field_id = length(field_records)
503-
fr = UnROOT.FieldRecord(; field_version=0x00000000, type_version=0x00000000, parent_field_id, struct_role=0x0001, flags=0x0000, repetition=0, source_field_id=-1, root_streamer_checksum=-1, field_name=string(NAME), type_name="", type_alias="", field_desc="", )
511+
fr = UnROOT.FieldRecord(; field_version=0x00000000, type_version=0x00000000, parent_field_id, struct_role=0x0001, flags=0x0000, repetition=0, source_field_id=-1, root_streamer_checksum=-1, field_name=string(NAME), type_name=_rnt_cpp_typename(input_T), type_alias="", field_desc="", )
504512
push!(field_records, fr)
505513
rnt_col_type = RNT_COL_TYPE_TABLE[RNT_WRITE_JL_TYPE_DICT[Index64] + 1]
506514
cr_offset = UnROOT.ColumnRecord(rnt_col_type.type, rnt_col_type.nbits, col_field_id, 0x00, 0x00, 0)
@@ -526,115 +534,186 @@ function schema_to_field_column_records(table)
526534
return field_records, column_records
527535
end
528536

529-
function generate_page_links(column_records, pages_obses)
537+
function generate_page_links(pages, page_positions)
530538
outer_list = RNTuplePageOuterList{RNTuplePageInnerList{PageDescription}}([])
531-
for (cr, page_obs) in zip(column_records, pages_obses)
532-
Nbytes = length(page_obs.object.data)
533-
Nitems = Nbytes * 8 ÷ cr.nbits
539+
for (page, pos) in zip(pages, page_positions)
534540
inner_list = RNTuplePageInnerList([
535-
PageDescription(Nitems, Locator(Nbytes, page_obs.position))
541+
PageDescription(page.num_elements, Locator(length(page.data), pos))
536542
])
537-
push!(outer_list, inner_list)
543+
push!(outer_list, inner_list)
538544
end
539545
return RNTuplePageTopList([outer_list])
540546
end
541547

548+
# serialized TKey32 header (26 bytes of fixed fields) + three length-prefixed strings
549+
_tkey32_len(class, name, title) =
550+
Int16(26 + 3 + ncodeunits(class) + ncodeunits(name) + ncodeunits(title))
551+
552+
"""
553+
_write_rblob(file::IO, payload::AbstractVector{UInt8}) -> Int64
554+
555+
Write an RBlob TKey followed by `payload` with correctly computed key sizes.
556+
Returns the file position where the payload starts (for locators).
557+
"""
558+
function _write_rblob(file::IO, payload::AbstractVector{UInt8})
559+
klen = _tkey32_len("RBlob", "", "")
560+
key = RBlob(; fNbytes = Int32(klen + length(payload)), fVersion = 4,
561+
fObjLen = Int32(length(payload)), fDatime = Stubs.WRITE_TIME,
562+
fKeyLen = klen, fCycle = 1,
563+
fSeekKey = Int32(position(file)), fSeekPdir = 100,
564+
fClassName = "RBlob", fName = "", fTitle = "")
565+
rnt_write(file, key)
566+
pos = position(file)
567+
write(file, payload)
568+
return pos
569+
end
570+
571+
_buffer_bytes(writer::Function) = (io = IOBuffer(); writer(io); take!(io))
572+
573+
"""
574+
write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root", rntuple_name="myntuple")
575+
576+
Write `table` (any Tables.jl-compatible table, e.g. a `NamedTuple` of vectors)
577+
into `file` as an RNTuple inside a freshly created ROOT file structure. The
578+
output is readable by UnROOT itself, uproot, and ROOT (≥ 6.34).
579+
580+
Supported column element types: `Bool` (bit column), `Int8`–`Int64`,
581+
`UInt8`–`UInt64`, `Float16`/`Float32`/`Float64`, `String`, and (nested)
582+
`Vector`s of these.
583+
584+
Current limitations: data is written uncompressed as a single cluster with one
585+
page per column; struct/union columns are not supported.
586+
587+
# Example
588+
```julia
589+
julia> open("out.root", "w") do io
590+
UnROOT.write_rntuple(io, (x=[1.0, 2.0], s=["a", "b"]); rntuple_name="t")
591+
end
592+
593+
julia> LazyTree("out.root", "t")
594+
```
595+
"""
542596
function write_rntuple(file::IO, table; file_name="test_ntuple_minimal.root", rntuple_name="myntuple")
543597
if !istable(table)
544598
error("RNTuple writing accepts object compatible with Tables.jl interface, got type $(typeof(table))")
545599
end
546600

547601
input_cols = columntable(table)
548-
if !allequal(length, input_cols)
602+
if !allequal(map(length, values(input_cols)))
549603
error("Top-level columns must have the same length")
550604
end
551605
input_length = length(input_cols[begin])
552606

553-
rntAnchor_update = Dict{Symbol, Any}()
554-
555-
file_preamble_obs = rnt_write_observe(file, Stubs.file_preamble)
556-
fileheader_obs = rnt_write_observe(file, Stubs.fileheader)
557-
dummy_padding1_obs = rnt_write_observe(file, Stubs.dummy_padding1)
558-
559-
fileheader_obs[:fBEGIN] = UInt32(position(file))
560-
561-
tkey32_tfile_obs = rnt_write_observe(file, Stubs.tkey32_tfile)
562-
tkey32_tfile_obs[:fName] = file_name
563-
tfile_obs = rnt_write_observe(file, Stubs.tfile)
564-
565-
tdirectory32_obs = rnt_write_observe(file, Stubs.tdirectory32)
566-
dummy_padding2_obs = rnt_write_observe(file, Stubs.dummy_padding2)
567-
568-
RBlob1 = UnROOT.RBlob(; fNbytes = 0x00DC, fVersion = 0x0004, fObjLen = 0x000000BA, fDatime = Stubs.WRITE_TIME, fKeyLen = 34,
569-
fCycle = 0x0001, fSeekKey = position(file), fSeekPdir = 100, fClassName = "RBlob", fName = "", fTitle = "")
570-
RBlob1_update = Dict{Symbol, Any}()
571-
RBlob1_obs = rnt_write_observe(file, RBlob1)
607+
# name-dependent sizes of the TFile container records
608+
klen_tfile = _tkey32_len("TFile", file_name, "")
609+
tnamed_len = 2 + ncodeunits(file_name) # (1+name) + (1+empty title)
610+
fNbytesName = Int32(klen_tfile + tnamed_len)
611+
tfile_objlen = Int32(tnamed_len + 30 + 30) # TNamed + directory header + padding
612+
klen_dir = _tkey32_len("", file_name, "")
613+
klen_anchor = _tkey32_len("ROOT::RNTuple", rntuple_name, "")
614+
anchor_objlen = Int32(length(Stubs.magic_6bytes) + 72) # streamed header + anchor v2 payload + checksum
615+
fNbytesKeys = Int32(klen_dir + 4 + klen_anchor)
616+
klen_end = _tkey32_len("", file_name, "")
617+
fNbytesFree = Int32(klen_end + 10)
618+
fNbytesInfo = Int32(64 + length(Stubs.tsreamerinfo_compressed)) # constant streamer record
619+
620+
rnt_write(file, Stubs.file_preamble)
621+
fileheader = UnROOT.FileHeader32(
622+
100, # fBEGIN
623+
0, 0, # fEND, fSeekFree (patched at the end)
624+
fNbytesFree, 1, # fNbytesFree, nfree
625+
fNbytesName, 0x04,
626+
0, # fCompress
627+
0, fNbytesInfo, # fSeekInfo (patched), fNbytesInfo
628+
zeros(SVector{18,UInt8}))
629+
fileheader_obs = rnt_write_observe(file, fileheader)
630+
rnt_write(file, Stubs.dummy_padding1)
631+
@assert position(file) == 100
632+
633+
rnt_write(file, UnROOT.TKey32(klen_tfile + tfile_objlen, 4, tfile_objlen, Stubs.WRITE_TIME,
634+
klen_tfile, 1, 100, 0, "TFile", file_name, ""))
635+
rnt_write(file, UnROOT.TFile_write(file_name, ""))
636+
tdirectory32 = UnROOT.ROOTDirectoryHeader32(5, Stubs.WRITE_TIME, Stubs.WRITE_TIME,
637+
fNbytesKeys, fNbytesName, 100, 0,
638+
0) # fSeekKeys patched below
639+
tdirectory32_obs = rnt_write_observe(file, tdirectory32)
640+
rnt_write(file, Stubs.dummy_padding2)
641+
642+
# RNTuple header envelope
572643
field_records, col_records = schema_to_field_column_records(table)
573644
rnt_header = UnROOT.RNTupleHeader(
574-
zero(UInt64), rntuple_name, "", "ROOT v6.35.001",
575-
field_records, col_records,
576-
UnROOT.AliasRecord[], UnROOT.ExtraTypeInfo[]
577-
)
578-
579-
rntAnchor_update[:fSeekHeader] = UInt32(position(file))
580-
rnt_header_obs = rnt_write_observe(file, rnt_header)
581-
rntAnchor_update[:fNBytesHeader] = rnt_header_obs.len
582-
rntAnchor_update[:fLenHeader] = rnt_header_obs.len
583-
RBlob1_update[:fObjLen] = rnt_header_obs.len
584-
RBlob1_update[:fNbytes] = rnt_header_obs.len + 34
585-
586-
Base.setindex!(RBlob1_obs, RBlob1_update)
645+
zero(UInt64), rntuple_name, "", "ROOT v6.35.001",
646+
field_records, col_records,
647+
UnROOT.AliasRecord[], UnROOT.ExtraTypeInfo[])
648+
header_bytes = _buffer_bytes(io -> rnt_write(io, rnt_header))
649+
fSeekHeader = _write_rblob(file, header_bytes)
587650

588-
RBlob2_obs = rnt_write_observe(file, Stubs.RBlob2)
651+
# pages (one page per column, one cluster), all in one RBlob
589652
pages_arys = mapreduce(rnt_col_to_ary, vcat, input_cols)
590653
@assert length(pages_arys) == length(col_records)
591654
pages = [rnt_ary_to_page(ary, cr) for (ary, cr) in zip(pages_arys, col_records)]
592-
pages_obses = [rnt_write_observe(file, page) for page in pages]
655+
pages_payload = _buffer_bytes(io -> foreach(p -> rnt_write(io, p), pages))
656+
pages_begin = _write_rblob(file, pages_payload)
657+
page_positions = Vector{Int64}(undef, length(pages))
658+
let pos = pages_begin
659+
for (i, p) in enumerate(pages)
660+
page_positions[i] = pos
661+
pos += length(p.data) + 8 # data + xxh3 checksum
662+
end
663+
end
593664

594-
RBlob3_obs = rnt_write_observe(file, Stubs.RBlob3)
665+
# page list envelope
666+
header_checksum = _checksum(rnt_header)
595667
cluster_summary = Write_RNTupleListFrame([ClusterSummary(0, input_length)])
596-
nested_page_locations = generate_page_links(col_records, pages_obses)
597-
598-
pagelink = UnROOT.PageLink(_checksum(rnt_header_obs.object), cluster_summary.payload, nested_page_locations)
599-
pagelink_obs = rnt_write_observe(file, pagelink)
600-
601-
RBlob4_obs = rnt_write_observe(file, Stubs.RBlob4)
602-
rntAnchor_update[:fSeekFooter] = UInt32(position(file))
603-
rnt_footer = UnROOT.RNTupleFooter(0, _checksum(rnt_header_obs.object), UnROOT.RNTupleSchemaExtension([], [], [], []), [
604-
UnROOT.ClusterGroupRecord(0, input_length, 1, UnROOT.EnvLink(pagelink_obs.len, UnROOT.Locator(pagelink_obs.len, pagelink_obs.position, ))),
668+
nested_page_locations = generate_page_links(pages, page_positions)
669+
pagelink = UnROOT.PageLink(header_checksum, cluster_summary.payload, nested_page_locations)
670+
pagelink_bytes = _buffer_bytes(io -> rnt_write(io, pagelink))
671+
pagelink_pos = _write_rblob(file, pagelink_bytes)
672+
673+
# footer envelope
674+
rnt_footer = UnROOT.RNTupleFooter(0, header_checksum, UnROOT.RNTupleSchemaExtension([], [], [], []), [
675+
UnROOT.ClusterGroupRecord(0, input_length, 1,
676+
UnROOT.EnvLink(length(pagelink_bytes), UnROOT.Locator(length(pagelink_bytes), pagelink_pos))),
605677
])
606-
rnt_footer_obs = rnt_write_observe(file, rnt_footer)
607-
rntAnchor_update[:fNBytesFooter] = rnt_footer_obs.len
608-
rntAnchor_update[:fLenFooter] = rnt_footer_obs.len
609-
610-
tkey32_anchor_position = position(file)
611-
tkey32_anchor = UnROOT.TKey32(0x00000080, 4, typemin(Int32), Stubs.WRITE_TIME, 50, 1, tkey32_anchor_position, 100, "ROOT::RNTuple", rntuple_name, "")
612-
tkey32_anchor_obs1 = rnt_write_observe(file, tkey32_anchor)
613-
tkey32_anchor_update = Dict{Symbol, Any}()
614-
magic_6bytes_obs = rnt_write_observe(file, Stubs.magic_6bytes)
615-
rnt_anchor_obs = rnt_write_observe(file, Stubs.rnt_anchor)
616-
Base.setindex!(rnt_anchor_obs, rntAnchor_update)
617-
tkey32_anchor_update[:fObjlen] = rnt_anchor_obs.len + magic_6bytes_obs.len
618-
Base.setindex!(tkey32_anchor_obs1, tkey32_anchor_update)
619-
620-
tdirectory32_obs[:fSeekKeys] = UInt32(position(file))
621-
tkey32_TDirectory_obs = rnt_write_observe(file, Stubs.tkey32_TDirectory)
622-
n_keys_obs = rnt_write_observe(file, Stubs.n_keys)
623-
tkey32_anchor_obs2 = rnt_write_observe(file, tkey32_anchor)
624-
Base.setindex!(tkey32_anchor_obs2, tkey32_anchor_update)
625-
678+
footer_bytes = _buffer_bytes(io -> rnt_write(io, rnt_footer))
679+
fSeekFooter = _write_rblob(file, footer_bytes)
680+
681+
# anchor: all locator values are known by now, no patching needed
682+
rnt_anchor = UnROOT.ROOT_3a3a_RNTuple(1, 0, 0, 0,
683+
fSeekHeader, length(header_bytes), length(header_bytes),
684+
fSeekFooter, length(footer_bytes), length(footer_bytes),
685+
0x0000000040000000, 0) # checksum computed in rnt_write
686+
tkey32_anchor = UnROOT.TKey32(klen_anchor + anchor_objlen, 4, anchor_objlen, Stubs.WRITE_TIME,
687+
klen_anchor, 1, position(file), 100, "ROOT::RNTuple", rntuple_name, "")
688+
rnt_write(file, tkey32_anchor)
689+
rnt_write(file, Stubs.magic_6bytes)
690+
rnt_write(file, rnt_anchor)
691+
692+
# directory key listing (1 key: the anchor)
693+
tdirectory32_obs[:fSeekKeys] = Int32(position(file))
694+
rnt_write(file, UnROOT.TKey32(fNbytesKeys, 4, Int32(4 + klen_anchor), Stubs.WRITE_TIME,
695+
klen_dir, 1, position(file), 100, "", file_name, ""))
696+
rnt_write(file, Stubs.n_keys)
697+
rnt_write(file, tkey32_anchor)
698+
699+
# streamer info (constant compressed TList blob)
626700
fileheader_obs[:fSeekInfo] = UInt32(position(file))
627-
tkey32_TStreamerInfo_obs = rnt_write_observe(file, Stubs.tkey32_TStreamerInfo)
628-
tsreamerinfo_compressed_obs = rnt_write_observe(file, Stubs.tsreamerinfo_compressed)
629-
fileheader_obs[:fSeekFree] = UInt32(position(file))
630-
tfile_end_obs = rnt_write_observe(file, Stubs.tfile_end)
631-
fileheader_obs[:fEND] = UInt32(position(file))
632-
633-
flush!(RBlob1_obs)
634-
flush!(tkey32_anchor_obs1)
635-
flush!(tkey32_anchor_obs2)
636-
flush!(tkey32_tfile_obs)
637-
flush!(tdirectory32_obs)
701+
rnt_write(file, UnROOT.TKey32(fNbytesInfo, 4, 1254, Stubs.WRITE_TIME,
702+
64, 1, position(file), 100, "TList", "StreamerInfo", "Doubly linked list"))
703+
rnt_write(file, Stubs.tsreamerinfo_compressed)
704+
705+
# free-segments record: one segment [fEND, 2000000000]
706+
fSeekFree = position(file)
707+
fileheader_obs[:fSeekFree] = UInt32(fSeekFree)
708+
fEND = fSeekFree + fNbytesFree
709+
rnt_write(file, UnROOT.TKey32(fNbytesFree, 4, 10, Stubs.WRITE_TIME,
710+
klen_end, 1, fSeekFree, 100, "", file_name, ""))
711+
rnt_write(file, UInt16(1); legacy=true) # TFree version
712+
rnt_write(file, UInt32(fEND); legacy=true) # first free byte
713+
rnt_write(file, UInt32(2000000000); legacy=true)
714+
@assert position(file) == fEND
715+
fileheader_obs[:fEND] = UInt32(fEND)
716+
638717
flush!(fileheader_obs)
639-
flush!(rnt_anchor_obs)
718+
flush!(tdirectory32_obs)
640719
end

src/RNTuple/Writing/page_writing.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ function rnt_ary_to_page(ary::AbstractVector, cr::ColumnRecord) end
2929

3030
function rnt_ary_to_page(ary::AbstractVector{Bool}, cr::ColumnRecord)
3131
chunks = BitVector(ary).chunks
32-
Page_write(reinterpret(UInt8, chunks))
32+
bytes = reinterpret(UInt8, chunks)
33+
# bit-packed pages store exactly ceil(n/8) bytes; BitVector chunks are
34+
# 64-bit padded, so truncate (unused trailing bits are zero by invariant)
35+
Page_write(bytes[1:cld(length(ary), 8)], Int32(length(ary)))
3336
end
3437

3538
function rnt_ary_to_page(ary::AbstractVector{T}, cr::ColumnRecord) where {T<:Number}
36-
Page_write(page_encode(ary, cr))
39+
Page_write(page_encode(ary, cr), Int32(length(ary)))
3740
end
3841

3942
function page_encode(ary::AbstractVector{T}, cr::ColumnRecord) where {T}

src/RNTuple/bootstrap.jl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,14 @@ function decompress_bytes!(uncomp_data, compbytes, NTarget::Integer)
9191
# skip checksum which is 8 bytes
9292
# original: lz4_decompress(rawbytes[9:end], uncompbytes)
9393
input = @view rawbytes[9:end]
94-
input_ptr = pointer(input)
95-
input_size = length(input)
96-
output_ptr = pointer(uncomp_data) + fulfilled
97-
output_size = uncompbytes
98-
_decompress_lz4!(input_ptr, input_size, output_ptr, output_size)
94+
# raw Ptr arguments do not root their parent arrays in the ccall
95+
GC.@preserve rawbytes uncomp_data begin
96+
input_ptr = pointer(input)
97+
input_size = length(input)
98+
output_ptr = pointer(uncomp_data) + fulfilled
99+
output_size = uncompbytes
100+
_decompress_lz4!(input_ptr, input_size, output_ptr, output_size)
101+
end
99102
elseif cname == @SVector UInt8['Z', 'L']
100103
output = @view(uncomp_data[fulfilled+1:fulfilled+uncompbytes])
101104
zlib_decompress!(Decompressor(), output, rawbytes, uncompbytes)

0 commit comments

Comments
 (0)