Skip to content

Commit 9e75ea0

Browse files
committed
feature: Robustify the operator termination codepath
1 parent fb2d434 commit 9e75ea0

File tree

13 files changed

+126
-53
lines changed

13 files changed

+126
-53
lines changed

internal/controllers/dataplane.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ import (
3838
// DataplaneReconciler reconciles a Dataplane object.
3939
type dataplaneReconciler struct {
4040
client.Client
41-
eventCh chan event.Event
41+
eventCh event.EventChannel
4242
terminating bool
4343
log logr.Logger
4444
}
4545

46-
func NewDataplaneController(mgr manager.Manager, ch chan event.Event, log logr.Logger) (Controller, error) {
46+
func NewDataplaneController(mgr manager.Manager, ch event.EventChannel, log logr.Logger) (Controller, error) {
4747
r := &dataplaneReconciler{
4848
Client: mgr.GetClient(),
4949
eventCh: ch,
@@ -54,12 +54,17 @@ func NewDataplaneController(mgr manager.Manager, ch chan event.Event, log logr.L
5454
if err != nil {
5555
return nil, err
5656
}
57+
58+
// increase the ref count on the channel
59+
r.eventCh.Get()
60+
5761
r.log.Info("created dataplane controller")
5862

5963
if err := c.Watch(
6064
source.Kind(mgr.GetCache(), &stnrgwv1.Dataplane{},
6165
&handler.TypedEnqueueRequestForObject[*stnrgwv1.Dataplane]{},
62-
predicate.TypedGenerationChangedPredicate[*stnrgwv1.Dataplane]{}), // trigger when the Dataplane spec changes
66+
// trigger when the Dataplane spec changes
67+
predicate.TypedGenerationChangedPredicate[*stnrgwv1.Dataplane]{}),
6368
); err != nil {
6469
return nil, err
6570
}
@@ -95,11 +100,12 @@ func (r *dataplaneReconciler) Reconcile(ctx context.Context, req reconcile.Reque
95100
store.Dataplanes.Reset(dataplaneList)
96101
r.log.V(2).Info("Reset Dataplane store", "configs", store.Dataplanes.String())
97102

98-
r.eventCh <- event.NewEventReconcile()
103+
r.eventCh.Channel() <- event.NewEventReconcile()
99104

100105
return reconcile.Result{}, nil
101106
}
102107

103108
func (r *dataplaneReconciler) Terminate() {
104109
r.terminating = true
110+
r.eventCh.Put()
105111
}

internal/controllers/gateway.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ const (
4141

4242
type gatewayReconciler struct {
4343
client.Client
44-
eventCh chan event.Event
44+
eventCh event.EventChannel
4545
terminating bool
4646
log logr.Logger
4747
}
4848

4949
// NewGatewayController registers a reconciler for Gateway and the associated Secret objects.
50-
func NewGatewayController(mgr manager.Manager, ch chan event.Event, log logr.Logger) (Controller, error) {
50+
func NewGatewayController(mgr manager.Manager, ch event.EventChannel, log logr.Logger) (Controller, error) {
5151
ctx := context.Background()
5252
r := &gatewayReconciler{
5353
Client: mgr.GetClient(),
@@ -59,6 +59,10 @@ func NewGatewayController(mgr manager.Manager, ch chan event.Event, log logr.Log
5959
if err != nil {
6060
return nil, err
6161
}
62+
63+
// increase the ref count on the channel
64+
r.eventCh.Get()
65+
6266
r.log.Info("Created Gateway controller")
6367

6468
// watch GatewayClass objects that match this controller name
@@ -275,7 +279,7 @@ func (r *gatewayReconciler) Reconcile(ctx context.Context, req reconcile.Request
275279
store.DaemonSets.Reset(daemonSetList)
276280
r.log.V(2).Info("reset DaemonSet store", "daemonSets", store.DaemonSets.String())
277281

278-
r.eventCh <- event.NewEventReconcile()
282+
r.eventCh.Channel() <- event.NewEventReconcile()
279283

280284
return reconcile.Result{}, nil
281285
}
@@ -438,4 +442,5 @@ func secretGatewayIndexFunc(o client.Object) []string {
438442

439443
func (r *gatewayReconciler) Terminate() {
440444
r.terminating = true
445+
r.eventCh.Put()
441446
}

internal/controllers/gatewayconfig.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ const secretGatewayConfigIndex = "secretGatewayConfigIndex"
4444
// GatewayConfigReconciler reconciles a GatewayConfig object
4545
type gatewayConfigReconciler struct {
4646
client.Client
47-
eventCh chan event.Event
47+
eventCh event.EventChannel
4848
terminating bool
4949
log logr.Logger
5050
}
5151

52-
func NewGatewayConfigController(mgr manager.Manager, ch chan event.Event, log logr.Logger) (Controller, error) {
52+
func NewGatewayConfigController(mgr manager.Manager, ch event.EventChannel, log logr.Logger) (Controller, error) {
5353
ctx := context.Background()
5454
r := &gatewayConfigReconciler{
5555
Client: mgr.GetClient(),
@@ -61,6 +61,10 @@ func NewGatewayConfigController(mgr manager.Manager, ch chan event.Event, log lo
6161
if err != nil {
6262
return nil, err
6363
}
64+
65+
// increase the ref count on the channel
66+
r.eventCh.Get()
67+
6468
r.log.Info("Created GatewayConfig controller")
6569

6670
if err := c.Watch(
@@ -160,7 +164,7 @@ func (r *gatewayConfigReconciler) Reconcile(ctx context.Context, req reconcile.R
160164
r.log.V(2).Info("Reset AuthSecret store", "secrets", store.AuthSecrets.String())
161165

162166
if !r.terminating {
163-
r.eventCh <- event.NewEventReconcile()
167+
r.eventCh.Channel() <- event.NewEventReconcile()
164168
}
165169

166170
return reconcile.Result{}, nil
@@ -219,4 +223,5 @@ func secretGatewayConfigIndexFunc(o client.Object) []string {
219223

220224
func (r *gatewayConfigReconciler) Terminate() {
221225
r.terminating = true
226+
r.eventCh.Put()
222227
}

internal/controllers/node.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ const NodeListSize = 10
2828

2929
type nodeReconciler struct {
3030
client.Client
31-
eventCh chan event.Event
31+
eventCh event.EventChannel
3232
terminating bool
3333
log logr.Logger
3434
}
3535

36-
func NewNodeController(mgr manager.Manager, ch chan event.Event, log logr.Logger) (Controller, error) {
36+
func NewNodeController(mgr manager.Manager, ch event.EventChannel, log logr.Logger) (Controller, error) {
3737
r := &nodeReconciler{
3838
Client: mgr.GetClient(),
3939
eventCh: ch,
@@ -44,6 +44,10 @@ func NewNodeController(mgr manager.Manager, ch chan event.Event, log logr.Logger
4444
if err != nil {
4545
return nil, err
4646
}
47+
48+
// increase the ref count on the channel
49+
r.eventCh.Get()
50+
4751
r.log.Info("created node controller")
4852

4953
if err := c.Watch(
@@ -69,6 +73,7 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (
6973
log.Info("Reconciling")
7074

7175
// the node being reconciled
76+
eventCh := r.eventCh.Channel()
7277
node := &corev1.Node{}
7378
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
7479
if !apierrors.IsNotFound(err) {
@@ -78,7 +83,7 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (
7883
log.Info("node removed: triggering reconcile")
7984
store.Nodes.Remove(req.NamespacedName)
8085

81-
r.eventCh <- event.NewEventReconcile()
86+
eventCh <- event.NewEventReconcile()
8287
return reconcile.Result{}, nil
8388
}
8489

@@ -87,7 +92,7 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (
8792
log.Info("node added: triggering reconcile")
8893
store.Nodes.Upsert(node)
8994

90-
r.eventCh <- event.NewEventReconcile()
95+
eventCh <- event.NewEventReconcile()
9196
return reconcile.Result{}, nil
9297

9398
}
@@ -101,10 +106,11 @@ func (r *nodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (
101106
log.Info("node addresses changed: triggering reconcile")
102107
store.Nodes.Upsert(node)
103108

104-
r.eventCh <- event.NewEventReconcile()
109+
eventCh <- event.NewEventReconcile()
105110
return reconcile.Result{}, nil
106111
}
107112

108113
func (r *nodeReconciler) Terminate() {
109114
r.terminating = true
115+
r.eventCh.Put()
110116
}

internal/controllers/udproute.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ const (
4141

4242
type udpRouteReconciler struct {
4343
client.Client
44-
eventCh chan event.Event
44+
eventCh event.EventChannel
4545
terminating bool
4646
log logr.Logger
4747
}
4848

49-
func NewUDPRouteController(mgr manager.Manager, ch chan event.Event, log logr.Logger) (Controller, error) {
49+
func NewUDPRouteController(mgr manager.Manager, ch event.EventChannel, log logr.Logger) (Controller, error) {
5050
ctx := context.Background()
5151
r := &udpRouteReconciler{
5252
Client: mgr.GetClient(),
@@ -58,6 +58,10 @@ func NewUDPRouteController(mgr manager.Manager, ch chan event.Event, log logr.Lo
5858
if err != nil {
5959
return nil, err
6060
}
61+
62+
// increase the ref count on the channel
63+
r.eventCh.Get()
64+
6165
r.log.Info("Created UDPRoute controller")
6266

6367
// watch UDPRoute objects
@@ -354,7 +358,7 @@ func (r *udpRouteReconciler) Reconcile(ctx context.Context, req reconcile.Reques
354358
store.StaticServices.Reset(ssvcList)
355359
r.log.V(2).Info("Reset StaticService store", "static-services", store.StaticServices.String())
356360

357-
r.eventCh <- event.NewEventReconcile()
361+
r.eventCh.Channel() <- event.NewEventReconcile()
358362

359363
return reconcile.Result{}, nil
360364
}
@@ -636,6 +640,7 @@ func staticServiceUDPRouteIndexFunc(o client.Object) []string {
636640

637641
func (r *udpRouteReconciler) Terminate() {
638642
r.terminating = true
643+
r.eventCh.Put()
639644
}
640645

641646
// TypedLabelSelectorPredicate is the generic version of LabelSelectorPredicate that somehow seems

internal/event/channel.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package event
2+
3+
import "sync"
4+
5+
type EventChannel interface {
6+
// Channel returns the event channel without incrementing the reference count.
7+
Channel() chan Event
8+
// Get returns the event channel and increments the reference count.
9+
Get() chan Event
10+
// Put decrements the reference count.
11+
Put()
12+
// Close blocks until all references are gone and then closes the channel.
13+
Close()
14+
}
15+
16+
type eventChannel struct {
17+
ch chan Event
18+
wg sync.WaitGroup
19+
}
20+
21+
func NewEventChannel(ch chan Event) EventChannel {
22+
return &eventChannel{ch: ch, wg: sync.WaitGroup{}}
23+
}
24+
25+
func (ec *eventChannel) Channel() chan Event { return ec.ch }
26+
func (ec *eventChannel) Get() chan Event { ec.wg.Add(1); return ec.ch }
27+
func (ec *eventChannel) Put() { ec.wg.Done() }
28+
func (ec *eventChannel) Close() { ec.wg.Wait(); close(ec.ch) }

internal/licensemanager/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type Manager interface {
2525
// Generate a license configuration for the dataplane.
2626
GenerateLicenseConfig() (stnrv1.LicenseConfig, error)
2727
// SetOperatorChannel sets up the operator channel where the manager can send rendering
28-
SetOperatorChannel(c chan event.Event)
28+
SetOperatorChannel(c event.EventChannel)
2929
// LastError returns the last license manager error.
3030
LastError() error
3131
// Status returns the current licensing status.

internal/licensemanager/stub.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ type stubMgr struct{}
1515

1616
func NewStubManager(_ string, _ logr.Logger) Manager { return &stubMgr{} }
1717

18-
func (_ *stubMgr) Start(_ context.Context) error { return nil }
19-
func (_ *stubMgr) Validate(_ licensecfg.Feature) bool { return true }
20-
func (_ *stubMgr) Status() stnrv1.LicenseStatus { return stnrv1.NewEmptyLicenseStatus() }
21-
func (_ *stubMgr) LastError() error { return nil }
22-
func (_ *stubMgr) SetOperatorChannel(_ chan event.Event) {}
18+
func (_ *stubMgr) Start(_ context.Context) error { return nil }
19+
func (_ *stubMgr) Validate(_ licensecfg.Feature) bool { return true }
20+
func (_ *stubMgr) Status() stnrv1.LicenseStatus { return stnrv1.NewEmptyLicenseStatus() }
21+
func (_ *stubMgr) LastError() error { return nil }
22+
func (_ *stubMgr) SetOperatorChannel(_ event.EventChannel) {}
2323
func (_ *stubMgr) GenerateLicenseConfig() (stnrv1.LicenseConfig, error) {
2424
return stnrv1.LicenseConfig{}, nil
2525
}

internal/operator/operator.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,29 @@ type OperatorConfig struct {
4545
}
4646

4747
type Operator struct {
48-
ctx context.Context
49-
mgr manager.Manager
50-
gwConfC, dpC, gwC, rouC, nodeC controllers.Controller
51-
renderCh, operatorCh, updaterCh, configCh chan event.Event
52-
manager manager.Manager
53-
tracker *config.ProgressTracker
54-
progressReporters []config.ProgressReporter
55-
finalizer bool
56-
gen, lastAckedGen int
57-
ackLock sync.RWMutex
58-
log, logger logr.Logger
48+
ctx context.Context
49+
mgr manager.Manager
50+
gwConfC, dpC, gwC, rouC, nodeC controllers.Controller
51+
operatorCh event.EventChannel
52+
renderCh, updaterCh, configCh chan event.Event
53+
manager manager.Manager
54+
tracker *config.ProgressTracker
55+
progressReporters []config.ProgressReporter
56+
finalizer bool
57+
gen, lastAckedGen int
58+
ackLock sync.RWMutex
59+
log, logger logr.Logger
5960
}
6061

6162
// NewOperator creates a new Operator
6263
func NewOperator(cfg OperatorConfig) *Operator {
6364
config.ControllerName = cfg.ControllerName
6465

66+
opCh := make(chan event.Event, channelBufferSize)
6567
return &Operator{
6668
mgr: cfg.Manager,
6769
renderCh: cfg.RenderCh,
68-
operatorCh: make(chan event.Event, channelBufferSize),
70+
operatorCh: event.NewEventChannel(opCh),
6971
updaterCh: cfg.UpdaterCh,
7072
configCh: cfg.ConfigCh,
7173
tracker: config.NewProgressTracker(),
@@ -89,6 +91,9 @@ func (o *Operator) Start(ctx context.Context, cancel context.CancelFunc) error {
8991
return fmt.Errorf("Controller runtime manager uninitialized")
9092
}
9193

94+
// increment the refcount on our operator channel
95+
o.operatorCh.Get()
96+
9297
log.V(3).Info("Starting GatewayConfig controller")
9398
c, err := controllers.NewGatewayConfigController(o.mgr, o.operatorCh, o.logger)
9499
if err != nil {
@@ -130,7 +135,7 @@ func (o *Operator) Start(ctx context.Context, cancel context.CancelFunc) error {
130135
}
131136

132137
func (o *Operator) eventLoop(ctx context.Context, cancel context.CancelFunc) {
133-
defer close(o.operatorCh)
138+
defer o.operatorCh.Close()
134139

135140
throttler := time.NewTicker(config.ThrottleTimeout)
136141
throttler.Stop()
@@ -139,7 +144,7 @@ func (o *Operator) eventLoop(ctx context.Context, cancel context.CancelFunc) {
139144
for {
140145
select {
141146

142-
case e := <-o.operatorCh:
147+
case e := <-o.operatorCh.Channel():
143148
switch e.GetType() {
144149
case event.EventTypeUpdate:
145150
// pass through to the updater
@@ -221,6 +226,9 @@ func (o *Operator) Terminate() {
221226
if o.finalizer {
222227
o.Finalize()
223228
}
229+
230+
// release our channel
231+
o.operatorCh.Put()
224232
}
225233

226234
// Finalize invalidates the status on all the managed resources. Note that Finalize must be called
@@ -239,7 +247,7 @@ func (o *Operator) Finalize() {
239247
"last-acked-generation", lastGen)
240248

241249
// event loop is blocked: we must handle message passing ourselves
242-
u := <-o.operatorCh
250+
u := <-o.operatorCh.Channel()
243251

244252
o.log.V(2).Info("Renderer ready, initiating the updater", "event", u.String())
245253

@@ -254,7 +262,7 @@ func (o *Operator) Finalize() {
254262
timeout := time.After(2 * time.Second)
255263
for {
256264
select {
257-
case <-o.operatorCh:
265+
case <-o.operatorCh.Channel():
258266
if o.GetLastAckedGeneration() != finalGen {
259267
o.log.V(2).Info("Update ready, exiting finalizer",
260268
"gen", o.gen, "last-acked-generation", lastGen)

0 commit comments

Comments
 (0)