Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/k8s/pkg/proxy/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,20 @@ func (p *APIServerProxy) watchForNewEndpoints(ctx context.Context, cancel func()
}

// TODO: use k8s.GetKubernetesEndpoints instead
newEndpoints, err := getKubernetesEndpoints(ctx, p.KubeconfigFile)
var (
newEndpoints []string
err error
)
for _, ep := range endpoints {
epCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
newEndpoints, err = getKubernetesEndpoints(epCtx, p.KubeconfigFile, ep)
cancel()
if err == nil {
break
}
log.Error(err, "Failed to get kubernetes endpoints", "server", ep)
}

switch {
case err != nil:
log.Error(err, "Failed to retrieve Kubernetes endpoints")
Expand All @@ -80,7 +93,7 @@ func (p *APIServerProxy) watchForNewEndpoints(ctx context.Context, cancel func()
case len(newEndpoints) == len(endpoints) && reflect.DeepEqual(newEndpoints, endpoints):
continue
}
log = log.WithValues("endpoints", endpoints)
log = log.WithValues("endpoints", newEndpoints)
log.Info("Updating endpoints")

if err := WriteEndpointsConfig(newEndpoints, p.EndpointsConfigFile); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions src/k8s/pkg/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

func getKubernetesEndpoints(ctx context.Context, kubeconfigFile string) ([]string, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigFile)
func getKubernetesEndpoints(ctx context.Context, kubeconfigFile string, server string) ([]string, error) {
config, err := clientcmd.BuildConfigFromFlags(fmt.Sprintf("https://%s", server), kubeconfigFile)
if err != nil {
return nil, fmt.Errorf("failed to read load kubeconfig: %w", err)
}
Expand Down
20 changes: 17 additions & 3 deletions src/k8s/pkg/proxy/userspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package proxy

import (
"context"
"fmt"
"io"
"log"
Expand All @@ -45,7 +46,7 @@ func (r *remote) inactivate() {
}

func (r *remote) tryReactivate() error {
conn, err := net.Dial("tcp", r.addr)
conn, err := r.connect()
if err != nil {
return err
}
Expand All @@ -62,6 +63,20 @@ func (r *remote) isActive() bool {
return !r.inactive
}

// connect establishes a TCP connection to the remote endpoint.
// closing the returned connection is the caller's responsibility.
func (r *remote) connect() (net.Conn, error) {
var d net.Dialer
dialCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
out, err := d.DialContext(dialCtx, "tcp", r.addr)
cancel()
if err != nil {
return nil, fmt.Errorf("failed to establish tcp connection: %w", err)
}

return out, nil
}

type tcpproxy struct {
Listener net.Listener
Endpoints []*net.SRV
Expand Down Expand Up @@ -172,8 +187,7 @@ func (tp *tcpproxy) serve(in net.Conn) {
if remote == nil {
break
}
// TODO: add timeout
out, err = net.Dial("tcp", remote.addr)
out, err = remote.connect()
if err == nil {
break
}
Expand Down