Skip to content
27 changes: 27 additions & 0 deletions components/src/dynamo/global_planner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""GlobalPlanner - centralized scaling execution service.

A standalone component that receives scale requests from Planners and
executes them through the Kubernetes API, providing centralized scaling
management across multiple deployments and namespaces.

Architecture:
    - Planners make the scaling decisions (observe, predict, decide)
    - Planners in delegating mode forward requests to GlobalPlanner
    - GlobalPlanner executes the scaling via the Kubernetes API
    - GlobalPlanner is stateless and can scale horizontally

Usage:
    python -m dynamo.global_planner \
        --namespace=global-infra \
        --managed-namespaces app-ns-1 app-ns-2
"""

from dynamo.global_planner.scale_handler import ScaleRequestHandler

# Public API of this package.
__all__ = ["ScaleRequestHandler"]
118 changes: 118 additions & 0 deletions components/src/dynamo/global_planner/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
GlobalPlanner - Centralized Scaling Execution Service

Entry point for the GlobalPlanner component.

Usage:
python -m dynamo.global_planner --namespace=global-infra

With authorization:
python -m dynamo.global_planner \\
--namespace=global-infra \\
--managed-namespaces app-ns-1 app-ns-2
"""

import asyncio
import logging
import os

from pydantic import BaseModel

from dynamo.global_planner.argparse_config import (
create_global_planner_parser,
validate_args,
)
from dynamo.global_planner.scale_handler import ScaleRequestHandler
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging

configure_dynamo_logging()
logger = logging.getLogger(__name__)


class HealthCheckRequest(BaseModel):
    """Request type for the health check endpoint.

    A minimal payload so monitors can send a simple ping; the handler
    ignores the content of ``text``.
    """

    # Arbitrary ping payload; defaults to "ping".
    text: str = "ping"


@dynamo_worker()
async def main(runtime: DistributedRuntime, args):
    """Initialize and run GlobalPlanner.

    The GlobalPlanner is a centralized scaling service that:
    1. Listens for scale requests from Planners
    2. Validates caller authorization (optional)
    3. Executes scaling via Kubernetes API
    4. Returns success/failure status

    Args:
        runtime: Dynamo runtime instance (injected by @dynamo_worker).
        args: Parsed command-line arguments (see argparse_config).
    """
    # Fail fast on inconsistent CLI arguments before serving anything.
    validate_args(args)

    # Startup banner: record the effective configuration for operators.
    logger.info("=" * 60)
    logger.info("Starting GlobalPlanner")
    logger.info("=" * 60)
    logger.info(f"Namespace: {args.namespace}")
    logger.info(f"Environment: {args.environment}")

    if args.managed_namespaces:
        logger.info("Authorization: ENABLED")
        logger.info(f"Authorized namespaces: {args.managed_namespaces}")
    else:
        logger.info("Authorization: DISABLED (accepting all namespaces)")

    logger.info("=" * 60)

    # Register this process as the "GlobalPlanner" component in the
    # configured Dynamo namespace.
    component = runtime.namespace(args.namespace).component("GlobalPlanner")

    # Kubernetes namespace this pod runs in; POD_NAMESPACE is normally
    # injected (e.g. via the downward API). Falls back to "default" when
    # running outside Kubernetes.
    k8s_namespace = os.environ.get("POD_NAMESPACE", "default")
    logger.info(f"Running in Kubernetes namespace: {k8s_namespace}")

    # The handler owns authorization checks and the per-DGD
    # KubernetesConnector cache.
    handler = ScaleRequestHandler(
        runtime=runtime,
        managed_namespaces=args.managed_namespaces,
        k8s_namespace=k8s_namespace,
    )

    # Serve scale_request endpoint.
    # NOTE(review): assumes serve_endpoint returns once the endpoint is
    # registered rather than blocking to serve requests — confirm, since the
    # health endpoint below is only served after this await completes.
    logger.info("Serving endpoints...")
    scale_endpoint = component.endpoint("scale_request")
    await scale_endpoint.serve_endpoint(handler.scale_request)
    logger.info(" ✓ scale_request - Receives scaling requests from Planners")

    # Serve health check endpoint
    async def health_check(request: HealthCheckRequest):
        """Health check endpoint for monitoring; yields one status dict."""
        yield {
            "status": "healthy",
            "component": "GlobalPlanner",
            "namespace": args.namespace,
            "managed_namespaces": args.managed_namespaces or "all",
        }

    health_endpoint = component.endpoint("health")
    await health_endpoint.serve_endpoint(health_check)
    logger.info(" ✓ health - Health check endpoint")

    logger.info("=" * 60)
    logger.info("GlobalPlanner is ready and waiting for scale requests")
    logger.info("=" * 60)

    # Block forever; scale requests are processed by the endpoint handlers
    # running on the runtime's event loop.
    await asyncio.Event().wait()


