Skip to content

Commit ce100a1

Browse files
committed
Merge Fallback to system.peers when system.peers_v2 is not available (#1646)
2 parents c44af82 + dc449c4 commit ce100a1

File tree

3 files changed

+36
-17
lines changed

3 files changed

+36
-17
lines changed

conn.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ type Conn struct {
190190
version uint8
191191
currentKeyspace string
192192
host *HostInfo
193+
isSchemaV2 bool
193194

194195
session *Session
195196

@@ -255,6 +256,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
255256
session: s,
256257
streams: streams.New(cfg.ProtoVersion),
257258
host: host,
259+
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
258260
frameObserver: s.frameObserver,
259261
w: &deadlineContextWriter{
260262
w: dialedHost.Conn,
@@ -1618,21 +1620,47 @@ func (c *Conn) query(ctx context.Context, statement string, values ...interface{
16181620
return c.executeQuery(ctx, q)
16191621
}
16201622

1621-
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
1623+
func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter {
16221624
const (
1623-
peerSchemasTemplate = "SELECT * FROM %s"
1624-
localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
1625+
peerSchema = "SELECT * FROM system.peers"
1626+
peerV2Schemas = "SELECT * FROM system.peers_v2"
16251627
)
16261628

1629+
c.mu.Lock()
1630+
isSchemaV2 := c.isSchemaV2
1631+
c.mu.Unlock()
1632+
1633+
if version.AtLeast(4, 0, 0) && isSchemaV2 {
1634+
// Try "system.peers_v2" and fallback to "system.peers" if it's not found
1635+
iter := c.query(ctx, peerV2Schemas)
1636+
1637+
err := iter.checkErrAndNotFound()
1638+
if err != nil {
1639+
if errFrame, ok := err.(errorFrame); ok && errFrame.code == ErrCodeInvalid { // system.peers_v2 not found, try system.peers
1640+
c.mu.Lock()
1641+
c.isSchemaV2 = false
1642+
c.mu.Unlock()
1643+
return c.query(ctx, peerSchema)
1644+
} else {
1645+
return iter
1646+
}
1647+
}
1648+
return iter
1649+
} else {
1650+
return c.query(ctx, peerSchema)
1651+
}
1652+
}
1653+
1654+
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
1655+
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
1656+
16271657
var versions map[string]struct{}
16281658
var schemaVersion string
16291659

16301660
endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
16311661

1632-
queryString := fmt.Sprintf(peerSchemasTemplate, peersTableName(c.host.version))
1633-
16341662
for time.Now().Before(endDeadline) {
1635-
iter := c.query(ctx, queryString)
1663+
iter := c.querySystemPeers(ctx, c.host.version)
16361664

16371665
versions = make(map[string]struct{})
16381666

helpers.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -446,10 +446,3 @@ func LookupIP(host string) ([]net.IP, error) {
446446
return net.LookupIP(host)
447447

448448
}
449-
450-
func peersTableName(version cassVersion) string {
451-
if version.AtLeast(4, 0, 0) {
452-
return "system.peers_v2"
453-
}
454-
return "system.peers"
455-
}

host_source.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -561,8 +561,7 @@ func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
561561
var hosts []*HostInfo
562562
iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
563563
hosts = append(hosts, ch.host)
564-
return ch.conn.query(context.TODO(),
565-
fmt.Sprintf("SELECT * FROM %s", peersTableName(ch.host.version)))
564+
return ch.conn.querySystemPeers(context.TODO(), ch.host.version)
566565
})
567566

568567
if iter == nil {
@@ -631,8 +630,7 @@ func (r *ringDescriber) getHostInfo(hostID UUID) (*HostInfo, error) {
631630
}
632631

633632
if table == "system.peers" {
634-
return ch.conn.query(context.TODO(),
635-
fmt.Sprintf("SELECT * from %s", peersTableName(ch.host.version)))
633+
return ch.conn.querySystemPeers(context.TODO(), ch.host.version)
636634
} else {
637635
return ch.conn.query(context.TODO(), fmt.Sprintf("SELECT * FROM %s", table))
638636
}

0 commit comments

Comments
 (0)