Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/katalyst-agent/app/options/qrm/mb_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type MBOptions struct {
CrossDomainGroups []string
ResetResctrlOnly bool
LocalIsVictimAndTotalIsAllRead bool
ExtraGroupPriorities map[string]int
}

func NewMBOptions() *MBOptions {
Expand Down Expand Up @@ -89,6 +90,8 @@ func (o *MBOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.ResetResctrlOnly, "not to run mb plugin really, and only reset to ensure resctrl FS in default status")
fs.BoolVar(&o.LocalIsVictimAndTotalIsAllRead, "mb-local-is-victim",
o.LocalIsVictimAndTotalIsAllRead, "turn resctrl local as victim")
fs.StringToIntVar(&o.ExtraGroupPriorities, "mb-extra-group-priorities",
o.ExtraGroupPriorities, "extra resctrl groups with priorities")
}

func (o *MBOptions) ApplyTo(conf *qrm.MBQRMPluginConfig) error {
Expand All @@ -103,5 +106,6 @@ func (o *MBOptions) ApplyTo(conf *qrm.MBQRMPluginConfig) error {
conf.DomainGroupAwareCapacityPCT = o.DomainGroupAwareCapacityPCT
conf.ResetResctrlOnly = o.ResetResctrlOnly
conf.LocalIsVictimAndTotalIsAllRead = o.LocalIsVictimAndTotalIsAllRead
conf.ExtraGroupPriorities = o.ExtraGroupPriorities
return nil
}
104 changes: 103 additions & 1 deletion pkg/agent/qrm-plugins/mb/advisor/advisor_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ limitations under the License.
package advisor

import (
"errors"
"fmt"
"strings"

"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/advisor/priority"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/advisor/resource"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/monitor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/plan"
)

const combinedGroupPrefix = "combined-"

// getMinEffectiveCapacity identifies the min dynamic capacity required by pre-defined groups,
// if the specific groups have active MB traffics
func getMinEffectiveCapacity(base int, groupCaps map[string]int, incomingStats monitor.GroupMBStats) int {
Expand Down Expand Up @@ -126,13 +130,111 @@ func getGroupIncomingInfo(capacity int, incomingStats monitor.GroupMBStats) *res
CapacityInMB: capacity,
}

result.GroupSorted = sortGroups(maps.Keys(incomingStats))
result.GroupSorted = priority.GetInstance().SortGroups(maps.Keys(incomingStats))
result.GroupTotalUses = getUsedTotalByGroup(incomingStats)
result.FreeInMB, result.GroupLimits = getLimitsByGroupSorted(capacity, result.GroupSorted, result.GroupTotalUses)
result.ResourceState = resource.GetResourceState(capacity, result.FreeInMB)
return result
}

// groupByWeight buckets the given groups by their configured priority weight,
// returning weight -> member group names. The map value type is irrelevant;
// only the keys (group names) are consulted.
func groupByWeight[T any](stats map[string]T) map[int][]string {
	prioritizer := priority.GetInstance()
	buckets := make(map[int][]string, len(stats))
	for name := range stats {
		w := prioritizer.GetWeight(name)
		buckets[w] = append(buckets[w], name)
	}
	return buckets
}

// preProcessGroupInfo merges the incoming MB stats of groups that share the
// same priority weight into one synthetic "combined-<weight>" group, so the
// advisor can treat equal-priority groups as a single unit.
//
// Returns:
//   - the (possibly) merged stats: groups alone at their weight keep their
//     original key and stats untouched,
//   - a domainGroupMapping recording, per combined key, which CCD ids each
//     original member group "owns" (the CCDs where it reported the max TotalMB),
//   - an error when two member groups report ambiguous traffic on a shared CCD
//     (more than half of, but below, the max), making ownership undecidable.
func preProcessGroupInfo(stats monitor.GroupMBStats) (monitor.GroupMBStats, domainGroupMapping, error) {
	groups := groupByWeight(stats)

	result := make(monitor.GroupMBStats)
	groupInfos := domainGroupMapping{}

	for weight, equivGroups := range groups {
		// a weight class with a single member needs no combining; pass through
		if len(equivGroups) == 1 {
			result[equivGroups[0]] = stats[equivGroups[0]]
			continue
		}

		newKey := getCombinedGroupKey(weight)
		groupInfo := combinedGroupMapping{}
		combined := make(monitor.GroupMB)
		maxMap := make(map[int]int)

		// First pass: find max TotalMB for each CCD and build combined stats.
		// For each CCD id the winning group's whole stat entry is kept (not a
		// sum), and maxMap mirrors the winning TotalMB for the second pass.
		for _, group := range equivGroups {
			for id, stat := range stats[group] {
				if stat.TotalMB > combined[id].TotalMB {
					combined[id] = stat
					maxMap[id] = stat.TotalMB
				}
			}
		}

		// Second pass: validate and build CCD sets for each group (only within equivGroups)
		for _, group := range equivGroups {
			ccdSet := ccdSet{}
			for id, mbStat := range stats[group] {
				// reject a shared ccd with similar incoming data: a non-max
				// reading above half the max means two groups are driving the
				// same CCD at comparable rates, so ownership is ambiguous
				if mbStat.TotalMB > maxMap[id]/2 && mbStat.TotalMB < maxMap[id] {
					return nil, nil, errors.New("invalid incoming inputs")
				}
				// the group that produced the max owns this CCD
				if mbStat.TotalMB == maxMap[id] {
					ccdSet[id] = struct{}{}
				}
			}
			groupInfo[group] = ccdSet
		}

		result[newKey] = combined
		groupInfos[newKey] = groupInfo
	}

	return result, groupInfos, nil
}