if __name__ == "__main__":
    parser = create_global_planner_parser()
    args = parser.parse_args()
    # NOTE(review): main is wrapped by @dynamo_worker(), which presumably
    # supplies the DistributedRuntime and forwards `args` — confirm the
    # decorated callable accepts a single positional argument here.
    asyncio.run(main(args))
72 changes: 72 additions & 0 deletions components/src/dynamo/global_planner/argparse_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Argument parsing for GlobalPlanner."""

import argparse


def create_global_planner_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the GlobalPlanner service.

    Returns:
        argparse.ArgumentParser: Parser accepting ``--namespace`` (required),
        ``--managed-namespaces`` (optional allow-list) and ``--environment``.
    """
    # Raw epilog text; RawDescriptionHelpFormatter preserves its layout.
    epilog = """
Examples:
  # Simple deployment (accept all namespaces)
  python -m dynamo.global_planner --namespace=global-infra

  # With authorization
  python -m dynamo.global_planner \\
    --namespace=global-infra \\
    --managed-namespaces app-ns-1 app-ns-2 app-ns-3

  # Custom environment
  python -m dynamo.global_planner \\
    --namespace=global-infra \\
    --environment=kubernetes
"""
    parser = argparse.ArgumentParser(
        description="GlobalPlanner - Centralized Scaling Execution Service",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )

    # Dynamo namespace under which the GlobalPlanner component registers.
    parser.add_argument(
        "--namespace",
        required=True,
        help="Dynamo namespace for GlobalPlanner component",
    )
    # Optional allow-list; when omitted every caller namespace is accepted.
    parser.add_argument(
        "--managed-namespaces",
        type=str,
        nargs="+",
        default=None,
        help="Optional: List of namespaces authorized to use this GlobalPlanner (default: accept all)",
    )
    # Only Kubernetes is implemented today; the choice exists for future backends.
    parser.add_argument(
        "--environment",
        default="kubernetes",
        choices=["kubernetes"],
        help="Environment type (currently only kubernetes supported)",
    )
    return parser


def validate_args(args):
    """Validate GlobalPlanner arguments.

    Args:
        args: Parsed arguments from argparse (any object exposing a
            ``managed_namespaces`` attribute).

    Raises:
        ValueError: If ``managed_namespaces`` was supplied as an empty list.
    """
    # managed_namespaces is optional: None means "accept all namespaces".
    # An explicitly supplied empty list is rejected because it would
    # authorize nobody.  BUGFIX: the previous check
    # (`if args.managed_namespaces and len(...) == 0`) could never fire —
    # an empty list is falsy, and a truthy list cannot have length 0.
    if args.managed_namespaces is not None and len(args.managed_namespaces) == 0:
        raise ValueError(
            "--managed-namespaces must have at least one namespace if specified"
        )
