@@ -17,25 +17,24 @@ package main
1717
1818import (
1919 "fmt"
20+ "net"
2021 "os"
2122
2223 "github.com/spf13/cobra"
24+ "google.golang.org/grpc"
25+ "google.golang.org/grpc/health"
26+ "google.golang.org/grpc/health/grpc_health_v1"
2327 corev1 "k8s.io/api/core/v1"
2428 "k8s.io/apimachinery/pkg/runtime"
2529 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26- "k8s.io/client-go/dynamic"
27- "k8s.io/client-go/kubernetes"
28- corev1clients "k8s.io/client-go/kubernetes/typed/core/v1"
2930 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
3031 "k8s.io/client-go/tools/record"
3132 "k8s.io/klog/v2"
32- "k8s.io/utils/ptr"
3333 ctrl "sigs.k8s.io/controller-runtime"
3434 "sigs.k8s.io/controller-runtime/pkg/log"
3535
3636 ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1"
37- "github.com/liqotech/liqo/pkg/consts"
38- liqoipam "github.com/liqotech/liqo/pkg/ipam"
37+ "github.com/liqotech/liqo/pkg/ipam"
3938 "github.com/liqotech/liqo/pkg/leaderelection"
4039 flagsutils "github.com/liqotech/liqo/pkg/utils/flags"
4140 "github.com/liqotech/liqo/pkg/utils/restcfg"
@@ -44,8 +43,7 @@ import (
4443const leaderElectorName = "liqo-ipam-leader-election"
4544
4645var (
47- scheme = runtime .NewScheme ()
48- options = liqoipam .NewOptions ()
46+ scheme = runtime .NewScheme ()
4947)
5048
5149func init () {
@@ -59,6 +57,8 @@ func init() {
5957// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
6058// +kubebuilder:rbac:groups=ipam.liqo.io,resources=ipamstorages,verbs=get;list;watch;create;update;patch
6159
60+ var options ipam.Options
61+
6262func main () {
6363 var cmd = cobra.Command {
6464 Use : "liqo-ipam" ,
@@ -68,121 +68,75 @@ func main() {
6868 flagsutils .InitKlogFlags (cmd .Flags ())
6969 restcfg .InitFlags (cmd .Flags ())
7070
71- liqoipam .InitFlags (cmd .Flags (), options )
72- if err := liqoipam .MarkFlagsRequired (& cmd , options ); err != nil {
73- klog .Error (err )
74- os .Exit (1 )
75- }
71+ cmd .Flags ().IntVar (& options .Port , "port" , 50051 , "The port on which to listen for incoming gRPC requests." )
72+ cmd .Flags ().BoolVar (& options .EnableLeaderElection , "leader-election" , false , "Enable leader election for IPAM. " +
73+ "Enabling this will ensure there is only one active IPAM." )
74+ cmd .Flags ().StringVar (& options .LeaderElectionNamespace , "leader-election-namespace" , "liqo" ,
75+ "The namespace in which the leader election lease will be created." )
76+ cmd .Flags ().StringVar (& options .LeaderElectionName , "leader-election-name" , leaderElectorName ,
77+ "The name of the leader election lease." )
78+ cmd .Flags ().DurationVar (& options .LeaseDuration , "lease-duration" , 15 ,
79+ "The duration that non-leader candidates will wait to force acquire leadership." )
80+ cmd .Flags ().DurationVar (& options .RenewDeadline , "renew-deadline" , 10 ,
81+ "The duration that the acting IPAM will retry refreshing leadership before giving up." )
82+ cmd .Flags ().DurationVar (& options .RetryPeriod , "retry-period" , 2 ,
83+ "The duration the LeaderElector clients should wait between tries of actions." )
84+ cmd .Flags ().StringVar (& options .PodName , "pod-name" , "" ,
85+ "The name of the pod running the IPAM service." )
86+
87+ utilruntime .Must (cmd .MarkFlagRequired ("pod-name" ))
7688
7789 if err := cmd .Execute (); err != nil {
7890 klog .Error (err )
7991 os .Exit (1 )
8092 }
8193}
8294
83- func run (_ * cobra.Command , _ []string ) error {
84- var err error
85-
86- // The IpamStorage resource will be stored in the same namespace of the IPAM pod.
87- podNamespace := os .Getenv ("POD_NAMESPACE" )
95+ func run (cmd * cobra.Command , _ []string ) error {
96+ ctx := cmd .Context ()
8897
8998 // Set controller-runtime logger.
9099 log .SetLogger (klog .NewKlogr ())
91100
92101 // Get the rest config.
93102 cfg := restcfg .SetRateLimiter (ctrl .GetConfigOrDie ())
94103
95- // Get dynamic client.
96- dynClient := dynamic .NewForConfigOrDie (cfg )
97-
98- // Setup IPAM.
99- ipam := liqoipam .NewIPAM ()
100-
101- startIPAMServer := func () {
102- // Initialize and start IPAM server.
103- if err = initializeIPAM (ipam , options , dynClient , podNamespace ); err != nil {
104- klog .Errorf ("Failed to initialize IPAM: %s" , err )
104+ if options .EnableLeaderElection {
105+ if leader , err := leaderelection .Blocking (ctx , cfg , record .NewBroadcaster (), & leaderelection.Opts {
106+ PodInfo : leaderelection.PodInfo {
107+ PodName : options .PodName ,
108+ Namespace : options .LeaderElectionNamespace ,
109+ },
110+ LeaderElectorName : options .LeaderElectionName ,
111+ LeaseDuration : options .LeaseDuration ,
112+ RenewDeadline : options .RenewDeadline ,
113+ RetryPeriod : options .RetryPeriod ,
114+ }); err != nil {
115+ return err
116+ } else if ! leader {
117+ klog .Error ("IPAM is not the leader, exiting" )
105118 os .Exit (1 )
106119 }
107120 }
108121
109- stopIPAMServer := func () {
110- ipam .Terminate ()
111- }
122+ hs := health .NewServer ()
123+ liqoIPAM := ipam .New (& options )
112124
113- ctx := ctrl .SetupSignalHandler ()
114-
115- // If the lease is disabled, start IPAM server without leader election mechanism (i.e., do not support IPAM high-availability).
116- if ! options .LeaseEnabled {
117- startIPAMServer ()
118- <- ctx .Done ()
119- stopIPAMServer ()
120- return nil
121- }
122-
123- // Else, initialize the leader election mechanism to manage multiple replicas of the IPAM server running in active-passive mode.
124- leaderelectionOpts := & leaderelection.Opts {
125- PodInfo : leaderelection.PodInfo {
126- PodName : os .Getenv ("POD_NAME" ),
127- Namespace : podNamespace ,
128- DeploymentName : ptr .To (os .Getenv ("DEPLOYMENT_NAME" )),
129- },
130- LeaderElectorName : leaderElectorName ,
131- LeaseDuration : options .LeaseDuration ,
132- RenewDeadline : options .LeaseRenewDeadline ,
133- RetryPeriod : options .LeaseRetryPeriod ,
134- InitCallback : startIPAMServer ,
135- StopCallback : stopIPAMServer ,
136- LabelLeader : options .LabelLeader ,
137- }
138-
139- localClient := kubernetes .NewForConfigOrDie (cfg )
140- eb := record .NewBroadcaster ()
141- eb .StartRecordingToSink (& corev1clients.EventSinkImpl {Interface : localClient .CoreV1 ().Events (corev1 .NamespaceAll )})
142-
143- leaderElector , err := leaderelection .Init (leaderelectionOpts , cfg , eb )
125+ lis , err := net .Listen ("tcp" , fmt .Sprintf ("0.0.0.0:%d" , options .Port ))
144126 if err != nil {
145127 return err
146128 }
147129
148- // Start IPAM using leader election mechanism.
149- leaderelection .Run (ctx , leaderElector )
150-
151- return nil
152- }
153-
154- func initializeIPAM (ipam * liqoipam.IPAM , opts * liqoipam.Options , dynClient dynamic.Interface , namespace string ) error {
155- if ipam == nil {
156- return fmt .Errorf ("IPAM pointer is nil. Initialize it before calling this function" )
157- }
158-
159- if err := ipam .Init (liqoipam .Pools , dynClient , namespace ); err != nil {
160- return err
161- }
162-
163- // Configure PodCIDR
164- if err := ipam .SetPodCIDR (opts .PodCIDR .String ()); err != nil {
165- return err
166- }
130+ server := grpc .NewServer ()
167131
168- // Configure ServiceCIDR
169- if err := ipam .SetServiceCIDR (opts .ServiceCIDR .String ()); err != nil {
170- return err
171- }
172-
173- // Configure additional network pools.
174- for _ , pool := range opts .AdditionalPools .StringList .StringList {
175- if err := ipam .AddNetworkPool (pool ); err != nil {
176- return err
177- }
178- }
132+ // Register health service
133+ grpc_health_v1 .RegisterHealthServer (server , hs )
179134
180- // Configure reserved subnets.
181- if err := ipam .SetReservedSubnets (opts .ReservedPools .StringList .StringList ); err != nil {
182- return err
183- }
135+ // Register IPAM service
136+ ipam .RegisterIPAMServer (server , liqoIPAM )
184137
185- if err := ipam .Serve (consts .IpamPort ); err != nil {
138+ if err := server .Serve (lis ); err != nil {
139+ klog .Errorf ("failed to serve: %v" , err )
186140 return err
187141 }
188142
0 commit comments