Skip to content

Commit 5ea5efe

Browse files
add checkpoint for npd fetcher
Signed-off-by: xudong.l <xudong.l@bytedance.com>
1 parent 34635a2 commit 5ea5efe

File tree

8 files changed

+284
-299
lines changed

8 files changed

+284
-299
lines changed

pkg/metaserver/metaserver.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,20 @@ func NewMetaServer(clientSet *client.GenericClientSet, emitter metrics.MetricEmi
8181

8282
var npdFetcher npd.NPDFetcher
8383
if conf.EnableNPDFetcher {
84-
npdFetcher = npd.NewNPDFetcher(clientSet, metaAgent.CNCFetcher, conf.KCCConfiguration)
84+
npdFetcher, err = npd.NewNPDFetcher(clientSet, metaAgent.CNCFetcher, conf, emitter)
85+
if err != nil {
86+
return nil, fmt.Errorf("initializes npd fetcher failed: %s", err)
87+
}
8588
} else {
8689
npdFetcher = npd.NewDummyNPDFetcher()
8790
}
8891

89-
var resourcePackageManager resourcepackage.ResourcePackageManager
90-
resourcePackageManager, err = resourcepackage.NewResourcePackageManager(npdFetcher, conf, emitter)
91-
if err != nil {
92-
return nil, fmt.Errorf("initializes resource package manager failed: %s", err)
93-
}
94-
9592
return &MetaServer{
9693
MetaAgent: metaAgent,
9794
ConfigurationManager: configurationManager,
9895
ServiceProfilingManager: spd.NewServiceProfilingManager(spdFetcher),
9996
ExternalManager: external.InitExternalManager(metaAgent.PodFetcher),
100-
ResourcePackageManager: resourcePackageManager,
97+
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npdFetcher),
10198
NPDFetcher: npdFetcher,
10299
}, nil
103100
}
@@ -114,7 +111,6 @@ func (m *MetaServer) Run(ctx context.Context) {
114111
go m.ConfigurationManager.Run(ctx)
115112
go m.ServiceProfilingManager.Run(ctx)
116113
go m.ExternalManager.Run(ctx)
117-
go m.ResourcePackageManager.Run(ctx)
118114

119115
m.Unlock()
120116
<-ctx.Done()

pkg/metaserver/metaserver_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
3838
"github.com/kubewharf/katalyst-core/pkg/metaserver/external"
3939
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
40+
"github.com/kubewharf/katalyst-core/pkg/metaserver/npd"
4041
"github.com/kubewharf/katalyst-core/pkg/metaserver/resourcepackage"
4142
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
4243
"github.com/kubewharf/katalyst-core/pkg/metrics"
@@ -61,10 +62,11 @@ func generateTestMetaServer(clientSet *client.GenericClientSet, conf *config.Con
6162
metrics.DummyMetrics{}, &pod.PodFetcherStub{}, &machine.KatalystMachineInfo{}),
6263
AgentConf: conf.MetaServerConfiguration.AgentConfiguration,
6364
},
65+
NPDFetcher: npd.NewDummyNPDFetcher(),
6466
ConfigurationManager: &dynamicconfig.DummyConfigurationManager{},
6567
ServiceProfilingManager: &spd.DummyServiceProfilingManager{},
6668
ExternalManager: &external.DummyExternalManager{},
67-
ResourcePackageManager: &resourcepackage.DummyResourcePackageManager{},
69+
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npd.NewDummyNPDFetcher()),
6870
}
6971
}
7072

pkg/metaserver/resourcepackage/checkpoint.go renamed to pkg/metaserver/npd/checkpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package resourcepackage
17+
package npd
1818

1919
import (
2020
"encoding/json"

pkg/metaserver/resourcepackage/checkpoint_test.go renamed to pkg/metaserver/npd/checkpoint_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package resourcepackage
17+
package npd
1818

1919
import (
2020
"testing"

pkg/metaserver/npd/fetcher.go

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,36 @@ package npd
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/pkg/errors"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/klog/v2"
27+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2128

2229
"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
2330
"github.com/kubewharf/katalyst-core/pkg/client"
24-
metaconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
31+
pkgconfig "github.com/kubewharf/katalyst-core/pkg/config"
2532
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/cnc"
2633
"github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
34+
"github.com/kubewharf/katalyst-core/pkg/metrics"
2735
"github.com/kubewharf/katalyst-core/pkg/util"
2836
)
2937

38+
// Metric names, metric status tag values and the checkpoint key used by the
// NPD fetcher.
const (
	// Names of the metrics emitted by the fetcher.
	metricsNameUpdateNPD         = "metaserver_update_npd"
	metricsNameLoadNPDCheckpoint = "metaserver_load_npd_checkpoint"

	// Values of the "status" tag attached to the metrics above.
	metricsValueStatusCheckpointNotFoundOrCorrupted = "notFoundOrCorrupted"
	metricsValueStatusCheckpointInvalidOrExpired    = "invalidOrExpired"
	metricsValueStatusCheckpointSuccess             = "success"

	// npdFetcherCheckpoint is the key under which the fetcher reads and
	// writes its checkpoint through the checkpoint manager.
	npdFetcherCheckpoint = "npd_fetcher_checkpoint"
)
50+
3051
type NPDFetcher interface {
3152
GetNPD(ctx context.Context) (*v1alpha1.NodeProfileDescriptor, error)
3253
}
@@ -41,26 +62,110 @@ func (f *DummyNPDFetcher) GetNPD(_ context.Context) (*v1alpha1.NodeProfileDescri
4162

4263
type npdFetcher struct {
4364
configLoader kcc.ConfigurationLoader
65+
66+
// checkpoint stores recent fetched NPDs
67+
checkpointManager checkpointmanager.CheckpointManager
68+
69+
// checkpointGraceTime is the duration to consider a checkpoint valid
70+
checkpointGraceTime time.Duration
71+
72+
// emitter is the metric emitter
73+
emitter metrics.MetricEmitter
4474
}
4575

4676
func NewNPDFetcher(clientSet *client.GenericClientSet,
4777
cncFetcher cnc.CNCFetcher,
48-
conf *metaconfig.KCCConfiguration,
49-
) NPDFetcher {
78+
conf *pkgconfig.Configuration,
79+
emitter metrics.MetricEmitter,
80+
) (NPDFetcher, error) {
81+
checkpointManager, err := checkpointmanager.NewCheckpointManager(conf.CheckpointManagerDir)
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
84+
}
85+
5086
configLoader := kcc.NewKatalystCustomConfigLoader(clientSet, conf.ConfigCacheTTL, cncFetcher)
5187
return &npdFetcher{
52-
configLoader: configLoader,
53-
}
88+
configLoader: configLoader,
89+
checkpointManager: checkpointManager,
90+
checkpointGraceTime: conf.ConfigCheckpointGraceTime,
91+
emitter: emitter,
92+
}, nil
5493
}
5594

