
Support starting arbitrary cells given id #1071

Open
fzyzcjy wants to merge 10 commits into rollout_ft/31 from rollout_ft/32

@fzyzcjy
Collaborator

@fzyzcjy fzyzcjy commented May 5, 2026

No description provided.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements the start_cell method in the RolloutManager to facilitate cell recovery. The review feedback suggests correcting the import path for PortCursors to include the proper package prefix and wrapping the recovery logic with health monitoring pause/resume calls within a try...finally block to ensure system stability and state consistency during the operation.

I am having trouble creating individual review comments, so my feedback is listed below.

miles/ray/rollout/rollout_manager.py (31)

high

The import path for PortCursors is incorrect. Based on the project structure and existing imports in related files (like rollout_server.py), it should be imported from miles.ray.rollout.addr_allocator instead of ray.rollout.addr_allocator.

    from miles.ray.rollout.addr_allocator import PortCursors

miles/ray/rollout/rollout_manager.py (242-246)

medium

It is recommended to pause health monitoring before starting or recovering engines to prevent inconsistent state detection. Additionally, to prevent the health monitor from remaining paused in case of an error during recovery, use a try...finally block to ensure the monitoring is resumed, as per the repository's resource management guidelines.

    async def start_cell(self, cell_id: int):
        self._health_monitoring_pause()
        try:
            port_cursors = PortCursors.empty()
            idx = get_cell_indexer_of_id_map(self.servers)[cell_id]
            group = self.servers[idx.srv_key].server_groups[idx.group_index]
            await group.recover(port_cursors=port_cursors, filter_indices=idx.engine_indices)
        finally:
            self._health_monitoring_resume()

References
  1. To prevent resource leaks or inconsistent states, use constructs like try...finally or a with statement to ensure cleanup or state restoration logic is always executed, even in the case of exceptions.
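
The reference above also mentions a `with` statement as an alternative to an explicit try...finally. A minimal sketch of that variant follows, assuming a stand-in class: `FakeManager`, `health_monitoring_paused`, the `events` list, and the `fail` flag are hypothetical names used for illustration only and are not part of the repository. Only `_health_monitoring_pause`, `_health_monitoring_resume`, and `start_cell` mirror names from the suggestion above, and the real recovery call is replaced by a placeholder.

```python
import asyncio
from contextlib import contextmanager


class FakeManager:
    """Hypothetical stand-in for RolloutManager; only pause/resume is modelled."""

    def __init__(self):
        self.paused = False
        self.events = []  # records call order, for illustration only

    def _health_monitoring_pause(self):
        self.paused = True
        self.events.append("pause")

    def _health_monitoring_resume(self):
        self.paused = False
        self.events.append("resume")

    @contextmanager
    def health_monitoring_paused(self):
        # Hypothetical helper: pause on entry, always resume on exit,
        # including when the body raises.
        self._health_monitoring_pause()
        try:
            yield
        finally:
            self._health_monitoring_resume()

    async def start_cell(self, cell_id: int, fail: bool = False):
        with self.health_monitoring_paused():
            # Placeholder for the real group.recover(...) call.
            self.events.append(f"recover:{cell_id}")
            if fail:
                raise RuntimeError("simulated recovery failure")
```

If several methods need the same pause/resume bracket, a helper like this keeps the restoration logic in one place; for a single call site, the inline try...finally shown in the suggestion is equivalent.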

