@@ -171,6 +171,7 @@ type HostInfo struct {
171171 port int
172172 dataCenter string
173173 rack string
174+ missingRack bool
174175 hostId string
175176 workload string
176177 graph bool
@@ -413,8 +414,9 @@ func (h *HostInfo) update(from *HostInfo) {
413414 if h .dataCenter == "" {
414415 h .dataCenter = from .dataCenter
415416 }
416- if h .rack == "" {
417+ if h .missingRack || h . rack == "" {
417418 h .rack = from .rack
419+ h .missingRack = from .missingRack
418420 }
419421 if h .hostId == "" {
420422 h .hostId = from .hostId
@@ -530,7 +532,7 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
530532 const assertErrorMsg = "Assertion failed for %s, type was %T"
531533 var ok bool
532534
533- host := & HostInfo {connectAddress : defaultAddr , port : defaultPort }
535+ host := & HostInfo {connectAddress : defaultAddr , port : defaultPort , missingRack : true }
534536
535537 // Process all fields from the row
536538 for key , value := range row {
@@ -541,14 +543,30 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
541543 return nil , fmt .Errorf (assertErrorMsg , "data_center" , value )
542544 }
543545 case "rack" :
544- host . rack , ok = value .(string )
546+ rack , ok : = value .(* string )
545547 if ! ok {
546- return nil , fmt .Errorf (assertErrorMsg , "rack" , value )
548+ if rack , ok := value .(string ); ! ok {
549+ return nil , fmt .Errorf (assertErrorMsg , "rack" , value )
550+ } else {
551+ host .rack = rack
552+ host .missingRack = false
553+ }
554+ } else if rack != nil {
555+ host .rack = * rack
556+ host .missingRack = false
547557 }
548558 case "host_id" :
549559 hostId , ok := value .(UUID )
550560 if ! ok {
551- return nil , fmt .Errorf (assertErrorMsg , "host_id" , value )
561+ if str , ok := value .(string ); ok {
562+ var err error
563+ hostId , err = ParseUUID (str )
564+ if err != nil {
565+ return nil , fmt .Errorf ("failed to parse host_id: %w" , err )
566+ }
567+ } else {
568+ return nil , fmt .Errorf (assertErrorMsg , "host_id" , value )
569+ }
552570 }
553571 host .hostId = hostId .String ()
554572 case "release_version" :
@@ -560,7 +578,11 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
560578 case "peer" :
561579 ip , ok := value .(net.IP )
562580 if ! ok {
563- return nil , fmt .Errorf (assertErrorMsg , "peer" , value )
581+ if str , ok := value .(string ); ok {
582+ ip = net .ParseIP (str )
583+ } else {
584+ return nil , fmt .Errorf (assertErrorMsg , "peer" , value )
585+ }
564586 }
565587 host .peer = ip
566588 case "cluster_name" :
@@ -576,31 +598,51 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
576598 case "broadcast_address" :
577599 ip , ok := value .(net.IP )
578600 if ! ok {
579- return nil , fmt .Errorf (assertErrorMsg , "broadcast_address" , value )
601+ if str , ok := value .(string ); ok {
602+ ip = net .ParseIP (str )
603+ } else {
604+ return nil , fmt .Errorf (assertErrorMsg , "broadcast_address" , value )
605+ }
580606 }
581607 host .broadcastAddress = ip
582608 case "preferred_ip" :
583609 ip , ok := value .(net.IP )
584610 if ! ok {
585- return nil , fmt .Errorf (assertErrorMsg , "preferred_ip" , value )
611+ if str , ok := value .(string ); ok {
612+ ip = net .ParseIP (str )
613+ } else {
614+ return nil , fmt .Errorf (assertErrorMsg , "preferred_ip" , value )
615+ }
586616 }
587617 host .preferredIP = ip
588618 case "rpc_address" :
589619 ip , ok := value .(net.IP )
590620 if ! ok {
591- return nil , fmt .Errorf (assertErrorMsg , "rpc_address" , value )
621+ if str , ok := value .(string ); ok {
622+ ip = net .ParseIP (str )
623+ } else {
624+ return nil , fmt .Errorf (assertErrorMsg , "rpc_address" , value )
625+ }
592626 }
593627 host .rpcAddress = ip
594628 case "native_address" :
595629 ip , ok := value .(net.IP )
596630 if ! ok {
597- return nil , fmt .Errorf (assertErrorMsg , "native_address" , value )
631+ if str , ok := value .(string ); ok {
632+ ip = net .ParseIP (str )
633+ } else {
634+ return nil , fmt .Errorf (assertErrorMsg , "native_address" , value )
635+ }
598636 }
599637 host .rpcAddress = ip
600638 case "listen_address" :
601639 ip , ok := value .(net.IP )
602640 if ! ok {
603- return nil , fmt .Errorf (assertErrorMsg , "listen_address" , value )
641+ if str , ok := value .(string ); ok {
642+ ip = net .ParseIP (str )
643+ } else {
644+ return nil , fmt .Errorf (assertErrorMsg , "listen_address" , value )
645+ }
604646 }
605647 host .listenAddress = ip
606648 case "native_port" :
@@ -666,18 +708,23 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
666708 }
667709}
668710
711+ // this will return nil, nil if there were no rows left in the Iter
669712func (s * Session ) hostInfoFromIter (iter * Iter , connectAddress net.IP , defaultPort int ) (* HostInfo , error ) {
670- rows , err := iter .SliceMap ()
671- if err != nil {
672- // TODO(zariel): make typed error
673- return nil , err
713+ // TODO: switch this to a new iterator method once CASSGO-36 is solved
714+ m := map [string ]interface {}{
715+ // we set rack to a double pointer so we can know if it's NULL or not since
716+ // we need to be able to filter out NULL rack hosts but not empty string hosts
717+ // see CASSGO-6
718+ "rack" : new (* string ),
674719 }
675-
676- if len (rows ) == 0 {
677- return nil , errors .New ("query returned 0 rows" )
720+ if ! iter .MapScan (m ) {
721+ if err := iter .Close (); err != nil {
722+ return nil , err
723+ }
724+ return nil , nil
678725 }
679726
680- host , err := s .newHostInfoFromMap (connectAddress , defaultPort , rows [ 0 ] )
727+ host , err := s .newHostInfoFromMap (connectAddress , defaultPort , m )
681728 if err != nil {
682729 return nil , err
683730 }
@@ -700,8 +747,12 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
700747
701748 host , err := r .session .hostInfoFromIter (iter , nil , r .session .cfg .Port )
702749 if err != nil {
750+ iter .Close ()
703751 return nil , fmt .Errorf ("could not retrieve local host info: %w" , err )
704752 }
753+ if host == nil {
754+ return nil , errors .New ("could not retrieve local host info: query returned 0 rows" )
755+ }
705756 return host , nil
706757}
707758
@@ -711,7 +762,6 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
711762 return nil , errNoControl
712763 }
713764
714- var peers []* HostInfo
715765 iter := r .session .control .withConnHost (func (ch * connHost ) * Iter {
716766 return ch .conn .querySystemPeers (context .TODO (), localHost .version )
717767 })
@@ -720,18 +770,25 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
720770 return nil , errNoControl
721771 }
722772
723- rows , err := iter .SliceMap ()
724- if err != nil {
725- // TODO(zariel): make typed error
726- return nil , fmt .Errorf ("unable to fetch peer host info: %s" , err )
727- }
728-
729- for _ , row := range rows {
773+ var peers []* HostInfo
774+ for {
730775 // extract all available info about the peer
731- host , err := r .session .newHostInfoFromMap ( nil , r .session .cfg .Port , row )
776+ host , err := r .session .hostInfoFromIter ( iter , nil , r .session .cfg .Port )
732777 if err != nil {
733- return nil , err
734- } else if ! isValidPeer (host ) {
778+ // if the error came from the iterator then return it, otherwise ignore
779+ // and warn
780+ if iterErr := iter .Close (); iterErr != nil {
781+ return nil , fmt .Errorf ("unable to fetch peer host info: %s" , iterErr )
782+ }
783+ // skip over peers that we couldn't parse
784+ r .session .logger .Warning ("Failed to parse peer this host will be ignored." , newLogFieldError ("err" , err ))
785+ continue
786+ }
787+ // if nil then none left
788+ if host == nil {
789+ break
790+ }
791+ if ! isValidPeer (host ) {
735792 // If it's not a valid peer
736793 r .session .logger .Warning ("Found invalid peer " +
737794 "likely due to a gossip or snitch issue, this host will be ignored." , newLogFieldStringer ("host" , host ))
@@ -749,7 +806,7 @@ func isValidPeer(host *HostInfo) bool {
749806 return ! (len (host .RPCAddress ()) == 0 ||
750807 host .hostId == "" ||
751808 host .dataCenter == "" ||
752- host .rack == "" ||
809+ host .missingRack ||
753810 len (host .tokens ) == 0 )
754811}
755812
0 commit comments