Skip to content

Commit 36dd9ee

Browse files
authored
Merge pull request #187 from aojea/race_http
Improve startup latency
2 parents 0ca7080 + ed431a0 commit 36dd9ee

File tree

4 files changed

+168
-59
lines changed

4 files changed

+168
-59
lines changed

.github/workflows/k8s.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ jobs:
5656
sudo cp ${TMP_DIR}/e2e.test /usr/local/bin/e2e.test
5757
sudo cp ${TMP_DIR}/kubectl /usr/local/bin/kubectl
5858
sudo cp ${TMP_DIR}/kind /usr/local/bin/kind
59-
sudo chmod +x /usr/local/bin/*
59+
sudo chmod +x /usr/local/bin/ginkgo
60+
sudo chmod +x /usr/local/bin/e2e.test
61+
sudo chmod +x /usr/local/bin/kubectl
62+
sudo chmod +x /usr/local/bin/kind
6063
# Create folder to store artifacts
6164
mkdir -p _artifacts
6265

pkg/controller/controller.go

Lines changed: 53 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package controller
22

33
import (
44
"context"
5-
"crypto/tls"
65
"fmt"
7-
"io"
86
"net/http"
97
"sync"
108
"time"
@@ -14,6 +12,7 @@ import (
1412
utilfeature "k8s.io/apiserver/pkg/util/feature"
1513
"k8s.io/client-go/informers"
1614
"k8s.io/client-go/kubernetes"
15+
"k8s.io/client-go/rest"
1716
"k8s.io/client-go/tools/clientcmd"
1817
cloudprovider "k8s.io/cloud-provider"
1918
nodecontroller "k8s.io/cloud-provider/controllers/node"
@@ -109,80 +108,76 @@ func (c *Controller) Run(ctx context.Context) {
109108
}
110109
}
111110

111+
func (c *Controller) getKubeConfig(cluster string, internal bool) (*rest.Config, error) {
112+
kconfig, err := c.kind.KubeConfig(cluster, internal)
113+
if err != nil {
114+
klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
115+
return nil, err
116+
}
117+
118+
config, err := clientcmd.RESTConfigFromKubeConfig([]byte(kconfig))
119+
if err != nil {
120+
klog.Errorf("Failed to convert kubeconfig for cluster %s: %v", cluster, err)
121+
return nil, err
122+
}
123+
return config, nil
124+
}
125+
112126
// getKubeClient returns a kubeclient for the cluster passed as argument
113127
// It tries first to connect to the internal endpoint.
114128
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
115-
httpClient := &http.Client{
116-
Timeout: 5 * time.Second,
117-
Transport: &http.Transport{
118-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
119-
},
129+
addresses := []string{}
130+
internalConfig, err := c.getKubeConfig(cluster, true)
131+
if err != nil {
132+
klog.Errorf("Failed to get internal kubeconfig for cluster %s: %v", cluster, err)
133+
} else {
134+
addresses = append(addresses, internalConfig.Host)
135+
}
136+
externalConfig, err := c.getKubeConfig(cluster, false)
137+
if err != nil {
138+
klog.Errorf("Failed to get external kubeconfig for cluster %s: %v", cluster, err)
139+
} else {
140+
addresses = append(addresses, externalConfig.Host)
120141
}
121-
// prefer internal (direct connectivity) over no-internal (commonly portmap)
122-
for _, internal := range []bool{true, false} {
123-
kconfig, err := c.kind.KubeConfig(cluster, internal)
124-
if err != nil {
125-
klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
126-
continue
127-
}
128142

129-
config, err := clientcmd.RESTConfigFromKubeConfig([]byte(kconfig))
130-
if err != nil {
131-
klog.Errorf("Failed to convert kubeconfig for cluster %s: %v", cluster, err)
132-
continue
133-
}
143+
if len(addresses) == 0 {
144+
return nil, fmt.Errorf("could not find kubeconfig for cluster %s", cluster)
145+
}
134146

135-
// check that the apiserver is reachable before continue
136-
// to fail fast and avoid waiting until the client operations timeout
137-
var ok bool
138-
for i := 0; i < 5; i++ {
139-
select {
140-
case <-ctx.Done():
141-
return nil, ctx.Err()
142-
default:
143-
}
144-
if probeHTTP(httpClient, config.Host) {
145-
ok = true
146-
break
147-
}
147+
var host string
148+
for i := 0; i < 5; i++ {
149+
host, err = firstSuccessfulProbe(ctx, addresses)
150+
if err != nil {
151+
klog.Errorf("Failed to connect to any address in %v: %v", addresses, err)
148152
time.Sleep(time.Second * time.Duration(i))
153+
} else {
154+
klog.Infof("Connected succesfully to %s", host)
155+
break
149156
}
150-
if !ok {
151-
klog.Errorf("Failed to connect to apiserver %s: %v", cluster, err)
152-
continue
153-
}
157+
}
154158

155-
kubeClient, err := kubernetes.NewForConfig(config)
156-
if err != nil {
157-
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
158-
continue
159-
}
159+
var config *rest.Config
160+
switch host {
161+
case internalConfig.Host:
162+
config = internalConfig
160163
// the first cluster will give us the type of connectivity between
161164
// cloud-provider-kind and the clusters and load balancer containers.
162165
// In Linux or containerized cloud-provider-kind this will be direct.
163166
once.Do(func() {
164-
if internal {
165-
cpkconfig.DefaultConfig.ControlPlaneConnectivity = cpkconfig.Direct
166-
}
167+
cpkconfig.DefaultConfig.ControlPlaneConnectivity = cpkconfig.Direct
167168
})
168-
return kubeClient, err
169+
case externalConfig.Host:
170+
config = externalConfig
171+
default:
172+
return nil, fmt.Errorf("restConfig for host %s not avaliable", host)
169173
}
170-
return nil, fmt.Errorf("can not find a working kubernetes clientset")
171-
}
172174

173-
func probeHTTP(client *http.Client, address string) bool {
174-
klog.Infof("probe HTTP address %s", address)
175-
resp, err := client.Get(address)
175+
kubeClient, err := kubernetes.NewForConfig(config)
176176
if err != nil {
177-
klog.Infof("Failed to connect to HTTP address %s: %v", address, err)
178-
return false
177+
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
178+
return kubeClient, err
179179
}
180-
defer resp.Body.Close()
181-
// drain the body
182-
io.ReadAll(resp.Body) // nolint:errcheck
183-
// we only want to verify connectivity so don't need to check the http status code
184-
// as the apiserver may not be ready
185-
return true
180+
return kubeClient, nil
186181
}
187182

188183
// TODO: implement leader election to not have problems with multiple providers

pkg/controller/http.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"sync"
10+
"time"
11+
12+
"k8s.io/klog/v2"
13+
)
14+
15+
func probeHTTP(ctx context.Context, address string) bool {
16+
klog.Infof("probe HTTP address %s", address)
17+
httpClient := &http.Client{
18+
Timeout: 2 * time.Second,
19+
Transport: &http.Transport{
20+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
21+
},
22+
}
23+
req, err := http.NewRequest("GET", address, nil)
24+
if err != nil {
25+
return false
26+
}
27+
req = req.WithContext(ctx)
28+
resp, err := httpClient.Do(req)
29+
if err != nil {
30+
klog.Infof("Failed to connect to HTTP address %s: %v", address, err)
31+
return false
32+
}
33+
defer resp.Body.Close()
34+
// drain the body
35+
io.ReadAll(resp.Body) // nolint:errcheck
36+
// we only want to verify connectivity so don't need to check the http status code
37+
// as the apiserver may not be ready
38+
return true
39+
}
40+
41+
// firstSuccessfulProbe probes the given addresses in parallel and returns the first address to succeed, cancelling the other probes.
42+
func firstSuccessfulProbe(ctx context.Context, addresses []string) (string, error) {
43+
var wg sync.WaitGroup
44+
resultChan := make(chan string, 1)
45+
46+
ctx, cancel := context.WithCancel(ctx)
47+
defer cancel()
48+
49+
for _, addr := range addresses {
50+
wg.Add(1)
51+
go func(address string) {
52+
defer wg.Done()
53+
if probeHTTP(ctx, address) {
54+
select {
55+
case resultChan <- address:
56+
default:
57+
}
58+
cancel()
59+
}
60+
}(addr)
61+
}
62+
63+
go func() {
64+
wg.Wait()
65+
close(resultChan)
66+
}()
67+
68+
select {
69+
case result := <-resultChan:
70+
return result, nil
71+
case <-ctx.Done():
72+
return "", fmt.Errorf("no address succeeded")
73+
}
74+
}

pkg/controller/http_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
"time"
9+
)
10+
11+
func Test_firstSuccessfulProbe(t *testing.T) {
12+
reqCh := make(chan struct{})
13+
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
14+
t.Logf("received connection ")
15+
close(reqCh)
16+
}))
17+
ts.EnableHTTP2 = true
18+
ts.StartTLS()
19+
defer ts.Close()
20+
// use an address that is not likely to exist to avoid flakes
21+
addresses := []string{"https://127.0.1.201:12349", ts.URL}
22+
got, err := firstSuccessfulProbe(context.Background(), addresses)
23+
if err != nil {
24+
t.Errorf("firstSuccessfulProbe() error = %v", err)
25+
return
26+
}
27+
if got != ts.URL {
28+
t.Errorf("firstSuccessfulProbe() = %v, want %v", got, ts.URL)
29+
}
30+
31+
select {
32+
case <-reqCh:
33+
case <-time.After(10 * time.Second):
34+
t.Fatalf("test timed out, no request received")
35+
}
36+
37+
}

0 commit comments

Comments
 (0)