diff --git a/cmd/katalyst-agent/app/options/qrm/qrm_base.go b/cmd/katalyst-agent/app/options/qrm/qrm_base.go index 349bf91e24..c02a6ddd9d 100644 --- a/cmd/katalyst-agent/app/options/qrm/qrm_base.go +++ b/cmd/katalyst-agent/app/options/qrm/qrm_base.go @@ -19,12 +19,12 @@ package qrm import ( cliflag "k8s.io/component-base/cli/flag" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/qrm/statedirectory" qrmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" ) type GenericQRMPluginOptions struct { QRMPluginSocketDirs []string - StateFileDirectory string ExtraStateFileAbsPath string PodDebugAnnoKeys []string UseKubeletReservedConfig bool @@ -32,15 +32,16 @@ type GenericQRMPluginOptions struct { PodLabelKeptKeys []string EnableReclaimNUMABinding bool EnableSNBHighNumaPreference bool + *statedirectory.StateDirectoryOptions } func NewGenericQRMPluginOptions() *GenericQRMPluginOptions { return &GenericQRMPluginOptions{ QRMPluginSocketDirs: []string{"/var/lib/kubelet/plugins_registry"}, - StateFileDirectory: "/var/lib/katalyst/qrm_advisor", PodDebugAnnoKeys: []string{}, PodAnnotationKeptKeys: []string{}, PodLabelKeptKeys: []string{}, + StateDirectoryOptions: statedirectory.NewStateDirectoryOptions(), } } @@ -49,7 +50,6 @@ func (o *GenericQRMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.StringSliceVar(&o.QRMPluginSocketDirs, "qrm-socket-dirs", o.QRMPluginSocketDirs, "socket file directories that qrm plugins communicate witch other components") - fs.StringVar(&o.StateFileDirectory, "qrm-state-dir", o.StateFileDirectory, "Directory that qrm plugins are using") fs.StringVar(&o.ExtraStateFileAbsPath, "qrm-extra-state-file", o.ExtraStateFileAbsPath, "The absolute path to an extra state file to specify cpuset.mems for specific pods") fs.StringSliceVar(&o.PodDebugAnnoKeys, "qrm-pod-debug-anno-keys", o.PodDebugAnnoKeys, "pod annotations keys to identify the pod is a debug pod, and qrm plugins will apply specific strategy to it") @@ -63,11 
+63,11 @@ func (o *GenericQRMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.EnableReclaimNUMABinding, "if set true, reclaim pod will be allocated on a specific NUMA node best-effort, otherwise, reclaim pod will be allocated on multi NUMA nodes") fs.BoolVar(&o.EnableSNBHighNumaPreference, "enable-snb-high-numa-preference", o.EnableSNBHighNumaPreference, "default false,if set true, snb pod will be preferentially allocated on high numa node") + o.StateDirectoryOptions.AddFlags(fss) } func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfiguration) error { conf.QRMPluginSocketDirs = o.QRMPluginSocketDirs - conf.StateFileDirectory = o.StateFileDirectory conf.ExtraStateFileAbsPath = o.ExtraStateFileAbsPath conf.PodDebugAnnoKeys = o.PodDebugAnnoKeys conf.UseKubeletReservedConfig = o.UseKubeletReservedConfig @@ -76,6 +76,10 @@ func (o *GenericQRMPluginOptions) ApplyTo(conf *qrmconfig.GenericQRMPluginConfig conf.EnableReclaimNUMABinding = o.EnableReclaimNUMABinding conf.EnableSNBHighNumaPreference = o.EnableSNBHighNumaPreference + if err := o.StateDirectoryOptions.ApplyTo(conf.StateDirectoryConfiguration); err != nil { + return err + } + return nil } diff --git a/cmd/katalyst-agent/app/options/qrm/statedirectory/statedirectory_base.go b/cmd/katalyst-agent/app/options/qrm/statedirectory/statedirectory_base.go new file mode 100644 index 0000000000..78edd00405 --- /dev/null +++ b/cmd/katalyst-agent/app/options/qrm/statedirectory/statedirectory_base.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statedirectory + +import ( + cliflag "k8s.io/component-base/cli/flag" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" +) + +type StateDirectoryOptions struct { + StateFileDirectory string + InMemoryStateFileDirectory string + EnableInMemoryState bool + HasPreStop bool +} + +func NewStateDirectoryOptions() *StateDirectoryOptions { + return &StateDirectoryOptions{ + StateFileDirectory: "/var/lib/katalyst/qrm_advisor", + InMemoryStateFileDirectory: "/dev/shm/qrm/state", + } +} + +func (o *StateDirectoryOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("qrm_state_directory") + + fs.StringVar(&o.StateFileDirectory, "qrm-state-dir", o.StateFileDirectory, "The directory to store the state file.") + fs.StringVar(&o.InMemoryStateFileDirectory, "qrm-state-dir-in-memory", + o.InMemoryStateFileDirectory, "The in memory directory to store the state file.") + fs.BoolVar(&o.EnableInMemoryState, "qrm-enable-in-memory-state", + o.EnableInMemoryState, "if set true, the state will be stored in the in-memory directory.") + fs.BoolVar(&o.HasPreStop, "qrm-has-pre-stop", + o.HasPreStop, "if set true, there is a pre-stop script in place.") +} + +func (o *StateDirectoryOptions) ApplyTo(conf *statedirectory.StateDirectoryConfiguration) error { + conf.StateFileDirectory = o.StateFileDirectory + conf.InMemoryStateFileDirectory = o.InMemoryStateFileDirectory + conf.EnableInMemoryState = o.EnableInMemoryState + conf.HasPreStop = o.HasPreStop + return nil +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/cpu_eviciton_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/cpu_eviciton_test.go index 3a95fd6afd..64e9868338 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/cpu_eviciton_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/cpu_eviciton_test.go @@ -27,6 +27,7 @@ import ( 
qrmstate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric" @@ -50,10 +51,13 @@ func makeMetaServer(metricsFetcher types.MetricsFetcher, cpuTopology *machine.CP func makeState(topo *machine.CPUTopology) (qrmstate.State, error) { tmpDir, err := os.MkdirTemp("", "checkpoint-makeState") + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } if err != nil { return nil, fmt.Errorf("make tmp dir for checkpoint failed with error: %v", err) } - return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + return qrmstate.NewCheckpointState(stateDirectoryConfig, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) } func TestNewCPUPressureEviction(t *testing.T) { diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_load_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_load_test.go index 1eb8eb366e..825e3f6e60 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_load_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_load_test.go @@ -41,6 +41,7 @@ import ( qrmstate "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" 
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -62,7 +63,9 @@ const ( defaultReservedForSystem = 0 ) -func makeMetaServer(metricsFetcher metrictypes.MetricsFetcher, cpuTopology *machine.CPUTopology) *metaserver.MetaServer { +func makeMetaServer( + metricsFetcher metrictypes.MetricsFetcher, cpuTopology *machine.CPUTopology, +) *metaserver.MetaServer { metaServer := &metaserver.MetaServer{ MetaAgent: &agent.MetaAgent{}, } @@ -75,7 +78,8 @@ func makeMetaServer(metricsFetcher metrictypes.MetricsFetcher, cpuTopology *mach return metaServer } -func makeConf(metricRingSize int, gracePeriod int64, loadUpperBoundRatio, loadLowerBoundRatio, +func makeConf( + metricRingSize int, gracePeriod int64, loadUpperBoundRatio, loadLowerBoundRatio, loadThresholdMetPercentage float64, reservedForReclaim, reservedForAllocate string, reservedForSystem int, ) *config.Configuration { conf := config.NewConfiguration() @@ -103,10 +107,13 @@ func makeConf(metricRingSize int, gracePeriod int64, loadUpperBoundRatio, loadLo func makeState(topo *machine.CPUTopology) (qrmstate.State, error) { tmpDir, err := os.MkdirTemp("", "checkpoint-makeState") + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } if err != nil { return nil, fmt.Errorf("make tmp dir for checkpoint failed with error: %v", err) } - return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + return qrmstate.NewCheckpointState(stateDirectoryConfig, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) } func TestNewCPUPressureLoadEviction(t *testing.T) { diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_numa_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_numa_test.go index 9e28ee6873..822a7ef27f 100644 --- 
a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_numa_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction/strategy/pressure_numa_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -96,7 +98,10 @@ func TestNumaCPUPressureEviction_update(t *testing.T) { defer os.RemoveAll(testingDir) cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 2, 4) - state1, _ := state.NewCheckpointState(testingDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: testingDir, + } + state1, _ := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) podEntry := state.PodEntries{ "pod1": state.ContainerEntries{ diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/canonical/optimizer_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/canonical/optimizer_test.go index 3505eab306..74b3901896 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/canonical/optimizer_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/canonical/optimizer_test.go @@ -34,6 +34,7 @@ import ( hintoptimizerutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/util" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/machine" ) @@ -50,7 +51,10 @@ func TestNewCanonicalHintOptimizer(t *testing.T) { 
require.NoError(t, err) defer os.RemoveAll(tmpDir) - stateImpl, err := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) require.NoError(t, err) options := policy.HintOptimizerFactoryOptions{ @@ -77,6 +81,9 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "test-canonical-hint-optimizer") defer os.RemoveAll(tmpDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } type fields struct { state state.State reservedCPUs machine.CPUSet @@ -142,7 +149,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5, 6, 7, 8), PodEntries: make(state.PodEntries)}, } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_packing", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "policy_packing", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -180,7 +187,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5, 6, 7, 8), PodEntries: make(state.PodEntries)}, // 4 CPUs } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_packing_one_NUMA_insufficient", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := 
state.NewCheckpointState(stateDirectoryConfig, "policy_packing_one_NUMA_insufficient", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -217,7 +224,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5, 6, 7, 8), PodEntries: make(state.PodEntries)}, // 4 CPUs, 3 left } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_spreading", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "policy_spreading", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -255,7 +262,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5, 6, 7, 8), PodEntries: make(state.PodEntries)}, // 4 CPUs, available ratio 1.0 } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_dynamic_packing_all_above_threshold", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "policy_dynamic_packing_all_above_threshold", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -329,7 +336,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { }, } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_dynamic_packing_one_below_threshold", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := 
state.NewCheckpointState(stateDirectoryConfig, "policy_dynamic_packing_one_below_threshold", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -403,7 +410,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { }, } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_dynamic_packing_all_below_threshold_fallback_to_spreading", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "policy_dynamic_packing_all_below_threshold_fallback_to_spreading", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -444,7 +451,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5, 6, 7, 8), PodEntries: make(state.PodEntries)}, // 4 CPUs, 3 left } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "unknown_policy_fallback_to_spreading", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "unknown_policy_fallback_to_spreading", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -481,7 +488,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5), PodEntries: make(state.PodEntries)}, // 1 CPU } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "no_available_NUMA_nodes_after_filtering", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, 
metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "no_available_NUMA_nodes_after_filtering", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -516,7 +523,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 2: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(10), PodEntries: make(state.PodEntries)}, // 1 left } cpuTopology, _ := machine.GenerateDummyCPUTopology(12, 1, 3) // 3 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_packing_multiple_preferred_due_to_same_minLeft", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "policy_packing_multiple_preferred_due_to_same_minLeft", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -555,7 +562,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 2: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(10), PodEntries: make(state.PodEntries)}, // 1 left } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "policy_spreading_multiple_preferred_due_to_same_maxLeft", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "policy_spreading_multiple_preferred_due_to_same_maxLeft", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -595,7 +602,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(5, 6, 7, 8), PodEntries: make(state.PodEntries)}, } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := 
state.NewCheckpointState(tmpDir, "dynamic_packing_reserved_CPUs_affect_availability", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "dynamic_packing_reserved_CPUs_affect_availability", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -660,7 +667,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { }, } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "dynamic_packing_one_NUMA_filtered_out_due_to_threshold_reserved_CPUs_considered", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "dynamic_packing_one_NUMA_filtered_out_due_to_threshold_reserved_CPUs_considered", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), @@ -702,7 +709,7 @@ func TestCanonicalHintOptimizer_OptimizeHints(t *testing.T) { 1: &state.NUMANodeState{DefaultCPUSet: machine.NewCPUSet(3, 4), PodEntries: make(state.PodEntries)}, } cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 2) // 2 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "dynamic_packing_allocatableCPUQuantity_is_zero_for_a_NUMA", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "dynamic_packing_allocatableCPUQuantity_is_zero_for_a_NUMA", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) st.SetMachineState(ms, false) return st }(), diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/memorybandwidth/optimizer_test.go 
b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/memorybandwidth/optimizer_test.go index d6360e7dac..dec0adc11d 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/memorybandwidth/optimizer_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/memorybandwidth/optimizer_test.go @@ -37,6 +37,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" pkgconsts "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -75,7 +76,10 @@ func TestNewMemoryBandwidthHintOptimizer(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(tmpDir) - stateImpl, err := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) require.NoError(t, err) type args struct { @@ -613,7 +617,10 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(tmpDir) - stateImpl, err := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) require.NoError(t, err) type fields struct 
{ @@ -678,7 +685,9 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { mockey.Mock(hintoptimizerutil.GenericOptimizeHintsCheck).Return(nil).Build() // Simulate an error from a dependency of getNUMAAllocatedMemBW. mockey.Mock(spd.GetContainerMemoryBandwidthRequest). - To(func(profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int) (int, error) { + To(func( + profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int, + ) (int, error) { if podMeta.Name != a.request.PodName { return 0, fmt.Errorf("spd failed for current pod") } @@ -702,7 +711,9 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { }, }, } - st, _ := state.NewCheckpointState(tmpDir, "test-state-err", "test", cpuTopology, false, func(_ *machine.CPUTopology, _ state.PodEntries, _ state.NUMANodeMap) (state.NUMANodeMap, error) { + st, _ := state.NewCheckpointState(stateDirectoryConfig, "test-state-err", "test", cpuTopology, false, func( + _ *machine.CPUTopology, _ state.PodEntries, _ state.NUMANodeMap, + ) (state.NUMANodeMap, error) { return ms, nil }, dummyEmitter) return st @@ -738,7 +749,9 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { mocks: func(f *fields, a *args) { mockey.Mock(hintoptimizerutil.GenericOptimizeHintsCheck).Return(nil).Build() mockey.Mock(spd.GetContainerMemoryBandwidthRequest). - To(func(profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int) (int, error) { + To(func( + profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int, + ) (int, error) { if podMeta.Name == a.request.PodName { return 0, fmt.Errorf("spd failed for current pod") } @@ -776,7 +789,9 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { mocks: func(f *fields, a *args) { mockey.Mock(hintoptimizerutil.GenericOptimizeHintsCheck).Return(nil).Build() mockey.Mock(spd.GetContainerMemoryBandwidthRequest). 
- To(func(profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int) (int, error) { + To(func( + profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int, + ) (int, error) { if podMeta.Name == a.request.PodName { return 0, nil } @@ -946,11 +961,13 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { }, }}, } - st, _ := state.NewCheckpointState(tmpDir, "test-state-success", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "test-state-success", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) st.SetMachineState(ms, false) return st }() - mockey.Mock(spd.GetContainerMemoryBandwidthRequest).To(func(profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int) (int, error) { + mockey.Mock(spd.GetContainerMemoryBandwidthRequest).To(func( + profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int, + ) (int, error) { switch podMeta.UID { case "existing-pod-0": // From getNUMAAllocatedMemBW return 100, nil @@ -1041,11 +1058,13 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { }, }}, } - st, _ := state.NewCheckpointState(tmpDir, "test-state-spread", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "test-state-spread", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) st.SetMachineState(ms, false) return st }() - mockey.Mock(spd.GetContainerMemoryBandwidthRequest).To(func(profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int) (int, error) { + mockey.Mock(spd.GetContainerMemoryBandwidthRequest).To(func( + profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int, + ) (int, error) { switch podMeta.UID { case 
"existing-pod-s0": return 100, nil @@ -1130,11 +1149,13 @@ func TestMemoryBandwidthOptimizer_OptimizeHints(t *testing.T) { 0: &state.NUMANodeState{PodEntries: state.PodEntries{"p0": state.ContainerEntries{"c0": &state.AllocationInfo{AllocationMeta: commonstate.AllocationMeta{PodUid: "p0", ContainerType: pluginapi.ContainerType_MAIN.String(), Labels: map[string]string{apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable}, Annotations: map[string]string{apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable}}, RequestQuantity: 1}}}}, 1: &state.NUMANodeState{PodEntries: state.PodEntries{"p1": state.ContainerEntries{"c1": &state.AllocationInfo{AllocationMeta: commonstate.AllocationMeta{PodUid: "p1", ContainerType: pluginapi.ContainerType_MAIN.String(), Labels: map[string]string{apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable}, Annotations: map[string]string{apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable}}, RequestQuantity: 1}}}}, } - st, _ := state.NewCheckpointState(tmpDir, "test-state-ign-neg", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "test-state-ign-neg", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, dummyEmitter) st.SetMachineState(ms, false) return st }() - mockey.Mock(spd.GetContainerMemoryBandwidthRequest).To(func(profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int) (int, error) { + mockey.Mock(spd.GetContainerMemoryBandwidthRequest).To(func( + profilingManager spd.ServiceProfilingManager, podMeta metav1.ObjectMeta, cpuRequest int, + ) (int, error) { switch podMeta.UID { case "p0": return 100, nil diff --git 
a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/metricbased/optimizer_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/metricbased/optimizer_test.go index 9bb7f56925..cacf0b9e2b 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/metricbased/optimizer_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/policy/metricbased/optimizer_test.go @@ -41,6 +41,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/metricthreshold" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -69,7 +70,11 @@ func TestNewMetricBasedHintOptimizer(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(tmpDir) - stateImpl, err := state.NewCheckpointState(tmpDir, "test", "test", metaServer.CPUTopology, false, state.GenerateMachineStateFromPodEntries, emitter) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } + + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, "test", "test", metaServer.CPUTopology, false, state.GenerateMachineStateFromPodEntries, emitter) require.NoError(t, err) options := policy.HintOptimizerFactoryOptions{ @@ -191,8 +196,11 @@ func TestMetricBasedHintOptimizer_OptimizeHints(t *testing.T) { state: func() state.State { tmpDir, _ := ioutil.TempDir("", "test-state") defer os.RemoveAll(tmpDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } cpuTopology, _ := machine.GenerateDummyCPUTopology(4, 1, 1) // 1 NUMA node - st, _ := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, 
_ := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) return st }(), numaMetrics: func() map[int]cpuUtil.SubEntries { @@ -259,7 +267,10 @@ func TestMetricBasedHintOptimizer_OptimizeHints(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "test-state-fail") defer os.RemoveAll(tmpDir) cpuTopology, _ := machine.GenerateDummyCPUTopology(16, 1, 1) - st, _ := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } + st, _ := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) return st }(), }, @@ -324,8 +335,11 @@ func TestMetricBasedHintOptimizer_OptimizeHints(t *testing.T) { state: func() state.State { tmpDir, _ := ioutil.TempDir("", "test-state-over") defer os.RemoveAll(tmpDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } cpuTopology, _ := machine.GenerateDummyCPUTopology(4, 1, 2) - st, _ := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + st, _ := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) return st }(), numaMetrics: func() map[int]cpuUtil.SubEntries { @@ -563,8 +577,11 @@ func TestMetricBasedHintOptimizer_collectNUMAMetrics(t *testing.T) { tmpDir, err := ioutil.TempDir("", "checkpoint-TestCollectNUMAMetrics") require.NoError(t, err) defer os.RemoveAll(tmpDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } - stateImpl, err := state.NewCheckpointState(tmpDir, "test", "test", cpuTopology, 
false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, "test", "test", cpuTopology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) require.NoError(t, err) podUID1 := "pod1" diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index ef7a7db880..3c61d79b70 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -156,7 +156,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration Val: cpuconsts.CPUResourcePluginPolicyNameDynamic, }) - stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName, + stateImpl, stateErr := state.NewCheckpointState(conf.StateDirectoryConfiguration, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, conf.SkipCPUStateCorruption, state.GenerateMachineStateFromPodEntries, wrappedEmitter) if stateErr != nil { return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 0c12d62a97..8046535b40 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -57,6 +57,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/featuregatenegotiation" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -94,7 +95,9 @@ func 
generateSharedNumaBindingPoolAllocationMeta(poolName string) commonstate.Al return meta } -func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) { +func getTestDynamicPolicyWithInitialization( + topology *machine.CPUTopology, stateFileDirectory string, +) (*DynamicPolicy, error) { dynamicPolicy, err := getTestDynamicPolicyWithoutInitialization(topology, stateFileDirectory) if err != nil { return nil, err @@ -114,8 +117,13 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, state return dynamicPolicy, nil } -func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) { - stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, topology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) +func getTestDynamicPolicyWithoutInitialization( + topology *machine.CPUTopology, stateFileDirectory string, +) (*DynamicPolicy, error) { + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: stateFileDirectory, + } + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, topology, false, state.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) if err != nil { return nil, err } @@ -6210,7 +6218,9 @@ type mockCPUAdvisor struct { advisorapi.CPUAdvisorServer } -func (m *mockCPUAdvisor) GetAdvice(ctx context.Context, in *advisorapi.GetAdviceRequest) (*advisorapi.GetAdviceResponse, error) { +func (m *mockCPUAdvisor) GetAdvice( + ctx context.Context, in *advisorapi.GetAdviceRequest, +) (*advisorapi.GetAdviceResponse, error) { args := m.Called(ctx, in) return args.Get(0).(*advisorapi.GetAdviceResponse), args.Error(1) } @@ -6220,11 +6230,15 @@ func (m *mockCPUAdvisor) ListAndWatch(in *advisorsvc.Empty, srv 
advisorapi.CPUAd return args.Error(0) } -func (m *mockCPUAdvisor) AddContainer(ctx context.Context, req *advisorsvc.ContainerMetadata) (*advisorsvc.AddContainerResponse, error) { +func (m *mockCPUAdvisor) AddContainer( + ctx context.Context, req *advisorsvc.ContainerMetadata, +) (*advisorsvc.AddContainerResponse, error) { return &advisorsvc.AddContainerResponse{}, nil } -func (m *mockCPUAdvisor) RemovePod(ctx context.Context, req *advisorsvc.RemovePodRequest) (*advisorsvc.RemovePodResponse, error) { +func (m *mockCPUAdvisor) RemovePod( + ctx context.Context, req *advisorsvc.RemovePodRequest, +) (*advisorsvc.RemovePodResponse, error) { return &advisorsvc.RemovePodResponse{}, nil } @@ -6482,7 +6496,9 @@ func TestNewDynamicPolicy(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(testingDir) - conf.GenericQRMPluginConfiguration.StateFileDirectory = testingDir + conf.GenericQRMPluginConfiguration.StateDirectoryConfiguration = &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: testingDir, + } type args struct { agentCtx *componentagent.GenericContext conf *config.Configuration diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go index c1678cea8d..40357959b6 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go @@ -17,6 +17,7 @@ limitations under the License. 
package state import ( + stdErrors "errors" "fmt" "path" "reflect" @@ -24,12 +25,13 @@ import ( "time" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" + "github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager" ) const ( @@ -41,56 +43,66 @@ const ( // go to in-memory State, and then go to disk State, i.e. in write-back mode type stateCheckpoint struct { sync.RWMutex - cache *cpuPluginState - policyName string - checkpointManager checkpointmanager.CheckpointManager - checkpointName string + cache *cpuPluginState + policyName string + qrmCheckpointManager *qrmcheckpointmanager.QRMCheckpointManager + checkpointName string // when we add new properties to checkpoint, // it will cause checkpoint corruption, and we should skip it skipStateCorruption bool GenerateMachineStateFromPodEntries GenerateMachineStateFromPodEntriesFunc emitter metrics.MetricEmitter + hasPreStop bool } var _ State = &stateCheckpoint{} -func NewCheckpointState(stateDir, checkpointName, policyName string, - topology *machine.CPUTopology, skipStateCorruption bool, generateMachineStateFunc GenerateMachineStateFromPodEntriesFunc, +func NewCheckpointState( + stateDirectoryConfig *statedirectory.StateDirectoryConfiguration, checkpointName, policyName string, + topology *machine.CPUTopology, skipStateCorruption bool, + generateMachineStateFunc GenerateMachineStateFromPodEntriesFunc, emitter metrics.MetricEmitter, ) (State, error) { - checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + currentStateDir, otherStateDir := stateDirectoryConfig.GetCurrentAndPreviousStateFileDirectory() + hasPreStop := stateDirectoryConfig.HasPreStop + + qrmCheckpointManager, err := 
qrmcheckpointmanager.NewQRMCheckpointManager(currentStateDir, otherStateDir, checkpointName, "cpu_plugin") if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) } - sc := &stateCheckpoint{ cache: NewCPUPluginState(topology), policyName: policyName, - checkpointManager: checkpointManager, + qrmCheckpointManager: qrmCheckpointManager, checkpointName: checkpointName, skipStateCorruption: skipStateCorruption, GenerateMachineStateFromPodEntries: generateMachineStateFunc, emitter: emitter, + hasPreStop: hasPreStop, } - if err := sc.RestoreState(topology); err != nil { + if err := sc.restoreState(topology); err != nil { return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete "+ - "the cpu plugin checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName)) + "the cpu plugin checkpoint file %q before restarting Kubelet", err, path.Join(currentStateDir, checkpointName)) } + return sc, nil } -func (sc *stateCheckpoint) RestoreState(topology *machine.CPUTopology) error { +// restoreState is first done by searching the current directory for the state file. +// If it does not exist, we search the other directory for the state file and try to migrate the state file over to the current directory. 
+func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error { sc.Lock() defer sc.Unlock() var err error var foundAndSkippedStateCorruption bool checkpoint := NewCPUPluginCheckpoint() - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { - if err == errors.ErrCheckpointNotFound { - return sc.storeState() - } else if err == errors.ErrCorruptCheckpoint { + if err = sc.qrmCheckpointManager.GetCurrentCheckpoint(sc.checkpointName, checkpoint, true); err != nil { + if stdErrors.Is(err, errors.ErrCheckpointNotFound) { + // We cannot find checkpoint, so it is possible that previous checkpoint was stored in either disk or memory + return sc.tryMigrateState(topology, checkpoint) + } else if stdErrors.Is(err, errors.ErrCorruptCheckpoint) { if !sc.skipStateCorruption { return err } @@ -102,13 +114,22 @@ func (sc *stateCheckpoint) RestoreState(topology *machine.CPUTopology) error { } } + _, err = sc.updateCacheAndReturnChanged(topology, checkpoint, foundAndSkippedStateCorruption) + return err +} + +// updateCacheAndReturnChanged updates the cache and returns whether the state has changed +func (sc *stateCheckpoint) updateCacheAndReturnChanged( + topology *machine.CPUTopology, checkpoint *CPUPluginCheckpoint, foundAndSkippedStateCorruption bool, +) (bool, error) { + var hasStateChanged bool if sc.policyName != checkpoint.PolicyName && !sc.skipStateCorruption { - return fmt.Errorf("[cpu_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) + return hasStateChanged, fmt.Errorf("[cpu_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) } generatedMachineState, err := sc.GenerateMachineStateFromPodEntries(topology, checkpoint.PodEntries, checkpoint.MachineState) if err != nil { - return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) + return hasStateChanged, 
fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) } sc.cache.SetMachineState(generatedMachineState) @@ -119,21 +140,70 @@ func (sc *stateCheckpoint) RestoreState(topology *machine.CPUTopology) error { if !reflect.DeepEqual(generatedMachineState, checkpoint.MachineState) { klog.Warningf("[cpu_plugin] machine state changed: generatedMachineState: %s; checkpointMachineState: %s", generatedMachineState.String(), checkpoint.MachineState.String()) + hasStateChanged = true err = sc.storeState() if err != nil { - return fmt.Errorf("storeState when machine state changed failed with error: %v", err) + return hasStateChanged, fmt.Errorf("storeState when machine state changed failed with error: %v", err) } } if foundAndSkippedStateCorruption { klog.Infof("[cpu_plugin] found and skipped state corruption, we should store to rectify the checksum") + hasStateChanged = true err = sc.storeState() if err != nil { - return fmt.Errorf("storeState failed with error: %v", err) + return hasStateChanged, fmt.Errorf("storeState failed with error after skipping corruption: %v", err) } } klog.InfoS("[cpu_plugin] State checkpoint: restored state from checkpoint") + return hasStateChanged, nil +} + +// tryMigrateState tries to migrate the state file from the other directory to current directory. +// If the other directory does not have a state file, then we build a new checkpoint. 
+func (sc *stateCheckpoint) tryMigrateState( + topology *machine.CPUTopology, checkpoint *CPUPluginCheckpoint, +) error { + var foundAndSkippedStateCorruption bool + klog.Infof("[cpu_plugin] trying to migrate state") + + // Do not migrate and build new checkpoint if there is no pre-stop script + if !sc.hasPreStop { + return sc.storeState() + } + + if err := sc.qrmCheckpointManager.GetPreviousCheckpoint(sc.checkpointName, checkpoint); err != nil { + if stdErrors.Is(err, errors.ErrCheckpointNotFound) { + // Old checkpoint file is not found, so we just store state in new checkpoint + general.Infof("[cpu_plugin] checkpoint %v doesn't exist, create it", sc.checkpointName) + return sc.storeState() + } else if stdErrors.Is(err, errors.ErrCorruptCheckpoint) { + if !sc.skipStateCorruption { + return err + } + foundAndSkippedStateCorruption = true + klog.Warningf("[cpu_plugin] restore checkpoint failed with err: %s, but we skip it", err) + } else { + return err + } + } + + hasStateChanged, err := sc.updateCacheAndReturnChanged(topology, checkpoint, foundAndSkippedStateCorruption) + if err != nil { + return fmt.Errorf("[cpu_plugin] failed to populate checkpoint state during state migration: %v", err) + } + + // always store state after migrating to new checkpoint + if err := sc.storeState(); err != nil { + return fmt.Errorf("[cpu_plugin] failed to store checkpoint state during end of migration: %v", err) + } + + if err := sc.qrmCheckpointManager.ValidateCheckpointFilesMigration(hasStateChanged); err != nil { + return fmt.Errorf("[cpu_plugin] ValidateCheckpointFilesMigration failed with error: %v", err) + } + + klog.Infof("[cpu_plugin] migrate checkpoint succeeded") return nil } @@ -158,7 +228,7 @@ func (sc *stateCheckpoint) storeState() error { checkpoint.PodEntries = sc.cache.GetPodEntries() checkpoint.AllowSharedCoresOverlapReclaimedCores = sc.cache.GetAllowSharedCoresOverlapReclaimedCores() - err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) + err 
:= sc.qrmCheckpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) if err != nil { klog.ErrorS(err, "Could not save checkpoint") return err @@ -220,7 +290,9 @@ func (sc *stateCheckpoint) SetNUMAHeadroom(m map[int]float64, persist bool) { } } -func (sc *stateCheckpoint) SetAllocationInfo(podUID string, containerName string, allocationInfo *AllocationInfo, persist bool) { +func (sc *stateCheckpoint) SetAllocationInfo( + podUID string, containerName string, allocationInfo *AllocationInfo, persist bool, +) { sc.Lock() defer sc.Unlock() diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go index 0aeb547f5f..34662ade21 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "math" "os" + "path/filepath" "reflect" "strings" "testing" @@ -32,14 +33,17 @@ import ( "k8s.io/klog/v2" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" "github.com/kubewharf/katalyst-api/pkg/consts" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/machine" + "github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager" ) const ( @@ -1473,6 +1477,9 @@ func TestNewCheckpointState(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(testingDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: testingDir, + } // create checkpoint manager for 
testing cpm, err := checkpointmanager.NewCheckpointManager(testingDir) @@ -1488,7 +1495,7 @@ func TestNewCheckpointState(t *testing.T) { require.NoError(t, cpm.CreateCheckpoint(cpuPluginStateFileName, checkpoint), "could not create testing checkpoint") } - restoredState, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + restoredState, err := NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, policyName, cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) if strings.TrimSpace(tc.expectedError) != "" { require.Error(t, err) require.Contains(t, err.Error(), "could not restore state from checkpoint:") @@ -1496,7 +1503,7 @@ func TestNewCheckpointState(t *testing.T) { // test skip corruption if strings.Contains(err.Error(), "checkpoint is corrupted") { - _, err = NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, true, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + _, err = NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, policyName, cpuTopology, true, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) require.Nil(t, err) } } else { @@ -2013,8 +2020,11 @@ func TestClearState(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(testingDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: testingDir, + } - state1, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + state1, err := NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) as.Nil(err) state1.ClearState() @@ -2022,7 +2032,7 @@ func TestClearState(t *testing.T) { state1.SetMachineState(tc.machineState, true) state1.SetPodEntries(tc.podEntries, true) - 
state2, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + state2, err := NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) as.Nil(err) assertStateEqual(t, state2, state1) }) @@ -2525,13 +2535,16 @@ func TestCheckpointStateHelpers(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(testingDir) + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: testingDir, + } for _, tc := range testCases { tc := tc t.Run(tc.description, func(t *testing.T) { t.Parallel() - state, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) + state, err := NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) as.Nil(err) state.ClearState() @@ -3260,3 +3273,124 @@ func TestGetWriteOnlyState(t *testing.T) { as.NotNil(err) } } + +func generateTestMachineStateFromPodEntries( + topology *machine.CPUTopology, _ PodEntries, _ NUMANodeMap, +) (NUMANodeMap, error) { + return GetDefaultMachineState(topology), nil +} + +func TestTryMigrateState(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + preStop bool + corruptFile bool + expectEqual bool + expectOldExists bool + }{ + { + name: "successful migration with pre-stop", + preStop: true, + corruptFile: false, + expectEqual: true, + expectOldExists: false, + }, + { + name: "migration without pre-stop", + preStop: false, + corruptFile: false, + expectEqual: false, + expectOldExists: true, + }, + { + name: "corrupted checkpoint", + preStop: true, + corruptFile: true, + expectEqual: false, + expectOldExists: true, + }, + } + + for _, tt := range tests { + tt := tt + 
t.Run(tt.name, func(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() + inMemoryTmpDir := t.TempDir() + + stateDir := filepath.Join(tmpDir, "state") + err := os.MkdirAll(stateDir, 0o775) + assert.NoError(t, err) + + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4) + assert.NoError(t, err) + + policyName := "test-policy" + checkpointName := "test-checkpoint" + + // create old checkpoint manager to save the checkpoint + oldCheckpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + assert.NoError(t, err) + + oldCheckpoint := NewCPUPluginCheckpoint() + if tt.corruptFile { + // create a corrupted old checkpoint + corruptedFile := filepath.Join(stateDir, checkpointName) + err = ioutil.WriteFile(corruptedFile, []byte("corrupted data"), 0o644) + assert.NoError(t, err) + } else { + oldCheckpoint.PolicyName = policyName + oldCheckpoint.MachineState = GetDefaultMachineState(cpuTopology) + err = oldCheckpointManager.CreateCheckpoint(checkpointName, oldCheckpoint) + assert.NoError(t, err) + } + + // create a new state checkpoint with new checkpoint manager + sc := &stateCheckpoint{ + policyName: policyName, + checkpointName: checkpointName, + cache: NewCPUPluginState(cpuTopology), + skipStateCorruption: false, + GenerateMachineStateFromPodEntries: generateTestMachineStateFromPodEntries, + emitter: metrics.DummyMetrics{}, + hasPreStop: tt.preStop, + } + + // current checkpoint is pointing to the in memory directory + sc.qrmCheckpointManager, err = qrmcheckpointmanager.NewQRMCheckpointManager(inMemoryTmpDir, stateDir, checkpointName, "cpu_plugin") + assert.NoError(t, err) + + newCheckpoint := NewCPUPluginCheckpoint() + err = sc.tryMigrateState(cpuTopology, newCheckpoint) + + if tt.corruptFile { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + // check if new checkpoint is created and verify equality + err = sc.qrmCheckpointManager.GetCurrentCheckpoint(checkpointName, newCheckpoint, false) + assert.NoError(t, 
err) + + // verify old checkpoint file existence + checkpoint := NewCPUPluginCheckpoint() + err = oldCheckpointManager.GetCheckpoint(checkpointName, checkpoint) + + if tt.expectOldExists { + assert.NoError(t, err) + } else { + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + } + + if tt.expectEqual { + assert.Equal(t, newCheckpoint, oldCheckpoint) + } else { + assert.NotEqual(t, newCheckpoint, oldCheckpoint) + } + }) + } +} diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go index 1775a8acc8..e220770356 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go @@ -97,7 +97,7 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration, Val: cpuconsts.CPUResourcePluginPolicyNameNative, }) - stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName, + stateImpl, stateErr := state.NewCheckpointState(conf.StateDirectoryConfiguration, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameNative, agentCtx.CPUTopology, conf.SkipCPUStateCorruption, nativepolicyutil.GenerateMachineStateFromPodEntries, wrappedEmitter) if stateErr != nil { return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr) diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go index b42d1adac0..44f59ac410 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go @@ -33,6 +33,7 @@ import ( nativepolicyutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/nativepolicy/util" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" 
"github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/machine" ) @@ -42,7 +43,10 @@ const ( ) func getTestNativePolicy(topology *machine.CPUTopology, stateFileDirectory string) (*NativePolicy, error) { - stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName, + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: stateFileDirectory, + } + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameNative, topology, false, nativepolicyutil.GenerateMachineStateFromPodEntries, metrics.DummyMetrics{}) if err != nil { return nil, err diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 16dc296d9a..6db20be45e 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -179,7 +179,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration resourcesReservedMemory := map[v1.ResourceName]map[int]uint64{ v1.ResourceMemory: reservedMemory, } - stateImpl, err := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, memoryPluginStateFileName, + stateImpl, err := state.NewCheckpointState(conf.StateDirectoryConfiguration, memoryPluginStateFileName, memconsts.MemoryResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, agentCtx.MachineInfo, resourcesReservedMemory, conf.SkipMemoryStateCorruption, wrappedEmitter) if err != nil { return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", err) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 12471e5de3..92b1a54bfe 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ 
-68,6 +68,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" "github.com/kubewharf/katalyst-core/pkg/config/agent/global" qrmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/config/generic" coreconsts "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" @@ -110,7 +111,9 @@ var fakeConf = &config.Configuration{ }, } -func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machineInfo *info.MachineInfo, stateFileDirectory string) (*DynamicPolicy, error) { +func getTestDynamicPolicyWithInitialization( + topology *machine.CPUTopology, machineInfo *info.MachineInfo, stateFileDirectory string, +) (*DynamicPolicy, error) { reservedMemory, err := getReservedMemory(fakeConf, &metaserver.MetaServer{}, machineInfo) if err != nil { return nil, err @@ -131,7 +134,10 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, machi consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, }) - stateImpl, err := state.NewCheckpointState(stateFileDirectory, memoryPluginStateFileName, + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: stateFileDirectory, + } + stateImpl, err := state.NewCheckpointState(stateDirectoryConfig, memoryPluginStateFileName, memconsts.MemoryResourcePluginPolicyNameDynamic, topology, machineInfo, resourcesReservedMemory, false, metrics.DummyMetrics{}) if err != nil { return nil, fmt.Errorf("NewCheckpointState failed with error: %v", err) @@ -3222,7 +3228,9 @@ func TestNewAndStartDynamicPolicy(t *testing.T) { GenericAgentConfiguration: &configagent.GenericAgentConfiguration{ QRMAdvisorConfiguration: &global.QRMAdvisorConfiguration{}, GenericQRMPluginConfiguration: &qrmconfig.GenericQRMPluginConfiguration{ - StateFileDirectory: tmpDir, + 
StateDirectoryConfiguration: &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + }, QRMPluginSocketDirs: []string{path.Join(tmpDir, "test.sock")}, }, }, @@ -4864,7 +4872,9 @@ type mockMemoryAdvisor struct { advisorsvc.AdvisorServiceServer } -func (m *mockMemoryAdvisor) GetAdvice(ctx context.Context, in *advisorsvc.GetAdviceRequest) (*advisorsvc.GetAdviceResponse, error) { +func (m *mockMemoryAdvisor) GetAdvice( + ctx context.Context, in *advisorsvc.GetAdviceRequest, +) (*advisorsvc.GetAdviceResponse, error) { args := m.Called(ctx, in) return args.Get(0).(*advisorsvc.GetAdviceResponse), args.Error(1) } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_checkpoint.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_checkpoint.go index 8113259219..51d4d67602 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_checkpoint.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_checkpoint.go @@ -17,6 +17,7 @@ limitations under the License. package state import ( + stdErrors "errors" "fmt" "path" "reflect" @@ -26,12 +27,13 @@ import ( info "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" + "github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager" ) const ( @@ -45,59 +47,69 @@ var _ State = &stateCheckpoint{} // go to in-memory State, and then go to disk State, i.e. 
in write-back mode type stateCheckpoint struct { sync.RWMutex - cache *memoryPluginState - policyName string - checkpointManager checkpointmanager.CheckpointManager - checkpointName string + cache *memoryPluginState + policyName string + qrmCheckpointManager *qrmcheckpointmanager.QRMCheckpointManager + checkpointName string // when we add new properties to checkpoint, // it will cause checkpoint corruption and we should skip it skipStateCorruption bool emitter metrics.MetricEmitter + hasPreStop bool } -func NewCheckpointState(stateDir, checkpointName, policyName string, +func NewCheckpointState( + stateDirectoryConfig *statedirectory.StateDirectoryConfiguration, checkpointName, policyName string, topology *machine.CPUTopology, machineInfo *info.MachineInfo, reservedMemory map[v1.ResourceName]map[int]uint64, skipStateCorruption bool, emitter metrics.MetricEmitter, ) (State, error) { - checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + currentStateDir, otherStateDir := stateDirectoryConfig.GetCurrentAndPreviousStateFileDirectory() + hasPreStop := stateDirectoryConfig.HasPreStop + + qrmCheckpointManager, err := qrmcheckpointmanager.NewQRMCheckpointManager(currentStateDir, otherStateDir, checkpointName, "memory_plugin") if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) } - defaultCache, err := NewMemoryPluginState(topology, machineInfo, reservedMemory) if err != nil { return nil, fmt.Errorf("NewMemoryPluginState failed with error: %v", err) } stateCheckpoint := &stateCheckpoint{ - cache: defaultCache, - policyName: policyName, - checkpointManager: checkpointManager, - checkpointName: checkpointName, - skipStateCorruption: skipStateCorruption, - emitter: emitter, + cache: defaultCache, + policyName: policyName, + qrmCheckpointManager: qrmCheckpointManager, + checkpointName: checkpointName, + skipStateCorruption: skipStateCorruption, + emitter: emitter, + hasPreStop: hasPreStop, } if err := 
stateCheckpoint.restoreState(machineInfo, reservedMemory); err != nil { return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the memory plugin checkpoint file %q before restarting Kubelet", - err, path.Join(stateDir, checkpointName)) + err, path.Join(currentStateDir, checkpointName)) } return stateCheckpoint, nil } -func (sc *stateCheckpoint) restoreState(machineInfo *info.MachineInfo, reservedMemory map[v1.ResourceName]map[int]uint64) error { +// restoreState is first done by searching the current directory for the state file. +// If it does not exist, we search the other directory for the state file and try to migrate the state file over to the current directory. +func (sc *stateCheckpoint) restoreState( + machineInfo *info.MachineInfo, reservedMemory map[v1.ResourceName]map[int]uint64, +) error { sc.Lock() defer sc.Unlock() var err error var foundAndSkippedStateCorruption bool checkpoint := NewMemoryPluginCheckpoint() - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { - if err == errors.ErrCheckpointNotFound { - return sc.storeState() - } else if err == errors.ErrCorruptCheckpoint { + if err = sc.qrmCheckpointManager.GetCurrentCheckpoint(sc.checkpointName, checkpoint, true); err != nil { + if stdErrors.Is(err, errors.ErrCheckpointNotFound) { + // We cannot find checkpoint, so it is possible that previous checkpoint was stored in either disk or memory + return sc.tryMigrateState(machineInfo, reservedMemory, checkpoint) + } else if stdErrors.Is(err, errors.ErrCorruptCheckpoint) { if !sc.skipStateCorruption { return err } @@ -109,13 +121,23 @@ func (sc *stateCheckpoint) restoreState(machineInfo *info.MachineInfo, reservedM } } + _, err = sc.updateCacheAndReturnChanged(machineInfo, reservedMemory, checkpoint, foundAndSkippedStateCorruption) + return err +} + +// updateCacheAndReturnChanged updates the cache and returns whether the state has changed +func (sc *stateCheckpoint) 
updateCacheAndReturnChanged( + machineInfo *info.MachineInfo, reservedMemory map[v1.ResourceName]map[int]uint64, + checkpoint *MemoryPluginCheckpoint, foundAndSkippedStateCorruption bool, +) (bool, error) { + var hasStateChanged bool if sc.policyName != checkpoint.PolicyName && !sc.skipStateCorruption { - return fmt.Errorf("[memory_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) + return hasStateChanged, fmt.Errorf("[memory_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) } generatedResourcesMachineState, err := GenerateMachineStateFromPodEntries(machineInfo, checkpoint.PodResourceEntries, checkpoint.MachineState, reservedMemory) if err != nil { - return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) + return hasStateChanged, fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) } sc.cache.SetMachineState(generatedResourcesMachineState) @@ -126,22 +148,72 @@ func (sc *stateCheckpoint) restoreState(machineInfo *info.MachineInfo, reservedM klog.Warningf("[memory_plugin] machine state changed: "+ "generatedResourcesMachineState: %s; checkpointMachineState: %s", generatedResourcesMachineState.String(), checkpoint.MachineState.String()) + hasStateChanged = true err = sc.storeState() if err != nil { - return fmt.Errorf("storeState when machine state changed failed with error: %v", err) + return hasStateChanged, fmt.Errorf("storeState when machine state changed failed with error: %v", err) } } if foundAndSkippedStateCorruption { klog.Infof("[memory_plugin] found and skipped state corruption, we shoud store to rectify the checksum") + hasStateChanged = true err = sc.storeState() if err != nil { - return fmt.Errorf("storeState failed with error: %v", err) + return hasStateChanged, fmt.Errorf("storeState failed with error: %v", err) } } klog.InfoS("[memory_plugin] state checkpoint: restored state from 
checkpoint") + return hasStateChanged, nil +} + +// tryMigrateState tries to migrate the state file from the other directory to current directory. +// If the other directory does not have a state file, then we build a new checkpoint. +func (sc *stateCheckpoint) tryMigrateState( + machineInfo *info.MachineInfo, reservedMemory map[v1.ResourceName]map[int]uint64, + checkpoint *MemoryPluginCheckpoint, +) error { + var foundAndSkippedStateCorruption bool + klog.Infof("[memory_plugin] trying to migrate state") + + // Build new checkpoint if the state directory that we want to migrate from is empty + if !sc.hasPreStop { + return sc.storeState() + } + + if err := sc.qrmCheckpointManager.GetPreviousCheckpoint(sc.checkpointName, checkpoint); err != nil { + if stdErrors.Is(err, errors.ErrCheckpointNotFound) { + // Old checkpoint file is not found, so we just store state in new checkpoint + general.Infof("[memory_plugin] checkpoint %v doesn't exist, create it", sc.checkpointName) + return sc.storeState() + } else if stdErrors.Is(err, errors.ErrCorruptCheckpoint) { + if !sc.skipStateCorruption { + return err + } + foundAndSkippedStateCorruption = true + klog.Warningf("[memory_plugin] restore checkpoint failed with err: %s, but we skip it", err) + } else { + return err + } + } + + hasStateChanged, err := sc.updateCacheAndReturnChanged(machineInfo, reservedMemory, checkpoint, foundAndSkippedStateCorruption) + if err != nil { + return fmt.Errorf("[memory_plugin] updateCacheAndReturnChanged failed with error: %v", err) + } + + // always store state after migrating to new checkpoint + if err := sc.storeState(); err != nil { + return fmt.Errorf("[memory_plugin] failed to store state during end of migration: %v", err) + } + + if err := sc.qrmCheckpointManager.ValidateCheckpointFilesMigration(hasStateChanged); err != nil { + return fmt.Errorf("[memory_plugin] ValidateCheckpointFilesMigration failed with error: %v", err) + } + + klog.Infof("[memory_plugin] migrate checkpoint succeeded") 
return nil } @@ -166,7 +238,7 @@ func (sc *stateCheckpoint) storeState() error { checkpoint.NUMAHeadroom = sc.cache.GetNUMAHeadroom() checkpoint.PodResourceEntries = sc.cache.GetPodResourceEntries() - err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) + err := sc.qrmCheckpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) if err != nil { klog.ErrorS(err, "Could not save checkpoint") return err @@ -202,7 +274,9 @@ func (sc *stateCheckpoint) GetNUMAHeadroom() map[int]int64 { return sc.cache.GetNUMAHeadroom() } -func (sc *stateCheckpoint) GetAllocationInfo(resourceName v1.ResourceName, podUID, containerName string) *AllocationInfo { +func (sc *stateCheckpoint) GetAllocationInfo( + resourceName v1.ResourceName, podUID, containerName string, +) *AllocationInfo { sc.RLock() defer sc.RUnlock() @@ -242,7 +316,9 @@ func (sc *stateCheckpoint) SetNUMAHeadroom(numaHeadroom map[int]int64, persist b } } -func (sc *stateCheckpoint) SetAllocationInfo(resourceName v1.ResourceName, podUID, containerName string, allocationInfo *AllocationInfo, persist bool) { +func (sc *stateCheckpoint) SetAllocationInfo( + resourceName v1.ResourceName, podUID, containerName string, allocationInfo *AllocationInfo, persist bool, +) { sc.Lock() defer sc.Unlock() diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go index a3e8c91d58..4f29144e9d 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state_test.go @@ -17,9 +17,25 @@ limitations under the License. 
package state import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" "testing" + info "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/machine" + "github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager" ) func TestGetReadonlyState(t *testing.T) { @@ -41,3 +57,154 @@ func TestGetWriteOnlyState(t *testing.T) { as.NotNil(err) } } + +func TestTryMigrateState(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + preStop bool + expectEqual bool + expectOldExists bool + corruptFile bool + }{ + { + name: "successful migration with pre-stop", + preStop: true, + expectEqual: true, + expectOldExists: false, + corruptFile: false, + }, + { + name: "migration without pre-stop", + preStop: false, + expectEqual: false, + expectOldExists: true, + corruptFile: false, + }, + { + name: "corrupted checkpoint", + preStop: true, + expectEqual: false, + corruptFile: true, + expectOldExists: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() + inMemoryTmpDir := t.TempDir() + + stateDir := filepath.Join(tmpDir, "state") + err := os.MkdirAll(stateDir, 0o775) + assert.NoError(t, err) + + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4) + assert.NoError(t, err) + + machineInfo := &info.MachineInfo{} + reservedMemory := make(map[v1.ResourceName]map[int]uint64) + defaultCache, err := NewMemoryPluginState(cpuTopology, machineInfo, reservedMemory) + assert.NoError(t, err) + + policyName := 
"test-policy" + checkpointName := "test-checkpoint" + + // create old checkpoint manager to save checkpoint + oldCheckpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + assert.NoError(t, err) + + oldCheckpoint := NewMemoryPluginCheckpoint() + if tt.corruptFile { + // create a corrupted old checkpoint + corruptedFile := filepath.Join(stateDir, fmt.Sprintf("%s", checkpointName)) + err = ioutil.WriteFile(corruptedFile, []byte("corrupted data"), 0o644) + assert.NoError(t, err) + } else { + oldCheckpoint.PolicyName = policyName + podResourceEntries := PodResourceEntries{ + v1.ResourceMemory: PodEntries{ + "podUID": ContainerEntries{ + "testName": &AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "podUID", + PodNamespace: "testName", + PodName: "testName", + ContainerName: "testName", + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + QoSLevel: consts.PodAnnotationQoSLevelDedicatedCores, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + AggregatedQuantity: 9663676416, + NumaAllocationResult: machine.NewCPUSet(0), + TopologyAwareAllocations: map[int]uint64{ + 0: 9663676416, + }, + }, + }, + }, + } + oldCheckpoint.PodResourceEntries = podResourceEntries + machineState, err := GenerateMachineStateFromPodEntries(machineInfo, podResourceEntries, nil, reservedMemory) + assert.NoError(t, err) + oldCheckpoint.MachineState = machineState + err = oldCheckpointManager.CreateCheckpoint(checkpointName, oldCheckpoint) + assert.NoError(t, err) + } + + // create a new checkpoint with a new checkpoint manager + sc := &stateCheckpoint{ + policyName: policyName, + checkpointName: checkpointName, + cache: defaultCache, + 
skipStateCorruption: false, + emitter: metrics.DummyMetrics{}, + hasPreStop: tt.preStop, + } + + // current checkpoint is pointing to the in memory directory + sc.qrmCheckpointManager, err = qrmcheckpointmanager.NewQRMCheckpointManager(inMemoryTmpDir, stateDir, checkpointName, "memory_plugin") + assert.NoError(t, err) + + newCheckpoint := NewMemoryPluginCheckpoint() + err = sc.tryMigrateState(machineInfo, reservedMemory, newCheckpoint) + + if tt.corruptFile { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + // check if new checkpoint is created and verify equality + err = sc.qrmCheckpointManager.GetCurrentCheckpoint(checkpointName, newCheckpoint, false) + assert.NoError(t, err) + + // verify old checkpoint file existence + checkpoint := NewMemoryPluginCheckpoint() + err = oldCheckpointManager.GetCheckpoint(checkpointName, checkpoint) + + if tt.expectOldExists { + assert.NoError(t, err) + } else { + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + } + + if tt.expectEqual { + assert.Equal(t, newCheckpoint, oldCheckpoint) + } else { + assert.NotEqual(t, newCheckpoint, oldCheckpoint) + } + }) + } +} diff --git a/pkg/agent/qrm-plugins/network/state/state_checkpoint.go b/pkg/agent/qrm-plugins/network/state/state_checkpoint.go index c630615bb3..045f82b186 100644 --- a/pkg/agent/qrm-plugins/network/state/state_checkpoint.go +++ b/pkg/agent/qrm-plugins/network/state/state_checkpoint.go @@ -17,6 +17,7 @@ limitations under the License. 
package state import ( + stdErrors "errors" "fmt" "path" "reflect" @@ -24,14 +25,15 @@ import ( "time" info "github.com/google/cadvisor/info/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" - - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager" ) const ( @@ -48,21 +50,27 @@ var ( // go to in-memory State, and then go to disk State, i.e. in write-back mode type stateCheckpoint struct { sync.RWMutex - cache *networkPluginState - policyName string - checkpointManager checkpointmanager.CheckpointManager - checkpointName string + cache *networkPluginState + policyName string + qrmCheckpointManager *qrmcheckpointmanager.QRMCheckpointManager + checkpointName string // when we add new properties to checkpoint, // it will cause checkpoint corruption and we should skip it skipStateCorruption bool emitter metrics.MetricEmitter + hasPreStop bool } -func NewCheckpointState(conf *qrm.QRMPluginsConfiguration, stateDir, checkpointName, policyName string, +func NewCheckpointState( + conf *qrm.QRMPluginsConfiguration, stateDirectoryConfig *statedirectory.StateDirectoryConfiguration, + checkpointName, policyName string, machineInfo *info.MachineInfo, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32, skipStateCorruption bool, emitter metrics.MetricEmitter, ) (State, error) { - checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + currentStateDir, otherStateDir := stateDirectoryConfig.GetCurrentAndPreviousStateFileDirectory() + hasPreStop := stateDirectoryConfig.HasPreStop + + qrmCheckpointManager, err := 
qrmcheckpointmanager.NewQRMCheckpointManager(currentStateDir, otherStateDir, checkpointName, "network_plugin") if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) } @@ -73,33 +81,39 @@ func NewCheckpointState(conf *qrm.QRMPluginsConfiguration, stateDir, checkpointN } stateCheckpoint := &stateCheckpoint{ - cache: defaultCache, - policyName: policyName, - checkpointManager: checkpointManager, - checkpointName: checkpointName, - skipStateCorruption: skipStateCorruption, - emitter: emitter, + cache: defaultCache, + policyName: policyName, + qrmCheckpointManager: qrmCheckpointManager, + checkpointName: checkpointName, + skipStateCorruption: skipStateCorruption, + emitter: emitter, + hasPreStop: hasPreStop, } if err := stateCheckpoint.restoreState(conf, nics, reservedBandwidth); err != nil { return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the network plugin checkpoint file %q before restarting Kubelet", - err, path.Join(stateDir, checkpointName)) + err, path.Join(currentStateDir, checkpointName)) } return stateCheckpoint, nil } -func (sc *stateCheckpoint) restoreState(conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32) error { +// restoreState is first done by searching the current directory for the state file. +// If it does not exist, we search the other directory for the state file and try to migrate the state file over to the current directory. 
+func (sc *stateCheckpoint) restoreState( + conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32, +) error { sc.Lock() defer sc.Unlock() var err error var foundAndSkippedStateCorruption bool checkpoint := NewNetworkPluginCheckpoint() - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { - if err == errors.ErrCheckpointNotFound { - return sc.storeState() - } else if err == errors.ErrCorruptCheckpoint { + if err = sc.qrmCheckpointManager.GetCurrentCheckpoint(sc.checkpointName, checkpoint, true); err != nil { + if stdErrors.Is(err, errors.ErrCheckpointNotFound) { + // We cannot find checkpoint, so it is possible that previous checkpoint was stored in either disk or memory + return sc.tryMigrateState(conf, nics, reservedBandwidth, checkpoint) + } else if stdErrors.Is(err, errors.ErrCorruptCheckpoint) { if !sc.skipStateCorruption { return err } @@ -111,13 +125,23 @@ func (sc *stateCheckpoint) restoreState(conf *qrm.QRMPluginsConfiguration, nics } } + _, err = sc.updateCacheAndReturnChanged(conf, nics, reservedBandwidth, checkpoint, foundAndSkippedStateCorruption) + return err +} + +// updateCacheAndReturnChanged updates the cache and returns whether the state has changed +func (sc *stateCheckpoint) updateCacheAndReturnChanged( + conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32, + checkpoint *NetworkPluginCheckpoint, foundAndSkippedStateCorruption bool, +) (bool, error) { + var hasStateChanged bool if sc.policyName != checkpoint.PolicyName && !sc.skipStateCorruption { - return fmt.Errorf("[network_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) + return hasStateChanged, fmt.Errorf("[network_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) } generatedNetworkState, err := GenerateMachineStateFromPodEntries(conf, nics, 
checkpoint.PodEntries, reservedBandwidth) if err != nil { - return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) + return hasStateChanged, fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) } sc.cache.SetMachineState(generatedNetworkState) @@ -128,23 +152,73 @@ func (sc *stateCheckpoint) restoreState(conf *qrm.QRMPluginsConfiguration, nics "generatedNetworkState: %s; checkpointMachineState: %s", generatedNetworkState.String(), checkpoint.MachineState.String()) + hasStateChanged = true err = sc.storeState() if err != nil { - return fmt.Errorf("storeState when machine state changed failed with error: %v", err) + return hasStateChanged, fmt.Errorf("storeState when machine state changed failed with error: %v", err) } } if foundAndSkippedStateCorruption { generalLog.Infof("found and skipped state corruption, we shoud store to rectify the checksum") + hasStateChanged = true err = sc.storeState() if err != nil { - return fmt.Errorf("storeState failed with error: %v", err) + return hasStateChanged, fmt.Errorf("storeState failed with error: %v", err) } } generalLog.InfoS("state checkpoint: restored state from checkpoint") + return hasStateChanged, nil +} + +// tryMigrateState tries to migrate the state file from the other directory to current directory. +// If the other directory does not have a state file, then we build a new checkpoint. 
+func (sc *stateCheckpoint) tryMigrateState( + conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32, + checkpoint *NetworkPluginCheckpoint, +) error { + var foundAndSkippedStateCorruption bool + klog.Infof("[network_plugin] trying to migrate state") + + // Build new checkpoint if the state directory that we want to migrate from is empty + if !sc.hasPreStop { + return sc.storeState() + } + + if err := sc.qrmCheckpointManager.GetPreviousCheckpoint(sc.checkpointName, checkpoint); err != nil { + if stdErrors.Is(err, errors.ErrCheckpointNotFound) { + // Old checkpoint file is not found, so we just store state in new checkpoint + general.Infof("[network_plugin] checkpoint %v doesn't exist, create it", sc.checkpointName) + return sc.storeState() + } else if stdErrors.Is(err, errors.ErrCorruptCheckpoint) { + if !sc.skipStateCorruption { + return err + } + foundAndSkippedStateCorruption = true + klog.Warningf("[network_plugin] restore checkpoint failed with err: %s, but we skip it", err) + } else { + return err + } + } + + hasStateChanged, err := sc.updateCacheAndReturnChanged(conf, nics, reservedBandwidth, checkpoint, foundAndSkippedStateCorruption) + if err != nil { + return fmt.Errorf("[network_plugin] failed to populate checkpoint state during state migration: %v", err) + } + + // always store state after migrating to new checkpoint + if err := sc.storeState(); err != nil { + return fmt.Errorf("[network_plugin] failed to store checkpoint state during end of migration: %v", err) + } + + if err := sc.qrmCheckpointManager.ValidateCheckpointFilesMigration(hasStateChanged); err != nil { + return fmt.Errorf("[network_plugin] ValidateCheckpointFilesMigration failed with error: %v", err) + } + + klog.Infof("[network_plugin] checkpoint migration succeeded") return nil } @@ -161,7 +235,7 @@ func (sc *stateCheckpoint) storeState() error { checkpoint.MachineState = sc.cache.GetMachineState() checkpoint.PodEntries = 
sc.cache.GetPodEntries() - err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) + err := sc.qrmCheckpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) if err != nil { generalLog.ErrorS(err, "could not save checkpoint") return err @@ -224,7 +298,9 @@ func (sc *stateCheckpoint) SetMachineState(nicMap NICMap, persist bool) { } } -func (sc *stateCheckpoint) SetAllocationInfo(podUID, containerName string, allocationInfo *AllocationInfo, persist bool) { +func (sc *stateCheckpoint) SetAllocationInfo( + podUID, containerName string, allocationInfo *AllocationInfo, persist bool, +) { sc.Lock() defer sc.Unlock() diff --git a/pkg/agent/qrm-plugins/network/state/state_test.go b/pkg/agent/qrm-plugins/network/state/state_test.go new file mode 100644 index 0000000000..489baabed9 --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/state_test.go @@ -0,0 +1,191 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package state + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + info "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/machine" + "github.com/kubewharf/katalyst-core/pkg/util/qrmcheckpointmanager" +) + +func TestTryMigrateState(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + preStop bool + expectEqual bool + corruptFile bool + expectOldExists bool + }{ + { + name: "successful migration with pre-stop", + preStop: true, + expectEqual: true, + expectOldExists: false, + corruptFile: false, + }, + { + name: "migration without pre-stop", + preStop: false, + expectEqual: false, + expectOldExists: true, + corruptFile: false, + }, + { + name: "corrupted checkpoint", + preStop: true, + expectEqual: false, + corruptFile: true, + expectOldExists: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() + inMemoryTmpDir := t.TempDir() + + stateDir := filepath.Join(tmpDir, "state") + err := os.Mkdir(stateDir, 0o775) + assert.NoError(t, err) + defer os.RemoveAll(stateDir) + + qrmConfig := &qrm.QRMPluginsConfiguration{ + CPUQRMPluginConfig: &qrm.CPUQRMPluginConfig{ + ReservedCPUCores: 4, + CPUDynamicPolicyConfig: qrm.CPUDynamicPolicyConfig{ + EnableReserveCPUReversely: true, + }, + }, + } + machineInfo := &info.MachineInfo{} + nics := make([]machine.InterfaceInfo, 0) + reservedBandwidth := make(map[string]uint32) + + defaultCache, err := NewNetworkPluginState(qrmConfig, 
machineInfo, nics, reservedBandwidth) + assert.NoError(t, err) + + policyName := "test-policy" + checkpointName := "test-checkpoint" + + // create old checkpoint manager to save the checkpoint + oldCheckpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + assert.NoError(t, err) + + oldCheckpoint := NewNetworkPluginCheckpoint() + if tt.corruptFile { + // create a corrupted old checkpoint + corruptedFile := filepath.Join(stateDir, fmt.Sprintf("%s", checkpointName)) + err = ioutil.WriteFile(corruptedFile, []byte("corrupted data"), 0o644) + assert.NoError(t, err) + } else { + oldCheckpoint.PolicyName = policyName + podEntries := PodEntries{ + "podUID": ContainerEntries{ + "testName": &AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "podUID", + PodNamespace: "testName", + PodName: "testName", + ContainerName: "testName", + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerIndex: 0, + QoSLevel: consts.PodAnnotationQoSLevelDedicatedCores, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + NumaNodes: machine.NewCPUSet(0), + }, + }, + } + oldCheckpoint.PodEntries = podEntries + generatedNetworkState, err := GenerateMachineStateFromPodEntries(qrmConfig, nics, podEntries, reservedBandwidth) + assert.NoError(t, err) + oldCheckpoint.MachineState = generatedNetworkState + err = oldCheckpointManager.CreateCheckpoint(checkpointName, oldCheckpoint) + assert.NoError(t, err) + } + + // create a new checkpoint with a new checkpoint manager + sc := &stateCheckpoint{ + policyName: policyName, + checkpointName: checkpointName, + cache: defaultCache, + skipStateCorruption: false, + emitter: metrics.DummyMetrics{}, + hasPreStop: tt.preStop, + 
} + + // current checkpoint is pointing to the in memory directory + sc.qrmCheckpointManager, err = qrmcheckpointmanager.NewQRMCheckpointManager(inMemoryTmpDir, stateDir, checkpointName, "network_plugin") + assert.NoError(t, err) + + newCheckpoint := NewNetworkPluginCheckpoint() + err = sc.tryMigrateState(qrmConfig, nics, reservedBandwidth, newCheckpoint) + + if tt.corruptFile { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + // check if new checkpoint is created and verify equality + err = sc.qrmCheckpointManager.GetCurrentCheckpoint(sc.checkpointName, newCheckpoint, false) + assert.NoError(t, err) + + // verify old checkpoint file existence + checkpoint := NewNetworkPluginCheckpoint() + err = oldCheckpointManager.GetCheckpoint(checkpointName, checkpoint) + + if tt.expectOldExists { + assert.NoError(t, err) + } else { + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + } + + if tt.expectEqual { + assert.Equal(t, newCheckpoint, oldCheckpoint) + } else { + assert.NotEqual(t, newCheckpoint, oldCheckpoint) + } + }) + } +} diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go index fb792ac189..53caad77a1 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go @@ -138,7 +138,7 @@ func NewStaticPolicy(agentCtx *agent.GenericContext, conf *config.Configuration, return false, agent.ComponentStub{}, fmt.Errorf("getReservedBandwidth failed with error: %v", err) } - stateImpl, err := state.NewCheckpointState(conf.QRMPluginsConfiguration, conf.GenericQRMPluginConfiguration.StateFileDirectory, NetworkPluginStateFileName, + stateImpl, err := state.NewCheckpointState(conf.QRMPluginsConfiguration, conf.StateDirectoryConfiguration, NetworkPluginStateFileName, NetworkResourcePluginPolicyNameStatic, agentCtx.MachineInfo, enabledNICs, reservation, conf.SkipNetworkStateCorruption, wrappedEmitter) if err != nil 
{ return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", err) diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go index ff126ec232..856670de4c 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go @@ -47,6 +47,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" "github.com/kubewharf/katalyst-core/pkg/metaserver" metaserveragent "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" @@ -179,7 +180,10 @@ func makeStaticPolicy(t *testing.T, hasNic bool) *StaticPolicy { assert.NoError(t, err) defer os.RemoveAll(tmpDir) - stateImpl, err := state.NewCheckpointState(mockQrmConfig, tmpDir, NetworkPluginStateFileName, + stateDirectoryConfig := &statedirectory.StateDirectoryConfiguration{ + StateFileDirectory: tmpDir, + } + stateImpl, err := state.NewCheckpointState(mockQrmConfig, stateDirectoryConfig, NetworkPluginStateFileName, NetworkResourcePluginPolicyNameStatic, &info.MachineInfo{}, availableNICs, reservation, false, metrics.DummyMetrics{}) assert.NoError(t, err) diff --git a/pkg/config/agent/qrm/qrm_base.go b/pkg/config/agent/qrm/qrm_base.go index 9b4bb6f9bc..18136bb490 100644 --- a/pkg/config/agent/qrm/qrm_base.go +++ b/pkg/config/agent/qrm/qrm_base.go @@ -16,14 +16,18 @@ limitations under the License. 
package qrm -import "github.com/kubewharf/katalyst-api/pkg/consts" +import ( + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm/statedirectory" +) type GenericQRMPluginConfiguration struct { - StateFileDirectory string - QRMPluginSocketDirs []string - ExtraStateFileAbsPath string - PodDebugAnnoKeys []string - UseKubeletReservedConfig bool + StateFileDirectory string + InMemoryStateFileDirectory string + QRMPluginSocketDirs []string + ExtraStateFileAbsPath string + PodDebugAnnoKeys []string + UseKubeletReservedConfig bool // PodAnnotationKeptKeys indicates pod annotation keys will be kept in qrm state PodAnnotationKeptKeys []string // PodLabelKeptKeys indicates pod label keys will be kept in qrm state @@ -35,6 +39,10 @@ type GenericQRMPluginConfiguration struct { // EnableSNBHighNumaPreference indicates whether to enable high numa preference for snb pods // if set true, snb pod will be preferentially allocated on high numa node EnableSNBHighNumaPreference bool + // EnableInMemoryState indicates whether we want to store the state in memory or on disk + // if set true, the state will be stored in tmpfs + EnableInMemoryState bool + *statedirectory.StateDirectoryConfiguration } type QRMPluginsConfiguration struct { @@ -50,7 +58,8 @@ func NewGenericQRMPluginConfiguration() *GenericQRMPluginConfiguration { consts.PodAnnotationAggregatedRequestsKey, consts.PodAnnotationInplaceUpdateResizingKey, }, - PodLabelKeptKeys: []string{}, + PodLabelKeptKeys: []string{}, + StateDirectoryConfiguration: statedirectory.NewStateDirectoryConfiguration(), } } diff --git a/pkg/config/agent/qrm/statedirectory/statedirectory_base.go b/pkg/config/agent/qrm/statedirectory/statedirectory_base.go new file mode 100644 index 0000000000..08266b0265 --- /dev/null +++ b/pkg/config/agent/qrm/statedirectory/statedirectory_base.go @@ -0,0 +1,41 @@ +/* +Copyright 2022 The Katalyst Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statedirectory + +type StateDirectoryConfiguration struct { + StateFileDirectory string + InMemoryStateFileDirectory string + // EnableInMemoryState indicates whether we want to store the state in memory or on disk + // if set true, the state will be stored in tmpfs + EnableInMemoryState bool + // HasPreStop indicates whether we have pre-stop script in place + HasPreStop bool +} + +func NewStateDirectoryConfiguration() *StateDirectoryConfiguration { + return &StateDirectoryConfiguration{} +} + +func (c *StateDirectoryConfiguration) GetCurrentAndPreviousStateFileDirectory() (string, string) { + if !c.HasPreStop { + return c.StateFileDirectory, c.InMemoryStateFileDirectory + } + if c.EnableInMemoryState { + return c.InMemoryStateFileDirectory, c.StateFileDirectory + } + return c.StateFileDirectory, c.InMemoryStateFileDirectory +} diff --git a/pkg/util/general/file.go b/pkg/util/general/file.go index 9c9cf540eb..350615849d 100644 --- a/pkg/util/general/file.go +++ b/pkg/util/general/file.go @@ -22,10 +22,12 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "os" "path" "path/filepath" + "reflect" "strconv" "strings" "syscall" @@ -37,8 +39,9 @@ import ( ) const ( - FlockCoolingInterval = 6 * time.Second - FlockTryLockMaxTimes = 10 + FlockCoolingInterval = 6 * time.Second + FlockTryLockMaxTimes = 10 + ModificationTimeDifferenceThreshold = 2 * time.Second ) type FileWatcherInfo struct { @@ -115,7 +118,8 @@ func 
GetOneExistPath(paths []string) string { } // GetOneExistPathUntilExist returns a path until one provided path exists -func GetOneExistPathUntilExist(paths []string, checkInterval, +func GetOneExistPathUntilExist( + paths []string, checkInterval, timeoutDuration time.Duration, ) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) @@ -375,3 +379,61 @@ func ParseLinuxListFormatFromFile(filePath string) ([]int64, error) { } return ParseLinuxListFormat(s) } + +// JSONFilesEqual unmarshals the contents of JSON files into structs and checks if they are identical +func JSONFilesEqual(path1, path2 string) (bool, error) { + decode := func(path string) (interface{}, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open file %s: %w", path, err) + } + defer f.Close() + var obj interface{} + if err := json.NewDecoder(f).Decode(&obj); err != nil { + if errors.Is(err, io.EOF) { + return obj, nil + } + return nil, fmt.Errorf("failed to decode file %s: %w", path, err) + } + return obj, nil + } + + obj1, err := decode(path1) + if err != nil { + return false, err + } + obj2, err := decode(path2) + if err != nil { + return false, err + } + + return reflect.DeepEqual(obj1, obj2), nil +} + +// IsFileUpToDate checks if the target file is updated by comparing its last modification time with the other file +// The modification time of the target file has to fulfill any of the following conditions to be considered up to date: +// 1. Be updated more recently than the other file +// 2. 
Fall within a threshold difference of the other file's modification time +func IsFileUpToDate(targetFilePath string, otherFilePath string) (bool, error) { + targetInfo, err := os.Stat(targetFilePath) + if err != nil { + return false, fmt.Errorf("failed to stat target file %s, err %v", targetFilePath, err) + } + otherInfo, err := os.Stat(otherFilePath) + if err != nil { + return false, fmt.Errorf("failed to stat other file %s, err %v", otherFilePath, err) + } + targetModTime := targetInfo.ModTime() + otherModTime := otherInfo.ModTime() + + // Target file is updated more recently than the other file + if targetModTime.After(otherModTime) { + return true, nil + } + + // Check if the modification time of the target file is within the threshold difference of the other file's modification time + if otherModTime.Sub(targetModTime).Seconds() <= ModificationTimeDifferenceThreshold.Seconds() { + return true, nil + } + return false, nil +} diff --git a/pkg/util/general/file_test.go b/pkg/util/general/file_test.go index f7971f9c37..7dfae5c699 100644 --- a/pkg/util/general/file_test.go +++ b/pkg/util/general/file_test.go @@ -457,3 +457,293 @@ func TestParseLinuxListFormatFromFile(t *testing.T) { }) } } + +func TestFilesEqual(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() + + createTempFile := func(content string) string { + f, err := os.CreateTemp(tmpDir, "testfile-") + assert.NoError(t, err) + _, err = f.WriteString(content) + assert.NoError(t, err) + err = f.Close() + assert.NoError(t, err) + return f.Name() + } + + jsonContent := `{"key":"value1", "number": 1}` + identicalJsonContent := `{"key":"value1", "number": 1}` + differentJsonContent := `{"key":"value1", "number": 0}` + differentFormatContent := `{"number": 1, "key": "value1"}` + + tests := []struct { + name string + setup func() (path1, path2 string) + wantEqual bool + wantErr bool + }{ + { + name: "one of the files is not of JSON format", + setup: func() (string, string) { + path1 := createTempFile("hello 
world") + path2 := createTempFile(jsonContent) + return path1, path2 + }, + wantEqual: false, + wantErr: true, + }, + { + name: "both files are not of JSON format", + setup: func() (string, string) { + path1 := createTempFile("hello world") + path2 := createTempFile("hello world") + return path1, path2 + }, + wantEqual: false, + wantErr: true, + }, + { + name: "one file does not exist", + setup: func() (string, string) { + path1 := createTempFile(jsonContent) + return path1, "non-existent-file" + }, + wantEqual: false, + wantErr: true, + }, + { + name: "empty files", + setup: func() (string, string) { + path1 := createTempFile("") + path2 := createTempFile("") + return path1, path2 + }, + wantEqual: true, + wantErr: false, + }, + { + name: "identical json files", + setup: func() (string, string) { + path1 := createTempFile(jsonContent) + path2 := createTempFile(identicalJsonContent) + return path1, path2 + }, + wantEqual: true, + wantErr: false, + }, + { + name: "different json files", + setup: func() (string, string) { + path1 := createTempFile(jsonContent) + path2 := createTempFile(differentJsonContent) + return path1, path2 + }, + wantEqual: false, + wantErr: false, + }, + { + name: "copied files should be equal", + setup: func() (string, string) { + path1 := createTempFile(jsonContent) + path2 := path1 + ".copy" + + input, err := os.ReadFile(path1) + assert.NoError(t, err) + + err = os.WriteFile(path2, input, 0o644) + assert.NoError(t, err) + + return path1, path2 + }, + wantEqual: true, + wantErr: false, + }, + { + name: "different formatted JSON files should still return true", + setup: func() (string, string) { + path1 := createTempFile(jsonContent) + path2 := createTempFile(differentFormatContent) + return path1, path2 + }, + wantEqual: true, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + path1, path2 := tt.setup() + + equal, err := JSONFilesEqual(path1, path2) + + if tt.wantErr { + 
assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantEqual, equal) + }) + } +} + +func TestIsFileUpToDate(t *testing.T) { + t.Parallel() + tmpDir := t.TempDir() + + createTempFile := func(name string, content string) (string, error) { + filePath := filepath.Join(tmpDir, name) + err := os.WriteFile(filePath, []byte(content), 0o644) + return filePath, err + } + + tests := []struct { + name string + setup func() (string, string) + wantUpToDate bool + wantErr bool + cleanup func(string, string) + adjustModTime func(string, string) + }{ + { + name: "target file does not exist", + setup: func() (string, string) { + otherFile, err := createTempFile("other.txt", "test") + assert.NoError(t, err) + return "non-existent-file", otherFile + }, + wantErr: true, + cleanup: func(targetFilePath, otherFilePath string) { + os.Remove(otherFilePath) + }, + }, + { + name: "other file does not exist", + setup: func() (string, string) { + targetFile, err := createTempFile("target.txt", "test") + assert.NoError(t, err) + return targetFile, "non-existent-file" + }, + wantErr: true, + cleanup: func(targetFilePath, otherFilePath string) { + os.Remove(targetFilePath) + }, + }, + { + name: "target file is up to date", + setup: func() (string, string) { + targetFile, err := createTempFile("target_updated_1.txt", "test") + assert.NoError(t, err) + otherFile, err := createTempFile("other_updated_1.txt", "test") + assert.NoError(t, err) + return targetFile, otherFile + }, + wantUpToDate: true, + adjustModTime: func(targetFilePath, otherFilePath string) { + now := time.Now() + err := os.Chtimes(otherFilePath, now, now) + assert.NoError(t, err) + updatedTime := now.Add(-(ModificationTimeDifferenceThreshold - time.Second)) + err = os.Chtimes(targetFilePath, updatedTime, updatedTime) + assert.NoError(t, err) + }, + cleanup: func(targetFilePath, otherFilePath string) { + os.Remove(targetFilePath) + os.Remove(otherFilePath) + }, + }, + { + name: "target file is up to date 
as it is more recently updated", + setup: func() (string, string) { + targetFile, err := createTempFile("target_updated_2.txt", "test") + assert.NoError(t, err) + otherFile, err := createTempFile("other_updated_2.txt", "test") + assert.NoError(t, err) + return targetFile, otherFile + }, + wantUpToDate: true, + adjustModTime: func(targetFilePath, otherFilePath string) { + now := time.Now() + err := os.Chtimes(otherFilePath, now, now) + assert.NoError(t, err) + updatedTime := now.Add(ModificationTimeDifferenceThreshold + time.Second) + err = os.Chtimes(targetFilePath, updatedTime, updatedTime) + assert.NoError(t, err) + }, + cleanup: func(targetFilePath, otherFilePath string) { + os.Remove(targetFilePath) + os.Remove(otherFilePath) + }, + }, + { + name: "files modification time difference equals threshold", + setup: func() (string, string) { + targetFile, err := createTempFile("target_updated_3.txt", "test") + assert.NoError(t, err) + otherFile, err := createTempFile("other_updated_3.txt", "test") + assert.NoError(t, err) + return targetFile, otherFile + }, + wantUpToDate: true, + adjustModTime: func(targetFilePath, otherFilePath string) { + now := time.Now() + err := os.Chtimes(otherFilePath, now, now) + assert.NoError(t, err) + updatedTime := now.Add(-ModificationTimeDifferenceThreshold) + err = os.Chtimes(targetFilePath, updatedTime, updatedTime) + assert.NoError(t, err) + }, + cleanup: func(targetFilePath, otherFilePath string) { + os.Remove(targetFilePath) + os.Remove(otherFilePath) + }, + }, + { + name: "target file is updated earlier and files modification time difference is more than threshold", + setup: func() (string, string) { + targetFile, err := createTempFile("target_not_updated.txt", "test") + assert.NoError(t, err) + otherFile, err := createTempFile("other_not_updated.txt", "test") + assert.NoError(t, err) + return targetFile, otherFile + }, + wantUpToDate: false, + adjustModTime: func(targetFilePath, otherFilePath string) { + now := time.Now() + err 
:= os.Chtimes(otherFilePath, now, now) + assert.NoError(t, err) + updatedTime := now.Add(-(ModificationTimeDifferenceThreshold + time.Second)) + err = os.Chtimes(targetFilePath, updatedTime, updatedTime) + assert.NoError(t, err) + }, + cleanup: func(targetFilePath, otherFilePath string) { + os.Remove(targetFilePath) + os.Remove(otherFilePath) + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + targetFile, otherFile := tt.setup() + if tt.cleanup != nil { + defer tt.cleanup(targetFile, otherFile) + } + + if tt.adjustModTime != nil { + tt.adjustModTime(targetFile, otherFile) + } + + isUpToDate, err := IsFileUpToDate(targetFile, otherFile) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantUpToDate, isUpToDate) + }) + } +} diff --git a/pkg/util/qrmcheckpointmanager/qrmcheckpointmanager.go b/pkg/util/qrmcheckpointmanager/qrmcheckpointmanager.go new file mode 100644 index 0000000000..9fe55e5725 --- /dev/null +++ b/pkg/util/qrmcheckpointmanager/qrmcheckpointmanager.go @@ -0,0 +1,181 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package qrmcheckpointmanager + +import ( + "fmt" + "path/filepath" + + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + + "github.com/kubewharf/katalyst-core/pkg/util/general" +) + +type checkpointInfo struct { + checkpointmanager.CheckpointManager + checkpointPath string +} + +// QRMCheckpointManager manages old and new checkpoints for all QRM plugins +// and ensures seamless transfer between them. +type QRMCheckpointManager struct { + currentCheckpointInfo *checkpointInfo + previousCheckpointInfo *checkpointInfo + checkpointName string + pluginName string +} + +func NewQRMCheckpointManager(currentStateDir, previousStateDir, checkpointName, pluginName string) (*QRMCheckpointManager, error) { + currentCheckpointManager, err := checkpointmanager.NewCheckpointManager(currentStateDir) + if err != nil { + return nil, fmt.Errorf("error creating new checkpoint manager: %w", err) + } + var previousCheckpointManager checkpointmanager.CheckpointManager + if previousStateDir != "" { + previousCheckpointManager, err = checkpointmanager.NewCheckpointManager(previousStateDir) + if err != nil { + return nil, fmt.Errorf("error creating previous checkpoint manager: %w", err) + } + } + return &QRMCheckpointManager{ + currentCheckpointInfo: &checkpointInfo{ + checkpointPath: filepath.Join(currentStateDir, checkpointName), + CheckpointManager: currentCheckpointManager, + }, + previousCheckpointInfo: &checkpointInfo{ + checkpointPath: filepath.Join(previousStateDir, checkpointName), + CheckpointManager: previousCheckpointManager, + }, + checkpointName: checkpointName, + pluginName: pluginName, + }, err +} + +// GetCurrentCheckpoint retrieves the current checkpoint and removes the previous checkpoint file if needed +func (cm *QRMCheckpointManager) GetCurrentCheckpoint( + checkpointName string, checkpoint checkpointmanager.Checkpoint, isRemovePreviousCheckpoint bool, +) error { + 
currentCheckpointManager := cm.currentCheckpointInfo.CheckpointManager + + isUpToDate, err := cm.isCheckpointUpToDate() + if err != nil { + return err + } + // When the current checkpoint is not up to date, we consider the checkpoint missing, so we propagate the checkpoint not found error upwards + if !isUpToDate { + klog.Infof("[%v] checkpoint %s is not up-to-date, we ignore it", cm.pluginName, checkpointName) + return errors.ErrCheckpointNotFound + } + + if err := currentCheckpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { + return err + } + + if !isRemovePreviousCheckpoint { + return nil + } + + previousCheckpointManager := cm.previousCheckpointInfo.CheckpointManager + if previousCheckpointManager == nil { + return nil + } + if err := previousCheckpointManager.RemoveCheckpoint(checkpointName); err != nil { + return fmt.Errorf("[%v] failed to remove checkpoint %v: %w", cm.pluginName, checkpointName, err) + } + return nil +} + +func (cm *QRMCheckpointManager) GetPreviousCheckpoint( + checkpointName string, checkpoint checkpointmanager.Checkpoint, +) error { + previousCheckpointManager := cm.previousCheckpointInfo.CheckpointManager + if previousCheckpointManager == nil { + return fmt.Errorf("[%v] previous checkpoint manager is nil, which is unexpected", cm.pluginName) + } + + if err := previousCheckpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil { + return err + } + + return nil +} + +// ValidateCheckpointFilesMigration checks if the two checkpoint files are equal after migrating from previous checkpoint to current checkpoint +// If they are not equal, we fall back to the previous checkpoint and continue using it, and make sure we remove the current checkpoint +// If they are equal, we remove the previous checkpoint. 
+func (cm *QRMCheckpointManager) ValidateCheckpointFilesMigration(hasStateChanged bool) error { + equal, err := cm.checkpointFilesEqual(hasStateChanged) + if err != nil { + return fmt.Errorf("[%v] failed to compare checkpoint files: %w", cm.pluginName, err) + } + if !equal { + klog.Infof("[%v] checkpoint files are not equal, migration failed, fall back to previous checkpoint", cm.pluginName) + if err := cm.currentCheckpointInfo.CheckpointManager.RemoveCheckpoint(cm.checkpointName); err != nil { + return fmt.Errorf("[%v] failed to remove current checkpoint %v during fallback: %w", cm.pluginName, cm.checkpointName, err) + } + cm.currentCheckpointInfo = cm.previousCheckpointInfo + } else { + klog.Infof("[%v] checkpoint files are equal, try to remove previous checkpoint", cm.pluginName) + oldCheckpointManager := cm.previousCheckpointInfo.CheckpointManager + if err := oldCheckpointManager.RemoveCheckpoint(cm.checkpointName); err != nil { + return fmt.Errorf("[%v] failed to remove previous checkpoint %v: %w", cm.pluginName, cm.checkpointName, err) + } + } + return nil +} + +// checkpointFilesEqual reports whether the checkpoints are identical by comparing the 2 files' contents +func (cm *QRMCheckpointManager) checkpointFilesEqual(hasStateChanged bool) (bool, error) { + // A change in machine state is already detected, so we skip the file comparison and report equal so migration proceeds + if hasStateChanged { + return true, nil + } + currentFilePath := cm.currentCheckpointInfo.checkpointPath + previousFilePath := cm.previousCheckpointInfo.checkpointPath + return general.JSONFilesEqual(currentFilePath, previousFilePath) +} + +// CreateCheckpoint creates a checkpoint only using the current checkpoint manager. 
+func (cm *QRMCheckpointManager) CreateCheckpoint(checkpointName string, checkpoint checkpointmanager.Checkpoint) error { + currentCheckpointManager := cm.currentCheckpointInfo.CheckpointManager + return currentCheckpointManager.CreateCheckpoint(checkpointName, checkpoint) +} + +// isCheckpointUpToDate checks if the current checkpoint is up to date +func (cm *QRMCheckpointManager) isCheckpointUpToDate() (bool, error) { + currentCheckpointFilePath := cm.currentCheckpointInfo.checkpointPath + + // Current checkpoint is not up to date as it does not exist + if !general.IsPathExists(currentCheckpointFilePath) { + return false, nil + } + + previousCheckpointFilePath := cm.previousCheckpointInfo.checkpointPath + + // When there is no previous checkpoint, we consider the current checkpoint is up to date + if !general.IsPathExists(previousCheckpointFilePath) { + return true, nil + } + + isUpToDate, err := general.IsFileUpToDate(currentCheckpointFilePath, previousCheckpointFilePath) + if err != nil { + return false, fmt.Errorf("[%v] failed to check if file up to date: %w", cm.pluginName, err) + } + return isUpToDate, nil +} diff --git a/pkg/util/qrmcheckpointmanager/qrmcheckpointmanager_test.go b/pkg/util/qrmcheckpointmanager/qrmcheckpointmanager_test.go new file mode 100644 index 0000000000..b49afda03b --- /dev/null +++ b/pkg/util/qrmcheckpointmanager/qrmcheckpointmanager_test.go @@ -0,0 +1,253 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package qrmcheckpointmanager + +import ( + "encoding/json" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" +) + +var _ checkpointmanager.Checkpoint = &mockCheckpoint{} + +// mockCheckpoint struct is a simple checkpoint for testing purposes +type mockCheckpoint struct { + Content string `json:"Content"` + Checksum checksum.Checksum `json:"Checksum"` +} + +func (mc *mockCheckpoint) MarshalCheckpoint() ([]byte, error) { + mc.Checksum = 0 + mc.Checksum = checksum.New(mc) + return json.Marshal(*mc) +} + +func (mc *mockCheckpoint) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, mc) +} + +func (mc *mockCheckpoint) VerifyChecksum() error { + ck := mc.Checksum + mc.Checksum = 0 + err := ck.Verify(mc) + mc.Checksum = ck + return err +} + +func TestQRMCheckpointManager_GetCurrentCheckpoint(t *testing.T) { + t.Parallel() + tests := []struct { + name string + hasPreviousStateDir bool + adjustModTime func(string, string) + isNotUpToDate bool + }{ + { + name: "no previous state directory", + hasPreviousStateDir: false, + }, + { + name: "has previous state directory, previous checkpoint should not exist", + hasPreviousStateDir: true, + }, + { + name: "has previous state directory and the current file is up to date", + hasPreviousStateDir: true, + adjustModTime: func(currentFilePath, previousFilePath string) { + now := time.Now() + err := os.Chtimes(previousFilePath, now, now) + assert.NoError(t, err) + updatedTime := now.Add(-2 * time.Second) // 2 seconds is the threshold of modification time difference between the two files + err = os.Chtimes(currentFilePath, updatedTime, updatedTime) + assert.NoError(t, err) + }, + }, + { + name: "has previous state directory but the current file is not up to date", + hasPreviousStateDir: true, + adjustModTime: 
func(currentFilePath, previousFilePath string) { + now := time.Now() + err := os.Chtimes(previousFilePath, now, now) + assert.NoError(t, err) + updatedTime := now.Add(-3 * time.Second) // 3 seconds is out of the threshold of modification time difference between the two files + err = os.Chtimes(currentFilePath, updatedTime, updatedTime) + assert.NoError(t, err) + }, + isNotUpToDate: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + currentStateDir := t.TempDir() + + var previousStateDir string + if tt.hasPreviousStateDir { + previousStateDir = t.TempDir() + } + + qrmCheckpointManager, err := NewQRMCheckpointManager(currentStateDir, previousStateDir, "test_checkpoint", "test_plugin") + assert.NoError(t, err) + + checkpoint := &mockCheckpoint{Content: "test_content"} + if tt.hasPreviousStateDir { + err := qrmCheckpointManager.previousCheckpointInfo.CreateCheckpoint("test_checkpoint", checkpoint) + assert.NoError(t, err) + } + err = qrmCheckpointManager.currentCheckpointInfo.CreateCheckpoint("test_checkpoint", checkpoint) + assert.NoError(t, err) + + if tt.adjustModTime != nil && tt.hasPreviousStateDir { + tt.adjustModTime(qrmCheckpointManager.currentCheckpointInfo.checkpointPath, qrmCheckpointManager.previousCheckpointInfo.checkpointPath) + } + + newCheckpoint := &mockCheckpoint{} + err = qrmCheckpointManager.GetCurrentCheckpoint("test_checkpoint", newCheckpoint, true) + + if tt.isNotUpToDate { + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + return + } + + assert.NoError(t, err) + + // Verify equality of checkpoints + assert.Equal(t, checkpoint, newCheckpoint) + + if tt.hasPreviousStateDir { + // Ensure previous checkpoint does not exist + err = qrmCheckpointManager.previousCheckpointInfo.GetCheckpoint("test_checkpoint", newCheckpoint) + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + } + }) + } +} + +func 
TestQRMCheckpointManager_GetPreviousCheckpoint(t *testing.T) { + t.Parallel() + tests := []struct { + name string + hasPreviousCheckpoint bool + }{ + { + name: "no previous checkpoint", + hasPreviousCheckpoint: false, + }, + { + name: "has previous checkpoint", + hasPreviousCheckpoint: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + previousStateDir := t.TempDir() + qrmCheckpointManager, err := NewQRMCheckpointManager(t.TempDir(), previousStateDir, "test_checkpoint", "test_plugin") + assert.NoError(t, err) + + checkpoint := &mockCheckpoint{Content: "test_content"} + if tt.hasPreviousCheckpoint { + err := qrmCheckpointManager.previousCheckpointInfo.CreateCheckpoint("test_checkpoint", checkpoint) + assert.NoError(t, err) + } + + newCheckpoint := &mockCheckpoint{} + err = qrmCheckpointManager.GetPreviousCheckpoint("test_checkpoint", newCheckpoint) + + if !tt.hasPreviousCheckpoint { + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + } else { + assert.NoError(t, err) + assert.Equal(t, checkpoint, newCheckpoint) + } + }) + } +} + +func TestQRMCheckpointManager_ValidateCheckpointFilesMigration(t *testing.T) { + t.Parallel() + tests := []struct { + name string + isEqual bool + hasStateChanged bool + }{ + { + name: "Checkpoints are not equal, fallback to previous checkpoint", + isEqual: false, + hasStateChanged: false, + }, + { + name: "Checkpoints are equal, previous checkpoint should not exist", + isEqual: true, + hasStateChanged: false, + }, + { + name: "Checkpoints are not equal but a state change is detected, previous checkpoint should not exist", + isEqual: false, + hasStateChanged: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + currentStateDir := t.TempDir() + previousStateDir := t.TempDir() + + qrmCheckpointManager, err := NewQRMCheckpointManager(currentStateDir, previousStateDir, "test_checkpoint", "test_plugin") + 
assert.NoError(t, err) + + checkpoint1 := &mockCheckpoint{Content: "test_content"} + err = qrmCheckpointManager.currentCheckpointInfo.CreateCheckpoint("test_checkpoint", checkpoint1) + assert.NoError(t, err) + + if tt.isEqual { + checkpoint2 := &mockCheckpoint{Content: "test_content"} + err = qrmCheckpointManager.previousCheckpointInfo.CreateCheckpoint("test_checkpoint", checkpoint2) + assert.NoError(t, err) + } else { + checkpoint2 := &mockCheckpoint{Content: "different_content"} + err = qrmCheckpointManager.previousCheckpointInfo.CreateCheckpoint("test_checkpoint", checkpoint2) + assert.NoError(t, err) + } + + err = qrmCheckpointManager.ValidateCheckpointFilesMigration(tt.hasStateChanged) + assert.NoError(t, err) + newCheckpoint := &mockCheckpoint{} + + if tt.isEqual || tt.hasStateChanged { + // Ensure previous checkpoint does not exist + err = qrmCheckpointManager.previousCheckpointInfo.GetCheckpoint("test_checkpoint", newCheckpoint) + assert.Error(t, err) + assert.Equal(t, errors.ErrCheckpointNotFound, err) + } else { + assert.Equal(t, qrmCheckpointManager.currentCheckpointInfo, qrmCheckpointManager.previousCheckpointInfo) + } + }) + } +}