Skip to content

Commit 2975586

Browse files
authored
Merge pull request #1006 from syc4704413/dev/syc/borwein-model-input
feat(sys-advisor): get borwein model input and store it in cache
2 parents 156b470 + 8e8bc8c commit 2975586

File tree

23 files changed

+1348
-296
lines changed

23 files changed

+1348
-296
lines changed

cmd/katalyst-agent/app/options/dynamic/adminqos/eviction/numa_cpu_pressure_eviction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func NewNumaCPUPressureEvictionOptions() NumaCPUPressureEvictionOptions {
4141
EnableEviction: false,
4242
ThresholdMetPercentage: 0.7,
4343
MetricRingSize: 4,
44-
GracePeriod: 60,
44+
GracePeriod: 0,
4545
ThresholdExpandFactor: 1.1,
4646
CandidateCount: 2,
4747
WorkloadMetricsLabelKeys: []string{},

pkg/agent/sysadvisor/metacache/metacache.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ type MetaReader interface {
8282
GetFilteredInferenceResult(filterFunc func(result interface{}) (interface{}, error), modelName string) (interface{}, error)
8383
// GetInferenceResult gets specified model inference result
8484
GetInferenceResult(modelName string) (interface{}, error)
85+
// GetModelInput gets model input, the dimension of model input is : "container", "numa", "node"
86+
GetModelInput(metricDimension string) (metric map[string]interface{}, err error)
8587

8688
// GetSupportedWantedFeatureGates gets supported and wanted FeatureGates
8789
GetSupportedWantedFeatureGates() (map[string]*advisorsvc.FeatureGate, error)
@@ -128,6 +130,9 @@ type MetaWriter interface {
128130
// SetInferenceResult sets specified model inference result
129131
SetInferenceResult(modelName string, result interface{}) error
130132

133+
// SetModelInput sets model input, the dimension of model input is : "container", "numa", "node"
134+
SetModelInput(metricDimension string, metric map[string]interface{}) error
135+
131136
// SetSupportedWantedFeatureGates sets supported and wanted FeatureGates
132137
SetSupportedWantedFeatureGates(featureGates map[string]*advisorsvc.FeatureGate) error
133138
sync.Locker
@@ -169,6 +174,9 @@ type MetaCacheImp struct {
169174
modelToResult map[string]interface{}
170175
modelMutex sync.RWMutex
171176

177+
modelInput map[string]map[string]interface{}
178+
modelInputMutex sync.RWMutex
179+
172180
featureGates map[string]*advisorsvc.FeatureGate
173181
featureGatesMutex sync.RWMutex
174182

@@ -206,6 +214,7 @@ func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.Metrics
206214
checkpointName: stateFileName,
207215
emitter: emitter,
208216
modelToResult: make(map[string]interface{}),
217+
modelInput: make(map[string]map[string]interface{}),
209218
featureGates: make(map[string]*advisorsvc.FeatureGate),
210219
containerCreateTimestamp: make(map[string]int64),
211220
}
@@ -319,6 +328,17 @@ func (mc *MetaCacheImp) GetInferenceResult(modelName string) (interface{}, error
319328
return mc.GetFilteredInferenceResult(nil, modelName)
320329
}
321330

331+
// GetModelInput gets model input, the dimension of model input is : "container", "numa", "node"
332+
func (mc *MetaCacheImp) GetModelInput(metricDimension string) (map[string]interface{}, error) {
333+
mc.modelInputMutex.RLock()
334+
defer mc.modelInputMutex.RUnlock()
335+
336+
if mc.modelInput[metricDimension] == nil {
337+
return nil, fmt.Errorf("model input for dimension: %s doesn't exist", metricDimension)
338+
}
339+
return mc.modelInput[metricDimension], nil
340+
}
341+
322342
// GetSupportedWantedFeatureGates gets supported and wanted FeatureGates
323343
func (mc *MetaCacheImp) GetSupportedWantedFeatureGates() (map[string]*advisorsvc.FeatureGate, error) {
324344
mc.featureGatesMutex.RLock()
@@ -578,6 +598,18 @@ func (mc *MetaCacheImp) SetInferenceResult(modelName string, result interface{})
578598
return nil
579599
}
580600

601+
// SetModelInput sets model input
602+
func (mc *MetaCacheImp) SetModelInput(metricDimension string, metric map[string]interface{}) error {
603+
mc.modelInputMutex.Lock()
604+
defer mc.modelInputMutex.Unlock()
605+
606+
if metric == nil {
607+
return fmt.Errorf("nil model input")
608+
}
609+
mc.modelInput[metricDimension] = metric
610+
return nil
611+
}
612+
581613
// SetSupportedWantedFeatureGates sets supported and wanted FeatureGates
582614
func (mc *MetaCacheImp) SetSupportedWantedFeatureGates(featureGates map[string]*advisorsvc.FeatureGate) error {
583615
mc.featureGatesMutex.Lock()
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package consts
18+
19+
const (
20+
MetricDimensionNode = "node"
21+
MetricDimensionNuma = "numa"
22+
MetricDimensionContainer = "container"
23+
)

pkg/agent/sysadvisor/plugin/inference/inference.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ import (
2626

2727
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
2828
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin"
29+
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher"
30+
genericinput "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher/generic"
2931
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
30-
borweinfetcher "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
32+
borweinresult "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
3133
"github.com/kubewharf/katalyst-core/pkg/config"
3234
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
3335
"github.com/kubewharf/katalyst-core/pkg/config/generic"
@@ -38,15 +40,19 @@ import (
3840
)
3941

4042
func init() {
41-
modelresultfetcher.RegisterModelResultFetcherInitFunc(borweinfetcher.BorweinModelResultFetcherName,
42-
borweinfetcher.NewBorweinModelResultFetcher)
43+
modelinputfetcher.RegisterModelInputFetcherInitFunc(genericinput.GenericModelInputFetcherName,
44+
genericinput.NewGenericModelInputFetcher)
45+
modelresultfetcher.RegisterModelResultFetcherInitFunc(borweinresult.BorweinModelResultFetcherName,
46+
borweinresult.NewBorweinModelResultFetcher)
4347
}
4448

4549
type InferencePlugin struct {
4650
name string
4751
// conf config.Configuration
4852

49-
period time.Duration
53+
period time.Duration
54+
55+
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
5056
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
5157

5258
metaServer *metaserver.MetaServer
@@ -78,6 +84,7 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
7884
inferencePlugin := InferencePlugin{
7985
name: pluginName,
8086
period: conf.InferencePluginConfiguration.SyncPeriod,
87+
modelsInputFetchers: make(map[string]modelinputfetcher.ModelInputFetcher),
8188
modelsResultFetchers: make(map[string]modelresultfetcher.ModelResultFetcher),
8289
metaServer: metaServer,
8390
metricEmitter: metricEmitter,
@@ -87,6 +94,20 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
8794
metaWriter: metaCache,
8895
}
8996

97+
for fetcherName, initFn := range modelinputfetcher.GetRegisteredModelInputFetcherInitFuncs() {
98+
// todo: support only enabling part of fetchers
99+
general.Infof("try init fetcher: %s", fetcherName)
100+
fetcher, err := initFn(fetcherName, conf, extraConf, emitterPool, metaServer, metaCache)
101+
if err != nil {
102+
return nil, fmt.Errorf("failed to start sysadvisor plugin %v: %v", pluginName, err)
103+
} else if fetcher == nil {
104+
general.Infof("fetcher: %s isn't enabled", fetcherName)
105+
continue
106+
}
107+
108+
inferencePlugin.modelsInputFetchers[fetcherName] = fetcher
109+
}
110+
90111
for fetcherName, initFn := range modelresultfetcher.GetRegisteredModelResultFetcherInitFuncs() {
91112
// todo: support only enabling part of fetchers
92113
general.Infof("try init fetcher: %s", fetcherName)
@@ -105,7 +126,7 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
105126
}
106127

107128
func (infp *InferencePlugin) Run(ctx context.Context) {
108-
wait.UntilWithContext(ctx, infp.fetchModelResult, infp.period)
129+
wait.UntilWithContext(ctx, infp.inference, infp.period)
109130
}
110131

111132
// Name returns the name of inference plugin
@@ -118,6 +139,23 @@ func (infp *InferencePlugin) Init() error {
118139
return nil
119140
}
120141

142+
func (infp *InferencePlugin) fetchModelInput(ctx context.Context) {
143+
var wg sync.WaitGroup
144+
for modelName, fetcher := range infp.modelsInputFetchers {
145+
wg.Add(1)
146+
go func(modelName string, fetcher modelinputfetcher.ModelInputFetcher) {
147+
defer wg.Done()
148+
general.Infof("FetchModelInput for model: %s start", modelName)
149+
err := fetcher.FetchModelInput(ctx, infp.metaReader, infp.metaWriter, infp.metaServer)
150+
if err != nil {
151+
general.Errorf("FetchModelInput for model: %s failed with error: %v", modelName, err)
152+
}
153+
}(modelName, fetcher)
154+
}
155+
156+
wg.Wait()
157+
}
158+
121159
func (infp *InferencePlugin) fetchModelResult(ctx context.Context) {
122160
var wg sync.WaitGroup
123161

@@ -135,3 +173,8 @@ func (infp *InferencePlugin) fetchModelResult(ctx context.Context) {
135173

136174
wg.Wait()
137175
}
176+
177+
func (infp *InferencePlugin) inference(ctx context.Context) {
178+
infp.fetchModelInput(ctx)
179+
infp.fetchModelResult(ctx)
180+
}

pkg/agent/sysadvisor/plugin/inference/inference_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"time"
2323

2424
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
25+
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher"
26+
genericinput "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher/generic"
2527
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
2628
borweinfetcher "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
2729
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
@@ -31,6 +33,13 @@ import (
3133
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
3234
)
3335

36+
func NewNilModelInputFetcher(fetcherName string, conf *config.Configuration, extraConf interface{},
37+
emitterPool metricspool.MetricsEmitterPool, metaServer *metaserver.MetaServer,
38+
metaCache metacache.MetaCache,
39+
) (modelinputfetcher.ModelInputFetcher, error) {
40+
return nil, nil
41+
}
42+
3443
func NewNilModelResultFetcher(fetcherName string, conf *config.Configuration, extraConf interface{},
3544
emitterPool metricspool.MetricsEmitterPool, metaServer *metaserver.MetaServer,
3645
metaCache metacache.MetaCache,
@@ -68,6 +77,10 @@ func TestNewInferencePlugin(t *testing.T) {
6877
},
6978
}
7079

80+
modelinputfetcher.RegisterModelInputFetcherInitFunc(genericinput.GenericModelInputFetcherName,
81+
modelinputfetcher.NewDummyModelInputFetcher)
82+
modelinputfetcher.RegisterModelInputFetcherInitFunc("test-nil-fetcher",
83+
NewNilModelInputFetcher)
7184
modelresultfetcher.RegisterModelResultFetcherInitFunc(borweinfetcher.BorweinModelResultFetcherName,
7285
modelresultfetcher.NewDummyModelResultFetcher)
7386
modelresultfetcher.RegisterModelResultFetcherInitFunc("test-nil-fetcher",
@@ -91,6 +104,7 @@ func TestInferencePlugin_Run(t *testing.T) {
91104
type fields struct {
92105
name string
93106
period time.Duration
107+
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
94108
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
95109
metaServer *metaserver.MetaServer
96110
emitter metrics.MetricEmitter
@@ -110,6 +124,9 @@ func TestInferencePlugin_Run(t *testing.T) {
110124
fields: fields{
111125
name: types.AdvisorPluginNameInference,
112126
period: 5 * time.Second,
127+
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
128+
"test": modelinputfetcher.DummyModelInputFetcher{},
129+
},
113130
modelsResultFetchers: map[string]modelresultfetcher.ModelResultFetcher{
114131
"test": modelresultfetcher.DummyModelResultFetcher{},
115132
},
@@ -128,6 +145,7 @@ func TestInferencePlugin_Run(t *testing.T) {
128145
infp := &InferencePlugin{
129146
name: tt.fields.name,
130147
period: tt.fields.period,
148+
modelsInputFetchers: tt.fields.modelsInputFetchers,
131149
modelsResultFetchers: tt.fields.modelsResultFetchers,
132150
metaServer: tt.fields.metaServer,
133151
metricEmitter: tt.fields.emitter,
@@ -145,6 +163,7 @@ func TestInferencePlugin_Name(t *testing.T) {
145163
type fields struct {
146164
name string
147165
period time.Duration
166+
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
148167
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
149168
metaServer *metaserver.MetaServer
150169
emitter metrics.MetricEmitter
@@ -161,6 +180,9 @@ func TestInferencePlugin_Name(t *testing.T) {
161180
fields: fields{
162181
name: types.AdvisorPluginNameInference,
163182
period: 5 * time.Second,
183+
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
184+
"test": modelinputfetcher.DummyModelInputFetcher{},
185+
},
164186
modelsResultFetchers: map[string]modelresultfetcher.ModelResultFetcher{
165187
"test": modelresultfetcher.DummyModelResultFetcher{},
166188
},
@@ -178,6 +200,7 @@ func TestInferencePlugin_Name(t *testing.T) {
178200
infp := &InferencePlugin{
179201
name: tt.fields.name,
180202
period: tt.fields.period,
203+
modelsInputFetchers: tt.fields.modelsInputFetchers,
181204
modelsResultFetchers: tt.fields.modelsResultFetchers,
182205
metaServer: tt.fields.metaServer,
183206
metricEmitter: tt.fields.emitter,
@@ -196,6 +219,7 @@ func TestInferencePlugin_Init(t *testing.T) {
196219
type fields struct {
197220
name string
198221
period time.Duration
222+
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
199223
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
200224
metaServer *metaserver.MetaServer
201225
emitter metrics.MetricEmitter
@@ -212,6 +236,9 @@ func TestInferencePlugin_Init(t *testing.T) {
212236
fields: fields{
213237
name: types.AdvisorPluginNameInference,
214238
period: 5 * time.Second,
239+
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
240+
"test": modelinputfetcher.DummyModelInputFetcher{},
241+
},
215242
modelsResultFetchers: map[string]modelresultfetcher.ModelResultFetcher{
216243
"test": modelresultfetcher.DummyModelResultFetcher{},
217244
},
@@ -229,6 +256,7 @@ func TestInferencePlugin_Init(t *testing.T) {
229256
infp := &InferencePlugin{
230257
name: tt.fields.name,
231258
period: tt.fields.period,
259+
modelsInputFetchers: tt.fields.modelsInputFetchers,
232260
modelsResultFetchers: tt.fields.modelsResultFetchers,
233261
metaServer: tt.fields.metaServer,
234262
metricEmitter: tt.fields.emitter,
@@ -291,3 +319,53 @@ func TestInferencePlugin_fetchModelResult(t *testing.T) {
291319
})
292320
}
293321
}
322+
323+
func TestInferencePlugin_fetchModelInput(t *testing.T) {
324+
t.Parallel()
325+
type fields struct {
326+
name string
327+
period time.Duration
328+
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
329+
metaServer *metaserver.MetaServer
330+
emitter metrics.MetricEmitter
331+
metaReader metacache.MetaReader
332+
metaWriter metacache.MetaWriter
333+
}
334+
type args struct {
335+
ctx context.Context
336+
}
337+
tests := []struct {
338+
name string
339+
fields fields
340+
args args
341+
}{
342+
{
343+
name: "test inference plugin fetching model input",
344+
fields: fields{
345+
name: types.AdvisorPluginNameInference,
346+
period: 5 * time.Second,
347+
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
348+
"test": modelinputfetcher.DummyModelInputFetcher{},
349+
},
350+
metaServer: &metaserver.MetaServer{},
351+
emitter: metrics.DummyMetrics{},
352+
},
353+
},
354+
}
355+
for _, tt := range tests {
356+
tt := tt
357+
t.Run(tt.name, func(t *testing.T) {
358+
t.Parallel()
359+
infp := &InferencePlugin{
360+
name: tt.fields.name,
361+
period: tt.fields.period,
362+
modelsInputFetchers: tt.fields.modelsInputFetchers,
363+
metaServer: tt.fields.metaServer,
364+
metricEmitter: tt.fields.emitter,
365+
metaReader: tt.fields.metaReader,
366+
metaWriter: tt.fields.metaWriter,
367+
}
368+
infp.fetchModelInput(tt.args.ctx)
369+
})
370+
}
371+
}

0 commit comments

Comments
 (0)