Skip to content

Commit 873aa9e

Browse files
feat: improve direct clientsets for yurthub (#2290)
Signed-off-by: rambohe-ch <[email protected]> (cherry picked from commit 7b661fe) Co-authored-by: rambohe-ch <[email protected]>
1 parent f6f1f87 commit 873aa9e

File tree

13 files changed

+288
-178
lines changed

13 files changed

+288
-178
lines changed

cmd/yurthub/app/start.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
3636
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
3737
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
38-
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
38+
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/directclient"
3939
"github.com/openyurtio/openyurt/pkg/yurthub/locallb"
4040
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
4141
"github.com/openyurtio/openyurt/pkg/yurthub/server"
@@ -121,8 +121,8 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
121121
}
122122
trace++
123123

124-
klog.Infof("%d. new restConfig manager", trace)
125-
restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, cloudHealthChecker)
124+
klog.Infof("%d. new direct client manager", trace)
125+
directClientManager, err := directclient.NewRestClientManager(cfg.RemoteServers, transportManager, cloudHealthChecker)
126126
if err != nil {
127127
return fmt.Errorf("could not new restConfig manager, %w", err)
128128
}
@@ -139,7 +139,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
139139

140140
if cfg.WorkingMode == util.WorkingModeEdge {
141141
klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
142-
gcMgr, err := gc.NewGCManager(cfg, restConfigMgr, ctx.Done())
142+
gcMgr, err := gc.NewGCManager(cfg, directClientManager, ctx.Done())
143143
if err != nil {
144144
return fmt.Errorf("could not new gc manager, %w", err)
145145
}
@@ -161,7 +161,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
161161
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
162162
cfg,
163163
cacheMgr,
164-
restConfigMgr,
164+
directClientManager,
165165
transportManager,
166166
cloudHealthChecker,
167167
tenantMgr,
@@ -176,7 +176,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
176176
}
177177

