Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading"
ControlKnowKeyDyingMemcgReclaim MemoryControlKnobName = "dying_memcg_reclaim"
ControlKnobKeyMemoryNUMAHeadroom MemoryControlKnobName = "memory_numa_headroom"
ControlKnobKeyMemoryHigh MemoryControlKnobName = "memory_high"
)

const (
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration

memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryLimitInBytes,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryLimitInBytes))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyMemoryHigh,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryHigh))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyCPUSetMems,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"time"

"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/samber/lo"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -392,6 +393,43 @@ func (p *DynamicPolicy) handleAdvisorResp(advisorResp *advisorsvc.ListAndWatchRe
return nil
}

func (p *DynamicPolicy) handleAdvisorMemoryHigh(
_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
emitter metrics.MetricEmitter,
metaServer *metaserver.MetaServer,
entryName, subEntryName string,
calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries,
) error {
memoryHighStr := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeyMemoryHigh)]
memoryHigh, err := strconv.ParseInt(memoryHighStr, 10, 64)
if err != nil {
return fmt.Errorf("parse %s: %s failed with error: %v", memoryadvisor.ControlKnobKeyMemoryHigh, memoryHighStr, err)
}

if !cgroups.IsCgroup2UnifiedMode() {
general.Infof("memory.high is not supported in cgroupv1 mode")
return nil
}

if calculationInfo.CgroupPath != "" {
if err = cgroupmgr.ApplyMemoryWithRelativePath(calculationInfo.CgroupPath, &common.MemoryData{
HighInBytes: memoryHigh,
}); err != nil {
return fmt.Errorf("apply memory.high failed with error: %v", err)
}

_ = emitter.StoreInt64(util.MetricNameMemoryHandleAdvisorMemoryHigh, memoryHigh,
metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{
"cgroupPath": calculationInfo.CgroupPath,
})...)
return nil
}

return nil
}

func (p *DynamicPolicy) handleAdvisorMemoryLimitInBytes(
_ *config.Configuration,
_ interface{},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//go:build linux
// +build linux

/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicpolicy

import (
"sync"
"testing"

"github.com/bytedance/mockey"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/stretchr/testify/assert"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/advisorsvc"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor"
"github.com/kubewharf/katalyst-core/pkg/metrics"
common "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
)

var memoryHighTestMutex sync.Mutex

func TestDynamicPolicy_handleAdvisorMemoryHigh(t *testing.T) {
t.Parallel()

t.Run("invalid memory high value", func(t *testing.T) {
t.Parallel()
memoryHighTestMutex.Lock()
defer memoryHighTestMutex.Unlock()

p := &DynamicPolicy{}
calculationInfo := &advisorsvc.CalculationInfo{
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryHigh): "invalid",
},
},
}

err := p.handleAdvisorMemoryHigh(nil, nil, nil, metrics.DummyMetrics{}, nil, "", "", calculationInfo, nil)
assert.Error(t, err)
})

t.Run("cgroupv1 mode", func(t *testing.T) {
t.Parallel()
memoryHighTestMutex.Lock()
defer memoryHighTestMutex.Unlock()
defer mockey.UnPatchAll()

mockey.Mock(cgroups.IsCgroup2UnifiedMode).IncludeCurrentGoRoutine().To(func() bool {
return false
}).Build()

p := &DynamicPolicy{}
calculationInfo := &advisorsvc.CalculationInfo{
CgroupPath: "/kubepods/besteffort",
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryHigh): "228000000000",
},
},
}

err := p.handleAdvisorMemoryHigh(nil, nil, nil, metrics.DummyMetrics{}, nil, "", "", calculationInfo, nil)
assert.NoError(t, err)
})

t.Run("empty cgroup path", func(t *testing.T) {
t.Parallel()
memoryHighTestMutex.Lock()
defer memoryHighTestMutex.Unlock()
defer mockey.UnPatchAll()

mockey.Mock(cgroups.IsCgroup2UnifiedMode).IncludeCurrentGoRoutine().To(func() bool {
return true
}).Build()

p := &DynamicPolicy{}
calculationInfo := &advisorsvc.CalculationInfo{
CgroupPath: "",
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryHigh): "228000000000",
},
},
}

err := p.handleAdvisorMemoryHigh(nil, nil, nil, metrics.DummyMetrics{}, nil, "", "", calculationInfo, nil)
assert.NoError(t, err)
})

t.Run("success apply memory high", func(t *testing.T) {
t.Parallel()
memoryHighTestMutex.Lock()
defer memoryHighTestMutex.Unlock()
defer mockey.UnPatchAll()

var capturedCgroupPath string
var capturedMemoryData *common.MemoryData
applyMemoryCalled := false

mockey.Mock(cgroups.IsCgroup2UnifiedMode).IncludeCurrentGoRoutine().To(func() bool {
return true
}).Build()
mockey.Mock(cgroupmgr.ApplyMemoryWithRelativePath).IncludeCurrentGoRoutine().To(func(cgroupPath string, memoryData *common.MemoryData) error {
capturedCgroupPath = cgroupPath
capturedMemoryData = memoryData
applyMemoryCalled = true
return nil
}).Build()

p := &DynamicPolicy{}
calculationInfo := &advisorsvc.CalculationInfo{
CgroupPath: "/kubepods/besteffort",
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryHigh): "228000000000",
},
},
}

