Skip to content

Commit 6cd5280

Browse files
Modified DisableInitialHostLookup flag name and logic
1 parent 974fa12 commit 6cd5280

File tree

5 files changed

+114
-24
lines changed

5 files changed

+114
-24
lines changed

cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,12 @@ type ClusterConfig struct {
189189
// set to 10.0.0.1 which is what will be used to connect to.
190190
IgnorePeerAddr bool
191191

192-
// If DisableInitialHostLookup then the driver will not attempt to get host info
192+
// If DisableHostLookup then the driver will not attempt to get host info
193193
// from the system.peers table, this will mean that the driver will connect to
194194
// hosts supplied and will not attempt to lookup the hosts information, this will
195195
// mean that data_centre, rack and token information will not be available and as
196196
// such host filtering and token aware query routing will not be available.
197-
DisableInitialHostLookup bool
197+
DisableHostLookup bool
198198

199199
// Configure events the driver will register for
200200
Events struct {

conn.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1689,36 +1689,37 @@ func (c *Conn) querySystemLocal(ctx context.Context) *Iter {
16891689
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
16901690
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
16911691

1692-
var versions map[string]struct{}
1692+
versions := make(map[string]struct{})
16931693
var schemaVersion string
16941694

16951695
endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
16961696

16971697
for time.Now().Before(endDeadline) {
1698-
iter := c.querySystemPeers(ctx, c.host.version)
1698+
iter := &Iter{}
1699+
if !c.session.cfg.DisableHostLookup {
1700+
iter = c.querySystemPeers(ctx, c.host.version)
16991701

1700-
versions = make(map[string]struct{})
1701-
1702-
rows, err := iter.SliceMap()
1703-
if err != nil {
1704-
goto cont
1705-
}
1706-
1707-
for _, row := range rows {
1708-
host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port})
1702+
rows, err := iter.SliceMap()
17091703
if err != nil {
17101704
goto cont
17111705
}
1712-
if !isValidPeer(host) || host.schemaVersion == "" {
1713-
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
1714-
continue
1715-
}
17161706

1717-
versions[host.schemaVersion] = struct{}{}
1718-
}
1707+
for _, row := range rows {
1708+
host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port})
1709+
if err != nil {
1710+
goto cont
1711+
}
1712+
if !isValidPeer(host) || host.schemaVersion == "" {
1713+
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
1714+
continue
1715+
}
17191716

1720-
if err = iter.Close(); err != nil {
1721-
goto cont
1717+
versions[host.schemaVersion] = struct{}{}
1718+
}
1719+
1720+
if err = iter.Close(); err != nil {
1721+
goto cont
1722+
}
17221723
}
17231724

17241725
iter = c.query(ctx, localSchemas)

disable_host_lookup_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package gocql
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestDisableHostLookup(t *testing.T) {
11+
cluster := createCluster()
12+
cluster.DisableHostLookup = true
13+
14+
cluster.NumConns = 1
15+
session := createSessionFromCluster(cluster, t)
16+
defer session.Close()
17+
18+
if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.test_table (id int primary key)"); err != nil {
19+
t.Fatal(err)
20+
}
21+
if err := session.Query("insert into gocql_test.test_table (id) values (?)", 123).Exec(); err != nil {
22+
t.Error(err)
23+
return
24+
}
25+
26+
go simulateHeartbeatFailure(session)
27+
28+
queryCycles := 20
29+
for ; queryCycles > 0; queryCycles-- {
30+
time.Sleep(2 * time.Second)
31+
queryTestKeyspace(session)
32+
triggerSchemaChange(session)
33+
}
34+
}
35+
36+
func queryTestKeyspace(session *Session) {
37+
iter := session.Query("SELECT * FROM gocql_test.test_table").Iter()
38+
39+
var id string
40+
for iter.Scan(&id) {
41+
fmt.Printf("id: %s\n", id)
42+
}
43+
if err := iter.Close(); err != nil {
44+
log.Printf("error querying: %v", err)
45+
}
46+
}
47+
48+
func triggerSchemaChange(session *Session) {
49+
// Create a keyspace to trigger schema agreement
50+
err := session.Query(`CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}`).Exec()
51+
if err != nil {
52+
log.Printf("unable to create keyspace: %v", err)
53+
}
54+
55+
// Alter the keyspace to trigger schema agreement
56+
err = session.Query(`ALTER KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2}`).Exec()
57+
if err != nil {
58+
log.Printf("unable to alter keyspace: %v", err)
59+
}
60+
61+
// Drop the keyspace after schema agreement
62+
err = session.Query(`DROP KEYSPACE IF EXISTS test_keyspace`).Exec()
63+
if err != nil {
64+
log.Printf("unable to drop keyspace: %v", err)
65+
}
66+
}
67+
68+
func simulateHeartbeatFailure(session *Session) {
69+
time.Sleep(20 * time.Second) // simulate 20 seconds of normal operation
70+
log.Println("Simulating heartbeat failure...")
71+
cluster := session.cfg
72+
session.Close() // close the session to simulate a heartbeat failure
73+
74+
time.Sleep(10 * time.Second) // wait for 10 seconds to simulate downtime
75+
76+
// Reconnect
77+
log.Println("Reconnecting...")
78+
newSession, err := cluster.CreateSession()
79+
if err != nil {
80+
log.Fatalf("unable to create session: %v", err)
81+
}
82+
*session = *newSession
83+
log.Println("Reconnected")
84+
}

host_source.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,11 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
635635
return nil, errNoControl
636636
}
637637

638+
// Check if host lookup is disabled
639+
if r.session.cfg.DisableHostLookup {
640+
return []*HostInfo{}, nil
641+
}
642+
638643
var peers []*HostInfo
639644
iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
640645
return ch.conn.querySystemPeers(context.TODO(), localHost.version)

session.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func (s *Session) init() error {
237237
return err
238238
}
239239

240-
if !s.cfg.DisableInitialHostLookup {
240+
if !s.cfg.DisableHostLookup {
241241
var partitioner string
242242
newHosts, partitioner, err := s.hostSource.GetHosts()
243243
if err != nil {
@@ -336,11 +336,11 @@ func (s *Session) init() error {
336336
go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
337337
}
338338

339-
// If we disable the initial host lookup, we need to still check if the
339+
// If we disable the host lookup, we need to still check if the
340340
// cluster is using the newer system schema or not... however, if control
341341
// connection is disable, we really have no choice, so we just make our
342342
// best guess...
343-
if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
343+
if !s.cfg.disableControlConn && s.cfg.DisableHostLookup {
344344
newer, _ := checkSystemSchema(s.control)
345345
s.useSystemSchema = newer
346346
} else {

0 commit comments

Comments
 (0)