diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go index 3f662effc..cc64d8c91 100644 --- a/pkg/cache/cluster.go +++ b/pkg/cache/cluster.go @@ -191,7 +191,7 @@ type clusterCache struct { // namespacedResources is a simple map which indicates a groupKind is namespaced namespacedResources map[schema.GroupKind]bool - // maximum time we allow watches to run before relisting the group/kind and restarting the watch + // maximum time we allow watches to run before restarting them watchResyncTimeout time.Duration // sync retry timeout for cluster when sync error happens clusterSyncRetryTimeout time.Duration @@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc } func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) { + timeoutSeconds := int64(c.watchResyncTimeout.Seconds()) kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) { defer func() { if r := recover(); r != nil { @@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.TimeoutSeconds = &timeoutSeconds res, err := resClient.Watch(ctx, options) if errors.IsNotFound(err) { c.stopWatching(api.GroupKind, ns) @@ -633,17 +635,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo return err } - defer func() { - w.Stop() - resourceVersion = "" - }() - - var watchResyncTimeoutCh <-chan time.Time - if c.watchResyncTimeout > 0 { - shouldResync := time.NewTimer(c.watchResyncTimeout) - defer shouldResync.Stop() - watchResyncTimeoutCh = shouldResync.C - } + defer w.Stop() for { select { @@ -651,12 +643,13 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo case <-ctx.Done(): return nil - // re-synchronize API state and restart watch periodically - case <-watchResyncTimeoutCh: - return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host) - // re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version case <-w.Done(): + // The underlying retry watcher has stopped, possibly due to specifying an RV in + // the watch request that is stale (error code 410). This forces us to relist + // objects from the kube-apiserver to get a fresher RV and we invoke that relist + // by resetting the locally stored RV. + resourceVersion = "" return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host) case event, ok := <-w.ResultChan(): @@ -666,8 +659,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo obj, ok := event.Object.(*unstructured.Unstructured) if !ok { + // We failed to cast the object received in the watch event to something + // that contains a resource version field. Because of that, we don't know + // from what RV we should reinitialize the watch connection, so in order to + // avoid any inconsistencies due to accidental skipping of a potential RV, + // we reset the locally stored RV to forcefully invoke the list API call to + // get it from the kube-apiserver. + resourceVersion = "" return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object) } + resourceVersion = obj.GetResourceVersion() c.processEvent(event.Type, obj) if kube.IsCRD(obj) {