Skip to content

Commit dd12b83

Browse files
committed
feat: ipam sync routine
1 parent 687b648 commit dd12b83

File tree

8 files changed

+367
-118
lines changed

8 files changed

+367
-118
lines changed

cmd/ipam/main.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
"github.com/spf13/cobra"
2525
"google.golang.org/grpc"
26-
"google.golang.org/grpc/health"
2726
"google.golang.org/grpc/health/grpc_health_v1"
2827
corev1 "k8s.io/api/core/v1"
2928
"k8s.io/apimachinery/pkg/runtime"
@@ -46,7 +45,8 @@ import (
4645
const leaderElectorName = "liqo-ipam-leaderelection"
4746

4847
var (
49-
scheme = runtime.NewScheme()
48+
scheme = runtime.NewScheme()
49+
options ipam.Options
5050
)
5151

5252
func init() {
@@ -59,8 +59,6 @@ func init() {
5959
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;
6060
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
6161

62-
var options ipam.Options
63-
6462
func main() {
6563
var cmd = cobra.Command{
6664
Use: "liqo-ipam",
@@ -70,7 +68,12 @@ func main() {
7068
flagsutils.InitKlogFlags(cmd.Flags())
7169
restcfg.InitFlags(cmd.Flags())
7270

73-
cmd.Flags().IntVar(&options.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.")
71+
// Server options.
72+
cmd.Flags().IntVar(&options.ServerOpts.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.")
73+
cmd.Flags().DurationVar(&options.ServerOpts.SyncFrequency, "interval", consts.SyncFrequency,
74+
"The interval at which the IPAM will synchronize the IPAM storage.")
75+
76+
// Leader election flags.
7477
cmd.Flags().BoolVar(&options.EnableLeaderElection, "leader-election", false, "Enable leader election for IPAM. "+
7578
"Enabling this will ensure there is only one active IPAM.")
7679
cmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", consts.DefaultLiqoNamespace,
@@ -102,14 +105,14 @@ func run(cmd *cobra.Command, _ []string) error {
102105

103106
// Get the rest config.
104107
cfg := restcfg.SetRateLimiter(ctrl.GetConfigOrDie())
105-
options.Config = cfg
108+
109+
// Get the client.
106110
cl, err := client.New(cfg, client.Options{
107111
Scheme: scheme,
108112
})
109113
if err != nil {
110114
return err
111115
}
112-
options.Client = cl
113116

114117
if options.EnableLeaderElection {
115118
if leader, err := leaderelection.Blocking(ctx, cfg, record.NewBroadcaster(), &leaderelection.Opts{
@@ -129,23 +132,20 @@ func run(cmd *cobra.Command, _ []string) error {
129132
}
130133
}
131134

132-
hs := health.NewServer()
133-
options.HealthServer = hs
134-
135-
liqoIPAM, err := ipam.New(ctx, &options)
135+
liqoIPAM, err := ipam.New(ctx, cl, &options.ServerOpts)
136136
if err != nil {
137137
return err
138138
}
139139

140-
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", options.Port))
140+
lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", options.ServerOpts.Port))
141141
if err != nil {
142142
return err
143143
}
144144

145145
server := grpc.NewServer()
146146

147147
// Register health service
148-
grpc_health_v1.RegisterHealthServer(server, hs)
148+
grpc_health_v1.RegisterHealthServer(server, liqoIPAM.HealthServer)
149149

150150
// Register IPAM service
151151
ipam.RegisterIPAMServer(server, liqoIPAM)

pkg/consts/ipam.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@
1414

1515
package consts
1616

17+
import "time"
18+
1719
// NetworkType indicates the type of Network.
1820
type NetworkType string
1921

2022
const (
2123
// IpamPort is the port used by the IPAM gRPC server.
22-
IpamPort = 50051
24+
IpamPort = 6000
25+
// SyncFrequency is the frequency at which the IPAM should periodically sync its status.
26+
SyncFrequency = 2 * time.Minute
2327

2428
// NetworkNotRemappedLabelKey is the label key used to mark a Network that does not need CIDR remapping.
2529
NetworkNotRemappedLabelKey = "ipam.liqo.io/network-not-remapped"

pkg/ipam/initialize.go

Lines changed: 9 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,11 @@ import (
1818
"context"
1919

2020
klog "k8s.io/klog/v2"
21-
22-
ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1"
23-
"github.com/liqotech/liqo/pkg/consts"
2421
)
2522

2623
// +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch
2724
// +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch
2825

29-
type ipCidr struct {
30-
ip string
31-
cidr string
32-
}
33-
3426
func (lipam *LiqoIPAM) initialize(ctx context.Context) error {
3527
if err := lipam.initializeNetworks(ctx); err != nil {
3628
return err
@@ -45,15 +37,16 @@ func (lipam *LiqoIPAM) initialize(ctx context.Context) error {
4537
}
4638

4739
func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error {
48-
// Initialize the networks.
49-
nets, err := lipam.getReservedNetworks(ctx)
40+
// List all networks present in the cluster.
41+
nets, err := listNetworksOnCluster(ctx, lipam.Client)
5042
if err != nil {
5143
return err
5244
}
5345

46+
// Initialize the networks.
5447
for _, net := range nets {
5548
if err := lipam.reserveNetwork(net); err != nil {
56-
klog.Errorf("Failed to reserve network %s: %v", net, err)
49+
klog.Errorf("Failed to reserve network %q: %v", net, err)
5750
return err
5851
}
5952
}
@@ -62,86 +55,19 @@ func (lipam *LiqoIPAM) initializeNetworks(ctx context.Context) error {
6255
}
6356

6457
func (lipam *LiqoIPAM) initializeIPs(ctx context.Context) error {
65-
// Initialize the IPs.
66-
ips, err := lipam.getReservedIPs(ctx)
58+
// List all IPs present in the cluster.
59+
ips, err := listIPsOnCluster(ctx, lipam.Client)
6760
if err != nil {
6861
return err
6962
}
7063

64+
// Initialize the IPs.
7165
for _, ip := range ips {
72-
if err := lipam.reserveIP(ip.ip, ip.cidr); err != nil {
73-
klog.Errorf("Failed to reserve IP %s in network %s: %v", ip.ip, ip.cidr, err)
66+
if err := lipam.reserveIP(ip); err != nil {
67+
klog.Errorf("Failed to reserve IP %q (network %q): %v", ip.ip, ip.cidr, err)
7468
return err
7569
}
7670
}
7771

7872
return nil
7973
}
80-
81-
func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error) {
82-
var nets []string
83-
var networks ipamv1alpha1.NetworkList
84-
if err := lipam.Options.Client.List(ctx, &networks); err != nil {
85-
return nil, err
86-
}
87-
88-
for i := range networks.Items {
89-
net := &networks.Items[i]
90-
91-
var cidr string
92-
switch {
93-
case net.Labels != nil && net.Labels[consts.NetworkNotRemappedLabelKey] == consts.NetworkNotRemappedLabelValue:
94-
cidr = net.Spec.CIDR.String()
95-
default:
96-
cidr = net.Status.CIDR.String()
97-
}
98-
if cidr == "" {
99-
klog.Warningf("Network %s has no CIDR", net.Name)
100-
continue
101-
}
102-
103-
nets = append(nets, cidr)
104-
}
105-
106-
return nets, nil
107-
}
108-
109-
func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) {
110-
var ips []ipCidr
111-
var ipList ipamv1alpha1.IPList
112-
if err := lipam.Options.Client.List(ctx, &ipList); err != nil {
113-
return nil, err
114-
}
115-
116-
for i := range ipList.Items {
117-
ip := &ipList.Items[i]
118-
119-
address := ip.Status.IP.String()
120-
if address == "" {
121-
klog.Warningf("IP %s has no address", ip.Name)
122-
continue
123-
}
124-
125-
cidr := ip.Status.CIDR.String()
126-
if cidr == "" {
127-
klog.Warningf("IP %s has no CIDR", ip.Name)
128-
continue
129-
}
130-
131-
ips = append(ips, ipCidr{ip: address, cidr: cidr})
132-
}
133-
134-
return ips, nil
135-
}
136-
137-
func (lipam *LiqoIPAM) reserveNetwork(cidr string) error {
138-
// TODO: Reserve the network.
139-
klog.Infof("Reserved network %s", cidr)
140-
return nil
141-
}
142-
143-
func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error {
144-
// TODO: Reserve the IP.
145-
klog.Infof("Reserved IP %s in network %s", ip, cidr)
146-
return nil
147-
}

pkg/ipam/ipam.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,51 +16,58 @@ package ipam
1616

1717
import (
1818
"context"
19+
"sync"
1920
"time"
2021

2122
"google.golang.org/grpc/health"
2223
"google.golang.org/grpc/health/grpc_health_v1"
23-
"k8s.io/client-go/rest"
2424
"sigs.k8s.io/controller-runtime/pkg/client"
2525
)
2626

2727
// LiqoIPAM is the struct implementing the IPAM interface.
2828
type LiqoIPAM struct {
2929
UnimplementedIPAMServer
30+
HealthServer *health.Server
3031

31-
Options *Options
32-
}
33-
34-
// Options contains the options to configure the IPAM.
35-
type Options struct {
36-
Port int
37-
Config *rest.Config
38-
Client client.Client
32+
client.Client
3933

40-
EnableLeaderElection bool
41-
LeaderElectionNamespace string
42-
LeaderElectionName string
43-
LeaseDuration time.Duration
44-
RenewDeadline time.Duration
45-
RetryPeriod time.Duration
46-
PodName string
34+
opts *ServerOptions
35+
cacheNetworks map[string]networkInfo
36+
cacheIPs map[string]ipInfo
37+
mutex sync.Mutex
38+
}
4739

48-
HealthServer *health.Server
40+
// ServerOptions contains the options to configure the IPAM server.
41+
type ServerOptions struct {
42+
Port int
43+
SyncFrequency time.Duration
4944
}
5045

5146
// New creates a new instance of the LiqoIPAM.
52-
func New(ctx context.Context, opts *Options) (*LiqoIPAM, error) {
53-
opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
47+
func New(ctx context.Context, cl client.Client, opts *ServerOptions) (*LiqoIPAM, error) {
48+
hs := health.NewServer()
49+
hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
5450

5551
lipam := &LiqoIPAM{
56-
Options: opts,
52+
HealthServer: hs,
53+
54+
Client: cl,
55+
56+
opts: opts,
57+
cacheNetworks: make(map[string]networkInfo),
58+
cacheIPs: make(map[string]ipInfo),
5759
}
5860

61+
// Initialize the IPAM instance
5962
if err := lipam.initialize(ctx); err != nil {
6063
return nil, err
6164
}
6265

63-
opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
66+
// Launch sync routine
67+
go lipam.sync(ctx, opts.SyncFrequency)
68+
69+
hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
70+
6471
return lipam, nil
6572
}
6673

0 commit comments

Comments
 (0)