Skip to content

Commit 1b7637c

Browse files
add checkpoint for npd fetcher
Signed-off-by: xudong.l <xudong.l@bytedance.com>
1 parent 34635a2 commit 1b7637c

File tree

8 files changed

+312
-299
lines changed

8 files changed

+312
-299
lines changed

pkg/metaserver/metaserver.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,20 @@ func NewMetaServer(clientSet *client.GenericClientSet, emitter metrics.MetricEmi
8181

8282
var npdFetcher npd.NPDFetcher
8383
if conf.EnableNPDFetcher {
84-
npdFetcher = npd.NewNPDFetcher(clientSet, metaAgent.CNCFetcher, conf.KCCConfiguration)
84+
npdFetcher, err = npd.NewNPDFetcher(clientSet, metaAgent.CNCFetcher, conf, emitter)
85+
if err != nil {
86+
return nil, fmt.Errorf("initializes npd fetcher failed: %s", err)
87+
}
8588
} else {
8689
npdFetcher = npd.NewDummyNPDFetcher()
8790
}
8891

89-
var resourcePackageManager resourcepackage.ResourcePackageManager
90-
resourcePackageManager, err = resourcepackage.NewResourcePackageManager(npdFetcher, conf, emitter)
91-
if err != nil {
92-
return nil, fmt.Errorf("initializes resource package manager failed: %s", err)
93-
}
94-
9592
return &MetaServer{
9693
MetaAgent: metaAgent,
9794
ConfigurationManager: configurationManager,
9895
ServiceProfilingManager: spd.NewServiceProfilingManager(spdFetcher),
9996
ExternalManager: external.InitExternalManager(metaAgent.PodFetcher),
100-
ResourcePackageManager: resourcePackageManager,
97+
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npdFetcher),
10198
NPDFetcher: npdFetcher,
10299
}, nil
103100
}
@@ -114,7 +111,7 @@ func (m *MetaServer) Run(ctx context.Context) {
114111
go m.ConfigurationManager.Run(ctx)
115112
go m.ServiceProfilingManager.Run(ctx)
116113
go m.ExternalManager.Run(ctx)
117-
go m.ResourcePackageManager.Run(ctx)
114+
go m.NPDFetcher.Run(ctx)
118115

119116
m.Unlock()
120117
<-ctx.Done()

pkg/metaserver/metaserver_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
3838
"github.com/kubewharf/katalyst-core/pkg/metaserver/external"
3939
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
40+
"github.com/kubewharf/katalyst-core/pkg/metaserver/npd"
4041
"github.com/kubewharf/katalyst-core/pkg/metaserver/resourcepackage"
4142
"github.com/kubewharf/katalyst-core/pkg/metaserver/spd"
4243
"github.com/kubewharf/katalyst-core/pkg/metrics"
@@ -61,10 +62,11 @@ func generateTestMetaServer(clientSet *client.GenericClientSet, conf *config.Con
6162
metrics.DummyMetrics{}, &pod.PodFetcherStub{}, &machine.KatalystMachineInfo{}),
6263
AgentConf: conf.MetaServerConfiguration.AgentConfiguration,
6364
},
65+
NPDFetcher: npd.NewDummyNPDFetcher(),
6466
ConfigurationManager: &dynamicconfig.DummyConfigurationManager{},
6567
ServiceProfilingManager: &spd.DummyServiceProfilingManager{},
6668
ExternalManager: &external.DummyExternalManager{},
67-
ResourcePackageManager: &resourcepackage.DummyResourcePackageManager{},
69+
ResourcePackageManager: resourcepackage.NewResourcePackageManager(npd.NewDummyNPDFetcher()),
6870
}
6971
}
7072

pkg/metaserver/resourcepackage/checkpoint.go renamed to pkg/metaserver/npd/checkpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package resourcepackage
17+
package npd
1818

