feat: add GlobalPlanner component for centralized scaling #5702
Conversation
Walkthrough
This pull request introduces a new GlobalPlanner component as a centralized scaling service for Kubernetes-based distributed systems. The SLA Planner can now operate in delegating mode, forwarding scaling decisions to the remote GlobalPlanner service instead of managing replicas locally. Local mode preserves the existing behavior.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/src/dynamo/planner/kubernetes_connector.py (1)
285-285: Bug: Positional argument mismatch in CLI instantiation.
The KubernetesConnector constructor signature is (dynamo_namespace, model_name=None, k8s_namespace=None, parent_dgd_name=None), but args.k8s_namespace is being passed as the second positional argument, which maps to model_name instead of k8s_namespace.
🐛 Proposed fix
-    connector = KubernetesConnector(args.dynamo_namespace, args.k8s_namespace)
+    connector = KubernetesConnector(
+        dynamo_namespace=args.dynamo_namespace,
+        k8s_namespace=args.k8s_namespace,
+    )
🤖 Fix all issues with AI agents
In `@components/src/dynamo/global_planner/scale_handler.py`:
- Around line 97-104: The target_replicas comprehension assigns
r.sub_component_type (a str) directly to TargetReplica.sub_component_type which
expects a SubComponentType enum; update the comprehension in scale_handler.py so
you convert the string to the enum (use SubComponentType(r.sub_component_type)
or SubComponentType[r.sub_component_type] depending on how the enum is defined)
when constructing each TargetReplica, and add a small try/except to handle
invalid values (log or raise a clear error) so malformed
TargetReplicaRequest.sub_component_type values are surfaced (see the sketch after this file's prompts).
- Around line 113-115: The call to the synchronous method get_graph_deployment
is incorrectly awaited; change the statement that assigns deployment from await
connector.kube_api.get_graph_deployment(connector.parent_dgd_name) to a direct
call without await so deployment =
connector.kube_api.get_graph_deployment(connector.parent_dgd_name), ensuring any
surrounding async function does not expect an awaitable from
get_graph_deployment.
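For the SubComponentType conversion described in the first scale_handler.py prompt above, here is a minimal sketch under stated assumptions: the build_target_replicas helper, the import paths, and the request.target_replicas / component_name / desired_replicas field names are assumptions, while the str-to-enum conversion and the error handling follow the prompt.

from dynamo.planner.kubernetes_connector import TargetReplica  # import path assumed
from dynamo.planner.sub_component_type import SubComponentType  # import path assumed

def build_target_replicas(request):
    """Convert TargetReplicaRequest entries (str sub_component_type) into TargetReplica objects."""
    target_replicas = []
    for r in request.target_replicas:  # field name assumed
        try:
            sub_type = SubComponentType(r.sub_component_type)  # str -> enum
        except ValueError as e:
            # Surface malformed values instead of passing the raw string through
            raise ValueError(
                f"Invalid sub_component_type {r.sub_component_type!r}: {e}"
            ) from e
        target_replicas.append(
            TargetReplica(
                sub_component_type=sub_type,
                component_name=r.component_name,      # field name assumed
                desired_replicas=r.desired_replicas,  # field name assumed
            )
        )
    return target_replicas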
In `@components/src/dynamo/planner/remote_planner_client.py`:
- Line 1: Add the standard SPDX copyright header at the top of the module to
satisfy CI: insert the SPDX short identifier and copyright line before the
module docstring in remote_planner_client.py (i.e., place the header above the
existing top-level string literal that currently reads "Client for calling
remote planner's scale_request endpoint."). Ensure the header uses the project's
standard format (SPDX-License-Identifier and copyright holder/year) so the file
starts with that header followed by the existing docstring (a sketch of such a header appears below).
In `@components/src/dynamo/planner/scale_protocol.py`:
- Line 1: Add the standard SPDX license header as the very first lines of
scale_protocol.py (above the module docstring) so the CI recognizes the file’s
license; specifically insert the project's standard SPDX identifier (for example
"SPDX-License-Identifier: Apache-2.0") at the top of the file, preserving the
existing module docstring and rest of the code in scale_protocol.py.
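For both header prompts above, a sketch of what the first lines of remote_planner_client.py (and, with its own docstring, scale_protocol.py) might look like; the Apache-2.0 identifier comes from the second prompt, while the exact copyright wording is an assumption about the project's standard format.

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Client for calling remote planner's scale_request endpoint."""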
In `@components/src/dynamo/planner/utils/planner_core.py`:
- Around line 674-679: The current flow may reference next_num_p and next_num_d
when they are undefined if _compute_replica_requirements fails or next_num_req
is None; fix by only performing the scaling block when replica computation
succeeded (i.e., after _compute_replica_requirements returns a non-None
next_num_req) or by explicitly initializing/returning early—move the code that
checks args.no_operation and calls _delegate_scaling / _execute_local_scaling
(referencing planner_mode, _delegate_scaling, _execute_local_scaling) inside the
conditional where next_num_p and next_num_d are set (after
_compute_replica_requirements) or add a guard like “if next_num_req is None:
return” before using next_num_p/next_num_d.
In `@tests/global_planner/unit/test_scale_request_handler.py`:
- Around line 55-64: The call site in scale_handler.py incorrectly awaits the
synchronous method get_graph_deployment (defined as def get_graph_deployment in
kube.py); remove the await so the call is a normal synchronous call (e.g.,
result = kube_api.get_graph_deployment(...)) and adjust any surrounding code
accordingly, and update the unit test mock in
tests/global_planner/unit/test_scale_request_handler.py to use Mock() instead of
AsyncMock() for mock_connector.kube_api.get_graph_deployment so the test no
longer masks the runtime TypeError.
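A small sketch of the test-side change from the last prompt above: get_graph_deployment is stubbed with a plain Mock rather than an AsyncMock, returning a deployment shaped like the one scale_handler.py reads. The helper name, service names, and replica counts are illustrative.

from unittest.mock import Mock

def make_connector_mock():
    """Build a connector stub whose kube_api.get_graph_deployment is a plain (sync) Mock."""
    mock_connector = Mock()
    # get_graph_deployment is synchronous in kube.py, so use Mock, not AsyncMock,
    # and return a deployment shaped like the one scale_handler.py reads.
    mock_connector.kube_api.get_graph_deployment = Mock(
        return_value={
            "spec": {
                "services": {
                    "prefill-worker": {"subComponentType": "prefill", "replicas": 3},
                    "decode-worker": {"subComponentType": "decode", "replicas": 5},
                }
            }
        }
    )
    return mock_connector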
🧹 Nitpick comments (14)
components/src/dynamo/global_planner/argparse_config.py (1)
68-72: Dead code: validation condition is unreachable.
The condition args.managed_namespaces and len(args.managed_namespaces) == 0 can never be True. When nargs="+" is used (line 44), argparse requires at least one value if the flag is provided; it will raise an error before validate_args is called. If the flag is omitted, managed_namespaces is None (falsy), so the and short-circuits. You can safely remove this validation or simplify it.
🧹 Suggested simplification
 def validate_args(args):
     """Validate GlobalPlanner arguments.

     Args:
         args: Parsed arguments from argparse

     Raises:
         ValueError: If arguments are invalid
     """
-    # managed_namespaces is optional - if not specified, accept all
-    if args.managed_namespaces and len(args.managed_namespaces) == 0:
-        raise ValueError(
-            "--managed-namespaces must have at least one namespace if specified"
-        )
+    # managed_namespaces is optional - if not specified, accept all
+    # Note: nargs="+" already ensures at least one value when flag is provided
+    pass
components/src/dynamo/planner/scale_protocol.py (2)
7-9: Remove unused TYPE_CHECKING block.
The TYPE_CHECKING import and empty if block serve no purpose currently. Either remove it or add the intended type-only imports.
🧹 Proposed fix
-from typing import TYPE_CHECKING, List, Optional
+from typing import List, Optional

 from pydantic import BaseModel
-
-# Import SubComponentType only for type checking to avoid runtime dependency
-if TYPE_CHECKING:
-    pass
41-46: Consider using Literal for the status field.
Using Literal["success", "error", "scaling"] instead of str would provide better type safety and documentation of valid values.
🧹 Proposed enhancement
+from typing import List, Literal, Optional
+
 class ScaleResponse(BaseModel):
     """Response from scaling operation"""

-    status: str  # "success", "error", "scaling"
+    status: Literal["success", "error", "scaling"]
     message: str
     current_replicas: dict  # {"prefill": 3, "decode": 5}
components/src/dynamo/planner/remote_planner_client.py (2)
52-56: Clarify the intent of consuming only the first response.
The async for with immediate break works but is non-obvious. A brief comment explaining why only the first response is consumed (e.g., round-robin returns one response per instance) would improve readability.
🧹 Suggested clarification
-        response_data = None
-        async for response in await self._client.round_robin(request_json):
-            # Take first response
-            response_data = response
-            break
+        response_data = None
+        async for response in await self._client.round_robin(request_json):
+            # round_robin selects one instance; consume its single response
+            response_data = response
+            break
33-34: Wrap wait_for_instances() call with a timeout to prevent indefinite blocking.
The method signature does not support a timeout parameter. Since similar initialization waits in the codebase use asyncio.wait_for() (e.g., in sglang/main.py), consider wrapping this call with an appropriate timeout:
await asyncio.wait_for(self._client.wait_for_instances(), timeout=30.0)
This prevents the initialization from hanging indefinitely if the GlobalPlanner service is unavailable.
tests/planner/unit/test_planner_argparse.py (1)
59-65: Consider adding a positive validation test.
You test that validation fails when namespace is missing, but there's no test confirming validate_planner_args succeeds (returns without error) when all required arguments are provided in delegating mode.
🧪 Suggested additional test
def test_validate_delegating_mode_with_namespace():
    """Test validation passes for delegating mode with GlobalPlanner namespace."""
    parser = create_sla_planner_parser()
    args = parser.parse_args([
        "--namespace", "test-ns",
        "--planner-mode", "delegating",
        "--global-planner-namespace", "global-ns",
    ])
    # Should not raise
    validate_planner_args(args)
tests/planner/unit/test_remote_planner.py (3)
42-49: Consider prefixing unused parameter with underscore.
The request_json parameter is unused in the mock function. While this is intentional (matching the expected signature), prefixing with _ would silence the linter and clarify intent.
♻️ Suggested change
-    async def mock_round_robin(request_json):
+    async def mock_round_robin(_request_json):
         yield {
             "status": "success",
             "message": "Scaled successfully",
             "current_replicas": {"prefill": 3, "decode": 5},
         }
57-57: Prefix unused variable with underscore.
The mock_client variable is unpacked but never used. Prefix with _ to indicate it's intentionally unused.
♻️ Suggested change
-    runtime, mock_client = mock_runtime
+    runtime, _mock_client = mock_runtime
146-148: Consider clearer empty async generator pattern.
The return before yield pattern works but is confusing. A cleaner approach would be more explicit.
♻️ Suggested alternatives
-    async def mock_round_robin_empty(request_json):
-        return
-        yield  # Make it a generator but never yield anything
+    async def mock_round_robin_empty(_request_json):
+        # Empty async generator - never yields
+        if False:
+            yield
Or use an async generator expression:
client_mock.round_robin = AsyncMock(return_value=(__x async for __x in ()))
components/src/dynamo/global_planner/__main__.py (1)
94-101: Prefix unused request parameter with underscore.
The request parameter is unused in the health check endpoint. This is expected for a health check, but prefixing with _ clarifies intent and silences the linter.
♻️ Suggested change
-    async def health_check(request: HealthCheckRequest):
+    async def health_check(_request: HealthCheckRequest):
         """Health check endpoint for monitoring"""
         yield {
             "status": "healthy",
components/src/dynamo/planner/utils/planner_core.py (1)
564-565: Consider failing fast instead of using "unknown" fallback.
If DYN_PARENT_DGD_K8S_NAME is not set, the request will be sent with graph_deployment_name="unknown", which will likely fail at the GlobalPlanner when it tries to interact with Kubernetes. Failing early with a clear error message might be more helpful for debugging.
♻️ Suggested change
+        graph_deployment_name = os.environ.get("DYN_PARENT_DGD_K8S_NAME")
+        if not graph_deployment_name:
+            logger.error("DYN_PARENT_DGD_K8S_NAME environment variable not set")
+            return
+
         request = ScaleRequest(
             caller_namespace=self.namespace,
-            graph_deployment_name=os.environ.get("DYN_PARENT_DGD_K8S_NAME", "unknown"),
+            graph_deployment_name=graph_deployment_name,
             k8s_namespace=os.environ.get("POD_NAMESPACE", "default"),
components/src/dynamo/global_planner/scale_handler.py (3)
116-119: Rename unused loop variable.
service_name is not used in the loop body. Rename to _service_name to indicate it's intentionally unused.
♻️ Suggested change
-        for service_name, service_spec in deployment["spec"]["services"].items():
+        for _service_name, service_spec in deployment["spec"]["services"].items():
             sub_type = service_spec.get("subComponentType", "")
             if sub_type:
                 current_replicas[sub_type] = service_spec.get("replicas", 0)
130-131: Remove redundant exception object from logging.exception.
logging.exception automatically includes the exception information. Including {e} in the message is redundant.
♻️ Suggested change
         except Exception as e:
-            logger.exception(f"Error processing scale request: {e}")
+            logger.exception("Error processing scale request")
             yield {"status": "error", "message": str(e), "current_replicas": {}}
42-42: Consider adding cache eviction or bounds for long-running deployments.
The connectors cache grows unbounded as new DGDs are encountered. For long-running GlobalPlanner instances managing many transient DGDs, this could lead to memory growth. Consider adding an LRU cache or periodic cleanup of stale connectors, as sketched below.
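One possible shape for such a bound, shown as a small LRU wrapper; the ConnectorCache class, its factory callable, and the 128-entry limit are illustrative assumptions rather than part of the PR.

from collections import OrderedDict

MAX_CACHED_CONNECTORS = 128  # assumed bound; tune per deployment

class ConnectorCache:
    """Bounded LRU cache of per-DGD connectors (keying and factory are illustrative)."""

    def __init__(self, factory):
        self._factory = factory          # e.g. a callable that builds a KubernetesConnector
        self._connectors = OrderedDict()

    def get(self, key):
        if key in self._connectors:
            self._connectors.move_to_end(key)  # mark as most recently used
            return self._connectors[key]
        connector = self._factory(key)
        self._connectors[key] = connector
        if len(self._connectors) > MAX_CACHED_CONNECTORS:
            self._connectors.popitem(last=False)  # evict the least recently used connector
        return connector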
        if not self.args.no_operation:
            target_replicas = [
                TargetReplica(
                    sub_component_type=SubComponentType.PREFILL,
                    component_name=self.prefill_component_name,
                    desired_replicas=next_num_p,
                ),
                TargetReplica(
                    sub_component_type=SubComponentType.DECODE,
                    component_name=self.decode_component_name,
                    desired_replicas=next_num_d,
                ),
            ]
            await self.connector.set_component_replicas(target_replicas, blocking=False)
            # Execute scaling based on mode
            if self.planner_mode == "delegating":
                await self._delegate_scaling(next_num_p, next_num_d)
            else:
                await self._execute_local_scaling(next_num_p, next_num_d)
Potential UnboundLocalError if replica computation fails.
If _compute_replica_requirements raises an exception (caught at lines 670-672), the code returns early. However, if the exception-handling path changes, or if next_num_req is None so the block that assigns them is skipped, next_num_p and next_num_d could be undefined when reaching lines 677-679.
Looking at the current flow: if next_num_req is None, the code doesn't enter the if block at line 647, and the no_operation check at line 674 would execute with undefined variables.
🐛 Proposed fix - move scaling inside the conditional
     try:
         next_num_p, next_num_d = self._compute_replica_requirements(
             next_num_req, next_isl, next_osl
         )
         # Update predicted replica metrics in Prometheus
         if self.prometheus_port != 0:
             self.prometheus_metrics.predicted_num_p.set(next_num_p)
             self.prometheus_metrics.predicted_num_d.set(next_num_d)
+
+        if not self.args.no_operation:
+            # Execute scaling based on mode
+            if self.planner_mode == "delegating":
+                await self._delegate_scaling(next_num_p, next_num_d)
+            else:
+                await self._execute_local_scaling(next_num_p, next_num_d)
     except Exception as e:
         logger.error(f"Failed to compute number of replicas: {e}")
         return
-
-    if not self.args.no_operation:
-        # Execute scaling based on mode
-        if self.planner_mode == "delegating":
-            await self._delegate_scaling(next_num_p, next_num_d)
-        else:
-            await self._execute_local_scaling(next_num_p, next_num_d)
🤖 Prompt for AI Agents
In `@components/src/dynamo/planner/utils/planner_core.py` around lines 674-679:
The current flow may reference next_num_p and next_num_d when they are undefined
if _compute_replica_requirements fails or next_num_req is None; fix by only
performing the scaling block when replica computation succeeded (i.e., after
_compute_replica_requirements returns a non-None next_num_req) or by explicitly
initializing/returning early—move the code that checks args.no_operation and
calls _delegate_scaling / _execute_local_scaling (referencing planner_mode,
_delegate_scaling, _execute_local_scaling) inside the conditional where
next_num_p and next_num_d are set (after _compute_replica_requirements) or add a
guard like “if next_num_req is None: return” before using next_num_p/next_num_d.
Implement a two-component architecture for delegated scaling:
Architecture changes:
GlobalPlanner features:
Planner changes:
Tests:
Overview:
Details:
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
Release Notes
New Features
Tests