Skip to content

Commit 34635a2

Browse files
reconcile npd for resource package manager
Signed-off-by: xudong.l <xudong.l@bytedance.com>
1 parent abf54b6 commit 34635a2

File tree

6 files changed

+500
-7
lines changed

6 files changed

+500
-7
lines changed

pkg/metaserver/metaserver.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,18 @@ func NewMetaServer(clientSet *client.GenericClientSet, emitter metrics.MetricEmi
8686
npdFetcher = npd.NewDummyNPDFetcher()
8787
}
8888

89+
var resourcePackageManager resourcepackage.ResourcePackageManager
90+
resourcePackageManager, err = resourcepackage.NewResourcePackageManager(npdFetcher, conf, emitter)
91+
if err != nil {
92+
return nil, fmt.Errorf("initializes resource package manager failed: %s", err)
93+
}
94+
8995
return &MetaServer{
9096
MetaAgent: metaAgent,
9197
ConfigurationManager: configurationManager,
9298
ServiceProfilingManager: spd.NewServiceProfilingManager(spdFetcher),
9399
ExternalManager: external.InitExternalManager(metaAgent.PodFetcher),
94-
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npdFetcher),
100+
ResourcePackageManager: resourcePackageManager,
95101
NPDFetcher: npdFetcher,
96102
}, nil
97103
}
@@ -108,6 +114,7 @@ func (m *MetaServer) Run(ctx context.Context) {
108114
go m.ConfigurationManager.Run(ctx)
109115
go m.ServiceProfilingManager.Run(ctx)
110116
go m.ExternalManager.Run(ctx)
117+
go m.ResourcePackageManager.Run(ctx)
111118

112119
m.Unlock()
113120
<-ctx.Done()

pkg/metaserver/metaserver_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
3838
"github.com/kubewharf/katalyst-core/pkg/metaserver/external"
3939
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
40+
"github.com/kubewharf/katalyst-core/pkg/metaserver/resourcepackage"
4041
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
4142
"github.com/kubewharf/katalyst-core/pkg/metrics"
4243
"github.com/kubewharf/katalyst-core/pkg/util/machine"
@@ -63,6 +64,7 @@ func generateTestMetaServer(clientSet *client.GenericClientSet, conf *config.Con
6364
ConfigurationManager: &dynamicconfig.DummyConfigurationManager{},
6465
ServiceProfilingManager: &spd.DummyServiceProfilingManager{},
6566
ExternalManager: &external.DummyExternalManager{},
67+
ResourcePackageManager: &resourcepackage.DummyResourcePackageManager{},
6668
}
6769
}
6870

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcepackage
18+
19+
import (
20+
"encoding/json"
21+
"sync"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
25+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
26+
27+
nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
28+
)
29+
30+
type NodeProfileCheckpoint interface {
31+
checkpointmanager.Checkpoint
32+
GetProfile() (*nodev1alpha1.NodeProfileDescriptor, metav1.Time)
33+
SetProfile(npd *nodev1alpha1.NodeProfileDescriptor, t metav1.Time)
34+
}
35+
36+
type NodeProfileData struct {
37+
// Value only store spec of dynamic config crd
38+
Value *nodev1alpha1.NodeProfileDescriptor
39+
Timestamp int64
40+
}
41+
42+
// Data holds checkpoint data and its checksum
43+
type Data struct {
44+
sync.Mutex
45+
Item *DataItem
46+
}
47+
48+
// DataItem stores the checkpoint data and checksum.
49+
type DataItem struct {
50+
Data NodeProfileData `json:"data"`
51+
Checksum checksum.Checksum `json:"checksum"`
52+
}
53+
54+
// NewCheckpoint returns an instance of Checkpoint
55+
func NewCheckpoint(npdData NodeProfileData) NodeProfileCheckpoint {
56+
return &Data{
57+
Item: &DataItem{
58+
Data: npdData,
59+
},
60+
}
61+
}
62+
63+
func (d *Data) MarshalCheckpoint() ([]byte, error) {
64+
d.Lock()
65+
defer d.Unlock()
66+
67+
d.Item.Checksum = checksum.New(d.Item.Data)
68+
return json.Marshal(*(d.Item))
69+
}
70+
71+
func (d *Data) UnmarshalCheckpoint(blob []byte) error {
72+
d.Lock()
73+
defer d.Unlock()
74+
75+
return json.Unmarshal(blob, d.Item)
76+
}
77+
78+
func (d *Data) VerifyChecksum() error {
79+
d.Lock()
80+
defer d.Unlock()
81+
82+
return d.Item.Checksum.Verify(d.Item.Data)
83+
}
84+
85+
// GetProfile retrieves a NodeProfileDescriptor.
86+
func (d *Data) GetProfile() (*nodev1alpha1.NodeProfileDescriptor, metav1.Time) {
87+
d.Lock()
88+
defer d.Unlock()
89+
return d.Item.Data.Value, metav1.Unix(d.Item.Data.Timestamp, 0)
90+
}
91+
92+
// SetProfile sets a NodeProfileDescriptor for a node.
93+
func (d *Data) SetProfile(npd *nodev1alpha1.NodeProfileDescriptor, t metav1.Time) {
94+
d.Lock()
95+
defer d.Unlock()
96+
97+
d.Item.Data = NodeProfileData{
98+
Value: npd,
99+
Timestamp: t.Unix(),
100+
}
101+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcepackage
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
25+
nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
26+
)
27+
28+
func TestNewCheckpoint(t *testing.T) {
29+
t.Parallel()
30+
31+
now := metav1.Now()
32+
33+
// Prepare a test NodeProfileDescriptor
34+
npd := &nodev1alpha1.NodeProfileDescriptor{
35+
ObjectMeta: metav1.ObjectMeta{
36+
Name: "node-a",
37+
Namespace: "default",
38+
},
39+
Spec: nodev1alpha1.NodeProfileDescriptorSpec{},
40+
Status: nodev1alpha1.NodeProfileDescriptorStatus{
41+
NodeMetrics: []nodev1alpha1.ScopedNodeMetrics{
42+
{
43+
Scope: "resource-package",
44+
},
45+
},
46+
},
47+
}
48+
49+
// Create a new checkpoint and set data
50+
cp := NewCheckpoint(NodeProfileData{})
51+
cp.SetProfile(npd, now)
52+
53+
// Marshal the checkpoint
54+
checkpointBytes, err := cp.MarshalCheckpoint()
55+
assert.NoError(t, err, "should marshal checkpoint successfully")
56+
57+
// Unmarshal into a new checkpoint object
58+
loaded := &Data{Item: &DataItem{}}
59+
err = loaded.UnmarshalCheckpoint(checkpointBytes)
60+
assert.NoError(t, err, "should unmarshal checkpoint successfully")
61+
62+
// Verify checksum integrity
63+
err = loaded.VerifyChecksum()
64+
assert.NoError(t, err, "checksum verification should pass")
65+
66+
// Retrieve stored profile and timestamp
67+
restoredProfile, ts := loaded.GetProfile()
68+
69+
assert.Equal(t, metav1.Unix(now.Unix(), 0), ts, "timestamp should match")
70+
assert.Equal(t, npd, restoredProfile, "restored profile should match original")
71+
}

0 commit comments

Comments
 (0)