@@ -2,10 +2,197 @@ package discovery
2
2
3
3
import (
4
4
"context"
5
+ "crypto/tls"
6
+ "encoding/json"
7
+ "fmt"
8
+ "io/ioutil"
9
+ "log"
10
+ "net/http"
11
+ "sort"
12
+ "sync"
13
+ "time"
5
14
15
+ "github.com/hashicorp/go-discover"
16
+ "github.com/rancher/dynamiclistener/server"
6
17
"github.com/rancher/rancherd/pkg/config"
18
+ "github.com/rancher/wrangler/pkg/data/convert"
19
+ "github.com/rancher/wrangler/pkg/randomtoken"
20
+ "github.com/rancher/wrangler/pkg/slice"
21
+ "github.com/sirupsen/logrus"
22
+
23
+ // Include kubernetes provider
24
+ _ "github.com/hashicorp/go-discover/provider/k8s"
25
+ )
26
+
27
+ var (
28
+ insecureHTTPClient = http.Client {
29
+ Timeout : 10 * time .Second ,
30
+ Transport : & http.Transport {
31
+ Proxy : http .ProxyFromEnvironment ,
32
+ TLSHandshakeTimeout : 5 * time .Second ,
33
+ TLSClientConfig : & tls.Config {
34
+ InsecureSkipVerify : true ,
35
+ },
36
+ },
37
+ }
7
38
)
8
39
9
- func FindServer (ctx context.Context , cfg * config.Config ) (string , error ) {
10
- return cfg .Server , nil
40
+ func DiscoverServerAndRole (ctx context.Context , cfg * config.Config ) error {
41
+ if len (cfg .Discovery ) == 0 {
42
+ return nil
43
+ }
44
+
45
+ server , clusterInit , err := discoverServerAndRole (ctx , cfg )
46
+ if err != nil {
47
+ return err
48
+ }
49
+ if clusterInit {
50
+ cfg .Role = "cluster-init"
51
+ } else if server != "" {
52
+ cfg .Server = server
53
+ }
54
+ return nil
55
+
56
+ }
57
+ func discoverServerAndRole (ctx context.Context , cfg * config.Config ) (string , bool , error ) {
58
+ discovery , err := discover .New ()
59
+ if err != nil {
60
+ return "" , false , err
61
+ }
62
+
63
+ port , err := convert .ToNumber (cfg .RancherValues ["hostPort" ])
64
+ if err != nil || port == 0 {
65
+ port = 8443
66
+ }
67
+
68
+ ctx , cancel := context .WithCancel (ctx )
69
+ defer cancel ()
70
+
71
+ server , err := NewJoinServer (ctx , port )
72
+ if err != nil {
73
+ return "" , false , err
74
+ }
75
+
76
+ for {
77
+ server , clusterInit := server .loop (ctx , cfg .Discovery , port , discovery )
78
+ if clusterInit {
79
+ return "" , true , nil
80
+ }
81
+ if server != "" {
82
+ return server , false , nil
83
+ }
84
+ logrus .Info ("Waiting to discover server" )
85
+ select {
86
+ case <- ctx .Done ():
87
+ return "" , false , fmt .Errorf ("interrupted waiting to discover server: %w" , ctx .Err ())
88
+ case <- time .After (5 * time .Second ):
89
+ }
90
+ }
91
+ }
92
+
93
+ func (j * joinServer ) loop (ctx context.Context , params map [string ]string , port int64 , discovery * discover.Discover ) (string , bool ) {
94
+ addrs , err := discovery .Addrs (discover .Config (params ).String (), log .Default ())
95
+ if err != nil {
96
+ logrus .Errorf ("failed to discover peers to: %v" , err )
97
+ return "" , false
98
+ }
99
+
100
+ sort .Strings (addrs )
101
+ j .setPeers (addrs )
102
+
103
+ var (
104
+ allAgree = true
105
+ firstID = ""
106
+ )
107
+ for i , addr := range addrs {
108
+ url := fmt .Sprintf ("https://%s:%d/cacerts" , addr , port )
109
+ req , err := http .NewRequestWithContext (ctx , http .MethodGet , url , nil )
110
+ if err != nil {
111
+ logrus .Errorf ("failed to construct request for %s: %v" , url , err )
112
+ allAgree = false
113
+ return "" , false
114
+ }
115
+ resp , err := insecureHTTPClient .Do (req )
116
+ if err != nil {
117
+ logrus .Errorf ("failed to connect to %s: %v" , url , err )
118
+ allAgree = false
119
+ continue
120
+ }
121
+
122
+ data , err := ioutil .ReadAll (resp .Body )
123
+ resp .Body .Close ()
124
+ if err != nil || resp .StatusCode != http .StatusOK {
125
+ logrus .Errorf ("failed to read response from %s: code %d: %v" , url , resp .StatusCode , err )
126
+ allAgree = false
127
+ continue
128
+ }
129
+
130
+ rancherID := resp .Header .Get ("X-Cattle-Rancherd-Id" )
131
+ if rancherID == "" {
132
+ return fmt .Sprintf ("https://%s:%d" , addr , port ), false
133
+ }
134
+ if i == 0 {
135
+ firstID = rancherID
136
+ }
137
+
138
+ var pingResponse pingResponse
139
+ if err := json .Unmarshal (data , & pingResponse ); err != nil {
140
+ logrus .Errorf ("failed to unmarshal response (%s) from %s: %v" , data , url , err )
141
+ allAgree = false
142
+ continue
143
+ }
144
+
145
+ if ! slice .StringsEqual (addrs , pingResponse .Peers ) {
146
+ logrus .Infof ("Peer %s does not agree on peer list, %v != %v" , addr , addrs , pingResponse .Peers )
147
+ allAgree = false
148
+ continue
149
+ }
150
+ }
151
+
152
+ if allAgree && len (addrs ) > 2 && firstID == j .id {
153
+ return "" , true
154
+ }
155
+
156
+ return "" , false
157
+ }
158
+
159
+ type joinServer struct {
160
+ lock sync.Mutex
161
+ id string
162
+ peers []string
163
+ }
164
+
165
+ type pingResponse struct {
166
+ Peers []string `json:"peers,omitempty"`
167
+ }
168
+
169
+ func NewJoinServer (ctx context.Context , port int64 ) (* joinServer , error ) {
170
+ id , err := randomtoken .Generate ()
171
+ if err != nil {
172
+ return nil , err
173
+ }
174
+
175
+ j := & joinServer {
176
+ id : id ,
177
+ }
178
+
179
+ return j , server .ListenAndServe (ctx , int (port ), 0 , j , nil )
180
+ }
181
+
182
+ func (j * joinServer ) setPeers (peers []string ) {
183
+ j .lock .Lock ()
184
+ defer j .lock .Unlock ()
185
+ logrus .Infof ("current set of peers: %v" , peers )
186
+ j .peers = peers
187
+ }
188
+
189
+ func (j * joinServer ) ServeHTTP (rw http.ResponseWriter , req * http.Request ) {
190
+ j .lock .Lock ()
191
+ defer j .lock .Unlock ()
192
+
193
+ rw .Header ().Set ("X-Cattle-Rancherd-Id" , j .id )
194
+ rw .Header ().Set ("Content-Type" , "application/json" )
195
+ _ = json .NewEncoder (rw ).Encode (pingResponse {
196
+ Peers : j .peers ,
197
+ })
11
198
}
0 commit comments