@@ -20,18 +20,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
2020package parameters
2121
2222import (
23- "context"
2423 "fmt"
2524
26- corev1 "k8s.io/api/core/v1"
27- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28- "k8s.io/apimachinery/pkg/labels"
29- "sigs.k8s.io/controller-runtime/pkg/client"
25+ "k8s.io/utils/ptr"
3026
27+ apisappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
3128 parametersv1alpha1 "github.com/apecloud/kubeblocks/apis/parameters/v1alpha1"
32- intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
33- "github.com/apecloud/kubeblocks/pkg/parameters"
34- "github.com/apecloud/kubeblocks/pkg/parameters/core"
3529)
3630
3731var syncPolicyInstance = & syncPolicy {}
@@ -47,87 +41,61 @@ func (o *syncPolicy) Upgrade(rctx reconfigureContext) (returnedStatus, error) {
4741 if len (updatedParameters ) == 0 {
4842 return makeReturnedStatus (ESNone ), nil
4943 }
44+ return o .sync (rctx , updatedParameters )
45+ }
5046
51- pods , err := getPodsForOnlineUpdate (rctx )
52- if err != nil {
53- return makeReturnedStatus (ESFailedAndRetry ), err
47+ func (o * syncPolicy ) sync (rctx reconfigureContext , parameters map [string ]string ) (returnedStatus , error ) {
48+ var targetCfg * apisappsv1.ClusterComponentConfig
49+ for i , cfg := range rctx .ClusterComponent .Configs {
50+ if ptr .Deref (cfg .Name , "" ) == rctx .ConfigTemplate .Name {
51+ targetCfg = & rctx .ClusterComponent .Configs [i ]
52+ break
53+ }
54+ }
55+ if targetCfg == nil {
56+ return makeReturnedStatus (ESFailedAndRetry ), fmt .Errorf ("config %s not found" , rctx .ConfigTemplate .Name )
5457 }
55- return o .sync (rctx , updatedParameters , pods )
58+ if targetCfg .VersionHash != rctx .getTargetVersionHash () {
59+ return o .update (rctx , targetCfg , parameters )
60+ }
61+ return o .check (rctx )
5662}
5763
58- func (o * syncPolicy ) sync (rctx reconfigureContext , updatedParameters map [ string ] string , pods []corev1. Pod ) (returnedStatus , error ) {
64+ func (o * syncPolicy ) update (rctx reconfigureContext , targetCfg * apisappsv1. ClusterComponentConfig , parameters map [ string ] string ) (returnedStatus , error ) {
5965 var (
60- r = ESNone
61- total = int32 (len (pods ))
62- replicas = int32 (rctx .getTargetReplicas ())
63- progress = core .NotStarted
64-
65- err error
66- ctx = rctx .Ctx
67- configKey = rctx .generateConfigIdentifier ()
68- versionHash = rctx .getTargetVersionHash ()
69- selector = parameters .GetPodSelector (rctx .ParametersDef )
70- fileName string
66+ replicas = rctx .getTargetReplicas ()
67+ // fileName string
7168 )
69+ // if rctx.ConfigDescription != nil {
70+ // fileName = rctx.ConfigDescription.Name
71+ // }
7272
73- if selector != nil {
74- pods , err = o .matchLabel (pods , selector )
75- }
76- if err != nil {
77- return makeReturnedStatus (ESFailedAndRetry ), err
78- }
79- if len (pods ) == 0 {
80- rctx .Log .Info (fmt .Sprintf ("no pods to update, and retry, selector: %v" , selector ))
81- return makeReturnedStatus (ESRetry ), nil
82- }
83- if rctx .ConfigDescription != nil {
84- fileName = rctx .ConfigDescription .Name
85- }
73+ targetCfg .Variables = parameters
74+ targetCfg .VersionHash = rctx .getTargetVersionHash ()
8675
87- requireUpdatedCount := int32 (len (pods ))
88- for _ , pod := range pods {
89- rctx .Log .V (1 ).Info (fmt .Sprintf ("sync pod: %s" , pod .Name ))
90- if intctrlutil .IsMatchConfigVersion (& pod , configKey , versionHash ) {
91- progress ++
92- continue
93- }
94- if ! intctrlutil .IsPodReady (& pod ) {
95- continue
96- }
97- if err = commonOnlineUpdateWithPod (& pod , ctx , rctx .ConfigTemplate .Name , fileName , updatedParameters ); err != nil {
98- return makeReturnedStatus (ESFailedAndRetry ), err
99- }
100- if err = o .updatePodLabelsWithConfigVersion (& pod , configKey , versionHash , rctx .Client , ctx ); err != nil {
101- return makeReturnedStatus (ESFailedAndRetry ), err
102- }
103- progress ++
104- }
76+ // TODO: update cluster spec
10577
106- if requireUpdatedCount != progress || replicas != total {
107- r = ESRetry
108- }
109- return makeReturnedStatus (r , withExpected (requireUpdatedCount ), withSucceed (progress )), nil
78+ return makeReturnedStatus (ESRetry , withExpected (replicas ), withSucceed (0 )), nil
11079}
11180
112- func (o * syncPolicy ) matchLabel (pods []corev1.Pod , selector * metav1.LabelSelector ) ([]corev1.Pod , error ) {
113- var result []corev1.Pod
114- match , err := metav1 .LabelSelectorAsSelector (selector )
115- if err != nil {
116- return nil , core .WrapError (err , "failed to convert selector: %v" , selector )
117- }
118- for _ , pod := range pods {
119- if match .Matches (labels .Set (pod .Labels )) {
120- result = append (result , pod )
81+ func (o * syncPolicy ) check (rctx reconfigureContext ) (returnedStatus , error ) {
82+ var (
83+ replicas = rctx .getTargetReplicas ()
84+ versionHash = rctx .getTargetVersionHash ()
85+ )
86+ progress := int32 (0 )
87+ for _ , inst := range rctx .ITS .Status .InstanceStatus {
88+ for _ , cfg := range inst .Configs {
89+ if cfg .Name == rctx .ConfigTemplate .Name {
90+ if cfg .VersionHash == versionHash {
91+ progress ++
92+ }
93+ break
94+ }
12195 }
12296 }
123- return result , nil
124- }
125-
126- func (o * syncPolicy ) updatePodLabelsWithConfigVersion (pod * corev1.Pod , labelKey , configVersion string , cli client.Client , ctx context.Context ) error {
127- patch := client .MergeFrom (pod .DeepCopy ())
128- if pod .Labels == nil {
129- pod .Labels = make (map [string ]string , 1 )
97+ if progress == replicas {
98+ return makeReturnedStatus (ESNone , withExpected (replicas ), withSucceed (progress )), nil
13099 }
131- pod .Labels [labelKey ] = configVersion
132- return cli .Patch (ctx , pod , patch )
100+ return makeReturnedStatus (ESRetry , withExpected (replicas ), withSucceed (progress )), nil
133101}
0 commit comments