
Commit 929b3e3: Remove legacy code (#217)

There are several places where we still carry code paths for legacy Ray versions. With the next release we can require a newer Ray version, so those legacy paths are removed here.

1 parent fc08c62

File tree: 5 files changed, +33 −222 lines

- setup.py (+1 −2)
- xgboost_ray/main.py (+8 −15)
- xgboost_ray/tests/test_colocation.py (+3 −2)
- xgboost_ray/tune.py (+21 −130)
- xgboost_ray/util.py (+0 −73)

setup.py (+1 −2)

```diff
@@ -10,6 +10,5 @@
     "distributed computing framework Ray.",
     url="https://github.com/ray-project/xgboost_ray",
     install_requires=[
-        "ray>=1.6", "numpy>=1.16", "pandas", "wrapt>=1.12.1",
-        "xgboost>=0.90"
+        "ray>=1.10", "numpy>=1.16", "pandas", "wrapt>=1.12.1", "xgboost>=0.90"
     ])
```
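The only functional change here is raising the Ray floor from 1.6 to 1.10; the `xgboost>=0.90` entry is merely re-wrapped onto one line. As a rough illustration (not part of this commit), a library can also enforce such a floor at import time; this sketch assumes the `packaging` helper is installed:

```python
# Hypothetical import-time guard mirroring the "ray>=1.10" floor that
# setup.py now enforces at install time. Not part of this commit.
from packaging.version import Version

import ray

MIN_RAY = "1.10"

if Version(ray.__version__) < Version(MIN_RAY):
    raise RuntimeError(
        f"xgboost_ray requires ray>={MIN_RAY}, found {ray.__version__}. "
        "Please upgrade Ray, e.g. via `pip install -U ray`.")
```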

xgboost_ray/main.py (+8 −15)

```diff
@@ -38,15 +38,11 @@ class EarlyStopException(XGBoostError):
     from ray.util.annotations import PublicAPI, DeveloperAPI
     from ray.util.placement_group import PlacementGroup, \
         remove_placement_group, get_current_placement_group
+    from ray.util.queue import Queue

-    from xgboost_ray.util import Event, Queue, MultiActorTask, \
-        force_on_current_node
+    from xgboost_ray.util import Event, MultiActorTask, force_on_current_node

-    if LooseVersion(ray.__version__) >= LooseVersion("1.5.0"):
-        # https://github.com/ray-project/ray/pull/16437
-        DEFAULT_PG = "default"
-    else:
-        DEFAULT_PG = None
+    DEFAULT_PG = "default"

     RAY_INSTALLED = True
 except ImportError:
@@ -63,7 +59,7 @@ def inner_f(*args, **kwargs):
     RAY_INSTALLED = False

 from xgboost_ray.tune import _try_add_tune_callback, _get_tune_resources, \
-    TUNE_USING_PG, is_session_enabled
+    is_session_enabled

 from xgboost_ray.matrix import RayDMatrix, combine_data, \
     RayDeviceQuantileDMatrix, RayDataIter, concat_dataframes, \
@@ -849,7 +845,7 @@ def _create_communication_processes(added_tune_callback: bool = False):
     node_ip = get_node_ip_address()
     # Have to explicitly set num_cpus to 0.
     placement_option = {"num_cpus": 0}
-    if added_tune_callback and TUNE_USING_PG:
+    if added_tune_callback:
         # If Tune is using placement groups, then we force Queue and
         # StopEvent onto same bundle as the Trainable.
         # This forces all 3 to be on the same node.
@@ -1388,12 +1384,9 @@ def _wrapped(*args, **kwargs):
     placement_strategy = None
     if not ray_params.elastic_training:
         if added_tune_callback:
-            if TUNE_USING_PG:
-                # If Tune is using placement groups, then strategy has already
-                # been set. Don't create an additional placement_group here.
-                placement_strategy = None
-            else:
-                placement_strategy = "PACK"
+            # Tune is using placement groups, so the strategy has already
+            # been set. Don't create an additional placement_group here.
+            placement_strategy = None
         elif bool(ENV.USE_SPREAD_STRATEGY):
             placement_strategy = "SPREAD"

```
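Because setup.py now guarantees ray>=1.10, the `LooseVersion` gate around `DEFAULT_PG` is dead code: every supported Ray release already contains ray-project/ray#16437, so actors can always be captured into the "default" placement group. The collapsed pattern, shown standalone with the names from the diff above:

```python
# Before (legacy-aware): choose placement group behavior based on the
# installed Ray version at import time.
from distutils.version import LooseVersion

import ray

if LooseVersion(ray.__version__) >= LooseVersion("1.5.0"):
    DEFAULT_PG = "default"  # ray-project/ray#16437 is available
else:
    DEFAULT_PG = None

# After: ray>=1.10 is guaranteed by setup.py, so the branch collapses to
DEFAULT_PG = "default"
```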

xgboost_ray/tests/test_colocation.py (+3 −2)

```diff
@@ -8,9 +8,10 @@
 import numpy as np

 import ray
+from ray.util.queue import _QueueActor
 from xgboost_ray import train, RayDMatrix, RayParams
 from xgboost_ray.main import _train
-from xgboost_ray.util import _EventActor, _QueueActor
+from xgboost_ray.util import _EventActor


 class _MockQueueActor(_QueueActor):
@@ -57,7 +58,7 @@ def tearDown(self) -> None:
         shutil.rmtree(self.tmpdir)
         ray.shutdown()

-    @patch("xgboost_ray.util._QueueActor", _MockQueueActor)
+    @patch("ray.util.queue._QueueActor", _MockQueueActor)
     @patch("xgboost_ray.util._EventActor", _MockEventActor)
     def test_communication_colocation(self):
         """Checks that Queue and Event actors are colocated with the driver."""
```

xgboost_ray/tune.py (+21 −130)

```diff
@@ -1,21 +1,14 @@
 # Tune imports.
-import os
-from typing import Dict, Union, List, Optional
+from typing import Dict, Optional

 import ray

-try:
-    from typing import OrderedDict
-except ImportError:
-    from collections import OrderedDict
-
 import logging

 from ray.util.annotations import PublicAPI

 from xgboost_ray.xgb import xgboost as xgb

-from xgboost_ray.compat import TrainingCallback
 from xgboost_ray.session import put_queue, get_rabit_rank
 from xgboost_ray.util import Unavailable, force_on_current_node

@@ -42,90 +35,7 @@ def is_session_enabled():
     flatten_dict = is_session_enabled
     TUNE_INSTALLED = False

-# Todo(krfricke): Remove after next ray core release
-if not hasattr(OrigTuneReportCallback, "_get_report_dict") or not issubclass(
-        OrigTuneReportCallback, TrainingCallback):
-    TUNE_LEGACY = True
-else:
-    TUNE_LEGACY = False
-
-# Todo(amogkam): Remove after Ray 1.3 release.
-try:
-    from ray.tune import PlacementGroupFactory
-
-    TUNE_USING_PG = True
-except ImportError:
-    TUNE_USING_PG = False
-    PlacementGroupFactory = Unavailable
-
-if TUNE_LEGACY and TUNE_INSTALLED:
-    # Until the next release, keep compatible callbacks here.
-    class TuneReportCallback(OrigTuneReportCallback, TrainingCallback):
-        def _get_report_dict(self, evals_log):
-            if isinstance(evals_log, OrderedDict):
-                # xgboost>=1.3
-                result_dict = flatten_dict(evals_log, delimiter="-")
-                for k in list(result_dict):
-                    result_dict[k] = result_dict[k][0]
-            else:
-                # xgboost<1.3
-                result_dict = dict(evals_log)
-            if not self._metrics:
-                report_dict = result_dict
-            else:
-                report_dict = {}
-                for key in self._metrics:
-                    if isinstance(self._metrics, dict):
-                        metric = self._metrics[key]
-                    else:
-                        metric = key
-                    report_dict[key] = result_dict[metric]
-            return report_dict
-
-        def after_iteration(self, model, epoch: int, evals_log: Dict):
-            if get_rabit_rank() == 0:
-                report_dict = self._get_report_dict(evals_log)
-                put_queue(lambda: tune.report(**report_dict))
-
-    class _TuneCheckpointCallback(_OrigTuneCheckpointCallback,
-                                  TrainingCallback):
-        def __init__(self, filename: str, frequency: int):
-            super(_TuneCheckpointCallback, self).__init__(filename)
-            self._frequency = frequency
-
-        @staticmethod
-        def _create_checkpoint(model, epoch: int, filename: str,
-                               frequency: int):
-            if epoch % frequency > 0:
-                return
-            with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
-                model.save_model(os.path.join(checkpoint_dir, filename))
-
-        def after_iteration(self, model, epoch: int, evals_log: Dict):
-            if get_rabit_rank() == 0:
-                put_queue(lambda: self._create_checkpoint(
-                    model, epoch, self._filename, self._frequency))
-
-    class TuneReportCheckpointCallback(OrigTuneReportCheckpointCallback,
-                                       TrainingCallback):
-        _checkpoint_callback_cls = _TuneCheckpointCallback
-        _report_callbacks_cls = TuneReportCallback
-
-        def __init__(
-                self,
-                metrics: Union[None, str, List[str], Dict[str, str]] = None,
-                filename: str = "checkpoint",
-                frequency: int = 5):
-            self._checkpoint = self._checkpoint_callback_cls(
-                filename, frequency)
-            self._report = self._report_callbacks_cls(metrics)
-
-        def after_iteration(self, model, epoch: int, evals_log: Dict):
-            if get_rabit_rank() == 0:
-                self._checkpoint.after_iteration(model, epoch, evals_log)
-                self._report.after_iteration(model, epoch, evals_log)
-
-elif TUNE_INSTALLED:
+if TUNE_INSTALLED:
     # New style callbacks.
     class TuneReportCallback(OrigTuneReportCallback):
         def after_iteration(self, model, epoch: int, evals_log: Dict):
@@ -168,15 +78,10 @@ def _try_add_tune_callback(kwargs: Dict):
                         target="xgboost_ray.tune.TuneReportCallback"))
             has_tune_callback = True
         elif isinstance(cb, OrigTuneReportCheckpointCallback):
-            if TUNE_LEGACY:
-                replace_cb = TuneReportCheckpointCallback(
-                    metrics=cb._report._metrics,
-                    filename=cb._checkpoint._filename)
-            else:
-                replace_cb = TuneReportCheckpointCallback(
-                    metrics=cb._report._metrics,
-                    filename=cb._checkpoint._filename,
-                    frequency=cb._checkpoint._frequency)
+            replace_cb = TuneReportCheckpointCallback(
+                metrics=cb._report._metrics,
+                filename=cb._checkpoint._filename,
+                frequency=cb._checkpoint._frequency)
             new_callbacks.append(replace_cb)
             logging.warning(
                 REPLACE_MSG.format(
@@ -203,35 +108,21 @@ def _get_tune_resources(num_actors: int, cpus_per_actor: int,
                         resources_per_actor: Optional[Dict]):
     """Returns object to use for ``resources_per_trial`` with Ray Tune."""
     if TUNE_INSTALLED:
-        if not TUNE_USING_PG:
-            resources_per_actor = {} if not resources_per_actor \
-                else resources_per_actor
-            extra_custom_resources = {
-                k: v * num_actors
-                for k, v in resources_per_actor.items()
-            }
-            return dict(
-                cpu=1,
-                extra_cpu=cpus_per_actor * num_actors,
-                extra_gpu=gpus_per_actor * num_actors,
-                extra_custom_resources=extra_custom_resources,
-            )
-        else:
-            from ray.tune import PlacementGroupFactory
-
-            head_bundle = {"CPU": 1}
-            child_bundle = {"CPU": cpus_per_actor, "GPU": gpus_per_actor}
-            child_bundle_extra = {} if resources_per_actor is None else \
-                resources_per_actor
-            child_bundles = [{
-                **child_bundle,
-                **child_bundle_extra
-            } for _ in range(num_actors)]
-            bundles = [head_bundle] + child_bundles
-            placement_group_factory = PlacementGroupFactory(
-                bundles, strategy="PACK")
-
-            return placement_group_factory
+        from ray.tune import PlacementGroupFactory
+
+        head_bundle = {"CPU": 1}
+        child_bundle = {"CPU": cpus_per_actor, "GPU": gpus_per_actor}
+        child_bundle_extra = {} if resources_per_actor is None else \
+            resources_per_actor
+        child_bundles = [{
+            **child_bundle,
+            **child_bundle_extra
+        } for _ in range(num_actors)]
+        bundles = [head_bundle] + child_bundles
+        placement_group_factory = PlacementGroupFactory(
+            bundles, strategy="PACK")
+
+        return placement_group_factory
     else:
         raise RuntimeError("Tune is not installed, so `get_tune_resources` is "
                            "not supported. You can install Ray Tune via `pip "
```

xgboost_ray/util.py (+0 −73)

```diff
@@ -4,7 +4,6 @@

 import ray
 from ray.util.annotations import DeveloperAPI
-from ray.util.queue import Queue as RayQueue, Empty, Full


 @DeveloperAPI
@@ -50,78 +49,6 @@ def shutdown(self):
         self.actor = None


-# Remove after Ray 1.2 release.
-if getattr(RayQueue, "shutdown", None) is not None:
-    from ray.util.queue import _QueueActor
-else:
-    # Have to copy the class here so that we can subclass this for mocking.
-    # If we have the @ray.remote decorator, then we can't subclass it.
-    class _QueueActor:
-        def __init__(self, maxsize):
-            self.maxsize = maxsize
-            self.queue = asyncio.Queue(self.maxsize)
-
-        def qsize(self):
-            return self.queue.qsize()
-
-        def empty(self):
-            return self.queue.empty()
-
-        def full(self):
-            return self.queue.full()
-
-        async def put(self, item, timeout=None):
-            try:
-                await asyncio.wait_for(self.queue.put(item), timeout)
-            except asyncio.TimeoutError:
-                raise Full
-
-        async def get(self, timeout=None):
-            try:
-                return await asyncio.wait_for(self.queue.get(), timeout)
-            except asyncio.TimeoutError:
-                raise Empty
-
-        def put_nowait(self, item):
-            self.queue.put_nowait(item)
-
-        def put_nowait_batch(self, items):
-            # If maxsize is 0, queue is unbounded, so no need to check size.
-            if self.maxsize > 0 and len(items) + self.qsize() > self.maxsize:
-                raise Full(f"Cannot add {len(items)} items to queue of size "
-                           f"{self.qsize()} and maxsize {self.maxsize}.")
-            for item in items:
-                self.queue.put_nowait(item)
-
-        def get_nowait(self):
-            return self.queue.get_nowait()
-
-        def get_nowait_batch(self, num_items):
-            if num_items > self.qsize():
-                raise Empty(f"Cannot get {num_items} items from queue of size "
-                            f"{self.qsize()}.")
-            return [self.queue.get_nowait() for _ in range(num_items)]
-
-
-# Remove after Ray 1.2 release.
-@DeveloperAPI
-class Queue(RayQueue):
-    def __init__(self, maxsize: int = 0,
-                 actor_options: Optional[Dict] = None) -> None:
-        actor_options = actor_options or {}
-        self.maxsize = maxsize
-        self.actor = ray.remote(_QueueActor).options(**actor_options).remote(
-            self.maxsize)
-
-    def shutdown(self):
-        if getattr(RayQueue, "shutdown", None) is not None:
-            super(Queue, self).shutdown()
-        else:
-            if self.actor:
-                ray.kill(self.actor)
-            self.actor = None
-
-
 @DeveloperAPI
 class MultiActorTask:
     """Utility class to hold multiple futures.
```
