Skip to content

Commit 45d59bb

Browse files
add resource package manager to meta server
Signed-off-by: xudong.l <xudong.l@bytedance.com>
1 parent 221a24d commit 45d59bb

File tree

5 files changed

+502
-0
lines changed

5 files changed

+502
-0
lines changed

pkg/metaserver/metaserver.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/kubewharf/katalyst-core/pkg/metaserver/external"
3232
"github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
3333
"github.com/kubewharf/katalyst-core/pkg/metaserver/npd"
34+
"github.com/kubewharf/katalyst-core/pkg/metaserver/resourcepackage"
3435
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
3536
"github.com/kubewharf/katalyst-core/pkg/metrics"
3637
)
@@ -44,6 +45,7 @@ type MetaServer struct {
4445
*agent.MetaAgent
4546
kcc.ConfigurationManager
4647
spd.ServiceProfilingManager
48+
resourcepackage.ResourcePackageManager
4749
external.ExternalManager
4850
npd.NPDFetcher
4951
}
@@ -89,6 +91,7 @@ func NewMetaServer(clientSet *client.GenericClientSet, emitter metrics.MetricEmi
8991
ConfigurationManager: configurationManager,
9092
ServiceProfilingManager: spd.NewServiceProfilingManager(spdFetcher),
9193
ExternalManager: external.InitExternalManager(metaAgent.PodFetcher),
94+
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npdFetcher),
9295
NPDFetcher: npdFetcher,
9396
}, nil
9497
}
@@ -120,3 +123,14 @@ func (m *MetaServer) SetServiceProfilingManager(manager spd.ServiceProfilingMana
120123
m.ServiceProfilingManager = manager
121124
return nil
122125
}
126+
127+
func (m *MetaServer) SetResourcePackageManager(manager resourcepackage.ResourcePackageManager) error {
128+
m.Lock()
129+
defer m.Unlock()
130+
if m.start {
131+
return fmt.Errorf("meta agent has already started, not allowed to set implementations")
132+
}
133+
134+
m.ResourcePackageManager = manager
135+
return nil
136+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcepackage
18+
19+
import (
20+
"context"
21+
"strconv"
22+
23+
"github.com/pkg/errors"
24+
25+
apierrors "k8s.io/apimachinery/pkg/util/errors"
26+
27+
nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
28+
"github.com/kubewharf/katalyst-core/pkg/metaserver/npd"
29+
resourcepackage "github.com/kubewharf/katalyst-core/pkg/util/resource-package"
30+
)
31+
32+
// ResourcePackageManager provides access to a node's resource package division.
33+
type ResourcePackageManager interface {
34+
// NodeResourcePackages returns the resource package division for the
35+
// specified node. The returned map's keys are NUMA IDs (as ints)
36+
// and the values are slices of ResourcePackage belonging to that
37+
// NUMA node: map[NUMA ID] -> []nodev1alpha1.ResourcePackage.
38+
NodeResourcePackages(ctx context.Context, nodeName string) (map[int][]nodev1alpha1.ResourcePackage, error)
39+
}
40+
41+
// resourcePackageManager is the default implementation of
// ResourcePackageManager. It derives resource packages from the node metrics
// of the NPD returned by its fetcher.
42+
type resourcePackageManager struct {
43+
// fetcher supplies the NPD whose Status.NodeMetrics are converted into
44+
// resource packages.
45+
fetcher npd.NPDFetcher
46+
}
47+
48+
func (m *resourcePackageManager) NodeResourcePackages(ctx context.Context, nodeName string) (map[int][]nodev1alpha1.ResourcePackage, error) {
49+
npd, err := m.fetcher.GetNPD(context.Background())
50+
if err != nil {
51+
return nil, errors.Wrap(err, "get npd failed")
52+
}
53+
54+
resourcePackageMetrics := resourcepackage.ConvertNPDMetricsToResourcePackages(npd.Status.NodeMetrics)
55+
resourcePackageMap := make(map[int][]nodev1alpha1.ResourcePackage)
56+
57+
var errList []error
58+
for _, metric := range resourcePackageMetrics {
59+
numaID, err := strconv.Atoi(metric.NumaID)
60+
if err != nil {
61+
errList = append(errList, errors.Wrap(err, "numa ID invalid"))
62+
continue
63+
}
64+
resourcePackageMap[numaID] = metric.ResourcePackages
65+
}
66+
return resourcePackageMap, apierrors.NewAggregate(errList)
67+
}
68+
69+
// NewResourcePackageManager creates a new ResourcePackageManager that uses the provided NPD fetcher.
70+
func NewResourcePackageManager(fetcher npd.NPDFetcher) ResourcePackageManager {
71+
return &resourcePackageManager{
72+
fetcher: fetcher,
73+
}
74+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcepackage
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcepackage
18+
19+
import (
20+
"sort"
21+
22+
v1 "k8s.io/api/core/v1"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
25+
nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
26+
)
27+
28+
const (
29+
// MetricScope is the ScopedNodeMetrics scope under which resource package
// metrics are written, and the only scope read back when converting NPD
// metrics to resource packages.
MetricScope = "resource-package"
30+
// metricLabelPackageName is the metric label key that carries the resource
// package name.
metricLabelPackageName = "package-name"
31+
// metricLabelNumaID is the metric label key that carries the NUMA ID.
metricLabelNumaID = "numa-id"
32+
)
33+
34+
// ResourcePackageMetric groups the resource packages that belong to one NUMA
// node.
type ResourcePackageMetric struct {
35+
// NumaID is the NUMA ID in string form, as stored in the "numa-id" metric
// label.
NumaID string `json:"numaID"`
36+
// ResourcePackages are the resource packages of this NUMA node.
ResourcePackages []nodev1alpha1.ResourcePackage `json:"resourcePackages"`
37+
}
38+
39+
// ConvertResourcePackagesToNPDMetrics example:
40+
/* CNR
41+
status:
42+
topologyZone:
43+
- children:
44+
- name: "0"
45+
type: Numa
46+
resources:
47+
resourcePackages:
48+
- packageName: x8
49+
allocatable:
50+
cpu: 64
51+
memory: 512Gi
52+
- name: "1"
53+
type: Numa
54+
resources:
55+
resourcePackages:
56+
- packageName: x8
57+
allocatable:
58+
cpu: 64
59+
memory: 512Gi
60+
*/
61+
/* NPD
62+
status:
63+
nodeMetrics:
64+
- scope: "resource-package"
65+
metrics:
66+
- metricName: "cpu"
67+
metricLabels:
68+
package-name: "x8"
69+
numa-id: "0"
70+
aggregator: "min"
71+
value: "64"
72+
- metricName: "memory"
73+
metricLabels:
74+
package-name: "x8"
75+
numa-id: "0"
76+
aggregator: "min"
77+
value: "512Gi"
78+
- metricName: "cpu"
79+
metricLabels:
80+
package-name: "x8"
81+
numa-id: "1"
82+
aggregator: "min"
83+
value: "64"
84+
- metricName: "memory"
85+
metricLabels:
86+
package-name: "x8"
87+
numa-id: "1"
88+
aggregator: "min"
89+
value: "512Gi"
90+
*/
91+
92+
// ConvertResourcePackagesToNPDMetrics converts resource packages to NPD metrics.
93+
// converted npd metrics are sorted by packageName, numaID, and metricName
94+
func ConvertResourcePackagesToNPDMetrics(resourcePackageMetrics []ResourcePackageMetric, timestamp metav1.Time) []nodev1alpha1.ScopedNodeMetrics {
95+
m := nodev1alpha1.ScopedNodeMetrics{
96+
Scope: MetricScope,
97+
}
98+
minAggregator := nodev1alpha1.AggregatorMin
99+
for _, pkgMetric := range resourcePackageMetrics {
100+
for _, pkg := range pkgMetric.ResourcePackages {
101+
if pkg.Allocatable == nil {
102+
continue
103+
}
104+
var metrics []nodev1alpha1.MetricValue
105+
for r, q := range *pkg.Allocatable {
106+
metrics = append(metrics, nodev1alpha1.MetricValue{
107+
MetricName: r.String(),
108+
Value: q.DeepCopy(),
109+
Aggregator: &minAggregator,
110+
Timestamp: timestamp,
111+
MetricLabels: map[string]string{
112+
metricLabelPackageName: pkg.PackageName,
113+
metricLabelNumaID: pkgMetric.NumaID,
114+
},
115+
})
116+
}
117+
m.Metrics = append(m.Metrics, metrics...)
118+
}
119+
}
120+
121+
if len(m.Metrics) == 0 {
122+
return []nodev1alpha1.ScopedNodeMetrics{m}
123+
}
124+
// sort: numaID > packageName > metricName
125+
sort.Slice(m.Metrics, func(i, j int) bool {
126+
numai := m.Metrics[i].MetricLabels[metricLabelNumaID]
127+
numaj := m.Metrics[j].MetricLabels[metricLabelNumaID]
128+
if numai != numaj {
129+
return numai < numaj
130+
}
131+
pkgi := m.Metrics[i].MetricLabels[metricLabelPackageName]
132+
pkgj := m.Metrics[j].MetricLabels[metricLabelPackageName]
133+
if pkgi != pkgj {
134+
return pkgi < pkgj
135+
}
136+
return m.Metrics[i].MetricName < m.Metrics[j].MetricName
137+
})
138+
139+
return []nodev1alpha1.ScopedNodeMetrics{m}
140+
}
141+
142+
func updatePkgMapFromMetrics(metrics []nodev1alpha1.MetricValue, pkgMap map[string]map[string]nodev1alpha1.ResourcePackage) {
143+
for _, v := range metrics {
144+
numaID := v.MetricLabels[metricLabelNumaID]
145+
packageName := v.MetricLabels[metricLabelPackageName]
146+
if numaID == "" || packageName == "" {
147+
continue
148+
}
149+
if v.Aggregator == nil || *v.Aggregator != nodev1alpha1.AggregatorMin {
150+
continue
151+
}
152+
if _, ok := pkgMap[numaID]; !ok {
153+
pkgMap[numaID] = make(map[string]nodev1alpha1.ResourcePackage)
154+
}
155+
resourcePkgs := pkgMap[numaID]
156+
metric, ok := resourcePkgs[packageName]
157+
if !ok {
158+
metric = nodev1alpha1.ResourcePackage{
159+
PackageName: packageName,
160+
Allocatable: &v1.ResourceList{},
161+
}
162+
}
163+
if metric.Allocatable == nil {
164+
metric.Allocatable = &v1.ResourceList{}
165+
}
166+
(*metric.Allocatable)[v1.ResourceName(v.MetricName)] = v.Value.DeepCopy()
167+
resourcePkgs[packageName] = metric
168+
}
169+
}
170+
171+
// ConvertNPDMetricsToResourcePackages converts NPD metrics to resource packages.
172+
// converted resource packages are sorted by packageName and numaID
173+
func ConvertNPDMetricsToResourcePackages(metrics []nodev1alpha1.ScopedNodeMetrics) []ResourcePackageMetric {
174+
// numa id -> package name -> resource package metric
175+
pkgMap := make(map[string]map[string]nodev1alpha1.ResourcePackage)
176+
177+
for _, m := range metrics {
178+
if m.Scope != MetricScope {
179+
continue
180+
}
181+
updatePkgMapFromMetrics(m.Metrics, pkgMap)
182+
}
183+
// sort: numaID > packageName
184+
var packageMetrics []ResourcePackageMetric
185+
numaIDs := make([]string, 0, len(pkgMap))
186+
for numaID := range pkgMap {
187+
numaIDs = append(numaIDs, numaID)
188+
}
189+
sort.Strings(numaIDs)
190+
for _, numaID := range numaIDs {
191+
pkgMetric := ResourcePackageMetric{
192+
NumaID: numaID,
193+
ResourcePackages: []nodev1alpha1.ResourcePackage{},
194+
}
195+
resourcePkgs := pkgMap[numaID]
196+
// sort package names within this numa
197+
pkgNames := make([]string, 0, len(resourcePkgs))
198+
for pkgName := range resourcePkgs {
199+
pkgNames = append(pkgNames, pkgName)
200+
}
201+
sort.Strings(pkgNames)
202+
for _, pkgName := range pkgNames {
203+
pkgMetric.ResourcePackages = append(pkgMetric.ResourcePackages, resourcePkgs[pkgName])
204+
}
205+
packageMetrics = append(packageMetrics, pkgMetric)
206+
}
207+
return packageMetrics
208+
}

0 commit comments

Comments
 (0)