1. Problem Statement
In Daft's current Ray Runner (Flotilla mode), we have identified several limitations regarding elastic resource scaling:
One-way Scaling (Scale-up Only): SchedulerActor::run_scheduler_loop calculates required resources from the task backlog and calls ray.autoscaler.sdk.request_resources to provision new nodes. However, this call imposes a standing, mandatory resource constraint: even after tasks complete and the resources are no longer needed, Ray will not release the worker nodes because the request remains in effect (see the sketch after this list).
Resource Waste: When there are no running SwordfishTasks, idle worker nodes (and the RaySwordfishWorker actors on them) are not released, leading to significant resource waste and unnecessary cost.
Lack of Actor State Management: There is currently insufficient state tracking for UDF actors (specifically RaySwordfishWorker). The scheduler cannot easily distinguish between busy and truly idle actors, so it cannot safely make scale-down decisions without interrupting in-flight tasks.
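To make the first limitation concrete, here is a minimal sketch of how a resource request pins cluster capacity and how replacing it with an empty request releases it, which is what the proposal's clear_autoscaling_requests does. ray.autoscaler.sdk.request_resources is Ray's public autoscaler SDK call; the bundle shapes here are purely illustrative.

```python
from ray.autoscaler.sdk import request_resources

# Ask the autoscaler to keep enough nodes alive for 8 one-CPU bundles.
# This is a *standing* constraint: the cluster will not scale below it,
# even after the tasks that motivated the request have finished.
request_resources(bundles=[{"CPU": 1}] * 8)

# ... tasks run and complete; the backlog is now empty ...

# The nodes stay up until the request is explicitly replaced with an
# empty one, which is what Phase 1's constraint release does.
request_resources(bundles=[])
```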
2. Goals
We propose a comprehensive Dynamic Resource Scaling solution for Daft on Ray to achieve:
Bidirectional Scaling: Support both automatic scale-out (based on load) and scale-in (releasing idle resources).
Stability: Prevent "thrashing" (rapid scale-up/down cycles) via cool-down periods and blacklisting mechanisms.
Compatibility: Seamless integration with the existing SchedulerActor loop and Ray's native autoscaler.
3. Proposed Architecture
We propose a Two-Layer Scaling Control Loop architecture:
Application Layer (Daft Scheduler):
Monitors task queues and worker states (Busy, Idle).
Decides which specific actors to retire based on idle duration.
Explicitly manages the lifecycle of RaySwordfishWorker actors.
Infrastructure Layer (Ray Autoscaler):
Receives signals from Daft (via request_resources).
When Daft releases actors and clears resource requests, Ray's native autoscaler detects the surplus capacity and scales down the physical nodes.
Key Components Design
SchedulerActor: Enhanced to periodically check for idle workers when the task backlog is empty.
RayWorkerManager:
retire_idle_ray_workers: Core logic to identify workers that have been idle longer than a threshold and shut them down.
pending_release_blacklist: A mechanism to track workers currently being shut down, so the scheduler does not assign tasks to them and Ray does not immediately "resurrect" them.
try_autoscale: Updated to support sending empty bundles to clear resource constraints.
RaySwordfishWorker: Enhanced with an ActorState machine (Starting -> Ready -> Busy -> Idle -> Releasing) to track precise idle duration (see the sketch after this list).
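To illustrate the state machine, here is a minimal Python sketch. The enum values mirror the states listed above; the class and method names (other than last_task_finished_at, which is named in Phase 1) are hypothetical and not taken from the PR.

```python
import time
from enum import Enum, auto


class ActorState(Enum):
    STARTING = auto()
    READY = auto()
    BUSY = auto()
    IDLE = auto()
    RELEASING = auto()


class WorkerStateTracker:
    """Tracks a single worker's state and how long it has been idle."""

    def __init__(self) -> None:
        self.state = ActorState.STARTING
        self.last_task_finished_at: float | None = None

    def on_ready(self) -> None:
        self.state = ActorState.READY

    def on_task_started(self) -> None:
        self.state = ActorState.BUSY

    def on_task_finished(self) -> None:
        self.state = ActorState.IDLE
        self.last_task_finished_at = time.monotonic()

    def idle_seconds(self) -> float:
        # Only an IDLE worker accumulates idle time for scale-in decisions.
        if self.state is not ActorState.IDLE or self.last_task_finished_at is None:
            return 0.0
        return time.monotonic() - self.last_task_finished_at
```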
4. Implementation Roadmap
We propose breaking this feature into three phases. Phase 1 is currently implemented in PR #5903.
Phase 1: Basic Worker Scale-in & Graceful Shutdown (Current PR)
Goal: Enable scaling down safely, without crashing tasks or causing errors.
Idle Detection: Implement ActorState in RaySwordfishWorker to track last_task_finished_at.
Graceful Retirement:
In RayWorkerManager, implement retire_idle_ray_workers to select workers that have been idle for longer than DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS.
Call worker.release() (shutdown) via the Python API.
Constraint Release:
Call ray.autoscaler.sdk.request_resources([]) (via clear_autoscaling_requests) after releasing workers to remove the mandatory resource lock, allowing Ray to terminate the physical nodes.
Safety Mechanisms:
Blacklist: Introduce pending_release_blacklist to prevent "Worker Died" errors (where the scheduler interprets a dying worker as a failure) and to ignore these workers during state refreshes.
Lock Optimization: Ensure no GIL-holding Python operations are performed while holding the Rust RayWorkerManager lock, to prevent scheduler stalls.
Cooldown: Implement a global cool-down period between scale-down operations to ensure stability (a sketch of these safety checks follows this list).
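The safety mechanisms can be summarized in a short illustrative sketch. The names below (ScaleDownGuard, cooldown_seconds, is_schedulable) are assumptions for exposition; the PR implements the equivalent checks inside the Rust RayWorkerManager.

```python
import time


class ScaleDownGuard:
    """Illustrative combination of the cooldown and blacklist checks."""

    def __init__(self, cooldown_seconds: float = 60.0) -> None:
        self.cooldown_seconds = cooldown_seconds
        self.last_scale_down_at: float | None = None
        self.pending_release_blacklist: set[str] = set()

    def in_cooldown(self) -> bool:
        # Skip scale-down entirely if one happened too recently (prevents thrashing).
        if self.last_scale_down_at is None:
            return False
        return (time.monotonic() - self.last_scale_down_at) < self.cooldown_seconds

    def mark_releasing(self, worker_id: str) -> None:
        # A worker in this set must not receive new tasks, and its eventual
        # exit must not be reported as a "Worker Died" failure.
        self.pending_release_blacklist.add(worker_id)
        self.last_scale_down_at = time.monotonic()

    def is_schedulable(self, worker_id: str) -> bool:
        return worker_id not in self.pending_release_blacklist
```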
Status: Implemented in PR "feat(ray): Implement dynamic scale-in for RaySwordfishActor" (#5903). This PR establishes the foundational infrastructure for dynamic scaling.
Phase 2: Load-Based Scaling Policies
Goal: Optimize when to scale based on richer metrics.
Queue-based Triggers: Instead of just "backlog > 0", scale up based on the backlog_size / active_workers ratio (see the sketch after this list).
Resource Utilization: Integrate with Ray's metrics to make decisions based on actual CPU/memory pressure.
Pre-emptive Scaling: Smooth out the scaling curve to avoid latency spikes when new bursts of tasks arrive.
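As a rough sketch of the queue-based trigger, the target worker count could be derived from the backlog-to-worker ratio. The threshold, cap, and function name below are assumptions, not part of the current design.

```python
import math


def desired_worker_count(
    backlog_size: int,
    active_workers: int,
    target_tasks_per_worker: int = 4,
    max_workers: int = 64,
) -> int:
    """Scale out to keep roughly `target_tasks_per_worker` queued tasks per worker."""
    if backlog_size == 0:
        # No pending work: leave the pool alone and let idle-based
        # scale-in (Phase 1) shrink it.
        return active_workers
    wanted = math.ceil(backlog_size / target_tasks_per_worker)
    # This policy only scales out; scale-in remains driven by idle detection.
    return min(max(wanted, active_workers), max_workers)
```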
Phase 3: Advanced Optimization
Goal: Fine-tune behavior for high-throughput production environments.
Heterogeneous Scaling: Support scaling different worker groups (e.g., GPU vs. CPU workers) independently.
Predictive Scaling: Use historical usage patterns to pre-warm workers (see the sketch after this list).
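One possible shape for predictive scaling is a simple moving average over recent demand, used to pre-warm workers ahead of expected bursts. Everything in this sketch (window size, class and method names) is hypothetical.

```python
from collections import deque


class DemandPredictor:
    """Naive moving-average predictor over recent worker-demand samples."""

    def __init__(self, window: int = 12) -> None:
        self.samples: deque[int] = deque(maxlen=window)

    def record(self, workers_in_use: int) -> None:
        self.samples.append(workers_in_use)

    def prewarm_target(self) -> int:
        # Pre-warm up to the recent average demand so a new burst does not
        # have to wait for cold node provisioning.
        if not self.samples:
            return 0
        return round(sum(self.samples) / len(self.samples))
```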
5. Detailed Design of Phase 1 (PR #5903)
The core logic introduced in Phase 1 addresses the "Worker Died" and "Constraint" issues as follows:
```rust
// Pseudocode for the core logic in RayWorkerManager
fn retire_idle_ray_workers(&self) {
    // 1. Lock the shared state.
    let mut state = self.state.lock();

    // 2. Respect the global cooldown to avoid thrashing.
    if state.in_cooldown() {
        return;
    }

    // 3. Select candidates: workers idle for longer than the threshold.
    let workers_to_retire: Vec<Worker> = state
        .workers
        .values()
        .filter(|w| w.idle_time() > threshold)
        .cloned()
        .collect();

    // 4. Update state first (critical for preventing race conditions).
    for w in &workers_to_retire {
        state.workers.remove(&w.id);
        state.blacklist.insert(w.id.clone()); // Mark as "dying" so it is never scheduled again.
    }

    // 5. Release the lock before any Python/Ray calls (crucial: no GIL work under the lock).
    drop(state);

    // 6. Perform side effects: graceful remote shutdown of each actor.
    for w in &workers_to_retire {
        w.shutdown();
    }

    // 7. Unblock the Ray autoscaler by clearing the standing resource request,
    //    i.e. ray.autoscaler.sdk.request_resources([]).
    clear_autoscaling_requests();
}
```
This design makes Daft the authoritative source of truth for worker lifecycles, handing physical resource management off to Ray once the application-level actors have been gracefully terminated.
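For completeness, here is a rough Python sketch of how the scheduler side ties into this. The manager methods used here (workers, is_schedulable, retire_idle_ray_workers) are hypothetical stand-ins; the real SchedulerActor loop is implemented in Rust and differs in detail.

```python
def scheduler_tick(manager, task_queue):
    """One illustrative iteration of the scheduling loop (hypothetical API)."""
    if not task_queue:
        # No backlog: give idle workers a chance to retire (Phase 1 logic).
        manager.retire_idle_ray_workers()
        return
    # Dispatch work, skipping any worker that is pending release.
    for worker in manager.workers():
        if not task_queue:
            break
        if manager.is_schedulable(worker.id):
            worker.submit(task_queue.pop())
```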
Call for Feedback
We welcome feedback on this roadmap. In particular, we invite reviewers to look at PR #5903, which implements Phase 1 and adds the critical scale-in capability that is currently missing.