Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewNumaCPUPressureEvictionOptions() NumaCPUPressureEvictionOptions {
EnableEviction: false,
ThresholdMetPercentage: 0.7,
MetricRingSize: 4,
GracePeriod: 60,
GracePeriod: 0,
ThresholdExpandFactor: 1.1,
CandidateCount: 2,
WorkloadMetricsLabelKeys: []string{},
Expand Down
32 changes: 32 additions & 0 deletions pkg/agent/sysadvisor/metacache/metacache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type MetaReader interface {
GetFilteredInferenceResult(filterFunc func(result interface{}) (interface{}, error), modelName string) (interface{}, error)
// GetInferenceResult gets specified model inference result
GetInferenceResult(modelName string) (interface{}, error)
// GetModelInput gets model input, the dimension of model input is : "container", "numa", "node"
GetModelInput(metricDimension string) (metric map[string]interface{}, err error)

// GetSupportedWantedFeatureGates gets supported and wanted FeatureGates
GetSupportedWantedFeatureGates() (map[string]*advisorsvc.FeatureGate, error)
Expand Down Expand Up @@ -128,6 +130,9 @@ type MetaWriter interface {
// SetInferenceResult sets specified model inference result
SetInferenceResult(modelName string, result interface{}) error

// SetModelInput sets model input, the dimension of model input is : "container", "numa", "node"
SetModelInput(metricDimension string, metric map[string]interface{}) error

// SetSupportedWantedFeatureGates sets supported and wanted FeatureGates
SetSupportedWantedFeatureGates(featureGates map[string]*advisorsvc.FeatureGate) error
sync.Locker
Expand Down Expand Up @@ -169,6 +174,9 @@ type MetaCacheImp struct {
modelToResult map[string]interface{}
modelMutex sync.RWMutex

modelInput map[string]map[string]interface{}
modelInputMutex sync.RWMutex

featureGates map[string]*advisorsvc.FeatureGate
featureGatesMutex sync.RWMutex

Expand Down Expand Up @@ -206,6 +214,7 @@ func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.Metrics
checkpointName: stateFileName,
emitter: emitter,
modelToResult: make(map[string]interface{}),
modelInput: make(map[string]map[string]interface{}),
featureGates: make(map[string]*advisorsvc.FeatureGate),
containerCreateTimestamp: make(map[string]int64),
}
Expand Down Expand Up @@ -319,6 +328,17 @@ func (mc *MetaCacheImp) GetInferenceResult(modelName string) (interface{}, error
return mc.GetFilteredInferenceResult(nil, modelName)
}

// GetModelInput gets model input, the dimension of model input is : "container", "numa", "node".
// It returns an error when no input has been set for the given dimension.
func (mc *MetaCacheImp) GetModelInput(metricDimension string) (map[string]interface{}, error) {
	mc.modelInputMutex.RLock()
	defer mc.modelInputMutex.RUnlock()

	// Single comma-ok lookup instead of indexing the map twice; the extra
	// nil check keeps behavior identical even if a nil map were ever stored.
	input, ok := mc.modelInput[metricDimension]
	if !ok || input == nil {
		return nil, fmt.Errorf("model input for dimension: %s doesn't exist", metricDimension)
	}
	return input, nil
}

// GetSupportedWantedFeatureGates gets supported and wanted FeatureGates
func (mc *MetaCacheImp) GetSupportedWantedFeatureGates() (map[string]*advisorsvc.FeatureGate, error) {
mc.featureGatesMutex.RLock()
Expand Down Expand Up @@ -578,6 +598,18 @@ func (mc *MetaCacheImp) SetInferenceResult(modelName string, result interface{})
return nil
}

// SetModelInput sets model input, the dimension of model input is : "container", "numa", "node".
// A nil metric map is rejected so that GetModelInput can treat a nil entry as "not set".
func (mc *MetaCacheImp) SetModelInput(metricDimension string, metric map[string]interface{}) error {
	// Validate before taking the lock: there is no reason to hold the write
	// lock just to reject bad input.
	if metric == nil {
		return fmt.Errorf("nil model input")
	}

	mc.modelInputMutex.Lock()
	defer mc.modelInputMutex.Unlock()

	mc.modelInput[metricDimension] = metric
	return nil
}

// SetSupportedWantedFeatureGates sets supported and wanted FeatureGates
func (mc *MetaCacheImp) SetSupportedWantedFeatureGates(featureGates map[string]*advisorsvc.FeatureGate) error {
mc.featureGatesMutex.Lock()
Expand Down
23 changes: 23 additions & 0 deletions pkg/agent/sysadvisor/plugin/inference/consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consts

const (
	// MetricDimensionNode identifies model input metrics aggregated at node level.
	MetricDimensionNode = "node"
	// MetricDimensionNuma identifies model input metrics aggregated per NUMA node.
	MetricDimensionNuma = "numa"
	// MetricDimensionContainer identifies model input metrics aggregated per container.
	MetricDimensionContainer = "container"
)
53 changes: 48 additions & 5 deletions pkg/agent/sysadvisor/plugin/inference/inference.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher"
genericinput "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher/generic"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
borweinfetcher "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
borweinresult "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
"github.com/kubewharf/katalyst-core/pkg/config"
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
Expand All @@ -38,15 +40,19 @@ import (
)

// init registers the built-in fetcher factories by name so that
// NewInferencePlugin can discover and instantiate them from the registries.
func init() {
	// Generic model input fetcher.
	modelinputfetcher.RegisterModelInputFetcherInitFunc(genericinput.GenericModelInputFetcherName,
		genericinput.NewGenericModelInputFetcher)
	// Borwein model result fetcher.
	modelresultfetcher.RegisterModelResultFetcherInitFunc(borweinresult.BorweinModelResultFetcherName,
		borweinresult.NewBorweinModelResultFetcher)
}

type InferencePlugin struct {
name string
// conf config.Configuration

period time.Duration
period time.Duration

modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher

metaServer *metaserver.MetaServer
Expand Down Expand Up @@ -78,6 +84,7 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
inferencePlugin := InferencePlugin{
name: pluginName,
period: conf.InferencePluginConfiguration.SyncPeriod,
modelsInputFetchers: make(map[string]modelinputfetcher.ModelInputFetcher),
modelsResultFetchers: make(map[string]modelresultfetcher.ModelResultFetcher),
metaServer: metaServer,
metricEmitter: metricEmitter,
Expand All @@ -87,6 +94,20 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
metaWriter: metaCache,
}

for fetcherName, initFn := range modelinputfetcher.GetRegisteredModelInputFetcherInitFuncs() {
// todo: support only enabling part of fetchers
general.Infof("try init fetcher: %s", fetcherName)
fetcher, err := initFn(fetcherName, conf, extraConf, emitterPool, metaServer, metaCache)
if err != nil {
return nil, fmt.Errorf("failed to start sysadvisor plugin %v: %v", pluginName, err)
} else if fetcher == nil {
general.Infof("fetcher: %s isn't enabled", fetcherName)
continue
}

inferencePlugin.modelsInputFetchers[fetcherName] = fetcher
}

for fetcherName, initFn := range modelresultfetcher.GetRegisteredModelResultFetcherInitFuncs() {
// todo: support only enabling part of fetchers
general.Infof("try init fetcher: %s", fetcherName)
Expand All @@ -105,7 +126,7 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf
}

// Run starts the inference loop: every period it performs one inference
// cycle (fetch model inputs, then fetch model results) until ctx is cancelled.
func (infp *InferencePlugin) Run(ctx context.Context) {
	wait.UntilWithContext(ctx, infp.inference, infp.period)
}

// Name returns the name of inference plugin
Expand All @@ -118,6 +139,23 @@ func (infp *InferencePlugin) Init() error {
return nil
}

// fetchModelInput runs every registered model input fetcher concurrently and
// blocks until all of them finish. Per-fetcher failures are logged rather
// than propagated, so one broken fetcher cannot stop the others.
func (infp *InferencePlugin) fetchModelInput(ctx context.Context) {
	var pending sync.WaitGroup

	for name, inputFetcher := range infp.modelsInputFetchers {
		pending.Add(1)
		// Pass the loop variables explicitly so each goroutine gets its own pair.
		go func(name string, inputFetcher modelinputfetcher.ModelInputFetcher) {
			defer pending.Done()

			general.Infof("FetchModelInput for model: %s start", name)
			if fetchErr := inputFetcher.FetchModelInput(ctx, infp.metaReader, infp.metaWriter, infp.metaServer); fetchErr != nil {
				general.Errorf("FetchModelInput for model: %s failed with error: %v", name, fetchErr)
			}
		}(name, inputFetcher)
	}

	pending.Wait()
}

func (infp *InferencePlugin) fetchModelResult(ctx context.Context) {
var wg sync.WaitGroup

Expand All @@ -135,3 +173,8 @@ func (infp *InferencePlugin) fetchModelResult(ctx context.Context) {

wg.Wait()
}

// inference performs one full inference cycle: refresh all model inputs
// first, then fetch the corresponding model results.
func (infp *InferencePlugin) inference(ctx context.Context) {
	steps := []func(context.Context){
		infp.fetchModelInput,
		infp.fetchModelResult,
	}
	for _, step := range steps {
		step(ctx)
	}
}
78 changes: 78 additions & 0 deletions pkg/agent/sysadvisor/plugin/inference/inference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"time"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher"
genericinput "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelinputfetcher/generic"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
borweinfetcher "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
Expand All @@ -31,6 +33,13 @@ import (
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
)

// NewNilModelInputFetcher is an init func that returns (nil, nil), simulating
// a registered-but-disabled fetcher; NewInferencePlugin is expected to skip
// such fetchers instead of failing.
func NewNilModelInputFetcher(fetcherName string, conf *config.Configuration, extraConf interface{},
	emitterPool metricspool.MetricsEmitterPool, metaServer *metaserver.MetaServer,
	metaCache metacache.MetaCache,
) (modelinputfetcher.ModelInputFetcher, error) {
	return nil, nil
}

func NewNilModelResultFetcher(fetcherName string, conf *config.Configuration, extraConf interface{},
emitterPool metricspool.MetricsEmitterPool, metaServer *metaserver.MetaServer,
metaCache metacache.MetaCache,
Expand Down Expand Up @@ -68,6 +77,10 @@ func TestNewInferencePlugin(t *testing.T) {
},
}

modelinputfetcher.RegisterModelInputFetcherInitFunc(genericinput.GenericModelInputFetcherName,
modelinputfetcher.NewDummyModelInputFetcher)
modelinputfetcher.RegisterModelInputFetcherInitFunc("test-nil-fetcher",
NewNilModelInputFetcher)
modelresultfetcher.RegisterModelResultFetcherInitFunc(borweinfetcher.BorweinModelResultFetcherName,
modelresultfetcher.NewDummyModelResultFetcher)
modelresultfetcher.RegisterModelResultFetcherInitFunc("test-nil-fetcher",
Expand All @@ -91,6 +104,7 @@ func TestInferencePlugin_Run(t *testing.T) {
type fields struct {
name string
period time.Duration
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
Expand All @@ -110,6 +124,9 @@ func TestInferencePlugin_Run(t *testing.T) {
fields: fields{
name: types.AdvisorPluginNameInference,
period: 5 * time.Second,
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
"test": modelinputfetcher.DummyModelInputFetcher{},
},
modelsResultFetchers: map[string]modelresultfetcher.ModelResultFetcher{
"test": modelresultfetcher.DummyModelResultFetcher{},
},
Expand All @@ -128,6 +145,7 @@ func TestInferencePlugin_Run(t *testing.T) {
infp := &InferencePlugin{
name: tt.fields.name,
period: tt.fields.period,
modelsInputFetchers: tt.fields.modelsInputFetchers,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
metricEmitter: tt.fields.emitter,
Expand All @@ -145,6 +163,7 @@ func TestInferencePlugin_Name(t *testing.T) {
type fields struct {
name string
period time.Duration
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
Expand All @@ -161,6 +180,9 @@ func TestInferencePlugin_Name(t *testing.T) {
fields: fields{
name: types.AdvisorPluginNameInference,
period: 5 * time.Second,
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
"test": modelinputfetcher.DummyModelInputFetcher{},
},
modelsResultFetchers: map[string]modelresultfetcher.ModelResultFetcher{
"test": modelresultfetcher.DummyModelResultFetcher{},
},
Expand All @@ -178,6 +200,7 @@ func TestInferencePlugin_Name(t *testing.T) {
infp := &InferencePlugin{
name: tt.fields.name,
period: tt.fields.period,
modelsInputFetchers: tt.fields.modelsInputFetchers,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
metricEmitter: tt.fields.emitter,
Expand All @@ -196,6 +219,7 @@ func TestInferencePlugin_Init(t *testing.T) {
type fields struct {
name string
period time.Duration
modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
modelsResultFetchers map[string]modelresultfetcher.ModelResultFetcher
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
Expand All @@ -212,6 +236,9 @@ func TestInferencePlugin_Init(t *testing.T) {
fields: fields{
name: types.AdvisorPluginNameInference,
period: 5 * time.Second,
modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
"test": modelinputfetcher.DummyModelInputFetcher{},
},
modelsResultFetchers: map[string]modelresultfetcher.ModelResultFetcher{
"test": modelresultfetcher.DummyModelResultFetcher{},
},
Expand All @@ -229,6 +256,7 @@ func TestInferencePlugin_Init(t *testing.T) {
infp := &InferencePlugin{
name: tt.fields.name,
period: tt.fields.period,
modelsInputFetchers: tt.fields.modelsInputFetchers,
modelsResultFetchers: tt.fields.modelsResultFetchers,
metaServer: tt.fields.metaServer,
metricEmitter: tt.fields.emitter,
Expand Down Expand Up @@ -291,3 +319,53 @@ func TestInferencePlugin_fetchModelResult(t *testing.T) {
})
}
}

// TestInferencePlugin_fetchModelInput verifies that fetchModelInput runs all
// configured input fetchers to completion without panicking.
func TestInferencePlugin_fetchModelInput(t *testing.T) {
	t.Parallel()
	type fields struct {
		name                string
		period              time.Duration
		modelsInputFetchers map[string]modelinputfetcher.ModelInputFetcher
		metaServer          *metaserver.MetaServer
		emitter             metrics.MetricEmitter
		metaReader          metacache.MetaReader
		metaWriter          metacache.MetaWriter
	}
	type args struct {
		ctx context.Context
	}
	tests := []struct {
		name   string
		fields fields
		args   args
	}{
		{
			name: "test inference plugin fetching model input",
			fields: fields{
				name:   types.AdvisorPluginNameInference,
				period: 5 * time.Second,
				modelsInputFetchers: map[string]modelinputfetcher.ModelInputFetcher{
					"test": modelinputfetcher.DummyModelInputFetcher{},
				},
				metaServer: &metaserver.MetaServer{},
				emitter:    metrics.DummyMetrics{},
			},
			args: args{
				// Supply a real context rather than the nil zero value:
				// fetchModelInput forwards ctx to every fetcher, and nil
				// contexts should never be passed.
				ctx: context.Background(),
			},
		},
	}
	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			infp := &InferencePlugin{
				name:                tt.fields.name,
				period:              tt.fields.period,
				modelsInputFetchers: tt.fields.modelsInputFetchers,
				metaServer:          tt.fields.metaServer,
				metricEmitter:       tt.fields.emitter,
				metaReader:          tt.fields.metaReader,
				metaWriter:          tt.fields.metaWriter,
			}
			infp.fetchModelInput(tt.args.ctx)
		})
	}
}
Loading