Skip to content

Commit 53ff542

Browse files
authored
Merge pull request #392 from barney-s/watch-timeout
Add the ability to set watch timeouts.
2 parents a2c2fc4 + 4a54ca1 commit 53ff542

File tree

1 file changed

+49
-3
lines changed

1 file changed

+49
-3
lines changed

pkg/patterns/declarative/pkg/watch/dynamic.go

+49-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"sync"
23+
"sync/atomic"
2324
"time"
2425

2526
"k8s.io/apimachinery/pkg/api/meta"
@@ -35,8 +36,19 @@ import (
3536
"sigs.k8s.io/controller-runtime/pkg/log"
3637
)
3738

38-
// WatchDelay is the time between a Watch being dropped and attempting to resume it
39-
const WatchDelay = 30 * time.Second
39+
var (
40+
// WatchActivityTimeout sets a timeout for a Watch activity under normal operation
41+
WatchActivityTimeout = 300 * time.Second
42+
// WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path
43+
// We expect the author to set this to a lower value in environments where it makes sense.
44+
// func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... }
45+
WatchActivityFirstTimeout = 300 * time.Second
46+
)
47+
48+
const (
49+
// WatchDelay is the time between a Watch being dropped and attempting to resume it
50+
WatchDelay = 30 * time.Second
51+
)
4052

4153
// NewDynamicWatch constructs a watcher for unstructured objects.
4254
// Deprecated: avoid using directly; will move to internal in future.
@@ -138,13 +150,46 @@ type clientObject struct {
138150
//
139151
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
140152
func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) {
153+
var sawActivity atomic.Bool
154+
141155
log := log.FromContext(ctx)
142156

143157
options := w.FilterOptions
144158
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
145159
options.AllowWatchBookmarks = true
146160

147-
events, err := w.resource.Watch(context.TODO(), options)
161+
activityTimeout := WatchActivityTimeout
162+
if watchStarted != nil {
163+
activityTimeout = WatchActivityFirstTimeout
164+
}
165+
ctx, cancel := context.WithCancel(ctx)
166+
defer cancel()
167+
// Check for events periodically
168+
ticker := time.NewTicker(activityTimeout)
169+
defer ticker.Stop()
170+
sawActivity.Store(false)
171+
172+
go func() {
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return
177+
case <-ticker.C:
178+
if !sawActivity.Load() {
179+
log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch")
180+
cancel()
181+
return
182+
}
183+
sawActivity.Store(false)
184+
}
185+
}
186+
}()
187+
188+
events, err := w.resource.Watch(ctx, options)
189+
// If the Watch() call doesnt return, this would not be set to true thereby causing the timer to cancle the watch() context
190+
// We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return
191+
sawActivity.Store(true)
192+
148193
if watchStarted != nil {
149194
watchStarted.Done()
150195
}
@@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met
159204
defer events.Stop()
160205

161206
for clientEvent := range events.ResultChan() {
207+
sawActivity.Store(true)
162208
switch clientEvent.Type {
163209
case watch.Bookmark:
164210
// not an object change, we ignore it

0 commit comments

Comments
 (0)