Skip to content

Commit 429d2b8

Browse files
Fix slinky-drainer to wait for fully drained state before deleting (#919)
Signed-off-by: Fagani Hajizada <fagani.hajizada@gmail.com>
1 parent c5d06ee commit 429d2b8

File tree

3 files changed

+131
-17
lines changed

3 files changed

+131
-17
lines changed

plugins/mock-slurm-operator/pkg/controller/node_controller.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
const (
3333
annotationKey = "nodeset.slinky.slurm.net/node-cordon-reason"
3434
slurmDrainConditionType = "SlurmNodeStateDrain"
35+
slurmIdleConditionType = "SlurmNodeStateIdle"
3536
)
3637

3738
type NodeReconciler struct {
@@ -127,13 +128,22 @@ func (r *NodeReconciler) listSlinkyPodsOnNode(ctx context.Context, nodeName stri
127128
}
128129

129130
func (r *NodeReconciler) updatePodDrainCondition(ctx context.Context, pod *corev1.Pod, reason string) error {
130-
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
131-
Type: slurmDrainConditionType,
132-
Status: corev1.ConditionTrue,
133-
LastTransitionTime: metav1.Now(),
134-
Reason: "NodeCordonedBySlinkyDrainer",
135-
Message: reason,
136-
})
131+
pod.Status.Conditions = append(pod.Status.Conditions,
132+
corev1.PodCondition{
133+
Type: slurmDrainConditionType,
134+
Status: corev1.ConditionTrue,
135+
LastTransitionTime: metav1.Now(),
136+
Reason: "NodeCordonedBySlinkyDrainer",
137+
Message: reason,
138+
},
139+
corev1.PodCondition{
140+
Type: slurmIdleConditionType,
141+
Status: corev1.ConditionTrue,
142+
LastTransitionTime: metav1.Now(),
143+
Reason: "NodeDrained",
144+
Message: "Node is idle after drain",
145+
},
146+
)
137147

138148
return r.Status().Update(ctx, pod)
139149
}

plugins/slinky-drainer/pkg/controller/drainrequest_controller.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ const (
4545
annotationPrefix = "[J] [NVSentinel]"
4646
nvsentinelStateLabelKey = "dgxc.nvidia.com/nvsentinel-state"
4747
drainRequestFinalizer = "nvsentinel.nvidia.com/slinky-drainer"
48+
49+
// Slurm base-state conditions that indicate the node still has running work.
50+
// A pod is only considered fully drained when SlurmNodeStateDrain is True
51+
// and none of these busy-state conditions are True.
52+
// Ref: https://github.com/SlinkyProject/slurm-operator/blob/main/pkg/conditions/conditions.go
53+
slurmNodeStateAllocatedConditionType = "SlurmNodeStateAllocated"
54+
slurmNodeStateMixedConditionType = "SlurmNodeStateMixed"
55+
slurmNodeStateCompletingConditionType = "SlurmNodeStateCompleting"
56+
slurmNodeStateUndrainConditionType = "SlurmNodeStateUndrain"
4857
)
4958

