feat(ray): Implement dynamic scale-in for RaySwordfishActor#5903
Conversation
Greptile SummaryThis PR implements dynamic scale-in for Key changes:
The implementation is well-tested with comprehensive unit tests for the retirement logic, blacklist mechanism, and integration with the scheduler. Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant S as SchedulerActor
participant WM as WorkerManager
participant W as RaySwordfishWorker
participant Ray as Ray Autoscaler
Note over S: Main scheduler loop (every 1s tick)
S->>WM: worker_snapshots()
WM-->>S: List of worker states
alt Has pending tasks requiring scale-up
S->>WM: try_autoscale(bundles)
WM->>WM: Clear pending_release_blacklist
WM->>Ray: request_resources(bundles)
else No scale-up needed AND downscale_enabled
S->>S: Count idle workers (empty active_task_details)
S->>S: Calculate num_to_retire = min(idle_count, allowed_to_retire)
alt num_to_retire > 0
S->>WM: retire_idle_ray_workers(num_to_retire, false)
WM->>Ray: get_head_node_id()
Ray-->>WM: head_node_id
WM->>WM: Filter candidates: skip head node, check idle duration
WM->>WM: Sort by longest idle duration
WM->>WM: Select top N candidates
loop For each selected worker
WM->>W: release()
W->>W: Check no active tasks
W->>W: shutdown()
W->>W: Set state to Released
WM->>WM: Add worker_id to pending_release_blacklist
end
WM->>Ray: clear_autoscaling_requests()
end
end
Note over S: On job completion (loop exit)
alt downscale_enabled
S->>WM: retire_idle_ray_workers(all_workers, true)
Note over WM: force_all_when_cluster_idle=true
WM->>Ray: clear_autoscaling_requests()
WM->>WM: Release all idle workers
end
|
There was a problem hiding this comment.
Additional Comments (3)
-
src/daft-distributed/src/python/ray/worker_manager.rs, line 287-340 (link)logic: Holding the mutex lock while calling
worker.release(py)and Python operations can cause significant lock contention. The state mutex is held from line 288 through line 340, during which Python GIL operations occur (lines 334-340). This blocks other operations likesubmit_tasks_to_workersunnecessarily.Consider releasing the lock before Python operations:
-
src/daft-distributed/src/python/ray/worker.rs, line 138-146 (link)style: The
releasemethod silently returns early if there are inflight tasks without setting state or logging. This could lead to confusion during debugging.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
-
src/daft-distributed/src/scheduling/scheduler/scheduler_actor.rs, line 138-144 (link)style: Environment variable parsing with defaults lacks documentation. Consider adding comments explaining these configuration options and their defaults.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
8 files reviewed, 3 comments
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #5903 +/- ##
==========================================
- Coverage 72.91% 72.83% -0.09%
==========================================
Files 973 973
Lines 126166 126494 +328
==========================================
+ Hits 91995 92132 +137
- Misses 34171 34362 +191
🚀 New features to boost your workflow:
|
1dc006a to
e801b78
Compare
|
@colin-ho @universalmind303 help me review when you are convenient. Thanks |
srilman
left a comment
There was a problem hiding this comment.
Overall this makes sense to me and I think would be useful. However, I feel like it would make more sense for the worker manager to have the logic to determine what workers should be retired vs within the scheduler and then passing into the worker manager.
| // - `DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS`: The minimum number of workers to keep | ||
| // running even if they are idle. This prevents the cluster from scaling down to | ||
| // zero workers during brief idle periods. Defaults to `1`. | ||
| let downscale_enabled = std::env::var("DAFT_AUTOSCALING_DOWNSCALE_ENABLED") |
There was a problem hiding this comment.
Actually I think these configurations would be useful to have in the daft.set_runner_ray configuration on top of these environment variables
Merging this PR will not alter performance
Comparing Footnotes
|
|
This PR is also useful for us. |
srilman
left a comment
There was a problem hiding this comment.
Overall, LGTM. Had 1 small thing, but once thats addressed plus the merge conflict, we can merge
| "Downscale: retired idle workers" | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Rather than having this code here, can we move it to the WorkerManager directly? Plus, rather than a specific retire_idle_ray_workers, can we have a generic retire_idle_workers
|
@huleilei - Thanks for your patience and work on this. Once you are able to resolve the last comment and the merge conflicts, we can merge this. Thanks again. |
Adds an opt-in Ray downscaling (scale-in) mechanism by retiring idle Flotilla workers to help Ray autoscaler shrink clusters when workloads become idle. Highlights: - Retire idle Ray workers with configurable idle threshold and a min-survivor floor. - Head-node protection and a pending-release blacklist to avoid immediate respawn. - Expose configuration via `daft.set_runner_ray(...)` and environment variables. - Keep scale-up behavior aligned with upstream high-water-mark ramp-up logic. - Document autoscaling/downscaling in `docs/distributed/ray.md`. Follow-ups on top of upstream PR Eventual-Inc#5903: - Resolve merge conflict with `main`: adopt the new `(scheduled, cancelled)` tuple from `Scheduler::schedule_tasks` and the `worker_id` field on `TaskEvent::Scheduled`. - Address @srilman review: move the per-tick downscale gating from `scheduler_actor.rs` into the generic `WorkerManager::retire_idle_workers`; `RayWorkerManager` now owns the enable flag, min-survivor floor, idle thresholds, head-node protection and blacklist TTLs.
* origin/main: (115 commits) feat: add ignore_corrupt_files option to read_parquet, read_csv and read_iceberg (Eventual-Inc#6520) fix(deps): gate vllm to Linux so macOS/Windows resolve without CUDA wheels (Eventual-Inc#7095) fix: pass options in Gravitino PostgreSQL read method (Eventual-Inc#7047) feat(ray): Implement dynamic scale-in for RaySwordfishActor (Eventual-Inc#5903) feat(delta-lake): support column mapping for reads (Eventual-Inc#7005) feat(functions): add string distance/similarity functions (Eventual-Inc#7068) test(parquet): cover read_parquet edge cases (Eventual-Inc#7085) refactor(checkpoint): drop "seal" vocabulary from Rust API surface (Eventual-Inc#7078) fix(asof-join): use unknown clustering spec instead of hash (Eventual-Inc#7075) docs: standardize Slack links to use daft.ai/slack (Eventual-Inc#7066) feat: add try_cast function for safe type conversion (Eventual-Inc#6960) refactor(file): rename File byte-range fields to position/size (Eventual-Inc#6747) fix(ray): configure worker startup timeout on runner (Eventual-Inc#7055) feat(shuffle): default flight shuffle compression to lz4 (Eventual-Inc#7071) feat(iceberg): support branch and tag reads (Eventual-Inc#7042) fix(shuffle): concat recordbatches before repartition (Eventual-Inc#7064) perf: update jemalloc 5.3.0 → 5.3.1 to fix muzzy decay performance bug (Eventual-Inc#7059) feat: thread assume_sorted_and_aligned_partitions parameter through ASOF join (Eventual-Inc#7067) fix(flight-shuffle): reduce coordinator memory to O(map_tasks + partitions) (Eventual-Inc#7056) refactor(distributed): rename needs_hash_repartition to can_skip_hash_repartition (Eventual-Inc#7053) ... # Conflicts: # daft/checkpoint.py # src/daft-distributed/src/pipeline_node/limit.rs # src/daft-distributed/src/pipeline_node/stage_checkpoint_keys.rs # src/daft-distributed/src/scheduling/task.rs # src/daft-local-execution/src/pipeline.rs # src/daft-local-execution/src/sinks/blocking_sink.rs # src/daft-local-execution/src/sources/scan_task.rs
Changes Made
This commit implements the dynamic scaling down (scale-in) functionality for RaySwordfishActor to release idle resources.
retire_idle_ray_workersinRayWorkerManager. It identifies workers that have been idle for longer thanDAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS(default: 60s) and releases them.SchedulerActorloop now periodically checks for idle workers and triggers retirement while maintaining a minimum survivor count (DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS, default: 1).pending_release_blacklistto prevent the autoscaler from immediately respawning workers that were just released.Configuration
New environment variables added:
DAFT_AUTOSCALING_DOWNSCALE_ENABLED: Enable/disable downscaling (default:true).DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS: Seconds a worker must be idle before retirement (default:60).DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS: Minimum number of workers to keep alive (default:1).DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS: TTL for blacklisted worker IDs (default:120).Related Issues
#5683