132 changes: 132 additions & 0 deletions components/src/dynamo/global_planner/scale_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Handler for scale_request endpoint in GlobalPlanner."""

import logging

from dynamo.planner import KubernetesConnector, TargetReplica
from dynamo.planner.scale_protocol import ScaleRequest, ScaleResponse
from dynamo.runtime import DistributedRuntime, dynamo_endpoint

logger = logging.getLogger(__name__)


class ScaleRequestHandler:
    """Handles incoming scale requests in GlobalPlanner.

    This handler:
    1. Receives scale requests from Planners
    2. Validates caller authorization (optional)
    3. Caches a KubernetesConnector per caller/DGD pair for efficiency
    4. Executes scaling via Kubernetes API
    5. Returns current replica counts
    """

    def __init__(
        self, runtime: DistributedRuntime, managed_namespaces: list, k8s_namespace: str
    ):
        """Initialize the scale request handler.

        Args:
            runtime: Dynamo runtime instance
            managed_namespaces: List of authorized namespaces (None = accept all)
            k8s_namespace: Kubernetes namespace where GlobalPlanner is running
        """
        self.runtime = runtime
        # None disables authorization entirely; otherwise membership checks
        # use a set for O(1) lookup.
        self.managed_namespaces = (
            set(managed_namespaces) if managed_namespaces else None
        )
        self.k8s_namespace = k8s_namespace
        # Cache of KubernetesConnector keyed by caller namespace, K8s
        # namespace and DGD name.  NOTE(review): the cache grows without
        # bound; acceptable while the set of managed deployments is small.
        self.connectors = {}

        if self.managed_namespaces:
            logger.info(
                f"ScaleRequestHandler initialized for namespaces: {managed_namespaces}"
            )
        else:
            logger.info("ScaleRequestHandler initialized (accepting all namespaces)")

    @dynamo_endpoint(ScaleRequest, ScaleResponse)
    async def scale_request(self, request: ScaleRequest):
        """Process scaling request from a Planner.

        Args:
            request: ScaleRequest with target replicas and DGD info

        Yields:
            ScaleResponse with status and current replica counts
        """
        try:
            # Validate caller namespace (if authorization is enabled)
            if (
                self.managed_namespaces is not None
                and request.caller_namespace not in self.managed_namespaces
            ):
                yield {
                    "status": "error",
                    "message": f"Namespace {request.caller_namespace} not authorized",
                    "current_replicas": {},
                }
                return

            logger.info(
                f"Processing scale request from {request.caller_namespace} "
                f"for DGD {request.graph_deployment_name} "
                f"in K8s namespace {request.k8s_namespace}"
            )

            # Get or create connector for this caller/DGD pair.
            # BUGFIX: the cache key must include the caller namespace — the
            # connector is constructed with
            # dynamo_namespace=request.caller_namespace, so two callers
            # targeting the same DGD must not share a connector.
            connector_key = (
                f"{request.caller_namespace}/"
                f"{request.k8s_namespace}/{request.graph_deployment_name}"
            )
            if connector_key not in self.connectors:
                connector = KubernetesConnector(
                    dynamo_namespace=request.caller_namespace,
                    model_name="managed",  # Not used for remote execution
                    k8s_namespace=request.k8s_namespace,
                    parent_dgd_name=request.graph_deployment_name,
                )
                await connector._async_init()
                self.connectors[connector_key] = connector
                logger.debug(f"Created new connector for {connector_key}")
            else:
                connector = self.connectors[connector_key]
                logger.debug(f"Reusing cached connector for {connector_key}")

            # Convert request replicas to TargetReplica objects
            target_replicas = [
                TargetReplica(
                    sub_component_type=r.sub_component_type,
                    component_name=r.component_name,
                    desired_replicas=r.desired_replicas,
                )
                for r in request.target_replicas
            ]

            # Execute scaling
            await connector.set_component_replicas(
                target_replicas, blocking=request.blocking
            )

            # Collect the replica counts to report back to the caller.
            # NOTE(review): values come from spec.services[*].replicas — the
            # *desired* counts, not observed status — confirm this is the
            # intended meaning of "current".
            current_replicas = {}
            deployment = await connector.kube_api.get_graph_deployment(
                connector.parent_dgd_name
            )
            for service_spec in deployment["spec"]["services"].values():
                sub_type = service_spec.get("subComponentType", "")
                if sub_type:
                    current_replicas[sub_type] = service_spec.get("replicas", 0)

            logger.info(
                f"Successfully scaled {request.graph_deployment_name}: {current_replicas}"
            )
            yield {
                "status": "success",
                "message": f"Scaled {request.graph_deployment_name} successfully",
                "current_replicas": current_replicas,
            }

        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of
            # crashing the endpoint; the full traceback goes to the log.
            logger.exception(f"Error processing scale request: {e}")
            yield {"status": "error", "message": str(e), "current_replicas": {}}
19 changes: 13 additions & 6 deletions components/src/dynamo/planner/kubernetes_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
dynamo_namespace: str,
model_name: Optional[str] = None,
k8s_namespace: Optional[str] = None,
parent_dgd_name: Optional[str] = None,
):
self.kube_api = KubernetesAPI(k8s_namespace)

Expand All @@ -60,13 +61,19 @@ def __init__(
model_name.lower()
) # normalize model name to lowercase (MDC)

graph_deployment_name = os.getenv("DYN_PARENT_DGD_K8S_NAME")
if not graph_deployment_name:
raise DeploymentValidationError(
["DYN_PARENT_DGD_K8S_NAME environment variable is not set"]
)
# Allow overriding parent DGD name for centralized planner
if parent_dgd_name:
self.parent_dgd_name = parent_dgd_name
else:
graph_deployment_name = os.getenv("DYN_PARENT_DGD_K8S_NAME")
if not graph_deployment_name:
raise DeploymentValidationError(
["DYN_PARENT_DGD_K8S_NAME environment variable is not set"]
)
self.parent_dgd_name = graph_deployment_name

self.graph_deployment_name = graph_deployment_name
# For backwards compatibility
self.graph_deployment_name = self.parent_dgd_name

async def add_component(
self, sub_component_type: SubComponentType, blocking: bool = True
Expand Down
Loading
Loading