Skip to content

Commit e55b90b

Browse files
stop search watch connections when a failed cluster recovers
Signed-off-by: changzhen <[email protected]>
1 parent dd65ac4 commit e55b90b

File tree

2 files changed

+137
-1
lines changed

2 files changed

+137
-1
lines changed

pkg/search/proxy/store/multi_cluster_cache.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ type MultiClusterCache struct {
6161
restMapper meta.RESTMapper
6262
// newClientFunc returns a dynamic client for member cluster apiserver
6363
newClientFunc func(string) (dynamic.Interface, error)
64+
65+
// activeWatchers tracks all active watch connections for each GVR
66+
// key: GVR string representation, value: list of active watch multiplexers
67+
activeWatchersLock sync.RWMutex
68+
activeWatchers map[string][]*invalidatableWatchMux
6469
}
6570

6671
var _ Store = &MultiClusterCache{}
@@ -72,6 +77,7 @@ func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error),
7277
newClientFunc: newClientFunc,
7378
cache: map[string]*clusterCache{},
7479
registeredResources: map[schema.GroupVersionResource]struct{}{},
80+
activeWatchers: map[string][]*invalidatableWatchMux{},
7581
}
7682
}
7783

@@ -116,19 +122,33 @@ func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema
116122
}
117123

118124
// add/update cluster cache
125+
clustersAdded := false
126+
addedClusters := []string{}
119127
for clusterName, resources := range resourcesByCluster {
120128
cache, exist := c.cache[clusterName]
121129
if !exist {
122130
klog.Infof("Add cache for cluster %v", clusterName)
123131
cache = newClusterCache(clusterName, c.clientForClusterFunc(clusterName), c.restMapper)
124132
c.cache[clusterName] = cache
133+
// Any cluster being added to cache (whether new or recovered) should trigger invalidation
134+
// This is critical for cluster recovery scenarios where existing watch connections
135+
// don't include the recovered cluster's resources
136+
clustersAdded = true
137+
addedClusters = append(addedClusters, clusterName)
125138
}
126139
err := cache.updateCache(resources)
127140
if err != nil {
128141
return err
129142
}
130143
}
131144
c.registeredResources = registeredResources
145+
146+
// Only invalidate watches when clusters are added (not removed)
147+
// Cluster removal is already handled by cacher.Stop() -> terminateAllWatchers()
148+
if clustersAdded {
149+
klog.Infof("Cluster topology changed (clusters added: %v), invalidating all active watches to trigger reconnection", addedClusters)
150+
c.invalidateAllWatches()
151+
}
132152
return nil
133153
}
134154

@@ -347,7 +367,7 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe
347367
accessor.SetResourceVersion(responseResourceVersion.String())
348368
}
349369

350-
mux := newWatchMux()
370+
mux := newInvalidatableWatchMux()
351371
clusters := c.getClusterNames()
352372
for i := range clusters {
353373
cluster := clusters[i]
@@ -367,6 +387,9 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe
367387
})
368388
}
369389
mux.Start()
390+
391+
// Register this watch so we can invalidate it when cluster topology changes
392+
c.registerWatch(gvr, mux)
370393
return mux, nil
371394
}
372395

@@ -500,6 +523,62 @@ func (c *MultiClusterCache) getClusterResourceVersion(ctx context.Context, clust
500523
return listObj.GetResourceVersion(), nil
501524
}
502525

