Skip to content

Commit 9356602

Browse files
committed
Implement per node Affinity & Tolerations
1 parent 7d751fe commit 9356602

File tree

15 files changed

+1950
-31
lines changed

15 files changed

+1950
-31
lines changed

api/v1/kubegres_types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ import (
2727

2828
// ----------------------- SPEC -------------------------------------------
2929

30+
type KubegresNodeSet struct {
31+
// Name of this set of nodes. Becomes a part of the StatefulSet name.
32+
Name string `json:"name"`
33+
Affinity *v1.Affinity `json:"affinity,omitempty"`
34+
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
35+
}
36+
3037
type KubegresDatabase struct {
3138
Size string `json:"size,omitempty"`
3239
VolumeMount string `json:"volumeMount,omitempty"`
@@ -67,6 +74,7 @@ type Probe struct {
6774

6875
type KubegresSpec struct {
6976
Replicas *int32 `json:"replicas,omitempty"`
77+
NodeSets []KubegresNodeSet `json:"nodeSets,omitempty"`
7078
Image string `json:"image,omitempty"`
7179
Port int32 `json:"port,omitempty"`
7280
ImagePullSecrets []v1.LocalObjectReference `json:"imagePullSecrets,omitempty"`

api/v1/zz_generated.deepcopy.go

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/kubegres.reactive-tech.io_kubegres.yaml

Lines changed: 678 additions & 0 deletions
Large diffs are not rendered by default.

controllers/ctx/KubegresContext.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package ctx
2222

2323
import (
2424
"context"
25+
apps "k8s.io/api/apps/v1"
2526
"reactive-tech.io/kubegres/api/v1"
2627
"reactive-tech.io/kubegres/controllers/ctx/log"
2728
"reactive-tech.io/kubegres/controllers/ctx/status"
@@ -63,7 +64,12 @@ func (r *KubegresContext) GetServiceResourceName(isPrimary bool) string {
6364
}
6465

6566
func (r *KubegresContext) GetStatefulSetResourceName(instanceIndex int32) string {
66-
return r.Kubegres.Name + "-" + strconv.Itoa(int(instanceIndex))
67+
if r.HasNodeSets() && len(r.Kubegres.Spec.NodeSets) >= int(instanceIndex) {
68+
nodeSetSpec := r.Kubegres.Spec.NodeSets[instanceIndex-1]
69+
return r.Kubegres.Name + "-" + nodeSetSpec.Name
70+
} else {
71+
return r.Kubegres.Name + "-" + strconv.Itoa(int(instanceIndex))
72+
}
6773
}
6874

6975
func (r *KubegresContext) IsReservedVolumeName(volumeName string) bool {
@@ -72,3 +78,25 @@ func (r *KubegresContext) IsReservedVolumeName(volumeName string) bool {
7278
volumeName == CustomConfigMapVolumeName ||
7379
strings.Contains(volumeName, "kube-api")
7480
}
81+
82+
func (r *KubegresContext) HasNodeSets() bool {
83+
return r.Kubegres.Spec.NodeSets != nil
84+
}
85+
86+
func (r *KubegresContext) Replicas() *int32 {
87+
if r.HasNodeSets() {
88+
replicas := int32(len(r.Kubegres.Spec.NodeSets))
89+
return &replicas
90+
}
91+
return r.Kubegres.Spec.Replicas
92+
}
93+
94+
func (r *KubegresContext) GetInstanceIndexFromSpec(statefulSet apps.StatefulSet) (int32, error) {
95+
instanceIndexStr := statefulSet.Spec.Template.Labels["index"]
96+
instanceIndex, err := strconv.ParseInt(instanceIndexStr, 10, 32)
97+
if err != nil {
98+
r.Log.ErrorEvent("StatefulSetLoadingErr", err, "Unable to convert StatefulSet's label 'index' with value: "+instanceIndexStr+" into an integer. The name of statefulSet with this label is "+statefulSet.Name+".")
99+
return 0, err
100+
}
101+
return int32(instanceIndex), nil
102+
}

controllers/spec/checker/SpecChecker.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,25 @@ func (r *SpecChecker) CheckSpec() (SpecCheckResult, error) {
147147
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.env.POSTGRES_REPLICATION_PASSWORD")
148148
}
149149

150-
if *spec.Replicas <= 0 {
150+
if *spec.Replicas <= 0 && len(spec.NodeSets) == 0 {
151151
specCheckResult.HasSpecFatalError = true
152152
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.replicas")
153153
}
154154

155+
if *spec.Replicas > 0 && len(spec.NodeSets) > 0 {
156+
specCheckResult.HasSpecFatalError = true
157+
specCheckResult.FatalErrorMessage = r.logSpecErrMsg("In the Resources Spec the value of " +
158+
"'spec.replicas' and 'spec.nodeSets' are mutually exclusive. " +
159+
"Please set only one of the value otherwise this operator cannot work correctly.")
160+
}
161+
162+
for _, nodeSet := range spec.NodeSets {
163+
if nodeSet.Name == "" {
164+
specCheckResult.HasSpecFatalError = true
165+
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.nodeSets[].Name")
166+
}
167+
}
168+
155169
if spec.Image == "" {
156170
specCheckResult.HasSpecFatalError = true
157171
specCheckResult.FatalErrorMessage = r.createErrMsgSpecUndefined("spec.image")

controllers/spec/enforcer/resources_count_spec/statefulset/PrimaryDbCountSpecEnforcer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (r *PrimaryDbCountSpecEnforcer) initialiseStatusEnforcedReplicas() {
9797
return
9898
}
9999

100-
specReplicas := *r.kubegresContext.Kubegres.Spec.Replicas
100+
specReplicas := *r.kubegresContext.Replicas()
101101

102102
if specReplicas >= 1 && specReplicas == r.resourcesStates.StatefulSets.NbreDeployed {
103103
r.kubegresContext.Status.SetEnforcedReplicas(specReplicas)
@@ -123,7 +123,7 @@ func (r *PrimaryDbCountSpecEnforcer) shouldWeDeployNewPrimaryDb() bool {
123123
r.resourcesStates.StatefulSets.Replicas.NbreDeployed == 0
124124

125125
if shouldWeDeployNewPrimary {
126-
if *r.kubegresContext.Kubegres.Spec.Replicas == 1 || !r.hasPrimaryEverBeenDeployed() {
126+
if *r.kubegresContext.Replicas() == 1 || !r.hasPrimaryEverBeenDeployed() {
127127
return true
128128
}
129129
}
@@ -207,7 +207,7 @@ func (r *PrimaryDbCountSpecEnforcer) getInstanceIndexIfPrimaryNeedsToBeRecreated
207207
}
208208

209209
func (r *PrimaryDbCountSpecEnforcer) isThereReplicaToFailOverToInSpec() bool {
210-
return *r.kubegresContext.Kubegres.Spec.Replicas > 1
210+
return *r.kubegresContext.Replicas() > 1
211211
}
212212

213213
func (r *PrimaryDbCountSpecEnforcer) wasPrimaryNeverDeployed() bool {

controllers/spec/enforcer/resources_count_spec/statefulset/ReplicaDbCountSpecEnforcer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func (r *ReplicaDbCountSpecEnforcer) isManualFailoverRequested() bool {
206206
}
207207

208208
func (r *ReplicaDbCountSpecEnforcer) doesSpecRequireTheDeploymentOfAdditionalReplicas() bool {
209-
return *r.kubegresContext.Kubegres.Spec.Replicas > r.kubegresContext.Kubegres.Status.EnforcedReplicas
209+
return *r.kubegresContext.Replicas() > r.kubegresContext.Kubegres.Status.EnforcedReplicas
210210
}
211211

212212
func (r *ReplicaDbCountSpecEnforcer) resetInSpecManualFailover() error {

controllers/spec/enforcer/statefulset_spec/AffinitySpecEnforcer.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package statefulset_spec
2222

2323
import (
2424
apps "k8s.io/api/apps/v1"
25+
v1 "k8s.io/api/core/v1"
2526
"reactive-tech.io/kubegres/controllers/ctx"
2627
"reflect"
2728
)
@@ -39,9 +40,15 @@ func (r *AffinitySpecEnforcer) GetSpecName() string {
3940
}
4041

4142
func (r *AffinitySpecEnforcer) CheckForSpecDifference(statefulSet *apps.StatefulSet) StatefulSetSpecDifference {
42-
4343
current := statefulSet.Spec.Template.Spec.Affinity
44-
expected := r.kubegresContext.Kubegres.Spec.Scheduler.Affinity
44+
45+
var expected *v1.Affinity
46+
statefulInstanceIndex, _ := r.kubegresContext.GetInstanceIndexFromSpec(*statefulSet)
47+
if r.kubegresContext.HasNodeSets() && r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Affinity != nil {
48+
expected = r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Affinity
49+
} else {
50+
expected = r.kubegresContext.Kubegres.Spec.Scheduler.Affinity
51+
}
4552

4653
if !reflect.DeepEqual(current, expected) {
4754
return StatefulSetSpecDifference{
@@ -55,7 +62,14 @@ func (r *AffinitySpecEnforcer) CheckForSpecDifference(statefulSet *apps.Stateful
5562
}
5663

5764
func (r *AffinitySpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {
58-
statefulSet.Spec.Template.Spec.Affinity = r.kubegresContext.Kubegres.Spec.Scheduler.Affinity
65+
statefulInstanceIndex, _ := r.kubegresContext.GetInstanceIndexFromSpec(*statefulSet)
66+
67+
if r.kubegresContext.HasNodeSets() && r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Affinity != nil {
68+
statefulSet.Spec.Template.Spec.Affinity = r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Affinity
69+
} else {
70+
statefulSet.Spec.Template.Spec.Affinity = r.kubegresContext.Kubegres.Spec.Scheduler.Affinity
71+
}
72+
5973
return true, nil
6074
}
6175

controllers/spec/enforcer/statefulset_spec/TolerationsSpecEnforcer.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,15 @@ func (r *TolerationsSpecEnforcer) GetSpecName() string {
4040
}
4141

4242
func (r *TolerationsSpecEnforcer) CheckForSpecDifference(statefulSet *apps.StatefulSet) StatefulSetSpecDifference {
43-
4443
current := statefulSet.Spec.Template.Spec.Tolerations
45-
expected := r.kubegresContext.Kubegres.Spec.Scheduler.Tolerations
44+
45+
var expected []v1.Toleration
46+
statefulInstanceIndex, _ := r.kubegresContext.GetInstanceIndexFromSpec(*statefulSet)
47+
if r.kubegresContext.HasNodeSets() && r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Tolerations != nil {
48+
expected = r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Tolerations
49+
} else {
50+
expected = r.kubegresContext.Kubegres.Spec.Scheduler.Tolerations
51+
}
4652

4753
if !r.compare(current, expected) {
4854
return StatefulSetSpecDifference{
@@ -56,7 +62,13 @@ func (r *TolerationsSpecEnforcer) CheckForSpecDifference(statefulSet *apps.State
5662
}
5763

5864
func (r *TolerationsSpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {
59-
statefulSet.Spec.Template.Spec.Tolerations = r.kubegresContext.Kubegres.Spec.Scheduler.Tolerations
65+
statefulInstanceIndex, _ := r.kubegresContext.GetInstanceIndexFromSpec(*statefulSet)
66+
if r.kubegresContext.HasNodeSets() && r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Tolerations != nil {
67+
statefulSet.Spec.Template.Spec.Tolerations = r.kubegresContext.Kubegres.Spec.NodeSets[statefulInstanceIndex-1].Tolerations
68+
} else {
69+
statefulSet.Spec.Template.Spec.Tolerations = r.kubegresContext.Kubegres.Spec.Scheduler.Tolerations
70+
}
71+
6072
return true, nil
6173
}
6274

controllers/spec/template/ResourcesCreatorFromTemplate.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (r *ResourcesCreatorFromTemplate) CreateBackUpCronJob(configMapNameForBackU
164164
backUpCronJobContainer.Env = append(backUpCronJobContainer.Env, r.kubegresContext.Kubegres.Spec.Env...)
165165

166166
backSourceDbHostName := r.kubegresContext.GetServiceResourceName(false)
167-
if *postgres.Spec.Replicas == 1 {
167+
if *r.kubegresContext.Replicas() == 1 {
168168
backSourceDbHostName = r.kubegresContext.GetServiceResourceName(true)
169169
}
170170
backUpCronJobContainer.Env[3].Value = backSourceDbHostName
@@ -223,13 +223,26 @@ func (r *ResourcesCreatorFromTemplate) initStatefulSet(
223223
statefulSetTemplate.Spec.VolumeClaimTemplates[0].Spec.StorageClassName = postgresSpec.Database.StorageClassName
224224
statefulSetTemplate.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests = core.ResourceList{core.ResourceStorage: resource.MustParse(postgresSpec.Database.Size)}
225225

226-
if postgresSpec.Scheduler.Affinity != nil {
227-
statefulSetTemplateSpec.Affinity = postgresSpec.Scheduler.Affinity
226+
if r.kubegresContext.HasNodeSets() {
227+
if postgresSpec.NodeSets[statefulSetInstanceIndex-1].Affinity != nil {
228+
statefulSetTemplateSpec.Affinity = postgresSpec.NodeSets[statefulSetInstanceIndex-1].Affinity
229+
} else if postgresSpec.Scheduler.Affinity != nil {
230+
statefulSetTemplateSpec.Affinity = postgresSpec.Scheduler.Affinity
231+
}
232+
if len(postgresSpec.NodeSets[statefulSetInstanceIndex-1].Tolerations) > 0 {
233+
statefulSetTemplateSpec.Tolerations = postgresSpec.NodeSets[statefulSetInstanceIndex-1].Tolerations
234+
} else if len(postgresSpec.Scheduler.Tolerations) > 0 {
235+
statefulSetTemplateSpec.Tolerations = postgresSpec.Scheduler.Tolerations
236+
}
237+
} else {
238+
if postgresSpec.Scheduler.Affinity != nil {
239+
statefulSetTemplateSpec.Affinity = postgresSpec.Scheduler.Affinity
240+
}
241+
if len(postgresSpec.Scheduler.Tolerations) > 0 {
242+
statefulSetTemplateSpec.Tolerations = postgresSpec.Scheduler.Tolerations
243+
}
228244
}
229245

230-
if len(postgresSpec.Scheduler.Tolerations) > 0 {
231-
statefulSetTemplateSpec.Tolerations = postgresSpec.Scheduler.Tolerations
232-
}
233246
if postgresSpec.Resources.Requests != nil || postgresSpec.Resources.Limits != nil {
234247
statefulSetTemplate.Spec.Template.Spec.Containers[0].Resources = postgresSpec.Resources
235248
}

0 commit comments

Comments
 (0)