Skip to content

Commit adacc8c

Browse files
authored
Merge pull request #870 from h-w-chen/dev/pap-cpufreq-as-dvfs-limiter
enhance(advisor): power_aware advisor - cpufreq as dvfs limiter
2 parents d3ce152 + 2531a44 commit adacc8c

31 files changed

+1365
-111
lines changed

cmd/katalyst-agent/app/options/metaserver/metric.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,11 @@ func NewMetricFetcherOptions() *MetricFetcherOptions {
5555
MetricInsurancePeriod: defaultMetricInsurancePeriod,
5656
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},
5757

58-
DefaultInterval: time.Second * 5,
59-
ProvisionerIntervalSecs: map[string]int{metaserver.MetricProvisionerMalachiteRealtime: 1},
58+
DefaultInterval: time.Second * 5,
59+
ProvisionerIntervalSecs: map[string]int{
60+
metaserver.MetricProvisionerMalachiteRealtime: 1,
61+
metaserver.MetricProvisionerMalachiteRealtimeFreq: 1,
62+
},
6063

6164
MalachiteOptions: &MalachiteOptions{},
6265
CgroupOptions: &CgroupOptions{},

cmd/katalyst-agent/app/options/sysadvisor/poweraware/power_aware_plugin.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type PowerAwarePluginOptions struct {
2828
DisablePowerPressureEvict bool
2929
PowerCappingAdvisorSocketAbsPath string
3030
AnnotationKeyPrefix string
31+
DVFSIndication string
3132
}
3233

