diff --git a/cmd/ipam/main.go b/cmd/ipam/main.go index ec992c4bf9..79c926a05d 100644 --- a/cmd/ipam/main.go +++ b/cmd/ipam/main.go @@ -23,7 +23,6 @@ import ( "github.com/spf13/cobra" "google.golang.org/grpc" - "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -46,7 +45,8 @@ import ( const leaderElectorName = "liqo-ipam-leaderelection" var ( - scheme = runtime.NewScheme() + scheme = runtime.NewScheme() + options ipam.Options ) func init() { @@ -59,8 +59,6 @@ func init() { // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch; // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch -var options ipam.Options - func main() { var cmd = cobra.Command{ Use: "liqo-ipam", @@ -70,7 +68,12 @@ func main() { flagsutils.InitKlogFlags(cmd.Flags()) restcfg.InitFlags(cmd.Flags()) - cmd.Flags().IntVar(&options.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.") + // Server options. + cmd.Flags().IntVar(&options.ServerOpts.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.") + cmd.Flags().DurationVar(&options.ServerOpts.SyncFrequency, "interval", consts.SyncFrequency, + "The interval at which the IPAM will synchronize the IPAM storage.") + + // Leader election flags. cmd.Flags().BoolVar(&options.EnableLeaderElection, "leader-election", false, "Enable leader election for IPAM. "+ "Enabling this will ensure there is only one active IPAM.") cmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", consts.DefaultLiqoNamespace, @@ -102,14 +105,14 @@ func run(cmd *cobra.Command, _ []string) error { // Get the rest config. cfg := restcfg.SetRateLimiter(ctrl.GetConfigOrDie()) - options.Config = cfg + + // Get the client. cl, err := client.New(cfg, client.Options{ Scheme: scheme, }) if err != nil { return err } - options.Client = cl if options.EnableLeaderElection { if leader, err := leaderelection.Blocking(ctx, cfg, record.NewBroadcaster(), &leaderelection.Opts{ @@ -129,15 +132,12 @@ func run(cmd *cobra.Command, _ []string) error { } } - hs := health.NewServer() - options.HealthServer = hs - - liqoIPAM, err := ipam.New(ctx, &options) + liqoIPAM, err := ipam.New(ctx, cl, &options.ServerOpts) if err != nil { return err } - lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", options.Port)) + lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", options.ServerOpts.Port)) if err != nil { return err } @@ -145,7 +145,7 @@ func run(cmd *cobra.Command, _ []string) error { server := grpc.NewServer() // Register health service - grpc_health_v1.RegisterHealthServer(server, hs) + grpc_health_v1.RegisterHealthServer(server, liqoIPAM.HealthServer) // Register IPAM service ipam.RegisterIPAMServer(server, liqoIPAM) diff --git a/pkg/consts/ipam.go b/pkg/consts/ipam.go index 871ddb1bc5..35b45ff2f9 100644 --- a/pkg/consts/ipam.go +++ b/pkg/consts/ipam.go @@ -14,12 +14,16 @@ package consts +import "time" + // NetworkType indicates the type of Network. type NetworkType string const ( // IpamPort is the port used by the IPAM gRPC server. - IpamPort = 50051 + IpamPort = 6000 + // SyncFrequency is the frequency at which the IPAM should periodically sync its status. + SyncFrequency = 2 * time.Minute // NetworkNotRemappedLabelKey is the label key used to mark a Network that does not need CIDR remapping. NetworkNotRemappedLabelKey = "ipam.liqo.io/network-not-remapped" diff --git a/pkg/ipam/initialize.go b/pkg/ipam/initialize.go index 744e1262f9..647e04e8cf 100644 --- a/pkg/ipam/initialize.go +++ b/pkg/ipam/initialize.go @@ -18,19 +18,11 @@ import ( "context" klog "k8s.io/klog/v2" - - ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" - "github.com/liqotech/liqo/pkg/consts" ) // +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch // +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch -type ipCidr struct { - ip string - cidr string -} - func (lipam *LiqoIPAM) initialize(ctx context.Context) error { if err := lipam.initializeNetworks(ctx); err != nil { return err @@ -45,15 +37,16 @@ func (lipam *LiqoIPAM) initialize(ctx context.Context) error { } func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error { - // Initialize the networks. - nets, err := lipam.getReservedNetworks(ctx) + // List all networks present in the cluster. + nets, err := listNetworksOnCluster(ctx, lipam.Client) if err != nil { return err } + // Initialize the networks. for _, net := range nets { if err := lipam.reserveNetwork(net); err != nil { - klog.Errorf("Failed to reserve network %s: %v", net, err) + klog.Errorf("Failed to reserve network %q: %v", net, err) return err } } @@ -62,86 +55,19 @@ func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error { } func (lipam *LiqoIPAM) initializeIPs(ctx context.Context) error { - // Initialize the IPs. - ips, err := lipam.getReservedIPs(ctx) + // List all IPs present in the cluster. + ips, err := listIPsOnCluster(ctx, lipam.Client) if err != nil { return err } + // Initialize the IPs. for _, ip := range ips { - if err := lipam.reserveIP(ip.ip, ip.cidr); err != nil { - klog.Errorf("Failed to reserve IP %s in network %s: %v", ip.ip, ip.cidr, err) + if err := lipam.reserveIP(ip); err != nil { + klog.Errorf("Failed to reserve IP %q (network %q): %v", ip.ip, ip.cidr, err) return err } } return nil } - -func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error) { - var nets []string - var networks ipamv1alpha1.NetworkList - if err := lipam.Options.Client.List(ctx, &networks); err != nil { - return nil, err - } - - for i := range networks.Items { - net := &networks.Items[i] - - var cidr string - switch { - case net.Labels != nil && net.Labels[consts.NetworkNotRemappedLabelKey] == consts.NetworkNotRemappedLabelValue: - cidr = net.Spec.CIDR.String() - default: - cidr = net.Status.CIDR.String() - } - if cidr == "" { - klog.Warningf("Network %s has no CIDR", net.Name) - continue - } - - nets = append(nets, cidr) - } - - return nets, nil -} - -func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) { - var ips []ipCidr - var ipList ipamv1alpha1.IPList - if err := lipam.Options.Client.List(ctx, &ipList); err != nil { - return nil, err - } - - for i := range ipList.Items { - ip := &ipList.Items[i] - - address := ip.Status.IP.String() - if address == "" { - klog.Warningf("IP %s has no address", ip.Name) - continue - } - - cidr := ip.Status.CIDR.String() - if cidr == "" { - klog.Warningf("IP %s has no CIDR", ip.Name) - continue - } - - ips = append(ips, ipCidr{ip: address, cidr: cidr}) - } - - return ips, nil -} - -func (lipam *LiqoIPAM) reserveNetwork(cidr string) error { - // TODO: Reserve the network. - klog.Infof("Reserved network %s", cidr) - return nil -} - -func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error { - // TODO: Reserve the IP. - klog.Infof("Reserved IP %s in network %s", ip, cidr) - return nil -} diff --git a/pkg/ipam/ipam.go b/pkg/ipam/ipam.go index f591345059..a7eb1d719d 100644 --- a/pkg/ipam/ipam.go +++ b/pkg/ipam/ipam.go @@ -16,51 +16,58 @@ package ipam import ( "context" + "sync" "time" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" ) // LiqoIPAM is the struct implementing the IPAM interface. type LiqoIPAM struct { UnimplementedIPAMServer + HealthServer *health.Server - Options *Options -} - -// Options contains the options to configure the IPAM. -type Options struct { - Port int - Config *rest.Config - Client client.Client + client.Client - EnableLeaderElection bool - LeaderElectionNamespace string - LeaderElectionName string - LeaseDuration time.Duration - RenewDeadline time.Duration - RetryPeriod time.Duration - PodName string + opts *ServerOptions + cacheNetworks map[string]networkInfo + cacheIPs map[string]ipInfo + mutex sync.Mutex +} - HealthServer *health.Server +// ServerOptions contains the options to configure the IPAM server. +type ServerOptions struct { + Port int + SyncFrequency time.Duration } // New creates a new instance of the LiqoIPAM. -func New(ctx context.Context, opts *Options) (*LiqoIPAM, error) { - opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) +func New(ctx context.Context, cl client.Client, opts *ServerOptions) (*LiqoIPAM, error) { + hs := health.NewServer() + hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) lipam := &LiqoIPAM{ - Options: opts, + HealthServer: hs, + + Client: cl, + + opts: opts, + cacheNetworks: make(map[string]networkInfo), + cacheIPs: make(map[string]ipInfo), } + // Initialize the IPAM instance if err := lipam.initialize(ctx); err != nil { return nil, err } - opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING) + // Launch sync routine + go lipam.sync(ctx, opts.SyncFrequency) + + hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING) + return lipam, nil } diff --git a/pkg/ipam/ips.go b/pkg/ipam/ips.go new file mode 100644 index 0000000000..2ae7eed0ab --- /dev/null +++ b/pkg/ipam/ips.go @@ -0,0 +1,94 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "time" + + klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" +) + +type ipInfo struct { + ipCidr + creationTimestamp time.Time +} + +type ipCidr struct { + ip string + cidr string +} + +func (i ipCidr) String() string { + return i.ip + "-" + i.cidr +} + +// reserveIP reserves an IP, saving it in the cache. +func (lipam *LiqoIPAM) reserveIP(ip ipCidr) error { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + ipI := ipInfo{ + ipCidr: ip, + creationTimestamp: time.Now(), + } + if lipam.cacheIPs == nil { + lipam.cacheIPs = make(map[string]ipInfo) + } + lipam.cacheIPs[ip.String()] = ipI + + klog.Infof("Reserved IP %q (network %q)", ip.ip, ip.cidr) + return nil +} + +// freeIP frees an IP, removing it from the cache. +func (lipam *LiqoIPAM) freeIP(ip ipCidr) { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + delete(lipam.cacheIPs, ip.String()) + klog.Infof("Freed IP %q (network %q)", ip.ip, ip.cidr) +} + +func listIPsOnCluster(ctx context.Context, cl client.Client) ([]ipCidr, error) { + var ips []ipCidr + var ipList ipamv1alpha1.IPList + if err := cl.List(ctx, &ipList); err != nil { + return nil, err + } + + for i := range ipList.Items { + ip := &ipList.Items[i] + + address := ip.Status.IP.String() + if address == "" { + klog.Warningf("IP %q has no address", ip.Name) + continue + } + + cidr := ip.Status.CIDR.String() + if cidr == "" { + klog.Warningf("IP %q has no CIDR", ip.Name) + continue + } + + ips = append(ips, ipCidr{ip: address, cidr: cidr}) + } + + return ips, nil +} diff --git a/pkg/ipam/networks.go b/pkg/ipam/networks.go new file mode 100644 index 0000000000..c99a5c0c05 --- /dev/null +++ b/pkg/ipam/networks.go @@ -0,0 +1,86 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "time" + + klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" + "github.com/liqotech/liqo/pkg/consts" +) + +type networkInfo struct { + cidr string + creationTimestamp time.Time +} + +// reserveNetwork reserves a network, saving it in the cache. +func (lipam *LiqoIPAM) reserveNetwork(cidr string) error { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + nwI := networkInfo{ + cidr: cidr, + creationTimestamp: time.Now(), + } + if lipam.cacheNetworks == nil { + lipam.cacheNetworks = make(map[string]networkInfo) + } + lipam.cacheNetworks[cidr] = nwI + + klog.Infof("Reserved network %q", cidr) + return nil +} + +// freeNetwork frees a network, removing it from the cache. +func (lipam *LiqoIPAM) freeNetwork(cidr string) { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + delete(lipam.cacheNetworks, cidr) + klog.Infof("Freed network %q", cidr) +} + +func listNetworksOnCluster(ctx context.Context, cl client.Client) ([]string, error) { + var nets []string + var networks ipamv1alpha1.NetworkList + if err := cl.List(ctx, &networks); err != nil { + return nil, err + } + + for i := range networks.Items { + net := &networks.Items[i] + + var cidr string + switch { + case net.Labels != nil && net.Labels[consts.NetworkNotRemappedLabelKey] == consts.NetworkNotRemappedLabelValue: + cidr = net.Spec.CIDR.String() + default: + cidr = net.Status.CIDR.String() + } + if cidr == "" { + klog.Warningf("Network %q has no CIDR", net.Name) + continue + } + + nets = append(nets, cidr) + } + + return nets, nil +} diff --git a/pkg/ipam/options.go b/pkg/ipam/options.go new file mode 100644 index 0000000000..828b7a54fd --- /dev/null +++ b/pkg/ipam/options.go @@ -0,0 +1,32 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "time" +) + +// Options contains the options for the ipam pod. +type Options struct { + EnableLeaderElection bool + LeaderElectionNamespace string + LeaderElectionName string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration + PodName string + + ServerOpts ServerOptions +} diff --git a/pkg/ipam/sync.go b/pkg/ipam/sync.go new file mode 100644 index 0000000000..0e953df4e5 --- /dev/null +++ b/pkg/ipam/sync.go @@ -0,0 +1,100 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "os" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + klog "k8s.io/klog/v2" +) + +// +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch +// +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch + +func (lipam *LiqoIPAM) sync(ctx context.Context, syncFrequency time.Duration) { + err := wait.PollUntilContextCancel(ctx, syncFrequency, false, + func(ctx context.Context) (done bool, err error) { + klog.Info("Started IPAM cache sync routine") + now := time.Now() + // networks created before this threshold will be removed from the cache if they are not present in the cluster. + expiredThreshold := now.Add(-syncFrequency) + + // Sync networks. + if err := lipam.syncNetworks(ctx, expiredThreshold); err != nil { + return false, err + } + + // Sync IPs. + if err := lipam.syncIPs(ctx, expiredThreshold); err != nil { + return false, err + } + + klog.Info("Completed IPAM cache sync routine") + return false, nil + }) + if err != nil { + klog.Errorf("IPAM cache sync routine failed: %v", err) + os.Exit(1) + } +} + +func (lipam *LiqoIPAM) syncNetworks(ctx context.Context, expiredThreshold time.Time) error { + // List all networks present in the cluster. + nets, err := listNetworksOnCluster(ctx, lipam.Client) + if err != nil { + return err + } + + // Create a set for faster lookup. + nwSet := make(map[string]struct{}) + for _, net := range nets { + nwSet[net] = struct{}{} + } + + // Remove networks that are present in the cache but not in the cluster, and were added before the threshold. + for key := range lipam.cacheNetworks { + if _, ok := nwSet[key]; !ok && lipam.cacheNetworks[key].creationTimestamp.Before(expiredThreshold) { + lipam.freeNetwork(lipam.cacheNetworks[key].cidr) + } + } + + return nil +} + +func (lipam *LiqoIPAM) syncIPs(ctx context.Context, expiredThreshold time.Time) error { + // List all IPs present in the cluster. + ips, err := listIPsOnCluster(ctx, lipam.Client) + if err != nil { + return err + } + + // Create a set for faster lookup. + ipSet := make(map[string]struct{}) + for _, ip := range ips { + ipSet[ip.String()] = struct{}{} + } + + // Remove IPs that are present in the cache but not in the cluster, and were added before the threshold. + for key := range lipam.cacheIPs { + if _, ok := ipSet[key]; !ok && lipam.cacheIPs[key].creationTimestamp.Before(expiredThreshold) { + lipam.freeIP(lipam.cacheIPs[key].ipCidr) + } + } + + return nil +}