Skip to content

Commit 82167ca

Browse files
committed
feat: remove old checkpoint file
feat: remove old checkpoint file feat: remove old checkpoint file revert: sysadvisor changes
1 parent 264e848 commit 82167ca

File tree

6 files changed

+61
-100
lines changed

6 files changed

+61
-100
lines changed

pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ func (sc *stateCheckpoint) tryMigrateState(topology *machine.CPUTopology, stateD
179179
return fmt.Errorf("[cpu_plugin] failed to store checkpoint state during end of migration: %v", err)
180180
}
181181

182+
// remove old checkpoint file
183+
if err = oldCheckpointManager.RemoveCheckpoint(sc.checkpointName); err != nil {
184+
return fmt.Errorf("[cpu_plugin] failed to remove old checkpoint: %v", err)
185+
}
186+
182187
klog.Infof("[cpu_plugin] migrate checkpoint succeeded")
183188
return nil
184189
}

pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,16 +3270,11 @@ func generateTestMachineStateFromPodEntries(topology *machine.CPUTopology, _ Pod
32703270
func TestTryMigrateState(t *testing.T) {
32713271
t.Parallel()
32723272

3273-
tmpDir, err := ioutil.TempDir("", "checkpoint-test")
3274-
assert.NoError(t, err)
3275-
defer os.RemoveAll(tmpDir)
3276-
3277-
inMemoryTmpDir, err := ioutil.TempDir("", "checkpoint-memory-test")
3278-
assert.NoError(t, err)
3279-
defer os.RemoveAll(inMemoryTmpDir)
3273+
tmpDir := t.TempDir()
3274+
inMemoryTmpDir := t.TempDir()
32803275

32813276
stateDir := filepath.Join(tmpDir, "state")
3282-
err = os.MkdirAll(stateDir, 0o775)
3277+
err := os.MkdirAll(stateDir, 0o775)
32833278
assert.NoError(t, err)
32843279

32853280
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
@@ -3320,3 +3315,42 @@ func TestTryMigrateState(t *testing.T) {
33203315
assert.NoError(t, err)
33213316
assert.Equal(t, newCheckpoint, oldCheckpoint)
33223317
}
3318+
3319+
func TestTryMigrateStateCorrupted(t *testing.T) {
3320+
t.Parallel()
3321+
3322+
tmpDir := t.TempDir()
3323+
3324+
stateDir := filepath.Join(tmpDir, "state")
3325+
err := os.MkdirAll(stateDir, 0o775)
3326+
assert.NoError(t, err)
3327+
3328+
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)
3329+
assert.NoError(t, err)
3330+
3331+
policyName := "test-policy"
3332+
checkpointName := "test-checkpoint"
3333+
3334+
// create a corrupted old checkpoint
3335+
corruptedFile := filepath.Join(stateDir, fmt.Sprintf("%s", checkpointName))
3336+
err = ioutil.WriteFile(corruptedFile, []byte("corrupted data"), 0o644)
3337+
assert.NoError(t, err)
3338+
3339+
// create a new state checkpoint with a new checkpoint manager
3340+
sc := &stateCheckpoint{
3341+
policyName: policyName,
3342+
checkpointName: checkpointName,
3343+
cache: NewCPUPluginState(cpuTopology),
3344+
isInMemoryState: true,
3345+
skipStateCorruption: false,
3346+
GenerateMachineStateFromPodEntries: generateTestMachineStateFromPodEntries,
3347+
emitter: metrics.DummyMetrics{},
3348+
}
3349+
3350+
sc.checkpointManager, err = inmemorystate.CreateCheckpointManager(stateDir, tmpDir, true)
3351+
assert.NoError(t, err)
3352+
3353+
// call tryMigrateState
3354+
err = sc.tryMigrateState(cpuTopology, stateDir, NewCPUPluginCheckpoint())
3355+
assert.Error(t, err)
3356+
}

pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_checkpoint.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ func (sc *stateCheckpoint) tryMigrateState(machineInfo *info.MachineInfo, reserv
191191
return fmt.Errorf("[memory_plugin] failed to store state during end of migration: %v", err)
192192
}
193193

194+
// remove old checkpoint file
195+
if err = oldCheckpointManager.RemoveCheckpoint(sc.checkpointName); err != nil {
196+
return fmt.Errorf("[memory_plugin] failed to remove old checkpoint: %v", err)
197+
}
198+
194199
klog.Infof("[memory_plugin] migrate checkpoint succeeded")
195200
return nil
196201
}

pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package state
1818

