This repository was archived by the owner on Jan 12, 2026. It is now read-only.

Commit 61d293e

Add support for distributed dataset loading using Modin (#67)
* Re-factor data loading structure
* Better tests, better documentation
* Assign partitions to actor (WIP)
* Update xgboost_ray/data_sources/data_source.py (review suggestions co-authored by Richard Liaw <rliaw@berkeley.edu>)
* Update docs
* Move data source fetching into separate function
* Add partition assignment unit test
* Add cluster test
* Resolve breaking API change (plus follow-ups)
* Load according to data shards
* Fix tests, add example
* Merge
* Fix tests
* Docs
* Update requirements
* Disable cache dir for pip requirements
* Modin wheels
* Only enable for Modin >= 0.9.0
* Modin check in test matrix
* Skip test if Modin is not installed
* Disable Modin example if incompatible
* More CPUs
* Increase test size because of additional tests/examples

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
1 parent 6166dd5 commit 61d293e

12 files changed: +774 −71 lines

.github/workflows/test.yaml

Lines changed: 9 additions & 4 deletions

@@ -30,7 +30,7 @@ jobs:
 
   test_linux_ray_master:
     runs-on: ubuntu-latest
-    timeout-minutes: 12
+    timeout-minutes: 14
     strategy:
       matrix:
         python-version: [3.6.9, 3.7, 3.8]
@@ -64,6 +64,7 @@ jobs:
         pushd xgboost_ray/tests
         python -m pytest -vv -s --log-cli-level=DEBUG --durations=0 -x test_colocation.py
         python -m pytest -v --durations=0 -x test_matrix.py
+        python -m pytest -v --durations=0 -x test_data_source.py
         python -m pytest -v --durations=0 -x test_xgboost_api.py
         python -m pytest -v --durations=0 -x test_fault_tolerance.py
         python -m pytest -v --durations=0 -x test_end_to_end.py
@@ -74,13 +75,14 @@ jobs:
         ray stop || true
         echo "running simple.py" && python simple.py --smoke-test
         echo "running simple_predict.py" && python simple_predict.py
+        echo "running simple_modin.py" && python simple_modin.py --smoke-test
         echo "running simple_tune.py" && python simple_tune.py --smoke-test
         echo "running train_on_test_data.py" && python train_on_test_data.py --smoke-test
         # for f in *.py; do echo "running $f" && python "$f" || exit 1 ; done
 
   test_linux_ray_release:
     runs-on: ubuntu-latest
-    timeout-minutes: 12
+    timeout-minutes: 14
     strategy:
       matrix:
         python-version: [3.6.9, 3.7, 3.8]
@@ -95,7 +97,7 @@ jobs:
         python -m pip install --upgrade pip
         python -m pip install codecov
         python -m pip install -U ray
-        if [ -f requirements-test.txt ]; then python -m pip install -r requirements-test.txt; fi
+        if [ -f requirements-test.txt ]; then python -m pip install -r requirements-test.txt; fi
     - name: Install package
       run: |
         python -m pip install -e .
@@ -107,6 +109,7 @@ jobs:
         pushd xgboost_ray/tests
         python -m pytest -vv -s --log-cli-level=DEBUG --durations=0 -x test_colocation.py
         python -m pytest -v --durations=0 -x test_matrix.py
+        python -m pytest -v --durations=0 -x test_data_source.py
         python -m pytest -v --durations=0 -x test_xgboost_api.py
         python -m pytest -v --durations=0 -x test_fault_tolerance.py
         python -m pytest -v --durations=0 -x test_end_to_end.py
@@ -117,6 +120,7 @@ jobs:
         ray stop || true
         echo "running simple.py" && python simple.py --smoke-test
         echo "running simple_predict.py" && python simple_predict.py
+        echo "running simple_modin.py" && python simple_modin.py --smoke-test
         echo "running simple_tune.py" && python simple_tune.py --smoke-test
         echo "running train_on_test_data.py" && python train_on_test_data.py --smoke-test
         # for f in *.py; do echo "running $f" && python "$f" || exit 1 ; done
@@ -125,7 +129,7 @@ jobs:
     # Test compatibility when some optional libraries are missing
     # Test runs on latest ray release
     runs-on: ubuntu-latest
-    timeout-minutes: 12
+    timeout-minutes: 14
     strategy:
       matrix:
         python-version: [3.6.9, 3.7, 3.8]
@@ -157,6 +161,7 @@ jobs:
         pushd xgboost_ray/tests
         python -m pytest -vv -s --log-cli-level=DEBUG --durations=0 -x test_colocation.py
         python -m pytest -v --durations=0 -x test_matrix.py
+        python -m pytest -v --durations=0 -x test_data_source.py
         python -m pytest -v --durations=0 -x test_xgboost_api.py
         python -m pytest -v --durations=0 -x test_fault_tolerance.py
         python -m pytest -v --durations=0 -x test_end_to_end.py
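In short, all three test jobs get the same treatment: the timeout is bumped from 12 to 14 minutes, the new test_data_source.py suite joins the pytest run, and the simple_modin.py smoke test runs alongside the existing examples.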

examples/simple_modin.py

Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
+import argparse
+
+import numpy as np
+import pandas as pd
+
+import ray
+
+from xgboost_ray import RayDMatrix, train, RayParams
+from xgboost_ray.data_sources.modin import MODIN_INSTALLED
+
+
+def main(cpus_per_actor, num_actors):
+    if not MODIN_INSTALLED:
+        print("Modin is not installed or installed in a version that is "
+              "not compatible with xgboost_ray (< 0.9.0).")
+        return
+
+    # Import modin after initializing Ray
+    from modin.distributed.dataframe.pandas import from_partitions
+
+    # Generate dataset
+    x = np.repeat(range(8), 16).reshape((32, 4))
+    # Even numbers --> 0, odd numbers --> 1
+    y = np.tile(np.repeat(range(2), 4), 4)
+
+    # Flip some bits to reduce max accuracy
+    bits_to_flip = np.random.choice(32, size=6, replace=False)
+    y[bits_to_flip] = 1 - y[bits_to_flip]
+
+    data = pd.DataFrame(x)
+    data["label"] = y
+
+    # Split into 4 partitions
+    partitions = [ray.put(part) for part in np.split(data, 4)]
+
+    # Create modin df here
+    modin_df = from_partitions(partitions, axis=0)
+
+    train_set = RayDMatrix(modin_df, "label")
+
+    evals_result = {}
+    # Set XGBoost config.
+    xgboost_params = {
+        "tree_method": "approx",
+        "objective": "binary:logistic",
+        "eval_metric": ["logloss", "error"],
+    }
+
+    # Train the classifier
+    bst = train(
+        params=xgboost_params,
+        dtrain=train_set,
+        evals=[(train_set, "train")],
+        evals_result=evals_result,
+        ray_params=RayParams(
+            max_actor_restarts=0,
+            gpus_per_actor=0,
+            cpus_per_actor=cpus_per_actor,
+            num_actors=num_actors),
+        verbose_eval=False,
+        num_boost_round=10)
+
+    model_path = "modin.xgb"
+    bst.save_model(model_path)
+    print("Final training error: {:.4f}".format(
+        evals_result["train"]["error"][-1]))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--address",
+        required=False,
+        type=str,
+        help="the address to use for Ray")
+    parser.add_argument(
+        "--cpus-per-actor",
+        type=int,
+        default=1,
+        help="Sets number of CPUs per xgboost training worker.")
+    parser.add_argument(
+        "--num-actors",
+        type=int,
+        default=4,
+        help="Sets number of xgboost workers to use.")
+    parser.add_argument(
+        "--smoke-test", action="store_true", default=False, help="Smoke test")
+
+    args, _ = parser.parse_known_args()
+
+    if args.smoke_test:
+        ray.init(num_cpus=args.num_actors + 1)
+    else:
+        ray.init(address=args.address)
+
+    main(args.cpus_per_actor, args.num_actors)
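The example runs locally the same way the CI workflow invokes it: python simple_modin.py --smoke-test starts a fresh Ray instance with num_actors + 1 CPUs, while omitting the flag connects to an existing cluster via --address. It saves the trained booster to modin.xgb; a minimal follow-up sketch (standard xgboost API, not part of this commit) for loading it back and predicting:

import numpy as np
import xgboost as xgb

# Rebuild the feature matrix used by the example, then load the saved
# booster and score it with plain xgboost.
x = np.repeat(range(8), 16).reshape((32, 4))
bst = xgb.Booster(model_file="modin.xgb")
preds = bst.predict(xgb.DMatrix(x))
print(preds[:4])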

requirements-test.txt

Lines changed: 1 addition & 1 deletion

@@ -7,4 +7,4 @@ pytest
 pyarrow
 ray[tune]
 scikit-learn
-modin[ray]
+modin

xgboost_ray/data_sources/__init__.py

Lines changed: 5 additions & 2 deletions

@@ -6,10 +6,13 @@
 from xgboost_ray.data_sources.petastorm import Petastorm
 from xgboost_ray.data_sources.csv import CSV
 from xgboost_ray.data_sources.parquet import Parquet
+from xgboost_ray.data_sources.object_store import ObjectStore
 
-data_sources = [Numpy, Pandas, Modin, MLDataset, Petastorm, CSV, Parquet]
+data_sources = [
+    Numpy, Pandas, Modin, MLDataset, Petastorm, CSV, Parquet, ObjectStore
+]
 
 __all__ = [
     "DataSource", "RayFileType", "Numpy", "Pandas", "Modin", "MLDataset",
-    "Petastorm", "CSV", "Parquet"
+    "Petastorm", "CSV", "Parquet", "ObjectStore"
 ]
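For context on how this registry is consumed: the matrix loader walks the data_sources list in order and uses the first source that claims the input (e.g. Modin for a Modin DataFrame, Pandas for a pd.DataFrame). The lookup below is an illustrative sketch, not code from this commit; it assumes the DataSource classes expose an is_data_type() predicate for this dispatch.

from typing import Any, Optional, Type

from xgboost_ray.data_sources import DataSource, data_sources


def find_data_source(data: Any) -> Optional[Type[DataSource]]:
    # Return the first registered source that recognizes the input type.
    for source in data_sources:
        if source.is_data_type(data):
            return source
    return None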

xgboost_ray/data_sources/data_source.py

Lines changed: 27 additions & 3 deletions

@@ -1,8 +1,11 @@
+from typing import Any, Optional, Sequence, Tuple, Dict
+
 from enum import Enum
-from typing import Any, Optional, Sequence, Tuple
 
 import pandas as pd
 
+from ray.actor import ActorHandle
+
 
 class RayFileType(Enum):
     """Enum for different file types (used for overrides)."""
@@ -63,7 +66,7 @@ def get_filetype(data: Any) -> Optional[RayFileType]:
     @staticmethod
     def load_data(data: Any,
                   ignore: Optional[Sequence[str]] = None,
-                  indices: Optional[Sequence[int]] = None,
+                  indices: Optional[Sequence[Any]] = None,
                   **kwargs) -> pd.DataFrame:
         """
         Load data into a pandas dataframe.
@@ -73,7 +76,7 @@ def load_data(data: Any,
         Args:
             data (Any): Input data
             ignore (Optional[Sequence[str]]): Column names to ignore
-            indices (Optional[Sequence[int]]): Indices to select. What an
+            indices (Optional[Sequence[Any]]): Indices to select. What an
                 index indicates depends on the data source.
 
         Returns:
@@ -109,3 +112,24 @@ def get_column(cls, data: pd.DataFrame,
     def get_n(data: Any):
         """Get length of data source partitions for sharding."""
         return len(list(data))
+
+    @staticmethod
+    def get_actor_shards(
+            data: Any,
+            actors: Sequence[ActorHandle]) -> \
+            Tuple[Any, Optional[Dict[int, Any]]]:
+        """Get a dict mapping actor ranks to shards.
+
+        Args:
+            data (Any): Data to shard.
+
+        Returns:
+            Returns a tuple of which the first element indicates the new
+            data object that will overwrite the existing data object
+            in the RayDMatrix (e.g. when the object is not serializable).
+            The second element is a dict mapping actor ranks to shards.
+            These objects are usually passed to the ``load_data()`` method
+            for distributed loading, so that method needs to be able to
+            deal with the respective data.
+        """
+        return data, None
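To make the new hook concrete, here is a hypothetical subclass sketch. ShardedPandas and its np.array_split() strategy are illustrative assumptions, not part of this commit: it returns the data object unchanged plus a dict assigning one contiguous block of rows to each actor rank, which (per the docstring above) is then handed to load_data() on the actors.

from typing import Any, Dict, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
from ray.actor import ActorHandle

from xgboost_ray.data_sources import DataSource


class ShardedPandas(DataSource):
    @staticmethod
    def get_actor_shards(
            data: pd.DataFrame,
            actors: Sequence[ActorHandle]) -> \
            Tuple[Any, Optional[Dict[int, Any]]]:
        # One contiguous row block per actor rank; returning `data`
        # unchanged keeps the original object in the RayDMatrix.
        shards = dict(enumerate(np.array_split(data, len(actors))))
        return data, shards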