5695
// NewDummyNPDFetcher returns a new, empty DummyNPDFetcher.
func NewDummyNPDFetcher() *DummyNPDFetcher {
	return new(DummyNPDFetcher)
}
5998

6099
func (f *npdFetcher) GetNPD(ctx context.Context) (*v1alpha1.NodeProfileDescriptor, error) {
100+
latestNpd, err := f.getNPD(ctx)
101+
if err != nil || latestNpd == nil {
102+
klog.Errorf("[npd-fetcher] try get new npd error: %v", err)
103+
// load from checkpoint if fetcher fails
104+
data, err := f.readCheckpoint()
105+
if err != nil {
106+
_ = f.emitter.StoreInt64(metricsNameUpdateNPD, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
107+
{Key: "status", Val: metricsValueStatusCheckpointNotFoundOrCorrupted},
108+
}...)
109+
return nil, errors.Wrap(err, "failed to load npd checkpoint")
110+
}
111+
npdData, timestamp := data.GetProfile()
112+
if time.Now().Before(timestamp.Add(f.checkpointGraceTime)) && npdData != nil {
113+
latestNpd = npdData
114+
klog.Infof("[npd-fetcher] failed to load npd from remote, use local checkpoint instead")
115+
_ = f.emitter.StoreInt64(metricsNameLoadNPDCheckpoint, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
116+
{Key: "status", Val: metricsValueStatusCheckpointSuccess},
117+
{Key: "npd", Val: latestNpd.Name},
118+
}...)
119+
} else {
120+
_ = f.emitter.StoreInt64(metricsNameLoadNPDCheckpoint, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
121+
{Key: "status", Val: metricsValueStatusCheckpointInvalidOrExpired},
122+
{Key: "npd", Val: "invalid"},
123+
}...)
124+
return nil, fmt.Errorf("failed to load npd checkpoint, checkpoint expired or invalid")
125+
}
126+
}
127+
f.writeCheckpoint(latestNpd)
128+
return latestNpd, nil
129+
}
130+
131+
// getNPD retrieves the latest NodeProfileDescriptor from the config loader.
132+
func (f *npdFetcher) getNPD(ctx context.Context) (*v1alpha1.NodeProfileDescriptor, error) {
61133
npd := &v1alpha1.NodeProfileDescriptor{}
62134
if err := f.configLoader.LoadConfig(ctx, util.NPDGVR, npd); err != nil {
63135
return nil, err
64136
}
65137
return npd, nil
66138
}
139+
140+
// readCheckpoint reads the checkpoint from disk
141+
func (f *npdFetcher) readCheckpoint() (NodeProfileCheckpoint, error) {
142+
cp := NewCheckpoint(NodeProfileData{})
143+
if err := f.checkpointManager.GetCheckpoint(npdFetcherCheckpoint, cp); err != nil {
144+
klog.Errorf("[npd-fetcher] failed to get npd fetcher checkpoint: %v", err)
145+
return nil, err
146+
}
147+
return cp, nil
148+
}
149+
150+
// writeCheckpoint writes the checkpoint to disk
151+
func (f *npdFetcher) writeCheckpoint(npd *v1alpha1.NodeProfileDescriptor) {
152+
data, err := f.readCheckpoint()
153+
if err != nil {
154+
klog.Errorf("[npd-fetcher] load checkpoint from %q failed: %v, try to overwrite it", npdFetcherCheckpoint, err)
155+
_ = f.emitter.StoreInt64(metricsNameLoadNPDCheckpoint, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
156+
{Key: "status", Val: metricsValueStatusCheckpointNotFoundOrCorrupted},
157+
{Key: "node", Val: npd.Name},
158+
}...)
159+
}
160+
// checkpoint doesn't exist or became corrupted, make a new checkpoint
161+
if data == nil {
162+
data = NewCheckpoint(NodeProfileData{})
163+
}
164+
165+
// set config value and timestamp for kind
166+
data.SetProfile(npd, metav1.Now())
167+
err = f.checkpointManager.CreateCheckpoint(npdFetcherCheckpoint, data)
168+
if err != nil {
169+
klog.Errorf("[npd-fetcher] failed to write checkpoint file %q: %v", npdFetcherCheckpoint, err)
170+
}
171+
}

0 commit comments

Comments
 (0)