Skip to content

Commit 9ef3351

Browse files
committed
chore: handle old events present in db
Signed-off-by: Tanisha goyal <[email protected]>
1 parent b9ba256 commit 9ef3351

File tree

7 files changed

+174
-25
lines changed

7 files changed

+174
-25
lines changed

data-models/pkg/protos/health_event.pb.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-models/protobufs/health_event.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ message HealthEvents {
3030
}
3131

3232
// ProcessingStrategy defines how downstream modules should handle the event.
33+
// UNSPECIFIED: Reserved as the zero value (required by protobuf). Not used anywhere in the code.
3334
// EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state.
3435
// STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources.
3536
enum ProcessingStrategy {

docs/designs/025-processing-strategy-for-health-checks.md

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -338,18 +338,39 @@ For `processingStrategy=STORE_ONLY` events, the platform connector should:
338338
2. Not create node conditions
339339
3. Not create Kubernetes events
340340

341-
File: `platform-connectors/pkg/connectors/kubernetes/process_node_events.go`
341+
**Normalization:** Platform connector normalizes `processingStrategy` in the gRPC server handler before any processing or enqueuing. If the field is missing or set to `UNSPECIFIED` (e.g., from custom monitors), it defaults to `EXECUTE_REMEDIATION`. This ensures all events in the database have an explicit strategy, providing consistent behavior across all modules.
342+
343+
File: `platform-connectors/pkg/server/platform_connector_server.go`
342344

343-
Add skip logic in `processHealthEvents()` method to skip adding node condition/event:
345+
Add normalization in `HealthEventOccurredV1()` method before enqueuing to ring buffers:
344346

345347
```go
346-
func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *protos.HealthEvents) error {
347-
var nodeConditions []corev1.NodeCondition
348+
func (p *PlatformConnectorServer) HealthEventOccurredV1(ctx context.Context,
349+
he *pb.HealthEvents) (*empty.Empty, error) {
350+
slog.Info("Health events received", "events", he)
351+
352+
healthEventsReceived.Add(float64(len(he.Events)))
353+
354+
// Custom monitors that don't set processingStrategy will default to EXECUTE_REMEDIATION.
355+
for _, event := range he.Events {
356+
if event.ProcessingStrategy == pb.ProcessingStrategy_UNSPECIFIED {
357+
event.ProcessingStrategy = pb.ProcessingStrategy_EXECUTE_REMEDIATION
358+
}
359+
}
360+
361+
// ... pipeline processing and enqueuing to ring buffers
362+
}
363+
```
364+
365+
File: `platform-connectors/pkg/connectors/kubernetes/process_node_events.go`
366+
367+
Add skip logic for `STORE_ONLY` events in `filterProcessableEvents()` method:
348368

349-
// NEW: Filter out STORE_ONLY events - they should not modify node conditions or create K8s events
369+
```go
370+
func filterProcessableEvents(healthEvents *protos.HealthEvents) []*protos.HealthEvent {
350371
var processableEvents []*protos.HealthEvent
351372
for _, healthEvent := range healthEvents.Events {
352-
if healthEvent.ProcessingStrategy == protos.STORE_ONLY {
373+
if healthEvent.ProcessingStrategy == protos.ProcessingStrategy_STORE_ONLY {
353374
slog.Info("Skipping STORE_ONLY event - no node conditions or K8s events will be created",
354375
"node", healthEvent.NodeName,
355376
"checkName", healthEvent.CheckName)
@@ -358,7 +379,7 @@ func (r *K8sConnector) processHealthEvents(ctx context.Context, healthEvents *pr
358379
processableEvents = append(processableEvents, healthEvent)
359380
}
360381

361-
// ... existing code
382+
return processableEvents
362383
}
363384
```
364385
---
@@ -404,35 +425,49 @@ We need new methods in mongodb and postgres pipeline builder:
404425
File: `store-client/pkg/client/mongodb_pipeline_builder.go`
405426

406427
```go
407-
// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for all processable health event inserts.
428+
// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for
429+
// all EXECUTE_REMEDIATION health event inserts.
430+
//
431+
// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
432+
// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
433+
// - Missing/null (old events created before update, custom monitors, or circuit breaker backlog)
408434
func (b *MongoDBPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline {
409435
return datastore.ToPipeline(
410436
datastore.D(
411437
datastore.E("$match", datastore.D(
412438
datastore.E("operationType", datastore.D(
413439
datastore.E("$in", datastore.A("insert")),
414440
)),
415-
datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"),
441+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field
442+
datastore.E("$or", datastore.A(
443+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))),
444+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))),
445+
)),
416446
)),
417447
),
418448
)
419449
}
420450

421-
// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal unhealthy events
422-
// excluding STORE_ONLY events.
451+
// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts
452+
// excluding STORE_ONLY events. This is used by health-events-analyzer for pattern analysis.
453+
//
454+
// Backward Compatibility: Uses $or to include EXECUTE_REMEDIATION and missing field.
423455
func (b *MongoDBPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline {
424456
return datastore.ToPipeline(
425457
datastore.D(
426458
datastore.E("$match", datastore.D(
427459
datastore.E("operationType", "insert"),
428460
datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))),
429461
datastore.E("fullDocument.healthevent.ishealthy", false),
430-
datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"),
462+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field
463+
datastore.E("$or", datastore.A(
464+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))),
465+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))),
466+
)),
431467
)),
432468
),
433469
)
434470
}
435-
436471
```
437472

438473
Same method required in postgres builder
@@ -441,35 +476,46 @@ File: `store-client/pkg/client/postgresql_pipeline_builder.go`
441476

442477
```go
443478
// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for health event inserts
444-
// excluding STORE_ONLY events.
479+
// with processingStrategy=EXECUTE_REMEDIATION
480+
//
481+
// Backward Compatibility: Uses $or to include EXECUTE_REMEDIATION and missing field.
445482
func (b *PostgreSQLPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline {
446483
return datastore.ToPipeline(
447484
datastore.D(
448485
datastore.E("$match", datastore.D(
449486
datastore.E("operationType", datastore.D(
450487
datastore.E("$in", datastore.A("insert")),
451488
)),
452-
datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"),
489+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field
490+
datastore.E("$or", datastore.A(
491+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))),
492+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))),
493+
)),
453494
)),
454495
),
455496
)
456497
}
457498

458-
// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal unhealthy events
459-
// excluding STORE_ONLY events.
499+
// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts
500+
// excluding STORE_ONLY events. For PostgreSQL, handles both INSERT and UPDATE operations.
501+
//
502+
// Backward Compatibility: Uses $or to include EXECUTE_REMEDIATION and missing field.
460503
func (b *PostgreSQLPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline {
461504
return datastore.ToPipeline(
462505
datastore.D(
463506
datastore.E("$match", datastore.D(
464507
datastore.E("operationType", datastore.D(datastore.E("$in", datastore.A("insert", "update")))),
465508
datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))),
466509
datastore.E("fullDocument.healthevent.ishealthy", false),
467-
datastore.E("fullDocument.healthevent.processingstrategy", "EXECUTE_REMEDIATION"),
510+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field
511+
datastore.E("$or", datastore.A(
512+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION))),
513+
datastore.D(datastore.E("fullDocument.healthevent.processingstrategy", datastore.D(datastore.E("$exists", false)))),
514+
)),
468515
)),
469516
),
470517
)
471518
}
472-
473519
```
474520

475521
---
@@ -532,6 +578,8 @@ func createPipeline() interface{} {
532578

533579
Update the default pipeline query to exclude `processingStrategy=STORE_ONLY` events. We need this condition for every rule that's why we are adding it at code level instead of keeping it at config file level.
534580

581+
**Backward Compatibility Note:** Historical events in the database (created before this feature) won't have the `processingstrategy` field. These old events should be treated as `EXECUTE_REMEDIATION` (they were meant to be processed). We use `$or` to explicitly match both `EXECUTE_REMEDIATION` and a missing field to ensure backward compatibility. Other modules and health monitors do not require changes since they only act on newly inserted events, which will already have the `processingStrategy` field set (all health monitors run in `EXECUTE_REMEDIATION` mode by default).
582+
535583
File: `health-events-analyzer/pkg/reconciler/reconciler.go`
536584

537585
```go
@@ -542,11 +590,18 @@ func (r *Reconciler) getPipelineStages(
542590
// CRITICAL: Always start with agent filter to exclude events from health-events-analyzer itself
543591
// This prevents the analyzer from matching its own generated events, which would cause
544592
// infinite loops and incorrect rule evaluations
593+
//
594+
// Backward Compatibility: Use $or to include events where processingstrategy is either
595+
// EXECUTE_REMEDIATION or missing (old events created before this feature was added).
596+
// Old events without this field should be treated as EXECUTE_REMEDIATION.
545597
pipeline := []map[string]interface{}{
546598
{
547599
"$match": map[string]interface{}{
548-
"healthevent.agent": map[string]interface{}{"$ne": "health-events-analyzer"},
549-
"healthevent.processingstrategy": map[string]interface{}{"$eq": "EXECUTE_REMEDIATION"}, // Exclude STORE_ONLY by default
600+
"healthevent.agent": map[string]interface{}{"$ne": "health-events-analyzer"},
601+
"$or": []map[string]interface{}{
602+
{"healthevent.processingstrategy": "EXECUTE_REMEDIATION"},
603+
{"healthevent.processingstrategy": map[string]interface{}{"$exists": false}},
604+
},
550605
},
551606
},
552607
}

platform-connectors/pkg/server/platform_connector_server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ func (p *PlatformConnectorServer) HealthEventOccurredV1(ctx context.Context,
5454

5555
healthEventsReceived.Add(float64(len(he.Events)))
5656

57+
// Custom monitors that don't set processingStrategy will default to EXECUTE_REMEDIATION.
58+
for _, event := range he.Events {
59+
if event.ProcessingStrategy == pb.ProcessingStrategy_UNSPECIFIED {
60+
event.ProcessingStrategy = pb.ProcessingStrategy_EXECUTE_REMEDIATION
61+
}
62+
}
63+
5764
if p.Pipeline != nil {
5865
for i := range he.Events {
5966
p.Pipeline.Process(ctx, he.Events[i])

store-client/pkg/client/mongodb_pipeline_builder.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,27 @@ func (b *MongoDBPipelineBuilder) BuildAllHealthEventInsertsPipeline() datastore.
8686

8787
// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for
8888
// all EXECUTE_REMEDIATION health event inserts.
89+
//
90+
// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
91+
// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
92+
// - Missing/null (old events created before upgrade, custom monitors, or circuit breaker backlog)
8993
func (b *MongoDBPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline {
9094
return datastore.ToPipeline(
9195
datastore.D(
9296
datastore.E("$match", datastore.D(
9397
datastore.E("operationType", datastore.D(
9498
datastore.E("$in", datastore.A("insert")),
9599
)),
96-
datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION)),
100+
datastore.E("$or", datastore.A(
101+
datastore.D(datastore.E(
102+
"fullDocument.healthevent.processingstrategy",
103+
int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
104+
)),
105+
datastore.D(datastore.E(
106+
"fullDocument.healthevent.processingstrategy",
107+
datastore.D(datastore.E("$exists", false)),
108+
)),
109+
)),
97110
)),
98111
),
99112
)
@@ -113,6 +126,35 @@ func (b *MongoDBPipelineBuilder) BuildNonFatalUnhealthyInsertsPipeline() datasto
113126
)
114127
}
115128

129+
// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts
130+
// with processingStrategy=EXECUTE_REMEDIATION. This is used by health-events-analyzer for pattern analysis.
131+
//
132+
// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
133+
// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
134+
// - Missing/null (old events created before upgrade, custom monitors)
135+
func (b *MongoDBPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline {
136+
return datastore.ToPipeline(
137+
datastore.D(
138+
datastore.E("$match", datastore.D(
139+
datastore.E("operationType", "insert"),
140+
datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))),
141+
datastore.E("fullDocument.healthevent.ishealthy", false),
142+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat)
143+
datastore.E("$or", datastore.A(
144+
datastore.D(datastore.E(
145+
"fullDocument.healthevent.processingstrategy",
146+
int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
147+
)),
148+
datastore.D(datastore.E(
149+
"fullDocument.healthevent.processingstrategy",
150+
datastore.D(datastore.E("$exists", false)),
151+
)),
152+
)),
153+
)),
154+
),
155+
)
156+
}
157+
116158
// BuildQuarantinedAndDrainedNodesPipeline creates a pipeline for remediation-ready nodes
117159
// This watches for insert/update events where both quarantine and eviction status indicate the
118160
// node is ready for reboot, or where the node has been unquarantined and needs cleanup, or where

store-client/pkg/client/postgresql_pipeline_builder.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,28 @@ func (b *PostgreSQLPipelineBuilder) BuildAllHealthEventInsertsPipeline() datasto
118118

119119
// BuildProcessableHealthEventInsertsPipeline creates a pipeline that watches for health event inserts
120120
// with processingStrategy=EXECUTE_REMEDIATION
121+
//
122+
// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
123+
// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
124+
// - Missing/null (old events created before upgrade, custom monitors, or circuit breaker backlog)
121125
func (b *PostgreSQLPipelineBuilder) BuildProcessableHealthEventInsertsPipeline() datastore.Pipeline {
122126
return datastore.ToPipeline(
123127
datastore.D(
124128
datastore.E("$match", datastore.D(
125129
datastore.E("operationType", datastore.D(
126130
datastore.E("$in", datastore.A("insert")),
127131
)),
128-
datastore.E("fullDocument.healthevent.processingstrategy", int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION)),
132+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat)
133+
datastore.E("$or", datastore.A(
134+
datastore.D(datastore.E(
135+
"fullDocument.healthevent.processingstrategy",
136+
int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
137+
)),
138+
datastore.D(datastore.E(
139+
"fullDocument.healthevent.processingstrategy",
140+
datastore.D(datastore.E("$exists", false)),
141+
)),
142+
)),
129143
)),
130144
),
131145
)
@@ -146,6 +160,37 @@ func (b *PostgreSQLPipelineBuilder) BuildNonFatalUnhealthyInsertsPipeline() data
146160
)
147161
}
148162

163+
// BuildProcessableNonFatalUnhealthyInsertsPipeline creates a pipeline for non-fatal, unhealthy event inserts
164+
// with processingStrategy=EXECUTE_REMEDIATION. This is used by health-events-analyzer for pattern analysis.
165+
// For PostgreSQL, we need to handle both INSERT and UPDATE operations because platform-connectors
166+
// may insert a record and then immediately update it, causing the trigger to fire UPDATE events.
167+
//
168+
// Backward Compatibility: This pipeline uses $or to match events where processingstrategy is either:
169+
// - EXECUTE_REMEDIATION (new events from NVSentinel health monitors)
170+
// - Missing/null (old events created before upgrade, custom monitors)
171+
func (b *PostgreSQLPipelineBuilder) BuildProcessableNonFatalUnhealthyInsertsPipeline() datastore.Pipeline {
172+
return datastore.ToPipeline(
173+
datastore.D(
174+
datastore.E("$match", datastore.D(
175+
datastore.E("operationType", datastore.D(datastore.E("$in", datastore.A("insert", "update")))),
176+
datastore.E("fullDocument.healthevent.agent", datastore.D(datastore.E("$ne", "health-events-analyzer"))),
177+
datastore.E("fullDocument.healthevent.ishealthy", false),
178+
// Exclude STORE_ONLY events, but include EXECUTE_REMEDIATION and missing field (backward compat)
179+
datastore.E("$or", datastore.A(
180+
datastore.D(datastore.E(
181+
"fullDocument.healthevent.processingstrategy",
182+
int32(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
183+
)),
184+
datastore.D(datastore.E(
185+
"fullDocument.healthevent.processingstrategy",
186+
datastore.D(datastore.E("$exists", false)),
187+
)),
188+
)),
189+
)),
190+
),
191+
)
192+
}
193+
149194
// BuildQuarantinedAndDrainedNodesPipeline creates a pipeline for remediation-ready nodes
150195
// Similar to BuildNodeQuarantineStatusPipeline, this supports both UPDATE and INSERT operations
151196
// to be defensive against PostgreSQL trigger edge cases.

tests/helpers/healthevent.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"testing"
2929
"time"
3030

31-
"github.com/nvidia/nvsentinel/data-models/pkg/protos"
3231
"github.com/stretchr/testify/require"
3332
)
3433

@@ -73,7 +72,6 @@ func NewHealthEvent(nodeName string) *HealthEventTemplate {
7372
EntityValue: "0",
7473
},
7574
},
76-
ProcessingStrategy: int(protos.ProcessingStrategy_EXECUTE_REMEDIATION),
7775
}
7876
}
7977

0 commit comments

Comments
 (0)