Skip to content

Commit 9a41b31

Browse files
committed
switch to tracking services
1 parent e367b69 commit 9a41b31

File tree

4 files changed

+42
-15
lines changed

4 files changed

+42
-15
lines changed

internal/kube/controller/controller.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Controller struct {
4040
connectorWatcher *watchers.ConnectorWatcher
4141
linkAccessWatcher *watchers.RouterAccessWatcher
4242
grantWatcher *watchers.AccessGrantWatcher
43+
serviceWatcher *watchers.ServiceWatcher
4344
sites map[string]*site.Site
4445
startGrantServer func()
4546
accessMgr *securedaccess.SecuredAccessManager
@@ -52,6 +53,7 @@ type Controller struct {
5253
attachableConnectors map[string]*skupperv2alpha1.AttachedConnector
5354
log *slog.Logger
5455
namespaces *NamespaceConfig
56+
observedServices map[string]string
5557
}
5658

5759
func skupperRouterConfig() internalinterfaces.TweakListOptionsFunc {
@@ -71,6 +73,12 @@ func listenerServices() internalinterfaces.TweakListOptionsFunc {
7173
}
7274
}
7375

76+
func sansSkupperListenerServices() internalinterfaces.TweakListOptionsFunc {
77+
return func(options *metav1.ListOptions) {
78+
options.LabelSelector = "!internal.skupper.io/listener"
79+
}
80+
}
81+
7482
func skupperSiteSizingConfig() internalinterfaces.TweakListOptionsFunc {
7583
return func(options *metav1.ListOptions) {
7684
options.LabelSelector = sizing.SiteSizingLabel
@@ -91,6 +99,7 @@ func NewController(cli internalclient.Clients, config *Config, options ...watche
9199
labelling: labels.NewLabelsAndAnnotations(config.Namespace),
92100
attachableConnectors: map[string]*skupperv2alpha1.AttachedConnector{},
93101
log: slog.New(slog.Default().Handler()).With(slog.String("component", "kube.controller")),
102+
observedServices: map[string]string{},
94103
}
95104

96105
hostname := os.Getenv("HOSTNAME")
@@ -120,6 +129,7 @@ func NewController(cli internalclient.Clients, config *Config, options ...watche
120129
controller.siteWatcher = controller.eventProcessor.WatchSites(config.WatchNamespace, filter(controller, controller.checkSite))
121130
controller.listenerWatcher = controller.eventProcessor.WatchListeners(config.WatchNamespace, filter(controller, controller.checkListener))
122131
controller.eventProcessor.WatchServices(listenerServices(), config.WatchNamespace, filter(controller, controller.checkListenerService))
132+
controller.serviceWatcher = controller.eventProcessor.WatchServices(sansSkupperListenerServices(), config.WatchNamespace, filter(controller, controller.checkObservedService))
123133
controller.connectorWatcher = controller.eventProcessor.WatchConnectors(config.WatchNamespace, filter(controller, controller.checkConnector))
124134
controller.linkAccessWatcher = controller.eventProcessor.WatchRouterAccesses(config.WatchNamespace, filter(controller, controller.checkRouterAccess))
125135
controller.eventProcessor.WatchAttachedConnectors(config.WatchNamespace, filter(controller, controller.checkAttachedConnector))
@@ -220,6 +230,10 @@ func (c *Controller) init(stopCh <-chan struct{}) error {
220230
)
221231
c.labelling.Update(config.Namespace+"/"+config.Name, config)
222232
}
233+
// get observed services prior to restoring listeners
234+
for _, svc := range c.serviceWatcher.List() {
235+
c.observedServices[svc.Namespace+"/"+svc.ObjectMeta.Name] = svc.ObjectMeta.Name
236+
}
223237
//recover existing sites & bindings
224238
siteRecovery := site.NewSiteRecovery(c.eventProcessor.GetKubeClient())
225239
for _, site := range c.siteWatcher.List() {
@@ -266,7 +280,8 @@ func (c *Controller) init(stopCh <-chan struct{}) error {
266280
slog.String("name", listener.Name),
267281
slog.String("namespace", listener.Namespace),
268282
)
269-
site.CheckListener(listener.ObjectMeta.Name, listener)
283+
_, svcExists := c.observedServices[listener.ObjectMeta.Namespace+"/"+listener.Spec.Host]
284+
site.CheckListener(listener.ObjectMeta.Name, listener, svcExists)
270285
}
271286
for _, la := range c.linkAccessWatcher.List() {
272287
if !c.namespaces.isControlled(la.Namespace) {
@@ -367,7 +382,11 @@ func (c *Controller) checkListener(key string, listener *skupperv2alpha1.Listene
367382
if err != nil {
368383
return err
369384
}
370-
return c.getSite(namespace).CheckListener(name, listener)
385+
svcExists := false
386+
if listener != nil {
387+
_, svcExists = c.observedServices[namespace+"/"+listener.Spec.Host]
388+
}
389+
return c.getSite(namespace).CheckListener(name, listener, svcExists)
371390
}
372391

373392
func (c *Controller) checkListenerService(key string, svc *corev1.Service) error {
@@ -378,6 +397,17 @@ func (c *Controller) checkListenerService(key string, svc *corev1.Service) error
378397
return c.getSite(svc.Namespace).CheckListenerService(svc)
379398
}
380399

400+
func (c *Controller) checkObservedService(key string, svc *corev1.Service) error {
401+
c.log.Debug("checkObservedService", slog.String("key", key))
402+
403+
if svc == nil {
404+
delete(c.observedServices, key)
405+
} else {
406+
c.observedServices[key] = svc.ObjectMeta.Name
407+
}
408+
return nil
409+
}
410+
381411
func (c *Controller) checkLink(key string, linkconfig *skupperv2alpha1.Link) error {
382412
namespace, name, err := cache.SplitMetaNamespaceKey(key)
383413
if err != nil {

internal/kube/site/per_target_listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (p *PerTargetListener) updateListener(l *skupperv2alpha1.Listener) bool {
3131
}
3232

3333
func (p *PerTargetListener) extractTargets(network []skupperv2alpha1.SiteRecord, mapping *qdr.PortMapping, exposedPorts ExposedPorts, context BindingContext) (bool, error) {
34-
p.logger.Info("Extracting targets for listener",
34+
p.logger.Debug("Extracting targets for listener",
3535
slog.String("namespace", p.definition.Namespace),
3636
slog.String("listener", p.definition.Name))
3737
targets := extractTargets(p.address(""), network)

internal/kube/site/site.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -920,21 +920,15 @@ func (s *Site) CheckListenerService(svc *corev1.Service) error {
920920
return nil
921921
}
922922

923-
func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener) error {
923+
func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener, svcExists bool) error {
924924
if s.site == nil {
925925
if listener == nil {
926926
return nil
927927
}
928928
return s.updateListenerStatus(listener, stderrors.New("No active site in namespace"))
929929
}
930-
if listener != nil {
931-
ctxt := context.TODO()
932-
current, err := s.clients.GetKubeClient().CoreV1().Services(s.namespace).Get(ctxt, listener.Spec.Host, metav1.GetOptions{})
933-
if current != nil && err == nil {
934-
if !isOwned(current) {
935-
return s.updateListenerStatus(listener, fmt.Errorf("Service to expose %s already exists in namespace", listener.Spec.Host))
936-
}
937-
}
930+
if listener != nil && svcExists {
931+
return s.updateListenerStatus(listener, fmt.Errorf("Service %s already exists in namespace", listener.Spec.Host))
938932
}
939933

940934
update, err1 := s.bindings.UpdateListener(name, listener)

internal/kube/site/site_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,9 @@ func TestSite_ExposeUnexpose(t *testing.T) {
280280
}
281281
func TestSite_CheckListener(t *testing.T) {
282282
type args struct {
283-
name string
284-
listener *skupperv2alpha1.Listener
283+
name string
284+
listener *skupperv2alpha1.Listener
285+
svcExists bool
285286
}
286287
tests := []struct {
287288
name string
@@ -317,6 +318,7 @@ func TestSite_CheckListener(t *testing.T) {
317318
Host: "1.2.3.4",
318319
},
319320
},
321+
svcExists: false,
320322
},
321323
skupperObjects: []runtime.Object{
322324
&skupperv2alpha1.Listener{
@@ -347,6 +349,7 @@ func TestSite_CheckListener(t *testing.T) {
347349
Host: "backend",
348350
},
349351
},
352+
svcExists: true,
350353
},
351354
skupperObjects: []runtime.Object{
352355
&skupperv2alpha1.Listener{
@@ -418,7 +421,7 @@ func TestSite_CheckListener(t *testing.T) {
418421
assert.Assert(t, err)
419422
}
420423

421-
if err := s.CheckListener(tt.args.name, tt.args.listener); (err != nil) != tt.wantErr {
424+
if err := s.CheckListener(tt.args.name, tt.args.listener, tt.args.svcExists); (err != nil) != tt.wantErr {
422425
t.Errorf("Site.CheckListener() error = %v", err)
423426
}
424427

0 commit comments

Comments
 (0)