@@ -61,6 +61,7 @@ type Config struct {
61
61
ClusterUpdateThreshold time.Duration
62
62
ClusterUpdateInterval time.Duration
63
63
IdleConnectionReapDelay time.Duration
64
+ ClientHealthCheckInterval time.Duration
64
65
65
66
HostPorts []string
66
67
Region string
@@ -110,6 +111,7 @@ var defaultConfig = Config{
110
111
MaxPendingConnectionsPerHost : 10 ,
111
112
ClusterUpdateInterval : time .Second * 4 ,
112
113
ClusterUpdateThreshold : time .Millisecond * 125 ,
114
+ ClientHealthCheckInterval : time .Second * 5 ,
113
115
114
116
Credentials : defaults .CredChain (defaults .Config (), defaults .Handlers ()),
115
117
@@ -418,10 +420,10 @@ func (cc *ClusterDaxClient) shouldRetry(o RequestOptions, err error) (request.Re
418
420
419
421
type cluster struct {
420
422
lock sync.RWMutex
421
- active map [hostPort ]DaxAPI // protected by lock
422
- routes []DaxAPI // protected by lock
423
- closed bool // protected by lock
424
- lastRefreshErr error // protected by lock
423
+ active map [hostPort ]clientAndConfig // protected by lock
424
+ routes []DaxAPI // protected by lock
425
+ closed bool // protected by lock
426
+ lastRefreshErr error // protected by lock
425
427
426
428
lastUpdateNs int64
427
429
executor * taskExecutor
@@ -431,6 +433,11 @@ type cluster struct {
431
433
clientBuilder clientBuilder
432
434
}
433
435
436
+ type clientAndConfig struct {
437
+ client DaxAPI
438
+ cfg serviceEndpoint
439
+ }
440
+
434
441
func newCluster (cfg Config ) (* cluster , error ) {
435
442
if err := cfg .validate (); err != nil {
436
443
return nil , err
@@ -599,7 +606,7 @@ func (c *cluster) refresh(force bool) error {
599
606
func (c * cluster ) refreshNow () error {
600
607
cfg , err := c .pullEndpoints ()
601
608
if err != nil {
602
- c .config . logger . Log (fmt .Sprintf ("ERROR: Failed to refresh endpoint : %s" , err ))
609
+ c .debugLog (fmt .Sprintf ("ERROR: Failed to refresh endpoint : %s" , err ))
603
610
return err
604
611
}
605
612
if ! c .hasChanged (cfg ) {
@@ -608,55 +615,116 @@ func (c *cluster) refreshNow() error {
608
615
return c .update (cfg )
609
616
}
610
617
618
+ // This method is responsible for updating the set of active routes tracked by
619
+ // the clsuter-dax-client in response to updates in the roster.
611
620
func (c * cluster ) update (config []serviceEndpoint ) error {
612
621
newEndpoints := make (map [hostPort ]struct {}, len (config ))
613
622
for _ , cfg := range config {
614
623
newEndpoints [cfg .hostPort ()] = struct {}{}
615
624
}
616
625
617
- newActive := make (map [hostPort ]DaxAPI , len (config ))
626
+ newActive := make (map [hostPort ]clientAndConfig , len (config ))
618
627
newRoutes := make ([]DaxAPI , len (config ))
628
+ shouldUpdateRoutes := true
629
+ var toClose []clientAndConfig
630
+ // Track the newly created client instances, so that we can clean them up in case of partial failures.
631
+ var newCliCfg []clientAndConfig
632
+
633
+ c .lock .Lock ()
619
634
620
- c .lock .RLock ()
621
635
cls := c .closed
622
636
oldActive := c .active
623
- c .lock .RUnlock ()
624
- if cls {
625
- return nil
626
- }
627
637
628
- var toClose []DaxAPI
629
- for ep , cli := range oldActive {
630
- _ , ok := newEndpoints [ep ]
631
- if ! ok {
632
- toClose = append (toClose , cli )
638
+ if cls {
639
+ shouldUpdateRoutes = false
640
+ } else {
641
+ // Close the client instances that are no longer part of roster.
642
+ for ep , clicfg := range oldActive {
643
+ _ , isPartOfUpdatedEndpointsConfig := newEndpoints [ep ]
644
+ if ! isPartOfUpdatedEndpointsConfig {
645
+ c .debugLog (fmt .Sprintf ("Found updated endpoing configs, will close inactive endpoint client : %s" , ep .host ))
646
+ toClose = append (toClose , clicfg )
647
+ }
633
648
}
634
- }
635
- for i , ep := range config {
636
- cli , ok := oldActive [ep .hostPort ()]
637
- var err error
638
- if ! ok {
639
- cli , err = c .newSingleClient (ep )
640
- if err != nil {
641
- return nil
649
+
650
+ // Create client instances for the new endpoints in roster.
651
+ for i , ep := range config {
652
+ cliAndCfg , alreadyExists := oldActive [ep .hostPort ()]
653
+ if ! alreadyExists {
654
+ cli , err := c .newSingleClient (ep )
655
+ if err != nil {
656
+ shouldUpdateRoutes = false
657
+ break
658
+ } else {
659
+ cliAndCfg = clientAndConfig {client : cli , cfg : ep }
660
+ newCliCfg = append (newCliCfg , cliAndCfg )
661
+ }
662
+
663
+ if singleCli , ok := cli .(HealthCheckDaxAPI ); ok {
664
+ singleCli .startHealthChecks (c , ep .hostPort ())
665
+ }
642
666
}
667
+ newActive [ep .hostPort ()] = cliAndCfg
668
+ newRoutes [i ] = cliAndCfg .client
643
669
}
644
- newActive [ep .hostPort ()] = cli
645
- newRoutes [i ] = cli
646
670
}
647
- c .lock .Lock ()
648
- c .active = newActive
649
- c .routes = newRoutes
671
+
672
+ if shouldUpdateRoutes {
673
+ c .active = newActive
674
+ c .routes = newRoutes
675
+ } else {
676
+ // cleanup newly created clients if they are not going to be tracked further.
677
+ toClose = append (toClose , newCliCfg ... )
678
+ }
650
679
c .lock .Unlock ()
651
680
652
681
go func () {
653
682
for _ , client := range toClose {
654
- c .closeClient (client )
683
+ c .debugLog (fmt .Sprintf ("Closing client for : %s" , client .cfg .hostname ))
684
+ c .closeClient (client .client )
655
685
}
656
686
}()
657
687
return nil
658
688
}
659
689
690
+ func (c * cluster ) onHealthCheckFailed (host hostPort ) {
691
+ c .lock .Lock ()
692
+ c .debugLog ("DEBUG: Refreshing cache for host: " + host .host )
693
+ shouldCloseOldClient := true
694
+ var oldClientConfig , ok = c .active [host ]
695
+ if ok {
696
+ var err error
697
+ var cli DaxAPI
698
+ cli , err = c .newSingleClient (oldClientConfig .cfg )
699
+ if singleCli , ok := cli .(HealthCheckDaxAPI ); ok {
700
+ singleCli .startHealthChecks (c , host )
701
+ }
702
+
703
+ if err == nil {
704
+ c .active [host ] = clientAndConfig {client : cli , cfg : oldClientConfig .cfg }
705
+
706
+ newRoutes := make ([]DaxAPI , len (c .active ))
707
+ i := 0
708
+ for _ , cliAndCfg := range c .active {
709
+ newRoutes [i ] = cliAndCfg .client
710
+ i ++
711
+ }
712
+ c .routes = newRoutes
713
+ } else {
714
+ shouldCloseOldClient = false
715
+ c .debugLog (fmt .Sprintf ("DEBUG: Failed to refresh cache for host: " + host .host ))
716
+ }
717
+ } else {
718
+ c .debugLog (fmt .Sprintf ("DEBUG: The node is not part of active routes. Ignoring the health check failure for host: " + host .host ))
719
+ }
720
+ c .lock .Unlock ()
721
+
722
+ if shouldCloseOldClient {
723
+ c .debugLog (fmt .Sprintf ("DEBUG: Closing old instance of a replaced client for endpoint: %s" , oldClientConfig .cfg .hostPort ().host ))
724
+ c .closeClient (oldClientConfig .client )
725
+ }
726
+ }
727
+
660
728
func (c * cluster ) hasChanged (cfg []serviceEndpoint ) bool {
661
729
c .lock .RLock ()
662
730
defer c .lock .RUnlock ()
@@ -692,9 +760,7 @@ func (c *cluster) pullEndpoints() ([]serviceEndpoint, error) {
692
760
lastErr = err
693
761
continue
694
762
}
695
- if c .config .logger != nil && c .config .logLevel .AtLeast (aws .LogDebug ) {
696
- c .config .logger .Log (fmt .Sprintf ("DEBUG: Pulled endpoints from %s : %v" , ip , endpoints ))
697
- }
763
+ c .debugLog (fmt .Sprintf ("DEBUG: Pulled endpoints from %s : %v" , ip , endpoints ))
698
764
if len (endpoints ) > 0 {
699
765
return endpoints , nil
700
766
}
@@ -720,6 +786,14 @@ func (c *cluster) closeClient(client DaxAPI) {
720
786
}
721
787
}
722
788
789
+ func (c * cluster ) debugLog (args ... interface {}) {
790
+ if c .config .logger != nil && c .config .logLevel .AtLeast (aws .LogDebug ) {
791
+ {
792
+ c .config .logger .Log (args )
793
+ }
794
+ }
795
+ }
796
+
723
797
// newSingleClient asks the cluster's clientBuilder for a client bound to the
// address and port of cfg, passing along the cluster's connection config,
// region, credentials, pending-connection limit and dial function.
func (c *cluster) newSingleClient(cfg serviceEndpoint) (DaxAPI, error) {
	ip := net.IP(cfg.address)
	return c.clientBuilder.newClient(
		ip,
		cfg.port,
		c.config.connConfig,
		c.config.Region,
		c.config.Credentials,
		c.config.MaxPendingConnectionsPerHost,
		c.config.DialContext,
	)
}
0 commit comments