Skip to content

Cluster-based scheduling of RNTuple processing in distributed mode #15152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions bindings/experimental/distrdf/python/DistRDF/HeadNode.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO
super().__init__(backend, npartitions, localdf)

self.mainntuplename = args[0]
self.inputfiles = args[1]
self.inputfiles = [args[1]] if isinstance(args[1], str) else args[1]
# Keep this in accordance with the implementation of TTree for now
self.subnames = [self.mainntuplename] * len(self.inputfiles)

Expand All @@ -838,28 +838,35 @@ def _build_ranges(self) -> List[Ranges.DataRange]:
# RDataFrame instance is going to process RNTuple data and the computation
# graph needs to be recreated at every task
self.exec_id = _graph_cache.ExecutionIdentifier("RNTuple", self.exec_id.graph_uuid)
return Ranges.get_ntuple_ranges(self.mainntuplename, self.inputfiles, self.npartitions, self.exec_id)
return Ranges.get_percentage_ranges(self.subnames, self.inputfiles, self.npartitions, None, self.exec_id)

def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]:
"""
Generates a function that is responsible for building an instance of
RDataFrame on a distributed mapper for a given entry range. Specific for
the RNtuple data source.
"""

def build_rdf_from_range(current_range: Ranges.get_ntuple_ranges) -> TaskObjects:
def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects:
"""
Builds an RDataFrame instance for a distributed mapper.

The function creates a TChain from the information contained in the
input range object. If the chain cannot be built, returns None.
"""
ntuplename, filenames = current_range.ntuplename, current_range.filenames
if not filenames:
return TaskObjects(None, None)
clustered_range, entries_in_rntuples = Ranges.get_clustered_range_from_percs(
current_range)

rdf_toprocess = ROOT.RDF.Experimental.FromRNTuple(ntuplename, filenames)
return TaskObjects(rdf_toprocess, None)

if clustered_range is None:
return TaskObjects(None, entries_in_rntuples)

rdf_toprocess = ROOT.Internal.RDF.FromRNTuple(
clustered_range.treenames[0], # Only one RNTuple name for the whole dataset
clustered_range.filenames,
(clustered_range.globalstart, clustered_range.globalend)
)
return TaskObjects(rdf_toprocess, entries_in_rntuples)

return build_rdf_from_range

Expand All @@ -873,4 +880,29 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable:
raise RuntimeError("The distributed execution returned no values. "
"This can happen if all files in your dataset contain empty trees.")

return values.mergeables
# User could have requested to read the same file multiple times indeed
input_files_and_trees = [
f"{filename}/{ntuplename}" for filename, ntuplename in zip(self.inputfiles, self.subnames)
]
files_counts = Counter(input_files_and_trees)

entries_in_rntuples = values.entries_in_trees
# Keys should be exactly the same
if files_counts.keys() != entries_in_rntuples.trees_with_entries.keys():
raise RuntimeError("The specified input files and the files that were "
"actually processed are not the same:\n"
f"Input files: {list(files_counts.keys())}\n"
f"Processed files: {list(entries_in_rntuples.trees_with_entries.keys())}")

# Multiply the entries of each tree by the number of times it was
# requested by the user
for fullpath in files_counts:
entries_in_rntuples.trees_with_entries[fullpath] *= files_counts[fullpath]

total_dataset_entries = sum(
entries_in_rntuples.trees_with_entries.values())
if entries_in_rntuples.processed_entries != total_dataset_entries:
raise RuntimeError(f"The dataset has {total_dataset_entries} entries, "
f"but {entries_in_rntuples.processed_entries} were processed.")

return values.mergeables
42 changes: 22 additions & 20 deletions bindings/experimental/distrdf/python/DistRDF/Ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,6 @@ class TaskTreeEntries:
processed_entries: int = 0
trees_with_entries: Dict[str, int] = field(default_factory=dict)

@dataclass
class RNTupleFileRange(DataRange):
ntuplename: str
filenames: List[str] = field(default_factory=list)

def split_equal_size(filenames: Iterable[str], npartitions: int):
"""Function to split the list of filenames into exactly N chunks of approximately equal size."""
quotient, remainder = divmod(len(filenames), npartitions)
return (filenames[i*quotient+min(i, remainder):(i+1)*quotient+min(i+1, remainder)] for i in range(npartitions))

