
Commit b7c3e7d

Improve Ray Datasets support (#259)
* WIP
  Signed-off-by: Antoni Baum <antoni.baum@protonmail.com>
* Formatting
  Signed-off-by: Antoni Baum <antoni.baum@protonmail.com>
* WIP
  Signed-off-by: Antoni Baum <antoni.baum@protonmail.com>
* Remove test
  Signed-off-by: Antoni Baum <antoni.baum@protonmail.com>

Signed-off-by: Antoni Baum <antoni.baum@protonmail.com>
1 parent 97c67ca commit b7c3e7d
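
For context, a minimal sketch of how a Ray Dataset flows into xgboost-ray after this change (illustrative only: the toy data, column names, and training parameters are assumptions, and the exact ray.data constructor varies by Ray version):

    import pandas as pd
    import ray

    from xgboost_ray import RayDMatrix, RayParams, train

    ray.init()

    # Toy dataset: two feature columns and a binary "label" column.
    df = pd.DataFrame({
        "a": range(32),
        "b": range(32),
        "label": [0, 1] * 16,
    })
    ds = ray.data.from_pandas(df)

    # RayDMatrix accepts a Ray Dataset directly; the data source modified
    # in this commit handles loading and per-actor sharding internally.
    dtrain = RayDMatrix(ds, label="label")
    train(
        {"objective": "binary:logistic"},
        dtrain,
        ray_params=RayParams(num_actors=2),
    )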

File tree

2 files changed: +41 -201 lines

xgboost_ray/data_sources/ray_dataset.py

Lines changed: 38 additions & 55 deletions
@@ -1,23 +1,21 @@
 from typing import Any, Optional, Sequence, Dict, Union, Tuple

-from collections import defaultdict
 import pandas as pd

 import ray
-from ray import ObjectRef
 from ray.actor import ActorHandle

-from xgboost_ray.data_sources._distributed import \
-    assign_partitions_to_actors, get_actor_rank_ips
 from xgboost_ray.data_sources.data_source import DataSource, RayFileType
-from xgboost_ray.data_sources.object_store import ObjectStore
+from xgboost_ray.data_sources.pandas import Pandas

 try:
     import ray.data.dataset  # noqa: F401
     RAY_DATASET_AVAILABLE = True
 except (ImportError, AttributeError):
     RAY_DATASET_AVAILABLE = False

+DATASET_TO_PANDAS_LIMIT = float("inf")
+

 def _assert_ray_data_available():
     if not RAY_DATASET_AVAILABLE:

@@ -44,72 +42,57 @@ def is_data_type(data: Any,
         return isinstance(data, ray.data.dataset.Dataset)

     @staticmethod
-    def load_data(
-            data: Any,  # ray.data.dataset.Dataset
-            ignore: Optional[Sequence[str]] = None,
-            indices: Optional[Union[Sequence[int], Sequence[
-                ObjectRef]]] = None,
-            **kwargs) -> pd.DataFrame:
+    def load_data(data: "ray.data.dataset.Dataset",
+                  ignore: Optional[Sequence[str]] = None,
+                  indices: Optional[Union[Sequence[int], Sequence[
+                      "ray.data.dataset.Dataset"]]] = None,
+                  **kwargs) -> pd.DataFrame:
         _assert_ray_data_available()

-        if indices is not None and len(indices) > 0 and isinstance(
-                indices[0], ObjectRef):
-            # We got a list of ObjectRefs belonging to Ray dataset partitions
-            return ObjectStore.load_data(
-                data=indices, indices=None, ignore=ignore)
-
-        if hasattr(data, "to_pandas_refs"):
-            obj_refs = data.to_pandas_refs()
-        else:
-            # Legacy API
-            obj_refs = data.to_pandas()
+        if indices is not None:
+            if len(indices) > 0 and isinstance(indices[0],
+                                               ray.data.dataset.Dataset):
+                # We got a list of Datasets belonging to a partition
+                data = indices
+            else:
+                data = [data[i] for i in indices]

-        ray.wait(obj_refs)
-        return ObjectStore.load_data(obj_refs, ignore=ignore, indices=indices)
+        local_df = [ds.to_pandas(limit=DATASET_TO_PANDAS_LIMIT) for ds in data]
+        return Pandas.load_data(pd.concat(local_df, copy=False), ignore=ignore)

     @staticmethod
-    def convert_to_series(data: Any) -> pd.Series:
+    def convert_to_series(data: Union["ray.data.dataset.Dataset", Sequence[
+            "ray.data.dataset.Dataset"]]) -> pd.Series:
         _assert_ray_data_available()

-        obj_refs = data.to_pandas()
-        return ObjectStore.convert_to_series(obj_refs)
+        if isinstance(data, ray.data.dataset.Dataset):
+            data = data.to_pandas(limit=DATASET_TO_PANDAS_LIMIT)
+        else:
+            data = pd.concat(
+                [ds.to_pandas(limit=DATASET_TO_PANDAS_LIMIT) for ds in data],
+                copy=False)
+        return DataSource.convert_to_series(data)

     @staticmethod
     def get_actor_shards(
-            data: Any,  # ray.data.dataset.Dataset
+            data: "ray.data.dataset.Dataset",
             actors: Sequence[ActorHandle]) -> \
             Tuple[Any, Optional[Dict[int, Any]]]:
         _assert_ray_data_available()

-        actor_rank_ips = get_actor_rank_ips(actors)
+        # We do not use our assign_partitions_to_actors, as assignment of
+        # splits to actors is handled by the locality_hints argument.

-        # Map node IDs to IP
-        node_id_to_ip = {
-            node["NodeID"]: node["NodeManagerAddress"]
-            for node in ray.nodes()
-        }
+        dataset_splits = data.split(
+            len(actors),
+            equal=True,
+            locality_hints=actors,
+        )

-        # Get object store locations
-        if hasattr(data, "to_pandas_refs"):
-            obj_refs = data.to_pandas_refs()
-        else:
-            # Legacy API
-            obj_refs = data.to_pandas()
-        ray.wait(obj_refs)
-
-        ip_to_parts = defaultdict(list)
-        for part_obj, location in ray.experimental.get_object_locations(
-                obj_refs).items():
-            if len(location["node_ids"]) == 0:
-                node_id = None
-            else:
-                node_id = location["node_ids"][0]
-
-            ip = node_id_to_ip.get(node_id, None)
-            ip_to_parts[ip].append(part_obj)
-
-        # Ray datasets should not be serialized
-        return None, assign_partitions_to_actors(ip_to_parts, actor_rank_ips)
+        return None, {
+            i: [dataset_split]
+            for i, dataset_split in enumerate(dataset_splits)
+        }

     @staticmethod
     def get_n(data: Any):
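
The rewritten load_data and convert_to_series above no longer round-trip through ObjectStore; each (sub)Dataset is materialized to pandas and the frames are concatenated. A standalone sketch of that pattern, assuming a Ray version whose Dataset.to_pandas accepts a limit argument:

    import pandas as pd
    import ray

    ray.init()

    ds = ray.data.from_pandas(pd.DataFrame({"a": range(10)}))

    # Mirrors DATASET_TO_PANDAS_LIMIT: float("inf") effectively disables
    # the to_pandas() row-count guard, so each shard is fully materialized.
    DATASET_TO_PANDAS_LIMIT = float("inf")

    shards = ds.split(2)
    local_df = pd.concat(
        [shard.to_pandas(limit=DATASET_TO_PANDAS_LIMIT) for shard in shards],
        copy=False)
    assert len(local_df) == 10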

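Similarly, get_actor_shards now delegates placement to Ray via locality_hints instead of the removed IP-matching logic. A sketch of that split pattern (the Worker actor is a hypothetical stand-in for _RemoteRayXGBoostActor):

    import ray

    ray.init()

    @ray.remote
    class Worker:
        def consume(self, shard) -> int:
            # Each actor counts the rows of the shard assigned to it.
            return shard.count()

    actors = [Worker.remote() for _ in range(4)]
    ds = ray.data.range(1000)

    # equal=True produces same-sized splits; locality_hints asks Ray to
    # co-locate each split with its actor, as in the new get_actor_shards.
    shards = ds.split(len(actors), equal=True, locality_hints=actors)
    print(ray.get([a.consume.remote(s) for a, s in zip(actors, shards)]))
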
xgboost_ray/tests/test_data_source.py

Lines changed: 3 additions & 146 deletions
@@ -9,8 +9,6 @@
 from ray import ObjectRef

 from xgboost_ray.data_sources import Modin, Dask, Partitioned
-from xgboost_ray.data_sources.ray_dataset import RAY_DATASET_AVAILABLE, \
-    RayDataset
 from xgboost_ray.main import _RemoteRayXGBoostActor

 from xgboost_ray.data_sources.modin import MODIN_INSTALLED

@@ -436,150 +434,9 @@ def create_remote_df(arr):
                     f"partition {i} is not partition with ID {part_id}.")


-@unittest.skipIf(
-    not RAY_DATASET_AVAILABLE,
-    reason="Ray datasets are not available in this version of Ray")
-class RayDatasetSourceTest(_DistributedDataSourceTest, unittest.TestCase):
-    def _testAssignPartitions(self, part_nodes, actor_nodes,
-                              expected_actor_parts):
-        partitions = [
-            ray.put(
-                pd.DataFrame(p, columns=[str(x) for x in range(p.shape[1])]))
-            for p in np.array_split(self.x, len(part_nodes))
-        ]
-
-        # Dict from partition (obj ref) to node host
-        part_to_node = dict(zip(partitions, [f"node{n}" for n in part_nodes]))
-
-        actors_to_node = dict(enumerate(f"node{n}" for n in actor_nodes))
-
-        actor_to_parts = self._getActorToParts(actors_to_node, partitions,
-                                               part_to_node, part_nodes)
-
-        print(expected_actor_parts)
-        print(actor_to_parts)
-
-        for actor_rank, part_ids in expected_actor_parts.items():
-            for i, part_id in enumerate(part_ids):
-                self.assertEqual(
-                    actor_to_parts[actor_rank][i],
-                    partitions[part_id],
-                    msg=f"Assignment failed: Actor rank {actor_rank}, "
-                    f"partition {i} is not partition with ID {part_id}.")
-
-    def _getActorToParts(self, actors_to_node, partitions, part_to_node,
-                         part_nodes):
-        def get_object_locations(data, *args, **kwargs):
-            return {
-                partitions[i]: {
-                    "node_ids": [part_nodes[i]],
-                    "object_size": 1
-                }
-                for i in range(len(data))
-            }
-
-        def node_map():
-            return [{
-                "NodeID": n,
-                "NodeManagerAddress": f"node{n}"
-            } for n in range(4)]
-
-        def actor_ranks(actors):
-            return actors_to_node
-
-        with patch(
-                "ray.experimental.get_object_locations"
-        ) as mock_locations, patch("ray.nodes") as mock_nodes, patch(
-                "xgboost_ray.data_sources.ray_dataset.get_actor_rank_ips"
-        ) as mock_ranks:
-            mock_locations.side_effect = get_object_locations
-            mock_nodes.side_effect = node_map
-            mock_ranks.side_effect = actor_ranks
-
-            if hasattr(ray.data, "from_pandas_refs"):
-                data = ray.data.from_pandas_refs(list(part_to_node.keys()))
-            else:
-                # Legacy API
-                data = ray.data.from_pandas(list(part_to_node.keys()))
-
-            _, actor_to_parts = RayDataset.get_actor_shards(
-                data=data, actors=[])
-
-        return actor_to_parts
-
-    def _testDataSourceAssignment(self, part_nodes, actor_nodes,
-                                  expected_actor_parts):
-        node_ips = [
-            node["NodeManagerAddress"] for node in ray.nodes() if node["Alive"]
-        ]
-        if len(node_ips) < max(max(actor_nodes), max(part_nodes)) + 1:
-            print("Not running on cluster, skipping rest of this test.")
-            return
-
-        actor_node_ips = [node_ips[nid] for nid in actor_nodes]
-        part_node_ips = [node_ips[nid] for nid in part_nodes]
-
-        # Initialize data frames on remote nodes
-        # This way we can control which partition is on which node
-        @ray.remote(num_cpus=0.1)
-        def create_remote_df(arr):
-            return ray.put(pd.DataFrame(arr))
-
-        partitions = np.array_split(self.x, len(part_nodes))
-        node_dfs: List[ObjectRef] = ray.get([
-            create_remote_df.options(resources={
-                f"node:{pip}": 0.1
-            }).remote(partitions[pid]) for pid, pip in enumerate(part_node_ips)
-        ])
-
-        # Create Ray dataset from distributed partitions
-        if hasattr(ray.data, "from_pandas_refs"):
-            ray_ds = ray.data.from_pandas_refs(node_dfs)
-            df_objs = ray_ds.to_pandas_refs()
-        else:
-            # Legacy API
-            ray_ds = ray.data.from_pandas(node_dfs)
-            df_objs = ray_ds.to_pandas()
-
-        ray.wait(df_objs)
-        locations = ray.experimental.get_object_locations(df_objs)
-
-        try:
-            self.assertSequenceEqual(
-                [df[0][0] for df in partitions],
-                [df[0][0] for df in ray.get(list(df_objs))],
-                msg="Ray datasets mixed up the partition order")
-
-            self.assertSequenceEqual(
-                part_node_ips,
-                locations,
-                msg="Ray datasets moved partitions to different IPs")
-        except AssertionError as exc:
-            print(f"Ray dataset part of the test failed: {exc}")
-            print("This is a stochastic test failure. Ignoring the rest "
-                  "of this test.")
-            return
-
-        # Create ray actors
-        actors = [
-            _RemoteRayXGBoostActor.options(resources={
-                f"node:{nip}": 0.1
-            }).remote(rank=rank, num_actors=len(actor_nodes))
-            for rank, nip in enumerate(actor_node_ips)
-        ]
-
-        # Calculate shards
-        _, actor_to_parts = RayDataset.get_actor_shards(ray_ds, actors)
-
-        for actor_rank, part_ids in expected_actor_parts.items():
-            for i, part_id in enumerate(part_ids):
-                assigned_df = ray.get(actor_to_parts[actor_rank][i])
-                part_df = pd.DataFrame(partitions[part_id])
-
-                self.assertTrue(
-                    assigned_df.equals(part_df),
-                    msg=f"Assignment failed: Actor rank {actor_rank}, "
-                    f"partition {i} is not partition with ID {part_id}.")
+# The Ray Datasets data source is not tested here, as we do not make use of
+# xgboost-ray's partition-to-actor assignment logic. Furthermore, xgboost-ray
+# with Ray Datasets is tested in ray-project/ray.


 class PartitionedSourceTest(_DistributedDataSourceTest, unittest.TestCase):
