Skip to content

Commit 7771e9b

Browse files
fix: delete stale or invalid resume token for mongo client changeStreams (#1092)
1 parent d18e2a9 commit 7771e9b

File tree

12 files changed

+875
-125
lines changed

12 files changed

+875
-125
lines changed

fault-remediation/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/prometheus/client_golang v1.23.2
1313
github.com/prometheus/client_model v0.6.2
1414
github.com/stretchr/testify v1.11.1
15+
golang.org/x/sync v0.20.0
1516
google.golang.org/protobuf v1.36.11
1617
k8s.io/api v0.35.3
1718
k8s.io/apimachinery v0.35.3
@@ -89,7 +90,6 @@ require (
8990
golang.org/x/crypto v0.48.0 // indirect
9091
golang.org/x/net v0.51.0 // indirect
9192
golang.org/x/oauth2 v0.35.0 // indirect
92-
golang.org/x/sync v0.20.0 // indirect
9393
golang.org/x/sys v0.41.0 // indirect
9494
golang.org/x/term v0.40.0 // indirect
9595
golang.org/x/text v0.34.0 // indirect

fault-remediation/main.go

Lines changed: 88 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"syscall"
2626
"time"
2727

28+
"golang.org/x/sync/errgroup"
2829
batchv1 "k8s.io/api/batch/v1"
2930
corev1 "k8s.io/api/core/v1"
3031
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -105,12 +106,63 @@ func run() error {
105106
func setupCtrlRuntimeManagement(ctx context.Context) error {
106107
slog.Info("Running in controller runtime managed mode")
107108

109+
mgr, err := createManager()
110+
if err != nil {
111+
return err
112+
}
113+
114+
params := initializer.InitializationParams{
115+
TomlConfigPath: tomlConfigPath,
116+
DryRun: dryRun,
117+
EnableLogCollector: enableLogCollector,
118+
Config: mgr.GetConfig(),
119+
}
120+
121+
g, gCtx := errgroup.WithContext(ctx)
122+
123+
// Start the manager first so health/metrics endpoints are live immediately.
124+
// This prevents Kubernetes liveness probes from killing the pod while MongoDB
125+
// initialization (which may be slow due to stale resume tokens or connectivity
126+
// issues) is still in progress.
127+
g.Go(func() error {
128+
slog.Info("Starting controller runtime controller")
129+
130+
if err := mgr.Start(gCtx); err != nil {
131+
slog.Error("Problem running manager", "error", err)
132+
return err
133+
}
134+
135+
return nil
136+
})
137+
138+
// Initialize datastore and reconciler concurrently — the manager is already
139+
// serving health probes, so the pod won't be killed during this phase.
140+
// cleanupReconciler is set once the reconciler is created so cleanup can run
141+
// after g.Wait() (i.e., after the manager has fully drained).
142+
var cleanupReconciler func()
143+
144+
g.Go(func() error {
145+
cleanup, initErr := initializeAndWatch(gCtx, params, mgr)
146+
cleanupReconciler = cleanup
147+
148+
return initErr
149+
})
150+
151+
err = g.Wait()
152+
153+
if cleanupReconciler != nil {
154+
cleanupReconciler()
155+
}
156+
157+
return err
158+
}
159+
160+
func createManager() (ctrl.Manager, error) {
108161
cfg := ctrl.GetConfigOrDie()
109162
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
110163
return auditlogger.NewAuditingRoundTripper(rt)
111164
})
112165

113-
//TODO: setup informers for node and job
114166
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
115167
Scheme: scheme,
116168
Metrics: metricsserver.Options{
@@ -126,52 +178,64 @@ func setupCtrlRuntimeManagement(ctx context.Context) error {
126178
})
127179
if err != nil {
128180
slog.Error("Unable to start manager", "error", err)
129-
return err
181+
return nil, err
130182
}
131183

132184
if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
133185
slog.Error("Unable to set up health check", "error", err)
134-
return err
186+
return nil, err
135187
}
136188

137189
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
138190
slog.Error("Unable to set up ready check", "error", err)
139-
return err
191+
return nil, err
140192
}
141193

