Skip to content

Commit

Permalink
feat: Drop unnecessary listing for the sake of watch reinitialization
Browse files Browse the repository at this point in the history
Signed-off-by: Antoni Zawodny <[email protected]>
  • Loading branch information
tosi3k committed Jul 30, 2024
1 parent 6b2984e commit 409990b
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.TimeoutSeconds = &timeoutSeconds
res, err := resClient.Watch(ctx, options)
if errors.IsNotFound(err) {
c.stopWatching(api.GroupKind, ns)
Expand All @@ -633,30 +635,21 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return err
}

defer func() {
w.Stop()
resourceVersion = ""
}()

var watchResyncTimeoutCh <-chan time.Time
if c.watchResyncTimeout > 0 {
shouldResync := time.NewTimer(c.watchResyncTimeout)
defer shouldResync.Stop()
watchResyncTimeoutCh = shouldResync.C
}
defer w.Stop()

for {
select {
// stop watching when parent context got cancelled
case <-ctx.Done():
return nil

// re-synchronize API state and restart watch periodically
case <-watchResyncTimeoutCh:
return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)

// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
case <-w.Done():
// The underlying retry watcher has stopped, possibly due to specifying an RV in
// the watch request that is stale (error code 410). This forces us to relist
// objects from the kube-apiserver to get a fresher RV and we invoke that relist
// by resetting the locally stored RV.
resourceVersion = ""
return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host)

case event, ok := <-w.ResultChan():
Expand All @@ -666,8 +659,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
// We failed to cast the object received in the watch event to something
// that contains a resource version field. Because of that, we don't know
// from what RV we should reinitialize the watch connection, so in order to
// avoid any inconsistencies due to accidental skipping of a potential RV,
// we reset the locally stored RV to forcefully invoke the list API call to
// get it from the kube-apiserver.
resourceVersion = ""
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
}
resourceVersion = obj.GetResourceVersion()

c.processEvent(event.Type, obj)
if kube.IsCRD(obj) {
Expand Down

0 comments on commit 409990b

Please sign in to comment.