
Commit 979a673

fix incorrect calculation of node resources when sidecar containers are injected during sync to host and the virtual scheduler is enabled
1 parent a8c5ab5 commit 979a673

6 files changed (+333 −67 lines changed)

chart/values.schema.json

Lines changed: 22 additions & 0 deletions
@@ -2881,6 +2881,24 @@
       "additionalProperties": false,
       "type": "object"
     },
+    "ReservedResources": {
+      "properties": {
+        "cpu": {
+          "type": "string",
+          "description": "CPU is amount of CPU to reserve."
+        },
+        "memory": {
+          "type": "string",
+          "description": "Memory is amount of Memory to reserve."
+        },
+        "ephemeralStorage": {
+          "type": "string",
+          "description": "EphemeralStorage is amount of EphemeralStorage to reserve."
+        }
+      },
+      "additionalProperties": false,
+      "type": "object"
+    },
     "ResolveDNS": {
       "properties": {
         "hostname": {
@@ -3304,6 +3322,10 @@
         },
         "type": "array",
         "description": "Patches patch the resource according to the provided specification."
+      },
+      "reservedResources": {
+        "$ref": "#/$defs/ReservedResources",
+        "description": "ReservedResources specifies the amount of a particular resource type to be reserved/subtracted from the allocatable resource on the virtual node.\nThis is to account for additional resource requirement in case of sidecar container(s) injected via mutating webhooks when workloads are synced from virtual to host.\nThis will take effect only when the virtual scheduler is enabled."
       }
     },
     "additionalProperties": false,

chart/values.yaml

Lines changed: 10 additions & 0 deletions
@@ -168,6 +168,16 @@ sync:
         # All specifies if all nodes should get synced by vCluster from the host to the virtual cluster or only the ones where pods are assigned to.
         all: false
         labels: {}
+      # ReservedResources specifies the amount of a particular resource type to be reserved/subtracted from the allocatable resource on the virtual node.
+      # This is to account for additional resource requirement in case of sidecar container(s) injected via mutating webhooks when workloads are synced from virtual to host.
+      # This will take effect only when the virtual scheduler is enabled.
+      reservedResources:
+        # CPU is amount of CPU to reserve.
+        cpu: ""
+        # Memory is amount of Memory to reserve.
+        memory: ""
+        # EphemeralStorage is amount of EphemeralStorage to reserve.
+        ephemeralStorage: ""
     # Secrets defines if secrets in the host should get synced to the virtual cluster.
     secrets:
       # Enabled defines if this option should be enabled.

config/config.go

Lines changed: 16 additions & 0 deletions
@@ -800,6 +800,22 @@ type SyncNodes struct {

     // Patches patch the resource according to the provided specification.
     Patches []TranslatePatch `json:"patches,omitempty"`
+
+    // ReservedResources specifies the amount of a particular resource type to be reserved/subtracted from the allocatable resource on the virtual node.
+    // This is to account for additional resource requirement in case of sidecar container(s) injected via mutating webhooks when workloads are synced from virtual to host.
+    // This will take effect only when the virtual scheduler is enabled.
+    ReservedResources ReservedResources `json:"reservedResources,omitempty"`
+}
+
+type ReservedResources struct {
+    // CPU is amount of CPU to reserve.
+    CPU string `json:"cpu,omitempty"`
+
+    // Memory is amount of Memory to reserve.
+    Memory string `json:"memory,omitempty"`
+
+    // EphemeralStorage is amount of EphemeralStorage to reserve.
+    EphemeralStorage string `json:"ephemeralStorage,omitempty"`
 }

 type SyncNodeSelector struct {
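
The new fields are plain strings, so they have to be parsed as Kubernetes resource quantities before any arithmetic can happen. A minimal validation sketch, assuming a hypothetical helper placed next to the ReservedResources type above (the function is not part of this diff):

// Hypothetical sketch, not part of this commit: validate the reserved resource
// strings by parsing each non-empty field with the Kubernetes quantity parser.
// It assumes the ReservedResources type declared in the diff above.
package config

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func validateReservedResources(r ReservedResources) error {
    for name, value := range map[string]string{
        "cpu":              r.CPU,
        "memory":           r.Memory,
        "ephemeralStorage": r.EphemeralStorage,
    } {
        // an empty string means "reserve nothing" for that resource type
        if value == "" {
            continue
        }
        if _, err := resource.ParseQuantity(value); err != nil {
            return fmt.Errorf("sync.fromHost.nodes.reservedResources.%s: invalid quantity %q: %w", name, value, err)
        }
    }
    return nil
}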

config/values.yaml

Lines changed: 4 additions & 0 deletions
@@ -80,6 +80,10 @@ sync:
       selector:
         all: false
         labels: {}
+      reservedResources:
+        cpu: ""
+        memory: ""
+        ephemeralStorage: ""
     secrets:
       enabled: false
       mappings:

pkg/controllers/resources/nodes/syncer.go

Lines changed: 90 additions & 34 deletions
@@ -69,6 +69,10 @@ func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice
         fakeKubeletIPs:       ctx.Config.Networking.Advanced.ProxyKubelets.ByIP,
         fakeKubeletHostnames: ctx.Config.Networking.Advanced.ProxyKubelets.ByHostname,

+        reservedResourceCPU:              ctx.Config.Sync.FromHost.Nodes.ReservedResources.CPU,
+        reservedResourceMemory:           ctx.Config.Sync.FromHost.Nodes.ReservedResources.Memory,
+        reservedResourceEphemeralStorage: ctx.Config.Sync.FromHost.Nodes.ReservedResources.EphemeralStorage,
+
         physicalClient:      ctx.PhysicalManager.GetClient(),
         virtualClient:       ctx.VirtualManager.GetClient(),
         nodeServiceProvider: nodeServiceProvider,
@@ -79,18 +83,22 @@
 type nodeSyncer struct {
     synccontext.Mapper

-    nodeSelector         labels.Selector
-    physicalClient       client.Client
-    virtualClient        client.Client
-    unmanagedPodCache    client.Reader
-    nodeServiceProvider  nodeservice.Provider
-    enforcedTolerations  []*corev1.Toleration
-    enableScheduler      bool
-    clearImages          bool
-    enforceNodeSelector  bool
-    useFakeKubelets      bool
-    fakeKubeletIPs       bool
-    fakeKubeletHostnames bool
+    nodeSelector                     labels.Selector
+    physicalClient                   client.Client
+    virtualClient                    client.Client
+    unmanagedPodCache                client.Reader
+    managedPodCache                  client.Reader
+    nodeServiceProvider              nodeservice.Provider
+    enforcedTolerations              []*corev1.Toleration
+    enableScheduler                  bool
+    clearImages                      bool
+    enforceNodeSelector              bool
+    useFakeKubelets                  bool
+    fakeKubeletIPs                   bool
+    fakeKubeletHostnames             bool
+    reservedResourceCPU              string
+    reservedResourceMemory           string
+    reservedResourceEphemeralStorage string
 }

 func (s *nodeSyncer) Resource() client.Object {
@@ -116,7 +124,7 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *bui
         return bld, fmt.Errorf("constructing label selector for non-vcluster pods: %w", err)
     }
     // create a pod cache containing pods from all namespaces for calculating the correct node resources
-    podCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
+    unmanagedPodCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
         Scheme: ctx.PhysicalManager.GetScheme(),
         Mapper: ctx.PhysicalManager.GetRESTMapper(),
         // omits pods managed by the vcluster
@@ -126,7 +134,7 @@
         return nil, fmt.Errorf("create cache : %w", err)
     }
     // add index for pod by node
-    err = podCache.IndexField(ctx, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(object client.Object) []string {
+    err = unmanagedPodCache.IndexField(ctx, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(object client.Object) []string {
         pPod := object.(*corev1.Pod)
         // we ignore all non-running pods and the ones that are part of the current vcluster
         // to later calculate the status.allocatable part of the nodes correctly
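
Both pod caches index running pods by the node they are assigned to, which lets the syncer list one node's pods cheaply whenever it recomputes .status.allocatable. A simplified sketch of that kind of indexed lookup and request summing, not taken from this commit (it ignores init containers, pod overhead and ephemeral storage):

// Simplified sketch, not from this commit: list the pods assigned to a node via
// a field index on the cache and sum up their container requests. The real
// allocatable calculation also has to fold in the configured reserved resources.
package nodes

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func sumPodRequests(ctx context.Context, podCache client.Reader, indexName, nodeName string) (cpu, memory resource.Quantity, err error) {
    podList := &corev1.PodList{}
    // MatchingFields uses the index registered via IndexField, so only pods on
    // the requested node are returned instead of scanning every pod.
    if err = podCache.List(ctx, podList, client.MatchingFields{indexName: nodeName}); err != nil {
        return cpu, memory, err
    }
    for _, pod := range podList.Items {
        for _, container := range pod.Spec.Containers {
            cpu.Add(*container.Resources.Requests.Cpu())
            memory.Add(*container.Resources.Requests.Memory())
        }
    }
    return cpu, memory, nil
}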
@@ -141,41 +149,89 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *bui
     if err != nil {
         return nil, fmt.Errorf("index pod by node: %w", err)
     }
+
+    managedSelector, err := labels.NewRequirement(translate.MarkerLabel, selection.Equals, []string{translate.VClusterName})
+    if err != nil {
+        return bld, fmt.Errorf("constructing label selector for vcluster managed pods: %w", err)
+    }
+    // create a pod cache containing pods managed by the current vCluster
+    managedPodCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
+        Scheme: ctx.PhysicalManager.GetScheme(),
+        Mapper: ctx.PhysicalManager.GetRESTMapper(),
+        // only select pods managed by the vcluster
+        DefaultLabelSelector: labels.NewSelector().Add(*managedSelector),
+    })
+    if err != nil {
+        return nil, fmt.Errorf("create cache : %w", err)
+    }
+    // add index for pod by node
+    err = managedPodCache.IndexField(ctx.Context, &corev1.Pod{}, constants.IndexByAssigned, func(object client.Object) []string {
+        pPod := object.(*corev1.Pod)
+        // we ignore all non-running pods and the ones not assigned to a node
+        if pPod.Status.Phase == corev1.PodSucceeded || pPod.Status.Phase == corev1.PodFailed {
+            return []string{}
+        } else if pPod.Spec.NodeName == "" {
+            return []string{}
+        }
+
+        return []string{pPod.Spec.NodeName}
+    })
+    if err != nil {
+        return nil, fmt.Errorf("index pod by node: %w", err)
+    }
+
+    go func() {
+        err := unmanagedPodCache.Start(ctx)
+        if err != nil {
+            klog.Fatalf("error starting unmanaged pod cache: %v", err)
+        }
+    }()
+
     go func() {
-        err := podCache.Start(ctx)
+        err := managedPodCache.Start(ctx.Context)
         if err != nil {
-            klog.Fatalf("error starting pod cache: %v", err)
+            klog.Fatalf("error starting managed pod cache: %v", err)
         }
     }()

-    podCache.WaitForCacheSync(ctx)
-    s.unmanagedPodCache = podCache
+    unmanagedPodCache.WaitForCacheSync(ctx)
+    s.unmanagedPodCache = unmanagedPodCache
+
+    managedPodCache.WaitForCacheSync(ctx)
+    s.managedPodCache = managedPodCache

     // enqueues nodes based on pod phase changes if the scheduler is enabled
     // the syncer is configured to update virtual node's .status.allocatable fields by summing the consumption of these pods
+    handlerFuncs := handler.TypedFuncs[*corev1.Pod, ctrl.Request]{
+        GenericFunc: func(_ context.Context, ev event.TypedGenericEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
+            enqueuePod(nil, ev.Object, q)
+        },
+        CreateFunc: func(_ context.Context, ev event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
+            enqueuePod(nil, ev.Object, q)
+        },
+        UpdateFunc: func(_ context.Context, ue event.TypedUpdateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
+            enqueuePod(ue.ObjectOld, ue.ObjectNew, q)
+        },
+        DeleteFunc: func(_ context.Context, ev event.TypedDeleteEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
+            enqueuePod(nil, ev.Object, q)
+        },
+    }
+
+    bld.WatchesRawSource(
+        source.Kind(unmanagedPodCache, &corev1.Pod{},
+            handlerFuncs),
+    )
+
     bld.WatchesRawSource(
-        source.Kind(podCache, &corev1.Pod{},
-            handler.TypedFuncs[*corev1.Pod, ctrl.Request]{
-                GenericFunc: func(_ context.Context, ev event.TypedGenericEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
-                    enqueueNonVClusterPod(nil, ev.Object, q)
-                },
-                CreateFunc: func(_ context.Context, ev event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
-                    enqueueNonVClusterPod(nil, ev.Object, q)
-                },
-                UpdateFunc: func(_ context.Context, ue event.TypedUpdateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
-                    enqueueNonVClusterPod(ue.ObjectOld, ue.ObjectNew, q)
-                },
-                DeleteFunc: func(_ context.Context, ev event.TypedDeleteEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
-                    enqueueNonVClusterPod(nil, ev.Object, q)
-                },
-            }),
+        source.Kind(managedPodCache, &corev1.Pod{},
+            handlerFuncs),
     )
     }
     return modifyController(ctx, s.nodeServiceProvider, bld)
 }

 // only used when scheduler is enabled
-func enqueueNonVClusterPod(old, new client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
+func enqueuePod(old, new client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
     pod, ok := new.(*corev1.Pod)
     if !ok {
         klog.Errorf("invalid type passed to pod handler: %T", new)
