2 changes: 2 additions & 0 deletions .github/workflows/e2e-test.yaml
@@ -45,6 +45,8 @@ jobs:
test_pattern: "^Test_RU"
- test_name: startup_ordering
test_pattern: "^Test_SO"
- test_name: Topology_Aware_Scheduling
test_pattern: "^Test_TAS"
name: E2E - ${{ matrix.test_name }}
steps:
# print runner specs so we have a record in case of failures
8 changes: 7 additions & 1 deletion operator/Makefile
@@ -87,12 +87,18 @@ cover-html: test-cover
@echo "Coverage report generated at coverage.html"

# Run e2e tests
# Usage: make test-e2e [TEST_PATTERN=<pattern>]
# Examples:
# make test-e2e # Run all tests
# make test-e2e TEST_PATTERN=Test_GS # Run all gang scheduling tests
# make test-e2e TEST_PATTERN=Test_GS1 # Run a specific test
# make test-e2e TEST_PATTERN=Test_TAS # Run all topology tests
.PHONY: test-e2e
test-e2e:
@echo "> Preparing charts (copying CRDs)..."
@$(MODULE_HACK_DIR)/prepare-charts.sh
@echo "> Running e2e tests..."
- @cd e2e && go test -count=1 -tags=e2e ./tests/... -v -timeout 45m
+ @cd e2e && go test -count=1 -tags=e2e ./tests/... -v -timeout 45m $(if $(TEST_PATTERN),-run $(TEST_PATTERN))

# Make targets for local development and testing
# -------------------------------------------------------------
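The $(if ...) guard appends -run only when TEST_PATTERN is set, so the two invocations expand roughly to:

    # make test-e2e
    cd e2e && go test -count=1 -tags=e2e ./tests/... -v -timeout 45m
    # make test-e2e TEST_PATTERN=Test_TAS
    cd e2e && go test -count=1 -tags=e2e ./tests/... -v -timeout 45m -run Test_TAS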
16 changes: 8 additions & 8 deletions operator/e2e/dependencies.yaml
@@ -18,25 +18,25 @@ images:

# Kai Scheduler components
- name: ghcr.io/nvidia/kai-scheduler/admission
-   version: v0.12.0
+   version: v0.13.0-rc1
- name: ghcr.io/nvidia/kai-scheduler/binder
-   version: v0.12.0
+   version: v0.13.0-rc1
- name: ghcr.io/nvidia/kai-scheduler/operator
-   version: v0.12.0
+   version: v0.13.0-rc1
- name: ghcr.io/nvidia/kai-scheduler/podgroupcontroller
-   version: v0.12.0
+   version: v0.13.0-rc1
- name: ghcr.io/nvidia/kai-scheduler/podgrouper
-   version: v0.12.0
+   version: v0.13.0-rc1
- name: ghcr.io/nvidia/kai-scheduler/queuecontroller
-   version: v0.12.0
+   version: v0.13.0-rc1
- name: ghcr.io/nvidia/kai-scheduler/scheduler
-   version: v0.12.0
+   version: v0.13.0-rc1

# Helm charts used in E2E tests
helmCharts:
  # Kai Scheduler - gang scheduling for Kubernetes
  kaiScheduler:
    releaseName: kai-scheduler
    chartRef: oci://ghcr.io/nvidia/kai-scheduler/kai-scheduler
-   version: v0.12.0
+   version: v0.13.0-rc1
namespace: kai-scheduler
81 changes: 80 additions & 1 deletion operator/e2e/setup/k8s_clusters.go
@@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
@@ -46,6 +47,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -536,7 +538,7 @@ func InstallCoreComponents(ctx context.Context, restConfig *rest.Config, kaiConf
skaffoldConfig := &SkaffoldInstallConfig{
SkaffoldYAMLPath: absoluteSkaffoldYAMLPath,
RestConfig: restConfig,
Profiles: []string{"debug"},
Profiles: []string{"topology-test"},
PushRepo: fmt.Sprintf("localhost:%s", registryPort),
PullRepo: fmt.Sprintf("registry:%s", registryPort),
Namespace: OperatorNamespace,
@@ -570,6 +572,11 @@
return err // Return the first error encountered
}

// Apply hierarchical topology labels to worker nodes
if err := applyTopologyLabels(ctx, restConfig, logger); err != nil {
    return fmt.Errorf("failed to apply topology labels: %w", err)
}

logger.Debug("✅ All component installations completed successfully")
return nil
}
@@ -1062,3 +1069,75 @@ func waitForWebhookReady(ctx context.Context, restConfig *rest.Config, logger *u
return true, nil
})
}

// getBlockForNodeIndex returns the block label for a given node index (0-based).
// Nodes 0-13 are in block-1, nodes 14-27 are in block-2.
func getBlockForNodeIndex(idx int) string {
    if idx <= 13 {
        return "block-1"
    }
    return "block-2"
}

// getRackForNodeIndex returns the rack label for a given node index (0-based).
// Distribution: 4 racks with 7 nodes each across 2 blocks
// (indices 0-6, 7-13, 14-20, 21-27).
func getRackForNodeIndex(idx int) string {
    // Upper (inclusive) index bound of each rack; 7 nodes per rack.
    rackRanges := []int{6, 13, 20, 27}
    for rackNum, maxIdx := range rackRanges {
        if idx <= maxIdx {
            return fmt.Sprintf("rack-%d", rackNum+1)
        }
    }
    return "rack-4"
}

// applyTopologyLabels applies hierarchical topology labels to worker nodes in the k3d cluster.
// Creates a 4-level topology hierarchy: zone -> block -> rack -> host (kubernetes.io/hostname already exists)
// Distribution strategy for 28 worker nodes:
//   - Zone: all nodes in "zone-1"
//   - Block: nodes 0-13 in "block-1", nodes 14-27 in "block-2"
//   - Rack: 4 racks total (2 per block), 7 hosts per rack
func applyTopologyLabels(ctx context.Context, restConfig *rest.Config, logger *utils.Logger) error {
    logger.Info("🏷️ Applying hierarchical topology labels to worker nodes...")

    // Create clientset
    clientset, err := kubernetes.NewForConfig(restConfig)
    if err != nil {
        return fmt.Errorf("failed to create clientset: %w", err)
    }

    // Get all worker nodes (filter by label set during cluster creation)
    nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{
        LabelSelector: "node_role.e2e.grove.nvidia.com=agent",
    })
    if err != nil {
        return fmt.Errorf("failed to list worker nodes: %w", err)
    }

    if len(nodes.Items) == 0 {
        logger.Warn("⚠️ No worker nodes found for topology labeling")
        return nil
    }

    // Sort nodes by name so label assignment is deterministic across runs
    sortedNodes := make([]v1.Node, len(nodes.Items))
    copy(sortedNodes, nodes.Items)
    sort.Slice(sortedNodes, func(i, j int) bool { return sortedNodes[i].Name < sortedNodes[j].Name })

    for idx, node := range sortedNodes {
        topologyLabels := fmt.Sprintf(`{"metadata":{"labels":{"kubernetes.io/zone":"zone-1","kubernetes.io/block":"%s","kubernetes.io/rack":"%s"}}}`,
            getBlockForNodeIndex(idx), getRackForNodeIndex(idx))

        _, err := clientset.CoreV1().Nodes().Patch(
            ctx,
            node.Name,
            k8stypes.StrategicMergePatchType,
            []byte(topologyLabels),
            metav1.PatchOptions{},
        )
        if err != nil {
            return fmt.Errorf("failed to patch node %s with topology labels: %w", node.Name, err)
        }
    }
    logger.Infof("✅ Applied topology labels to %d worker nodes", len(sortedNodes))
    return nil
}
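The index math is easy to get wrong at the rack boundaries, so a quick sanity check helps. A hypothetical test sketch (assuming it sits alongside these helpers in the setup package, with "testing" imported) exercising the boundary indices:

    func TestTopologyIndexMapping(t *testing.T) {
        // Boundary indices for the 28-node layout: 7 nodes per rack, 2 racks per block.
        cases := []struct {
            idx   int
            block string
            rack  string
        }{
            {0, "block-1", "rack-1"},
            {6, "block-1", "rack-1"},
            {7, "block-1", "rack-2"},
            {13, "block-1", "rack-2"},
            {14, "block-2", "rack-3"},
            {20, "block-2", "rack-3"},
            {21, "block-2", "rack-4"},
            {27, "block-2", "rack-4"},
        }
        for _, c := range cases {
            if got := getBlockForNodeIndex(c.idx); got != c.block {
                t.Errorf("node %d: block = %q, want %q", c.idx, got, c.block)
            }
            if got := getRackForNodeIndex(c.idx); got != c.rack {
                t.Errorf("node %d: rack = %q, want %q", c.idx, got, c.rack)
            }
        }
    }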
17 changes: 16 additions & 1 deletion operator/e2e/setup/skaffold.go
@@ -28,6 +28,7 @@ import (
"time"

"github.com/ai-dynamo/grove/operator/e2e/utils"
"github.com/samber/lo"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -153,7 +154,10 @@ func runSkaffoldBuild(ctx context.Context, absSkaffoldPath, skaffoldDir, kubecon
cmd.Dir = skaffoldDir

// Set up environment variables
- cmd.Env = os.Environ()
+ // Filter GOOS/GOARCH so values injected by an IDE don't break the skaffold build
+ cmd.Env = filterEnv(os.Environ(), "GOOS", "GOARCH")
+ cmd.Env = append(cmd.Env, "CGO_ENABLED=0")

cmd.Env = append(cmd.Env, fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath))

// Add build-specific environment variables
@@ -315,3 +319,14 @@ func writeTemporaryKubeconfig(restConfig *rest.Config, logger *utils.Logger) (st
logger.Debugf("📄 Wrote temporary kubeconfig to: %s", tmpPath)
return tmpPath, cleanup, nil
}

// filterEnv filters out entries for the given keys from an environ-style
// list of "KEY=value" strings.
func filterEnv(env []string, keysToRemove ...string) []string {
    filtered := lo.Filter(env, func(e string, _ int) bool {
        _, found := lo.Find(keysToRemove, func(key string) bool {
            return strings.HasPrefix(e, key+"=")
        })
        return !found
    })
    return filtered
}
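For illustration only, the same filtering semantics as a standalone program using just the standard library (the helper above relies on samber/lo instead):

    package main

    import (
        "fmt"
        "strings"
    )

    // filterEnv drops any "KEY=value" entry whose KEY is in keysToRemove.
    func filterEnv(env []string, keysToRemove ...string) []string {
        filtered := make([]string, 0, len(env))
        for _, e := range env {
            keep := true
            for _, key := range keysToRemove {
                if strings.HasPrefix(e, key+"=") {
                    keep = false
                    break
                }
            }
            if keep {
                filtered = append(filtered, e)
            }
        }
        return filtered
    }

    func main() {
        env := []string{"GOOS=darwin", "GOARCH=arm64", "PATH=/usr/bin"}
        // GOOS/GOARCH set by an IDE would otherwise leak into the skaffold build.
        fmt.Println(filterEnv(env, "GOOS", "GOARCH")) // [PATH=/usr/bin]
    }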
13 changes: 2 additions & 11 deletions operator/e2e/tests/setup.go
@@ -181,12 +181,7 @@ func prepareTestCluster(ctx context.Context, t *testing.T, requiredWorkerNodes i
logger.Error("=== CLEANUP FAILURE - COLLECTING DIAGNOSTICS ===")
logger.Error("================================================================================")
CollectAllDiagnostics(diagnosticsTc)

- // Mark cleanup as failed - this will cause all subsequent tests to fail immediately
- // when they try to prepare the cluster, preventing potentially corrupted test state
- sharedCluster.MarkCleanupFailed(err)
-
- t.Fatalf("Failed to cleanup workloads: %v. All subsequent tests will fail.", err)
+ t.Fatalf("Failed to cleanup workloads: %v", err)
}
}

@@ -642,11 +637,7 @@ func scalePCSGAcrossAllReplicas(tc TestContext, pcsName, pcsgName string, pcsRep

// convertUnstructuredToTyped converts an unstructured map to a typed object
func convertUnstructuredToTyped(u map[string]interface{}, typed interface{}) error {
- data, err := json.Marshal(u)
- if err != nil {
- return err
- }
- return json.Unmarshal(data, typed)
+ return utils.ConvertUnstructuredToTyped(u, typed)
}

// convertTypedToUnstructured converts a typed object to an unstructured object
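convertUnstructuredToTyped now delegates to a shared helper. Assuming utils.ConvertUnstructuredToTyped simply centralizes the JSON round-trip removed above, its shape would be roughly (a hypothetical reconstruction, not the actual utils source):

    // ConvertUnstructuredToTyped converts an unstructured map into typed via a
    // JSON round-trip (encoding/json). Hypothetical reconstruction of the helper.
    func ConvertUnstructuredToTyped(u map[string]interface{}, typed interface{}) error {
        data, err := json.Marshal(u)
        if err != nil {
            return err
        }
        return json.Unmarshal(data, typed)
    }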