
Commit a2080a6

[Data] Refactor _AutoscalingCoordinatorActor to use direct threading instead of self-referential remote calls (ray-project#60320)
Refactor `_AutoscalingCoordinatorActor` to use direct threading with a `threading.Lock` instead of self-referential remote calls. This eliminates potential Ray Core timeout errors for internal operations and makes the code easier to debug.

Fixes ray-project#60190

Contribution by Gittensor, see my contribution statistics at https://gittensor.io/miners/details?githubId=171929553

---------

Signed-off-by: DeborahOlaboye <deboraholaboye@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
1 parent db06618 commit a2080a6
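
The change is easiest to see as a before/after of the tick loop. The sketch below is a minimal, hypothetical reduction (the class names, the `TICK_INTERVAL_S` value, and the method bodies are placeholders; the real change is in the diff that follows): the old version scheduled `tick()` as a remote task on the actor's own handle, so every tick round-tripped through Ray Core; the new version calls a private `_tick()` directly and serializes shared state with a `threading.Lock`.

    # Minimal, hypothetical sketch of the refactor -- not the actual Ray Data code.
    import threading
    import time

    import ray


    @ray.remote
    class OldStyleCoordinator:
        TICK_INTERVAL_S = 5  # placeholder value

        def __init__(self):
            # Old pattern: keep a handle to this actor and run tick() as a
            # remote actor task so actor-task serialization handles threading.
            self._self_handle = ray.get_runtime_context().current_actor

            def tick_thread_run():
                while True:
                    time.sleep(self.TICK_INTERVAL_S)
                    # Each tick round-trips through Ray Core and can surface
                    # RPC timeout errors for purely internal work.
                    ray.get(self._self_handle.tick.remote())

            threading.Thread(target=tick_thread_run, daemon=True).start()

        def tick(self):
            pass  # periodic maintenance


    @ray.remote
    class NewStyleCoordinator:
        TICK_INTERVAL_S = 5  # placeholder value

        def __init__(self):
            # New pattern: no self-handle. A lock serializes the background
            # thread against concurrently executing actor tasks.
            self._lock = threading.Lock()

            def tick_thread_run():
                while True:
                    time.sleep(self.TICK_INTERVAL_S)
                    self._tick()  # direct call, no Ray round-trip

            threading.Thread(target=tick_thread_run, daemon=True).start()

        def _tick(self):
            with self._lock:
                pass  # periodic maintenance, done under the lock

Once the tick runs in-process, actor tasks and the tick thread can interleave, which is why the diff also wraps every public entry point in `with self._lock:`.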

2 files changed: +54, -52 lines


python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py

Lines changed: 52 additions & 50 deletions
@@ -254,30 +254,29 @@ def __init__(
 
         self._ongoing_reqs: Dict[str, OngoingRequest] = {}
         self._cluster_node_resources: List[ResourceDict] = []
+        # Lock for thread-safe access to shared state from the background tick thread.
+        self._lock = threading.Lock()
         self._update_cluster_node_resources()
 
         # This is an actor, so the following check should always be True.
         # It's only needed for unit tests.
         if ray.is_initialized():
-            self._self_handle = ray.get_runtime_context().current_actor
-
             # Start a thread to perform periodical operations.
             def tick_thread_run():
                 while True:
-                    # Call tick() as an actor task,
-                    # so we don't need to handle multi-threading.
                     time.sleep(self.TICK_INTERVAL_S)
-                    ray.get(self._self_handle.tick.remote())
+                    self._tick()
 
             self._tick_thread = threading.Thread(target=tick_thread_run, daemon=True)
             self._tick_thread.start()
 
-    def tick(self):
+    def _tick(self):
         """Used to perform periodical operations, e.g., purge expired requests,
         merge and send requests, check cluster resource updates, etc."""
-        self._merge_and_send_requests()
-        self._update_cluster_node_resources()
-        self._reallocate_resources()
+        with self._lock:
+            self._merge_and_send_requests()
+            self._update_cluster_node_resources()
+            self._reallocate_resources()
 
     def request_resources(
         self,
@@ -288,51 +287,53 @@ def request_resources(
         priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
     ) -> None:
         logger.debug("Received request from %s: %s.", requester_id, resources)
-        # Round up the resource values to integers,
-        # because the Autoscaler SDK only accepts integer values.
-        for r in resources:
-            for k in r:
-                r[k] = math.ceil(r[k])
-        now = self._get_current_time()
-        request_updated = False
-        old_req = self._ongoing_reqs.get(requester_id)
-        if old_req is not None:
-            if request_remaining != old_req.request_remaining:
-                raise ValueError(
-                    "Cannot change request_remaining flag of an ongoing request."
+        with self._lock:
+            # Round up the resource values to integers,
+            # because the Autoscaler SDK only accepts integer values.
+            for r in resources:
+                for k in r:
+                    r[k] = math.ceil(r[k])
+            now = self._get_current_time()
+            request_updated = False
+            old_req = self._ongoing_reqs.get(requester_id)
+            if old_req is not None:
+                if request_remaining != old_req.request_remaining:
+                    raise ValueError(
+                        "Cannot change request_remaining flag of an ongoing request."
+                    )
+                if priority.value != old_req.priority:
+                    raise ValueError("Cannot change priority of an ongoing request.")
+
+                request_updated = resources != old_req.requested_resources
+                old_req.requested_resources = resources
+                old_req.expiration_time = now + expire_after_s
+            else:
+                request_updated = True
+                self._ongoing_reqs[requester_id] = OngoingRequest(
+                    first_request_time=now,
+                    requested_resources=resources,
+                    request_remaining=request_remaining,
+                    priority=priority.value,
+                    expiration_time=now + expire_after_s,
+                    allocated_resources=[],
                 )
-            if priority.value != old_req.priority:
-                raise ValueError("Cannot change priority of an ongoing request.")
-
-            request_updated = resources != old_req.requested_resources
-            old_req.requested_resources = resources
-            old_req.expiration_time = now + expire_after_s
-        else:
-            request_updated = True
-            self._ongoing_reqs[requester_id] = OngoingRequest(
-                first_request_time=now,
-                requested_resources=resources,
-                request_remaining=request_remaining,
-                priority=priority.value,
-                expiration_time=now + expire_after_s,
-                allocated_resources=[],
-            )
-        if request_updated:
-            # If the request has updated, immediately send
-            # a new request and reallocate resources.
-            self._merge_and_send_requests()
-            self._reallocate_resources()
+            if request_updated:
+                # If the request has updated, immediately send
+                # a new request and reallocate resources.
+                self._merge_and_send_requests()
+                self._reallocate_resources()
 
     def cancel_request(
         self,
         requester_id: str,
     ):
         logger.debug("Canceling request for %s.", requester_id)
-        if requester_id not in self._ongoing_reqs:
-            return
-        del self._ongoing_reqs[requester_id]
-        self._merge_and_send_requests()
-        self._reallocate_resources()
+        with self._lock:
+            if requester_id not in self._ongoing_reqs:
+                return
+            del self._ongoing_reqs[requester_id]
+            self._merge_and_send_requests()
+            self._reallocate_resources()
 
     def _purge_expired_requests(self):
         now = self._get_current_time()
@@ -352,9 +353,10 @@ def _merge_and_send_requests(self):
 
     def get_allocated_resources(self, requester_id: str) -> List[ResourceDict]:
         """Get the allocated resources for the requester."""
-        if requester_id not in self._ongoing_reqs:
-            return []
-        return self._ongoing_reqs[requester_id].allocated_resources
+        with self._lock:
+            if requester_id not in self._ongoing_reqs:
+                return []
+            return self._ongoing_reqs[requester_id].allocated_resources
 
     def _maybe_subtract_resources(self, res1: ResourceDict, res2: ResourceDict) -> bool:
         """If res2<=res1, subtract res2 from res1 in-place, and return True.

python/ray/data/tests/test_autoscaling_coordinator.py

Lines changed: 2 additions & 2 deletions
@@ -153,7 +153,7 @@ def _remove_head_node_resources(res):
 
         # After req1_timeout, req1 should be expired.
         mocked_time = req1_timeout + 0.1
-        as_coordinator.tick()
+        as_coordinator._tick()
         mock_request_resources.assert_called_with(req2)
         res1 = as_coordinator.get_allocated_resources("requester1")
         res2 = as_coordinator.get_allocated_resources("requester2")
@@ -164,7 +164,7 @@ def _remove_head_node_resources(res):
 
         # After req2_timeout, req2 should be expired.
         mocked_time = req2_timeout + 0.1
-        as_coordinator.tick()
+        as_coordinator._tick()
         mock_request_resources.assert_called_with([])
         res1 = as_coordinator.get_allocated_resources("requester1")
         res2 = as_coordinator.get_allocated_resources("requester2")
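
Because the refactor makes the tick a plain synchronous method, the tests can drive it deterministically instead of sleeping until the background thread fires. A self-contained toy illustration of why that matters (not the real test code):

    import threading


    class TickingCounter:
        """Toy stand-in for the coordinator: shared state plus a lock."""

        def __init__(self):
            self._lock = threading.Lock()
            self.ticks = 0

        def _tick(self):
            with self._lock:
                self.ticks += 1


    def test_tick_is_directly_drivable():
        counter = TickingCounter()
        counter._tick()  # no thread, no sleep, no Ray: fully deterministic
        assert counter.ticks == 1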