1919
import (
20-
"io/ioutil"
2120
"os"
2221
"path/filepath"
2322
"testing"
@@ -55,16 +54,11 @@ func TestGetWriteOnlyState(t *testing.T) {
5554
func TestTryMigrateState(t *testing.T) {
5655
t.Parallel()
5756

58-
tmpDir, err := ioutil.TempDir("", "checkpoint-test")
59-
assert.NoError(t, err)
60-
defer os.RemoveAll(tmpDir)
61-
62-
inMemoryTmpDir, err := ioutil.TempDir("", "checkpoint-memory-test")
63-
assert.NoError(t, err)
64-
defer os.RemoveAll(inMemoryTmpDir)
57+
tmpDir := t.TempDir()
58+
inMemoryTmpDir := t.TempDir()
6559

6660
stateDir := filepath.Join(tmpDir, "state")
67-
err = os.MkdirAll(stateDir, 0o775)
61+
err := os.MkdirAll(stateDir, 0o775)
6862
assert.NoError(t, err)
6963

7064
cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4)

pkg/agent/sysadvisor/metacache/inmemorystate.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

pkg/agent/sysadvisor/metacache/metacache.go

Lines changed: 6 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,6 @@ type MetaCacheImp struct {
176176

177177
// Lock for the entire MetaCache. Useful when you want to make multiple writes atomically.
178178
sync.Mutex
179-
180-
// Determines if we want to store the state in memory (tmpfs) or disk
181-
isInMemoryState bool
182179
}
183180

184181
var _ MetaCache = &MetaCacheImp{}
@@ -193,8 +190,7 @@ func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.Metrics
193190
}
194191
}
195192
stateFileDir := conf.GenericSysAdvisorConfiguration.StateFileDirectory
196-
isInMemoryState := conf.GenericSysAdvisorConfiguration.EnableInMemoryState
197-
checkpointManager, err := CreateCheckpointManager(stateFileDir, TmpfsCheckpointPath, isInMemoryState)
193+
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateFileDir)
198194
if err != nil {
199195
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
200196
}
@@ -215,7 +211,7 @@ func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.Metrics
215211
}
216212

217213
// Restore from checkpoint before any function call to metacache api
218-
if err := mc.restoreState(stateFileDir); err != nil {
214+
if err := mc.restoreState(); err != nil {
219215
return mc, err
220216
}
221217

@@ -632,14 +628,15 @@ func (mc *MetaCacheImp) storeState() error {
632628
return nil
633629
}
634630

635-
func (mc *MetaCacheImp) restoreState(stateDir string) error {
631+
func (mc *MetaCacheImp) restoreState() error {
636632
checkpoint := NewMetaCacheCheckpoint()
637633

638634
foundAndSkippedStateCorruption := false
639635
if err := mc.checkpointManager.GetCheckpoint(mc.checkpointName, checkpoint); err != nil {
640636
if err == errors.ErrCheckpointNotFound {
641-
// We cannot find checkpoint, so it is possible that previous checkpoint was stored in either disk or memory
642-
return mc.tryMigrateState(stateDir, checkpoint)
637+
// create a new store state
638+
klog.Infof("[metacache] checkpoint %v doesn't exist, create it", mc.checkpointName, err)
639+
return mc.storeState()
643640
} else if err == errors.ErrCorruptCheckpoint {
644641
if !mc.skipStateCorruption {
645642
return err
@@ -651,10 +648,6 @@ func (mc *MetaCacheImp) restoreState(stateDir string) error {
651648
}
652649
}
653650

654-
return mc.populateCacheAndState(checkpoint, foundAndSkippedStateCorruption)
655-
}
656-
657-
func (mc *MetaCacheImp) populateCacheAndState(checkpoint *MetaCacheCheckpoint, foundAndSkippedStateCorruption bool) error {
658651
mc.podEntries = checkpoint.PodEntries
659652
mc.poolEntries = checkpoint.PoolEntries
660653
mc.regionEntries = checkpoint.RegionEntries
@@ -670,45 +663,6 @@ func (mc *MetaCacheImp) populateCacheAndState(checkpoint *MetaCacheCheckpoint, f
670663
return nil
671664
}
672665

673-
// tryMigrateState tries to migrate state from disk to memory or from memory to disk during initialisation of meta cache
674-
func (mc *MetaCacheImp) tryMigrateState(stateDir string, checkpoint *MetaCacheCheckpoint) error {
675-
var foundAndSkippedStateCorruption bool
676-
general.InfoS("[metacache] trying to migrate state from disk to memory")
677-
678-
// Get the old checkpoint using the provided file directory
679-
oldCheckpointManager, err := CreateCheckpointManager(stateDir, TmpfsCheckpointPath, !mc.isInMemoryState)
680-
if err != nil {
681-
return fmt.Errorf("[metacache] failed to initialize old checkpoint manager for migration: %v", err)
682-
}
683-
684-
if err = oldCheckpointManager.GetCheckpoint(mc.checkpointName, checkpoint); err != nil {
685-
if err == errors.ErrCheckpointNotFound {
686-
// create a new store state
687-
klog.Infof("[metacache] checkpoint %v doesn't exist, create it", mc.checkpointName, err)
688-
return mc.storeState()
689-
} else if err == errors.ErrCorruptCheckpoint {
690-
if !mc.skipStateCorruption {
691-
return err
692-
}
693-
foundAndSkippedStateCorruption = true
694-
klog.Warningf("[metacache] restore checkpoint failed with err: %s, but we skip it", err)
695-
} else {
696-
return err
697-
}
698-
}
699-
700-
if err = mc.populateCacheAndState(checkpoint, foundAndSkippedStateCorruption); err != nil {
701-
return fmt.Errorf("[metacache] populateCacheAndState failed with error: %v", err)
702-
}
703-
704-
// always store state after migrating to new checkpoint
705-
if err = mc.storeState(); err != nil {
706-
return fmt.Errorf("[network_plugin] failed to store checkpoint state during end of migration: %v", err)
707-
}
708-
709-
return nil
710-
}
711-
712666
func (mc *MetaCacheImp) setContainerCreateTimestamp(podUID, containerName string, timestamp int64) {
713667
mc.containerCreateTimestamp[fmt.Sprintf("%s/%s", podUID, containerName)] = timestamp
714668
}

0 commit comments

Comments
 (0)