// preProcessGroupSumStat merges the per-domain outgoing summaries of groups
// sharing the same priority weight into a single "combined-<weight>" entry by
// summing their MB figures domain by domain. Groups that are alone at their
// weight are passed through under their original key, untouched.
func preProcessGroupSumStat(sumStats map[string][]monitor.MBInfo) map[string][]monitor.MBInfo {
	merged := make(map[string][]monitor.MBInfo)

	for weight, members := range groupByWeight(sumStats) {
		if len(members) == 1 {
			only := members[0]
			merged[only] = sumStats[only]
			continue
		}

		// sumStats holds the outgoing summary of each domain for each group;
		// every slot has the same shape (one entry per domain), so the first
		// member fixes the length of the accumulator
		totals := make([]monitor.MBInfo, len(sumStats[members[0]]))
		for _, member := range members {
			for domain, info := range sumStats[member] {
				totals[domain].LocalMB += info.LocalMB
				totals[domain].RemoteMB += info.RemoteMB
				totals[domain].TotalMB += info.TotalMB
			}
		}
		merged[getCombinedGroupKey(weight)] = totals
	}

	return merged
}

// getCombinedGroupKey derives the synthetic group name used for a set of
// equal-priority groups merged at the given weight, e.g. "combined-9000".
func getCombinedGroupKey(weight int) string {
	// fmt.Sprint adds no separator here: spaces are inserted only between
	// operands when neither side is a string, and the prefix is a string
	return fmt.Sprint(combinedGroupPrefix, weight)
}

// isCombinedGroup reports whether groupName is a synthetic combined group
// produced by getCombinedGroupKey.
//
// Uses HasPrefix rather than Contains: combined keys always start with
// combinedGroupPrefix, and a prefix match cannot false-positive on a
// user-defined group that merely embeds "combined-" in the middle of its name.
func isCombinedGroup(groupName string) bool {
	return strings.HasPrefix(groupName, combinedGroupPrefix)
}

func getLimitsByGroupSorted(capacity int, groupSorting []sets.String, groupUsages map[string]int) (int, map[string]int) {
result := make(map[string]int)

Expand Down
141 changes: 141 additions & 0 deletions pkg/agent/qrm-plugins/mb/advisor/advisor_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ import (

"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/advisor/priority"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/advisor/resource"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/monitor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/mb/plan"
)

// init registers an extra "machine" resctrl group at weight 9_000 with the
// priority singleton, so the combining tests below exercise the
// equal-weight merge path (the expected "combined-9000" key implies
// "dedicated" also resolves to weight 9_000 — confirm against the priority
// package defaults).
func init() {
	priority.GetInstance().AddWeight("machine", 9_000)
}

func Test_maskPlanWithNoThrottles(t *testing.T) {
t.Parallel()
type args struct {
Expand Down Expand Up @@ -179,3 +185,138 @@ func Test_getDomainTotalMBs(t *testing.T) {
})
}
}

