Skip to content

Commit 3c649c2

Browse files
giorio94adamjensenbot
authored andcommitted
Virtual kubelet: working queue-based service reflection
1 parent dae3f31 commit 3c649c2

File tree

19 files changed

+675
-260
lines changed

19 files changed

+675
-260
lines changed

cmd/virtual-kubelet/root/flag.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ func InstallFlags(flags *pflag.FlagSet, c *Opts) {
2727
flags.StringVar(&c.MetricsAddr, "metrics-addr", c.MetricsAddr, "address to listen for metrics/stats requests")
2828

2929
flags.UintVar(&c.PodSyncWorkers, "pod-sync-workers", c.PodSyncWorkers, "the number of pod synchronization workers")
30+
flags.UintVar(&c.ServiceWorkers, "service-reflection-workers", c.ServiceWorkers, "the number of service reflection workers")
31+
flags.UintVar(&c.EndpointSliceWorkers, "endpointslice-reflection-workers", c.EndpointSliceWorkers,
32+
"the number of endpointslice reflection workers")
3033

3134
flags.DurationVar(&c.InformerResyncPeriod, "full-resync-period", c.InformerResyncPeriod,
3235
"how often to perform a full resync of pods between kubernetes and the provider")

cmd/virtual-kubelet/root/opts.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ const (
3232
DefaultLiqoInformerResyncPeriod time.Duration = 0
3333
DefaultMetricsAddr = ":10255"
3434
DefaultListenPort = 10250
35-
DefaultPodSyncWorkers = 10
35+
36+
DefaultPodSyncWorkers = 10
37+
DefaultServiceWorkers = 3
38+
DefaultEndpointSliceWorkers = 10
3639

3740
DefaultKubeletNamespace = "default"
3841
DefaultLiqoIpamServer = consts.NetworkManagerServiceName
@@ -54,8 +57,10 @@ type Opts struct {
5457

5558
MetricsAddr string
5659

57-
// Number of workers to use to handle pod notifications
58-
PodSyncWorkers uint
60+
// Number of workers to use to handle pod notifications and resource reflection
61+
PodSyncWorkers uint
62+
ServiceWorkers uint
63+
EndpointSliceWorkers uint
5964

6065
InformerResyncPeriod time.Duration
6166
LiqoInformerResyncPeriod time.Duration
@@ -94,6 +99,14 @@ func SetDefaultOpts(c *Opts) error {
9499
c.PodSyncWorkers = DefaultPodSyncWorkers
95100
}
96101

102+
if c.ServiceWorkers == 0 {
103+
c.ServiceWorkers = DefaultServiceWorkers
104+
}
105+
106+
if c.EndpointSliceWorkers == 0 {
107+
c.EndpointSliceWorkers = DefaultEndpointSliceWorkers
108+
}
109+
97110
if c.ListenPort == 0 {
98111
if kp := os.Getenv("KUBELET_PORT"); kp != "" {
99112
p, err := strconv.ParseInt(kp, 10, 32)

cmd/virtual-kubelet/root/root.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,14 @@ func runRootCommand(ctx context.Context, c *Opts) error {
112112
HomeClusterID: c.HomeClusterID,
113113
RemoteClusterID: c.ForeignClusterID,
114114

115-
Namespace: c.KubeletNamespace,
116-
NodeName: c.NodeName,
115+
Namespace: c.KubeletNamespace,
116+
NodeName: c.NodeName,
117+
117118
LiqoIpamServer: c.LiqoIpamServer,
118119
InformerResyncPeriod: c.InformerResyncPeriod,
120+
121+
ServiceWorkers: c.ServiceWorkers,
122+
EndpointSliceWorkers: c.EndpointSliceWorkers,
119123
}
120124

121125
podProvider, err := podprovider.NewLiqoProvider(ctx, &podcfg)

internal/crdReplicator/reflection/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ var _ = Describe("Handler tests", func() {
174174
})
175175

176176
It("should succeed", func() { Expect(err).ToNot(HaveOccurred()) })
177-
It("should remote the finalizer from the local object", func() {
177+
It("should remove the finalizer from the local object", func() {
178178
Expect(localAfter.Finalizers).ToNot(ContainElement(finalizer))
179179
})
180180
It("the remote object should not be created", func() {

pkg/virtualKubelet/apiReflection/const.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ const (
1919
EndpointSlices
2020
Pods
2121
ReplicaSets
22-
Services
2322
Secrets
2423
)
2524

@@ -30,7 +29,6 @@ var ApiNames = map[ApiType]string{
3029
EndpointSlices: "endpointslices",
3130
Pods: "pods",
3231
ReplicaSets: "replicasets",
33-
Services: "services",
3432
Secrets: "secrets",
3533
}
3634

pkg/virtualKubelet/apiReflection/reflectors/blacklists.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,4 @@ var Blacklist = map[apimgmt.ApiType]blackListType{
2828
"default/kubernetes": struct{}{},
2929
},
3030
apimgmt.Pods: {},
31-
apimgmt.Services: {
32-
"default/kubernetes": struct{}{},
33-
},
3431
}

pkg/virtualKubelet/apiReflection/reflectors/outgoing/apiTypes.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ var ReflectorBuilders = map[apimgmt.ApiType]func(reflector ri.APIReflector, opts
3232
apimgmt.Configmaps: configmapsReflectorBuilder,
3333
apimgmt.EndpointSlices: endpointslicesReflectorBuilder,
3434
apimgmt.Secrets: secretsReflectorBuilder,
35-
apimgmt.Services: servicesReflectorBuilder,
3635
}
3736

3837
func configmapsReflectorBuilder(reflector ri.APIReflector, _ map[options.OptionKey]options.Option) ri.OutgoingAPIReflector {
@@ -58,7 +57,3 @@ func endpointslicesReflectorBuilder(reflector ri.APIReflector, opts map[options.
5857
func secretsReflectorBuilder(reflector ri.APIReflector, _ map[options.OptionKey]options.Option) ri.OutgoingAPIReflector {
5958
return &SecretsReflector{APIReflector: reflector}
6059
}
61-
62-
func servicesReflectorBuilder(reflector ri.APIReflector, _ map[options.OptionKey]options.Option) ri.OutgoingAPIReflector {
63-
return &ServicesReflector{APIReflector: reflector}
64-
}

pkg/virtualKubelet/apiReflection/reflectors/outgoing/services.go

Lines changed: 0 additions & 217 deletions
This file was deleted.

pkg/virtualKubelet/forge/apiForger.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
appsv1 "k8s.io/api/apps/v1"
2424
corev1 "k8s.io/api/core/v1"
2525
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/klog/v2"
2829

@@ -33,6 +34,9 @@ import (
3334
"github.com/liqotech/liqo/pkg/virtualKubelet/options/types"
3435
)
3536

37+
// ReflectionFieldManager -> The name associated with the fields modified by virtual kubelet reflection.
38+
const ReflectionFieldManager = "reflection.liqo.io"
39+
3640
var (
3741
// LocalClusterID -> the cluster ID associated with the local cluster.
3842
LocalClusterID string
@@ -66,8 +70,6 @@ func HomeToForeign(homeObj, foreignObj runtime.Object, reflectionType string) (r
6670
return forger.endpointsliceHomeToForeign(homeObj.(*discoveryv1beta1.EndpointSlice), foreignObj.(*discoveryv1beta1.EndpointSlice))
6771
case *corev1.Pod:
6872
return forger.podHomeToForeign(homeObj, foreignObj, reflectionType)
69-
case *corev1.Service:
70-
return forger.serviceHomeToForeign(homeObj.(*corev1.Service), foreignObj.(*corev1.Service))
7173
}
7274

7375
return nil, errors.Errorf("error while creating foreign object from home: api %s unhandled", reflect.TypeOf(homeObj).String())
@@ -114,7 +116,15 @@ func initIpamClient() {
114116
grpc.WithInsecure(),
115117
grpc.WithBlock())
116118
if err != nil {
117-
klog.Error(err)
119+
klog.Fatalf("Failed to initialize IPAM client: %v", err)
118120
}
119121
forger.ipamClient = liqonetIpam.NewIpamClient(conn)
120122
}
123+
124+
// ApplyOptions returns the apply options configured for object reflection.
125+
func ApplyOptions() metav1.ApplyOptions {
126+
return metav1.ApplyOptions{
127+
Force: true,
128+
FieldManager: ReflectionFieldManager,
129+
}
130+
}

pkg/virtualKubelet/forge/forge_suite_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ import (
2121
. "github.com/onsi/gomega"
2222
)
2323

24+
const (
25+
LocalClusterID = "local-cluster"
26+
RemoteClusterID = "remote-cluster"
27+
)
28+
2429
func TestForge(t *testing.T) {
2530
RegisterFailHandler(Fail)
2631
RunSpecs(t, "Forge Suite")

0 commit comments

Comments
 (0)