@@ -18,31 +18,23 @@ package root
1818import (
1919 "context"
2020 "os"
21- "path"
22- "time"
2321
2422 "github.com/pkg/errors"
2523 "github.com/spf13/cobra"
2624 "github.com/virtual-kubelet/virtual-kubelet/node"
2725 corev1 "k8s.io/api/core/v1"
2826 k8serrors "k8s.io/apimachinery/pkg/api/errors"
2927 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30- "k8s.io/apimachinery/pkg/fields"
3128 "k8s.io/client-go/discovery"
32- kubeinformers "k8s.io/client-go/informers"
3329 "k8s.io/client-go/kubernetes"
34- "k8s.io/client-go/kubernetes/scheme"
3530 "k8s.io/client-go/rest"
36- "k8s.io/client-go/tools/record"
37- "k8s.io/client-go/util/workqueue"
3831 "k8s.io/klog/v2"
3932
4033 "github.com/liqotech/liqo/internal/utils/errdefs"
4134 "github.com/liqotech/liqo/pkg/utils"
4235 "github.com/liqotech/liqo/pkg/utils/restcfg"
4336 "github.com/liqotech/liqo/pkg/virtualKubelet"
4437 nodeprovider "github.com/liqotech/liqo/pkg/virtualKubelet/liqoNodeProvider"
45- "github.com/liqotech/liqo/pkg/virtualKubelet/manager"
4638 podprovider "github.com/liqotech/liqo/pkg/virtualKubelet/provider"
4739)
4840
@@ -70,8 +62,8 @@ func runRootCommand(ctx context.Context, c *Opts) error {
7062 return errors .New ("cluster id is mandatory" )
7163 }
7264
73- if c .PodSyncWorkers == 0 {
74- return errdefs .InvalidInput ("pod sync workers must be greater than 0" )
65+ if c .PodWorkers == 0 || c . ServiceWorkers == 0 || c . EndpointSliceWorkers == 0 || c . ConfigMapWorkers == 0 || c . SecretWorkers == 0 {
66+ return errdefs .InvalidInput ("reflection workers must be greater than 0" )
7567 }
7668
7769 config , err := utils .GetRestConfig (c .HomeKubeconfig )
@@ -85,27 +77,6 @@ func runRootCommand(ctx context.Context, c *Opts) error {
8577 return err
8678 }
8779
88- // Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
89- podInformerFactory := kubeinformers .NewSharedInformerFactoryWithOptions (
90- client ,
91- c .InformerResyncPeriod ,
92- kubeinformers .WithTweakListOptions (func (options * metav1.ListOptions ) {
93- options .FieldSelector = fields .OneTermEqualSelector ("spec.nodeName" , c .NodeName ).String ()
94- }))
95- podInformer := podInformerFactory .Core ().V1 ().Pods ()
96-
97- // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
98- scmInformerFactory := kubeinformers .NewSharedInformerFactoryWithOptions (client , c .InformerResyncPeriod )
99- // Create a secret informer and a config map informer so we can pass their listers to the resource manager.
100- secretInformer := scmInformerFactory .Core ().V1 ().Secrets ()
101- configMapInformer := scmInformerFactory .Core ().V1 ().ConfigMaps ()
102- serviceInformer := scmInformerFactory .Core ().V1 ().Services ()
103-
104- rm , err := manager .NewResourceManager (podInformer .Lister (), secretInformer .Lister (), configMapInformer .Lister (), serviceInformer .Lister ())
105- if err != nil {
106- return errors .Wrap (err , "could not create resource manager" )
107- }
108-
10980 // Initialize the pod provider
11081 podcfg := podprovider.InitConfig {
11182 HomeConfig : config ,
@@ -114,10 +85,12 @@ func runRootCommand(ctx context.Context, c *Opts) error {
11485
11586 Namespace : c .KubeletNamespace ,
11687 NodeName : c .NodeName ,
88+ NodeIP : os .Getenv ("VKUBELET_POD_IP" ),
11789
11890 LiqoIpamServer : c .LiqoIpamServer ,
11991 InformerResyncPeriod : c .InformerResyncPeriod ,
12092
93+ PodWorkers : c .PodWorkers ,
12194 ServiceWorkers : c .ServiceWorkers ,
12295 EndpointSliceWorkers : c .EndpointSliceWorkers ,
12396 ConfigMapWorkers : c .ConfigMapWorkers ,
@@ -195,55 +168,12 @@ func runRootCommand(ctx context.Context, c *Opts) error {
195168 return err
196169 }
197170
198- eb := record .NewBroadcaster ()
199- pc , err := node .NewPodController (node.PodControllerConfig {
200- PodClient : client .CoreV1 (),
201- PodInformer : podInformer ,
202- EventRecorder : eb .NewRecorder (scheme .Scheme , corev1.EventSource {Component : path .Join (c .NodeName , "pod-controller" )}),
203- Provider : podProvider ,
204- SecretInformer : secretInformer ,
205- ConfigMapInformer : configMapInformer ,
206- ServiceInformer : serviceInformer ,
207- SyncPodsFromKubernetesRateLimiter : newPodControllerWorkqueueRateLimiter (),
208- SyncPodStatusFromProviderRateLimiter : newPodControllerWorkqueueRateLimiter (),
209- DeletePodsFromKubernetesRateLimiter : newPodControllerWorkqueueRateLimiter (),
210- })
211- if err != nil {
212- return errors .Wrap (err , "error setting up pod controller" )
213- }
214-
215- go podInformerFactory .Start (ctx .Done ())
216- go scmInformerFactory .Start (ctx .Done ())
217-
218- cancelHTTP , err := setupHTTPServer (ctx , podProvider , getAPIConfig (c ), func (context.Context ) ([]* corev1.Pod , error ) {
219- return rm .GetPods (), nil
220- })
171+ cancelHTTP , err := setupHTTPServer (ctx , podProvider .PodHandler (), getAPIConfig (c ))
221172 if err != nil {
222173 return errors .Wrap (err , "error while setting up HTTP server" )
223174 }
224175 defer cancelHTTP ()
225176
226- go func () {
227- if err := pc .Run (ctx , int (c .PodSyncWorkers )); err != nil && errors .Is (err , context .Canceled ) {
228- klog .Fatal (errors .Wrap (err , "error in pod controller running" ))
229- }
230- }()
231-
232- if c .StartupTimeout > 0 {
233- ctx , cancel := context .WithTimeout (ctx , c .StartupTimeout )
234- klog .Info ("Waiting for pod controller / VK to be ready" )
235- select {
236- case <- ctx .Done ():
237- cancel ()
238- return ctx .Err ()
239- case <- pc .Ready ():
240- }
241- cancel ()
242- if err := pc .Err (); err != nil {
243- return err
244- }
245- }
246-
247177 go func () {
248178 if err := nodeRunner .Run (ctx ); err != nil {
249179 klog .Error (err , "error in pod controller running" )
@@ -259,14 +189,6 @@ func runRootCommand(ctx context.Context, c *Opts) error {
259189 return nil
260190}
261191
262- // newPodControllerWorkqueueRateLimiter returns a new custom rate limiter to be assigned to the pod controller workqueues.
263- // Differently from the standard workqueue.DefaultControllerRateLimiter(), composed of an overall bucket rate limiter
264- // and a per-item exponential rate limiter to address failures, this includes only the latter component. Hance avoiding
265- // performance limitations when processing a high number of pods in parallel.
266- func newPodControllerWorkqueueRateLimiter () workqueue.RateLimiter {
267- return workqueue .NewItemExponentialFailureRateLimiter (5 * time .Millisecond , 1000 * time .Second )
268- }
269-
270192func getVersion (config * rest.Config ) string {
271193 client , err := discovery .NewDiscoveryClientForConfig (config )
272194 if err != nil {
0 commit comments