526+
// registerWatch registers an active watch connection
527+
func (c *MultiClusterCache) registerWatch(gvr schema.GroupVersionResource, mux *invalidatableWatchMux) {
528+
c.activeWatchersLock.Lock()
529+
defer c.activeWatchersLock.Unlock()
530+
531+
key := gvr.String()
532+
c.activeWatchers[key] = append(c.activeWatchers[key], mux)
533+
534+
// Set up cleanup when watch is stopped
535+
go func() {
536+
<-mux.StoppedCh()
537+
c.unregisterWatch(gvr, mux)
538+
}()
539+
}
540+
541+
// unregisterWatch removes a watch connection from tracking
542+
func (c *MultiClusterCache) unregisterWatch(gvr schema.GroupVersionResource, mux *invalidatableWatchMux) {
543+
c.activeWatchersLock.Lock()
544+
defer c.activeWatchersLock.Unlock()
545+
546+
key := gvr.String()
547+
watchers := c.activeWatchers[key]
548+
for i, w := range watchers {
549+
if w == mux {
550+
// Remove from slice
551+
c.activeWatchers[key] = append(watchers[:i], watchers[i+1:]...)
552+
break
553+
}
554+
}
555+
556+
// Clean up empty entries
557+
if len(c.activeWatchers[key]) == 0 {
558+
delete(c.activeWatchers, key)
559+
}
560+
}
561+
562+
// invalidateAllWatches sends invalidation events to all active watches
563+
// This causes clients to reconnect and get the updated cluster list
564+
func (c *MultiClusterCache) invalidateAllWatches() {
565+
c.activeWatchersLock.RLock()
566+
defer c.activeWatchersLock.RUnlock()
567+
568+
totalWatches := 0
569+
for _, watchers := range c.activeWatchers {
570+
totalWatches += len(watchers)
571+
for _, mux := range watchers {
572+
// Send invalidation event asynchronously to avoid blocking
573+
go mux.Invalidate()
574+
}
575+
}
576+
577+
if totalWatches > 0 {
578+
klog.Infof("Sent invalidation signal to %d active watch connections", totalWatches)
579+
}
580+
}
581+
503582
// Inputs and outputs:
504583
// o.ResourceVersion o.Continue | cluster options.ResourceVersion options.Continue mrv
505584
// xxxx "" | "" xxxx "" decode(xxx)

pkg/search/proxy/store/util.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/apimachinery/pkg/runtime"
2828
"k8s.io/apimachinery/pkg/util/sets"
2929
"k8s.io/apimachinery/pkg/watch"
30+
"k8s.io/klog/v2"
3031

3132
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
3233
)
@@ -278,6 +279,62 @@ func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch
278279
}
279280
}
280281

282+
// invalidatableWatchMux extends watchMux with the ability to be invalidated:
// Invalidate closes the watch so clients reconnect (despite the historical
// phrasing elsewhere, no error event is sent — the connection is just closed).
// It also exposes a stopped channel so owners can observe termination.
type invalidatableWatchMux struct {
	*watchMux
	stopped chan struct{} // closed (in Start's monitor goroutine) once the underlying mux's done channel closes
}
287+
288+
func newInvalidatableWatchMux() *invalidatableWatchMux {
289+
return &invalidatableWatchMux{
290+
watchMux: newWatchMux(),
291+
stopped: make(chan struct{}),
292+
}
293+
}
294+
295+
// Start extends the base Start method to handle the stopped channel
296+
func (w *invalidatableWatchMux) Start() {
297+
w.watchMux.Start()
298+
299+
// Close stopped channel when the watch ends
300+
go func() {
301+
<-w.watchMux.done
302+
close(w.stopped)
303+
}()
304+
}
305+
306+
// Invalidate sends an error event to trigger client reconnection
307+
func (w *invalidatableWatchMux) Invalidate() {
308+
select {
309+
case <-w.watchMux.done:
310+
// Watch already stopped, nothing to do
311+
return
312+
default:
313+
}
314+
315+
klog.V(2).Infof("Invalidating watch: cluster topology changed, closing watch connection")
316+
317+
// Directly stop the watch.
318+
// This will:
319+
// 1. Close the w.watchMux.done channel.
320+
// 2. Cause all source watchers to stop.
321+
// 3. Close the w.watchMux.result channel.
322+
// 4. ResultChan() on the client side will return ok=false when reading from it.
323+
w.Stop()
324+
325+
klog.V(4).Infof("Watch connection closed successfully")
326+
}
327+
328+
// StoppedCh returns a channel that is closed when the watch terminates
// (closed by the monitor goroutine started in Start). Used by registerWatch
// to unregister the mux once it stops.
func (w *invalidatableWatchMux) StoppedCh() <-chan struct{} {
	return w.stopped
}
332+
333+
// Stop delegates to the embedded watchMux's Stop.
// NOTE(review): this override is redundant — the embedded *watchMux already
// promotes Stop with identical behavior; kept only for explicitness.
func (w *invalidatableWatchMux) Stop() {
	w.watchMux.Stop()
}
337+
281338
// MultiNamespace contains multiple namespaces.
282339
type MultiNamespace struct {
283340
allNamespaces bool

0 commit comments

Comments
 (0)