Skip to content

Commit 7109290

Browse files
authored
Refactor commands for testing (#9)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent 0e87996 commit 7109290

File tree

6 files changed

+539
-200
lines changed

6 files changed

+539
-200
lines changed

cmd/continue.go

Lines changed: 85 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,87 @@ package cmd
1717

1818
import (
1919
"context"
20+
"fmt"
21+
"log"
22+
23+
strimzi "github.com/scholzj/strimzi-go/pkg/client/clientset/versioned"
2024
"github.com/spf13/cobra"
2125
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22-
"log"
2326
)
2427

25-
// continueCmd represents the continue command
26-
var continueCmd = &cobra.Command{
27-
Use: "continue",
28-
Short: "Restarts the Kafka cluster",
29-
Long: "Restarts the Kafka cluster.",
30-
Run: func(cmd *cobra.Command, args []string) {
31-
timeout, _ := cmd.Flags().GetUint32("timeout")
32-
33-
name := cmd.Flag("name").Value.String()
34-
if name == "" {
35-
log.Fatal("--name option is required")
36-
}
28+
type continueOptions struct {
29+
name string
30+
namespace string
31+
kubeconfig string
32+
timeout uint32
33+
}
3734

38-
kubeConfig, kubeConfigNamespace, err := kubeConfigAndNamespace(cmd.Flag("kubeconfig").Value.String())
39-
if err != nil {
40-
log.Fatal(err)
41-
}
35+
// Used for testing
36+
var waitUntilReadyFn = waitUntilReady
4237

43-
namespace, err := determineNamespace(cmd.Flag("namespace").Value.String(), kubeConfigNamespace)
44-
if err != nil {
45-
log.Fatal(err)
46-
}
38+
func newContinueCommand() *cobra.Command {
39+
return &cobra.Command{
40+
Use: "continue",
41+
Short: "Restarts the Kafka cluster",
42+
Long: "Restarts the Kafka cluster.",
43+
RunE: runContinueCommand,
44+
}
45+
}
4746

48-
strimzi, err := strimziClient(kubeConfig)
49-
if err != nil {
50-
log.Fatalf("Failed to create Strimzi client: %v", err)
51-
}
47+
func runContinueCommand(cmd *cobra.Command, args []string) error {
48+
opts, err := continueOptionsFromCmd(cmd)
49+
if err != nil {
50+
return err
51+
}
5252

53-
kafka, err := strimzi.KafkaV1().Kafkas(namespace).Get(context.TODO(), name, metav1.GetOptions{})
54-
if err != nil {
55-
log.Fatalf("Kafka cluster %v in namespace %s not found: %v", name, namespace, err)
56-
}
53+
return runContinue(opts)
54+
}
55+
56+
func continueOptionsFromCmd(cmd *cobra.Command) (continueOptions, error) {
57+
timeout, _ := cmd.Flags().GetUint32("timeout")
58+
59+
name := cmd.Flag("name").Value.String()
60+
if name == "" {
61+
return continueOptions{}, fmt.Errorf("--name option is required")
62+
}
63+
64+
return continueOptions{
65+
name: name,
66+
namespace: cmd.Flag("namespace").Value.String(),
67+
kubeconfig: cmd.Flag("kubeconfig").Value.String(),
68+
timeout: timeout,
69+
}, nil
70+
}
5771

72+
func runContinue(opts continueOptions) error {
73+
kubeConfig, kubeConfigNamespace, err := kubeConfigAndNamespace(opts.kubeconfig)
74+
if err != nil {
75+
return err
76+
}
77+
78+
namespace, err := determineNamespace(opts.namespace, kubeConfigNamespace)
79+
if err != nil {
80+
return err
81+
}
82+
83+
strimziClient, err := strimziClient(kubeConfig)
84+
if err != nil {
85+
return fmt.Errorf("failed to create Strimzi client: %w", err)
86+
}
87+
88+
return runContinueWithClients(opts.name, namespace, opts.timeout, strimziClient)
89+
}
90+
91+
func runContinueWithClients(name string, namespace string, timeout uint32, strimziClient strimzi.Interface) error {
92+
kafka, err := strimziClient.KafkaV1().Kafkas(namespace).Get(context.TODO(), name, metav1.GetOptions{})
93+
if err != nil {
94+
return fmt.Errorf("Kafka cluster %v in namespace %s not found: %w", name, namespace, err)
95+
}
96+
97+
if isReady(kafka) {
98+
log.Printf("Kafka cluster %s in namespace %s is ready and does not need to be restarted", name, namespace)
99+
return nil
100+
} else {
58101
if isReconciliationPaused(kafka) {
59102
log.Printf("Reconciliation of Kafka cluster %s in namespace %s will be unpaused", name, namespace)
60103
unpausedKafka := kafka.DeepCopy()
@@ -66,43 +109,25 @@ var continueCmd = &cobra.Command{
66109
}
67110

68111
log.Printf("Unpausing reconciliation of Kafka cluster %s in namespace %s", name, namespace)
69-
_, err = strimzi.KafkaV1().Kafkas(namespace).Update(context.TODO(), unpausedKafka, metav1.UpdateOptions{})
70-
if err != nil {
71-
log.Fatalf("failed to unpause Kafka cluster %s in namespace %s: %v", name, namespace, err)
72-
}
73-
74-
log.Printf("Waiting for Kafka cluster %s in namespace %s to get ready.", name, namespace)
75-
_, err = waitUntilReady(strimzi, name, namespace, timeout)
112+
_, err = strimziClient.KafkaV1().Kafkas(namespace).Update(context.TODO(), unpausedKafka, metav1.UpdateOptions{})
76113
if err != nil {
77-
log.Fatal(err)
114+
return fmt.Errorf("failed to unpause Kafka cluster %s in namespace %s: %w", name, namespace, err)
78115
}
79-
80-
log.Printf("Kafka cluster %s in namespace %s has been restarted and should be ready", name, namespace)
81-
} else if isReady(kafka) {
82-
log.Printf("Kafka cluster %s in namespace %s is ready and does not need to be restarted", name, namespace)
83116
} else {
84-
log.Printf("Waiting for Kafka cluster %s in namespace %s does not have paused reconciliation, but is not ready.", name, namespace)
85-
log.Printf("Waiting for Kafka cluster %s in namespace %s to get ready.", name, namespace)
86-
_, err = waitUntilReady(strimzi, name, namespace, timeout)
87-
if err != nil {
88-
log.Fatal(err)
89-
}
117+
log.Printf("Kafka cluster %s in namespace %s does not have paused reconciliation, but is not ready.", name, namespace)
118+
}
90119

91-
log.Printf("Kafka cluster %s in namespace %s has been restarted and should be ready", name, namespace)
120+
log.Printf("Waiting for Kafka cluster %s in namespace %s to get ready.", name, namespace)
121+
_, err = waitUntilReadyFn(strimziClient, name, namespace, timeout)
122+
if err != nil {
123+
return err
92124
}
93-
},
125+
126+
log.Printf("Kafka cluster %s in namespace %s is ready", name, namespace)
127+
return nil
128+
}
94129
}
95130

96131
func init() {
97-
rootCmd.AddCommand(continueCmd)
98-
99-
// Here you will define your flags and configuration settings.
100-
101-
// Cobra supports Persistent Flags which will work for this command
102-
// and all subcommands, e.g.:
103-
// continueCmd.PersistentFlags().String("foo", "", "A help for foo")
104-
105-
// Cobra supports local flags which will only run when this command
106-
// is called directly, e.g.:
107-
// continueCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
132+
rootCmd.AddCommand(newContinueCommand())
108133
}

cmd/continue_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright © 2025 Jakub Scholz
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package cmd
17+
18+
import (
19+
"context"
20+
"testing"
21+
22+
strimzi "github.com/scholzj/strimzi-go/pkg/client/clientset/versioned"
23+
strimzifake "github.com/scholzj/strimzi-go/pkg/client/clientset/versioned/fake"
24+
"github.com/spf13/cobra"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
)
27+
28+
func TestRunContinueCommand_ReturnsErrorWhenNameMissing(t *testing.T) {
29+
cmd := &cobra.Command{}
30+
addPersistentFlags(cmd)
31+
32+
err := runContinueCommand(cmd, nil)
33+
if err == nil {
34+
t.Fatal("expected error, got nil")
35+
}
36+
37+
if err.Error() != "--name option is required" {
38+
t.Fatalf("unexpected error: %v", err)
39+
}
40+
}
41+
42+
func TestRunContinueWithClients_UnpausesPausedClusterAndWaitsForReady(t *testing.T) {
43+
strimziClient := strimzifake.NewSimpleClientset(newKafkaResource(
44+
"my-cluster",
45+
"ns",
46+
3,
47+
3,
48+
map[string]string{"strimzi.io/pause-reconciliation": "true"},
49+
newCondition("ReconciliationPaused", metav1.ConditionTrue),
50+
))
51+
52+
waitCalled := false
53+
oldWaitFn := waitUntilReadyFn
54+
waitUntilReadyFn = func(client strimzi.Interface, name string, namespace string, timeout uint32) (bool, error) {
55+
waitCalled = true
56+
if name != "my-cluster" || namespace != "ns" || timeout != 123 {
57+
t.Fatalf("unexpected wait args: %s %s %d", name, namespace, timeout)
58+
}
59+
return true, nil
60+
}
61+
t.Cleanup(func() {
62+
waitUntilReadyFn = oldWaitFn
63+
})
64+
65+
err := runContinueWithClients("my-cluster", "ns", 123, strimziClient)
66+
if err != nil {
67+
t.Fatalf("expected no error, got %v", err)
68+
}
69+
70+
if !waitCalled {
71+
t.Fatal("expected waitUntilReady to be called")
72+
}
73+
74+
updatedKafka, err := strimziClient.KafkaV1().Kafkas("ns").Get(context.TODO(), "my-cluster", metav1.GetOptions{})
75+
if err != nil {
76+
t.Fatalf("failed to get updated Kafka resource: %v", err)
77+
}
78+
79+
if updatedKafka.Annotations["strimzi.io/pause-reconciliation"] != "false" {
80+
t.Fatalf("expected reconciliation annotation to be false, got %q", updatedKafka.Annotations["strimzi.io/pause-reconciliation"])
81+
}
82+
}
83+
84+
func TestRunContinueWithClients_ReturnsNilWhenClusterAlreadyReady(t *testing.T) {
85+
strimziClient := strimzifake.NewSimpleClientset(newKafkaResource(
86+
"my-cluster",
87+
"ns",
88+
3,
89+
3,
90+
nil,
91+
newCondition("Ready", metav1.ConditionTrue),
92+
))
93+
94+
waitCalled := false
95+
oldWaitFn := waitUntilReadyFn
96+
waitUntilReadyFn = func(client strimzi.Interface, name string, namespace string, timeout uint32) (bool, error) {
97+
waitCalled = true
98+
return true, nil
99+
}
100+
t.Cleanup(func() {
101+
waitUntilReadyFn = oldWaitFn
102+
})
103+
104+
err := runContinueWithClients("my-cluster", "ns", 123, strimziClient)
105+
if err != nil {
106+
t.Fatalf("expected no error, got %v", err)
107+
}
108+
109+
if waitCalled {
110+
t.Fatal("did not expect waitUntilReady to be called")
111+
}
112+
}

cmd/root.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,18 @@ func Execute() {
4040
}
4141
}
4242

43+
func addPersistentFlags(cmd *cobra.Command) {
44+
cmd.PersistentFlags().String("kubeconfig", "", "Path to the kubeconfig file to use for Kubernetes API requests. If not specified, strimzi-shutdown will try to auto-detect the Kubernetes configuration.")
45+
cmd.PersistentFlags().String("namespace", "", "Namespace of the Kafka cluster. If not specified, defaults to the namespace from your Kubernetes configuration.")
46+
cmd.PersistentFlags().String("name", "", "Name of the Kafka cluster")
47+
cmd.PersistentFlags().Uint32P("timeout", "t", 300000, "Timeout for how long to wait when stopping or continuing the Kafka cluster. In milliseconds.")
48+
}
49+
4350
func init() {
4451
// Here you will define your flags and configuration settings.
4552
// Cobra supports persistent flags, which, if defined here,
4653
// will be global for your application.
47-
rootCmd.PersistentFlags().String("kubeconfig", "", "Path to the kubeconfig file to use for Kubernetes API requests. If not specified, strimzi-shutdown will try to auto-detect the Kubernetes configuration.")
48-
rootCmd.PersistentFlags().String("namespace", "", "Namespace of the Kafka cluster. If not specified, defaults to the namespace from your Kubernetes configuration.")
49-
rootCmd.PersistentFlags().String("name", "", "Name of the Kafka cluster")
50-
rootCmd.PersistentFlags().Uint32P("timeout", "t", 300000, "Timeout for how long to wait when stopping or continuing the Kafka cluster. In milliseconds.")
54+
addPersistentFlags(rootCmd)
5155

5256
// Cobra also supports local flags, which will only run
5357
// when this action is called directly.

0 commit comments

Comments
 (0)