Skip to content

Commit db17f30

Browse files
committed
feat(ray): add opt-in downscaling for Ray autoscaler
Adds an opt-in Ray downscaling (scale-in) mechanism by retiring idle Flotilla workers to help the Ray autoscaler shrink clusters when workloads become idle.

Highlights:
- Retire idle Ray workers with a configurable idle threshold and a min-survivor floor.
- Head-node protection and a pending-release blacklist to avoid immediate respawn.
- Expose configuration via `daft.set_runner_ray(...)` and environment variables.
- Keep scale-up behavior aligned with upstream high-water-mark ramp-up logic.
- Document autoscaling/downscaling in `docs/distributed/ray.md`.

Follow-ups on top of upstream PR #5903:
- Resolve merge conflict with `main`: adopt the new `(scheduled, cancelled)` tuple from `Scheduler::schedule_tasks` and the `worker_id` field on `TaskEvent::Scheduled`.
- Address @srilman review: move the per-tick downscale gating from `scheduler_actor.rs` into the generic `WorkerManager::retire_idle_workers`; `RayWorkerManager` now owns the enable flag, min-survivor floor, idle thresholds, head-node protection and blacklist TTLs.
1 parent a1e67c1 commit db17f30

7 files changed

Lines changed: 477 additions & 15 deletions

File tree

daft/runners/__init__.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
import os
4+
35
from typing import TYPE_CHECKING
46
from daft.daft import get_runner as _get_runner_internal
57
from daft.daft import get_or_create_runner as _get_or_create_runner
@@ -66,20 +68,53 @@ def set_runner_ray(
6668
address: str | None = None,
6769
noop_if_initialized: bool = False,
6870
force_client_mode: bool = False,
71+
*,
72+
downscale_enabled: bool | None = None,
73+
downscale_idle_seconds: int | None = None,
74+
min_survivor_workers: int | None = None,
75+
pending_release_exclude_seconds: int | None = None,
6976
) -> Runner[PartitionT]:
7077
"""Configure Daft to execute dataframes using the Ray distributed computing framework.
7178
7279
Args:
7380
address: Ray cluster address to connect to. If None, connects to or starts a local Ray instance.
7481
noop_if_initialized: If True, skip initialization if Ray is already running.
7582
force_client_mode: If True, forces Ray to run in client mode.
83+
downscale_enabled: Enable/disable retiring idle Ray workers (scale-in). If not provided,
84+
falls back to the ``DAFT_AUTOSCALING_DOWNSCALE_ENABLED`` environment variable (default: False).
85+
downscale_idle_seconds: Minimum number of seconds a worker must be idle before it becomes eligible
86+
for retirement. If not provided, falls back to ``DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS``
87+
(default: 60).
88+
min_survivor_workers: Minimum number of Ray workers to keep alive even if they are idle.
89+
If not provided, falls back to ``DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS`` (default: 1).
90+
pending_release_exclude_seconds: Grace period (TTL) for recently-released worker IDs during
91+
worker discovery, to prevent the autoscaler from immediately respawning them. If not
92+
provided, falls back to ``DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS`` (default: 120).
7693
7794
Returns:
7895
Runner[PartitionT]: A runner object with the Ray runner's configurations.
7996
8097
Note:
8198
Can also be configured via environment variable: DAFT_RUNNER=ray
8299
"""
100+
# Allow programmatic configuration of autoscaling/downscaling behavior via `daft.set_runner_ray`.
101+
# These settings are still backed by environment variables so they can propagate to the Rust
102+
# scheduler/worker-manager components without threading configuration throughout the stack.
103+
if downscale_enabled is not None:
104+
os.environ["DAFT_AUTOSCALING_DOWNSCALE_ENABLED"] = "1" if downscale_enabled else "0"
105+
if downscale_idle_seconds is not None:
106+
if downscale_idle_seconds < 0:
107+
raise ValueError("downscale_idle_seconds must be >= 0")
108+
os.environ["DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS"] = str(downscale_idle_seconds)
109+
if min_survivor_workers is not None:
110+
if min_survivor_workers < 0:
111+
raise ValueError("min_survivor_workers must be >= 0")
112+
os.environ["DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS"] = str(min_survivor_workers)
113+
if pending_release_exclude_seconds is not None:
114+
if pending_release_exclude_seconds < 0:
115+
raise ValueError("pending_release_exclude_seconds must be >= 0")
116+
os.environ["DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS"] = str(pending_release_exclude_seconds)
117+
83118
return _set_runner_ray(
84119
address=address,
85120
noop_if_initialized=noop_if_initialized,

daft/runners/flotilla.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,11 @@ def try_autoscale(bundles: list[dict[str, int]]) -> None:
515515
)
516516

517517

518+
def clear_autoscaling_requests() -> None:
    """Clear any resources previously requested from the Ray autoscaler.

    This submits an empty bundle list through ``try_autoscale``.
    """
    no_bundles = []
    try_autoscale(bundles=no_bundles)