3334
func (p *PowerAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
@@ -37,6 +38,7 @@ func (p *PowerAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
3738
fs.BoolVar(&p.DisablePowerCapping, "power-capping-Disabled", p.DisablePowerCapping, "flag for power aware plugin disabling power capping")
3839
fs.StringVar(&p.PowerCappingAdvisorSocketAbsPath, "power-capping-advisor-sock-abs-path", p.PowerCappingAdvisorSocketAbsPath, "absolute path of unix socket file for power capping advisor served in sys-advisor")
3940
fs.StringVar(&p.AnnotationKeyPrefix, "power-aware-annotation-key-prefix", p.AnnotationKeyPrefix, "prefix of node annotation keys used by power aware plugin")
41+
fs.StringVar(&p.DVFSIndication, "power-aware-dvfs-indication", p.DVFSIndication, "indication metric name of dvfs effect")
4042
}
4143

4244
func (p *PowerAwarePluginOptions) ApplyTo(o *poweraware.PowerAwarePluginConfiguration) error {
@@ -45,11 +47,14 @@ func (p *PowerAwarePluginOptions) ApplyTo(o *poweraware.PowerAwarePluginConfigur
4547
o.DisablePowerCapping = p.DisablePowerCapping
4648
o.PowerCappingAdvisorSocketAbsPath = p.PowerCappingAdvisorSocketAbsPath
4749
o.AnnotationKeyPrefix = p.AnnotationKeyPrefix
50+
o.DVFSIndication = p.DVFSIndication
4851

4952
return nil
5053
}
5154

5255
// NewPowerAwarePluginOptions creates a new Options with a default config.
5356
func NewPowerAwarePluginOptions() *PowerAwarePluginOptions {
54-
return &PowerAwarePluginOptions{}
57+
return &PowerAwarePluginOptions{
58+
DVFSIndication: poweraware.DVFSIndicationPower,
59+
}
5560
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package assess
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"github.com/pkg/errors"
24+
25+
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/reader"
26+
"github.com/kubewharf/katalyst-core/pkg/util/general"
27+
)
28+
29+
const (
30+
// minKHZ is the minimum khz for a valid cpu freq
31+
minKHZ = 1000_000
32+
33+
// alpha is the decaying coefficient of Exponential Moving Average formula
34+
alpha = 0.4
35+
)
36+
37+
type cpuFreqChangeAssessor struct {
38+
// highFreqKHZ may update itself to higher value observed
39+
// todo: consider to get fixed base cpu freq value instead of this self-updated one
40+
highFreqKHZ int
41+
effectKeeper effectKeeper
42+
43+
cpuFreqReader reader.MetricReader
44+
}
45+
46+
type effectKeeper struct {
47+
effectiveValue int
48+
}
49+
50+
func (h *effectKeeper) Update(curr int) int {
51+
// not to smooth out incoming maxima yet; should it be spiky, it shall be averaged out eventually
52+
if curr > h.effectiveValue {
53+
h.effectiveValue = curr
54+
return curr
55+
}
56+
57+
h.effectiveValue = int(ema(float64(h.effectiveValue), float64(curr), alpha))
58+
return h.effectiveValue
59+
}
60+
61+
// ema implements Sliding-Window Exponential Moving Average algorithm
62+
func ema(meanAverage, value, alpha float64) float64 {
63+
if meanAverage <= 0 {
64+
return value
65+
}
66+
67+
return meanAverage*(1-alpha) + value*alpha
68+
}
69+
70+
func (h *effectKeeper) Clear() {
71+
h.effectiveValue = 0
72+
}
73+
74+
func (c *cpuFreqChangeAssessor) IsInitialized() bool {
75+
return c.highFreqKHZ >= minKHZ
76+
}
77+
78+
func (c *cpuFreqChangeAssessor) Init() error {
79+
freq, err := c.cpuFreqReader.Get(context.Background())
80+
if err != nil {
81+
return errors.Wrap(err, "failed to get initial cpu freq value")
82+
}
83+
84+
general.Infof("pap: cpufreq set initial value %d khz", freq)
85+
c.highFreqKHZ = freq
86+
return nil
87+
}
88+
89+
func (c *cpuFreqChangeAssessor) Clear() {
90+
c.effectKeeper.Clear()
91+
}
92+
93+
func (c *cpuFreqChangeAssessor) AssessEffect(_ int, _, _ bool) (int, error) {
94+
// always check cpu freq to assess the effect
95+
currentFreq, err := c.cpuFreqReader.Get(context.Background())
96+
if err != nil {
97+
return 0, errors.Wrap(err, "failed to fetch latest cpu freq to access dvfs effect")
98+
}
99+
100+
if currentFreq > c.highFreqKHZ {
101+
c.highFreqKHZ = currentFreq
102+
c.effectKeeper.Clear()
103+
}
104+
105+
return c.assessEffectByFreq(currentFreq)
106+
}
107+
108+
func (c *cpuFreqChangeAssessor) assessEffectByFreq(currentFreq int) (int, error) {
109+
general.InfofV(6, "pap: cpuFreqChangeAssessor assessEffectByFreq: curr %d, base %d", currentFreq, c.highFreqKHZ)
110+
if currentFreq < minKHZ {
111+
return 0, fmt.Errorf("invalid currentFreq frequency %d khz", currentFreq)
112+
}
113+
114+
if currentFreq >= c.highFreqKHZ {
115+
return 0, nil
116+
}
117+
118+
instantEffect := 100 - currentFreq*100/c.highFreqKHZ
119+
return c.effectKeeper.Update(instantEffect), nil
120+
}
121+
122+
func (c *cpuFreqChangeAssessor) Update(_ int) {
123+
// no need to keep track of the cpu freq change as the initial cpu freq is always the baseline
124+
}
125+
126+
func (c *cpuFreqChangeAssessor) AssessTarget(actualWatt, desiredWatt int, maxDecreasePercent int) int {
127+
// keep as is if there is no room to decrease
128+
if maxDecreasePercent <= 0 {
129+
return actualWatt
130+
}
131+
132+
// when there is decrease room for cpu frequency, lower the power in smaller portion to avoid misstep
133+
lowerLimit := (100 - maxDecreasePercent/2) * actualWatt / 100
134+
if lowerLimit > desiredWatt {
135+
return lowerLimit
136+
}
137+
138+
return desiredWatt
139+
}
140+
141+
func NewCPUFreqChangeAssessor(initKHZ int, nodeMetricGetter reader.NodeMetricGetter) Assessor {
142+
return &cpuFreqChangeAssessor{
143+
highFreqKHZ: initKHZ,
144+
effectKeeper: effectKeeper{},
145+
cpuFreqReader: reader.NewCPUFreqReader(nodeMetricGetter),
146+
}
147+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package assess
18+
19+
import "testing"
20+
21+
func Test_cpuFreqChangeAssessor_AccumulateEffect(t *testing.T) {
22+
t.Parallel()
23+
type fields struct {
24+
initFreqKHZ int
25+
}
26+
type args struct {
27+
current int
28+
}
29+
tests := []struct {
30+
name string
31+
fields fields
32+
args args
33+
want int
34+
wantErr bool
35+
}{
36+
{
37+
name: "happy path",
38+
fields: fields{
39+
initFreqKHZ: 2500_000,
40+
},
41+
args: args{
42+
current: 2000_000,
43+
},
44+
want: 20,
45+
wantErr: false,
46+
},
47+
{
48+
name: "negative low current freq",
49+
fields: fields{
50+
initFreqKHZ: 2500_000,
51+
},
52+
args: args{
53+
current: 777_000,
54+
},
55+
want: 0,
56+
wantErr: true,
57+
},
58+
{
59+
name: "fine for higher freq no change as temporary spike",
60+
fields: fields{
61+
initFreqKHZ: 2500_000,
62+
},
63+
args: args{
64+
current: 2600_000,
65+
},
66+
want: 0,
67+
wantErr: false,
68+
},
69+
}
70+
for _, tt := range tests {
71+
tt := tt
72+
t.Run(tt.name, func(t *testing.T) {
73+
t.Parallel()
74+
c := &cpuFreqChangeAssessor{
75+
highFreqKHZ: tt.fields.initFreqKHZ,
76+
}
77+
got, err := c.assessEffectByFreq(tt.args.current)
78+
if (err != nil) != tt.wantErr {
79+
t.Errorf("AssessEffect() error = %v, wantErr %v", err, tt.wantErr)
80+
return
81+
}
82+
if got != tt.want {
83+
t.Errorf("AssessEffect() got = %v, want %v", got, tt.want)
84+
}
85+
})
86+
}
87+
}
88+
89+
func Test_cpuFreqChangeAssessor_AssessTarget(t *testing.T) {
90+
t.Parallel()
91+
type fields struct {
92+
initFreqMhz int
93+
}
94+
type args struct {
95+
actualWatt int
96+
desiredWatt int
97+
maxDecreasePercent int
98+
}
99+
tests := []struct {
100+
name string
101+
fields fields
102+
args args
103+
want int
104+
}{
105+
{
106+
name: "happy path",
107+
fields: fields{
108+
initFreqMhz: 2500,
109+
},
110+
args: args{
111+
actualWatt: 100,
112+
desiredWatt: 80,
113+
maxDecreasePercent: 10,
114+
},
115+
want: 95,
116+
},
117+
{
118+
name: "happy path",
119+
fields: fields{
120+
initFreqMhz: 2500,
121+
},
122+
args: args{
123+
actualWatt: 100,
124+
desiredWatt: 80,
125+
maxDecreasePercent: 0,
126+
},
127+
want: 100,
128+
},
129+
}
130+
for _, tt := range tests {
131+
tt := tt
132+
t.Run(tt.name, func(t *testing.T) {
133+
t.Parallel()
134+
c := &cpuFreqChangeAssessor{
135+
highFreqKHZ: tt.fields.initFreqMhz,
136+
}
137+
if got := c.AssessTarget(tt.args.actualWatt, tt.args.desiredWatt, tt.args.maxDecreasePercent); got != tt.want {
138+
t.Errorf("AssessTarget() = %v, want %v", got, tt.want)
139+
}
140+
})
141+
}
142+
}
143+
144+
func Test_effectKeeper_Update(t *testing.T) {
145+
t.Parallel()
146+
type fields struct {
147+
value int
148+
}
149+
type args struct {
150+
curr int
151+
}
152+
tests := []struct {
153+
name string
154+
fields fields
155+
args args
156+
want int
157+
}{
158+
{
159+
name: "happy path to smooth out old spike",
160+
fields: fields{
161+
value: 100,
162+
},
163+
args: args{
164+
curr: 80,
165+
},
166+
want: 92,
167+
},
168+
}
169+
for _, tt := range tests {
170+
tt := tt
171+
t.Run(tt.name, func(t *testing.T) {
172+
t.Parallel()
173+
h := &effectKeeper{
174+
effectiveValue: tt.fields.value,
175+
}
176+
if got := h.Update(tt.args.curr); got != tt.want {
177+
t.Errorf("Update() = %v, want %v", got, tt.want)
178+
}
179+
})
180+
}
181+
}

0 commit comments

Comments
 (0)