WIP: feat: add downscale support via idle worker retirement in flotilla mode #5516
huleilei wants to merge 9 commits
Greptile Overview
Greptile Summary
This PR implements bidirectional autoscaling for Daft's flotilla mode by adding downscale support via idle worker retirement. Previously, the system could only scale up; now it can also scale down when utilization is low.
Key changes:
- Scheduler actor (scheduler_actor.rs): Added ratio-based downscale logic that monitors backlog/capacity ratio and retires idle workers when ratio stays below 0.75 for 10+ ticks. Includes bootstrap expansion for zero-capacity clusters and global idle detection for full cleanup.
- Worker manager (worker_manager.rs): Implemented `retire_idle_workers()` and `release_idle_actors()` methods with idle duration tracking (default 120s threshold). Scale-up maintains monotonic upper bound to prevent Ray autoscaler conflicts.
- Worker tracking (worker.rs): Added `last_task_finished_at`, `is_idle()`, and `idle_duration()` to track worker idle state for safe retirement decisions.
- Python integration (flotilla.py): Added actor lifecycle management functions: `clear_autoscaling_requests()`, `sweep_force_release_swordfish_actors()`, and `force_release_swordfish_actor()` to properly clean up Ray actors.
- Configuration: 6 new environment variables for tuning downscale behavior (thresholds, stability windows, limits).
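The worker-tracking change above can be pictured with a minimal Rust sketch. It is not the code from worker.rs: the struct `WorkerIdleState`, the `active_tasks` counter, the `is_retirable` helper, and the choice to pass `now` in as a parameter are all assumptions made for illustration. Only the names `last_task_finished_at`, `is_idle()`, `idle_duration()` and the 120s default come from the summary.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for the per-worker state described above.
struct WorkerIdleState {
    /// Number of tasks currently running on the worker (assumed field).
    active_tasks: usize,
    /// Updated each time a task completes; `None` until the first one does.
    last_task_finished_at: Option<Instant>,
}

impl WorkerIdleState {
    fn new() -> Self {
        Self { active_tasks: 0, last_task_finished_at: None }
    }

    fn task_started(&mut self) {
        self.active_tasks += 1;
    }

    fn task_finished(&mut self, now: Instant) {
        self.active_tasks = self.active_tasks.saturating_sub(1);
        self.last_task_finished_at = Some(now);
    }

    /// A worker is idle when nothing is running on it.
    fn is_idle(&self) -> bool {
        self.active_tasks == 0
    }

    /// Time since the last task finished. Returns `None` while the worker is
    /// busy, and also (an assumption of this sketch) if it never ran a task.
    fn idle_duration(&self, now: Instant) -> Option<Duration> {
        if !self.is_idle() {
            return None;
        }
        self.last_task_finished_at
            .map(|finished| now.saturating_duration_since(finished))
    }

    /// True once the worker has been idle for at least `threshold`.
    fn is_retirable(&self, now: Instant, threshold: Duration) -> bool {
        self.idle_duration(now).map_or(false, |idle| idle >= threshold)
    }
}
```

Passing `now` explicitly keeps the sketch deterministic; real code would more likely call `Instant::now()` internally.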
How it works:
- Scale-up continues via existing ratio-based autoscaling (ratio > 1.25)
- Ratio-based downscale: When backlog/capacity < 0.75 for 10 ticks, retire up to 10% of idle workers
- Global idle: When backlog=0 and no inflight tasks, release all idle workers
- Finalize: Clear all idle actors and reset Ray demand on job completion
The implementation includes comprehensive tests covering bootstrap expansion, ratio-based scaling, and downscale stabilization.
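As a rough illustration of the ratio-based path summarized above, the following Rust sketch models one scheduler tick. It is not taken from scheduler_actor.rs: the type names `ScaleAction` and `DownscalePolicy`, the method `on_tick`, the rounding of the 10% limit (rounded up here), and the rule that any tick at or above the downscale threshold resets the counter are assumptions. The numbers 1.25, 0.75, 10 ticks and 10% come from the summary.

```rust
/// What the scheduler would do on this tick (hypothetical type).
#[derive(Debug, PartialEq)]
enum ScaleAction {
    ScaleUp,
    Retire(usize),
    Hold,
}

/// Hypothetical policy state; field names are illustrative only.
struct DownscalePolicy {
    upscale_ratio: f64,
    downscale_ratio: f64,
    stable_ticks: u32,
    max_retire_percent: usize,
    below_threshold_ticks: u32,
}

impl DownscalePolicy {
    /// Values quoted in the review summary.
    fn with_pr_defaults() -> Self {
        Self {
            upscale_ratio: 1.25,
            downscale_ratio: 0.75,
            stable_ticks: 10,
            max_retire_percent: 10,
            below_threshold_ticks: 0,
        }
    }

    fn on_tick(&mut self, backlog: usize, capacity: usize, idle_workers: usize) -> ScaleAction {
        // Zero capacity: the PR has a dedicated bootstrap path; this sketch
        // simply asks for a scale-up whenever there is backlog.
        if capacity == 0 {
            self.below_threshold_ticks = 0;
            return if backlog > 0 { ScaleAction::ScaleUp } else { ScaleAction::Hold };
        }
        let ratio = backlog as f64 / capacity as f64;
        if ratio > self.upscale_ratio {
            self.below_threshold_ticks = 0;
            return ScaleAction::ScaleUp;
        }
        if ratio < self.downscale_ratio {
            self.below_threshold_ticks += 1;
            if self.below_threshold_ticks >= self.stable_ticks {
                self.below_threshold_ticks = 0;
                // Integer ceiling of idle_workers * percent / 100 (assumed rounding).
                let n = (idle_workers * self.max_retire_percent + 99) / 100;
                if n > 0 {
                    return ScaleAction::Retire(n);
                }
            }
            return ScaleAction::Hold;
        }
        // Ratio is between the two thresholds: the low streak is broken.
        self.below_threshold_ticks = 0;
        ScaleAction::Hold
    }
}
```

The gap between the two thresholds acts as a dead band, so a ratio hovering around 1.0 triggers neither scale-up nor retirement.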
Confidence Score: 3/5
- This PR introduces complex autoscaling logic with potential race conditions between downscale paths
- Score reflects two critical logical issues: (1) potential race between ratio-based and global idle downscale branches that could attempt to release workers twice in the same tick, and (2) stale worker snapshot state after ratio-based downscale could cause incorrect global idle detection. The core implementation is sound with good test coverage, but these edge cases need resolution before merge.
- Pay close attention to `scheduler_actor.rs` lines 290-370 where the downscale logic has potential race conditions between the ratio-based and global idle branches
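One possible way to rule out the double release described above is to derive a single release decision per tick from one snapshot, so the ratio-based and global idle branches cannot both fire. The Rust sketch below is only an illustration of that idea, not the PR's code and not a fix the reviewer proposed: `TickSnapshot`, `ReleasePlan`, `plan_release`, and all field names are hypothetical.

```rust
/// At most one release action per tick (hypothetical type).
#[derive(Debug, PartialEq)]
enum ReleasePlan {
    Keep,
    ReleaseAllIdle,
    RetireSome(usize),
}

/// Values captured once at the start of a tick (hypothetical fields).
struct TickSnapshot {
    backlog: usize,
    inflight_tasks: usize,
    idle_workers: usize,
    /// True when the ratio has stayed below the threshold long enough.
    ratio_downscale_due: bool,
    /// How many workers the ratio-based path would like to retire.
    ratio_retire_count: usize,
}

/// Picks exactly one plan. Global idle takes priority, so the ratio-based
/// branch is never evaluated against a snapshot that another branch has
/// already acted on in the same tick.
fn plan_release(s: &TickSnapshot) -> ReleasePlan {
    if s.idle_workers == 0 {
        return ReleasePlan::Keep;
    }
    if s.backlog == 0 && s.inflight_tasks == 0 {
        return ReleasePlan::ReleaseAllIdle;
    }
    if s.ratio_downscale_due && s.ratio_retire_count > 0 {
        // Never plan to retire more workers than are actually idle.
        return ReleasePlan::RetireSome(s.ratio_retire_count.min(s.idle_workers));
    }
    ReleasePlan::Keep
}
```

Because the function returns before reaching the second branch, a fresh `worker_snapshots()` call would only be needed on the next tick rather than between the two branches.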
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/daft-distributed/src/scheduling/scheduler/scheduler_actor.rs | 3/5 | Added downscale logic with environment-based configuration, stability windows, and worker retirement. Includes bootstrap expansion debounce and global idle detection. Logic appears sound but there are potential race conditions between ratio-based and global idle branches. |
| src/daft-distributed/src/python/ray/worker_manager.rs | 4/5 | Implements idle worker retirement with configurable thresholds, monotonic upper bound for scale-up, and proper cleanup via sweep_force_release_swordfish_actors. The release_idle_actors and retire_idle_workers methods provide controlled downscaling. |
| daft/runners/flotilla.py | 4/5 | Added clear_autoscaling_requests(), list_swordfish_actors(), force_release_swordfish_actor(), and sweep_force_release_swordfish_actors() functions for actor lifecycle management. Uses deprecated ray.state.actors() with warning. Retry logic present for robustness. |
Sequence Diagram
sequenceDiagram
participant Scheduler as Scheduler Actor
participant WM as Worker Manager
participant Ray as Ray Autoscaler
participant Workers as Ray Workers
Note over Scheduler: Every tick (1s interval)
Scheduler->>WM: worker_snapshots()
WM-->>Scheduler: Current worker state
Scheduler->>Scheduler: Calculate backlog/capacity ratio
alt Bootstrap: Zero capacity + backlog > 0
Scheduler->>WM: try_autoscale(backlog_requests)
WM->>Ray: request_resources(bundles)
Ray-->>Workers: Provision new nodes
end
alt Scale-up: ratio > autoscaling_threshold
Scheduler->>WM: try_autoscale(pending_tasks)
WM->>Ray: request_resources(bundles)
Note over Scheduler: Reset downscale_ticks to 0
end
alt Ratio-based downscale: ratio < 0.75 for 10+ ticks
Scheduler->>Scheduler: Increment downscale_below_threshold_ticks
alt Stable & num_workers > min_survivors
Scheduler->>WM: release_idle_actors(num_to_retire, false)
WM->>WM: Select idle workers (idle > 120s)
WM->>Workers: shutdown() idle actors
WM->>Ray: sweep_force_release_swordfish_actors()
Note over Scheduler: Reset downscale_ticks to 0
end
end
alt Global idle: backlog=0 & no inflight tasks
Scheduler->>WM: release_idle_actors(all, true)
WM->>Workers: shutdown() all idle actors
Scheduler->>WM: try_autoscale([])
WM->>Ray: request_resources([]) to clear demand
Note over Scheduler: Reset downscale_ticks to 0
end
Note over Scheduler: Job completion
Scheduler->>WM: release_idle_actors(all, true)
Scheduler->>WM: try_autoscale([])
WM->>Ray: Clear resource requests
9 files reviewed, 4 comments
Greptile Overview
Greptile Summary
This PR fixes a critical iterator invalidation bug in the worker retirement logic introduced in a previous commit. The original code was removing workers from a HashMap while iterating over the same collection, which could cause undefined behavior.
Key Changes:
- Refactored `internal_release_idle_workers` to collect workers into a separate `Vec` before removal
- Workers are now removed from the HashMap after the iteration is complete, eliminating the race condition
- The fix ensures that the `selected` iterator is consumed before any mutations to `state.ray_workers`
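The collect-then-remove shape described in these changes can be sketched in a few lines of Rust. This is an illustration under assumptions, not the body of `internal_release_idle_workers`: the `Worker` struct, its `idle_for` field, the function name `take_idle_workers`, and the tie-break by worker ID are all hypothetical. In safe Rust the borrow checker would normally reject removing entries from a `HashMap` while an iterator over it is still live, so the two-phase form is also the one that compiles without workarounds.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical worker record; `None` means the worker is busy.
struct Worker {
    idle_for: Option<Duration>,
}

/// Removes up to `max_to_retire` workers that have been idle for at least
/// `threshold`, longest-idle first, and returns them for later release.
fn take_idle_workers(
    workers: &mut HashMap<String, Worker>,
    threshold: Duration,
    max_to_retire: usize,
) -> Vec<(String, Worker)> {
    // Phase 1: read-only pass that copies out IDs and idle durations.
    let mut candidates: Vec<(String, Duration)> = workers
        .iter()
        .filter_map(|(id, w)| {
            w.idle_for
                .filter(|idle| *idle >= threshold)
                .map(|idle| (id.clone(), idle))
        })
        .collect();

    // Longest idle first; ties broken by ID so the result is deterministic.
    candidates.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    candidates.truncate(max_to_retire);

    // Phase 2: the iterator over the map is gone, so removal is safe.
    candidates
        .into_iter()
        .filter_map(|(id, _)| workers.remove(&id).map(|w| (id, w)))
        .collect()
}
```

The returned vector plays the role of the `workers_to_release` list in the sequence diagram: shutdown calls would be issued only after the map has been updated.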
Issue Found:
- Lines 348-349 contain Chinese comments that should be translated to English for consistency
Confidence Score: 4/5
- This PR is safe to merge - it fixes a critical iterator invalidation bug
- The fix properly addresses the iterator invalidation issue by collecting worker IDs before removal. The logic is sound and prevents the HashMap from being mutated during iteration. Minor style issue with Chinese comments doesn't affect correctness.
- No files require special attention - the fix is straightforward and correct
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/daft-distributed/src/python/ray/worker_manager.rs | 4/5 | Fixes iterator invalidation bug by collecting workers before removal; adds Chinese comments that should be in English |
Sequence Diagram
sequenceDiagram
participant Scheduler as Scheduler Actor
participant WM as RayWorkerManager
participant State as WorkerManagerState
participant Worker as RaySwordfishWorker
participant Ray as Ray/Flotilla
Note over Scheduler,Ray: Downscale Flow (idle worker retirement)
Scheduler->>WM: retire_idle_workers(max_to_retire)
WM->>State: Lock state mutex
WM->>State: Iterate workers to find idle candidates
State-->>WM: Return idle workers list
Note over WM: Collect worker IDs (no removal yet)
WM->>WM: Sort by idle duration (longest first)
WM->>WM: Take up to max_to_retire workers
Note over WM,State: Safe removal phase
loop For each selected worker
WM->>State: remove worker from HashMap
State-->>WM: Return worker object
WM->>WM: Add to workers_to_release Vec
end
Note over WM: Release phase (after iteration)
loop For each worker in workers_to_release
WM->>Worker: release(py)
Worker->>Worker: Check no active tasks
Worker->>Worker: Set state to Releasing
Worker->>Ray: shutdown()
Worker->>Worker: Set state to Released
end
WM->>Ray: clear_autoscaling_requests()
WM->>State: Get remaining worker IDs
WM->>Ray: sweep_force_release_swordfish_actors(exclude_ids)
WM-->>Scheduler: Return number released
1 file reviewed, 1 comment
481d263 to 012ee13
Greptile Overview
Greptile Summary
This PR implements downscaling support for Daft's Ray-based autoscaling by enabling proactive idle worker retirement. The implementation adds environment-driven configuration for downscaling thresholds and survivor worker counts, tracks worker idle states with timestamps, and implements a blacklist mechanism to prevent immediate worker respawn during scale operations.
Key changes:
Issues found:
Confidence Score: 4/5
Important Files Changed
File Analysis
Sequence Diagram
sequenceDiagram
participant Scheduler as SchedulerActor
participant WM as WorkerManager
participant Worker as RaySwordfishWorker
participant Ray as Ray Autoscaler
Note over Scheduler: Main Loop Iteration
Scheduler->>WM: worker_snapshots()
WM-->>Scheduler: Current worker states
Scheduler->>Scheduler: schedule_tasks()
alt Scale-up needed
Scheduler->>Scheduler: get_autoscaling_request()
Scheduler->>WM: try_autoscale(bundles)
WM->>WM: Check capacity vs demand
alt Need more resources
WM->>WM: Clear blacklist
WM->>Ray: request_resources(bundles)
Ray-->>WM: Scale-up triggered
end
else No scale-up & downscale enabled
Scheduler->>Scheduler: Count idle workers
alt Workers > min_survivor
Scheduler->>WM: retire_idle_ray_workers(num_to_retire, false)
WM->>WM: Find idle workers > threshold
loop For each selected worker
WM->>Worker: release(py)
Worker->>Worker: shutdown()
WM->>WM: Add to blacklist
end
WM->>Ray: clear_autoscaling_requests()
end
end
Note over Scheduler: Job Complete
alt Downscale enabled
Scheduler->>WM: retire_idle_ray_workers(all, true)
WM->>Worker: release() all idle
WM->>Ray: clear_autoscaling_requests()
end
bbeed26 to bba4dc6
acea710 to 908c963
908c963 to 9b73170
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This reverts commit a2ced04.
@stayrascal help me review. Thanks.
See #5903.