err := p.handleAdvisorMemoryHigh(nil, nil, nil, metrics.DummyMetrics{}, nil, "", "", calculationInfo, nil)
assert.NoError(t, err)
assert.True(t, applyMemoryCalled)
assert.Equal(t, "/kubepods/besteffort", capturedCgroupPath)
assert.Equal(t, int64(228000000000), capturedMemoryData.HighInBytes)
})
}
1 change: 1 addition & 0 deletions pkg/agent/qrm-plugins/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
MetricNameMemoryHandleAdvisorContainerEntryFailed = "memory_handle_advisor_container_entry_failed"
MetricNameMemoryHandleAdvisorExtraEntryFailed = "memory_handle_advisor_extra_entry_failed"
MetricNameMemoryHandleAdvisorMemoryLimit = "memory_handle_advisor_memory_limit"
MetricNameMemoryHandleAdvisorMemoryHigh = "memory_handle_advisor_memory_high"
MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache"
MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems"
MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,10 @@ func TestUpdate(t *testing.T) {
ExtraEntries: []types.ExtraMemoryAdvices{
{
CgroupPath: "/kubepods/besteffort",
Values: map[string]string{string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.Itoa(240 << 30)},
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.Itoa(240 << 30),
string(memoryadvisor.ControlKnobKeyMemoryHigh): strconv.FormatInt(int64(0.95*float64(240<<30)), 10),
},
},
},
},
Expand Down Expand Up @@ -1318,7 +1321,10 @@ func TestUpdate(t *testing.T) {
ExtraEntries: []types.ExtraMemoryAdvices{
{
CgroupPath: "/kubepods/besteffort",
Values: map[string]string{string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.Itoa(184 << 30)},
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.Itoa(184 << 30),
string(memoryadvisor.ControlKnobKeyMemoryHigh): strconv.FormatInt(187690070835, 10),
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (
reclaimMemoryUnlimited = -1

defaultProcZoneInfoFile = "/proc/zoneinfo"

// Factor for calculating memory.high based on memory.max, available for cgroup v2 only
memoryHighScaleFactor = 0.95
)

type memoryGuard struct {
Expand Down Expand Up @@ -110,11 +113,19 @@ func (mg *memoryGuard) GetAdvices() types.InternalMemoryCalculationResult {
general.Errorf("failed to get last reconcile result")
return types.InternalMemoryCalculationResult{}
}
memoryMax := mg.reclaimMemoryLimit.Load()
memoryHigh := int64(float64(memoryMax) * memoryHighScaleFactor)
if memoryMax == reclaimMemoryUnlimited {
memoryHigh = reclaimMemoryUnlimited
}
result := types.InternalMemoryCalculationResult{
ExtraEntries: []types.ExtraMemoryAdvices{
{
CgroupPath: mg.reclaimRelativeRootCgroupPath,
Values: map[string]string{string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.FormatInt(mg.reclaimMemoryLimit.Load(), 10)},
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.FormatInt(memoryMax, 10),
string(memoryadvisor.ControlKnobKeyMemoryHigh): strconv.FormatInt(memoryHigh, 10),
},
},
},
}
Expand All @@ -127,9 +138,17 @@ func (mg *memoryGuard) GetAdvices() types.InternalMemoryCalculationResult {
continue
}

numaMemoryMax := numaBindingReclaimMemoryLimit[numaID]
numaMemoryHigh := int64(float64(numaMemoryMax) * memoryHighScaleFactor)
if numaMemoryMax == reclaimMemoryUnlimited {
numaMemoryHigh = reclaimMemoryUnlimited
}
result.ExtraEntries = append(result.ExtraEntries, types.ExtraMemoryAdvices{
CgroupPath: cgroupPath,
Values: map[string]string{string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.FormatInt(numaBindingReclaimMemoryLimit[numaID], 10)},
Values: map[string]string{
string(memoryadvisor.ControlKnobKeyMemoryLimitInBytes): strconv.FormatInt(numaMemoryMax, 10),
string(memoryadvisor.ControlKnobKeyMemoryHigh): strconv.FormatInt(numaMemoryHigh, 10),
},
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/cgroup/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ type MemoryData struct {
// MinInBytes for memory.min
// cgroup memory that can never be reclaimed by kswapd.
MinInBytes int64
WmarkRatio int32
// HighInBytes for memory.high
HighInBytes int64
WmarkRatio int32
// SwapMaxInBytes < 0 means disable cgroup-level swap
SwapMaxInBytes int64
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/cgroup/manager/v2/fs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ func (m *manager) ApplyMemory(absCgroupPath string, data *common.MemoryData) err
}
}

if data.HighInBytes != 0 {
memoryHighValue := "max"
if data.HighInBytes > 0 {
memoryHighValue = numToStr(data.HighInBytes)
}
if err, applied, oldData := common.InstrumentedWriteFileIfChange(absCgroupPath, "memory.high", memoryHighValue); err != nil {
return err
} else if applied {
klog.Infof("[CgroupV2] apply memory high successfully, cgroupPath: %s, data: %v, old data: %v\n", absCgroupPath, memoryHighValue, oldData)
}
}

if data.WmarkRatio != 0 {
newRatio := fmt.Sprintf("%d", data.WmarkRatio)
if err, applied, oldData := common.InstrumentedWriteFileIfChange(absCgroupPath, "memory.wmark_ratio", newRatio); err != nil {
Expand Down
Loading