521+
522+
518523
@ray.remote(num_cpus=0)
519524
class RemoteFlotillaRunner:
520525
def __init__(

docs/distributed/ray.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,25 @@ ray job submit \
145145
The runtime env parameter specifies that Daft should be installed on the Ray workers. Alternative methods of including Daft in the worker dependencies can be found [here](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html).
146146

147147
For more information about Ray jobs, see [Ray docs -> Ray Jobs Overview](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html).
148+
149+
### Autoscaling and downscaling
150+
151+
When Daft runs on a Ray cluster managed by the Ray autoscaler (including KubeRay), it can send scale-up requests based on pending task demand. Ray's autoscaler request API is sticky: without additional coordination, the autoscaler may keep previously requested capacity even when the workload becomes idle.
152+
153+
Daft can optionally retire idle Flotilla workers (scale-in) and clear outstanding autoscaler requests to make it easier for Ray to scale the cluster back down. This feature is **opt-in**.
154+
155+
You can enable it via `set_runner_ray`:
156+
157+
```python
158+
import daft
159+
160+
daft.set_runner_ray(
161+
address="ray://<head_node_host>:10001",
162+
downscale_enabled=True,
163+
downscale_idle_seconds=60,
164+
min_survivor_workers=1,
165+
pending_release_exclude_seconds=120,
166+
)
167+
```
168+
169+
Or via environment variables (useful for Ray Jobs / KubeRay manifests): `DAFT_AUTOSCALING_DOWNSCALE_ENABLED` (default: false), `DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS` (default: 60), `DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS` (default: 1), and `DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS` (default: 120).

src/daft-distributed/src/python/ray/worker.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::{
2+
collections::HashMap,
3+
sync::Arc,
4+
time::{Duration, Instant},
5+
};
26

37
use common_error::DaftResult;
48
use pyo3::prelude::*;
@@ -11,6 +15,15 @@ use crate::scheduling::{
1115

1216
type ActiveTaskDetails = HashMap<TaskContext, TaskDetails>;
1317

18+
/// Lifecycle state recorded on a `RaySwordfishWorker`.
///
/// `PartialEq`/`Eq` are derived so that states can be compared directly
/// (`state == ActorState::Idle`, `assert_eq!`) instead of only via pattern
/// matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ActorState {
    /// Initial state assigned when the worker value is constructed.
    Ready,
    /// Set by `submit_tasks` when the first active task is recorded.
    Busy,
    /// Set by `mark_task_finished` once no active tasks remain.
    Idle,
    /// Set by `release` immediately before the worker is shut down.
    Releasing,
    /// Set by `release` after the shutdown call has returned.
    Released,
}
26+
1427
#[pyclass(module = "daft.daft", name = "RaySwordfishWorker", from_py_object)]
1528
#[derive(Debug, Clone)]
1629
pub(crate) struct RaySwordfishWorker {
@@ -21,6 +34,8 @@ pub(crate) struct RaySwordfishWorker {
2134
num_gpus: f64,
2235
active_task_details: ActiveTaskDetails,
2336
ip_address: String,
37+
last_task_finished_at: Instant,
38+
state: ActorState,
2439
}
2540

2641
#[pymethods]
@@ -42,6 +57,8 @@ impl RaySwordfishWorker {
4257
total_memory_bytes,
4358
active_task_details: Default::default(),
4459
ip_address,
60+
last_task_finished_at: Instant::now(),
61+
state: ActorState::Ready,
4562
}
4663
}
4764
}
@@ -51,8 +68,16 @@ impl RaySwordfishWorker {
5168
self.total_memory_bytes
5269
}
5370

71+
/// Replaces this worker's recorded lifecycle state with `state`.
pub fn set_state(&mut self, state: ActorState) {
72+
self.state = state;
73+
}
74+
5475
pub fn mark_task_finished(&mut self, task_context: &TaskContext) {
5576
self.active_task_details.remove(task_context);
77+
self.last_task_finished_at = Instant::now();
78+
if self.active_task_details.is_empty() {
79+
self.set_state(ActorState::Idle);
80+
}
5681
}
5782

5883
pub fn submit_tasks(
@@ -75,6 +100,9 @@ impl RaySwordfishWorker {
75100

76101
self.active_task_details
77102
.insert(task_context.clone(), task_details);
103+
if self.active_task_details.len() == 1 {
104+
self.set_state(ActorState::Busy);
105+
}
78106

79107
let ray_task_result_handle = RayTaskResultHandle::new(
80108
task_context,
@@ -89,12 +117,41 @@ impl RaySwordfishWorker {
89117
Ok(task_handles)
90118
}
91119

120+
/// Returns `true` when the worker has no entries in its active-task map.
///
/// Only `active_task_details` is consulted; the `state` field is not read.
pub fn is_idle(&self) -> bool {
121+
self.active_task_details.is_empty()
122+
}
123+
124+
pub fn idle_duration(&self, now: Instant) -> Duration {
125+
if self.is_idle() {
126+
now.saturating_duration_since(self.last_task_finished_at)
127+
} else {
128+
Duration::from_secs(0)
129+
}
130+
}
131+
92132
#[allow(dead_code)]
93133
pub fn shutdown(&self, py: Python<'_>) {
94134
self.ray_worker_handle
95135
.call_method0(py, pyo3::intern!(py, "shutdown"))
96136
.expect("Failed to shutdown RaySwordfishWorker");
97137
}
138+
139+
pub fn release(&mut self, py: Python<'_>) {
140+
let inflight = self.active_task_details.len();
141+
if inflight > 0 {
142+
tracing::warn!(
143+
target: "ray_swordfish_worker",
144+
worker_id = %self.worker_id,
145+
inflight_tasks = inflight,
146+
"Cannot release worker because it has active tasks."
147+
);
148+
return;
149+
}
150+
151+
self.set_state(ActorState::Releasing);
152+
self.shutdown(py);
153+
self.set_state(ActorState::Released);
154+
}
98155
}
99156

100157
impl Worker for RaySwordfishWorker {

0 commit comments

Comments
 (0)