Skip to content

Commit e7a7089

Browse files
authored
feat(controller): add watch-list for initial cluster cache sync (argoproj#77) (argoproj#78)
* feat(controller): add watch-list for initial cluster cache sync

  The cluster cache's initial sync currently issues a LIST with ResourceVersion=0 per resource kind. That serves from the apiserver watch cache (no etcd) but returns every object in a single response body, because the apiserver ignores the limit parameter when serving from cache. On clusters with many objects of one kind (e.g. 17k StatefulSets on oxygen) the response exceeds what the GKE control-plane load balancer will carry, and the HTTP/2 stream is reset with INTERNAL_ERROR before the controller reads it. The cluster cache never finishes initializing, so every Application on the shard goes into ComparisonError and stays there.

  This adds an opt-in watch-list path (KEP-3157, GA in k8s 1.32) that opens a WATCH with sendInitialEvents=true instead of a LIST. The apiserver streams each object as an individual ADDED event, then sends a bookmark annotated with k8s.io/initial-events-end to mark the snapshot boundary. Same watch-cache source, same etcd-free guarantee, but the response is a stream of small events rather than one giant body — long-lived watch streams are what the LB already handles for every informer in the cluster.

  Enable with ARGOCD_CLUSTER_CACHE_WATCH_LIST=true. On clusters where the WatchList feature gate is disabled server-side (k8s <1.32 by default), the first rejected request latches a per-cluster flag and subsequent kinds fall back to the LIST path, so the cost is one 400 round-trip at most.

* add retry/backoff to watchListResources

  Mirror the retry.OnError pattern from listResources so both initial-sync paths honor the same listRetryLimit/listRetryUseBackoff/listRetryFunc knobs. The retry predicate skips BadRequest/Invalid so the fallback-to-LIST latch stays a single fast round-trip on clusters without the WatchList feature gate. Addresses review feedback on argoproj#77.
1 parent fdc654d commit e7a7089

File tree

3 files changed

+183
-21
lines changed

3 files changed

+183
-21
lines changed

controller/cache/cache.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ const (
7575
// EnvClusterCacheEventsProcessingInterval is the env variable to control the interval between processing events when BatchEventsProcessing is enabled
7676
EnvClusterCacheEventsProcessingInterval = "ARGOCD_CLUSTER_CACHE_EVENTS_PROCESSING_INTERVAL"
7777

78+
// EnvClusterCacheWatchList enables watch-list (sendInitialEvents) for
79+
// initial cluster cache sync. Requires the WatchList feature gate on the
80+
// target apiserver (GA in k8s 1.32). Falls back to LIST per cluster if
81+
// the apiserver rejects the request.
82+
EnvClusterCacheWatchList = "ARGOCD_CLUSTER_CACHE_WATCH_LIST"
83+
7884
// AnnotationIgnoreResourceUpdates when set to true on an untracked resource,
7985
// argo will apply `ignoreResourceUpdates` configuration on it.
8086
AnnotationIgnoreResourceUpdates = "argocd.argoproj.io/ignore-resource-updates"
@@ -115,6 +121,9 @@ var (
115121

116122
// clusterCacheEventsProcessingInterval specifies the interval between processing events when BatchEventsProcessing is enabled
117123
clusterCacheEventsProcessingInterval = 100 * time.Millisecond
124+
125+
// clusterCacheWatchList specifies whether to use watch-list for initial sync
126+
clusterCacheWatchList = false
118127
)
119128

120129
func init() {
@@ -128,6 +137,7 @@ func init() {
128137
clusterCacheRetryUseBackoff = env.ParseBoolFromEnv(EnvClusterCacheRetryUseBackoff, false)
129138
clusterCacheBatchEventsProcessing = env.ParseBoolFromEnv(EnvClusterCacheBatchEventsProcessing, true)
130139
clusterCacheEventsProcessingInterval = env.ParseDurationFromEnv(EnvClusterCacheEventsProcessingInterval, clusterCacheEventsProcessingInterval, 0, math.MaxInt64)
140+
clusterCacheWatchList = env.ParseBoolFromEnv(EnvClusterCacheWatchList, clusterCacheWatchList)
131141
}
132142

133143
type LiveStateCache interface {
@@ -587,6 +597,7 @@ func (c *liveStateCache) getCluster(cluster *appv1.Cluster) (clustercache.Cluste
587597
clustercache.SetRespectRBAC(respectRBAC),
588598
clustercache.SetBatchEventsProcessing(clusterCacheBatchEventsProcessing),
589599
clustercache.SetEventProcessingInterval(clusterCacheEventsProcessingInterval),
600+
clustercache.SetWatchList(clusterCacheWatchList),
590601
}
591602

592603
clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...)

gitops-engine/pkg/cache/cluster.go

Lines changed: 160 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ package cache
2626

2727
import (
2828
"context"
29+
"errors"
2930
"fmt"
3031
"runtime/debug"
3132
"strings"
3233
"sync"
34+
"sync/atomic"
3335
"time"
3436

3537
"github.com/go-logr/logr"
@@ -55,11 +57,17 @@ import (
5557
"k8s.io/client-go/util/retry"
5658
"k8s.io/klog/v2/textlogger"
5759
"k8s.io/kubectl/pkg/util/openapi"
60+
"k8s.io/utils/ptr"
5861

5962
"github.com/argoproj/gitops-engine/pkg/utils/kube"
6063
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
6164
)
6265

66+
// errWatchListUnsupported signals that the apiserver rejected a
67+
// sendInitialEvents watch request and the caller should fall back
68+
// to a LIST-based sync.
69+
var errWatchListUnsupported = errors.New("apiserver does not support watch-list (sendInitialEvents)")
70+
6371
const (
6472
watchResourcesRetryTimeout = 1 * time.Second
6573
ClusterRetryTimeout = 10 * time.Second
@@ -284,6 +292,17 @@ type clusterCache struct {
284292
// Maps any resource's UID to its direct children's ResourceKeys
285293
// Eliminates need for O(n) graph building during hierarchy traversal
286294
parentUIDToChildren map[types.UID][]kube.ResourceKey
295+
296+
// watchListEnabled opts this cache into using watch-list
297+
// (sendInitialEvents) for initial state sync instead of LIST. Requires
298+
// the WatchList feature gate on the apiserver (GA in k8s 1.32).
299+
watchListEnabled bool
300+
// watchListUnsupported latches to true after the first sendInitialEvents
301+
// watch is rejected by the apiserver, so subsequent syncs skip straight
302+
// to the LIST-based path. This avoids a 400 round-trip per group/kind
303+
// on clusters where watchListEnabled is set but the feature gate is
304+
// disabled server-side.
305+
watchListUnsupported atomic.Bool
287306
}
288307

289308
type clusterCacheSync struct {
@@ -705,6 +724,138 @@ func runSynced(lock sync.Locker, action func() error) error {
705724
return action()
706725
}
707726

727+
// watchListResources loads the initial state of a resource kind using a
728+
// watch request with sendInitialEvents=true (KEP-3157) instead of a LIST.
729+
// This is served from the apiserver's watch cache (no etcd round-trip) and
730+
// delivers objects as a stream of ADDED events terminated by a bookmark,
731+
// avoiding the single oversized response body that a ResourceVersion=0 LIST
732+
// produces on clusters with many objects of one kind.
733+
//
734+
// Returns errWatchListUnsupported if the apiserver rejects the request
735+
// because the WatchList feature gate is disabled (k8s <1.32 by default).
736+
// Callers should fall back to listResources on that error.
737+
func (c *clusterCache) watchListResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*unstructured.Unstructured) error) (string, error) {
738+
if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
739+
return "", fmt.Errorf("failed to acquire list semaphore: %w", err)
740+
}
741+
defer c.listSemaphore.Release(1)
742+
743+
// Bound the initial-state stream so a slow or stalled apiserver does not
744+
// tie up a watch slot indefinitely. Reuse the watch resync timeout as the
745+
// upper bound, matching the behaviour of the subsequent long-running watch.
746+
timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
747+
opts := metav1.ListOptions{
748+
SendInitialEvents: ptr.To(true),
749+
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
750+
AllowWatchBookmarks: true,
751+
TimeoutSeconds: &timeoutSeconds,
752+
}
753+
754+
var listRetry wait.Backoff
755+
if c.listRetryUseBackoff {
756+
listRetry = retry.DefaultBackoff
757+
} else {
758+
listRetry = retry.DefaultRetry
759+
}
760+
listRetry.Steps = int(c.listRetryLimit)
761+
762+
retryable := func(err error) bool {
763+
// BadRequest/Invalid means the feature gate is off server-side; no
764+
// point retrying, let the caller fall back to LIST immediately.
765+
if apierrors.IsBadRequest(err) || apierrors.IsInvalid(err) {
766+
return false
767+
}
768+
return c.listRetryFunc(err)
769+
}
770+
771+
var w watch.Interface
772+
var retryCount int64
773+
err := retry.OnError(listRetry, retryable, func() error {
774+
var ierr error
775+
w, ierr = resClient.Watch(ctx, opts)
776+
if ierr != nil && c.listRetryLimit > 1 && retryable(ierr) {
777+
retryCount++
778+
c.log.Info(fmt.Sprintf("Error starting watch-list: %v (try %d/%d)", ierr, retryCount, c.listRetryLimit))
779+
}
780+
//nolint:wrapcheck // wrap outside the retry
781+
return ierr
782+
})
783+
if err != nil {
784+
if apierrors.IsBadRequest(err) || apierrors.IsInvalid(err) {
785+
return "", fmt.Errorf("%w: %v", errWatchListUnsupported, err)
786+
}
787+
return "", fmt.Errorf("failed to start watch-list: %w", err)
788+
}
789+
defer w.Stop()
790+
791+
for {
792+
select {
793+
case <-ctx.Done():
794+
return "", ctx.Err()
795+
case event, ok := <-w.ResultChan():
796+
if !ok {
797+
return "", errors.New("watch-list stream closed before initial-events-end bookmark")
798+
}
799+
switch event.Type {
800+
case watch.Error:
801+
status := apierrors.FromObject(event.Object)
802+
if apierrors.IsBadRequest(status) || apierrors.IsInvalid(status) {
803+
return "", fmt.Errorf("%w: %v", errWatchListUnsupported, status)
804+
}
805+
return "", fmt.Errorf("watch-list received error event: %w", status)
806+
case watch.Bookmark:
807+
obj, ok := event.Object.(*unstructured.Unstructured)
808+
if !ok {
809+
return "", fmt.Errorf("watch-list bookmark has unexpected type %T", event.Object)
810+
}
811+
if obj.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
812+
return obj.GetResourceVersion(), nil
813+
}
814+
case watch.Added, watch.Modified:
815+
obj, ok := event.Object.(*unstructured.Unstructured)
816+
if !ok {
817+
return "", fmt.Errorf("watch-list event object has unexpected type %T", event.Object)
818+
}
819+
if err := callback(obj); err != nil {
820+
return "", err
821+
}
822+
case watch.Deleted:
823+
// Deletion events before the end-of-initial-state bookmark are
824+
// not expected; ignore rather than fail so a race between the
825+
// snapshot and a concurrent delete does not abort the sync.
826+
}
827+
}
828+
}
829+
}
830+
831+
// eachResourceWithFallback streams the current state of a resource kind,
832+
// preferring watch-list (sendInitialEvents) and falling back to a
833+
// ResourceVersion=0 LIST if the apiserver rejects watch-list. The per-cluster
834+
// watchListUnsupported flag makes the fallback sticky so clusters without the
835+
// feature gate take at most one rejected watch round-trip.
836+
func (c *clusterCache) eachResourceWithFallback(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*unstructured.Unstructured) error) (string, error) {
837+
if c.watchListEnabled && !c.watchListUnsupported.Load() {
838+
rv, err := c.watchListResources(ctx, resClient, callback)
839+
if err == nil {
840+
return rv, nil
841+
}
842+
if !errors.Is(err, errWatchListUnsupported) {
843+
return "", err
844+
}
845+
c.watchListUnsupported.Store(true)
846+
c.log.Info("watch-list unsupported by apiserver, falling back to LIST for remaining kinds", "host", c.config.Host, "error", err)
847+
}
848+
return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
849+
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
850+
un, ok := obj.(*unstructured.Unstructured)
851+
if !ok {
852+
return fmt.Errorf("watch-list fallback received unexpected object type %T", obj)
853+
}
854+
return callback(un)
855+
})
856+
})
857+
}
858+
708859
// listResources creates list pager and enforces number of concurrent list requests
709860
// The callback should not wait on any locks that may be held by other callers.
710861
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
@@ -763,15 +914,9 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
763914
// loadInitialState loads the state of all the resources retrieved by the given resource client.
764915
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
765916
var items []*Resource
766-
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
767-
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
768-
if un, ok := obj.(*unstructured.Unstructured); !ok {
769-
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
770-
} else {
771-
items = append(items, c.newResource(un))
772-
}
773-
return nil
774-
})
917+
resourceVersion, err := c.eachResourceWithFallback(ctx, resClient, func(un *unstructured.Unstructured) error {
918+
items = append(items, c.newResource(un))
919+
return nil
775920
})
776921
if err != nil {
777922
return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
@@ -1073,18 +1218,12 @@ func (c *clusterCache) sync() error {
10731218
lock.Unlock()
10741219

10751220
return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
1076-
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
1077-
return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
1078-
if un, ok := obj.(*unstructured.Unstructured); !ok {
1079-
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
1080-
} else {
1081-
newRes := c.newResource(un)
1082-
lock.Lock()
1083-
c.setNode(newRes)
1084-
lock.Unlock()
1085-
}
1086-
return nil
1087-
})
1221+
resourceVersion, err := c.eachResourceWithFallback(ctx, resClient, func(un *unstructured.Unstructured) error {
1222+
newRes := c.newResource(un)
1223+
lock.Lock()
1224+
c.setNode(newRes)
1225+
lock.Unlock()
1226+
return nil
10881227
})
10891228
if err != nil {
10901229
if c.isRestrictedResource(err) {

gitops-engine/pkg/cache/settings.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ func SetListPageSize(listPageSize int64) UpdateSettingsFunc {
8787
}
8888
}
8989

90+
// SetWatchList enables watch-list (sendInitialEvents) for initial resource
91+
// state sync. When enabled, the cache opens a watch with sendInitialEvents=true
92+
// instead of issuing a LIST, which streams objects individually and avoids the
93+
// single oversized response that a watch-cache LIST produces on large clusters.
94+
// Requires the WatchList feature gate on the apiserver (GA in k8s 1.32).
95+
// Falls back to LIST automatically if the apiserver rejects the request.
96+
func SetWatchList(enabled bool) UpdateSettingsFunc {
97+
return func(cache *clusterCache) {
98+
cache.watchListEnabled = enabled
99+
}
100+
}
101+
90102
// SetListPageBufferSize sets the number of pages to prefetch for list pager.
91103
func SetListPageBufferSize(listPageBufferSize int32) UpdateSettingsFunc {
92104
return func(cache *clusterCache) {

0 commit comments

Comments
 (0)