Skip to content

Commit e786d72

Browse files
committed
kube: controller lifecycle improvements
* Optimize the `Dockerfile` to use cache mounts * Use the `context.Context` during HTTP operations * Add an internal context for normal controller operations * Sequence shutdown to wait for informer to stop and then attempt to unexpose any ports before exiting * Unexpose ports in parallel, we just log out all the errors regardless Signed-off-by: Milas Bowman <[email protected]>
1 parent dc331cb commit e786d72

File tree

5 files changed

+291
-186
lines changed

5 files changed

+291
-186
lines changed

go/Dockerfile.kube-forwarder

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
# syntax=docker/dockerfile:1
22
FROM golang:1.19-alpine AS builder
33

4-
WORKDIR /go/src/github.com/moby/vpnkit/go
5-
COPY . /go/src/github.com/moby/vpnkit
6-
74
RUN apk add --no-cache musl-dev build-base
8-
RUN GOPATH=/go CGO_ENABLED=1 go build -buildmode pie -ldflags "-linkmode=external -s -extldflags \"-fno-PIC -static\"" -o /kube-vpnkit-forwarder /go/src/github.com/moby/vpnkit/go/cmd/kube-vpnkit-forwarder/main.go
5+
6+
# no separate go mod download step because vendoring is in use
7+
COPY . /src
8+
WORKDIR /src
9+
10+
RUN --mount=type=bind,target=. \
11+
--mount=type=cache,target=/root/.cache \
12+
--mount=type=cache,target=/go/pkg/mod \
13+
CGO_ENABLED=1 go build \
14+
-mod=vendor \
15+
-buildmode pie \
16+
-ldflags '-linkmode=external -s -extldflags "-fno-PIC -static"' \
17+
-o /kube-vpnkit-forwarder \
18+
./go/cmd/kube-vpnkit-forwarder
919

1020
FROM scratch
1121
COPY --link --from=builder /kube-vpnkit-forwarder /kube-vpnkit-forwarder

go/cmd/kube-vpnkit-forwarder/main.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,42 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
5-
"log"
66
"os"
77
"os/signal"
88
"syscall"
99
"time"
1010

1111
"github.com/moby/vpnkit/go/pkg/controller"
1212
"github.com/moby/vpnkit/go/pkg/vpnkit"
13+
log "github.com/sirupsen/logrus"
1314
"k8s.io/client-go/informers"
1415
"k8s.io/client-go/kubernetes"
1516
"k8s.io/client-go/rest"
1617
)
1718

19+
const defaultLogLevel = log.InfoLevel
20+
1821
var path string
22+
var logLevelName string
1923

2024
func main() {
2125
flag.StringVar(&path, "path", "", "unix socket to vpnkit port forward API")
26+
flag.StringVar(&logLevelName, "log-level", defaultLogLevel.String(), "log output level (error, warn, info, debug)")
2227
flag.Parse()
2328

29+
if logLevel, err := log.ParseLevel(logLevelName); err == nil {
30+
log.SetLevel(logLevel)
31+
} else {
32+
log.SetLevel(defaultLogLevel)
33+
log.Warnf("Using default log level (%s): %v", defaultLogLevel.String(), err)
34+
}
35+
2436
log.Println("Starting kube-vpnkit-forwarder...")
2537

38+
rootCtx := context.Background()
39+
2640
clusterConfig, err := rest.InClusterConfig()
2741
if err != nil {
2842
log.Fatal(err.Error())
@@ -40,18 +54,38 @@ func main() {
4054
if err != nil {
4155
log.Fatal(err)
4256
}
43-
controller := controller.New(vpnkitClient, clientset.CoreV1())
44-
defer controller.Dispose()
45-
46-
informer.AddEventHandler(controller)
57+
vpnkitController := controller.New(rootCtx, vpnkitClient, clientset.CoreV1())
58+
if _, err := informer.AddEventHandler(vpnkitController); err != nil {
59+
log.Fatal(err)
60+
}
4761

62+
// stop signals to the informer to stop the controllers
63+
// informerDone signals that the informer has actually stopped running
4864
stop := make(chan struct{})
49-
go informer.Run(stop)
65+
informerDone := make(chan struct{})
66+
go func() {
67+
defer close(informerDone)
68+
informer.Run(stop)
69+
}()
5070

5171
signalChan := make(chan os.Signal, 1)
5272
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
5373
<-signalChan
54-
55-
log.Println("Shutdown signal received, exiting...")
74+
log.Println("Shutdown signal received")
5675
close(stop)
76+
77+
// allow the informer a chance to stop cleanly
78+
log.Println("Waiting for controller to finish")
79+
select {
80+
case <-time.After(10 * time.Second):
81+
log.Warn("Controller shutdown timed out")
82+
case <-informerDone:
83+
}
84+
85+
// always attempt cleanup, even if the informer didn't stop nicely,
86+
// we can still hopefully unexpose any open ports
87+
log.Println("Cleaning up controller")
88+
cleanupCtx, cancel := context.WithTimeout(rootCtx, 15*time.Second)
89+
defer cancel()
90+
vpnkitController.Dispose(cleanupCtx)
5791
}

go/pkg/controller/controller.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net"
7+
"sync"
78

89
"github.com/moby/vpnkit/go/pkg/vpnkit"
910
"github.com/pkg/errors"
@@ -20,34 +21,50 @@ const annotation = "vpnkit-k8s-controller"
2021
type Controller struct {
2122
services corev1client.ServicesGetter
2223
client vpnkit.Client
24+
25+
internalCtx context.Context
26+
cancel context.CancelFunc
2327
}
2428

2529
// New creates a new controller
26-
func New(client vpnkit.Client, services corev1client.ServicesGetter) *Controller {
30+
func New(rootCtx context.Context, client vpnkit.Client, services corev1client.ServicesGetter) *Controller {
31+
internalCtx, cancel := context.WithCancel(rootCtx)
2732
return &Controller{
28-
services: services,
29-
client: client,
33+
internalCtx: internalCtx,
34+
cancel: cancel,
35+
services: services,
36+
client: client,
3037
}
3138
}
3239

3340
var _ cache.ResourceEventHandler = &Controller{}
3441

35-
// Dispose unexpose all ports previously exposed by this controller
36-
func (c *Controller) Dispose() {
37-
ctx := context.Background()
42+
// Dispose unexposes all ports previously exposed by this controller
43+
func (c *Controller) Dispose(ctx context.Context) {
44+
// stop any ongoing operations using the internalCtx
45+
c.cancel()
46+
3847
ports, err := c.client.ListExposed(ctx)
3948
if err != nil {
4049
log.Infof("Cannot list exposed ports: %v", err)
4150
return
4251
}
43-
for _, port := range ports {
52+
var wg sync.WaitGroup
53+
for i := range ports {
54+
port := ports[i]
4455
if port.Annotation != annotation {
4556
continue
4657
}
47-
if err := c.client.Unexpose(ctx, &port); err != nil {
48-
log.Infof("cannot unexpose port: %v", err)
49-
}
58+
wg.Add(1)
59+
go func() {
60+
defer wg.Done()
61+
log.Infof("Unexposing port: %s", port.String())
62+
if err := c.client.Unexpose(ctx, &port); err != nil {
63+
log.Infof("cannot unexpose port: %v", err)
64+
}
65+
}()
5066
}
67+
wg.Wait()
5168
}
5269

5370
// OnAdd exposes port if necessary

0 commit comments

Comments
 (0)