Skip to content

Commit af9ea6e

Browse files
Enhance server discovery
1 parent e728dc1 commit af9ea6e

File tree

2 files changed

+151
-25
lines changed

2 files changed

+151
-25
lines changed

pkg/config/types.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ var (
3939

4040
type Config struct {
4141
RuntimeConfig
42-
KubernetesVersion string `json:"kubernetesVersion,omitempty"`
43-
RancherVersion string `json:"rancherVersion,omitempty"`
44-
Server string `json:"server,omitempty"`
45-
Discovery map[string]string `json:"discovery,omitempty"`
46-
Role string `json:"role,omitempty"`
42+
KubernetesVersion string `json:"kubernetesVersion,omitempty"`
43+
RancherVersion string `json:"rancherVersion,omitempty"`
44+
Server string `json:"server,omitempty"`
45+
Discovery *DiscoveryConfig `json:"discovery,omitempty"`
46+
Role string `json:"role,omitempty"`
4747

4848
RancherValues map[string]interface{} `json:"rancherValues,omitempty"`
4949
PreInstructions []plan.Instruction `json:"preInstructions,omitempty"`
@@ -57,6 +57,14 @@ type Config struct {
5757
Registries *registries.Registry `json:"registries,omitempty"`
5858
}
5959

60+
// DiscoveryConfig configures automatic discovery of peer servers.
type DiscoveryConfig struct {
	// Params is handed to go-discover as the provider configuration used to
	// look up peer addresses.
	Params map[string]string `json:"params,omitempty"`
	// ExpectedServers is the number of peers that must be known before one of
	// them may elect itself to initialize the cluster. A value of 0 selects
	// the default of 3.
	ExpectedServers int `json:"expectedServers,omitempty"`
	// ServerCacheDuration will remember discovered servers for this amount of
	// time. This helps with some discovery protocols like mDNS that can be
	// unreliable. The value is parsed with time.ParseDuration; an empty string
	// selects the default of "5m".
	ServerCacheDuration string `json:"serverCacheDuration,omitempty"`
}
67+
6068
func Load(path string) (result Config, err error) {
6169
var (
6270
values = map[string]interface{}{}

pkg/discovery/discovery.go

+138-20
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,20 @@ import (
77
"fmt"
88
"io/ioutil"
99
"log"
10+
"net"
1011
"net/http"
1112
"sort"
13+
"strconv"
1214
"sync"
1315
"time"
1416

1517
"github.com/hashicorp/go-discover"
16-
"github.com/rancher/dynamiclistener/server"
1718
"github.com/rancher/rancherd/pkg/config"
1819
"github.com/rancher/wrangler/pkg/data/convert"
1920
"github.com/rancher/wrangler/pkg/randomtoken"
2021
"github.com/rancher/wrangler/pkg/slice"
2122
"github.com/sirupsen/logrus"
23+
"k8s.io/client-go/util/cert"
2224

2325
// Include kubernetes provider
2426
_ "github.com/hashicorp/go-discover/provider/k8s"
@@ -38,10 +40,17 @@ var (
3840
)
3941

4042
func DiscoverServerAndRole(ctx context.Context, cfg *config.Config) error {
41-
if len(cfg.Discovery) == 0 {
43+
if cfg.Discovery == nil {
44+
if cfg.Server == "" && cfg.Role == "server" && cfg.Token == "" {
45+
cfg.Role = "cluster-init"
46+
}
4247
return nil
4348
}
4449

50+
if cfg.Token == "" {
51+
return fmt.Errorf("token is required to be set")
52+
}
53+
4554
server, clusterInit, err := discoverServerAndRole(ctx, cfg)
4655
if err != nil {
4756
return err
@@ -51,6 +60,7 @@ func DiscoverServerAndRole(ctx context.Context, cfg *config.Config) error {
5160
} else if server != "" {
5261
cfg.Server = server
5362
}
63+
logrus.Infof("Using role=%s and server=%s", cfg.Role, cfg.Server)
5464
return nil
5565

5666
}
@@ -68,13 +78,18 @@ func discoverServerAndRole(ctx context.Context, cfg *config.Config) (string, boo
6878
ctx, cancel := context.WithCancel(ctx)
6979
defer cancel()
7080

71-
server, err := newJoinServer(ctx, port)
81+
server, err := newJoinServer(ctx, cfg.Discovery.ServerCacheDuration, port)
7282
if err != nil {
7383
return "", false, err
7484
}
7585

86+
count := cfg.Discovery.ExpectedServers
87+
if count == 0 {
88+
count = 3
89+
}
90+
7691
for {
77-
server, clusterInit := server.loop(ctx, cfg.Discovery, port, discovery)
92+
server, clusterInit := server.loop(ctx, count, cfg.Discovery.Params, port, discovery)
7893
if clusterInit {
7994
return "", true, nil
8095
}
@@ -90,15 +105,33 @@ func discoverServerAndRole(ctx context.Context, cfg *config.Config) (string, boo
90105
}
91106
}
92107

93-
func (j *joinServer) loop(ctx context.Context, params map[string]string, port int64, discovery *discover.Discover) (string, bool) {
108+
func (j *joinServer) addresses(params map[string]string, discovery *discover.Discover) ([]string, error) {
94109
addrs, err := discovery.Addrs(discover.Config(params).String(), log.Default())
110+
if err != nil {
111+
return nil, err
112+
}
113+
114+
var ips []string
115+
for _, addr := range addrs {
116+
host, _, err := net.SplitHostPort(addr)
117+
if err == nil {
118+
ips = append(ips, host)
119+
} else {
120+
ips = append(ips, addr)
121+
}
122+
}
123+
124+
return ips, nil
125+
}
126+
127+
func (j *joinServer) loop(ctx context.Context, count int, params map[string]string, port int64, discovery *discover.Discover) (string, bool) {
128+
addrs, err := j.addresses(params, discovery)
95129
if err != nil {
96130
logrus.Errorf("failed to discover peers to: %v", err)
97131
return "", false
98132
}
99133

100-
sort.Strings(addrs)
101-
j.setPeers(addrs)
134+
addrs = j.setPeers(addrs)
102135

103136
var (
104137
allAgree = true
@@ -128,7 +161,7 @@ func (j *joinServer) loop(ctx context.Context, params map[string]string, port in
128161

129162
rancherID := resp.Header.Get("X-Cattle-Rancherd-Id")
130163
if rancherID == "" {
131-
return fmt.Sprintf("https://%s:%d", addr, port), false
164+
return fmt.Sprintf("https://%s", net.JoinHostPort(addr, strconv.FormatInt(port, 10))), false
132165
}
133166
if i == 0 {
134167
firstID = rancherID
@@ -148,41 +181,126 @@ func (j *joinServer) loop(ctx context.Context, params map[string]string, port in
148181
}
149182
}
150183

151-
if allAgree && len(addrs) > 2 && firstID == j.id {
152-
return "", true
184+
if firstID != j.id {
185+
logrus.Infof("Waiting for peer %s from %v to initialize", addrs[0], addrs)
186+
return "", false
153187
}
154188

155-
return "", false
189+
if len(addrs) != count {
190+
logrus.Infof("Expecting %d servers currently have %v", count, addrs)
191+
return "", false
192+
}
193+
194+
if !allAgree {
195+
logrus.Infof("All peers %v do not agree on the peer list", addrs)
196+
return "", false
197+
}
198+
199+
logrus.Infof("Currently the elected leader %s from peers %v", firstID, addrs)
200+
return "", true
156201
}
157202

158203
// joinServer holds the state of the bootstrap peer-discovery server: this
// node's identity plus the set of peers it currently knows about.
type joinServer struct {
	// id is the random token generated for this node in newJoinServer; loop
	// compares it against the ids reported by peers.
	id string
	// cacheDuration is how long a peer stays in peerSeen after it was last
	// reported by discovery.
	cacheDuration time.Duration

	// lock is held by setPeers while it updates peers and peerSeen.
	lock sync.Mutex
	// peers is the sorted list of currently remembered peers.
	peers []string
	// peerSeen maps each remembered peer to the time it was last discovered.
	peerSeen map[string]time.Time
}
163210

164211
// pingResponse is a JSON-serialisable list of peers.
type pingResponse struct {
	// Peers is encoded under the "peers" key and omitted when empty.
	Peers []string `json:"peers,omitempty"`
}
167214

168-
func newJoinServer(ctx context.Context, port int64) (*joinServer, error) {
215+
func newJoinServer(ctx context.Context, cacheDuration string, port int64) (*joinServer, error) {
169216
id, err := randomtoken.Generate()
170217
if err != nil {
171218
return nil, err
172219
}
173220

221+
if cacheDuration == "" {
222+
cacheDuration = "5m"
223+
}
224+
225+
duration, err := time.ParseDuration(cacheDuration)
226+
if err != nil {
227+
return nil, err
228+
}
229+
174230
j := &joinServer{
175-
id: id,
231+
id: id,
232+
cacheDuration: duration,
233+
peerSeen: map[string]time.Time{},
234+
}
235+
236+
cert, key, err := cert.GenerateSelfSignedCertKey("rancherd-bootstrap", nil, nil)
237+
if err != nil {
238+
return nil, err
239+
}
240+
certs, err := tls.X509KeyPair(cert, key)
241+
if err != nil {
242+
return nil, err
243+
}
244+
l, err := tls.Listen("tcp", fmt.Sprintf(":%d", port), &tls.Config{
245+
Certificates: []tls.Certificate{
246+
certs,
247+
},
248+
})
249+
if err != nil {
250+
return nil, err
176251
}
252+
server := &http.Server{
253+
BaseContext: func(_ net.Listener) context.Context {
254+
return ctx
255+
},
256+
Handler: j,
257+
}
258+
go func() {
259+
err := server.Serve(l)
260+
if err != nil {
261+
logrus.Errorf("failed to server bootstrap http server: %v", err)
262+
}
263+
}()
264+
go func() {
265+
<-ctx.Done()
266+
server.Shutdown(context.Background())
267+
l.Close()
268+
}()
177269

178-
return j, server.ListenAndServe(ctx, int(port), 0, j, nil)
270+
return j, nil
179271
}
180272

181-
func (j *joinServer) setPeers(peers []string) {
273+
func (j *joinServer) setPeers(peers []string) []string {
182274
j.lock.Lock()
183275
defer j.lock.Unlock()
184-
logrus.Infof("current set of peers: %v", peers)
185-
j.peers = peers
276+
277+
// purge
278+
now := time.Now()
279+
for k, v := range j.peerSeen {
280+
if v.Add(j.cacheDuration).Before(now) {
281+
logrus.Info("Forgetting peer %s", k)
282+
delete(j.peerSeen, k)
283+
}
284+
}
285+
286+
// add
287+
for _, peer := range peers {
288+
if _, ok := j.peerSeen[peer]; !ok {
289+
logrus.Info("New peer discovered %s", peer)
290+
}
291+
j.peerSeen[peer] = now
292+
}
293+
294+
// sort
295+
newPeers := make([]string, 0, len(j.peerSeen))
296+
for k := range j.peerSeen {
297+
newPeers = append(newPeers, k)
298+
}
299+
sort.Strings(newPeers)
300+
301+
j.peers = newPeers
302+
logrus.Infof("current set of peers: %v", j.peers)
303+
return j.peers
186304
}
187305

188306
func (j *joinServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

0 commit comments

Comments
 (0)