Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/crd-replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var scheme = runtime.NewScheme()
func init() {
_ = clientgoscheme.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme

_ = liqov1beta1.AddToScheme(scheme)
}

func main() {
Expand Down Expand Up @@ -88,14 +90,15 @@ func main() {
reflectionManager := reflection.NewManager(dynClient, clusterID, *workers, *resyncPeriod)
reflectionManager.Start(ctx, resources.GetResourcesToReplicate())

reflectors := make(map[liqov1beta1.ClusterID]*reflection.Reflector)
d := &crdreplicator.Controller{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
ClusterID: clusterID,

RegisteredResources: resources.GetResourcesToReplicate(),
ReflectionManager: reflectionManager,
Reflectors: make(map[liqov1beta1.ClusterID]*reflection.Reflector),
Reflectors: reflectors,

IdentityReader: identitymanager.NewCertificateIdentityReader(ctx,
mgr.GetClient(), k8sClient, mgr.GetConfig(),
Expand All @@ -106,6 +109,16 @@ func main() {
os.Exit(1)
}

// Set up ForeignClusterStateController
fcStateController := &crdreplicator.ForeignClusterStateController{
Client: mgr.GetClient(),
Reflectors: reflectors,
}
if err = fcStateController.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to setup the ForeignClusterState controller")
os.Exit(1)
}

klog.Info("Starting crdReplicator manager")
if err := mgr.Start(ctx); err != nil {
klog.Error(err, "unable to start the crdReplicator manager")
Expand Down
8 changes: 8 additions & 0 deletions deployments/liqo/files/liqo-crd-replicator-ClusterRole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ rules:
- patch
- update
- watch
- apiGroups:
- core.liqo.io
resources:
- foreignclusters
verbs:
- get
- list
- watch
83 changes: 83 additions & 0 deletions internal/crdReplicator/foreignclusterstate_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2019-2025 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package crdreplicator

import (
"context"
"fmt"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

liqov1beta1 "github.com/liqotech/liqo/apis/core/v1beta1"
"github.com/liqotech/liqo/internal/crdReplicator/reflection"
"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/utils/foreigncluster"
)

// ForeignClusterStateController reconciles on the state of the foreign clusters to manager the reflection.
type ForeignClusterStateController struct {
client.Client

// Reflectors is a map containing the reflectors towards each remote cluster.
Reflectors map[liqov1beta1.ClusterID]*reflection.Reflector
}

// cluster-role
// +kubebuilder:rbac:groups=core.liqo.io,resources=foreignclusters,verbs=get;list;watch

// Reconcile reconciles the state of the foreign clusters to manage the reflection.
func (c *ForeignClusterStateController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Fetch the ForeignCluster object
foreignCluster := &liqov1beta1.ForeignCluster{}
if err := c.Get(ctx, req.NamespacedName, foreignCluster); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// If the foreign cluster seems to be disabled, skip reconciliation.
if foreignCluster.Status.Role == liqov1beta1.UnknownRole {
klog.Infof("ForeignCluster %q has unknown role, skipping reconciliation", foreignCluster.Name)
return ctrl.Result{}, nil
}

// Get the reflector for the remote cluster
reflector, exists := c.Reflectors[foreignCluster.Spec.ClusterID]
if !exists {
klog.Warningf("No reflector found for ForeignCluster %q, will retry later", foreignCluster.Name)
return ctrl.Result{}, fmt.Errorf("no reflector found for ForeignCluster %q", foreignCluster.Name)
}

if dead, message := foreigncluster.IsDead(foreignCluster); dead {
reflector.RemoteReachable = false
klog.Warningf("Remote cluster %q is dead: %s", foreignCluster.Name, message)
} else {
reflector.RemoteReachable = true
if message != "" {
klog.Warningf("Remote cluster %q is temporary unreachable: %s", foreignCluster.Name, message)
} else {
klog.Infof("Remote cluster %q is alive", foreignCluster.Name)
}
}

return ctrl.Result{}, nil
}

// SetupWithManager registers a new controller for identity Secrets.
func (c *ForeignClusterStateController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).Named(consts.CtrlForeignClusterStateCRDReplicator).
For(&liqov1beta1.ForeignCluster{}).
Complete(c)
}
23 changes: 19 additions & 4 deletions internal/crdReplicator/reflection/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,16 @@ func (r *Reflector) handle(ctx context.Context, key item) error {
// Check if the local resource has been marked for deletion
if !localUnstr.GetDeletionTimestamp().IsZero() {
klog.Infof("[%v] Deleting remote %v with name %v, since the local one is being deleted", r.remoteClusterID, key.gvr, key.name)
vanished, err := r.deleteRemoteObject(ctx, resource, key)
if err != nil {
return err

// If the remote cluster is not reachable, then do not try to delete the remote object.
vanished := true
if r.RemoteReachable {
vanished, err = r.deleteRemoteObject(ctx, resource, key)
if err != nil {
return err
}
tracer.Step("Ensured the absence of the remote object")
}
tracer.Step("Ensured the absence of the remote object")

// Remove the finalizer from the local resource, if the remote one does no longer exist.
if vanished {
Expand All @@ -118,6 +123,11 @@ func (r *Reflector) handle(ctx context.Context, key item) error {
}
tracer.Step("Ensured the local finalizer presence")

if !resource.informer.HasSynced() {
klog.Warningf("[%v] The informer for %v is not synced, skipping remote reconciliation for %v", r.remoteClusterID, key.gvr, key.name)
return fmt.Errorf("the informer for %v is not synced, skipping remote reconciliation for %v", key.gvr, key.name)
}

// Retrieve the resource from the remote cluster
remote, err := resource.remote.Get(key.name)
switch {
Expand Down Expand Up @@ -258,6 +268,11 @@ func (r *Reflector) updateObjectStatusInner(ctx context.Context, cl dynamic.Inte

// deleteRemoteObject deletes a given object from the remote cluster.
func (r *Reflector) deleteRemoteObject(ctx context.Context, resource *reflectedResource, key item) (vanished bool, err error) {
if !r.RemoteReachable {
klog.Warningf("[%v] Remote cluster is marked as dead, skipping deletion of %v with name %v", r.remoteClusterID, key.gvr, key.name)
return true, nil
}

if _, err := resource.remote.Get(key.name); err != nil {
if kerrors.IsForbidden(err) {
klog.Infof("[%v] Cannot retrieve remote %v with name %v (permission removed by provider)", r.remoteClusterID, key.gvr, key.name)
Expand Down
25 changes: 15 additions & 10 deletions internal/crdReplicator/reflection/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,23 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"

liqov1beta1 "github.com/liqotech/liqo/apis/core/v1beta1"
ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1"
offloadingv1beta1 "github.com/liqotech/liqo/apis/offloading/v1beta1"
"github.com/liqotech/liqo/pkg/consts"
)

func getDummyInformer(ctx context.Context, client dynamic.Interface, namespace string, gvr schema.GroupVersionResource) informers.GenericInformer {
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, namespace, func(lo *metav1.ListOptions) {})
informer := factory.ForResource(gvr)
factory.Start(ctx.Done())
factory.WaitForCacheSync(ctx.Done())
return informer
}

var _ = Describe("Handler tests", func() {

const (
Expand All @@ -64,13 +72,6 @@ var _ = Describe("Handler tests", func() {
)

Item := func(name string) item { return item{gvr: gvr, name: name} }
Lister := func(ctx context.Context, client dynamic.Interface, namespace string, gvr schema.GroupVersionResource) cache.GenericNamespaceLister {
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, namespace, func(lo *metav1.ListOptions) {})
informer := factory.ForResource(gvr)
factory.Start(ctx.Done())
factory.WaitForCacheSync(ctx.Done())
return informer.Lister().ByNamespace(namespace)
}

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -99,11 +100,14 @@ var _ = Describe("Handler tests", func() {
local = fake.NewSimpleDynamicClient(scheme, localBefore.DeepCopy())
remote = fake.NewSimpleDynamicClient(scheme, remoteBefore.DeepCopy())

localInformer := getDummyInformer(ctx, local, localNamespace, gvr)
remoteInformer := getDummyInformer(ctx, remote, remoteNamespace, gvr)
reflector = Reflector{
manager: &Manager{
client: local,
},

RemoteReachable: true,
remoteClient: remote,
localNamespace: localNamespace,
remoteNamespace: remoteNamespace,
Expand All @@ -114,8 +118,9 @@ var _ = Describe("Handler tests", func() {
gvr: {
gvr: gvr,
ownership: ownership,
local: Lister(ctx, local, localNamespace, gvr),
remote: Lister(ctx, remote, remoteNamespace, gvr),
informer: remoteInformer.Informer(),
local: localInformer.Lister().ByNamespace(localNamespace),
remote: remoteInformer.Lister().ByNamespace(remoteNamespace),
},
},
}
Expand Down
21 changes: 10 additions & 11 deletions internal/crdReplicator/reflection/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import (

// Reflector represents an object managing the reflection of resources towards a given remote cluster.
type Reflector struct {
mu sync.RWMutex
// RemoteReachable indicates whether the remote cluster is reachable or if it is dead and will never come up.
RemoteReachable bool
mu sync.RWMutex

manager *Manager
localNamespace string
Expand All @@ -67,8 +69,9 @@ type reflectedResource struct {
gvr schema.GroupVersionResource
ownership consts.OwnershipType

local cache.GenericNamespaceLister
remote cache.GenericNamespaceLister
informer cache.SharedIndexInformer
local cache.GenericNamespaceLister
remote cache.GenericNamespaceLister

cancel context.CancelFunc
initialized bool
Expand All @@ -86,6 +89,7 @@ func (r *Reflector) GetSecretHash() string {

// Start starts the reflection towards the remote cluster.
func (r *Reflector) Start(ctx context.Context) {
r.RemoteReachable = true
ctx, r.cancel = context.WithCancel(ctx)
klog.Infof("[%v] Starting reflection towards remote cluster", r.remoteClusterID)
for i := uint(0); i < r.manager.workers; i++ {
Expand Down Expand Up @@ -139,8 +143,9 @@ func (r *Reflector) StartForResource(ctx context.Context, resource *resources.Re
gvr: gvr,
ownership: resource.Ownership,

local: r.manager.listers[gvr].ByNamespace(r.localNamespace),
remote: informer.Lister().ByNamespace(r.remoteNamespace),
informer: informer.Informer(),
local: r.manager.listers[gvr].ByNamespace(r.localNamespace),
remote: informer.Lister().ByNamespace(r.remoteNamespace),

cancel: cancel,
}
Expand All @@ -152,12 +157,6 @@ func (r *Reflector) StartForResource(ctx context.Context, resource *resources.Re

// Start the informer, and wait for its caches to sync
factory.Start(ctx.Done())
synced := factory.WaitForCacheSync(ctx.Done())

if !synced[gvr] {
// The context was closed before the cache was ready, let abort the setup
return
}

// The informer has synced, and we are now ready to start te replication
klog.Infof("[%v] Reflection of %v correctly started", r.remoteClusterID, gvr)
Expand Down
4 changes: 4 additions & 0 deletions pkg/consts/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,8 @@ const (
// WebhookServiceNameAnnotationKey is the constant representing
// the key of the annotation containing the Webhook service name.
WebhookServiceNameAnnotationKey = "liqo.io/webhook-service-name"

// ForeignClusterPermanentlyUnreachableAnnotationKey is the annotation used to signal that the foreign cluster is not reachable and it will
// never come up.
ForeignClusterPermanentlyUnreachableAnnotationKey = "liqo.io/foreign-cluster-permanently-unreachable"
)
7 changes: 4 additions & 3 deletions pkg/consts/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ package consts
// with the prefix "Ctrl".
const (
// Core.
CtrlForeignCluster = "foreigncluster"
CtrlSecretCRDReplicator = "secret_crdreplicator" //nolint:gosec // not a credential
CtrlSecretWebhook = "secret_webhook"
CtrlForeignCluster = "foreigncluster"
CtrlSecretCRDReplicator = "secret_crdreplicator" //nolint:gosec // not a credential
CtrlForeignClusterStateCRDReplicator = "foreignclusterstate_crdreplicator"
CtrlSecretWebhook = "secret_webhook"

// Networking.
CtrlConfigurationExternal = "configuration_external"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package virtualnodectrl

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -30,6 +31,7 @@ import (

offloadingv1beta1 "github.com/liqotech/liqo/apis/offloading/v1beta1"
"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/utils/foreigncluster"
"github.com/liqotech/liqo/pkg/utils/indexer"
)

Expand All @@ -46,9 +48,25 @@ func drainNode(ctx context.Context, cl client.Client, vn *offloadingv1beta1.Virt
return err
}

if err = evictPods(ctx, cl, podsToEvict); err != nil {
// Check the foreign cluster status
fc, err := foreigncluster.GetForeignClusterByID(ctx, cl, vn.Spec.ClusterID)
if err != nil && !kerrors.IsNotFound(err) {
klog.Error(err)
return err
return fmt.Errorf("failed to get foreign cluster %q: %w", vn.Spec.ClusterID, err)
}

// Check whether foreigncluster is dead. If fc is nil, we assume the foreign cluster is dead.
if dead, msg := foreigncluster.IsDead(fc); dead {
klog.Infof("Force pod deletion: foreign cluster %q is dead: %s", vn.Spec.ClusterID, msg)
if err := forcePodsDeletion(ctx, cl, podsToEvict); err != nil {
klog.Error(err)
return fmt.Errorf("failed to force deletion of pods on virtual node %q: %w", vn.Name, err)
}
} else {
if err = evictPods(ctx, cl, podsToEvict); err != nil {
klog.Error(err)
return err
}
}

return nil
Expand Down Expand Up @@ -113,6 +131,22 @@ func evictPod(ctx context.Context, cl client.Client, pod *corev1.Pod) error {
return nil
}

func forcePodsDeletion(ctx context.Context, cl client.Client, podList *corev1.PodList) error {
for i := range podList.Items {
pod := &podList.Items[i]
klog.Infof("Force deletion of pod %s/%s", pod.Namespace, pod.Name)

if err := cl.Delete(ctx, pod, &client.DeleteOptions{
GracePeriodSeconds: new(int64), // Delete immediately orphan pod.
}); err != nil && !kerrors.IsNotFound(err) {
klog.Errorf("Failed to delete pod %s/%s: %v", pod.Namespace, pod.Name, err)
return err
}
}

return nil
}

// waitForDelete waits for the pod deletion.
func waitPodForDelete(ctx context.Context, cl client.Client, pod *corev1.Pod) error {
//nolint:staticcheck // Waiting for PollWithContextCancel implementation.
Expand Down
Loading