// Test_preProcessGroupInfo covers both outcomes of the weight-based merge:
// distinct weights are passed through untouched, while equal-weight groups
// collapse into a "combined-<weight>" entry with per-group CCD ownership.
func Test_preProcessGroupInfo(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name           string
		stats          monitor.GroupMBStats
		wantResult     monitor.GroupMBStats
		wantGroupInfos domainGroupMapping
		wantErr        bool
	}

	cases := []testCase{
		{
			name: "two groups with different weights - no combination",
			stats: monitor.GroupMBStats{
				"dedicated": {
					0: {LocalMB: 5_000, RemoteMB: 3_000, TotalMB: 8_000},
				},
				"share-50": {
					1: {LocalMB: 4_000, RemoteMB: 2_000, TotalMB: 6_000},
				},
			},
			wantResult: monitor.GroupMBStats{
				"dedicated": {
					0: {LocalMB: 5_000, RemoteMB: 3_000, TotalMB: 8_000},
				},
				"share-50": {
					1: {LocalMB: 4_000, RemoteMB: 2_000, TotalMB: 6_000},
				},
			},
			wantGroupInfos: domainGroupMapping{},
			wantErr:        false,
		},
		{
			name: "two groups with same weight - combine into one",
			stats: monitor.GroupMBStats{
				"dedicated": {
					0: {LocalMB: 10_000, RemoteMB: 5_000, TotalMB: 15_000},
				},
				"machine": {
					1: {LocalMB: 8_000, RemoteMB: 4_000, TotalMB: 12_000},
				},
			},
			wantResult: monitor.GroupMBStats{
				"combined-9000": {
					0: {LocalMB: 10_000, RemoteMB: 5_000, TotalMB: 15_000},
					1: {LocalMB: 8_000, RemoteMB: 4_000, TotalMB: 12_000},
				},
			},
			wantGroupInfos: domainGroupMapping{
				"combined-9000": {
					"dedicated": {0: struct{}{}},
					"machine":   {1: struct{}{}},
				},
			},
			wantErr: false,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			gotResult, gotGroupInfos, err := preProcessGroupInfo(tc.stats)
			if (err != nil) != tc.wantErr {
				t.Errorf("preProcessGroupInfo() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if !reflect.DeepEqual(gotResult, tc.wantResult) {
				t.Errorf("preProcessGroupInfo() gotResult = %v, want %v", gotResult, tc.wantResult)
			}
			if !reflect.DeepEqual(gotGroupInfos, tc.wantGroupInfos) {
				t.Errorf("preProcessGroupInfo() gotGroupInfos = %v, want %v", gotGroupInfos, tc.wantGroupInfos)
			}
		})
	}
}

// Test_preProcessGroupSumStat checks pass-through of distinct-weight groups
// and the per-domain summation performed for equal-weight groups.
func Test_preProcessGroupSumStat(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name     string
		sumStats map[string][]monitor.MBInfo
		want     map[string][]monitor.MBInfo
	}

	cases := []testCase{
		{
			name: "two groups with different weights - no combination",
			sumStats: map[string][]monitor.MBInfo{
				"dedicated": {
					{LocalMB: 5_000, RemoteMB: 3_000, TotalMB: 8_000},
					{LocalMB: 4_000, RemoteMB: 2_000, TotalMB: 6_000},
				},
				"share-50": {
					{LocalMB: 3_000, RemoteMB: 1_000, TotalMB: 4_000},
					{LocalMB: 2_000, RemoteMB: 1_000, TotalMB: 3_000},
				},
			},
			want: map[string][]monitor.MBInfo{
				"dedicated": {
					{LocalMB: 5_000, RemoteMB: 3_000, TotalMB: 8_000},
					{LocalMB: 4_000, RemoteMB: 2_000, TotalMB: 6_000},
				},
				"share-50": {
					{LocalMB: 3_000, RemoteMB: 1_000, TotalMB: 4_000},
					{LocalMB: 2_000, RemoteMB: 1_000, TotalMB: 3_000},
				},
			},
		},
		{
			name: "two groups with same weight - combine and sum",
			sumStats: map[string][]monitor.MBInfo{
				"dedicated": {
					{LocalMB: 5_000, RemoteMB: 3_000, TotalMB: 8_000},
					{LocalMB: 4_000, RemoteMB: 2_000, TotalMB: 6_000},
				},
				"machine": {
					{LocalMB: 3_000, RemoteMB: 1_000, TotalMB: 4_000},
					{LocalMB: 2_000, RemoteMB: 1_000, TotalMB: 3_000},
				},
			},
			want: map[string][]monitor.MBInfo{
				"combined-9000": {
					{LocalMB: 8_000, RemoteMB: 4_000, TotalMB: 12_000},
					{LocalMB: 6_000, RemoteMB: 3_000, TotalMB: 9_000},
				},
			},
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			got := preProcessGroupSumStat(tc.sumStats)
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("preProcessGroupSumStat() = %v, want %v", got, tc.want)
			}
		})
	}
}
Loading
Loading