Skip to content

Commit d4b239d

Browse files
committed
Reconcile remote datacenters independently
Refactors ScyllaDBCluster controller to reconcile each datacenter independenly. Errors or connection issues of down datacenter no longer affects reconcilation of other datacenter.
1 parent 48b523d commit d4b239d

13 files changed

+1921
-2763
lines changed

pkg/controller/scylladbcluster/resource.go

+411-504
Large diffs are not rendered by default.

pkg/controller/scylladbcluster/resource_test.go

+1,052-1,713
Large diffs are not rendered by default.

pkg/controller/scylladbcluster/sync.go

+169-146
Large diffs are not rendered by default.

pkg/controller/scylladbcluster/sync_configmap.go

+27-45
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package scylladbcluster
33
import (
44
"context"
55
"fmt"
6+
67
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1alpha1"
78
"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
89
"github.com/scylladb/scylla-operator/pkg/naming"
@@ -12,15 +13,16 @@ import (
1213
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1314
)
1415

15-
func (scc *Controller) syncConfigMaps(
16+
func (scc *Controller) syncRemoteConfigMaps(
1617
ctx context.Context,
1718
sc *scyllav1alpha1.ScyllaDBCluster,
18-
remoteNamespaces map[string]*corev1.Namespace,
19-
remoteControllers map[string]metav1.Object,
20-
remoteConfigMaps map[string]map[string]*corev1.ConfigMap,
19+
dc *scyllav1alpha1.ScyllaDBClusterDatacenter,
20+
remoteNamespace *corev1.Namespace,
21+
remoteController metav1.Object,
22+
remoteConfigMaps map[string]*corev1.ConfigMap,
2123
managingClusterDomain string,
2224
) ([]metav1.Condition, error) {
23-
progressingConditions, requiredConfigMaps, err := MakeRemoteConfigMaps(sc, remoteNamespaces, remoteControllers, scc.configMapLister, managingClusterDomain)
25+
progressingConditions, requiredConfigMaps, err := MakeRemoteConfigMaps(sc, dc, remoteNamespace, remoteController, scc.configMapLister, managingClusterDomain)
2426
if err != nil {
2527
return progressingConditions, fmt.Errorf("can't make required configmaps: %w", err)
2628
}
@@ -29,54 +31,34 @@ func (scc *Controller) syncConfigMaps(
2931
return progressingConditions, nil
3032
}
3133

32-
// Delete has to be the first action to avoid getting stuck on quota.
33-
var deletionErrors []error
34-
for _, dc := range sc.Spec.Datacenters {
35-
ns, ok := remoteNamespaces[dc.RemoteKubernetesClusterName]
36-
if !ok {
37-
continue
38-
}
39-
40-
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
41-
if err != nil {
42-
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.RemoteKubernetesClusterName, err)
43-
}
44-
45-
err = controllerhelpers.Prune(ctx,
46-
requiredConfigMaps[dc.RemoteKubernetesClusterName],
47-
remoteConfigMaps[dc.RemoteKubernetesClusterName],
48-
&controllerhelpers.PruneControlFuncs{
49-
DeleteFunc: clusterClient.CoreV1().ConfigMaps(ns.Name).Delete,
50-
},
51-
scc.eventRecorder,
52-
)
53-
if err != nil {
54-
return progressingConditions, fmt.Errorf("can't prune configmap(s) in %q Datacenter of %q ScyllaDBCluster: %w", dc.Name, naming.ObjRef(sc), err)
55-
}
34+
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
35+
if err != nil {
36+
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.RemoteKubernetesClusterName, err)
5637
}
5738

58-
if err := utilerrors.NewAggregate(deletionErrors); err != nil {
59-
return nil, fmt.Errorf("can't prune remote configmap(s): %w", err)
39+
// Delete has to be the first action to avoid getting stuck on quota.
40+
err = controllerhelpers.Prune(ctx,
41+
requiredConfigMaps,
42+
remoteConfigMaps,
43+
&controllerhelpers.PruneControlFuncs{
44+
DeleteFunc: clusterClient.CoreV1().ConfigMaps(remoteNamespace.Name).Delete,
45+
},
46+
scc.eventRecorder,
47+
)
48+
if err != nil {
49+
return progressingConditions, fmt.Errorf("can't prune configmap(s) in %q Datacenter of %q ScyllaDBCluster: %w", dc.Name, naming.ObjRef(sc), err)
6050
}
6151

6252
var errs []error
63-
for _, dc := range sc.Spec.Datacenters {
64-
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
53+
for _, cm := range requiredConfigMaps {
54+
_, changed, err := resourceapply.ApplyConfigMap(ctx, clusterClient.CoreV1(), scc.remoteConfigMapLister.Cluster(dc.RemoteKubernetesClusterName), scc.eventRecorder, cm, resourceapply.ApplyOptions{})
55+
if changed {
56+
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, remoteConfigMapControllerProgressingCondition, cm, "apply", sc.Generation)
57+
}
6558
if err != nil {
66-
errs = append(errs, fmt.Errorf("can't get client to %q cluster: %w", dc.Name, err))
59+
errs = append(errs, fmt.Errorf("can't apply configmap: %w", err))
6760
continue
6861
}
69-
70-
for _, cm := range requiredConfigMaps[dc.RemoteKubernetesClusterName] {
71-
_, changed, err := resourceapply.ApplyConfigMap(ctx, clusterClient.CoreV1(), scc.remoteConfigMapLister.Cluster(dc.RemoteKubernetesClusterName), scc.eventRecorder, cm, resourceapply.ApplyOptions{})
72-
if changed {
73-
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, remoteConfigMapControllerProgressingCondition, cm, "apply", sc.Generation)
74-
}
75-
if err != nil {
76-
errs = append(errs, fmt.Errorf("can't apply configmap: %w", err))
77-
continue
78-
}
79-
}
8062
}
8163

8264
err = utilerrors.NewAggregate(errs)

pkg/controller/scylladbcluster/sync_endpoints.go

+32-52
Original file line numberDiff line numberDiff line change
@@ -12,78 +12,58 @@ import (
1212
"github.com/scylladb/scylla-operator/pkg/resourceapply"
1313
corev1 "k8s.io/api/core/v1"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15-
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1615
)
1716

18-
func (scc *Controller) syncEndpoints(
17+
func (scc *Controller) syncRemoteEndpoints(
1918
ctx context.Context,
2019
sc *scyllav1alpha1.ScyllaDBCluster,
20+
dc *scyllav1alpha1.ScyllaDBClusterDatacenter,
21+
remoteNamespace *corev1.Namespace,
22+
remoteController metav1.Object,
23+
remoteEndpoints map[string]*corev1.Endpoints,
2124
remoteNamespaces map[string]*corev1.Namespace,
22-
remoteControllers map[string]metav1.Object,
23-
remoteEndpoints map[string]map[string]*corev1.Endpoints,
2425
managingClusterDomain string,
2526
) ([]metav1.Condition, error) {
26-
progressingConditions, requiredEndpointSlices, err := MakeRemoteEndpointSlices(sc, remoteNamespaces, remoteControllers, scc.remoteServiceLister, scc.remotePodLister, managingClusterDomain)
27+
progressingConditions, requiredEndpointSlices, err := MakeRemoteEndpointSlices(sc, dc, remoteNamespace, remoteController, remoteNamespaces, scc.remoteServiceLister, scc.remotePodLister, managingClusterDomain)
2728
if err != nil {
2829
return progressingConditions, fmt.Errorf("can't make endpointslices: %w", err)
2930
}
3031

31-
requiredEndpoints := make(map[string][]*corev1.Endpoints, len(requiredEndpointSlices))
32+
requiredEndpoints := make([]*corev1.Endpoints, 0, len(requiredEndpointSlices))
3233

33-
for _, dc := range sc.Spec.Datacenters {
34-
re, err := controllerhelpers.ConvertEndpointSlicesToEndpoints(requiredEndpointSlices[dc.RemoteKubernetesClusterName])
35-
if err != nil {
36-
return progressingConditions, fmt.Errorf("can't convert endpointslices to endpoints: %w", err)
37-
}
38-
39-
requiredEndpoints[dc.RemoteKubernetesClusterName] = re
34+
re, err := controllerhelpers.ConvertEndpointSlicesToEndpoints(requiredEndpointSlices)
35+
if err != nil {
36+
return progressingConditions, fmt.Errorf("can't convert endpointslices to endpoints: %w", err)
4037
}
4138

42-
// Delete any excessive Endpoints.
43-
// Delete has to be the first action to avoid getting stuck on quota.
44-
var deletionErrors []error
45-
for _, dc := range sc.Spec.Datacenters {
46-
ns, ok := remoteNamespaces[dc.RemoteKubernetesClusterName]
47-
if !ok {
48-
continue
49-
}
50-
51-
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
52-
if err != nil {
53-
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.RemoteKubernetesClusterName, err)
54-
}
39+
requiredEndpoints = append(requiredEndpoints, re...)
5540

56-
err = controllerhelpers.Prune(ctx,
57-
requiredEndpoints[dc.RemoteKubernetesClusterName],
58-
remoteEndpoints[dc.RemoteKubernetesClusterName],
59-
&controllerhelpers.PruneControlFuncs{
60-
DeleteFunc: clusterClient.CoreV1().Endpoints(ns.Name).Delete,
61-
},
62-
scc.eventRecorder,
63-
)
64-
if err != nil {
65-
return progressingConditions, fmt.Errorf("can't prune endpoints in %q Datacenter of %q ScyllaDBCluster: %w", dc.Name, naming.ObjRef(sc), err)
66-
}
41+
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
42+
if err != nil {
43+
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.RemoteKubernetesClusterName, err)
6744
}
6845

69-
if err := utilerrors.NewAggregate(deletionErrors); err != nil {
70-
return nil, fmt.Errorf("can't prune remote endpoints: %w", err)
46+
// Delete any excessive Endpoints.
47+
// Delete has to be the first action to avoid getting stuck on quota.
48+
err = controllerhelpers.Prune(ctx,
49+
requiredEndpoints,
50+
remoteEndpoints,
51+
&controllerhelpers.PruneControlFuncs{
52+
DeleteFunc: clusterClient.CoreV1().Endpoints(remoteNamespace.Name).Delete,
53+
},
54+
scc.eventRecorder,
55+
)
56+
if err != nil {
57+
return progressingConditions, fmt.Errorf("can't prune endpoints in %q Datacenter of %q ScyllaDBCluster: %w", dc.Name, naming.ObjRef(sc), err)
7158
}
7259

73-
for _, dc := range sc.Spec.Datacenters {
74-
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
75-
if err != nil {
76-
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.Name, err)
60+
for _, e := range requiredEndpoints {
61+
_, changed, err := resourceapply.ApplyEndpoints(ctx, clusterClient.CoreV1(), scc.remoteEndpointsLister.Cluster(dc.RemoteKubernetesClusterName), scc.eventRecorder, e, resourceapply.ApplyOptions{})
62+
if changed {
63+
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, remoteEndpointsControllerProgressingCondition, e, "apply", sc.Generation)
7764
}
78-
79-
for _, e := range requiredEndpoints[dc.RemoteKubernetesClusterName] {
80-
_, changed, err := resourceapply.ApplyEndpoints(ctx, clusterClient.CoreV1(), scc.remoteEndpointsLister.Cluster(dc.RemoteKubernetesClusterName), scc.eventRecorder, e, resourceapply.ApplyOptions{})
81-
if changed {
82-
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, remoteEndpointsControllerProgressingCondition, e, "apply", sc.Generation)
83-
}
84-
if err != nil {
85-
return nil, fmt.Errorf("can't apply endpoints: %w", err)
86-
}
65+
if err != nil {
66+
return nil, fmt.Errorf("can't apply endpoints: %w", err)
8767
}
8868
}
8969

pkg/controller/scylladbcluster/sync_endpointslices.go

+27-45
Original file line numberDiff line numberDiff line change
@@ -13,70 +13,52 @@ import (
1313
corev1 "k8s.io/api/core/v1"
1414
discoveryv1 "k8s.io/api/discovery/v1"
1515
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16-
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1716
)
1817

19-
func (scc *Controller) syncEndpointSlices(
18+
func (scc *Controller) syncRemoteEndpointSlices(
2019
ctx context.Context,
2120
sc *scyllav1alpha1.ScyllaDBCluster,
21+
dc *scyllav1alpha1.ScyllaDBClusterDatacenter,
22+
remoteNamespace *corev1.Namespace,
23+
remoteController metav1.Object,
24+
remoteEndpointSlices map[string]*discoveryv1.EndpointSlice,
2225
remoteNamespaces map[string]*corev1.Namespace,
23-
remoteControllers map[string]metav1.Object,
24-
remoteEndpointSlices map[string]map[string]*discoveryv1.EndpointSlice,
2526
managingClusterDomain string,
2627
) ([]metav1.Condition, error) {
27-
progressingConditions, requiredEndpointSlices, err := MakeRemoteEndpointSlices(sc, remoteNamespaces, remoteControllers, scc.remoteServiceLister, scc.remotePodLister, managingClusterDomain)
28+
progressingConditions, requiredEndpointSlices, err := MakeRemoteEndpointSlices(sc, dc, remoteNamespace, remoteController, remoteNamespaces, scc.remoteServiceLister, scc.remotePodLister, managingClusterDomain)
2829
if err != nil {
2930
return progressingConditions, fmt.Errorf("can't make endpointslices: %w", err)
3031
}
3132
if len(progressingConditions) > 0 {
3233
return progressingConditions, nil
3334
}
3435

35-
// Delete any excessive EndpointSlices.
36-
// Delete has to be the first action to avoid getting stuck on quota.
37-
var deletionErrors []error
38-
for _, dc := range sc.Spec.Datacenters {
39-
ns, ok := remoteNamespaces[dc.RemoteKubernetesClusterName]
40-
if !ok {
41-
continue
42-
}
43-
44-
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
45-
if err != nil {
46-
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.RemoteKubernetesClusterName, err)
47-
}
48-
49-
err = controllerhelpers.Prune(ctx,
50-
requiredEndpointSlices[dc.RemoteKubernetesClusterName],
51-
remoteEndpointSlices[dc.RemoteKubernetesClusterName],
52-
&controllerhelpers.PruneControlFuncs{
53-
DeleteFunc: clusterClient.DiscoveryV1().EndpointSlices(ns.Name).Delete,
54-
},
55-
scc.eventRecorder,
56-
)
57-
if err != nil {
58-
return progressingConditions, fmt.Errorf("can't prune endpointslices in %q Datacenter of %q ScyllaDBCluster: %w", dc.Name, naming.ObjRef(sc), err)
59-
}
36+
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
37+
if err != nil {
38+
return nil, fmt.Errorf("can't get client to %q cluster: %w", dc.RemoteKubernetesClusterName, err)
6039
}
6140

62-
if err := utilerrors.NewAggregate(deletionErrors); err != nil {
63-
return nil, fmt.Errorf("can't prune remote endpointslice(s): %w", err)
41+
// Delete any excessive EndpointSlices.
42+
// Delete has to be the first action to avoid getting stuck on quota.
43+
err = controllerhelpers.Prune(ctx,
44+
requiredEndpointSlices,
45+
remoteEndpointSlices,
46+
&controllerhelpers.PruneControlFuncs{
47+
DeleteFunc: clusterClient.DiscoveryV1().EndpointSlices(remoteNamespace.Name).Delete,
48+
},
49+
scc.eventRecorder,
50+
)
51+
if err != nil {
52+
return progressingConditions, fmt.Errorf("can't prune endpointslices in %q Datacenter of %q ScyllaDBCluster: %w", dc.Name, naming.ObjRef(sc), err)
6453
}
6554

66-
for _, dc := range sc.Spec.Datacenters {
67-
clusterClient, err := scc.kubeRemoteClient.Cluster(dc.RemoteKubernetesClusterName)
68-
if err != nil {
69-
return nil, fmt.Errorf("can't get client to %q region: %w", dc.Name, err)
55+
for _, es := range requiredEndpointSlices {
56+
_, changed, err := resourceapply.ApplyEndpointSlice(ctx, clusterClient.DiscoveryV1(), scc.remoteEndpointSliceLister.Cluster(dc.RemoteKubernetesClusterName), scc.eventRecorder, es, resourceapply.ApplyOptions{})
57+
if changed {
58+
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, remoteEndpointSliceControllerProgressingCondition, es, "apply", sc.Generation)
7059
}
71-
72-
for _, es := range requiredEndpointSlices[dc.RemoteKubernetesClusterName] {
73-
_, changed, err := resourceapply.ApplyEndpointSlice(ctx, clusterClient.DiscoveryV1(), scc.remoteEndpointSliceLister.Cluster(dc.RemoteKubernetesClusterName), scc.eventRecorder, es, resourceapply.ApplyOptions{})
74-
if changed {
75-
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, remoteEndpointSliceControllerProgressingCondition, es, "apply", sc.Generation)
76-
}
77-
if err != nil {
78-
return nil, fmt.Errorf("can't apply endpointslice: %w", err)
79-
}
60+
if err != nil {
61+
return nil, fmt.Errorf("can't apply endpointslice: %w", err)
8062
}
8163
}
8264

0 commit comments

Comments
 (0)