Commit d18e2a9

Optimize labeler memory usage footprint (#1093)
Signed-off-by: Ajay Mishra <ajmishra@nvidia.com>
1 parent 715c233 commit d18e2a9

File tree: 2 files changed, +211 -4 lines changed


labeler/pkg/labeler/labeler.go

Lines changed: 29 additions & 0 deletions
@@ -292,6 +292,10 @@ func (l *Labeler) registerNodeEventHandlers() error {
 			}
 		},
 		UpdateFunc: func(oldObj, newObj any) {
+			if !l.nodeRequiresReconciliation(oldObj, newObj) {
+				return
+			}
+
 			if err := l.handleNodeEvent(newObj); err != nil {
 				slog.Error("Failed to handle node update event", "error", err)
 			}
@@ -393,6 +397,31 @@ func hasReadyDriverPod(objs []any, excludePod *v1.Pod) bool {
 	return false
 }
 
+// nodeRequiresReconciliation returns true only when a node update changed an
+// input label the labeler reads from nodes. DCGM and driver labels are driven
+// by pod events, so the node UpdateFunc only needs to react to changes in kata
+// detection labels and the gpu-present label (for assumeDriverInstalled mode).
+func (l *Labeler) nodeRequiresReconciliation(oldObj, newObj any) bool {
+	oldNode, ok1 := oldObj.(*v1.Node)
+	newNode, ok2 := newObj.(*v1.Node)
+
+	if !ok1 || !ok2 {
+		return true
+	}
+
+	if oldNode.Labels[gpuPresentLabel] != newNode.Labels[gpuPresentLabel] {
+		return true
+	}
+
+	for _, key := range l.kataLabels {
+		if oldNode.Labels[key] != newNode.Labels[key] {
+			return true
+		}
+	}
+
+	return false
+}
+
 const gpuPresentLabel = "nvidia.com/gpu.present"
 
 func (l *Labeler) isGPUNode(nodeName string) bool {
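
The filter added above follows the standard client-go pattern for quieting noisy node updates: compare the old and new objects inside UpdateFunc and return early when nothing the controller reads has changed, so periodic resyncs and kubelet status heartbeats stop triggering reconciliation. Below is a minimal standalone sketch of that pattern, not code from this repository: the relevantLabel name and the logging handler body are illustrative placeholders, and it assumes client-go v0.26+ where AddEventHandler returns a registration handle and an error.

// Hedged sketch (not part of the commit): filter node updates in an informer
// UpdateFunc so only relevant label changes trigger work.
package example

import (
	"log/slog"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// relevantLabel is a hypothetical input label; in this sketch, only changes to
// it should trigger reconciliation.
const relevantLabel = "example.com/gpu.present"

func registerFilteredNodeHandler(cli kubernetes.Interface) error {
	factory := informers.NewSharedInformerFactory(cli, 0)
	_, err := factory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj any) {
			oldNode, okOld := oldObj.(*v1.Node)
			newNode, okNew := newObj.(*v1.Node)
			if !okOld || !okNew {
				return // unexpected types; a real handler would likely log this
			}
			if oldNode.Labels[relevantLabel] == newNode.Labels[relevantLabel] {
				return // status-only change (e.g. kubelet heartbeat): nothing to do
			}
			slog.Info("reconciling node", "node", newNode.Name)
		},
	})
	return err
}
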

labeler/pkg/labeler/labeler_test.go

Lines changed: 182 additions & 4 deletions
@@ -16,6 +16,10 @@ package labeler
 
 import (
 	"context"
+	"fmt"
+	"runtime"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -27,6 +31,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/retry"
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
 )
 

@@ -1144,15 +1149,22 @@ func TestStaleLabelsRemoval(t *testing.T) {
 			return len(dcgmObjs) > 0 || len(driverObjs) > 0
 		}, 10*time.Second, 100*time.Millisecond, "pods not indexed in custom indexes")
 
-		// Restore original labels - reconcileAllNodes() may have removed them
-		// before pods were indexed during the initial sync race
+		// Restore original labels - reconcileAllNodes() may have removed them
+		// before pods were indexed during the initial sync race.
+		// Uses RetryOnConflict because the labeler may concurrently update
+		// the same node during its initial reconciliation.
+		err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 			node, err := cli.CoreV1().Nodes().Get(ctx, tt.existingNode.Name, metav1.GetOptions{})
-			require.NoError(t, err, "failed to get node")
+			if err != nil {
+				return err
+			}
 			for k, v := range tt.existingNode.Labels {
 				node.Labels[k] = v
 			}
 			_, err = cli.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
-			require.NoError(t, err, "failed to restore node labels")
+			return err
+		})
+		require.NoError(t, err, "failed to restore node labels")
 	}
 
 	err = labeler.handleNodeEvent(tt.existingNode)
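
The restore above relies on client-go's retry.RetryOnConflict, the usual way to repeat a read-modify-write when the API server rejects an update because another writer changed the object in between. A self-contained sketch of the same pattern (the function name, node name parameter, and label handling here are illustrative, not taken from this repository):

// Hedged sketch (not part of the commit): retry a node label update on
// optimistic-concurrency conflicts.
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

func setNodeLabel(ctx context.Context, cli kubernetes.Interface, nodeName, key, value string) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Re-read on every attempt so the update carries a fresh resourceVersion.
		node, err := cli.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if node.Labels == nil {
			node.Labels = map[string]string{}
		}
		node.Labels[key] = value
		_, err = cli.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
		return err
	})
}
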
@@ -1313,3 +1325,169 @@ func TestAssumeDriverInstalled(t *testing.T) {
 		})
 	}
 }
+
+// TestMemoryUnderNodeUpdatePressure creates nodes in envtest and rapidly updates
+// their status conditions (simulating kubelet heartbeats) to detect unbounded
+// memory growth in the labeler's node event handler path.
+func TestMemoryUnderNodeUpdatePressure(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping memory stress test in short mode")
+	}
+
+	const (
+		nodeCount    = 200
+		testDuration = 60 * time.Second
+		workers      = 10
+		maxGrowthMiB = 30
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
+
+	testEnv := envtest.Environment{}
+	cfg, err := testEnv.Start()
+	require.NoError(t, err)
+	defer func() { _ = testEnv.Stop() }()
+
+	cli, err := kubernetes.NewForConfig(cfg)
+	require.NoError(t, err)
+
+	createNodes(t, ctx, cli, nodeCount)
+
+	labeler, err := NewLabeler(cli, 30*time.Second, "nvidia-dcgm", "nvidia-driver-daemonset", "nvidia-driver-installer", "", false)
+	require.NoError(t, err)
+
+	labelerCtx, labelerCancel := context.WithCancel(ctx)
+	defer labelerCancel()
+	go func() { _ = labeler.Run(labelerCtx) }()
+
+	require.True(t, cache.WaitForCacheSync(labelerCtx.Done(), labeler.informersSynced...))
+
+	runtime.GC()
+	time.Sleep(200 * time.Millisecond)
+	var baseline runtime.MemStats
+	runtime.ReadMemStats(&baseline)
+
+	updateCtx, updateCancel := context.WithTimeout(labelerCtx, testDuration)
+	defer updateCancel()
+
+	var peakHeapInuse atomic.Uint64
+	peakHeapInuse.Store(baseline.HeapInuse)
+	go func() {
+		ticker := time.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-updateCtx.Done():
+				return
+			case <-ticker.C:
+				var ms runtime.MemStats
+				runtime.ReadMemStats(&ms)
+				for {
+					cur := peakHeapInuse.Load()
+					if ms.HeapInuse <= cur || peakHeapInuse.CompareAndSwap(cur, ms.HeapInuse) {
+						break
+					}
+				}
+			}
+		}
+	}()
+
+	generateNodeUpdates(t, updateCtx, cli, nodeCount, workers)
+
+	runtime.GC()
+	time.Sleep(200 * time.Millisecond)
+	var final runtime.MemStats
+	runtime.ReadMemStats(&final)
+
+	finalGrowthMiB := int64(final.HeapInuse-baseline.HeapInuse) / 1024 / 1024
+	peakGrowthMiB := int64(peakHeapInuse.Load()-baseline.HeapInuse) / 1024 / 1024
+
+	t.Logf("nodes=%d duration=%s baseline=%dMiB peak=%dMiB final=%dMiB peakGrowth=%dMiB finalGrowth=%dMiB",
+		nodeCount, testDuration,
+		baseline.HeapInuse/1024/1024, peakHeapInuse.Load()/1024/1024, final.HeapInuse/1024/1024,
+		peakGrowthMiB, finalGrowthMiB)
+
+	if peakGrowthMiB > maxGrowthMiB {
+		t.Errorf("peak memory grew by %d MiB (limit %d MiB) — likely unbounded notification buffer growth",
+			peakGrowthMiB, maxGrowthMiB)
+	}
+	if finalGrowthMiB > maxGrowthMiB {
+		t.Errorf("final memory grew by %d MiB (limit %d MiB) — memory not reclaimed after GC",
+			finalGrowthMiB, maxGrowthMiB)
+	}
+}
+
+func createNodes(t *testing.T, ctx context.Context, cli kubernetes.Interface, count int) {
+	t.Helper()
+
+	var wg sync.WaitGroup
+	sem := make(chan struct{}, 20)
+
+	for i := range count {
+		wg.Add(1)
+		sem <- struct{}{}
+		go func() {
+			defer wg.Done()
+			defer func() { <-sem }()
+
+			_, err := cli.CoreV1().Nodes().Create(ctx, &corev1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:   fmt.Sprintf("node-%04d", i),
+					Labels: map[string]string{"nvidia.com/gpu.present": "true"},
+				},
+			}, metav1.CreateOptions{})
+			require.NoError(t, err)
+		}()
+	}
+
+	wg.Wait()
+	t.Logf("created %d nodes", count)
+}
+
+func generateNodeUpdates(t *testing.T, ctx context.Context, cli kubernetes.Interface, nodeCount, workers int) {
+	t.Helper()
+
+	sem := make(chan struct{}, workers)
+
+	for round := 1; ; round++ {
+		var wg sync.WaitGroup
+		cancelled := false
+
+		for i := range nodeCount {
+			select {
+			case <-ctx.Done():
+				cancelled = true
+			case sem <- struct{}{}:
+			}
+			if cancelled {
+				break
+			}
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				defer func() { <-sem }()
+
+				node, err := cli.CoreV1().Nodes().Get(ctx, fmt.Sprintf("node-%04d", i), metav1.GetOptions{})
+				if err != nil {
+					return
+				}
+				node.Status.Conditions = []corev1.NodeCondition{{
+					Type:              corev1.NodeReady,
+					Status:            corev1.ConditionTrue,
+					LastHeartbeatTime: metav1.Now(),
+					Reason:            fmt.Sprintf("round-%d", round),
+				}}
+				_, _ = cli.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{})
+			}()
+		}
+
+		wg.Wait()
+
+		if cancelled {
+			return
+		}
+		t.Logf("heartbeat round %d complete", round)
+	}
+}
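
The growth check in TestMemoryUnderNodeUpdatePressure follows a common Go measurement pattern: force a GC, record HeapInuse as a baseline, run the workload, force another GC, and compare the delta against a threshold. A trimmed-down sketch of just that measurement, under the assumption that the helper name, the placeholder workload, and the 30 MiB limit are illustrative rather than part of this commit:

// Hedged sketch (not part of the commit): measure heap growth across a workload.
package example

import (
	"runtime"
	"testing"
	"time"
)

// heapGrowthMiB reports how much HeapInuse grew while fn ran, forcing a GC
// before each measurement so transient garbage is excluded.
func heapGrowthMiB(fn func()) int64 {
	runtime.GC()
	time.Sleep(100 * time.Millisecond) // let background sweeping settle
	var before runtime.MemStats
	runtime.ReadMemStats(&before)

	fn()

	runtime.GC()
	time.Sleep(100 * time.Millisecond)
	var after runtime.MemStats
	runtime.ReadMemStats(&after)

	return (int64(after.HeapInuse) - int64(before.HeapInuse)) / (1024 * 1024)
}

func TestNoUnboundedGrowth(t *testing.T) {
	const limitMiB = 30 // placeholder threshold mirroring maxGrowthMiB above
	growth := heapGrowthMiB(func() {
		_ = make([]byte, 1<<20) // placeholder workload; the real test drives node status updates
	})
	if growth > limitMiB {
		t.Errorf("heap grew by %d MiB, limit %d MiB", growth, limitMiB)
	}
}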