1919
import (
2020
"encoding/json"

pkg/metaserver/resourcepackage/checkpoint_test.go renamed to pkg/metaserver/npd/checkpoint_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package resourcepackage
17+
package npd
1818

1919
import (
2020
"testing"

pkg/metaserver/npd/fetcher.go

Lines changed: 136 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,47 @@ package npd
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/pkg/errors"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/util/wait"
27+
"k8s.io/klog/v2"
28+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2129

2230
"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
2331
"github.com/kubewharf/katalyst-core/pkg/client"
24-
metaconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
32+
pkgconfig "github.com/kubewharf/katalyst-core/pkg/config"
2533
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/cnc"
2634
"github.com/kubewharf/katalyst-core/pkg/metaserver/kcc"
35+
"github.com/kubewharf/katalyst-core/pkg/metrics"
2736
"github.com/kubewharf/katalyst-core/pkg/util"
2837
)
2938

39+
const (
40+
updateConfigInterval = 5 * time.Second
41+
updateConfigJitterFactor = 0.5
42+
)
43+
44+
const (
45+
metricsNameUpdateNPD = "metaserver_update_npd"
46+
metricsNameLoadNPDCheckpoint = "metaserver_load_npd_checkpoint"
47+
48+
metricsValueStatusCheckpointNotFoundOrCorrupted = "notFoundOrCorrupted"
49+
metricsValueStatusCheckpointInvalidOrExpired = "invalidOrExpired"
50+
metricsValueStatusCheckpointSuccess = "success"
51+
)
52+
53+
const (
54+
npdFetcherCheckpoint = "npd_fetcher_checkpoint"
55+
)
56+
3057
type NPDFetcher interface {
3158
GetNPD(ctx context.Context) (*v1alpha1.NodeProfileDescriptor, error)
59+
60+
// Run starts the service profiling manager
61+
Run(ctx context.Context)
3262
}
3363

3464
type DummyNPDFetcher struct {
@@ -39,28 +69,129 @@ func (f *DummyNPDFetcher) GetNPD(_ context.Context) (*v1alpha1.NodeProfileDescri
3969
return f.NPD, nil
4070
}
4171

72+
func (f *DummyNPDFetcher) Run(_ context.Context) {}
73+
4274
type npdFetcher struct {
4375
configLoader kcc.ConfigurationLoader
76+
77+
// checkpoint stores recent fetched NPDs
78+
checkpointManager checkpointmanager.CheckpointManager
79+
80+
// checkpointGraceTime is the duration to consider a checkpoint valid
81+
checkpointGraceTime time.Duration
82+
83+
// emitter is the metric emitter
84+
emitter metrics.MetricEmitter
4485
}
4586

4687
func NewNPDFetcher(clientSet *client.GenericClientSet,
4788
cncFetcher cnc.CNCFetcher,
48-
conf *metaconfig.KCCConfiguration,
49-
) NPDFetcher {
89+
conf *pkgconfig.Configuration,
90+
emitter metrics.MetricEmitter,
91+
) (NPDFetcher, error) {
92+
checkpointManager, err := checkpointmanager.NewCheckpointManager(conf.CheckpointManagerDir)
93+
if err != nil {
94+
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
95+
}
96+
5097
configLoader := kcc.NewKatalystCustomConfigLoader(clientSet, conf.ConfigCacheTTL, cncFetcher)
5198
return &npdFetcher{
52-
configLoader: configLoader,
53-
}
99+
configLoader: configLoader,
100+
checkpointManager: checkpointManager,
101+
checkpointGraceTime: conf.ConfigCheckpointGraceTime,
102+
emitter: emitter,
103+
}, nil
54104
}
55105

56106
func NewDummyNPDFetcher() *DummyNPDFetcher {
57107
return &DummyNPDFetcher{}
58108
}
59109

60110
func (f *npdFetcher) GetNPD(ctx context.Context) (*v1alpha1.NodeProfileDescriptor, error) {
111+
latestNpd, err := f.getNPD(ctx)
112+
if err != nil || latestNpd == nil {
113+
klog.Errorf("[npd-fetcher] try get new npd error: %v", err)
114+
// load from checkpoint if fetcher fails
115+
data, err := f.readCheckpoint()
116+
if err != nil {
117+
_ = f.emitter.StoreInt64(metricsNameUpdateNPD, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
118+
{Key: "status", Val: metricsValueStatusCheckpointNotFoundOrCorrupted},
119+
}...)
120+
return nil, errors.Wrap(err, "failed to load npd checkpoint")
121+
}
122+
npdData, timestamp := data.GetProfile()
123+
if time.Now().Before(timestamp.Add(f.checkpointGraceTime)) && npdData != nil {
124+
latestNpd = npdData
125+
klog.Infof("[npd-fetcher] failed to load npd from remote, use local checkpoint instead")
126+
_ = f.emitter.StoreInt64(metricsNameLoadNPDCheckpoint, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
127+
{Key: "status", Val: metricsValueStatusCheckpointSuccess},
128+
{Key: "npd", Val: latestNpd.Name},
129+
}...)
130+
} else {
131+
_ = f.emitter.StoreInt64(metricsNameLoadNPDCheckpoint, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
132+
{Key: "status", Val: metricsValueStatusCheckpointInvalidOrExpired},
133+
{Key: "npd", Val: "invalid"},
134+
}...)
135+
return nil, fmt.Errorf("failed to load npd checkpoint, checkpoint expired or invalid")
136+
}
137+
}
138+
f.writeCheckpoint(latestNpd)
139+
return latestNpd, nil
140+
}
141+
142+
func (f *npdFetcher) Run(ctx context.Context) {
143+
go wait.JitterUntilWithContext(ctx, func(context.Context) {
144+
latestNpd, err := f.GetNPD(ctx)
145+
if err != nil {
146+
klog.Errorf("[npd-fetcher] failed to update npd checkpoint: %v", err)
147+
_ = f.emitter.StoreInt64(metricsNameUpdateNPD, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
148+
{Key: "status", Val: "failed"},
149+
}...)
150+
return
151+
}
152+
f.writeCheckpoint(latestNpd)
153+
}, updateConfigInterval, updateConfigJitterFactor, true)
154+
<-ctx.Done()
155+
}
156+
157+
// getNPD retrieves the latest NodeProfileDescriptor from the config loader.
158+
func (f *npdFetcher) getNPD(ctx context.Context) (*v1alpha1.NodeProfileDescriptor, error) {
61159
npd := &v1alpha1.NodeProfileDescriptor{}
62160
if err := f.configLoader.LoadConfig(ctx, util.NPDGVR, npd); err != nil {
63161
return nil, err
64162
}
65163
return npd, nil
66164
}
165+
166+
// readCheckpoint reads the checkpoint from disk
167+
func (f *npdFetcher) readCheckpoint() (NodeProfileCheckpoint, error) {
168+
cp := NewCheckpoint(NodeProfileData{})
169+
if err := f.checkpointManager.GetCheckpoint(npdFetcherCheckpoint, cp); err != nil {
170+
klog.Errorf("[npd-fetcher] failed to get npd fetcher checkpoint: %v", err)
171+
return nil, err
172+
}
173+
return cp, nil
174+
}
175+
176+
// writeCheckpoint writes the checkpoint to disk
177+
func (f *npdFetcher) writeCheckpoint(npd *v1alpha1.NodeProfileDescriptor) {
178+
data, err := f.readCheckpoint()
179+
if err != nil {
180+
klog.Errorf("[npd-fetcher] load checkpoint from %q failed: %v, try to overwrite it", npdFetcherCheckpoint, err)
181+
_ = f.emitter.StoreInt64(metricsNameLoadNPDCheckpoint, 1, metrics.MetricTypeNameRaw, []metrics.MetricTag{
182+
{Key: "status", Val: metricsValueStatusCheckpointNotFoundOrCorrupted},
183+
{Key: "node", Val: npd.Name},
184+
}...)
185+
}
186+
// checkpoint doesn't exist or became corrupted, make a new checkpoint
187+
if data == nil {
188+
data = NewCheckpoint(NodeProfileData{})
189+
}
190+
191+
// set config value and timestamp for kind
192+
data.SetProfile(npd, metav1.Now())
193+
err = f.checkpointManager.CreateCheckpoint(npdFetcherCheckpoint, data)
194+
if err != nil {
195+
klog.Errorf("[npd-fetcher] failed to write checkpoint file %q: %v", npdFetcherCheckpoint, err)
196+
}
197+
}

0 commit comments

Comments
 (0)