9
9
"net/http"
10
10
"os"
11
11
"sync"
12
+ "sync/atomic"
12
13
"time"
13
14
14
15
"github.com/cloudability/metrics-agent/retrieval/raw"
@@ -29,7 +30,7 @@ func (err nodeError) Error() string {
29
30
}
30
31
31
32
const (
32
- FatalNodeError = nodeError ("unable to retrieve required set of node metrics via direct or proxy connection" )
33
+ FatalNodeError = nodeError ("unable to retrieve required metrics from any node via direct or proxy connection" )
33
34
)
34
35
35
36
// NodeSource is an interface to get a list of Nodes
@@ -346,36 +347,87 @@ func ensureNodeSource(ctx context.Context, config KubeAgentConfig) (KubeAgentCon
346
347
return config , fmt .Errorf ("error retrieving nodes: %s" , err )
347
348
}
348
349
349
- firstNode := & nodes [0 ]
350
+ directNodes := int32 (0 )
351
+ proxyNodes := int32 (0 )
352
+ failedDirect := int32 (0 )
353
+ failedProxy := int32 (0 )
354
+ directAllowed := allowDirectConnect (config , nodes )
350
355
351
- ip , port , err := clientSetNodeSource .NodeAddress (firstNode )
352
- if err != nil {
353
- return config , fmt .Errorf ("error retrieving node addresses: %s" , err )
354
- }
356
+ var wg sync.WaitGroup
355
357
356
- if allowDirectConnect (config , nodes ) {
357
- // test node direct connectivity
358
- d := directNodeEndpoints (ip , port )
359
- success , err := checkEndpointConnections (config , & nodeHTTPClient , Direct , d .statsSummary ())
360
- if err != nil {
361
- return config , err
362
- }
363
- if success {
364
- return config , nil
365
- }
358
+ limiter := make (chan struct {}, config .ConcurrentPollers )
359
+
360
+ for _ , n := range nodes {
361
+ // block if channel is full (limiting number of goroutines)
362
+ limiter <- struct {}{}
363
+ wg .Add (1 )
364
+ go func (currentNode v1.Node ) {
365
+ defer func () {
366
+ <- limiter
367
+ wg .Done ()
368
+ }()
369
+ directlyConnected := false
370
+ ip , port , err := clientSetNodeSource .NodeAddress (& currentNode )
371
+ if err != nil {
372
+ log .Warnf ("error retrieving node addresses: %s" , err )
373
+ return
374
+ }
375
+ if directAllowed {
376
+ // test node direct connectivity
377
+ d := directNodeEndpoints (ip , port )
378
+ success , err := checkEndpointConnections (config , & nodeHTTPClient , Direct , d .statsSummary ())
379
+ if err != nil {
380
+ log .Warnf ("Failed to connect to node [%s] directly with cause [%s]" ,
381
+ d .statsSummary (), err .Error ())
382
+ atomic .AddInt32 (& failedDirect , 1 )
383
+ }
384
+ if success {
385
+ directlyConnected = true
386
+ atomic .AddInt32 (& directNodes , 1 )
387
+ }
388
+ }
389
+ if ! directlyConnected {
390
+ p := setupProxyAPI (config .ClusterHostURL , currentNode .Name )
391
+ success , err := checkEndpointConnections (config , & config .HTTPClient , Proxy , p .statsSummary ())
392
+ if err != nil {
393
+ log .Warnf ("Failed to connect to node [%s] via proxy with cause [%s]" ,
394
+ p .statsSummary (), err .Error ())
395
+ atomic .AddInt32 (& failedProxy , 1 )
396
+ }
397
+ if success {
398
+ atomic .AddInt32 (& proxyNodes , 1 )
399
+ }
400
+ }
401
+ }(n )
366
402
}
403
+ log .Debugln ("Currently Waiting for all node data to be gathered" )
404
+ wg .Wait ()
405
+ log .Infof ("Of %d nodes, %d connected directly, %d connected via proxy, and %d could not be reached" ,
406
+ len (nodes ), directNodes , proxyNodes , failedProxy )
367
407
368
- // test node connectivity via kube-proxy
369
- p := setupProxyAPI (config .ClusterHostURL , firstNode .Name )
370
- success , err := checkEndpointConnections (config , & config .HTTPClient , Proxy , p .statsSummary ())
371
- if err != nil {
372
- return config , err
408
+ if len (nodes ) != int (directNodes + proxyNodes ) {
409
+ pct := int (directNodes + proxyNodes ) * 100 / len (nodes )
410
+ log .Warnf ("Only %d percent of ready nodes could could be connected to, " +
411
+ "agent will operate in a limited mode." , pct )
373
412
}
374
- if success {
375
- return config , nil
413
+
414
+ if (directNodes + proxyNodes ) == 0 {
415
+ return config , FatalNodeError
376
416
}
377
417
378
- return config , FatalNodeError
418
+ validateConfig (config , proxyNodes , directNodes )
419
+ return config , nil
420
+ }
421
+
422
+ func validateConfig (config KubeAgentConfig , proxyNodes , directNodes int32 ) {
423
+ if proxyNodes > 0 {
424
+ config .NodeMetrics .SetAvailability (NodeStatsSummaryEndpoint , Proxy , true )
425
+ } else if directNodes > 0 {
426
+ config .NodeMetrics .SetAvailability (NodeStatsSummaryEndpoint , Direct , true )
427
+ } else {
428
+ config .NodeMetrics .SetAvailability (NodeStatsSummaryEndpoint , Proxy , false )
429
+ config .NodeMetrics .SetAvailability (NodeStatsSummaryEndpoint , Direct , false )
430
+ }
379
431
}
380
432
381
433
func checkEndpointConnections (config KubeAgentConfig , client * http.Client , method Connection ,
@@ -384,8 +436,7 @@ func checkEndpointConnections(config KubeAgentConfig, client *http.Client, metho
384
436
if err != nil {
385
437
return false , err
386
438
}
387
- log .Infof ("/stats/summary endpoint available via %s connection? %v" , method , ns )
388
- config .NodeMetrics .SetAvailability (NodeStatsSummaryEndpoint , method , ns )
439
+ log .Infof ("Node [%s] available via %s connection? %v" , nodeStatSum , method , ns )
389
440
390
441
return ns , nil
391
442
}
0 commit comments