Skip to content
This repository was archived by the owner on Jan 12, 2026. It is now read-only.

Commit b5d725a

Browse files
fschlimb and Yard1 authored
Supporting __partitioned__ interface (#153)
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
1 parent 7dbf7f1 commit b5d725a

File tree

6 files changed

+369
-7
lines changed

6 files changed

+369
-7
lines changed

run_ci_examples.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ echo "running simple_dask.py" && python simple_dask.py --smoke-test
2929
echo "running simple_modin.py" && python simple_modin.py --smoke-test
3030
echo "running simple_objectstore.py" && python simple_objectstore.py --smoke-test
3131
echo "running simple_ray_dataset.py" && python simple_objectstore.py --smoke-test
32+
echo "running simple_partitioned.py" && python simple_partitioned.py --smoke-test
3233

3334
if [ "$TUNE" = "1" ]; then
3435
echo "running simple_tune.py" && python simple_tune.py --smoke-test
@@ -42,4 +43,4 @@ popd
4243
pushd xgboost_ray/tests
4344
echo "running examples with Ray Client"
4445
python -m pytest -v --durations=0 -x test_client.py
45-
popd || exit 1
46+
popd || exit 1

xgboost_ray/data_sources/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99
from xgboost_ray.data_sources.parquet import Parquet
1010
from xgboost_ray.data_sources.object_store import ObjectStore
1111
from xgboost_ray.data_sources.ray_dataset import RayDataset
12+
from xgboost_ray.data_sources.partitioned import Partitioned
1213

1314
data_sources = [
14-
Numpy, Pandas, Modin, Dask, MLDataset, Petastorm, CSV, Parquet,
15-
ObjectStore, RayDataset
15+
Numpy, Pandas, Partitioned, Modin, Dask, MLDataset, Petastorm, CSV,
16+
Parquet, ObjectStore, RayDataset
1617
]
1718

1819
__all__ = [
1920
"DataSource", "RayFileType", "Numpy", "Pandas", "Modin", "Dask",
20-
"MLDataset", "Petastorm", "CSV", "Parquet", "ObjectStore", "RayDataset"
21+
"MLDataset", "Petastorm", "CSV", "Parquet", "ObjectStore", "RayDataset",
22+
"Partitioned"
2123
]
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from typing import Any, Optional, Sequence, Dict, Tuple
2+
3+
from collections import defaultdict
4+
import pandas as pd
5+
import numpy as np
6+
7+
from ray import ObjectRef
8+
from ray.actor import ActorHandle
9+
10+
from xgboost_ray.data_sources._distributed import \
11+
assign_partitions_to_actors, get_actor_rank_ips
12+
from xgboost_ray.data_sources.data_source import DataSource, RayFileType
13+
from xgboost_ray.data_sources.pandas import Pandas
14+
from xgboost_ray.data_sources.numpy import Numpy
15+
16+
17+
class Partitioned(DataSource):
    """Read from a distributed data structure implementing ``__partitioned__``.

    ``__partitioned__`` provides metadata about how the data is partitioned
    and distributed across several compute nodes, which makes objects
    exposing it suitable for distributed loading.

    Also see the ``__partitioned__`` spec:
    https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md
    """
    supports_central_loading = True
    supports_distributed_loading = True

    @staticmethod
    def is_data_type(data: Any,
                     filetype: Optional[RayFileType] = None) -> bool:
        """Return True if ``data`` exposes the ``__partitioned__`` protocol."""
        return hasattr(data, "__partitioned__")

    @staticmethod
    def load_data(
            data: Any,  # __partitioned__ dict
            ignore: Optional[Sequence[str]] = None,
            indices: Optional[Sequence[ObjectRef]] = None,
            **kwargs) -> pd.DataFrame:
        """Materialize (a subset of) the partitions into one local object.

        Args:
            data: The ``__partitioned__`` dict itself (not the object that
                carries it).
            ignore: Column names to exclude (forwarded to the concrete
                loader).
            indices: Futures of the partitions to load. If ``None`` or
                empty, all partitions are fetched.

        Returns:
            A single concatenated ``pd.DataFrame`` (or the result of
            ``Numpy.load_data`` when the partitions hold ndarrays).
        """
        assert isinstance(data, dict), "Expected __partitioned__ dict"
        _get = data["get"]

        if indices is None or len(indices) == 0:
            tiling = data["partition_tiling"]
            ndims = len(tiling)
            # We need tuples to access partitions in the right order.
            pos_suffix = (0, ) * (ndims - 1)
            parts = data["partitions"]
            # Get the full data, i.e. all shards/partitions.
            local_df = [
                _get(parts[(i, ) + pos_suffix]["data"])
                for i in range(tiling[0])
            ]
        else:
            # Here we got a list of futures for partitions.
            local_df = _get(indices)

        if isinstance(local_df[0], pd.DataFrame):
            return Pandas.load_data(
                pd.concat(local_df, copy=False), ignore=ignore)
        else:
            return Numpy.load_data(np.concatenate(local_df), ignore=ignore)

    @staticmethod
    def get_actor_shards(
            data: Any,  # object exposing __partitioned__
            actors: Sequence[ActorHandle]) -> \
            Tuple[Any, Optional[Dict[int, Any]]]:
        """Assign partitions to actors, preferring co-located placement.

        Args:
            data: Object exposing ``__partitioned__`` (e.g. a distributed
                DataFrame).
            actors: Actor handles to distribute the partitions over.

        Returns:
            Tuple of the (serializable) ``__partitioned__`` dict and a
            mapping from actor rank to its assigned partition futures.

        Raises:
            RuntimeError: If the partitioning is not row-wise 1d/2d.
        """
        assert hasattr(data, "__partitioned__")

        actor_rank_ips = get_actor_rank_ips(actors)

        # Get accessor func and partitions
        parted = data.__partitioned__
        parts = parted["partitions"]
        tiling = parted["partition_tiling"]
        ndims = len(tiling)
        # Only 1d tilings, or 2d tilings with a single partition column,
        # are supported (i.e. strictly row-wise partitioning).
        if ndims < 1 or ndims > 2 or any(tiling[x] != 1
                                         for x in range(1, ndims)):
            raise RuntimeError(
                "Only row-wise partitionings of 1d/2d structures supported.")

        # Now build a table mapping from IP to list of partitions
        ip_to_parts = defaultdict(list)
        # we need tuples to access partitions in the right order
        pos_suffix = (0, ) * (ndims - 1)
        for i in range(tiling[0]):
            part = parts[(i, ) + pos_suffix]  # this works for 1d and 2d
            ip_to_parts[part["location"][0]].append(part["data"])
        # __partitioned__ is serializable, so pass it here
        # as the first return value
        ret = parted, assign_partitions_to_actors(ip_to_parts, actor_rank_ips)
        return ret

    @staticmethod
    def get_n(data: Any) -> int:
        """Get length (row count) of the data source for sharding."""
        return data.__partitioned__["shape"][0]
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import argparse
2+
3+
from sklearn import datasets
4+
from sklearn.model_selection import train_test_split
5+
6+
import numpy as np
7+
8+
import ray
9+
10+
from xgboost_ray import RayDMatrix, train, RayParams
11+
12+
nc = 31
13+
14+
15+
@ray.remote
class AnActor:
    """Mimics one shard-holder of a distributed DF.

    Several of these actors each create the piece of data which together
    forms the global DF.
    """

    @ray.method(num_returns=2)
    def genData(self, rank, nranks, nrows):
        """Generate the global dataset and cut out this rank's slice.

        In real life each actor would of course directly create local data.
        """
        # Load the full dataset, then keep only the training split.
        features, labels = datasets.load_breast_cancer(return_X_y=True)
        x_train, _, y_train, _ = train_test_split(
            features, labels, test_size=0.25)
        # Append the label as the last column.
        y_train = y_train.reshape((y_train.shape[0], 1))
        full = np.hstack([x_train, y_train])
        assert nrows <= full.shape[0]
        assert nc == full.shape[1]
        # Each rank owns a contiguous slab of rows.
        chunk = nrows // nranks
        local = full[chunk * rank:chunk * (rank + 1)]
        return local, ray.util.get_node_ip_address()
36+
37+
38+
class Parted:
    """Minimal wrapper that exposes a ``__partitioned__`` attribute."""

    def __init__(self, parted):
        # Store the metadata dict under the protocol attribute name.
        self.__partitioned__ = parted
44+
45+
46+
def main(cpus_per_actor, num_actors):
    """Build a __partitioned__ dataset across actors and train on it."""
    total_rows = 424
    actors = [AnActor.remote() for _ in range(num_actors)]
    # Each actor returns (its data slab, its node IP) as two futures.
    futures = [
        actors[i].genData.remote(i, num_actors, total_rows)
        for i in range(num_actors)
    ]
    rows_per_part = total_rows // num_actors
    # Round down so that all partitions have equal size.
    total_rows = rows_per_part * num_actors

    # Assemble the __partitioned__ metadata: one (i, 0) entry per actor,
    # keeping the data itself as futures and recording its location.
    partitions = {}
    for i in range(num_actors):
        partitions[(i, 0)] = {
            "start": (i * rows_per_part, 0),
            "shape": (rows_per_part, nc),
            "data": futures[i][0],
            "location": [ray.get(futures[i][1])],
        }
    parted = Parted({
        "shape": (total_rows, nc),
        "partition_tiling": (num_actors, 1),
        "get": lambda x: ray.get(x),
        "partitions": partitions,
    })

    # The label lives in the last column ("f30" for nc == 31).
    label_col = nc - 1
    train_set = RayDMatrix(parted, f"f{label_col}")

    evals_result = {}
    # XGBoost configuration.
    xgboost_params = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }

    # Train the classifier.
    bst = train(
        params=xgboost_params,
        dtrain=train_set,
        evals=[(train_set, "train")],
        evals_result=evals_result,
        ray_params=RayParams(
            max_actor_restarts=0,
            gpus_per_actor=0,
            cpus_per_actor=cpus_per_actor,
            num_actors=num_actors),
        verbose_eval=False,
        num_boost_round=10)

    model_path = "partitioned.xgb"
    bst.save_model(model_path)
    print("Final training error: {:.4f}".format(
        evals_result["train"]["error"][-1]))
