bugfix: fix the incorrect calculation of node resources when sidecar containers are being injected during sync to host with virtual scheduler enabled #2652

Draft · wants to merge 3 commits into base: main

22 changes: 22 additions & 0 deletions chart/values.schema.json
@@ -2881,6 +2881,24 @@
"additionalProperties": false,
"type": "object"
},
"ReservedResources": {
"properties": {
"cpu": {
"type": "string",
"description": "CPU is amount of CPU to reserve."
},
"memory": {
"type": "string",
"description": "Memory is amount of Memory to reserve."
},
"ephemeralStorage": {
"type": "string",
"description": "EphemeralStorage is amount of EphemeralStorage to reserve."
}
},
"additionalProperties": false,
"type": "object"
},
"ResolveDNS": {
"properties": {
"hostname": {
@@ -3304,6 +3322,10 @@
},
"type": "array",
"description": "Patches patch the resource according to the provided specification."
},
"reservedResources": {
"$ref": "#/$defs/ReservedResources",
"description": "ReservedResources specifies the amount of a particular resource type to be reserved/subtracted from the allocatable resource on the virtual node.\nThis is to account for additional resource requirement in case of sidecar container(s) injected via mutating webhooks when workloads are synced from virtual to host.\nThis will take effect only when the virtual scheduler is enabled."
}
},
"additionalProperties": false,
10 changes: 10 additions & 0 deletions chart/values.yaml
@@ -168,6 +168,16 @@ sync:
# All specifies if all nodes should get synced by vCluster from the host to the virtual cluster or only the ones where pods are assigned to.
all: false
labels: {}
# ReservedResources specifies the amount of a particular resource type to be reserved/subtracted from the allocatable resources on the virtual node.
# This is to account for additional resource requirements in case of sidecar container(s) injected via mutating webhooks when workloads are synced from virtual to host.
# This will take effect only when the virtual scheduler is enabled.
reservedResources:
# CPU is the amount of CPU to reserve.
cpu: ""
# Memory is the amount of Memory to reserve.
memory: ""
# EphemeralStorage is the amount of EphemeralStorage to reserve.
ephemeralStorage: ""
# Secrets defines if secrets in the host should get synced to the virtual cluster.
secrets:
# Enabled defines if this option should be enabled.
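
For context, a minimal sketch of how this could look in a user-supplied vcluster.yaml. The quantities are illustrative, not chart defaults, and the `controlPlane.advanced.virtualScheduler.enabled` path for turning on the virtual scheduler is assumed from the current config layout rather than shown in this diff:

```yaml
# Illustrative example, not defaults shipped by the chart.
sync:
  fromHost:
    nodes:
      enabled: true
      reservedResources:
        cpu: "100m"              # reserve 0.1 CPU on each virtual node
        memory: "256Mi"          # reserve 256Mi of memory
        ephemeralStorage: "1Gi"  # reserve 1Gi of ephemeral storage
controlPlane:
  advanced:
    virtualScheduler:
      enabled: true              # reservedResources only takes effect with the virtual scheduler
```
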
16 changes: 16 additions & 0 deletions config/config.go
@@ -800,6 +800,22 @@ type SyncNodes struct {

// Patches patch the resource according to the provided specification.
Patches []TranslatePatch `json:"patches,omitempty"`

// ReservedResources specifies the amount of a particular resource type to be reserved/subtracted from the allocatable resources on the virtual node.
// This is to account for additional resource requirements in case of sidecar container(s) injected via mutating webhooks when workloads are synced from virtual to host.
// This will take effect only when the virtual scheduler is enabled.
ReservedResources ReservedResources `json:"reservedResources,omitempty"`
}

type ReservedResources struct {
// CPU is the amount of CPU to reserve.
CPU string `json:"cpu,omitempty"`

// Memory is the amount of Memory to reserve.
Memory string `json:"memory,omitempty"`

// EphemeralStorage is the amount of EphemeralStorage to reserve.
EphemeralStorage string `json:"ephemeralStorage,omitempty"`
}

type SyncNodeSelector struct {
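
Since the three fields are plain strings, they have to be parsed before any arithmetic on node allocatable can happen. A minimal sketch using the usual k8s.io/apimachinery quantity parsing; the `subtractReserved` helper is made up for illustration and is not part of this change:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// subtractReserved is a hypothetical helper: it parses a reserved quantity
// string such as "100m" or "256Mi" and subtracts it from the node's
// allocatable entry for that resource, flooring at zero. An empty string
// means nothing is reserved.
func subtractReserved(allocatable corev1.ResourceList, name corev1.ResourceName, reserved string) error {
	if reserved == "" {
		return nil
	}
	quantity, err := resource.ParseQuantity(reserved)
	if err != nil {
		return fmt.Errorf("parse reserved %s %q: %w", name, reserved, err)
	}
	remaining := allocatable[name]
	remaining.Sub(quantity)
	if remaining.Sign() < 0 {
		remaining = resource.Quantity{}
	}
	allocatable[name] = remaining
	return nil
}

func main() {
	allocatable := corev1.ResourceList{
		corev1.ResourceCPU:              resource.MustParse("4"),
		corev1.ResourceMemory:           resource.MustParse("8Gi"),
		corev1.ResourceEphemeralStorage: resource.MustParse("20Gi"),
	}

	// Values mirror ReservedResources{CPU: "100m", Memory: "256Mi", EphemeralStorage: ""}.
	_ = subtractReserved(allocatable, corev1.ResourceCPU, "100m")
	_ = subtractReserved(allocatable, corev1.ResourceMemory, "256Mi")
	_ = subtractReserved(allocatable, corev1.ResourceEphemeralStorage, "")

	// Prints "3900m 7936Mi".
	fmt.Println(allocatable.Cpu().String(), allocatable.Memory().String())
}
```
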
17 changes: 12 additions & 5 deletions config/legacyconfig/config.go
@@ -203,11 +203,18 @@ type SyncPods struct {
type SyncNodes struct {
Enabled *bool `json:"enabled,omitempty"`

FakeKubeletIPs *bool `json:"fakeKubeletIPs,omitempty"`
SyncAllNodes *bool `json:"syncAllNodes,omitempty"`
NodeSelector string `json:"nodeSelector,omitempty"`
EnableScheduler *bool `json:"enableScheduler,omitempty"`
SyncNodeChanges *bool `json:"syncNodeChanges,omitempty"`
FakeKubeletIPs *bool `json:"fakeKubeletIPs,omitempty"`
SyncAllNodes *bool `json:"syncAllNodes,omitempty"`
NodeSelector string `json:"nodeSelector,omitempty"`
EnableScheduler *bool `json:"enableScheduler,omitempty"`
SyncNodeChanges *bool `json:"syncNodeChanges,omitempty"`
ReservedResources ReservedResourcesValues `json:"reservedResources,omitempty"`
}

type ReservedResourcesValues struct {
CPU string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
EphemeralStorage string `json:"ephemeralStorage,omitempty"`
}

type SyncGeneric struct {
9 changes: 9 additions & 0 deletions config/legacyconfig/migrate.go
@@ -622,6 +622,15 @@ func convertBaseValues(oldConfig BaseHelm, newConfig *config.Config) error {
if oldConfig.Sync.Nodes.SyncNodeChanges != nil {
newConfig.Sync.FromHost.Nodes.SyncBackChanges = *oldConfig.Sync.Nodes.SyncNodeChanges
}
if oldConfig.Sync.Nodes.ReservedResources.CPU != "" {
newConfig.Sync.FromHost.Nodes.ReservedResources.CPU = oldConfig.Sync.Nodes.ReservedResources.CPU
}
if oldConfig.Sync.Nodes.ReservedResources.Memory != "" {
newConfig.Sync.FromHost.Nodes.ReservedResources.Memory = oldConfig.Sync.Nodes.ReservedResources.Memory
}
if oldConfig.Sync.Nodes.ReservedResources.EphemeralStorage != "" {
newConfig.Sync.FromHost.Nodes.ReservedResources.EphemeralStorage = oldConfig.Sync.Nodes.ReservedResources.EphemeralStorage
}
if oldConfig.Sync.PersistentVolumes.Enabled != nil {
newConfig.Sync.ToHost.PersistentVolumes.Enabled = *oldConfig.Sync.PersistentVolumes.Enabled
}
30 changes: 30 additions & 0 deletions config/legacyconfig/migrate_test.go
@@ -727,6 +727,36 @@ exportKubeConfig:
context: my-context
server: https://my-vcluster.example.com`,
},
{
Name: "reservedResources set",
Distro: "k8s",
In: `
sync:
nodes:
reservedResources:
cpu: "100m"
memory: "256Mi"
ephemeralStorage: "512Mi"
`,
Expected: `controlPlane:
backingStore:
etcd:
deploy:
enabled: true
distro:
k8s:
enabled: true
statefulSet:
scheduling:
podManagementPolicy: OrderedReady
sync:
fromHost:
nodes:
reservedResources:
cpu: 100m
ephemeralStorage: 512Mi
memory: 256Mi`,
},
}

for _, testCase := range testCases {
4 changes: 4 additions & 0 deletions config/values.yaml
@@ -80,6 +80,10 @@ sync:
selector:
all: false
labels: {}
reservedResources:
cpu: ""
memory: ""
ephemeralStorage: ""
secrets:
enabled: false
mappings:
124 changes: 90 additions & 34 deletions pkg/controllers/resources/nodes/syncer.go
@@ -69,6 +69,10 @@ func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice
fakeKubeletIPs: ctx.Config.Networking.Advanced.ProxyKubelets.ByIP,
fakeKubeletHostnames: ctx.Config.Networking.Advanced.ProxyKubelets.ByHostname,

reservedResourceCPU: ctx.Config.Sync.FromHost.Nodes.ReservedResources.CPU,
reservedResourceMemory: ctx.Config.Sync.FromHost.Nodes.ReservedResources.Memory,
reservedResourceEphemeralStorage: ctx.Config.Sync.FromHost.Nodes.ReservedResources.EphemeralStorage,

physicalClient: ctx.PhysicalManager.GetClient(),
virtualClient: ctx.VirtualManager.GetClient(),
nodeServiceProvider: nodeServiceProvider,
@@ -79,18 +83,22 @@
type nodeSyncer struct {
synccontext.Mapper

nodeSelector labels.Selector
physicalClient client.Client
virtualClient client.Client
unmanagedPodCache client.Reader
nodeServiceProvider nodeservice.Provider
enforcedTolerations []*corev1.Toleration
enableScheduler bool
clearImages bool
enforceNodeSelector bool
useFakeKubelets bool
fakeKubeletIPs bool
fakeKubeletHostnames bool
nodeSelector labels.Selector
physicalClient client.Client
virtualClient client.Client
unmanagedPodCache client.Reader
managedPodCache client.Reader
nodeServiceProvider nodeservice.Provider
enforcedTolerations []*corev1.Toleration
enableScheduler bool
clearImages bool
enforceNodeSelector bool
useFakeKubelets bool
fakeKubeletIPs bool
fakeKubeletHostnames bool
reservedResourceCPU string
reservedResourceMemory string
reservedResourceEphemeralStorage string
}

func (s *nodeSyncer) Resource() client.Object {
@@ -116,7 +124,7 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *bui
return bld, fmt.Errorf("constructing label selector for non-vcluster pods: %w", err)
}
// create a pod cache containing pods from all namespaces for calculating the correct node resources
podCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
unmanagedPodCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
Scheme: ctx.PhysicalManager.GetScheme(),
Mapper: ctx.PhysicalManager.GetRESTMapper(),
// omits pods managed by the vcluster
@@ -126,7 +134,7 @@
return nil, fmt.Errorf("create cache : %w", err)
}
// add index for pod by node
err = podCache.IndexField(ctx, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(object client.Object) []string {
err = unmanagedPodCache.IndexField(ctx, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(object client.Object) []string {
pPod := object.(*corev1.Pod)
// we ignore all non-running pods and the ones that are part of the current vcluster
// to later calculate the status.allocatable part of the nodes correctly
@@ -141,41 +149,89 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *bui
if err != nil {
return nil, fmt.Errorf("index pod by node: %w", err)
}

managedSelector, err := labels.NewRequirement(translate.MarkerLabel, selection.Equals, []string{translate.VClusterName})
if err != nil {
return bld, fmt.Errorf("constructing label selector for vcluster managed pods: %w", err)
}
// create a pod cache containing pods managed by the current vCluster
managedPodCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
Scheme: ctx.PhysicalManager.GetScheme(),
Mapper: ctx.PhysicalManager.GetRESTMapper(),
// only select pods managed by the vcluster
DefaultLabelSelector: labels.NewSelector().Add(*managedSelector),
})
if err != nil {
return nil, fmt.Errorf("create cache : %w", err)
}
// add index for pod by node
err = managedPodCache.IndexField(ctx.Context, &corev1.Pod{}, constants.IndexByAssigned, func(object client.Object) []string {
pPod := object.(*corev1.Pod)
// we ignore all non-running pods and the ones not assigned to a node
if pPod.Status.Phase == corev1.PodSucceeded || pPod.Status.Phase == corev1.PodFailed {
return []string{}
} else if pPod.Spec.NodeName == "" {
return []string{}
}

return []string{pPod.Spec.NodeName}
})
if err != nil {
return nil, fmt.Errorf("index pod by node: %w", err)
}

go func() {
err := unmanagedPodCache.Start(ctx)
if err != nil {
klog.Fatalf("error starting unmanaged pod cache: %v", err)
}
}()

go func() {
err := podCache.Start(ctx)
err := managedPodCache.Start(ctx.Context)
if err != nil {
klog.Fatalf("error starting pod cache: %v", err)
klog.Fatalf("error starting managed pod cache: %v", err)
}
}()

podCache.WaitForCacheSync(ctx)
s.unmanagedPodCache = podCache
unmanagedPodCache.WaitForCacheSync(ctx)
s.unmanagedPodCache = unmanagedPodCache

managedPodCache.WaitForCacheSync(ctx)
s.managedPodCache = managedPodCache

// enqueues nodes based on pod phase changes if the scheduler is enabled
// the syncer is configured to update virtual node's .status.allocatable fields by summing the consumption of these pods
handlerFuncs := handler.TypedFuncs[*corev1.Pod, ctrl.Request]{
GenericFunc: func(_ context.Context, ev event.TypedGenericEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueuePod(nil, ev.Object, q)
},
CreateFunc: func(_ context.Context, ev event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueuePod(nil, ev.Object, q)
},
UpdateFunc: func(_ context.Context, ue event.TypedUpdateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueuePod(ue.ObjectOld, ue.ObjectNew, q)
},
DeleteFunc: func(_ context.Context, ev event.TypedDeleteEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueuePod(nil, ev.Object, q)
},
}

bld.WatchesRawSource(
source.Kind(unmanagedPodCache, &corev1.Pod{},
handlerFuncs),
)

bld.WatchesRawSource(
source.Kind(podCache, &corev1.Pod{},
handler.TypedFuncs[*corev1.Pod, ctrl.Request]{
GenericFunc: func(_ context.Context, ev event.TypedGenericEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueueNonVClusterPod(nil, ev.Object, q)
},
CreateFunc: func(_ context.Context, ev event.TypedCreateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueueNonVClusterPod(nil, ev.Object, q)
},
UpdateFunc: func(_ context.Context, ue event.TypedUpdateEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueueNonVClusterPod(ue.ObjectOld, ue.ObjectNew, q)
},
DeleteFunc: func(_ context.Context, ev event.TypedDeleteEvent[*corev1.Pod], q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
enqueueNonVClusterPod(nil, ev.Object, q)
},
}),
source.Kind(managedPodCache, &corev1.Pod{},
handlerFuncs),
)
}
return modifyController(ctx, s.nodeServiceProvider, bld)
}

// only used when scheduler is enabled
func enqueueNonVClusterPod(old, new client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
func enqueuePod(old, new client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
pod, ok := new.(*corev1.Pod)
if !ok {
klog.Errorf("invalid type passed to pod handler: %T", new)
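
The hunk above only shows the new managed-pod cache, its IndexByAssigned field index, and the watch wiring; the allocatable recalculation that consumes it lives elsewhere in the syncer. As a hedged sketch of how that cache would typically be queried per node (the helper name and the constants import path are assumptions, not code from this PR):

```go
package nodes

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	// Import path assumed from the identifiers used in the diff above.
	"github.com/loft-sh/vcluster/pkg/constants"
)

// managedPodsOnNode is a hypothetical helper: it lists the vcluster-managed
// host pods assigned to nodeName via the IndexByAssigned field index that the
// diff registers on the managed pod cache. An allocatable calculation could
// iterate over the result to account for requests added by injected sidecar
// containers, on top of the configured reservedResources.
func managedPodsOnNode(ctx context.Context, managedPodCache client.Reader, nodeName string) ([]corev1.Pod, error) {
	podList := &corev1.PodList{}
	err := managedPodCache.List(ctx, podList, client.MatchingFields{constants.IndexByAssigned: nodeName})
	if err != nil {
		return nil, fmt.Errorf("list managed pods on node %q: %w", nodeName, err)
	}
	return podList.Items, nil
}
```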