5059
type DrainRequestReconciler struct {
@@ -100,9 +109,9 @@ func (r *DrainRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request
100109
return r.markDrainComplete(ctx, drainReq, "NoPods", "No Slinky pods found on node")
101110
}
102111

103-
allReady, notReadyPods := r.checkPodsReadyForDrain(pods)
104-
if !allReady {
105-
slog.Info("Waiting for pods to be ready for drain",
112+
allDrained, notReadyPods := r.checkPodsFullyDrained(pods)
113+
if !allDrained {
114+
slog.Info("Waiting for pods to be fully drained",
106115
"drainrequest", req.NamespacedName,
107116
"total", len(pods),
108117
"notReady", len(notReadyPods),
@@ -289,15 +298,15 @@ func (r *DrainRequestReconciler) getSlinkyPods(ctx context.Context, nodeName str
289298
return podList.Items, nil
290299
}
291300

292-
func (r *DrainRequestReconciler) checkPodsReadyForDrain(pods []corev1.Pod) (bool, []string) {
301+
func (r *DrainRequestReconciler) checkPodsFullyDrained(pods []corev1.Pod) (bool, []string) {
293302
var notReady []string
294303

295304
for _, pod := range pods {
296305
if !isPodReady(&pod) {
297306
continue
298307
}
299308

300-
if !hasSlurmDrainCondition(&pod) {
309+
if !isPodDrained(&pod) {
301310
notReady = append(notReady, pod.Name)
302311
}
303312
}
@@ -315,9 +324,28 @@ func isPodReady(pod *corev1.Pod) bool {
315324
return false
316325
}
317326

318-
func hasSlurmDrainCondition(pod *corev1.Pod) bool {
327+
// isPodDrained returns true when the Slurm node behind this pod has the DRAIN
328+
// flag set, the UNDRAIN flag is NOT set, and no work is running. This mirrors
329+
// the Slinky operator's IsNodeDrained logic so we only delete pods after all
330+
// Slurm jobs have finished.
331+
func isPodDrained(pod *corev1.Pod) bool {
332+
return isNodeDrain(pod) && !isPodBusy(pod)
333+
}
334+
335+
func isNodeDrain(pod *corev1.Pod) bool {
336+
return hasPodCondition(pod, slurmNodeStateDrainConditionType) &&
337+
!hasPodCondition(pod, slurmNodeStateUndrainConditionType)
338+
}
339+
340+
func isPodBusy(pod *corev1.Pod) bool {
341+
return hasPodCondition(pod, slurmNodeStateAllocatedConditionType) ||
342+
hasPodCondition(pod, slurmNodeStateMixedConditionType) ||
343+
hasPodCondition(pod, slurmNodeStateCompletingConditionType)
344+
}
345+
346+
func hasPodCondition(pod *corev1.Pod, condType corev1.PodConditionType) bool {
319347
for _, cond := range pod.Status.Conditions {
320-
if cond.Type == slurmNodeStateDrainConditionType && cond.Status == corev1.ConditionTrue {
348+
if cond.Type == condType && cond.Status == corev1.ConditionTrue {
321349
return true
322350
}
323351
}

plugins/slinky-drainer/pkg/controller/drainrequest_controller_test.go

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,14 @@ func TestReconcile_FullDrainCycle(t *testing.T) {
6767

6868
assertNodeAnnotation(t, tc, node.Name, "[J] [NVSentinel] 79 GPU:0 - GPU has fallen off the bus")
6969

70-
markPodDrainReady(t, tc, pod.Name, pod.Namespace)
70+
// Simulate DRAINING state: Slurm accepted drain but jobs still running.
71+
markPodDraining(t, tc, pod.Name, pod.Namespace)
72+
73+
// Pod must NOT be deleted while in DRAINING state (busy).
74+
assertPodNotDeleted(t, tc, pod.Name, pod.Namespace, 3*time.Second)
75+
76+
// Simulate transition to DRAINED state: jobs finished, node is idle.
77+
markPodDrained(t, tc, pod.Name, pod.Namespace)
7178

7279
waitForDrainComplete(t, tc, "drain-full-cycle", "default")
7380
waitForPodDeletion(t, tc, pod.Name, pod.Namespace)
@@ -97,6 +104,30 @@ func TestReconcile_PreExistingAnnotationPreserved(t *testing.T) {
97104
assertNodeAnnotation(t, tc, node.Name, "Manual drain by operator")
98105
}
99106

107+
func TestReconcile_DrainingPodNotDeleted(t *testing.T) {
108+
tc := setupTestEnv(t, "drain-still-draining")
109+
110+
node := createNode(t, tc, "test-node-draining", nil, map[string]string{
111+
nvsentinelStateLabelKey: "draining",
112+
})
113+
pod := createSlinkyPod(t, tc, node.Name)
114+
markPodReady(t, tc, pod.Name, pod.Namespace)
115+
createDrainRequest(t, tc, "drain-still-draining", drainv1alpha1.DrainRequestSpec{
116+
NodeName: node.Name,
117+
ErrorCode: []string{"79"},
118+
Reason: "GPU has fallen off the bus",
119+
})
120+
121+
assertNodeAnnotation(t, tc, node.Name, "[J] [NVSentinel] 79 - GPU has fallen off the bus")
122+
123+
// Set pod to DRAINING: Drain flag set but node is still busy (Allocated).
124+
markPodDraining(t, tc, pod.Name, pod.Namespace)
125+
126+
// Verify pod is NOT deleted and DrainRequest is NOT completed while draining.
127+
assertPodNotDeleted(t, tc, pod.Name, pod.Namespace, 5*time.Second)
128+
assertDrainNotComplete(t, tc, "drain-still-draining", "default")
129+
}
130+
100131
// ---------------------------------------------------------------------------
101132
// Test setup
102133
// ---------------------------------------------------------------------------
@@ -235,24 +266,43 @@ func createFailedPod(t *testing.T, tc *testEnvContext, nodeName string) {
235266
func markPodReady(t *testing.T, tc *testEnvContext, podName, podNamespace string) {
236267
t.Helper()
237268

269+
var pod corev1.Pod
270+
271+
require.Eventually(t, func() bool {
272+
return tc.client.Get(tc.ctx, types.NamespacedName{Name: podName, Namespace: podNamespace}, &pod) == nil
273+
}, testTimeout, testPollInterval, "Pod %s/%s should exist", podNamespace, podName)
274+
275+
pod.Status.Phase = corev1.PodRunning
276+
pod.Status.Conditions = []corev1.PodCondition{
277+
{Type: corev1.PodReady, Status: corev1.ConditionTrue},
278+
}
279+
require.NoError(t, tc.client.Status().Update(tc.ctx, &pod))
280+
}
281+
282+
func markPodDraining(t *testing.T, tc *testEnvContext, podName, podNamespace string) {
283+
t.Helper()
284+
238285
pod := &corev1.Pod{}
239286
require.NoError(t, tc.client.Get(tc.ctx, types.NamespacedName{Name: podName, Namespace: podNamespace}, pod))
240287

241-
pod.Status.Phase = corev1.PodRunning
242288
pod.Status.Conditions = []corev1.PodCondition{
243289
{Type: corev1.PodReady, Status: corev1.ConditionTrue},
290+
{Type: slurmNodeStateDrainConditionType, Status: corev1.ConditionTrue},
291+
{Type: slurmNodeStateAllocatedConditionType, Status: corev1.ConditionTrue},
244292
}
245293
require.NoError(t, tc.client.Status().Update(tc.ctx, pod))
246294
}
247295

248-
func markPodDrainReady(t *testing.T, tc *testEnvContext, podName, podNamespace string) {
296+
func markPodDrained(t *testing.T, tc *testEnvContext, podName, podNamespace string) {
249297
t.Helper()
250298

251299
pod := &corev1.Pod{}
252300
require.NoError(t, tc.client.Get(tc.ctx, types.NamespacedName{Name: podName, Namespace: podNamespace}, pod))
253301

254302
pod.Status.Conditions = []corev1.PodCondition{
303+
{Type: corev1.PodReady, Status: corev1.ConditionTrue},
255304
{Type: slurmNodeStateDrainConditionType, Status: corev1.ConditionTrue},
305+
{Type: "SlurmNodeStateIdle", Status: corev1.ConditionTrue},
256306
}
257307
require.NoError(t, tc.client.Status().Update(tc.ctx, pod))
258308
}
@@ -328,6 +378,32 @@ func waitForAnnotationRemoved(t *testing.T, tc *testEnvContext, nodeName string)
328378
}, testTimeout, testPollInterval, "Annotation on node %s should be removed", nodeName)
329379
}
330380

381+
func assertPodNotDeleted(t *testing.T, tc *testEnvContext, podName, podNamespace string, waitDuration time.Duration) {
382+
t.Helper()
383+
384+
assert.Never(t, func() bool {
385+
p := &corev1.Pod{}
386+
if err := tc.client.Get(tc.ctx, types.NamespacedName{Name: podName, Namespace: podNamespace}, p); err != nil {
387+
return apierrors.IsNotFound(err)
388+
}
389+
390+
return p.DeletionTimestamp != nil
391+
}, waitDuration, testPollInterval, "Pod %s/%s should NOT be deleted while draining", podNamespace, podName)
392+
}
393+
394+
func assertDrainNotComplete(t *testing.T, tc *testEnvContext, drName, drNamespace string) {
395+
t.Helper()
396+
397+
dr := &drainv1alpha1.DrainRequest{}
398+
require.NoError(t, tc.client.Get(tc.ctx, types.NamespacedName{Name: drName, Namespace: drNamespace}, dr))
399+
400+
for _, c := range dr.Status.Conditions {
401+
if c.Type == drainCompleteConditionType && c.Status == metav1.ConditionTrue {
402+
t.Fatalf("DrainRequest %s/%s should NOT have DrainComplete=True while pods are still draining", drNamespace, drName)
403+
}
404+
}
405+
}
406+
331407
func assertNodeAnnotation(t *testing.T, tc *testEnvContext, nodeName, expectedValue string) {
332408
t.Helper()
333409

0 commit comments

Comments
 (0)