97+
98+
99+
if __name__ == "__main__":
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--address", required=False, type=str,
        help="the address to use for Ray")
    cli.add_argument(
        "--server-address", required=False, type=str,
        help="Address of the remote server if using Ray Client.")
    cli.add_argument(
        "--cpus-per-actor", type=int, default=1,
        help="Sets number of CPUs per xgboost training worker.")
    cli.add_argument(
        "--num-actors", type=int, default=4,
        help="Sets number of xgboost workers to use.")
    cli.add_argument(
        "--smoke-test", action="store_true", default=False, help="gpu")

    args, _ = cli.parse_known_args()

    if not ray.is_initialized():
        if args.smoke_test:
            # Local smoke test: one CPU per actor plus one for the driver.
            ray.init(num_cpus=args.num_actors + 1)
        elif args.server_address:
            # Connect through Ray Client.
            ray.util.connect(args.server_address)
        else:
            ray.init(address=args.address)

    main(args.cpus_per_actor, args.num_actors)

xgboost_ray/matrix.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,8 +394,8 @@ def get_data_source(self) -> Type[DataSource]:
394394

395395
# Todo (krfricke): It would be good to have a more general way to
396396
# check for compatibility here. Combine with test below?
397-
if not isinstance(self.data,
398-
(Iterable, MLDataset, RayDataset)) or invalid_data:
397+
if not (isinstance(self.data, (Iterable, MLDataset, RayDataset))
398+
or hasattr(self.data, "__partitioned__")) or invalid_data:
399399
raise ValueError(
400400
f"Distributed data loading only works with already "
401401
f"distributed datasets. These should be specified through a "

0 commit comments

Comments
 (0)