Skip to content

Commit a7b6bd1

Browse files
authored
Merge pull request #274 from kleewho/fix/remove-default-context-k8s
feat(k8s): gracefully shutdown Kubernetes pod on exit
2 parents c57fe50 + 74d7796 commit a7b6bd1

File tree

4 files changed

+107
-53
lines changed

4 files changed

+107
-53
lines changed

internal/k8s/executer_test.go

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package k8s_test
22

33
import (
4+
"context"
45
"encoding/json"
56
"strings"
67
"testing"
@@ -44,16 +45,15 @@ func (runner *TestRunner) Execute(binary string, args []string) error {
4445
}
4546

4647
func TestExecWithImageAndImagePullSecretProvided(t *testing.T) {
48+
var clientContext internal.ClientContext
49+
clientContext.Kubernetes.Image = "private.registry.com/deviceinsight/kafkactl"
50+
clientContext.Kubernetes.ImagePullSecret = "registry-secret"
4751

48-
var context internal.ClientContext
49-
context.Kubernetes.Image = "private.registry.com/deviceinsight/kafkactl"
50-
context.Kubernetes.ImagePullSecret = "registry-secret"
51-
52-
var testRunner = TestRunner{}
52+
testRunner := TestRunner{}
5353
testRunner.response = []byte(sampleKubectlVersionOutput)
5454
var runner k8s.Runner = &testRunner
5555

56-
exec, err := k8s.NewExecutor(context, runner)
56+
exec, err := k8s.NewExecutor(context.Background(), clientContext, runner)
5757
if err != nil {
5858
t.Fatal(err)
5959
}
@@ -64,7 +64,7 @@ func TestExecWithImageAndImagePullSecretProvided(t *testing.T) {
6464
}
6565

6666
image := extractParam(t, testRunner.args, "--image")
67-
if image != context.Kubernetes.Image+":latest-scratch" {
67+
if image != clientContext.Kubernetes.Image+":latest-scratch" {
6868
t.Fatalf("wrong image: %s", image)
6969
}
7070

@@ -74,21 +74,20 @@ func TestExecWithImageAndImagePullSecretProvided(t *testing.T) {
7474
t.Fatalf("unable to unmarshall overrides: %v", err)
7575
}
7676
if len(podOverrides.Spec.ImagePullSecrets) != 1 ||
77-
podOverrides.Spec.ImagePullSecrets[0].Name != context.Kubernetes.ImagePullSecret {
77+
podOverrides.Spec.ImagePullSecrets[0].Name != clientContext.Kubernetes.ImagePullSecret {
7878
t.Fatalf("wrong overrides: %s", overrides)
7979
}
8080
}
8181

8282
func TestExecWithoutPodOverridesProvided(t *testing.T) {
83+
var clientContext internal.ClientContext
84+
clientContext.Kubernetes.Image = "private.registry.com/deviceinsight/kafkactl"
8385

84-
var context internal.ClientContext
85-
context.Kubernetes.Image = "private.registry.com/deviceinsight/kafkactl"
86-
87-
var testRunner = TestRunner{}
86+
testRunner := TestRunner{}
8887
testRunner.response = []byte(sampleKubectlVersionOutput)
8988
var runner k8s.Runner = &testRunner
9089

91-
exec, err := k8s.NewExecutor(context, runner)
90+
exec, err := k8s.NewExecutor(context.Background(), clientContext, runner)
9291
if err != nil {
9392
t.Fatal(err)
9493
}
@@ -104,31 +103,29 @@ func TestExecWithoutPodOverridesProvided(t *testing.T) {
104103
}
105104

106105
func TestExecWithImageAndTagAddsSuffix(t *testing.T) {
106+
var clientContext internal.ClientContext
107+
clientContext.Kubernetes.Image = "private.registry.com/deviceinsight/kafkactl:latest"
107108

108-
var context internal.ClientContext
109-
context.Kubernetes.Image = "private.registry.com/deviceinsight/kafkactl:latest"
110-
111-
var testRunner = TestRunner{}
109+
testRunner := TestRunner{}
112110
testRunner.response = []byte(sampleKubectlVersionOutput)
113111
var runner k8s.Runner = &testRunner
114112

115-
exec, err := k8s.NewExecutor(context, runner)
113+
exec, err := k8s.NewExecutor(context.Background(), clientContext, runner)
116114
if err != nil {
117115
t.Fatal(err)
118116
}
119117

120118
_ = exec.Run("scratch", "/kafkactl", []string{"version"}, []string{"ENV_A=1"})
121119

122120
image := extractParam(t, testRunner.args, "--image")
123-
if image != context.Kubernetes.Image+"-scratch" {
121+
if image != clientContext.Kubernetes.Image+"-scratch" {
124122
t.Fatalf("wrong image: %s", image)
125123
}
126124
}
127125

128126
//nolint:gocognit
129127
func TestParseKubectlVersion(t *testing.T) {
130-
131-
var testRunner = TestRunner{}
128+
testRunner := TestRunner{}
132129
var runner k8s.Runner = &testRunner
133130

134131
type tests struct {
@@ -178,7 +175,6 @@ func TestParseKubectlVersion(t *testing.T) {
178175
},
179176
} {
180177
t.Run(test.description, func(t *testing.T) {
181-
182178
testRunner.response = []byte(test.kubectlOutput)
183179

184180
version, err := k8s.GetKubectlVersion("kubectl", runner)
@@ -214,7 +210,6 @@ func TestParseKubectlVersion(t *testing.T) {
214210
}
215211

216212
func extractParam(t *testing.T, args []string, param string) string {
217-
218213
var paramIdx int
219214

220215
if paramIdx = indexOf(param, args); paramIdx < 0 {

internal/k8s/executor.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package k8s
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"math/rand"
@@ -40,6 +41,7 @@ type executor struct {
4041
nodeSelector map[string]string
4142
affinity map[string]any
4243
tolerations []internal.K8sToleration
44+
ctx context.Context
4345
}
4446

4547
const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789"
@@ -95,30 +97,31 @@ func getKubectlVersion(kubectlBinary string, runner Runner) (Version, error) {
9597
}, nil
9698
}
9799

98-
func newExecutor(context internal.ClientContext, runner Runner) (*executor, error) {
99-
version, err := getKubectlVersion(context.Kubernetes.Binary, runner)
100+
func newExecutor(ctx context.Context, clientContext internal.ClientContext, runner Runner) (*executor, error) {
101+
version, err := getKubectlVersion(clientContext.Kubernetes.Binary, runner)
100102
if err != nil {
101103
return nil, err
102104
}
103105

104106
return &executor{
105-
kubectlBinary: context.Kubernetes.Binary,
107+
kubectlBinary: clientContext.Kubernetes.Binary,
106108
version: version,
107-
image: context.Kubernetes.Image,
108-
imagePullSecret: context.Kubernetes.ImagePullSecret,
109-
clientID: internal.GetClientID(&context, ""),
110-
kubeConfig: context.Kubernetes.KubeConfig,
111-
kubeContext: context.Kubernetes.KubeContext,
112-
namespace: context.Kubernetes.Namespace,
113-
serviceAccount: context.Kubernetes.ServiceAccount,
114-
asUser: context.Kubernetes.AsUser,
115-
keepPod: context.Kubernetes.KeepPod,
116-
labels: context.Kubernetes.Labels,
117-
annotations: context.Kubernetes.Annotations,
118-
nodeSelector: context.Kubernetes.NodeSelector,
119-
affinity: context.Kubernetes.Affinity,
120-
tolerations: context.Kubernetes.Tolerations,
109+
image: clientContext.Kubernetes.Image,
110+
imagePullSecret: clientContext.Kubernetes.ImagePullSecret,
111+
clientID: internal.GetClientID(&clientContext, ""),
112+
kubeConfig: clientContext.Kubernetes.KubeConfig,
113+
kubeContext: clientContext.Kubernetes.KubeContext,
114+
namespace: clientContext.Kubernetes.Namespace,
115+
serviceAccount: clientContext.Kubernetes.ServiceAccount,
116+
asUser: clientContext.Kubernetes.AsUser,
117+
keepPod: clientContext.Kubernetes.KeepPod,
118+
labels: clientContext.Kubernetes.Labels,
119+
annotations: clientContext.Kubernetes.Annotations,
120+
nodeSelector: clientContext.Kubernetes.NodeSelector,
121+
affinity: clientContext.Kubernetes.Affinity,
122+
tolerations: clientContext.Kubernetes.Tolerations,
121123
runner: runner,
124+
ctx: ctx,
122125
}, nil
123126
}
124127

@@ -176,8 +179,24 @@ func (kubectl *executor) Run(dockerImageType, entryPoint string, kafkactlArgs []
176179
return !strings.HasPrefix(s, "-C=") && !strings.HasPrefix(s, "--config-file=") && !strings.HasPrefix(s, "--context=")
177180
}
178181
kubectlArgs = append(kubectlArgs, filter(kafkactlArgs, allExceptConfigFileFilter)...)
182+
errChan := make(chan error, 1)
179183

180-
return kubectl.exec(kubectlArgs)
184+
go func() {
185+
errChan <- kubectl.exec(kubectlArgs)
186+
close(errChan)
187+
}()
188+
189+
select {
190+
case <-kubectl.ctx.Done():
191+
err := kubectl.exec([]string{"delete", "pod", podName, "-n", kubectl.namespace, "--wait=true"})
192+
if err != nil {
193+
output.Warnf("delete pod %s returned an error %w", podName, err)
194+
return err
195+
}
196+
return context.Canceled
197+
case err := <-errChan:
198+
return err
199+
}
181200
}
182201

183202
func addTerminalSizeEnv(args []string) []string {

internal/k8s/k8s-operation.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package k8s
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"strings"
@@ -47,21 +48,21 @@ func (op *operation) initialize(context internal.ClientContext) error {
4748
}
4849

4950
func (op *operation) Attach() error {
50-
context, err := internal.CreateClientContext()
51+
clientContext, err := internal.CreateClientContext()
5152
if err != nil {
5253
return err
5354
}
5455

55-
if err := op.initialize(context); err != nil {
56+
if err := op.initialize(clientContext); err != nil {
5657
return err
5758
}
5859

59-
exec, err := newExecutor(context, op.runner)
60+
exec, err := newExecutor(context.Background(), clientContext, op.runner)
6061
if err != nil {
6162
return err
6263
}
6364

64-
podEnvironment := parsePodEnvironment(context)
65+
podEnvironment := parsePodEnvironment(clientContext)
6566

6667
return exec.Run("ubuntu", "bash", nil, podEnvironment, "--tty")
6768
}
@@ -76,15 +77,15 @@ func (op *operation) Run(cmd *cobra.Command, args []string) error {
7677
return fmt.Errorf("kubernetes not enabled")
7778
}
7879

79-
return op.run(context, cmd, args)
80+
return op.run(cmd.Context(), context, cmd, args)
8081
}
8182

82-
func (op *operation) run(context internal.ClientContext, cmd *cobra.Command, args []string) error {
83-
if err := op.initialize(context); err != nil {
83+
func (op *operation) run(ctx context.Context, clientContext internal.ClientContext, cmd *cobra.Command, args []string) error {
84+
if err := op.initialize(clientContext); err != nil {
8485
return err
8586
}
8687

87-
exec, err := newExecutor(context, op.runner)
88+
exec, err := newExecutor(ctx, clientContext, op.runner)
8889
if err != nil {
8990
return err
9091
}
@@ -95,7 +96,7 @@ func (op *operation) run(context internal.ClientContext, cmd *cobra.Command, arg
9596
return err
9697
}
9798

98-
podEnvironment := parsePodEnvironment(context)
99+
podEnvironment := parsePodEnvironment(clientContext)
99100

100101
kafkaCtlCommand = append(kafkaCtlCommand, args...)
101102
kafkaCtlCommand = append(kafkaCtlCommand, kafkaCtlFlags...)

main.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,57 @@
1515
package main
1616

1717
import (
18+
"context"
19+
"errors"
1820
"os"
21+
"os/signal"
22+
"syscall"
23+
"time"
1924

2025
"github.com/deviceinsight/kafkactl/v5/cmd"
2126
"github.com/deviceinsight/kafkactl/v5/internal/output"
2227
)
2328

2429
func main() {
30+
ctx, cancel := signal.NotifyContext(context.Background(),
31+
syscall.SIGTERM, syscall.SIGINT)
32+
defer cancel()
2533

26-
ioStreams := output.DefaultIOStreams()
27-
28-
if err := cmd.NewKafkactlCommand(ioStreams).Execute(); err != nil {
34+
if err := runWithShutdownTimeout(ctx, 30*time.Second); err != nil {
35+
if errors.Is(err, context.Canceled) {
36+
os.Exit(0)
37+
}
38+
if errors.Is(err, context.DeadlineExceeded) {
39+
output.Warnf("Shutdown timeout exceeded, forced exit")
40+
os.Exit(1)
41+
}
2942
output.Warnf("%v", err)
3043
os.Exit(1)
3144
}
3245
}
46+
47+
func runWithShutdownTimeout(ctx context.Context, shutdownTimeout time.Duration) error {
48+
errChan := make(chan error, 1)
49+
ioStreams := output.DefaultIOStreams()
50+
51+
rootCmd := cmd.NewKafkactlCommand(ioStreams)
52+
53+
go func() {
54+
errChan <- rootCmd.ExecuteContext(ctx)
55+
close(errChan)
56+
}()
57+
58+
select {
59+
case err := <-errChan:
60+
return err
61+
case <-ctx.Done():
62+
output.Debugf("Shutdown signal received, allowing %v for graceful shutdown...", shutdownTimeout)
63+
64+
select {
65+
case err := <-errChan:
66+
return err
67+
case <-time.Tick(shutdownTimeout):
68+
return errors.Join(context.DeadlineExceeded, ctx.Err())
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)