@@ -17,25 +17,25 @@ package main
1717
1818import (
1919 "fmt"
20+ "net"
2021 "os"
2122
2223 "github.com/spf13/cobra"
24+ "google.golang.org/grpc"
25+ "google.golang.org/grpc/health"
26+ "google.golang.org/grpc/health/grpc_health_v1"
2327 corev1 "k8s.io/api/core/v1"
2428 "k8s.io/apimachinery/pkg/runtime"
2529 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26- "k8s.io/client-go/dynamic"
27- "k8s.io/client-go/kubernetes"
28- corev1clients "k8s.io/client-go/kubernetes/typed/core/v1"
2930 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
3031 "k8s.io/client-go/tools/record"
3132 "k8s.io/klog/v2"
32- "k8s.io/utils/ptr"
3333 ctrl "sigs.k8s.io/controller-runtime"
3434 "sigs.k8s.io/controller-runtime/pkg/log"
3535
3636 ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1"
3737 "github.com/liqotech/liqo/pkg/consts"
38- liqoipam "github.com/liqotech/liqo/pkg/ipam"
38+ "github.com/liqotech/liqo/pkg/ipam"
3939 "github.com/liqotech/liqo/pkg/leaderelection"
4040 flagsutils "github.com/liqotech/liqo/pkg/utils/flags"
4141 "github.com/liqotech/liqo/pkg/utils/restcfg"
@@ -44,8 +44,7 @@ import (
4444const leaderElectorName = "liqo-ipam-leader-election"
4545
4646var (
47- scheme = runtime .NewScheme ()
48- options = liqoipam .NewOptions ()
47+ scheme = runtime .NewScheme ()
4948)
5049
5150func init () {
@@ -59,6 +58,8 @@ func init() {
5958// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
6059// +kubebuilder:rbac:groups=ipam.liqo.io,resources=ipamstorages,verbs=get;list;watch;create;update;patch
6160
61+ var options ipam.Options
62+
6263func main () {
6364 var cmd = cobra.Command {
6465 Use : "liqo-ipam" ,
@@ -68,121 +69,77 @@ func main() {
6869 flagsutils .InitKlogFlags (cmd .Flags ())
6970 restcfg .InitFlags (cmd .Flags ())
7071
71- liqoipam .InitFlags (cmd .Flags (), options )
72- if err := liqoipam .MarkFlagsRequired (& cmd , options ); err != nil {
73- klog .Error (err )
74- os .Exit (1 )
75- }
72+ cmd .Flags ().IntVar (& options .Port , "port" , consts .IpamPort , "The port on which to listen for incoming gRPC requests." )
73+ cmd .Flags ().BoolVar (& options .EnableLeaderElection , "leader-election" , false , "Enable leader election for IPAM. " +
74+ "Enabling this will ensure there is only one active IPAM." )
75+ cmd .Flags ().StringVar (& options .LeaderElectionNamespace , "leader-election-namespace" , "liqo" ,
76+ "The namespace in which the leader election lease will be created." )
77+ cmd .Flags ().StringVar (& options .LeaderElectionName , "leader-election-name" , leaderElectorName ,
78+ "The name of the leader election lease." )
79+ cmd .Flags ().DurationVar (& options .LeaseDuration , "lease-duration" , 15 ,
80+ "The duration that non-leader candidates will wait to force acquire leadership." )
81+ cmd .Flags ().DurationVar (& options .RenewDeadline , "renew-deadline" , 10 ,
82+ "The duration that the acting IPAM will retry refreshing leadership before giving up." )
83+ cmd .Flags ().DurationVar (& options .RetryPeriod , "retry-period" , 2 ,
84+ "The duration the LeaderElector clients should wait between tries of actions." )
85+ cmd .Flags ().StringVar (& options .PodName , "pod-name" , "" ,
86+ "The name of the pod running the IPAM service." )
87+
88+ utilruntime .Must (cmd .MarkFlagRequired ("pod-name" ))
7689
7790 if err := cmd .Execute (); err != nil {
7891 klog .Error (err )
7992 os .Exit (1 )
8093 }
8194}
8295
83- func run (_ * cobra.Command , _ []string ) error {
84- var err error
85-
86- // The IpamStorage resource will be stored in the same namespace of the IPAM pod.
87- podNamespace := os .Getenv ("POD_NAMESPACE" )
96+ func run (cmd * cobra.Command , _ []string ) error {
97+ ctx := cmd .Context ()
8898
8999 // Set controller-runtime logger.
90100 log .SetLogger (klog .NewKlogr ())
91101
92102 // Get the rest config.
93103 cfg := restcfg .SetRateLimiter (ctrl .GetConfigOrDie ())
94104
95- // Get dynamic client.
96- dynClient := dynamic .NewForConfigOrDie (cfg )
97-
98- // Setup IPAM.
99- ipam := liqoipam .NewIPAM ()
100-
101- startIPAMServer := func () {
102- // Initialize and start IPAM server.
103- if err = initializeIPAM (ipam , options , dynClient , podNamespace ); err != nil {
104- klog .Errorf ("Failed to initialize IPAM: %s" , err )
105+ if options .EnableLeaderElection {
106+ if leader , err := leaderelection .Blocking (ctx , cfg , record .NewBroadcaster (), & leaderelection.Opts {
107+ PodInfo : leaderelection.PodInfo {
108+ PodName : options .PodName ,
109+ Namespace : options .LeaderElectionNamespace ,
110+ },
111+ LeaderElectorName : options .LeaderElectionName ,
112+ LeaseDuration : options .LeaseDuration ,
113+ RenewDeadline : options .RenewDeadline ,
114+ RetryPeriod : options .RetryPeriod ,
115+ }); err != nil {
116+ return err
117+ } else if ! leader {
118+ klog .Error ("IPAM is not the leader, exiting" )
105119 os .Exit (1 )
106120 }
107121 }
108122
109- stopIPAMServer := func () {
110- ipam .Terminate ()
111- }
123+ hs := health .NewServer ()
124+ options .HealthServer = hs
112125
113- ctx := ctrl . SetupSignalHandler ( )
126+ liqoIPAM := ipam . New ( & options )
114127
115- // If the lease is disabled, start IPAM server without leader election mechanism (i.e., do not support IPAM high-availability).
116- if ! options .LeaseEnabled {
117- startIPAMServer ()
118- <- ctx .Done ()
119- stopIPAMServer ()
120- return nil
121- }
122-
123- // Else, initialize the leader election mechanism to manage multiple replicas of the IPAM server running in active-passive mode.
124- leaderelectionOpts := & leaderelection.Opts {
125- PodInfo : leaderelection.PodInfo {
126- PodName : os .Getenv ("POD_NAME" ),
127- Namespace : podNamespace ,
128- DeploymentName : ptr .To (os .Getenv ("DEPLOYMENT_NAME" )),
129- },
130- LeaderElectorName : leaderElectorName ,
131- LeaseDuration : options .LeaseDuration ,
132- RenewDeadline : options .LeaseRenewDeadline ,
133- RetryPeriod : options .LeaseRetryPeriod ,
134- InitCallback : startIPAMServer ,
135- StopCallback : stopIPAMServer ,
136- LabelLeader : options .LabelLeader ,
137- }
138-
139- localClient := kubernetes .NewForConfigOrDie (cfg )
140- eb := record .NewBroadcaster ()
141- eb .StartRecordingToSink (& corev1clients.EventSinkImpl {Interface : localClient .CoreV1 ().Events (corev1 .NamespaceAll )})
142-
143- leaderElector , err := leaderelection .Init (leaderelectionOpts , cfg , eb )
128+ lis , err := net .Listen ("tcp" , fmt .Sprintf ("0.0.0.0:%d" , options .Port ))
144129 if err != nil {
145130 return err
146131 }
147132
148- // Start IPAM using leader election mechanism.
149- leaderelection .Run (ctx , leaderElector )
133+ server := grpc .NewServer ()
150134
151- return nil
152- }
135+ // Register health service
136+ grpc_health_v1 . RegisterHealthServer ( server , hs )
153137
154- func initializeIPAM (ipam * liqoipam.IPAM , opts * liqoipam.Options , dynClient dynamic.Interface , namespace string ) error {
155- if ipam == nil {
156- return fmt .Errorf ("IPAM pointer is nil. Initialize it before calling this function" )
157- }
158-
159- if err := ipam .Init (liqoipam .Pools , dynClient , namespace ); err != nil {
160- return err
161- }
162-
163- // Configure PodCIDR
164- if err := ipam .SetPodCIDR (opts .PodCIDR .String ()); err != nil {
165- return err
166- }
167-
168- // Configure ServiceCIDR
169- if err := ipam .SetServiceCIDR (opts .ServiceCIDR .String ()); err != nil {
170- return err
171- }
172-
173- // Configure additional network pools.
174- for _ , pool := range opts .AdditionalPools .StringList .StringList {
175- if err := ipam .AddNetworkPool (pool ); err != nil {
176- return err
177- }
178- }
179-
180- // Configure reserved subnets.
181- if err := ipam .SetReservedSubnets (opts .ReservedPools .StringList .StringList ); err != nil {
182- return err
183- }
138+ // Register IPAM service
139+ ipam .RegisterIPAMServer (server , liqoIPAM )
184140
185- if err := ipam .Serve (consts .IpamPort ); err != nil {
141+ if err := server .Serve (lis ); err != nil {
142+ klog .Errorf ("failed to serve: %v" , err )
186143 return err
187144 }
188145
0 commit comments