def get_ntuple_ranges(ntuplename, filenames, npartitions, exec_id):

files_by_partition = split_equal_size(filenames, npartitions)
return [
RNTupleFileRange(exec_id, range_id, ntuplename, files_in_partition)
for range_id, files_in_partition in enumerate(files_by_partition)
]

def get_balanced_ranges(nentries, npartitions, exec_id: ExecutionIdentifier):
"""
Expand Down Expand Up @@ -188,8 +171,7 @@ def get_balanced_ranges(nentries, npartitions, exec_id: ExecutionIdentifier):

return ranges


def get_clusters_and_entries(treename: str, filename: str) -> Tuple[List[int], int]:
def get_ttree_clusters_and_entries(treename: str, filename: str) -> Tuple[List[int], int]:
"""
Retrieve cluster boundaries and number of entries of a TTree.
"""
Expand All @@ -210,6 +192,26 @@ def get_clusters_and_entries(treename: str, filename: str) -> Tuple[List[int], i
return clusters, entries


def get_rntuple_clusters_and_entries(ntuplename: str, filename: str) -> Tuple[List[int], int]:
"""
Retrieve cluster boundaries and number of entries of an RNTuple.
"""
clusters_and_entries = ROOT.Internal.RDF.GetClustersAndEntries(
ntuplename, filename)
boundaries = (
[0] +
[cluster.second for cluster in clusters_and_entries.first]
)
return boundaries, clusters_and_entries.second


def get_clusters_and_entries(rdf_uuid: str, datasetname: str, filename: str) -> Tuple[List[int], int]:
if rdf_uuid == "RNTuple":
return get_rntuple_clusters_and_entries(datasetname, filename)
else:
return get_ttree_clusters_and_entries(datasetname, filename)


def get_percentage_ranges(treenames: List[str], filenames: List[str], npartitions: int,
friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo],
exec_id: ExecutionIdentifier) -> List[TreeRangePerc]:
Expand Down Expand Up @@ -360,7 +362,7 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T
# are friends, all files in the dataset are opened and their number of
# entries are retrieved in order to ensure friend alignment.
all_clusters_entries = (
get_clusters_and_entries(treename, filename)
get_clusters_and_entries(percrange.exec_id.rdf_uuid, treename, filename)
for treename, filename in zip(percrange.treenames, percrange.filenames)
)
all_clusters, all_entries = zip(*all_clusters_entries)
Expand Down
42 changes: 23 additions & 19 deletions bindings/experimental/distrdf/test/test_ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

import ROOT

from collections import namedtuple

# Use a dummy execution identifier to test for TTree-based clustered ranges
Dummy = namedtuple("Dummy", ["rdf_uuid"])

def emptysourceranges_to_tuples(ranges):
"""Convert EmptySourceRange objects to tuples with the shape (start, end)"""
Expand Down Expand Up @@ -34,11 +38,11 @@ def test_nentries_multipleOf_npartitions(self):
npartitions_large = 10

# First case
rng = Ranges.get_balanced_ranges(nentries_small, npartitions_small, exec_id=None)
rng = Ranges.get_balanced_ranges(nentries_small, npartitions_small, exec_id=Dummy("dummy"))
ranges_small = emptysourceranges_to_tuples(rng)

# Second case
rng = Ranges.get_balanced_ranges(nentries_large, npartitions_large, exec_id=None)
rng = Ranges.get_balanced_ranges(nentries_large, npartitions_large, exec_id=Dummy("dummy"))
ranges_large = emptysourceranges_to_tuples(rng)

ranges_small_reqd = [(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)]
Expand Down Expand Up @@ -70,12 +74,12 @@ def test_nentries_not_multipleOf_npartitions(self):

# Example in which fractional part of
# (nentries/npartitions) >= 0.5
rng = Ranges.get_balanced_ranges(nentries_1, npartitions, exec_id=None)
rng = Ranges.get_balanced_ranges(nentries_1, npartitions, exec_id=Dummy("dummy"))
ranges_1 = emptysourceranges_to_tuples(rng)

# Example in which fractional part of
# (nentries/npartitions) < 0.5
rng = Ranges.get_balanced_ranges(nentries_2, npartitions, exec_id=None)
rng = Ranges.get_balanced_ranges(nentries_2, npartitions, exec_id=Dummy("dummy"))
ranges_2 = emptysourceranges_to_tuples(rng)

# Required output pairs
Expand All @@ -94,7 +98,7 @@ def test_nentries_greater_than_npartitions(self):
nentries = 5
npartitions = 7

rng = Ranges.get_balanced_ranges(nentries, npartitions, exec_id=None)
rng = Ranges.get_balanced_ranges(nentries, npartitions, exec_id=Dummy("dummy"))
ranges = emptysourceranges_to_tuples(rng)

ranges_reqd = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]
Expand Down Expand Up @@ -157,7 +161,7 @@ def test_clustered_ranges_with_one_cluster(self):
filenames = ["backend/Slimmed_ntuple.root"]
npartitions = 1

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -180,7 +184,7 @@ def test_npartitions_greater_than_clusters(self):
filenames = ["backend/Slimmed_ntuple.root"]
npartitions = 2

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

# We return one task per partition
Expand All @@ -204,7 +208,7 @@ def test_clustered_ranges_with_two_clusters_two_partitions(self):
filenames = ["backend/2clusters.root"]
npartitions = 2

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]
ranges = treeranges_to_tuples(clusteredranges)

Expand All @@ -227,7 +231,7 @@ def test_rdataframe_with_treename_and_filename_with_globbing(self):
expected_inputfiles = ["backend/2clusters.root"]
extracted_inputfiles = rdf.inputfiles

percranges = Ranges.get_percentage_ranges([treename], extracted_inputfiles, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges([treename], extracted_inputfiles, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]
ranges = treeranges_to_tuples(clusteredranges)

Expand Down Expand Up @@ -258,7 +262,7 @@ def test_rdataframe_with_notreename_and_chain_with_subtrees(self):
extracted_filenames = rdf.inputfiles

percranges = Ranges.get_percentage_ranges(
extracted_subtreenames, extracted_filenames, npartitions, friendinfo=None, exec_id=None)
extracted_subtreenames, extracted_filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]
ranges = treeranges_to_tuples(clusteredranges)

Expand All @@ -281,7 +285,7 @@ def test_clustered_ranges_with_four_clusters_four_partitions(self):
filenames = ["backend/4clusters.root"]
npartitions = 4

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -304,7 +308,7 @@ def test_clustered_ranges_with_many_clusters_four_partitions(self):
filenames = ["backend/1000clusters.root"]
npartitions = 4

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -327,7 +331,7 @@ def test_clustered_ranges_with_many_clusters_many_partitions(self):
filenames = ["backend/1000clusters.root"]
npartitions = 1000

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -349,7 +353,7 @@ def test_clustered_ranges_with_two_files(self):
filenames = ["backend/2clusters.root", "backend/4clusters.root"]
npartitions = 2

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -366,7 +370,7 @@ def test_three_files_one_partition(self):
filenames = [f"distrdf_unittests_file_{i}.root" for i in range(nfiles)]
npartitions = 1

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -382,7 +386,7 @@ def test_three_files_one_partition_per_file(self):
filenames = [f"distrdf_unittests_file_{i}.root" for i in range(nfiles)]
npartitions = nfiles

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -398,7 +402,7 @@ def test_three_files_two_partitions_per_file(self):
treenames = [f"tree_{i}" for i in range(nfiles)]
filenames = [f"distrdf_unittests_file_{i}.root" for i in range(nfiles)]
npartitions = nfiles * 2
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand All @@ -425,7 +429,7 @@ def test_three_files_partitions_equal_clusters(self):
filenames = [f"distrdf_unittests_file_{i}.root" for i in range(nfiles)]
npartitions = nfiles * 10 # trees have 10 clusters

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

ranges = treeranges_to_tuples(clusteredranges)
Expand Down Expand Up @@ -473,7 +477,7 @@ def test_three_files_partitions_greater_than_clusters(self):
filenames = [f"distrdf_unittests_file_{i}.root" for i in range(nfiles)]
npartitions = 42

percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=None)
percranges = Ranges.get_percentage_ranges(treenames, filenames, npartitions, friendinfo=None, exec_id=Dummy("dummy"))
clusteredranges = [Ranges.get_clustered_range_from_percs(percrange)[0] for percrange in percranges]

# We return one task per partition
Expand Down
Loading
Loading