@@ -4,18 +4,30 @@ package v1alpha1
44
55import (
66 "context"
7+ "errors"
78 "fmt"
9+ "reflect"
10+ "sort"
11+ "strings"
12+ "time"
13+
14+ scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
815 scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1alpha1"
916 "github.com/scylladb/scylla-operator/pkg/controllerhelpers"
1017 "github.com/scylladb/scylla-operator/pkg/helpers"
1118 "github.com/scylladb/scylla-operator/pkg/helpers/slices"
1219 "github.com/scylladb/scylla-operator/pkg/naming"
1320 "github.com/scylladb/scylla-operator/pkg/pointer"
1421 "github.com/scylladb/scylla-operator/pkg/scyllaclient"
22+ "github.com/scylladb/scylla-operator/test/e2e/framework"
23+ appsv1 "k8s.io/api/apps/v1"
1524 corev1 "k8s.io/api/core/v1"
1625 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1726 "k8s.io/apimachinery/pkg/labels"
27+ "k8s.io/apimachinery/pkg/util/wait"
28+ appv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
1829 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
30+ "k8s.io/klog/v2"
1931)
2032
2133func GetScyllaClient (ctx context.Context , client corev1client.CoreV1Interface , sdc * scyllav1alpha1.ScyllaDBDatacenter ) (* scyllaclient.Client , []string , error ) {
@@ -160,4 +172,153 @@ func GetBroadcastAddress(ctx context.Context, client corev1client.CoreV1Interfac
160172 }
161173
162174 return broadcastAddress , nil
175+
176+ }
177+
178+ func ContextForRollout (parent context.Context , sdc * scyllav1alpha1.ScyllaDBDatacenter ) (context.Context , context.CancelFunc ) {
179+ return context .WithTimeout (parent , RolloutTimeoutForScyllaDBDatacenter (sdc ))
180+ }
181+
182+ func RolloutTimeoutForScyllaDBDatacenter (sdc * scyllav1alpha1.ScyllaDBDatacenter ) time.Duration {
183+ return SyncTimeout + time .Duration (GetNodeCount (sdc ))* memberRolloutTimeout + cleanupJobTimeout
184+ }
185+
186+ func GetNodeCount (sdc * scyllav1alpha1.ScyllaDBDatacenter ) int32 {
187+ nodes := int32 (0 )
188+ rackTemplateNodes := int32 (0 )
189+
190+ if sdc .Spec .RackTemplate != nil && sdc .Spec .RackTemplate .Nodes != nil {
191+ rackTemplateNodes = * sdc .Spec .RackTemplate .Nodes
192+ }
193+
194+ for _ , r := range sdc .Spec .Racks {
195+ if r .Nodes != nil {
196+ nodes += * r .Nodes
197+ } else {
198+ nodes += rackTemplateNodes
199+ }
200+ }
201+
202+ return nodes
203+ }
204+
205+ func IsScyllaDBDatacenterRolledOut (sdc * scyllav1alpha1.ScyllaDBDatacenter ) (bool , error ) {
206+ if ! helpers .IsStatusConditionPresentAndTrue (sdc .Status .Conditions , scyllav1 .AvailableCondition , sdc .Generation ) {
207+ return false , nil
208+ }
209+
210+ if ! helpers .IsStatusConditionPresentAndFalse (sdc .Status .Conditions , scyllav1 .ProgressingCondition , sdc .Generation ) {
211+ return false , nil
212+ }
213+
214+ if ! helpers .IsStatusConditionPresentAndFalse (sdc .Status .Conditions , scyllav1 .DegradedCondition , sdc .Generation ) {
215+ return false , nil
216+ }
217+
218+ framework .Infof ("ScyllaDBDatacenter %s (RV=%s) is rolled out" , klog .KObj (sdc ), sdc .ResourceVersion )
219+
220+ return true , nil
221+ }
222+
223+ func IsScyllaDBManagerClusterRegistrationRolledOut (smcr * scyllav1alpha1.ScyllaDBManagerClusterRegistration ) (bool , error ) {
224+ if ! helpers .IsStatusConditionPresentAndFalse (smcr .Status .Conditions , scyllav1 .ProgressingCondition , smcr .Generation ) {
225+ return false , nil
226+ }
227+
228+ if ! helpers .IsStatusConditionPresentAndFalse (smcr .Status .Conditions , scyllav1 .DegradedCondition , smcr .Generation ) {
229+ return false , nil
230+ }
231+
232+ framework .Infof ("ScyllaDBManagerClusterRegistration %s (RV=%s) is rolled out" , klog .KObj (smcr ), smcr .ResourceVersion )
233+
234+ return true , nil
235+ }
236+
237+ func GetStatefulSetsForScyllaDBDatacenter (ctx context.Context , client appv1client.AppsV1Interface , sdc * scyllav1alpha1.ScyllaDBDatacenter ) (map [string ]* appsv1.StatefulSet , error ) {
238+ statefulsetList , err := client .StatefulSets (sdc .Namespace ).List (ctx , metav1.ListOptions {
239+ LabelSelector : labels.Set {
240+ naming .ClusterNameLabel : sdc .Name ,
241+ }.AsSelector ().String (),
242+ })
243+ if err != nil {
244+ return nil , err
245+ }
246+
247+ res := map [string ]* appsv1.StatefulSet {}
248+ for _ , s := range statefulsetList .Items {
249+ rackName := s .Labels [naming .RackNameLabel ]
250+ res [rackName ] = & s
251+ }
252+
253+ return res , nil
254+ }
255+
256+ func GetPodsForStatefulSet (ctx context.Context , client corev1client.CoreV1Interface , sts * appsv1.StatefulSet ) (map [string ]* corev1.Pod , error ) {
257+ selector , err := metav1 .LabelSelectorAsSelector (sts .Spec .Selector )
258+ if err != nil {
259+ return nil , fmt .Errorf ("can't convert StatefulSet %q selector: %w" , naming .ObjRef (sts ), err )
260+ }
261+
262+ podList , err := client .Pods (sts .Namespace ).List (ctx , metav1.ListOptions {
263+ LabelSelector : selector .String (),
264+ })
265+ if err != nil {
266+ return nil , fmt .Errorf ("can't list Pods for StatefulSet %q: %w" , naming .ObjRef (sts ), err )
267+ }
268+
269+ res := map [string ]* corev1.Pod {}
270+ for _ , pod := range podList .Items {
271+ res [pod .Name ] = & pod
272+ }
273+
274+ return res , nil
275+ }
276+
277+ // TODO: Should be unified with function coming from test/helpers once e2e's there starts using ScyllaDBDatacenter API.
278+ func WaitForFullQuorum (ctx context.Context , client corev1client.CoreV1Interface , sdc * scyllav1alpha1.ScyllaDBDatacenter , sortedExpectedHosts []string ) error {
279+ scyllaClient , hosts , err := GetScyllaClient (ctx , client , sdc )
280+ if err != nil {
281+ return fmt .Errorf ("can't get scylla client: %w" , err )
282+ }
283+ defer scyllaClient .Close ()
284+
285+ // Wait for node status to propagate and reach consistency.
286+ // This can take a while so let's set a large enough timeout to avoid flakes.
287+ return wait .PollImmediateWithContext (ctx , 1 * time .Second , 5 * time .Minute , func (ctx context.Context ) (done bool , err error ) {
288+ allSeeAllAsUN := true
289+ infoMessages := make ([]string , 0 , len (hosts ))
290+ var errs []error
291+ for _ , h := range hosts {
292+ var s scyllaclient.NodeStatusInfoSlice
293+ s , err = scyllaClient .Status (ctx , h )
294+ if err != nil {
295+ return true , fmt .Errorf ("can't get scylla status on node %q: %w" , h , err )
296+ }
297+
298+ sHosts := s .Hosts ()
299+ sort .Strings (sHosts )
300+ if ! reflect .DeepEqual (sHosts , sortedExpectedHosts ) {
301+ errs = append (errs , fmt .Errorf ("node %q thinks the cluster consists of different nodes: %s" , h , sHosts ))
302+ }
303+
304+ downHosts := s .DownHosts ()
305+ infoMessages = append (infoMessages , fmt .Sprintf ("Node %q, down: %q, up: %q" , h , strings .Join (downHosts , "\n " ), strings .Join (s .LiveHosts (), "," )))
306+
307+ if len (downHosts ) != 0 {
308+ allSeeAllAsUN = false
309+ }
310+ }
311+
312+ if ! allSeeAllAsUN {
313+ framework .Infof ("ScyllaDB nodes have not reached status consistency yet. Statuses:\n %s" , strings .Join (infoMessages , "," ))
314+ }
315+
316+ err = errors .Join (errs ... )
317+ if err != nil {
318+ framework .Infof ("ScyllaDB nodes encountered an error. Statuses:\n %s" , strings .Join (infoMessages , "," ))
319+ return true , err
320+ }
321+
322+ return allSeeAllAsUN , nil
323+ })
163324}
0 commit comments