142-
params := initializer.InitializationParams{
143-
TomlConfigPath: tomlConfigPath,
144-
DryRun: dryRun,
145-
EnableLogCollector: enableLogCollector,
146-
Config: mgr.GetConfig(),
147-
}
194+
return mgr, nil
195+
}
196+
197+
const reconcilerCloseTimeout = 30 * time.Second
148198

199+
// initializeAndWatch performs MongoDB initialization, registers the reconciler, and
200+
// blocks until shutdown or unexpected stream death. It returns a cleanup function that
201+
// the caller must invoke after the manager has fully stopped (after g.Wait) so that
202+
// datastore resources are not torn down under in-flight reconciles.
203+
func initializeAndWatch(
204+
ctx context.Context, params initializer.InitializationParams, mgr ctrl.Manager,
205+
) (cleanup func(), err error) {
149206
components, err := initializer.InitializeAll(ctx, params, mgr.GetClient())
150207
if err != nil {
151-
return fmt.Errorf("initialization failed: %w", err)
208+
return nil, fmt.Errorf("initialization failed: %w", err)
152209
}
153210

154211
reconciler := components.FaultRemediationReconciler
155212

156-
defer func() {
157-
if err := reconciler.CloseAll(ctx); err != nil {
158-
slog.Error("failed to close datastore components", "error", err)
213+
cleanup = func() {
214+
closeCtx, cancel := context.WithTimeout(context.Background(), reconcilerCloseTimeout)
215+
defer cancel()
216+
217+
if closeErr := reconciler.CloseAll(closeCtx); closeErr != nil {
218+
slog.Error("failed to close datastore components", "error", closeErr)
159219
}
160-
}()
220+
}
161221

162-
err = components.FaultRemediationReconciler.SetupWithManager(ctx, mgr)
163-
if err != nil {
164-
return fmt.Errorf("SetupWithManager failed: %w", err)
222+
watcherDone, setupErr := reconciler.SetupWithManager(ctx, mgr)
223+
if setupErr != nil {
224+
return cleanup, fmt.Errorf("SetupWithManager failed: %w", setupErr)
165225
}
166226

167-
slog.Info("Starting controller runtime controller")
227+
slog.Info("Initialization completed, reconciler registered with manager")
168228

169-
if err = mgr.Start(ctx); err != nil {
170-
slog.Error("Problem running manager", "error", err)
171-
return err
172-
}
229+
select {
230+
case <-ctx.Done():
231+
return cleanup, nil
232+
case <-watcherDone:
233+
if ctx.Err() == nil {
234+
return cleanup, fmt.Errorf("change stream watcher terminated unexpectedly, event processing has stopped")
235+
}
173236

174-
return nil
237+
return cleanup, nil
238+
}
175239
}
176240

177241
func parseFlags() {

fault-remediation/pkg/reconciler/reconciler.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -593,14 +593,14 @@ func (r *FaultRemediationReconciler) CloseAll(ctx context.Context) error {
593593
}
594594

595595
// SetupWithManager configures the reconciler for controller-runtime managed operation.
596-
// It starts the watcher stream using the provided context and registers the reconciler
597-
// with the manager using a typed channel source. This method should only be called
598-
// when running under controller-runtime management.
599-
func (r *FaultRemediationReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
596+
// It starts the watcher stream and returns a done channel that is closed when the event
597+
// adapter goroutine exits. Callers should monitor this channel: if it closes while the
598+
// context is still active, the change stream died unexpectedly and the pod should exit.
599+
func (r *FaultRemediationReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) (<-chan struct{}, error) {
600600
r.Watcher.Start(ctx)
601601

602602
reconciler := builder.TypedControllerManagedBy[*datastore.EventWithToken](mgr)
603-
typedCh := AdaptEvents(ctx, r.Watcher.Events())
603+
typedCh, watcherDone := AdaptEvents(ctx, r.Watcher.Events())
604604

605605
src := source.TypedChannel[*datastore.EventWithToken, *datastore.EventWithToken](
606606
typedCh,
@@ -615,25 +615,31 @@ func (r *FaultRemediationReconciler) SetupWithManager(ctx context.Context, mgr c
615615
},
616616
)
617617

618-
return reconciler.
618+
err := reconciler.
619619
Named("fault-remediation-controller").
620620
WatchesRawSource(
621621
src,
622622
).
623623
Complete(r)
624+
625+
return watcherDone, err
624626
}
625627

626628
// AdaptEvents transforms a channel of EventWithToken into a channel of controller-runtime
627629
// TypedGenericEvent. It spawns a goroutine that continuously reads from the input channel
628630
// until either the context is cancelled or the input channel is closed.
631+
// The returned done channel is closed when the adapter goroutine exits. If the input channel
632+
// closed while the context was still active, this indicates the change stream died unexpectedly.
629633
func AdaptEvents(
630634
ctx context.Context,
631635
in <-chan datastore.EventWithToken,
632-
) <-chan event.TypedGenericEvent[*datastore.EventWithToken] {
636+
) (<-chan event.TypedGenericEvent[*datastore.EventWithToken], <-chan struct{}) {
633637
out := make(chan event.TypedGenericEvent[*datastore.EventWithToken])
638+
done := make(chan struct{})
634639

635640
go func() {
636641
defer close(out)
642+
defer close(done)
637643

638644
for {
639645
select {
@@ -650,5 +656,5 @@ func AdaptEvents(
650656
}
651657
}()
652658

653-
return out
659+
return out, done
654660
}

fault-remediation/pkg/reconciler/reconciler_e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func TestMain(m *testing.M) {
296296

297297
reconciler = NewFaultRemediationReconciler(nil, mockWatcher, mockStore, cfg, false)
298298

299-
err = reconciler.SetupWithManager(testContext, mgr)
299+
_, err = reconciler.SetupWithManager(testContext, mgr)
300300
if err != nil {
301301
log.Fatalf("Failed to launch reconciler with mgr %v", err)
302302
}

fault-remediation/pkg/reconciler/reconciler_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,3 +1029,55 @@ func TestLogCollectorOnlyCalledWhenShouldCreateCR(t *testing.T) {
10291029
})
10301030
}
10311031
}
1032+
1033+
func TestAdaptEvents_DoneChannelClosesOnInputClose(t *testing.T) {
1034+
ctx, cancel := context.WithCancel(context.Background())
1035+
defer cancel()
1036+
1037+
in := make(chan datastore.EventWithToken)
1038+
_, done := AdaptEvents(ctx, in)
1039+
1040+
// Close the input channel to simulate change stream death
1041+
close(in)
1042+
1043+
select {
1044+
case <-done:
1045+
// done channel closed as expected
1046+
case <-time.After(2 * time.Second):
1047+
t.Fatal("done channel was not closed after input channel closed")
1048+
}
1049+
}
1050+
1051+
func TestAdaptEvents_DoneChannelClosesOnContextCancel(t *testing.T) {
1052+
ctx, cancel := context.WithCancel(context.Background())
1053+
1054+
in := make(chan datastore.EventWithToken)
1055+
_, done := AdaptEvents(ctx, in)
1056+
1057+
cancel()
1058+
1059+
select {
1060+
case <-done:
1061+
// done channel closed as expected
1062+
case <-time.After(2 * time.Second):
1063+
t.Fatal("done channel was not closed after context cancellation")
1064+
}
1065+
}
1066+
1067+
func TestAdaptEvents_ForwardsEvents(t *testing.T) {
1068+
ctx, cancel := context.WithCancel(context.Background())
1069+
defer cancel()
1070+
1071+
in := make(chan datastore.EventWithToken, 1)
1072+
out, _ := AdaptEvents(ctx, in)
1073+
1074+
testEvent := datastore.EventWithToken{}
1075+
in <- testEvent
1076+
1077+
select {
1078+
case <-out:
1079+
// Event forwarded successfully
1080+
case <-time.After(2 * time.Second):
1081+
t.Fatal("event was not forwarded through AdaptEvents")
1082+
}
1083+
}

node-drainer/main.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,48 +112,50 @@ func run() error {
112112
DryRun: *dryRun,
113113
}
114114

115-
components, err := initializer.InitializeAll(ctx, params)
115+
// Create and start the health/metrics server BEFORE the potentially slow MongoDB
116+
// initialization. This ensures Kubernetes liveness probes get HTTP 200 responses
117+
// immediately, preventing the pod from being killed during initialization.
118+
srv, err := createMetricsServer(*metricsPort)
119+
if err != nil {
120+
return err
121+
}
122+
123+
g, gCtx := errgroup.WithContext(ctx)
124+
125+
startMetricsServer(g, gCtx, srv)
126+
127+
// Initialize components (may block on MongoDB connectivity / change stream setup)
128+
components, err := initializer.InitializeAll(gCtx, params)
116129
if err != nil {
117130
return fmt.Errorf("failed to initialize components: %w", err)
118131
}
119132

120133
// Informers must sync before processing events
121134
slog.Info("Starting Kubernetes informers")
122135

123-
if err := components.Informers.Run(ctx); err != nil {
136+
if err := components.Informers.Run(gCtx); err != nil {
124137
return fmt.Errorf("failed to start informers: %w", err)
125138
}
126139

127140
slog.Info("Kubernetes informers started and synced")
128141

129142
slog.Info("Starting queue worker")
130-
components.QueueManager.Start(ctx)
143+
components.QueueManager.Start(gCtx)
131144

132145
// Handle cold start - re-process any events that were in-progress during restart
133146
slog.Info("Handling cold start")
134147

135-
if err := handleColdStart(ctx, components); err != nil {
148+
if err := handleColdStart(gCtx, components); err != nil {
136149
slog.Error("Cold start handling failed", "error", err)
137150
}
138151

139152
slog.Info("Starting database event watcher")
140153

141154
criticalError := make(chan error)
142-
startEventWatcher(ctx, components, criticalError)
155+
startEventWatcher(gCtx, components, criticalError)
143156

144157
slog.Info("All components started successfully")
145158

146-
srv, err := createMetricsServer(*metricsPort)
147-
if err != nil {
148-
return err
149-
}
150-
151-
// Start server in errgroup alongside event watcher monitoring
152-
g, gCtx := errgroup.WithContext(ctx)
153-
154-
// Start the metrics/health server
155-
startMetricsServer(g, gCtx, srv)
156-
157159
// Monitor for critical errors or graceful shutdown signals.
158160
g.Go(func() error {
159161
select {
@@ -176,7 +178,7 @@ func run() error {
176178
return shutdownComponents(ctx, components)
177179
})
178180

179-
// Wait for both goroutines to finish
181+
// Wait for all goroutines to finish
180182
return g.Wait()
181183
}
182184

@@ -242,7 +244,16 @@ func startEventWatcher(ctx context.Context, components *initializer.Components,
242244
}
243245
}
244246

245-
slog.Info("Event watcher stopped")
247+
// The event channel closed. If the context is still active, this means the
248+
// change stream died unexpectedly (e.g., MongoDB error). Signal a critical
249+
// failure so the pod exits and Kubernetes restarts it.
250+
if ctx.Err() == nil {
251+
slog.Error("Event watcher channel closed unexpectedly, event processing has stopped")
252+
253+
criticalError <- fmt.Errorf("event watcher channel closed unexpectedly")
254+
} else {
255+
slog.Info("Event watcher stopped")
256+
}
246257
}()
247258
}
248259

0 commit comments

Comments (0)