[Data] Cleaning up ResourceManager and ReservationOpResourceAllocator #60273
alexeykudinkin merged 24 commits into master
Code Review
This pull request provides a solid cleanup of the ResourceManager and related components. The changes clarify the handling of materializing operators, refactor memory usage accounting, and centralize logic for completed operators. The removal of object store memory estimation when no samples are available simplifies the code. The test updates are also comprehensive and improve test quality by using more realistic mocks.
I have a couple of suggestions to improve code clarity by avoiding shadowed variables in list/generator comprehensions.
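The shadowing concern can be illustrated with a minimal sketch. The names `ops`, `op`, and `candidate` are made up for illustration and are not taken from the PR's diff:

```python
ops = ["read", "map", "write"]
op = "map"  # outer name, e.g. the operator currently being inspected

# Shadowing: the comprehension variable reuses the outer name `op`.
# Python 3 scopes the loop variable to the comprehension, so the outer
# `op` is left unchanged, but a reader now has to track two meanings
# of the same name.
shadowed = [op.upper() for op in ops]

# Clearer: use a distinct loop variable.
renamed = [candidate.upper() for candidate in ops]
```

Both comprehensions produce the same list; the second one only removes the ambiguity for the reader.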
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Revisiting selection of eligible ops
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
# Conflicts:
# python/ray/data/_internal/execution/resource_manager.py
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…bytes` Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Updated usages Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
        logger.warning(msg)

    def __init__(self, topology: "Topology"):
        self._topology = topology
Missing rate limiting update in idle detection
Medium Severity
The detect_idle method no longer updates last_detection_time[op] when detecting idle state. Previously, this timestamp was updated in both branches (active and idle). Without this update, after the first idle detection, every subsequent call will pass the interval check because last_detection_time remains stale, breaking the rate-limiting behavior. This causes print_warning_if_idle_for_too_long to be called repeatedly and _should_unblock_streaming_output_backpressure to always return True for idle operators.
This is incorrect. I've added a test covering the expected behavior of `IdleDetector`.
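To make the disagreement in this thread concrete, here is a minimal sketch of one possible design in which the timestamp is refreshed only when an operator makes progress. `IdleDetectorSketch`, its fields, and the `detect_idle(op, num_outputs, now)` signature are hypothetical and are not Ray's `IdleDetector`; the thread does not establish whether the merged code behaves this way.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class IdleDetectorSketch:
    """Hypothetical stand-in for illustration; not Ray's IdleDetector."""

    interval_s: float = 10.0
    _last_outputs: Dict[str, int] = field(default_factory=dict)
    _last_progress_time: Dict[str, float] = field(default_factory=dict)

    def detect_idle(self, op: str, num_outputs: int, now: float) -> bool:
        # First sighting of the operator: record a baseline, report not idle.
        if op not in self._last_progress_time:
            self._last_outputs[op] = num_outputs
            self._last_progress_time[op] = now
            return False

        # Progress since the last check: refresh the timestamp, not idle.
        if num_outputs > self._last_outputs[op]:
            self._last_outputs[op] = num_outputs
            self._last_progress_time[op] = now
            return False

        # No progress: idle once the interval has elapsed. The timestamp is
        # deliberately left untouched here, so the operator keeps reporting
        # idle until it produces new output.
        return now - self._last_progress_time[op] >= self.interval_s
```

Under this assumption, an operator that stops producing output reports idle on every check after the interval elapses and reports not idle again as soon as it emits a new output. A design that also refreshed the timestamp in the idle branch would instead report idle at most once per interval.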
…tor` (ray-project#60273)

## Description

Cleaning up `ResourceManager`

- Cleaning up methods duplication
- Fixing `_should_unblock_streaming_output_backpressure` semantic
- Abstracting common `_is_blocking_materializing_op` util to determine if operation is a blocking materializing op

Cleaning up `ReservationOpResourceAllocator`

- Adjusting `can_submit_new_task` to check for available Object Store when launching tasks

## Related issues

> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234".

## Additional information

> Optional: Add implementation details, API changes, usage examples, screenshots, etc.

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: 400Ping <jiekaichang@apache.org>


Description

Cleaning up `ResourceManager`

- Removing duplicated methods
- Fixing the semantics of `_should_unblock_streaming_output_backpressure`
- Abstracting a common `_is_blocking_materializing_op` util to determine whether an operator is a blocking materializing op

Cleaning up `ReservationOpResourceAllocator`

- Adjusting `can_submit_new_task` to check for available Object Store memory when launching tasks

Related issues

Additional information
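As a rough illustration of the `can_submit_new_task` change described above, the sketch below gates task submission on object store memory in addition to CPU and GPU. `ResourceBudget`, its fields, and the two-argument `can_submit_new_task` shown here are hypothetical and do not reproduce the signatures in `resource_manager.py`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResourceBudget:
    """Hypothetical resource bundle; not Ray's internal resource type."""

    cpu: float = 0.0
    gpu: float = 0.0
    object_store_memory: int = 0  # bytes

    def fits(self, required: "ResourceBudget") -> bool:
        # Every dimension, including object store memory, must be covered.
        return (
            self.cpu >= required.cpu
            and self.gpu >= required.gpu
            and self.object_store_memory >= required.object_store_memory
        )


def can_submit_new_task(budget: ResourceBudget, per_task: ResourceBudget) -> bool:
    # Assumption: a task is launched only if the operator's remaining budget
    # covers the task's estimated usage in all tracked dimensions.
    return budget.fits(per_task)
```

With a remaining budget of 4 CPUs and 100 bytes of object store memory, a task needing 1 CPU and 200 bytes would be held back even though CPU is available, while a task needing 1 CPU and 50 bytes would be allowed.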