Skip to content

Commit c28bf0c

Browse files
KaivalyaMDabhadkarrohansavclaude
authored
fix: cover cancellation completion marker for fault-remediation (#1335)
Signed-off-by: Kaivalya Dabhadkar <kdabhadkar@nvidia.com> Co-authored-by: rohansav <rohansavar@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 32a883e commit c28bf0c

4 files changed

Lines changed: 336 additions & 13 deletions

File tree

fault-remediation/pkg/reconciler/reconciler.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ func (r *FaultRemediationReconciler) Reconcile(
155155
nodeQuarantined := healthEventWithStatus.HealthEventStatus.NodeQuarantined
156156

157157
if nodeQuarantined == string(model.UnQuarantined) || nodeQuarantined == string(model.Cancelled) {
158-
return r.handleCancellationEvent(ctx, nodeName, model.Status(nodeQuarantined), r.Watcher, event.ResumeToken,
159-
r.healthEventStore)
158+
return r.handleCancellationEvent(ctx, nodeName, model.Status(nodeQuarantined), r.Watcher, *event, r.healthEventStore)
160159
}
161160

162161
return r.handleRemediationEvent(ctx, &healthEventWithStatus, *event, r.Watcher, r.healthEventStore)
@@ -371,13 +370,15 @@ func (r *FaultRemediationReconciler) performRemediation(ctx context.Context,
371370
return crName, nil
372371
}
373372

374-
// handleCancellationEvent handles node unquarantine and cancellation events by clearing annotations
373+
// handleCancellationEvent handles node unquarantine and cancellation events by clearing
374+
// annotations and writing a completion marker so the event is excluded from cold start
375+
// queries on future restarts.
375376
func (r *FaultRemediationReconciler) handleCancellationEvent(
376377
ctx context.Context,
377378
nodeName string,
378379
status model.Status,
379380
watcherInstance datastore.ChangeStreamWatcher,
380-
resumeToken []byte,
381+
eventWithToken datastore.EventWithToken,
381382
healthEventStore datastore.HealthEventStore,
382383
) (ctrl.Result, error) {
383384
ctx, span := tracing.StartSpan(ctx, "fault_remediation.cancellation_event")
@@ -418,7 +419,16 @@ func (r *FaultRemediationReconciler) handleCancellationEvent(
418419
return ctrl.Result{}, fmt.Errorf("failed to clear remediation state for node: %w", err)
419420
}
420421

421-
if err := safeMarkProcessed(context.Background(), watcherInstance, resumeToken, nodeName); err != nil {
422+
if err := r.updateNodeRemediatedStatus(ctx, healthEventStore, eventWithToken, true); err != nil {
423+
slog.ErrorContext(ctx, "Failed to write completion marker for cancellation event",
424+
"node", nodeName,
425+
"error", err)
426+
tracing.RecordError(span, err)
427+
428+
return ctrl.Result{}, fmt.Errorf("failed to write completion marker for cancellation event: %w", err)
429+
}
430+
431+
if err := safeMarkProcessed(ctx, watcherInstance, eventWithToken.ResumeToken, nodeName); err != nil {
422432
return ctrl.Result{}, err
423433
}
424434

@@ -1128,9 +1138,12 @@ func (r *FaultRemediationReconciler) HandleColdStart(ctx context.Context) {
11281138
query.Or(
11291139
// Quarantined + drained but not yet remediated
11301140
unresolvedRemediationReadyEventsCondition(""),
1131-
// Cancelled/unquarantined events (need cleanup)
1132-
query.In("healtheventstatus.nodequarantined",
1133-
[]interface{}{string(model.UnQuarantined), string(model.Cancelled)}),
1141+
// Cancelled/unquarantined events that haven't been marked complete
1142+
query.And(
1143+
query.In("healtheventstatus.nodequarantined",
1144+
[]interface{}{string(model.UnQuarantined), string(model.Cancelled)}),
1145+
query.Eq("healtheventstatus.faultremediated", nil),
1146+
),
11341147
),
11351148
)
11361149

fault-remediation/pkg/reconciler/reconciler_e2e_test.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2103,6 +2103,241 @@ func TestHandleColdStart_CancellationFlow(t *testing.T) {
21032103
_ = testDynamic.Resource(gvr).Delete(ctx, crName, metav1.DeleteOptions{})
21042104
}
21052105

2106+
// TestCancellationEvent_WritesCompletionMarker verifies that handleCancellationEvent writes
2107+
// faultRemediated=true to the health event store, making cold start self-terminating for
2108+
// cancellation events.
2109+
func TestCancellationEvent_WritesCompletionMarker(t *testing.T) {
2110+
ctx, cancel := context.WithTimeout(testContext, 30*time.Second)
2111+
defer cancel()
2112+
2113+
nodeName := testutils.GenerateTestNodeName("test-cancel-marker")
2114+
createTestNode(ctx, nodeName, nil, map[string]string{"test": "label"})
2115+
defer func() {
2116+
_ = testClient.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{})
2117+
}()
2118+
2119+
cleanupNodeAnnotations(ctx, t, nodeName)
2120+
2121+
gvr := schema.GroupVersionResource{
2122+
Group: "janitor.dgxc.nvidia.com",
2123+
Version: "v1alpha1",
2124+
Resource: "rebootnodes",
2125+
}
2126+
2127+
remediationClient, err := createTestRemediationClient(false, restartRemediationActions)
2128+
require.NoError(t, err)
2129+
2130+
var capturedStatuses []datastore.HealthEventStatus
2131+
var capturedIDs []string
2132+
var mu sync.Mutex
2133+
2134+
localStore := &MockHealthEventStore{}
2135+
localStore.UpdateHealthEventStatusFn = func(ctx context.Context, id string, status datastore.HealthEventStatus) error {
2136+
mu.Lock()
2137+
capturedIDs = append(capturedIDs, id)
2138+
capturedStatuses = append(capturedStatuses, status)
2139+
mu.Unlock()
2140+
return nil
2141+
}
2142+
2143+
localWatcher := NewMockChangeStreamWatcher()
2144+
cfg := ReconcilerConfig{
2145+
RemediationClient: remediationClient,
2146+
StateManager: statemanager.NewStateManager(testClient),
2147+
UpdateMaxRetries: 3,
2148+
UpdateRetryDelay: 100 * time.Millisecond,
2149+
}
2150+
localReconciler := NewFaultRemediationReconciler(nil, localWatcher, localStore, cfg, false)
2151+
2152+
// Pre-set the node state to match what fault-quarantine + node-drainer would have done.
2153+
_, err = cfg.StateManager.UpdateNVSentinelStateNodeLabel(ctx, nodeName,
2154+
statemanager.QuarantinedLabelValue, false)
2155+
require.NoError(t, err)
2156+
_, err = cfg.StateManager.UpdateNVSentinelStateNodeLabel(ctx, nodeName,
2157+
statemanager.DrainingLabelValue, false)
2158+
require.NoError(t, err)
2159+
_, err = cfg.StateManager.UpdateNVSentinelStateNodeLabel(ctx, nodeName,
2160+
statemanager.DrainSucceededLabelValue, false)
2161+
require.NoError(t, err)
2162+
2163+
// Step 1: Send a quarantine event to establish remediation state
2164+
t.Log("Step 1: Processing quarantine event to create CR and annotation")
2165+
eventID := "cancel-marker-event-1"
2166+
quarantineEvent := createQuarantineEvent(eventID, nodeName, protos.RecommendedAction_RESTART_BM)
2167+
quarantineEventToken := datastore.EventWithToken{
2168+
Event: map[string]interface{}(quarantineEvent),
2169+
ResumeToken: []byte("cancel-marker-token-1"),
2170+
}
2171+
_, err = localReconciler.Reconcile(ctx, &quarantineEventToken)
2172+
require.NoError(t, err)
2173+
2174+
var crName string
2175+
require.Eventually(t, func() bool {
2176+
state, _, err := localReconciler.annotationManager.GetRemediationState(ctx, nodeName)
2177+
if err != nil {
2178+
return false
2179+
}
2180+
if grp, ok := state.EquivalenceGroups["restart"]; ok {
2181+
crName = grp.MaintenanceCR
2182+
return crName != ""
2183+
}
2184+
return false
2185+
}, 5*time.Second, 100*time.Millisecond, "CR and annotation should be created")
2186+
2187+
mu.Lock()
2188+
capturedIDs = nil
2189+
capturedStatuses = nil
2190+
mu.Unlock()
2191+
2192+
// Step 2: Send the cancellation event and capture only the cancellation store updates.
2193+
t.Log("Step 2: Sending cancellation event and capturing store updates")
2194+
cancelledEvent := createCancelledEvent(eventID, nodeName, protos.RecommendedAction_RESTART_BM)
2195+
cancelledEventToken := datastore.EventWithToken{
2196+
Event: map[string]interface{}(cancelledEvent),
2197+
ResumeToken: []byte("cancel-marker-token-2"),
2198+
}
2199+
_, err = localReconciler.Reconcile(ctx, &cancelledEventToken)
2200+
require.NoError(t, err)
2201+
2202+
// Step 3: Wait for remediation state to be cleared (proves cancellation was processed)
2203+
require.Eventually(t, func() bool {
2204+
node, err := testClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
2205+
if err != nil {
2206+
return false
2207+
}
2208+
_, hasAnnotation := node.Annotations[annotation.AnnotationKey]
2209+
return !hasAnnotation
2210+
}, 5*time.Second, 100*time.Millisecond, "Remediation annotation should be cleared")
2211+
2212+
// Step 4: Verify that UpdateHealthEventStatus was called with FaultRemediated=true
2213+
t.Log("Step 4: Verifying completion marker was written")
2214+
mu.Lock()
2215+
defer mu.Unlock()
2216+
2217+
require.NotEmpty(t, capturedStatuses, "UpdateHealthEventStatus should have been called for cancellation")
2218+
2219+
var foundCompletionMarker bool
2220+
for i, status := range capturedStatuses {
2221+
if capturedIDs[i] == eventID && status.FaultRemediated != nil && *status.FaultRemediated {
2222+
foundCompletionMarker = true
2223+
t.Logf("Completion marker written for event ID: %s (FaultRemediated=true)", capturedIDs[i])
2224+
assert.NotNil(t, status.LastRemediationTimestamp,
2225+
"LastRemediationTimestamp should be set when FaultRemediated=true")
2226+
break
2227+
}
2228+
}
2229+
require.True(t, foundCompletionMarker,
2230+
"handleCancellationEvent must write FaultRemediated=true for the cancelled event")
2231+
2232+
_ = testDynamic.Resource(gvr).Delete(ctx, crName, metav1.DeleteOptions{})
2233+
}
2234+
2235+
// TestColdStartCancellationQueryRequiresCompletionMarker verifies that the cold start
2236+
// cancellation query includes the same completion gate as the remediation query leg.
2237+
// Without this faultremediated==nil predicate, every historical cancellation can replay
2238+
// on every restart.
2239+
func TestColdStartCancellationQueryRequiresCompletionMarker(t *testing.T) {
2240+
ctx, cancel := context.WithTimeout(testContext, 30*time.Second)
2241+
defer cancel()
2242+
2243+
var capturedQuery map[string]interface{}
2244+
coldStartCallCount := 0
2245+
2246+
localStore := &MockHealthEventStore{}
2247+
localStore.FindHealthEventsByQueryBatchedFn = func(_ context.Context, builder datastore.QueryBuilder, _ int, fn func([]datastore.HealthEventWithStatus) error) error {
2248+
coldStartCallCount++
2249+
capturedQuery = builder.ToMongo()
2250+
return fn([]datastore.HealthEventWithStatus{})
2251+
}
2252+
2253+
remediationClient, err := createTestRemediationClient(false, restartRemediationActions)
2254+
require.NoError(t, err)
2255+
cfg := ReconcilerConfig{
2256+
RemediationClient: remediationClient,
2257+
StateManager: statemanager.NewStateManager(testClient),
2258+
UpdateMaxRetries: 3,
2259+
UpdateRetryDelay: 100 * time.Millisecond,
2260+
}
2261+
localReconciler := NewFaultRemediationReconciler(nil, NewMockChangeStreamWatcher(), localStore, cfg, false)
2262+
2263+
localReconciler.HandleColdStart(ctx)
2264+
2265+
assert.Equal(t, 1, coldStartCallCount,
2266+
"Cold start should query the store exactly once")
2267+
require.NotNil(t, capturedQuery, "Cold start query should be captured")
2268+
require.True(t, coldStartCancellationQueryHasCompletionGate(capturedQuery),
2269+
"Cold start cancellation query must include faultremediated==nil gate")
2270+
}
2271+
2272+
func coldStartCancellationQueryHasCompletionGate(q map[string]interface{}) bool {
2273+
orConditions, ok := q["$or"].([]interface{})
2274+
if !ok {
2275+
return false
2276+
}
2277+
2278+
for _, rawCondition := range orConditions {
2279+
condition, ok := rawCondition.(map[string]interface{})
2280+
if !ok {
2281+
continue
2282+
}
2283+
2284+
if isCancellationQueryLeg(condition) && hasFaultRemediatedNilGate(condition) {
2285+
return true
2286+
}
2287+
2288+
andConditions, ok := condition["$and"].([]interface{})
2289+
if !ok {
2290+
continue
2291+
}
2292+
2293+
hasCancellationFilter := false
2294+
hasCompletionGate := false
2295+
for _, rawAndCondition := range andConditions {
2296+
andCondition, ok := rawAndCondition.(map[string]interface{})
2297+
if !ok {
2298+
continue
2299+
}
2300+
hasCancellationFilter = hasCancellationFilter || isCancellationQueryLeg(andCondition)
2301+
hasCompletionGate = hasCompletionGate || hasFaultRemediatedNilGate(andCondition)
2302+
}
2303+
if hasCancellationFilter && hasCompletionGate {
2304+
return true
2305+
}
2306+
}
2307+
2308+
return false
2309+
}
2310+
2311+
func isCancellationQueryLeg(condition map[string]interface{}) bool {
2312+
nodeQuarantined, ok := condition["healtheventstatus.nodequarantined"].(map[string]interface{})
2313+
if !ok {
2314+
return false
2315+
}
2316+
2317+
values, ok := nodeQuarantined["$in"].([]interface{})
2318+
if !ok {
2319+
return false
2320+
}
2321+
2322+
foundUnquarantined := false
2323+
foundCancelled := false
2324+
for _, value := range values {
2325+
switch value {
2326+
case string(model.UnQuarantined):
2327+
foundUnquarantined = true
2328+
case string(model.Cancelled):
2329+
foundCancelled = true
2330+
}
2331+
}
2332+
2333+
return foundUnquarantined && foundCancelled
2334+
}
2335+
2336+
func hasFaultRemediatedNilGate(condition map[string]interface{}) bool {
2337+
value, ok := condition["healtheventstatus.faultremediated"]
2338+
return ok && value == nil
2339+
}
2340+
21062341
// TestCustomAction_E2E tests the full reconciler flow with a custom remediation action.
21072342
// Exercises the complete path: raw MongoDB event → Reconcile → GetEffectiveActionName
21082343
// → template rendering → CR creation.

tests/fault_management_test.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,14 +547,20 @@ func TestManualUncordonWithFaultRemediation(t *testing.T) {
547547
WithLabel("suite", "fault-management-manual-intervention-fr")
548548

549549
var testCtx *helpers.RemediationTestContext
550+
var mongoPod string
550551
var podNames []string
551552
testNamespace := "immediate-test"
553+
const cancellationMarkerMessage = "XID 79 fatal error for FR cancellation marker"
552554

553555
feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
554556
var newCtx context.Context
555557
newCtx, testCtx = helpers.SetupFaultRemediationTest(ctx, t, c, testNamespace)
556558
newCtx = helpers.ApplyNodeDrainerConfig(newCtx, t, c, "data/nd-all-modes.yaml")
557559

560+
client, err := c.NewClient()
561+
require.NoError(t, err)
562+
mongoPod, _ = helpers.TryGetMongoDBPrimaryPodName(ctx, t, client)
563+
558564
return newCtx
559565
})
560566

@@ -569,7 +575,7 @@ func TestManualUncordonWithFaultRemediation(t *testing.T) {
569575
t.Log("Sending fatal health event to trigger quarantine")
570576
fatalEvent := helpers.NewHealthEvent(testCtx.NodeName).
571577
WithErrorCode("79").
572-
WithMessage("XID 79 fatal error").
578+
WithMessage(cancellationMarkerMessage).
573579
WithRecommendedAction(24)
574580
helpers.SendHealthEvent(ctx, t, fatalEvent)
575581

@@ -639,6 +645,25 @@ func TestManualUncordonWithFaultRemediation(t *testing.T) {
639645
return ctx
640646
})
641647

648+
feature.Assess("verify FR marks cancelled event remediated", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
649+
if mongoPod == "" {
650+
t.Log("Skipping direct faultRemediated MongoDB assertion for non-MongoDB datastore")
651+
return ctx
652+
}
653+
654+
client, err := c.NewClient()
655+
require.NoError(t, err)
656+
657+
t.Log("Waiting for FR to write faultRemediated=true on the Cancelled HealthEvent")
658+
require.Eventually(t, func() bool {
659+
return helpers.CancelledHealthEventFaultRemediated(ctx, t, client.RESTConfig(), client, mongoPod,
660+
testCtx.NodeName, cancellationMarkerMessage)
661+
}, helpers.EventuallyWaitTimeout, helpers.WaitInterval,
662+
"FR should write faultRemediated=true for the cancelled HealthEvent")
663+
664+
return ctx
665+
})
666+
642667
feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
643668
client, err := c.NewClient()
644669
require.NoError(t, err)

0 commit comments

Comments
 (0)