Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
return nil, errors.Wrap(err, "get device zone topology failed")
}

return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes, zoneSiblings), nil
return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes, zoneSiblings, nil), nil
}

// GetTopologyPolicy return newest topology policy status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (r *headroomReporterPlugin) getReportNUMAReclaimedResource(reclaimedResourc
}
}

topologyZone := topologyZoneGenerator.GenerateTopologyZoneStatus(nil, zoneResources, nil, nil)
topologyZone := topologyZoneGenerator.GenerateTopologyZoneStatus(nil, zoneResources, nil, nil, nil)
value, err := json.Marshal(&topologyZone)
if err != nil {
return nil, fmt.Errorf("marshal topology zone failed: %s", err)
Expand Down
19 changes: 18 additions & 1 deletion pkg/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver/external"
"github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
"github.com/kubewharf/katalyst-core/pkg/metaserver/npd"
"github.com/kubewharf/katalyst-core/pkg/metaserver/resourcepackage"
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)
Expand All @@ -44,6 +45,7 @@ type MetaServer struct {
*agent.MetaAgent
kcc.ConfigurationManager
spd.ServiceProfilingManager
resourcepackage.ResourcePackageManager
external.ExternalManager
npd.NPDFetcher
}
Expand Down Expand Up @@ -79,7 +81,10 @@ func NewMetaServer(clientSet *client.GenericClientSet, emitter metrics.MetricEmi

var npdFetcher npd.NPDFetcher
if conf.EnableNPDFetcher {
npdFetcher = npd.NewNPDFetcher(clientSet, metaAgent.CNCFetcher, conf.KCCConfiguration)
npdFetcher, err = npd.NewNPDFetcher(clientSet, metaAgent.CNCFetcher, conf, emitter)
if err != nil {
return nil, fmt.Errorf("initializes npd fetcher failed: %s", err)
}
} else {
npdFetcher = npd.NewDummyNPDFetcher()
}
Expand All @@ -89,6 +94,7 @@ func NewMetaServer(clientSet *client.GenericClientSet, emitter metrics.MetricEmi
ConfigurationManager: configurationManager,
ServiceProfilingManager: spd.NewServiceProfilingManager(spdFetcher),
ExternalManager: external.InitExternalManager(metaAgent.PodFetcher),
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npdFetcher),
NPDFetcher: npdFetcher,
}, nil
}
Expand Down Expand Up @@ -120,3 +126,14 @@ func (m *MetaServer) SetServiceProfilingManager(manager spd.ServiceProfilingMana
m.ServiceProfilingManager = manager
return nil
}

func (m *MetaServer) SetResourcePackageManager(manager resourcepackage.ResourcePackageManager) error {
m.Lock()
defer m.Unlock()
if m.start {
return fmt.Errorf("meta agent has already started, not allowed to set implementations")
}

m.ResourcePackageManager = manager
return nil
}
4 changes: 4 additions & 0 deletions pkg/metaserver/metaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metaserver/external"
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
"github.com/kubewharf/katalyst-core/pkg/metaserver/npd"
"github.com/kubewharf/katalyst-core/pkg/metaserver/resourcepackage"
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
Expand All @@ -60,9 +62,11 @@ func generateTestMetaServer(clientSet *client.GenericClientSet, conf *config.Con
metrics.DummyMetrics{}, &pod.PodFetcherStub{}, &machine.KatalystMachineInfo{}),
AgentConf: conf.MetaServerConfiguration.AgentConfiguration,
},
NPDFetcher: npd.NewDummyNPDFetcher(),
ConfigurationManager: &dynamicconfig.DummyConfigurationManager{},
ServiceProfilingManager: &spd.DummyServiceProfilingManager{},
ExternalManager: &external.DummyExternalManager{},
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npd.NewDummyNPDFetcher()),
}
}

Expand Down
101 changes: 101 additions & 0 deletions pkg/metaserver/npd/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package npd

import (
"encoding/json"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"

nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
)

type NodeProfileCheckpoint interface {
checkpointmanager.Checkpoint
GetProfile() (*nodev1alpha1.NodeProfileDescriptor, metav1.Time)
SetProfile(npd *nodev1alpha1.NodeProfileDescriptor, t metav1.Time)
}

type NodeProfileData struct {
// Value store the actual NodeProfileDescriptor
Value *nodev1alpha1.NodeProfileDescriptor
Timestamp int64
}

// Data holds checkpoint data and its checksum
type Data struct {
sync.Mutex
Item *DataItem
}

// DataItem stores the checkpoint data and checksum.
type DataItem struct {
Data NodeProfileData `json:"data"`
Checksum checksum.Checksum `json:"checksum"`
}

// NewCheckpoint returns an instance of Checkpoint
func NewCheckpoint(npdData NodeProfileData) NodeProfileCheckpoint {
return &Data{
Item: &DataItem{
Data: npdData,
},
}
}

func (d *Data) MarshalCheckpoint() ([]byte, error) {
d.Lock()
defer d.Unlock()

d.Item.Checksum = checksum.New(d.Item.Data)
return json.Marshal(*(d.Item))
}

func (d *Data) UnmarshalCheckpoint(blob []byte) error {
d.Lock()
defer d.Unlock()

return json.Unmarshal(blob, d.Item)
}

func (d *Data) VerifyChecksum() error {
d.Lock()
defer d.Unlock()

return d.Item.Checksum.Verify(d.Item.Data)
}

// GetProfile retrieves a NodeProfileDescriptor.
func (d *Data) GetProfile() (*nodev1alpha1.NodeProfileDescriptor, metav1.Time) {
d.Lock()
defer d.Unlock()
return d.Item.Data.Value, metav1.Unix(d.Item.Data.Timestamp, 0)
}

// SetProfile sets a NodeProfileDescriptor for a node.
func (d *Data) SetProfile(npd *nodev1alpha1.NodeProfileDescriptor, t metav1.Time) {
d.Lock()
defer d.Unlock()

d.Item.Data = NodeProfileData{
Value: npd,
Timestamp: t.Unix(),
}
}
71 changes: 71 additions & 0 deletions pkg/metaserver/npd/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package npd

import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
)

func TestNewCheckpoint(t *testing.T) {
t.Parallel()

now := metav1.Now()

// Prepare a test NodeProfileDescriptor
npd := &nodev1alpha1.NodeProfileDescriptor{
ObjectMeta: metav1.ObjectMeta{
Name: "node-a",
Namespace: "default",
},
Spec: nodev1alpha1.NodeProfileDescriptorSpec{},
Status: nodev1alpha1.NodeProfileDescriptorStatus{
NodeMetrics: []nodev1alpha1.ScopedNodeMetrics{
{
Scope: "resource-package",
},
},
},
}

// Create a new checkpoint and set data
cp := NewCheckpoint(NodeProfileData{})
cp.SetProfile(npd, now)

// Marshal the checkpoint
checkpointBytes, err := cp.MarshalCheckpoint()
assert.NoError(t, err, "should marshal checkpoint successfully")

// Unmarshal into a new checkpoint object
loaded := &Data{Item: &DataItem{}}
err = loaded.UnmarshalCheckpoint(checkpointBytes)
assert.NoError(t, err, "should unmarshal checkpoint successfully")

// Verify checksum integrity
err = loaded.VerifyChecksum()
assert.NoError(t, err, "checksum verification should pass")

// Retrieve stored profile and timestamp
restoredProfile, ts := loaded.GetProfile()

assert.Equal(t, metav1.Unix(now.Unix(), 0), ts, "timestamp should match")
assert.Equal(t, npd, restoredProfile, "restored profile should match original")
}
Loading