[Data] Move gauges out of streaming executor #60275

Closed

limarkdcunha wants to merge 0 commits into ray-project:master from limarkdcunha:master

Conversation

@limarkdcunha (Contributor) commented Jan 18, 2026

This PR moves the responsibility of tracking Prometheus metrics for resource budgets out of the streaming executor and into a new class, ResourceAllocatorPrometheusCallback, in keeping with the single responsibility principle.
Closes #60269
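
For readers skimming the diff, here is a minimal sketch of the kind of callback this PR describes. The gauge name, constructor arguments, and the resource-manager accessor are assumptions for illustration, not the merged code:

# Hypothetical sketch of the callback introduced by this PR.
# The metric name and `get_budget` accessor are assumptions.
from ray.util.metrics import Gauge


class ResourceAllocatorPrometheusCallback:
    """Tracks Prometheus gauges for per-operator resource budgets."""

    def __init__(self):
        self._cpu_budget_gauge = Gauge(
            "data_cpu_budget",  # illustrative metric name
            description="CPU budget available to each operator.",
            tag_keys=("dataset", "operator"),
        )

    def on_execution_step(self, executor) -> None:
        """Called by the executor after every scheduling loop step."""
        topology = executor._topology
        resource_manager = executor._resource_manager
        if topology is None or resource_manager is None:
            return
        for i, op in enumerate(topology):
            tags = {
                "dataset": executor._dataset_id,
                "operator": executor._get_operator_id(op, i),
            }
            budget = resource_manager.get_budget(op)  # assumed accessor
            if budget is not None:
                self._cpu_budget_gauge.set(budget.cpu, tags=tags)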

@limarkdcunha (Contributor, Author)

@bveeramani please review this draft pull request; if the overall structure looks good, I will take care of adding the tests. Thanks

@gemini-code-assist (bot) left a comment

Code Review

This pull request effectively refactors the Prometheus metrics tracking for resource budgets out of StreamingExecutor and into a new ResourceAllocatorPrometheusCallback class. This is a good change that improves code organization and adheres to the single responsibility principle. The implementation in the new callback class is a correct translation of the original logic.

I have a couple of suggestions for the new ResourceAllocatorPrometheusCallback class to improve its design and consistency. One is about improving encapsulation by not accessing private members of StreamingExecutor directly. The other is a minor consistency improvement in how arguments are passed to function calls.

Comment on lines 50 to 61
topology = executor._topology
resource_manager = executor._resource_manager
dataset_id = executor._dataset_id

if topology is None or resource_manager is None:
    return

for i, op in enumerate(topology):
    tags = {
        "dataset": dataset_id,
        "operator": executor._get_operator_id(op, i),
    }
gemini-code-assist (bot), severity: medium

This callback accesses several private members of StreamingExecutor (_topology, _resource_manager, _dataset_id, and _get_operator_id). This breaks encapsulation and makes the callback tightly coupled to the implementation of StreamingExecutor.

To improve this, consider adding public properties or methods to StreamingExecutor to expose the necessary information. For example:

In StreamingExecutor:

@property
def topology(self):
    return self._topology

# ... and so on for other members

def get_operator_id(self, op: PhysicalOperator, topology_index: int) -> str:
    # This can be a public method that wraps the private one.
    return self._get_operator_id(op, topology_index)

This would make the callback system more robust and maintainable.
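
On the callback side, the change would be mechanical; a hedged sketch using the public accessors suggested above (these property and method names are part of the suggestion, not existing StreamingExecutor API):

def on_execution_step(self, executor) -> None:
    """Update gauges via public accessors instead of private members."""
    topology = executor.topology
    resource_manager = executor.resource_manager
    if topology is None or resource_manager is None:
        return
    for i, op in enumerate(topology):
        tags = {
            "dataset": executor.dataset_id,
            "operator": executor.get_operator_id(op, i),
        }
        # ... update the Prometheus gauges with `tags` as before ...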

Member:
I think this is valid, but it's okay to address it in a follow-up to keep this PR focused on just moving code around.

@bveeramani (Member) left a comment

LGTM except for docstring nits.

Comment on lines 14 to 17
"""
Callback that monitors the StreamingExecutor and updates Prometheus
metrics related to resource allocation (CPU/GPU budgets, memory, etc.).
"""
Member:
Nit: Ray follows the Google docstring convention. For consistency with that convention, could you write this as a one-line summary on the same line as the triple quotes and add any additional description below?

Suggested change
"""
Callback that monitors the StreamingExecutor and updates Prometheus
metrics related to resource allocation (CPU/GPU budgets, memory, etc.).
"""
"""Single-line summary on same line as triple-quotes.
More description.
"""

Comment on lines 47 to 49
"""
Called by the executor after every scheduling loop step.
"""
Member:
Suggested change
"""
Called by the executor after every scheduling loop step.
"""
"""Called by the executor after every scheduling loop step."""


Comment on lines 124 to 125
self._callbacks = [ResourceAllocatorPrometheusCallback()]
self._callbacks.extend(get_execution_callbacks(self._data_context))
Member:

This code special-cases ResourceAllocatorPrometheusCallback, and I think special cases will make the code harder to read.

In a follow-up PR, I think we should unify these.
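
A hedged sketch of what that unification could look like, assuming the built-in callback can be registered through the same mechanism as user callbacks (add_execution_callback is assumed to exist alongside get_execution_callbacks; the actual registration API may differ):

# Hypothetical follow-up: register the metrics callback up front so the
# executor reads a single, uniform list with no special case.
add_execution_callback(ResourceAllocatorPrometheusCallback(), self._data_context)

# ... later, in the executor:
self._callbacks = get_execution_callbacks(self._data_context)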

Contributor (Author):

Hey, this is already tech debt, right? I would rather not defer part of it to some later time; let's fix it in this PR itself. I will spend some more time on it if that works for you.

Member:

I think the refactor will be a non-trivial change. To keep the diff for this PR small and easily reviewable, I'd prefer to land this as-is and address the refactor shortly in a follow-up. I just want to avoid a situation where we have a large PR with many different changes.

Member:

Here's an issue describing the tech debt -- do you want to take a stab at it? #60279

Contributor (Author):

Sure, that works. I will add a test for this new class, make the minor changes per the previous comments, and take up the issue; you can assign it to me. Thanks

Member:

Sweet. Would you mind commenting on the Issue so I can assign it to you?

Member:

Btw, lemme know when docstring nits have been addressed on this PR, and I'll merge it

@limarkdcunha (Contributor, Author) commented Jan 19, 2026

@bveeramani I made the changes as discussed; let me know if anything else is needed in this PR or if it is good to merge from my side. I will start taking a look at the other issue; if possible, please assign it to me. Thanks

@limarkdcunha limarkdcunha marked this pull request as ready for review January 19, 2026 19:54
@limarkdcunha limarkdcunha requested a review from a team as a code owner January 19, 2026 19:54

@ray-gardener (bot) added the data, observability, and community-contribution labels Jan 20, 2026
Comment on lines 253 to 257

# Trigger callbacks to ensure resource gauges receive a final update.
for callback in self._callbacks:
    callback.on_execution_step(self)

Member:
I think we call after_execution_succeeds and after_execution_fails later during shutdown -- any reason we can't use those to ensure the gauges receive a final update? It seems unintuitive to call on_execution_step hooks during shutdown.

Contributor (Author):
Yeah, that makes much more sense now that I think about it. Thanks, I will make the changes and let you know.

Contributor (Author):
@bveeramani I have updated it by wrapping on_execution_step inside after_execution_succeeds and after_execution_fails, so from the consumer class's perspective they remain distinct hooks as needed. Let me know if that makes sense, or I can look for another approach. Thanks
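
For reference, a hedged sketch of the wrapping described above; the hook names follow the discussion, and the parameters are assumptions rather than the exact ExecutionCallback signatures:

class ResourceAllocatorPrometheusCallback:
    def on_execution_step(self, executor) -> None:
        """Refresh the resource-budget gauges after a scheduling loop step."""
        ...

    def after_execution_succeeds(self, executor) -> None:
        # Reuse the same gauge update so metrics get one final refresh.
        self.on_execution_step(executor)

    def after_execution_fails(self, executor, error: Exception) -> None:
        # Likewise push a final update when execution fails.
        self.on_execution_step(executor)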

@bveeramani (Member) left a comment

LGTM

@bveeramani bveeramani enabled auto-merge (squash) January 20, 2026 17:52
@github-actions (bot) added the go (add ONLY when ready to merge, run all tests) label Jan 20, 2026
@github-actions github-actions bot disabled auto-merge January 20, 2026 17:53

@cursor (bot) left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

@limarkdcunha (Contributor, Author)

I closed this for now; as discussed, I will pick it up after #60279 is closed.


Labels

community-contribution, data, go, observability


Development

Successfully merging this pull request may close these issues.

[Data] Move gauges out of streaming executor

2 participants