Commits (43)
7ec8e51
* Introduced Conditions in PCS status.
unmarshall Jan 4, 2026
07402fe
* Added constants for TopologyLevelsUnavailable condition reason
unmarshall Jan 5, 2026
653f99d
formatting changes
unmarshall Jan 5, 2026
d32a99d
added license header to clustertoplogy util
unmarshall Jan 5, 2026
52deaea
* Made PodCliqueSetStatus.Conditions optional
unmarshall Jan 5, 2026
32bd205
Addressed review comments:
unmarshall Jan 7, 2026
c12c0c9
* Removed defaulting preferred constraint to Host topology domain. This
unmarshall Jan 8, 2026
d161df9
formatting fixed
unmarshall Jan 8, 2026
dc5ccdd
Addressed review comments:
unmarshall Jan 8, 2026
ad28d83
Changes for setting Topology aware scheduling constraints in PodGangs.
unmarshall Jan 3, 2026
eae0f9e
feat: working on topology e2e infra
Ronkahn21 Jan 4, 2026
469b645
feat: implement BP-1 and SP-1 topology e2e tests
Ronkahn21 Jan 5, 2026
5cd1daa
fix: update SP-1 test for 2 PCSG replicas
Ronkahn21 Jan 5, 2026
838a1a6
chore: update kai-scheduler dependencies to v0.13.0-rc0
Ronkahn21 Jan 5, 2026
a6628bd
fix: update pod replicas and minAvailable for inference-group, prefil…
Ronkahn21 Jan 5, 2026
eecda8b
fix: update rack distribution and reapply topology labels in k3d cluster
Ronkahn21 Jan 5, 2026
6d3c802
feat: refactor topology tests to use deployWorkloadAndGetPods for pod…
Ronkahn21 Jan 5, 2026
db8d227
feat: rename workload YAML files and add new test scenarios for topol…
Ronkahn21 Jan 7, 2026
259c30d
fix: update kai-scheduler component versions to latest commit
Ronkahn21 Jan 7, 2026
46b23b2
feat: add new PodGangSet YAML files and enhance e2e test command with…
Ronkahn21 Jan 8, 2026
980cb3f
feat: add utility functions for topology verification and update depe…
Ronkahn21 Jan 8, 2026
be4a774
fix: correct topology constraint placement in YAML files and update g…
Ronkahn21 Jan 8, 2026
ab1b628
feat: update workload YAML files to use PodCliqueSet and enhance pod …
Ronkahn21 Jan 8, 2026
a8d6b8f
fix: update KAI Topology retrieval to use cluster-scoped resource
Ronkahn21 Jan 12, 2026
e0455b2
feat: enhance e2e tests with Topology Aware Scheduling patterns and u…
Ronkahn21 Jan 12, 2026
b06cb08
fix: regenerate crds and others generated code
Ronkahn21 Jan 12, 2026
0a8932d
chore: restore from main
Ronkahn21 Jan 12, 2026
5f039a4
feat: enhance test environment setup and update cluster size for topo…
Ronkahn21 Jan 13, 2026
c951fae
refactor: simplify environment variable filtering using functional ap…
Ronkahn21 Jan 13, 2026
987b2e8
refactor: rename top-* files and references to tas-* for consistency
Ronkahn21 Jan 13, 2026
dbde617
refactor: revert some issues with rebase
Ronkahn21 Jan 13, 2026
59e11a0
revert change in the e2e test workflow.
Ronkahn21 Jan 13, 2026
b01966f
feat: add KAI PodGroup utilities for topology verification and manage…
Ronkahn21 Jan 14, 2026
ec8cf85
feat: add KAI PodGroup utilities for topology verification and manage…
Ronkahn21 Jan 14, 2026
ca7da49
feat: add new test scenarios for PodCliqueSet topology constraints
Ronkahn21 Jan 15, 2026
0669adb
fix: add PCSG topology constraints to scaled PodGangs
Ronkahn21 Jan 18, 2026
1e1c5b3
test: add SP9 test for multi-replica PCS with 3-level topology hierarchy
Ronkahn21 Jan 18, 2026
d8f1ff2
fix: use correct labels for filtering PCSG replica pods in tests
Ronkahn21 Jan 18, 2026
6e0d002
test: remove redundant SP7 topology test
Ronkahn21 Jan 18, 2026
4a6bda7
fix: ensure topology constraint is created when TAS is enabled
Ronkahn21 Jan 18, 2026
e63eb7a
chore: add license header to kai_topology.go
Ronkahn21 Jan 18, 2026
bdcc943
test: fix topology constraints expectations for scaled PodGangs
Ronkahn21 Jan 18, 2026
626479d
chore: remove obsolete single-node-disaggregated PCS sample configura…
Ronkahn21 Jan 18, 2026
Files changed
2 changes: 2 additions & 0 deletions .github/workflows/e2e-test.yaml
@@ -45,6 +45,8 @@ jobs:
         test_pattern: "^Test_RU"
       - test_name: startup_ordering
         test_pattern: "^Test_SO"
+      - test_name: Topology_Aware_Scheduling
+        test_pattern: "^Test_TAS"
     name: E2E - ${{ matrix.test_name }}
     steps:
       # print runner specs so we have a record incase of failures
8 changes: 7 additions & 1 deletion operator/Makefile
@@ -87,12 +87,18 @@ cover-html: test-cover
 	@echo "Coverage report generated at coverage.html"
 
 # Run e2e tests
+# Usage: make test-e2e [TEST_PATTERN=<pattern>]
+# Examples:
+#   make test-e2e                         # Run all tests
+#   make test-e2e TEST_PATTERN=Test_GS    # Run all gang scheduling tests
+#   make test-e2e TEST_PATTERN=Test_GS1   # Run specific test
+#   make test-e2e TEST_PATTERN=Test_TAS   # Run all topology tests
 .PHONY: test-e2e
 test-e2e:
 	@echo "> Preparing charts (copying CRDs)..."
 	@$(MODULE_HACK_DIR)/prepare-charts.sh
 	@echo "> Running e2e tests..."
-	@cd e2e && go test -count=1 -tags=e2e ./tests/... -v -timeout 45m
+	@cd e2e && go test -count=1 -tags=e2e ./tests/... -v -timeout 45m $(if $(TEST_PATTERN),-run $(TEST_PATTERN))
 
 # Make targets for local development and testing
 # -------------------------------------------------------------
1 change: 0 additions & 1 deletion operator/charts/templates/clusterrole.yaml
@@ -140,5 +140,4 @@ rules:
   - watch
   - patch
   - update
-  - delete
16 changes: 8 additions & 8 deletions operator/e2e/dependencies.yaml
@@ -18,25 +18,25 @@ images:
 
   # Kai Scheduler components
   - name: ghcr.io/nvidia/kai-scheduler/admission
-    version: v0.12.0
+    version: v0.13.0-rc1
  - name: ghcr.io/nvidia/kai-scheduler/binder
-    version: v0.12.0
+    version: v0.13.0-rc1
  - name: ghcr.io/nvidia/kai-scheduler/operator
-    version: v0.12.0
+    version: v0.13.0-rc1
  - name: ghcr.io/nvidia/kai-scheduler/podgroupcontroller
-    version: v0.12.0
+    version: v0.13.0-rc1
  - name: ghcr.io/nvidia/kai-scheduler/podgrouper
-    version: v0.12.0
+    version: v0.13.0-rc1
  - name: ghcr.io/nvidia/kai-scheduler/queuecontroller
-    version: v0.12.0
+    version: v0.13.0-rc1
  - name: ghcr.io/nvidia/kai-scheduler/scheduler
-    version: v0.12.0
+    version: v0.13.0-rc1
 
 # Helm charts used in E2E tests
 helmCharts:
   # Kai Scheduler - gang scheduling for Kubernetes
   kaiScheduler:
     releaseName: kai-scheduler
     chartRef: oci://ghcr.io/nvidia/kai-scheduler/kai-scheduler
-    version: v0.12.0
+    version: v0.13.0-rc1
     namespace: kai-scheduler
81 changes: 80 additions & 1 deletion operator/e2e/setup/k8s_clusters.go
@@ -24,6 +24,7 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -46,6 +47,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	k8stypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -536,7 +538,7 @@ func InstallCoreComponents(ctx context.Context, restConfig *rest.Config, kaiConf
 	skaffoldConfig := &SkaffoldInstallConfig{
 		SkaffoldYAMLPath: absoluteSkaffoldYAMLPath,
 		RestConfig:       restConfig,
-		Profiles:         []string{"debug"},
+		Profiles:         []string{"topology-test"},
 		PushRepo:         fmt.Sprintf("localhost:%s", registryPort),
 		PullRepo:         fmt.Sprintf("registry:%s", registryPort),
 		Namespace:        OperatorNamespace,
@@ -570,6 +572,11 @@ func InstallCoreComponents(ctx context.Context, restConfig *rest.Config, kaiConf
 		return err // Return the first error encountered
 	}
 
+	// Apply hierarchical topology labels to worker nodes
+	if err := applyTopologyLabels(ctx, restConfig, logger); err != nil {
+		return fmt.Errorf("failed to apply topology labels: %w", err)
+	}
+
 	logger.Debug("✅ All component installations completed successfully")
 	return nil
 }
@@ -1062,3 +1069,75 @@ func waitForWebhookReady(ctx context.Context, restConfig *rest.Config, logger *u
 		return true, nil
 	})
 }
+
+// getBlockForNodeIndex returns the block label for a given node index (0-based).
+// Nodes 0-13 are in block-1, nodes 14-27 are in block-2.
+func getBlockForNodeIndex(idx int) string {
+	if idx <= 13 {
+		return "block-1"
+	}
+	return "block-2"
+}
+
+// getRackForNodeIndex returns the rack label for a given node index (0-based).
+// Distribution: 4 racks with 7 nodes each across 2 blocks.
+func getRackForNodeIndex(idx int) string {
+	// Upper (inclusive) node index per rack: rack-1 ends at 6, rack-2 at 13, etc.
+	rackRanges := []int{6, 13, 20, 27}
+	for rackNum, maxIdx := range rackRanges {
+		if idx <= maxIdx {
+			return fmt.Sprintf("rack-%d", rackNum+1)
+		}
+	}
+	return "rack-4"
+}
+
+// applyTopologyLabels applies hierarchical topology labels to worker nodes in the k3d cluster.
+// Creates a 4-level topology hierarchy: zone -> block -> rack -> host (kubernetes.io/hostname already exists).
+// Distribution strategy for 28 worker nodes:
+// - Zone: all nodes in "zone-1"
+// - Block: nodes 0-13 in "block-1", nodes 14-27 in "block-2"
+// - Rack: 4 racks total (2 per block), 7 hosts per rack
+func applyTopologyLabels(ctx context.Context, restConfig *rest.Config, logger *utils.Logger) error {
+	logger.Info("🏷️ Applying hierarchical topology labels to worker nodes...")
+
+	// Create clientset
+	clientset, err := kubernetes.NewForConfig(restConfig)
+	if err != nil {
+		return fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	// Get all worker nodes (filter by label set during cluster creation)
+	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{
+		LabelSelector: "node_role.e2e.grove.nvidia.com=agent",
+	})
+	if err != nil {
+		return fmt.Errorf("failed to list worker nodes: %w", err)
+	}
+
+	if len(nodes.Items) == 0 {
+		logger.Warn("⚠️ No worker nodes found for topology labeling")
+		return nil
+	}
+
+	// Sort by name so the index-based block/rack assignment is deterministic
+	sortedNodes := make([]v1.Node, len(nodes.Items))
+	copy(sortedNodes, nodes.Items)
+	sort.Slice(sortedNodes, func(i, j int) bool { return sortedNodes[i].Name < sortedNodes[j].Name })
+
+	for idx, node := range sortedNodes {
+		topologyLabels := fmt.Sprintf(`{"metadata":{"labels":{"kubernetes.io/zone":"zone-1","kubernetes.io/block":"%s","kubernetes.io/rack":"%s"}}}`,
+			getBlockForNodeIndex(idx), getRackForNodeIndex(idx))
+
+		_, err := clientset.CoreV1().Nodes().Patch(
+			ctx,
+			node.Name,
+			k8stypes.StrategicMergePatchType,
+			[]byte(topologyLabels),
+			metav1.PatchOptions{},
+		)
+		if err != nil {
+			return fmt.Errorf("failed to patch node %s with topology labels: %w", node.Name, err)
+		}
+	}
+	logger.Infof("✅ Applied topology labels to %d worker nodes", len(sortedNodes))
+	return nil
+}
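The index math above is easy to get subtly wrong at the boundaries, so it helps to pin it down with a table-driven check. The test below is an illustrative sketch, not part of this PR's diff; its expected values follow directly from the documented layout of 2 blocks × 14 nodes and 4 racks × 7 hosts.

```go
package setup

import "testing"

// Hypothetical boundary check for the 28-node layout (not in the PR):
// blocks switch after index 13, racks switch every 7 nodes.
func TestTopologyDistribution(t *testing.T) {
	cases := []struct {
		idx         int
		block, rack string
	}{
		{0, "block-1", "rack-1"},
		{6, "block-1", "rack-1"},
		{7, "block-1", "rack-2"},
		{13, "block-1", "rack-2"},
		{14, "block-2", "rack-3"},
		{20, "block-2", "rack-3"},
		{21, "block-2", "rack-4"},
		{27, "block-2", "rack-4"},
	}
	for _, c := range cases {
		if got := getBlockForNodeIndex(c.idx); got != c.block {
			t.Errorf("block(%d) = %q, want %q", c.idx, got, c.block)
		}
		if got := getRackForNodeIndex(c.idx); got != c.rack {
			t.Errorf("rack(%d) = %q, want %q", c.idx, got, c.rack)
		}
	}
}
```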
17 changes: 16 additions & 1 deletion operator/e2e/setup/skaffold.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/ai-dynamo/grove/operator/e2e/utils"
+	"github.com/samber/lo"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -153,7 +154,10 @@ func runSkaffoldBuild(ctx context.Context, absSkaffoldPath, skaffoldDir, kubecon
 	cmd.Dir = skaffoldDir
 
 	// Set up environment variables
-	cmd.Env = os.Environ()
+	// To allow running the tests from the IDE
+	cmd.Env = filterEnv(os.Environ(), "GOOS", "GOARCH")
+	cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
+
 	cmd.Env = append(cmd.Env, fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath))
 
 	// Add build-specific environment variables
@@ -315,3 +319,14 @@ func writeTemporaryKubeconfig(restConfig *rest.Config, logger *utils.Logger) (st
 	logger.Debugf("📄 Wrote temporary kubeconfig to: %s", tmpPath)
 	return tmpPath, cleanup, nil
 }
+
+// filterEnv filters out specified environment variables from the environment
+func filterEnv(env []string, keysToRemove ...string) []string {
+	filtered := lo.Filter(env, func(e string, _ int) bool {
+		_, found := lo.Find(keysToRemove, func(key string) bool {
+			return strings.HasPrefix(e, key+"=")
+		})
+		return !found
+	})
+	return filtered
+}
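Because filterEnv matches on the `key+"="` prefix, only the exact variable names are dropped; a variable such as GOPATH survives a GOOS/GOARCH filter. A minimal illustration, assuming it runs inside the same package (the values are made up):

```go
env := []string{"GOOS=darwin", "GOARCH=arm64", "GOPATH=/home/u/go", "PATH=/usr/bin"}
fmt.Println(filterEnv(env, "GOOS", "GOARCH"))
// Prints: [GOPATH=/home/u/go PATH=/usr/bin]
```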
6 changes: 1 addition & 5 deletions operator/e2e/tests/setup.go
@@ -637,11 +637,7 @@ func scalePCSGAcrossAllReplicas(tc TestContext, pcsName, pcsgName string, pcsRep
 
 // convertUnstructuredToTyped converts an unstructured map to a typed object
 func convertUnstructuredToTyped(u map[string]interface{}, typed interface{}) error {
-	data, err := json.Marshal(u)
-	if err != nil {
-		return err
-	}
-	return json.Unmarshal(data, typed)
+	return utils.ConvertUnstructuredToTyped(u, typed)
 }
 
 // convertTypedToUnstructured converts a typed object to an unstructured object
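The diff does not show utils.ConvertUnstructuredToTyped itself. Assuming the shared helper keeps the signature and behavior of the inlined code it replaces, it would look roughly like:

```go
// Sketch under that assumption: the same encoding/json round-trip as before,
// now centralized in the utils package instead of duplicated per caller.
func ConvertUnstructuredToTyped(u map[string]interface{}, typed interface{}) error {
	data, err := json.Marshal(u)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, typed)
}
```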