Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/5062.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
Updates code logic in CNI during application pod creation.
```
71 changes: 66 additions & 5 deletions control-plane/cni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"time"

"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
Expand Down Expand Up @@ -267,19 +269,78 @@ func main() {
skel.PluginMain(c.cmdAdd, cmdCheck, cmdDel, cniv.All, bv.BuildString("consul-cni"))
}

func resolveKubeconfigPath(dir, base string) (string, error) {
// we will return the actual kubeconfig path if present
stable := filepath.Join(dir, base)
if fi, err := os.Stat(stable); err == nil && !fi.IsDir() {
return stable, nil
}
// this will be a fallback to find the most recently modified kubeconfig file with the given base name pattern
// example file names: kubeconfig-<time.Now().UnixNano()>.
pattern := stable + "-*"
matches, err := filepath.Glob(pattern)
if err != nil {
return "", fmt.Errorf("glob failed for %s: %w", pattern, err)
}
if len(matches) == 0 {
return "", fmt.Errorf("no kubeconfig found at %s or %s-*", stable, stable)
}

var newest string
var newestTime time.Time

// we are looping over the matched files to find the most recently modified kubeconfig file, with O(n) complexity
for _, fp := range matches {
fi, err := os.Stat(fp)
if err != nil || fi.IsDir() {
continue
}
if fi.ModTime().After(newestTime) {
newestTime = fi.ModTime()
newest = fp
}
}
// checking if a file was found
if newest == "" {
return "", fmt.Errorf("no valid kubeconfig found at %s or %s-*", stable, stable)
}

return newest, nil
}

// createK8sClient configures the command's Kubernetes API client if it doesn't
// already exist.
// TODO: remove logger for auth provider details
func (c *Command) createK8sClient(cfg *PluginConf, logger hclog.Logger) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(cfg.CNINetDir, cfg.Kubeconfig))
logger.Info("tokenFile used - ", restConfig.BearerTokenFile)
dir := cfg.CNINetDir
base := cfg.Kubeconfig

path, err := resolveKubeconfigPath(dir, base)
if err != nil {
return fmt.Errorf("could not get rest config from kubernetes api: %s", err)
logger.Warn(
"kubeconfig not found, falling back to default client-go behavior",
"dir", dir,
"base", base,
"err", err,
)
path = ""
}
c.client, err = kubernetes.NewForConfig(restConfig)

restConfig, err := clientcmd.BuildConfigFromFlags("", path)
if err != nil {
return fmt.Errorf("error initializing Kubernetes client: %s", err)
return fmt.Errorf("failed to load kubeconfig %q: %w", path, err)
}
if restConfig == nil {
return fmt.Errorf("restConfig is nil for kubeconfig %q", path)
}
logger.Info("tokenFile used - ", restConfig.BearerTokenFile)

client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("error initializing Kubernetes client: %w", err)
}

c.client = client
return nil
}

Expand Down
226 changes: 226 additions & 0 deletions control-plane/cni/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/containernetworking/cni/pkg/skel"
"github.com/hashicorp/consul/sdk/iptables"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -120,6 +124,7 @@ func Test_cmdAdd(t *testing.T) {
iptablesConfigJson, err := json.Marshal(&cfg)
require.NoError(t, err)
pod.Annotations[annotationRedirectTraffic] = string(iptablesConfigJson)
pod.Annotations[annotationDualStack] = "consul.hashicorp.com/dual-stack"
_, err = cmd.client.CoreV1().Pods(defaultNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

Expand All @@ -146,6 +151,7 @@ func Test_cmdAdd(t *testing.T) {
iptablesConfigJson, err := json.Marshal(&cfg)
require.NoError(t, err)
pod.Annotations[annotationRedirectTraffic] = string(iptablesConfigJson)
pod.Annotations[annotationDualStack] = "consul.hashicorp.com/dual-stack"
_, err = cmd.client.CoreV1().Pods(defaultNamespace).Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

Expand Down Expand Up @@ -190,6 +196,226 @@ func Test_cmdAdd(t *testing.T) {
}
}

func writeKubeconfig(t *testing.T, dir string, valid bool) string {
t.Helper()

var data string
if valid {
data = `
apiVersion: v1
kind: Config
clusters:
- name: test
cluster:
server: https://127.0.0.1:6443
contexts:
- name: test
context:
cluster: test
user: test
current-context: test
users:
- name: test
user:
token: fake-token
`
} else {
data = `
apiVersion: v1
kind: Config
clusters:
- name: test
cluster: {}
`
}

path := filepath.Join(dir, "kubeconfig")

if err := os.WriteFile(path, []byte(data), 0600); err != nil {
t.Fatalf("failed to write kubeconfig: %v", err)
}

return path
}

// TestResolveKubeconfigPath tests the resolveKubeconfigPath function
func TestResolveKubeconfigPath(t *testing.T) {
tests := []struct {
name string
setup func(t *testing.T, dir string)
wantSuffix string
expectError bool
expectedErrorContains error
}{
{
name: "stable kubeconfig exists",
setup: func(t *testing.T, dir string) {
path := filepath.Join(dir, "kubeconfig")
if err := os.WriteFile(path, []byte("stable"), 0644); err != nil {
t.Fatal(err)
}
},
wantSuffix: "kubeconfig",
expectError: false,
},
{
name: "stable path is directory, fallback to versioned",
setup: func(t *testing.T, dir string) {
if err := os.Mkdir(filepath.Join(dir, "kubeconfig"), 0755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(
filepath.Join(dir, "kubeconfig-1"),
[]byte("v1"),
0644,
); err != nil {
t.Fatal(err)
}
},
wantSuffix: "kubeconfig-1",
expectError: false,
},
{
name: "single versioned kubeconfig",
setup: func(t *testing.T, dir string) {
if err := os.WriteFile(
filepath.Join(dir, "kubeconfig-123"),
[]byte("v123"),
0644,
); err != nil {
t.Fatal(err)
}
},
wantSuffix: "kubeconfig-123",
expectError: false,
},
{
name: "multiple versioned kubeconfigs, newest chosen",
setup: func(t *testing.T, dir string) {
old := filepath.Join(dir, "kubeconfig-old")
newer := filepath.Join(dir, "kubeconfig-new")

if err := os.WriteFile(old, []byte("old"), 0644); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond) // ensure mtime difference
if err := os.WriteFile(newer, []byte("new"), 0644); err != nil {
t.Fatal(err)
}
},
wantSuffix: "kubeconfig-new",
expectError: false,
},
{
name: "no kubeconfig files",
setup: func(t *testing.T, dir string) {
// nothing created
},
expectError: true,
expectedErrorContains: fmt.Errorf("no kubeconfig found"),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
tc.setup(t, dir)

got, err := resolveKubeconfigPath(dir, "kubeconfig")

if tc.expectError {
if err == nil {
t.Fatalf("expected error, got nil (path=%s)", got)
return
}
require.Contains(t, err.Error(), tc.expectedErrorContains.Error())
return

}

if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !filepath.IsAbs(got) {
t.Fatalf("expected absolute path, got %s", got)
}

if !strings.HasSuffix(got, tc.wantSuffix) {
t.Fatalf("expected suffix %q, got %q", tc.wantSuffix, got)
}
})
}
}

func TestCreateK8sClient(t *testing.T) {
t.Parallel()
logger := hclog.NewNullLogger()

tests := []struct {
setup func(t *testing.T) *PluginConf
expectedErrorContains error
expectedError bool
expectClient bool
name string
}{
{
name: "Client success",
setup: func(t *testing.T) *PluginConf {
dir := t.TempDir()
writeKubeconfig(t, dir, true)
return &PluginConf{
CNINetDir: dir,
Kubeconfig: "kubeconfig",
}
},
expectedError: false,
expectClient: true,
},
{
name: "No Kubeconfig found",
setup: func(t *testing.T) *PluginConf {
dir := t.TempDir()
return &PluginConf{
CNINetDir: dir,
Kubeconfig: "",
}
},
expectedErrorContains: fmt.Errorf("failed to load kubeconfig"),
expectedError: true,
expectClient: false,
},
{
name: "error from BuildConfigFromFlags",
setup: func(t *testing.T) *PluginConf {
dir := t.TempDir()
writeKubeconfig(t, dir, false) // invalid content
return &PluginConf{
CNINetDir: dir,
Kubeconfig: "kubeconfig", // ALWAYS this
}
},
expectedErrorContains: fmt.Errorf("failed to load kubeconfig"),
expectedError: true,
expectClient: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := tt.setup(t)
cmd := &Command{}
err := cmd.createK8sClient(cfg, logger)
if tt.expectedError {
require.Contains(t, err.Error(), tt.expectedErrorContains.Error())
t.Logf("✅ expected error occurred: %v", err)
} else {
require.NoError(t, err)
require.NotNil(t, cmd.client)
t.Log("✅ client created successfully")
}
})
}
}
func TestSkipTrafficRedirection(t *testing.T) {
t.Parallel()
cases := []struct {
Expand Down
21 changes: 16 additions & 5 deletions control-plane/subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/hashicorp/consul-server-connection-manager/discovery"
"github.com/mitchellh/cli"
Expand All @@ -26,6 +27,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -394,11 +396,20 @@ func (c *Command) Run(args []string) int {
return 1
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
LeaderElection: true,
LeaderElectionID: "consul-controller-lock",
Logger: zapLogger,
cfg := ctrl.GetConfigOrDie()
cfg.Timeout = 90 * time.Second
cfg.QPS = 50
cfg.Burst = 100

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
LeaderElection: true,
LeaderElectionID: "consul-controller-lock",
Logger: zapLogger,
LeaderElectionNamespace: c.flagReleaseNamespace,
LeaseDuration: ptr.To(90 * time.Second),
RenewDeadline: ptr.To(60 * time.Second),
RetryPeriod: ptr.To(15 * time.Second),
Metrics: metricsserver.Options{
BindAddress: "0.0.0.0:9444",
},
Expand Down
Loading