Skip to content

Commit a29aa69

Browse files
committed
feat: refactor logic into new file
feat: refactor logic into new file
1 parent 78d8d15 commit a29aa69

File tree

15 files changed

+482
-264
lines changed

15 files changed

+482
-264
lines changed

cmd/katalyst-agent/app/options/qrm/statedirectory/statedirectory_base.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,11 @@ type StateDirectoryOptions struct {
2626
StateFileDirectory string
2727
InMemoryStateFileDirectory string
2828
EnableInMemoryState bool
29-
HasPreStop bool
3029
}
3130

3231
func NewStateDirectoryOptions() *StateDirectoryOptions {
3332
return &StateDirectoryOptions{
34-
StateFileDirectory: "/var/lib/katalyst/qrm_advisor",
35-
InMemoryStateFileDirectory: "/dev/shm/qrm/state",
33+
StateFileDirectory: "/var/lib/katalyst/qrm_advisor",
3634
}
3735
}
3836

@@ -44,13 +42,11 @@ func (o *StateDirectoryOptions) AddFlags(fss *cliflag.NamedFlagSets) {
4442
o.InMemoryStateFileDirectory, "The in memory directory to store the state file.")
4543
fs.BoolVar(&o.EnableInMemoryState, "qrm-enable-in-memory-state",
4644
o.EnableInMemoryState, "if set true, the state will be stored in the in-memory directory.")
47-
fs.BoolVar(&o.HasPreStop, "qrm-has-pre-stop", o.HasPreStop, "if set true, we will be able to migrate state files")
4845
}
4946

5047
func (o *StateDirectoryOptions) ApplyTo(conf *statedirectory.StateDirectoryConfiguration) error {
5148
conf.StateFileDirectory = o.StateFileDirectory
5249
conf.InMemoryStateFileDirectory = o.InMemoryStateFileDirectory
5350
conf.EnableInMemoryState = o.EnableInMemoryState
54-
conf.HasPreStop = o.HasPreStop
5551
return nil
5652
}

pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import (
2727
"testing"
2828
"time"
2929

30-
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory"
31-
3230
"github.com/stretchr/testify/mock"
3331
"github.com/stretchr/testify/require"
3432
"google.golang.org/grpc"
@@ -59,6 +57,7 @@ import (
5957
"github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/featuregatenegotiation"
6058
"github.com/kubewharf/katalyst-core/pkg/config"
6159
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
60+
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory"
6261
"github.com/kubewharf/katalyst-core/pkg/config/generic"
6362
"github.com/kubewharf/katalyst-core/pkg/metaserver"
6463
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
@@ -95,7 +94,9 @@ func generateSharedNumaBindingPoolAllocationMeta(poolName string) commonstate.Al
9594
return meta
9695
}
9796

98-
func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) {
97+
func getTestDynamicPolicyWithInitialization(
98+
topology *machine.CPUTopology, stateFileDirectory string,
99+
) (*DynamicPolicy, error) {
99100
dynamicPolicy, err := getTestDynamicPolicyWithoutInitialization(topology, stateFileDirectory)
100101
if err != nil {
101102
return nil, err
@@ -115,7 +116,9 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, state
115116
return dynamicPolicy, nil
116117
}
117118

118-
func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) {
119+
func getTestDynamicPolicyWithoutInitialization(
120+
topology *machine.CPUTopology, stateFileDirectory string,
121+
) (*DynamicPolicy, error) {
119122
stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{
120123
StateFileDirectory: stateFileDirectory,
121124
}
@@ -6212,7 +6215,9 @@ type mockCPUAdvisor struct {
62126215
advisorapi.CPUAdvisorServer
62136216
}
62146217

6215-
func (m *mockCPUAdvisor) GetAdvice(ctx context.Context, in *advisorapi.GetAdviceRequest) (*advisorapi.GetAdviceResponse, error) {
6218+
func (m *mockCPUAdvisor) GetAdvice(
6219+
ctx context.Context, in *advisorapi.GetAdviceRequest,
6220+
) (*advisorapi.GetAdviceResponse, error) {
62166221
args := m.Called(ctx, in)
62176222
return args.Get(0).(*advisorapi.GetAdviceResponse), args.Error(1)
62186223
}
@@ -6222,11 +6227,15 @@ func (m *mockCPUAdvisor) ListAndWatch(in *advisorsvc.Empty, srv advisorapi.CPUAd
62226227
return args.Error(0)
62236228
}
62246229

6225-
func (m *mockCPUAdvisor) AddContainer(ctx context.Context, req *advisorsvc.ContainerMetadata) (*advisorsvc.AddContainerResponse, error) {
6230+
func (m *mockCPUAdvisor) AddContainer(
6231+
ctx context.Context, req *advisorsvc.ContainerMetadata,
6232+
) (*advisorsvc.AddContainerResponse, error) {
62266233
return &advisorsvc.AddContainerResponse{}, nil
62276234
}
62286235

6229-
func (m *mockCPUAdvisor) RemovePod(ctx context.Context, req *advisorsvc.RemovePodRequest) (*advisorsvc.RemovePodResponse, error) {
6236+
func (m *mockCPUAdvisor) RemovePod(
6237+
ctx context.Context, req *advisorsvc.RemovePodRequest,
6238+
) (*advisorsvc.RemovePodResponse, error) {
62306239
return &advisorsvc.RemovePodResponse{}, nil
62316240
}
62326241

pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go

Lines changed: 24 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,18 @@ package state
1919
import (
2020
"fmt"
2121
"path"
22-
"path/filepath"
2322
"reflect"
2423
"sync"
2524
"time"
2625

2726
"k8s.io/klog/v2"
28-
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2927
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
3028

3129
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory"
3230
"github.com/kubewharf/katalyst-core/pkg/metrics"
33-
"github.com/kubewharf/katalyst-core/pkg/util/file"
3431
"github.com/kubewharf/katalyst-core/pkg/util/general"
3532
"github.com/kubewharf/katalyst-core/pkg/util/machine"
33+
"github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager"
3634
)
3735

3836
const (
@@ -44,15 +42,16 @@ const (
4442
// go to in-memory State, and then go to disk State, i.e. in write-back mode
4543
type stateCheckpoint struct {
4644
sync.RWMutex
47-
cache *cpuPluginState
48-
policyName string
49-
checkpointManager checkpointmanager.CheckpointManager
50-
checkpointName string
45+
cache *cpuPluginState
46+
policyName string
47+
qrmCheckpointManager *qrmcheckpointmanager.QRMCheckpointManager
48+
checkpointName string
5149
// when we add new properties to checkpoint,
5250
// it will cause checkpoint corruption, and we should skip it
5351
skipStateCorruption bool
5452
GenerateMachineStateFromPodEntries GenerateMachineStateFromPodEntriesFunc
5553
emitter metrics.MetricEmitter
54+
hasPreStop bool
5655
}
5756

5857
var _ State = &stateCheckpoint{}
@@ -64,23 +63,24 @@ func NewCheckpointState(
6463
emitter metrics.MetricEmitter,
6564
) (State, error) {
6665
currentStateDir, otherStateDir := stateDirectoryConfig.GetCurrentAndOtherStateFileDirectory()
67-
hasPreStop := stateDirectoryConfig.HasPreStop
68-
checkpointManager, err := checkpointmanager.NewCheckpointManager(currentStateDir)
66+
// If there is an empty otherStateDir, this means that there is no pre-stop script in place
67+
hasPreStop := otherStateDir != ""
68+
qrmCheckpointManager, err := qrmcheckpointmanager.NewQRMCheckpointManager(currentStateDir, otherStateDir, checkpointName, "cpu_plugin")
6969
if err != nil {
7070
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
7171
}
72-
7372
sc := &stateCheckpoint{
7473
cache: NewCPUPluginState(topology),
7574
policyName: policyName,
76-
checkpointManager: checkpointManager,
75+
qrmCheckpointManager: qrmCheckpointManager,
7776
checkpointName: checkpointName,
7877
skipStateCorruption: skipStateCorruption,
7978
GenerateMachineStateFromPodEntries: generateMachineStateFunc,
8079
emitter: emitter,
80+
hasPreStop: hasPreStop,
8181
}
8282

83-
if err := sc.restoreState(currentStateDir, otherStateDir, hasPreStop, topology); err != nil {
83+
if err := sc.restoreState(topology); err != nil {
8484
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete "+
8585
"the cpu plugin checkpoint file %q before restarting Kubelet", err, path.Join(currentStateDir, checkpointName))
8686
}
@@ -90,19 +90,17 @@ func NewCheckpointState(
9090

9191
// restoreState is first done by searching the current directory for the state file.
9292
// If it does not exist, we search the other directory for the state file and try to migrate the state file over to the current directory.
93-
func (sc *stateCheckpoint) restoreState(
94-
currentStateDir, otherStateDir string, hasPreStop bool, topology *machine.CPUTopology,
95-
) error {
93+
func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error {
9694
sc.Lock()
9795
defer sc.Unlock()
9896
var err error
9997
var foundAndSkippedStateCorruption bool
10098

10199
checkpoint := NewCPUPluginCheckpoint()
102-
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
100+
if err = sc.qrmCheckpointManager.GetCurrentCheckpoint(sc.checkpointName, checkpoint, true); err != nil {
103101
if err == errors.ErrCheckpointNotFound {
104102
// We cannot find checkpoint, so it is possible that previous checkpoint was stored in either disk or memory
105-
return sc.tryMigrateState(topology, currentStateDir, otherStateDir, hasPreStop, checkpoint)
103+
return sc.tryMigrateState(topology, checkpoint)
106104
} else if err == errors.ErrCorruptCheckpoint {
107105
if !sc.skipStateCorruption {
108106
return err
@@ -155,27 +153,20 @@ func (sc *stateCheckpoint) populateCacheAndState(
155153
// tryMigrateState tries to migrate the state file from the other directory to current directory.
156154
// If the other directory does not have a state file, then we build a new checkpoint.
157155
func (sc *stateCheckpoint) tryMigrateState(
158-
topology *machine.CPUTopology, currentStateDir, otherStateDir string, hasPreStop bool,
159-
checkpoint *CPUPluginCheckpoint,
156+
topology *machine.CPUTopology, checkpoint *CPUPluginCheckpoint,
160157
) error {
161158
var foundAndSkippedStateCorruption bool
162159
klog.Infof("[cpu_plugin] trying to migrate state")
163160

164161
// Do not migrate and build new checkpoint if there is no pre-stop script
165-
if !hasPreStop {
162+
if !sc.hasPreStop {
166163
return sc.storeState()
167164
}
168165

169-
// Get the old checkpoint using the provided file directory
170-
oldCheckpointManager, err := checkpointmanager.NewCheckpointManager(otherStateDir)
171-
if err != nil {
172-
return fmt.Errorf("[cpu_plugin] failed to initialize old checkpoint manager for migration: %v", err)
173-
}
174-
175-
if err = oldCheckpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
166+
if err := sc.qrmCheckpointManager.GetPreviousCheckpoint(sc.checkpointName, checkpoint); err != nil {
176167
if err == errors.ErrCheckpointNotFound {
177168
// Old checkpoint file is not found, so we just store state in new checkpoint
178-
general.Infof("[cpu_plugin] checkpoint %v doesn't exist in dir %v, create it", sc.checkpointName, otherStateDir)
169+
general.Infof("[cpu_plugin] checkpoint %v doesn't exist, create it", sc.checkpointName)
179170
return sc.storeState()
180171
} else if err == errors.ErrCorruptCheckpoint {
181172
if !sc.skipStateCorruption {
@@ -188,41 +179,23 @@ func (sc *stateCheckpoint) tryMigrateState(
188179
}
189180
}
190181

191-
if err = sc.populateCacheAndState(topology, checkpoint, foundAndSkippedStateCorruption); err != nil {
182+
if err := sc.populateCacheAndState(topology, checkpoint, foundAndSkippedStateCorruption); err != nil {
192183
return fmt.Errorf("[cpu_plugin] failed to populate checkpoint state during state migration: %v", err)
193184
}
194185

195186
// always store state after migrating to new checkpoint
196-
if err = sc.storeState(); err != nil {
187+
if err := sc.storeState(); err != nil {
197188
return fmt.Errorf("[cpu_plugin] failed to store checkpoint state during end of migration: %v", err)
198189
}
199190

200-
// validate that the two files are equal
201-
equal, err := sc.checkpointFilesEqual(currentStateDir, otherStateDir)
202-
if err != nil {
203-
return fmt.Errorf("[cpu_plugin] failed to compare checkpoint files: %v", err)
204-
}
205-
if !equal {
206-
klog.Infof("[cpu_plugin] checkpoint files are not equal, migration failed, fall back to old checkpoint")
207-
sc.checkpointManager = oldCheckpointManager
208-
return nil
209-
}
210-
211-
// remove old checkpoint file
212-
if err = oldCheckpointManager.RemoveCheckpoint(sc.checkpointName); err != nil {
213-
return fmt.Errorf("[cpu_plugin] failed to remove old checkpoint: %v", err)
191+
if err := sc.qrmCheckpointManager.ValidateCheckpointFilesMigration(); err != nil {
192+
return fmt.Errorf("[cpu_plugin] ValidateCheckpointFilesMigration failed with error: %v", err)
214193
}
215194

216195
klog.Infof("[cpu_plugin] migrate checkpoint succeeded")
217196
return nil
218197
}
219198

220-
func (sc *stateCheckpoint) checkpointFilesEqual(currentStateDir, otherStateDir string) (bool, error) {
221-
currentFilePath := filepath.Join(currentStateDir, sc.checkpointName)
222-
otherFilePath := filepath.Join(otherStateDir, sc.checkpointName)
223-
return file.FilesEqual(currentFilePath, otherFilePath)
224-
}
225-
226199
func (sc *stateCheckpoint) StoreState() error {
227200
sc.Lock()
228201
defer sc.Unlock()
@@ -244,7 +217,7 @@ func (sc *stateCheckpoint) storeState() error {
244217
checkpoint.PodEntries = sc.cache.GetPodEntries()
245218
checkpoint.AllowSharedCoresOverlapReclaimedCores = sc.cache.GetAllowSharedCoresOverlapReclaimedCores()
246219

247-
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
220+
err := sc.qrmCheckpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
248221
if err != nil {
249222
klog.ErrorS(err, "Could not save checkpoint")
250223
return err

pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory"
4444
"github.com/kubewharf/katalyst-core/pkg/metrics"
4545
"github.com/kubewharf/katalyst-core/pkg/util/machine"
46+
"github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager"
4647
)
4748

4849
const (
@@ -3352,14 +3353,15 @@ func TestTryMigrateState(t *testing.T) {
33523353
skipStateCorruption: false,
33533354
GenerateMachineStateFromPodEntries: generateTestMachineStateFromPodEntries,
33543355
emitter: metrics.DummyMetrics{},
3356+
hasPreStop: tt.preStop,
33553357
}
33563358

33573359
// current checkpoint is pointing to the in memory directory
3358-
sc.checkpointManager, err = checkpointmanager.NewCheckpointManager(inMemoryTmpDir)
3360+
sc.qrmCheckpointManager, err = qrmcheckpointmanager.NewQRMCheckpointManager(inMemoryTmpDir, stateDir, checkpointName, "cpu_plugin")
33593361
assert.NoError(t, err)
33603362

33613363
newCheckpoint := NewCPUPluginCheckpoint()
3362-
err = sc.tryMigrateState(cpuTopology, inMemoryTmpDir, stateDir, tt.preStop, newCheckpoint)
3364+
err = sc.tryMigrateState(cpuTopology, newCheckpoint)
33633365

33643366
if tt.corruptFile {
33653367
assert.Error(t, err)
@@ -3368,7 +3370,7 @@ func TestTryMigrateState(t *testing.T) {
33683370
assert.NoError(t, err)
33693371

33703372
// check if new checkpoint is created and verify equality
3371-
err = sc.checkpointManager.GetCheckpoint(checkpointName, newCheckpoint)
3373+
err = sc.qrmCheckpointManager.GetCurrentCheckpoint(checkpointName, newCheckpoint, false)
33723374
assert.NoError(t, err)
33733375

33743376
// verify old checkpoint file existence

pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ import (
3232
"testing"
3333
"time"
3434

35-
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory"
36-
3735
"github.com/cilium/ebpf"
3836
"github.com/cilium/ebpf/rlimit"
3937
info "github.com/google/cadvisor/info/v1"
@@ -70,6 +68,7 @@ import (
7068
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
7169
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
7270
qrmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm"
71+
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory"
7372
"github.com/kubewharf/katalyst-core/pkg/config/generic"
7473
coreconsts "github.com/kubewharf/katalyst-core/pkg/consts"
7574
"github.com/kubewharf/katalyst-core/pkg/metaserver"
@@ -112,7 +111,9 @@ var fakeConf = &config.Configuration{
112111
},
113112
}
114113

115-
func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machineInfo *info.MachineInfo, stateFileDirectory string) (*DynamicPolicy, error) {
114+
func getTestDynamicPolicyWithInitialization(
115+
topology *machine.CPUTopology, machineInfo *info.MachineInfo, stateFileDirectory string,
116+
) (*DynamicPolicy, error) {
116117
reservedMemory, err := getReservedMemory(fakeConf, &metaserver.MetaServer{}, machineInfo)
117118
if err != nil {
118119
return nil, err
@@ -4871,7 +4872,9 @@ type mockMemoryAdvisor struct {
48714872
advisorsvc.AdvisorServiceServer
48724873
}
48734874

4874-
func (m *mockMemoryAdvisor) GetAdvice(ctx context.Context, in *advisorsvc.GetAdviceRequest) (*advisorsvc.GetAdviceResponse, error) {
4875+
func (m *mockMemoryAdvisor) GetAdvice(
4876+
ctx context.Context, in *advisorsvc.GetAdviceRequest,
4877+
) (*advisorsvc.GetAdviceResponse, error) {
48754878
args := m.Called(ctx, in)
48764879
return args.Get(0).(*advisorsvc.GetAdviceResponse), args.Error(1)
48774880
}

0 commit comments

Comments
 (0)