178178
klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
179-
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, ctx.Done()); err != nil {
179+
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, directClientManager, ctx.Done()); err != nil {
180180
return fmt.Errorf("could not run hub servers, %w", err)
181181
}
182182
} else {

pkg/yurthub/gc/gc.go

+11-20
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131

3232
"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
3333
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
34-
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
34+
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/directclient"
3535
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
3636
"github.com/openyurtio/openyurt/pkg/yurthub/util"
3737
)
@@ -43,15 +43,15 @@ var (
4343
// GCManager is responsible for cleanup garbage of yurthub
4444
type GCManager struct {
4545
store cachemanager.StorageWrapper
46-
restConfigManager *rest.RestConfigManager
46+
manager *directclient.DirectClientManager
4747
nodeName string
4848
eventsGCFrequency time.Duration
4949
lastTime time.Time
5050
stopCh <-chan struct{}
5151
}
5252

5353
// NewGCManager creates a *GCManager object
54-
func NewGCManager(cfg *config.YurtHubConfiguration, restConfigManager *rest.RestConfigManager, stopCh <-chan struct{}) (*GCManager, error) {
54+
func NewGCManager(cfg *config.YurtHubConfiguration, directClientManager *directclient.DirectClientManager, stopCh <-chan struct{}) (*GCManager, error) {
5555
gcFrequency := cfg.GCFrequency
5656
if gcFrequency == 0 {
5757
gcFrequency = defaultEventGcInterval
@@ -60,7 +60,7 @@ func NewGCManager(cfg *config.YurtHubConfiguration, restConfigManager *rest.Rest
6060
// TODO: use disk storage directly
6161
store: cfg.StorageWrapper,
6262
nodeName: cfg.NodeName,
63-
restConfigManager: restConfigManager,
63+
manager: directClientManager,
6464
eventsGCFrequency: time.Duration(gcFrequency) * time.Minute,
6565
stopCh: stopCh,
6666
}
@@ -75,14 +75,9 @@ func (m *GCManager) Run() {
7575
go wait.JitterUntil(func() {
7676
klog.V(2).Infof("start gc events after waiting %v from previous gc", time.Since(m.lastTime))
7777
m.lastTime = time.Now()
78-
cfg := m.restConfigManager.GetRestConfig(true)
79-
if cfg == nil {
80-
klog.Errorf("could not get rest config, so skip gc")
81-
return
82-
}
83-
kubeClient, err := clientset.NewForConfig(cfg)
84-
if err != nil {
85-
klog.Errorf("could not new kube client, %v", err)
78+
kubeClient := m.manager.GetDirectClientset(true)
79+
if kubeClient == nil {
80+
klog.Warningf("all remote servers are unhealthy, skip gc events")
8681
return
8782
}
8883

@@ -109,14 +104,10 @@ func (m *GCManager) gcPodsWhenRestart() {
109104
if len(localPodKeys) == 0 {
110105
return
111106
}
112-
cfg := m.restConfigManager.GetRestConfig(true)
113-
if cfg == nil {
114-
klog.Errorf("could not get rest config, so skip gc pods when restart")
115-
return
116-
}
117-
kubeClient, err := clientset.NewForConfig(cfg)
118-
if err != nil {
119-
klog.Errorf("could not new kube client, %v", err)
107+
108+
kubeClient := m.manager.GetDirectClientset(true)
109+
if kubeClient == nil {
110+
klog.Warningf("all remote servers are unhealthy, skip gc pods")
120111
return
121112
}
122113

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2021 The OpenYurt Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package directclient
18+
19+
import (
20+
"fmt"
21+
"net/url"
22+
23+
"k8s.io/client-go/kubernetes"
24+
"k8s.io/client-go/rest"
25+
"k8s.io/klog/v2"
26+
27+
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
28+
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
29+
)
30+
31+
// DirectClientManager is a holder for clientsets which are used to connecting cloud kube-apiserver directly.
32+
// All clientsets are prepared when yurthub startup, so it is efficient to get a clientset by this manager
33+
// for accessing cloud kube-apiserver.
34+
type DirectClientManager struct {
35+
checker healthchecker.MultipleBackendsHealthChecker
36+
serverToClientset map[string]*kubernetes.Clientset
37+
}
38+
39+
func NewRestClientManager(servers []*url.URL, tansportManager transport.Interface, healthChecker healthchecker.MultipleBackendsHealthChecker) (*DirectClientManager, error) {
40+
mgr := &DirectClientManager{
41+
checker: healthChecker,
42+
serverToClientset: make(map[string]*kubernetes.Clientset),
43+
}
44+
45+
for i := range servers {
46+
config := &rest.Config{
47+
Host: servers[i].String(),
48+
Transport: tansportManager.CurrentTransport(),
49+
}
50+
51+
clientset, err := kubernetes.NewForConfig(config)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
if len(servers[i].String()) != 0 {
57+
mgr.serverToClientset[servers[i].String()] = clientset
58+
}
59+
}
60+
61+
if len(mgr.serverToClientset) == 0 {
62+
return nil, fmt.Errorf("clientset should not be empty")
63+
}
64+
65+
return mgr, nil
66+
}
67+
68+
// GetDirectClientset gets kube clientset according to the healthy status of server
69+
func (rcm *DirectClientManager) GetDirectClientset(needHealthyServer bool) *kubernetes.Clientset {
70+
var serverHost string
71+
if needHealthyServer {
72+
healthyServer, _ := rcm.checker.PickHealthyServer()
73+
if healthyServer == nil {
74+
klog.Infof("all of remote servers are unhealthy, so return nil for clientset")
75+
return nil
76+
}
77+
serverHost = healthyServer.String()
78+
} else {
79+
for host := range rcm.serverToClientset {
80+
serverHost = host
81+
}
82+
}
83+
84+
return rcm.serverToClientset[serverHost]
85+
}

pkg/yurthub/kubernetes/rest/config_test.go pkg/yurthub/kubernetes/directclient/manager_test.go

+34-20
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package rest
17+
package directclient
1818

1919
import (
2020
"context"
@@ -29,18 +29,18 @@ import (
2929
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager"
3030
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/testdata"
3131
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
32+
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
3233
)
3334

34-
var (
35-
testDir = "/tmp/rest/"
36-
)
37-
38-
func TestGetRestConfig(t *testing.T) {
35+
func TestGetDirectClientset(t *testing.T) {
36+
testDir, err := os.MkdirTemp("", "test-client")
37+
if err != nil {
38+
t.Fatalf("failed to make temp dir, %v", err)
39+
}
3940
nodeName := "foo"
4041
servers := map[string]int{"https://10.10.10.113:6443": 2}
4142
u, _ := url.Parse("https://10.10.10.113:6443")
4243
remoteServers := []*url.URL{u}
43-
fakeHealthyChecker := healthchecker.NewFakeChecker(false, servers)
4444

4545
client, err := testdata.CreateCertFakeClient("../../certificate/testdata")
4646
if err != nil {
@@ -73,32 +73,46 @@ func TestGetRestConfig(t *testing.T) {
7373
t.Errorf("certificates are not ready, %v", err)
7474
}
7575

76-
rcm, _ := NewRestConfigManager(certManager, fakeHealthyChecker)
76+
transportManager, err := transport.NewTransportManager(certManager, context.Background().Done())
77+
if err != nil {
78+
t.Fatalf("could not new transport manager, %v", err)
79+
}
7780

7881
testcases := map[string]struct {
82+
healthy bool
7983
needHealthyServer bool
80-
cfgIsNil bool
84+
clientIsNil bool
8185
}{
82-
"do not need healthy server": {
83-
needHealthyServer: false,
84-
cfgIsNil: false,
86+
"get client from healthy servers": {
87+
healthy: true,
88+
needHealthyServer: true,
89+
clientIsNil: false,
8590
},
86-
"need healthy server": {
91+
"get client from unhealthy servers": {
92+
healthy: false,
8793
needHealthyServer: true,
88-
cfgIsNil: true,
94+
clientIsNil: true,
95+
},
96+
"get client at random": {
97+
healthy: true,
98+
needHealthyServer: false,
99+
clientIsNil: false,
89100
},
90101
}
91102

92103
for k, tc := range testcases {
93104
t.Run(k, func(t *testing.T) {
94-
cfg := rcm.GetRestConfig(tc.needHealthyServer)
95-
if tc.cfgIsNil {
96-
if cfg != nil {
97-
t.Errorf("expect rest config is nil, but got %v", cfg)
105+
fakeHealthyChecker := healthchecker.NewFakeChecker(tc.healthy, servers)
106+
rcm, _ := NewRestClientManager(remoteServers, transportManager, fakeHealthyChecker)
107+
108+
client := rcm.GetDirectClientset(tc.needHealthyServer)
109+
if tc.clientIsNil {
110+
if client != nil {
111+
t.Errorf("expect rest client is nil, but got %v", client)
98112
}
99113
} else {
100-
if cfg == nil {
101-
t.Errorf("expect non nil rest config, but got nil")
114+
if client == nil {
115+
t.Errorf("expect non nil rest client, but got nil")
102116
}
103117
}
104118
})

pkg/yurthub/kubernetes/rest/config.go

-67
This file was deleted.

0 commit comments

Comments
 (0)