Skip to content

Commit afcc326

Browse files
XRFXLP authored and lalitadithya committed
fix: custom drain mode should handle parallel drains (#966)
Signed-off-by: Ajay Mishra <ajmishra@nvidia.com>
1 parent 429d2b8 commit afcc326

File tree

4 files changed

+113
-45
lines changed

4 files changed

+113
-45
lines changed

demos/local-slinky-drain-demo/scripts/00-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -152,7 +152,7 @@ install_custom_drain_crd() {
152152
install_nvsentinel() {
153153
section "Phase 5: Installing NVSentinel"
154154

155-
local nvsentinel_version="${NVSENTINEL_VERSION:-v0.6.0}"
155+
local nvsentinel_version="${NVSENTINEL_VERSION:-v0.10.0}"
156156

157157
log "Installing Prometheus Operator CRDs (for PodMonitor)..."
158158
kubectl apply --server-side -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/v0.68.0/example/prometheus-operator-crd/monitoring.coreos.com_podmonitors.yaml

node-drainer/pkg/evaluator/evaluator.go

Lines changed: 5 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,8 @@ import (
2828
"github.com/nvidia/nvsentinel/node-drainer/pkg/config"
2929
"github.com/nvidia/nvsentinel/node-drainer/pkg/customdrain"
3030
"github.com/nvidia/nvsentinel/node-drainer/pkg/queue"
31-
"github.com/nvidia/nvsentinel/store-client/pkg/client"
3231
"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
3332
"github.com/nvidia/nvsentinel/store-client/pkg/query"
34-
"github.com/nvidia/nvsentinel/store-client/pkg/utils"
3533
)
3634

3735
const (
@@ -89,7 +87,7 @@ func (e *NodeDrainEvaluator) EvaluateEventWithDatabase(ctx context.Context, heal
8987
}
9088

9189
if e.config.CustomDrain.Enabled && e.customDrainClient != nil {
92-
return e.evaluateCustomDrain(ctx, healthEvent, database, partialDrainEntity)
90+
return e.evaluateCustomDrain(ctx, healthEvent, partialDrainEntity)
9391
}
9492

9593
return e.evaluateUserNamespaceActions(ctx, healthEvent, partialDrainEntity)
@@ -305,19 +303,12 @@ func isTerminalStatus(status model.Status) bool {
305303
}
306304

307305
func (e *NodeDrainEvaluator) evaluateCustomDrain(ctx context.Context, healthEvent model.HealthEventWithStatus,
308-
database queue.DataStore, partialDrainEntity *protos.Entity) (*DrainActionResult, error) {
306+
partialDrainEntity *protos.Entity) (*DrainActionResult, error) {
309307
nodeName := healthEvent.HealthEvent.NodeName
308+
eventID := healthEvent.HealthEvent.Id
310309

311-
eventID, err := getEventID(ctx, database, nodeName)
312-
if err != nil {
313-
slog.Error("Failed to get event ID for custom drain",
314-
"node", nodeName,
315-
"error", err)
316-
317-
return &DrainActionResult{
318-
Action: ActionWait,
319-
WaitDelay: customDrainPollInterval,
320-
}, nil
310+
if eventID == "" {
311+
return nil, fmt.Errorf("health event for node %s is missing Id, cannot generate DrainRequest CR name", nodeName)
321312
}
322313

323314
crName := customdrain.GenerateCRName(nodeName, eventID)
@@ -388,33 +379,6 @@ func (e *NodeDrainEvaluator) evaluateCustomDrain(ctx context.Context, healthEven
388379
}, nil
389380
}
390381

391-
func getEventID(ctx context.Context, database queue.DataStore, nodeName string) (string, error) {
392-
opts := &client.FindOneOptions{
393-
Sort: map[string]any{"_id": -1},
394-
}
395-
396-
filter := map[string]any{
397-
"healthevent.nodename": nodeName,
398-
}
399-
400-
result, err := database.FindDocument(ctx, filter, opts)
401-
if err != nil {
402-
return "", fmt.Errorf("failed to query database for node %s: %w", nodeName, err)
403-
}
404-
405-
var document map[string]any
406-
if err := result.Decode(&document); err != nil {
407-
return "", fmt.Errorf("failed to decode health event for node %s: %w", nodeName, err)
408-
}
409-
410-
eventID, err := utils.ExtractDocumentID(document)
411-
if err != nil {
412-
return "", fmt.Errorf("failed to extract document ID for node %s: %w", nodeName, err)
413-
}
414-
415-
return eventID, nil
416-
}
417-
418382
/*
419383
This function determines whether we can skip draining for the current unhealthy HealthEvent. A full drain can be
420384
skipped if a previous full drain completed against the node after it was most-recently quarantined. Additionally,

node-drainer/pkg/reconciler/reconciler.go

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -285,6 +285,12 @@ func (r *Reconciler) ProcessEventGeneric(ctx context.Context,
285285

286286
eventID := utils.ExtractEventID(event)
287287

288+
if healthEventWithStatus.HealthEvent != nil && healthEventWithStatus.HealthEvent.Id == "" {
289+
if docID, err := utils.ExtractDocumentID(event); err == nil {
290+
healthEventWithStatus.HealthEvent.Id = docID
291+
}
292+
}
293+
288294
slog.Debug("Processing event", "node", nodeName, "eventID", eventID)
289295

290296
metrics.TotalEventsReceived.Inc()

node-drainer/pkg/reconciler/reconciler_integration_test.go

Lines changed: 101 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1366,15 +1366,22 @@ func createPod(ctx context.Context, t *testing.T, client kubernetes.Interface, n
13661366

13671367
type healthEventOptions struct {
13681368
nodeName string
1369+
eventID string
13691370
nodeQuarantined model.Status
13701371
drainForce bool
13711372
recommendedAction protos.RecommendedAction
13721373
entitiesImpacted []*protos.Entity
13731374
}
13741375

13751376
func createHealthEvent(opts healthEventOptions) map[string]interface{} {
1377+
eventID := opts.nodeName + "-event"
1378+
if opts.eventID != "" {
1379+
eventID = opts.eventID
1380+
}
1381+
13761382
healthEvent := &protos.HealthEvent{
13771383
NodeName: opts.nodeName,
1384+
Id: eventID,
13781385
CheckName: "test-check",
13791386
RecommendedAction: opts.recommendedAction,
13801387
EntitiesImpacted: opts.entitiesImpacted,
@@ -1384,9 +1391,6 @@ func createHealthEvent(opts healthEventOptions) map[string]interface{} {
13841391
healthEvent.DrainOverrides = &protos.BehaviourOverrides{Force: true}
13851392
}
13861393

1387-
eventID := opts.nodeName + "-event"
1388-
// Return just the fullDocument content, as the event watcher extracts this
1389-
// from the change stream before passing to the reconciler
13901394
return map[string]interface{}{
13911395
"_id": eventID,
13921396
"healthevent": healthEvent,
@@ -1859,6 +1863,100 @@ func TestReconciler_CustomDrainHappyPath(t *testing.T) {
18591863
}, 10*time.Second, 500*time.Millisecond, "CR should be deleted after drain completes")
18601864
}
18611865

1866+
func TestReconciler_CustomDrainMultipleEventsOnSameNode(t *testing.T) {
1867+
customDrainCfg := config.CustomDrainConfig{
1868+
Enabled: true,
1869+
TemplateMountPath: "../customdrain/testdata",
1870+
TemplateFileName: "drain-template.yaml",
1871+
Namespace: "default",
1872+
ApiGroup: "drain.example.com",
1873+
Version: "v1alpha1",
1874+
Kind: "DrainRequest",
1875+
StatusConditionType: "Complete",
1876+
StatusConditionStatus: "True",
1877+
Timeout: config.Duration{Duration: 30 * time.Minute},
1878+
}
1879+
1880+
setup := setupCustomDrainTest(t, customDrainCfg)
1881+
1882+
nodeName := "multi-event-node"
1883+
createNode(setup.ctx, t, setup.client, nodeName)
1884+
createNamespace(setup.ctx, t, setup.client, "default")
1885+
1886+
gvr := schema.GroupVersionResource{
1887+
Group: "drain.example.com",
1888+
Version: "v1alpha1",
1889+
Resource: "drainrequests",
1890+
}
1891+
1892+
eventA := createHealthEvent(healthEventOptions{
1893+
nodeName: nodeName,
1894+
eventID: "event-aaa",
1895+
nodeQuarantined: model.Quarantined,
1896+
})
1897+
eventB := createHealthEvent(healthEventOptions{
1898+
nodeName: nodeName,
1899+
eventID: "event-bbb",
1900+
nodeQuarantined: model.Quarantined,
1901+
})
1902+
1903+
setup.mockDB.storeEvent(nodeName, eventB)
1904+
1905+
// Process Event A — should create its own DrainRequest CR
1906+
err := setup.reconciler.ProcessEventGeneric(setup.ctx, eventA, setup.mockDB, setup.healthEventStore, nodeName)
1907+
require.Error(t, err)
1908+
assert.Contains(t, err.Error(), "waiting for custom drain CR to complete")
1909+
1910+
crNameA := customdrain.GenerateCRName(nodeName, "event-aaa")
1911+
crNameB := customdrain.GenerateCRName(nodeName, "event-bbb")
1912+
1913+
_, err = setup.dynamicClient.Resource(gvr).Namespace("default").Get(setup.ctx, crNameA, metav1.GetOptions{})
1914+
require.NoError(t, err, "DrainRequest for Event A should exist")
1915+
1916+
// Process Event B — should create a separate DrainRequest CR
1917+
err = setup.reconciler.ProcessEventGeneric(setup.ctx, eventB, setup.mockDB, setup.healthEventStore, nodeName)
1918+
require.Error(t, err)
1919+
assert.Contains(t, err.Error(), "waiting for custom drain CR to complete")
1920+
1921+
_, err = setup.dynamicClient.Resource(gvr).Namespace("default").Get(setup.ctx, crNameB, metav1.GetOptions{})
1922+
require.NoError(t, err, "DrainRequest for Event B should exist")
1923+
1924+
// Retry Event A while CR is incomplete — should poll Event A's CR, not Event B's
1925+
err = setup.reconciler.ProcessEventGeneric(setup.ctx, eventA, setup.mockDB, setup.healthEventStore, nodeName)
1926+
require.Error(t, err)
1927+
assert.Contains(t, err.Error(), "waiting for retry delay",
1928+
"Event A should be waiting on its own CR, not trying to create a new one")
1929+
1930+
// Complete Event A's DrainRequest
1931+
crA, err := setup.dynamicClient.Resource(gvr).Namespace("default").Get(setup.ctx, crNameA, metav1.GetOptions{})
1932+
require.NoError(t, err)
1933+
err = unstructured.SetNestedField(crA.Object, []any{
1934+
map[string]any{
1935+
"type": "Complete",
1936+
"status": "True",
1937+
"lastTransitionTime": time.Now().Format(time.RFC3339),
1938+
"reason": "DrainSucceeded",
1939+
"message": "All pods drained",
1940+
},
1941+
}, "status", "conditions")
1942+
require.NoError(t, err)
1943+
_, err = setup.dynamicClient.Resource(gvr).Namespace("default").UpdateStatus(setup.ctx, crA, metav1.UpdateOptions{})
1944+
require.NoError(t, err)
1945+
1946+
// Process Event A again — should detect completion and mark AlreadyDrained
1947+
err = setup.reconciler.ProcessEventGeneric(setup.ctx, eventA, setup.mockDB, setup.healthEventStore, nodeName)
1948+
require.NoError(t, err, "Event A should complete successfully after its CR is done")
1949+
1950+
require.Eventually(t, func() bool {
1951+
_, err := setup.dynamicClient.Resource(gvr).Namespace("default").Get(setup.ctx, crNameA, metav1.GetOptions{})
1952+
return errors.IsNotFound(err)
1953+
}, 10*time.Second, 500*time.Millisecond, "Event A's CR should be cleaned up")
1954+
1955+
// Event B's CR should still exist independently
1956+
_, err = setup.dynamicClient.Resource(gvr).Namespace("default").Get(setup.ctx, crNameB, metav1.GetOptions{})
1957+
require.NoError(t, err, "Event B's CR should still exist — it hasn't been completed yet")
1958+
}
1959+
18621960
func TestReconciler_CustomDrainCRDNotFound(t *testing.T) {
18631961
customDrainCfg := config.CustomDrainConfig{
18641962
Enabled: true,

0 commit comments

Comments
 (0)