Skip to content

Commit 30207ca

Browse files
RostislavPorohnyatengu-alt
authored andcommitted
Modified DisableInitialHostLookup flag name and logic
1 parent c75ff5f commit 30207ca

File tree

5 files changed

+107
-17
lines changed

5 files changed

+107
-17
lines changed

cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,12 @@ type ClusterConfig struct {
195195
// set to 10.0.0.1 which is what will be used to connect to.
196196
IgnorePeerAddr bool
197197

198-
// If DisableInitialHostLookup then the driver will not attempt to get host info
198+
// If DisableHostLookup then the driver will not attempt to get host info
199199
// from the system.peers table, this will mean that the driver will connect to
200200
// hosts supplied and will not attempt to lookup the hosts information, this will
201201
// mean that data_center, rack and token information will not be available and as
202202
// such host filtering and token aware query routing will not be available.
203-
DisableInitialHostLookup bool
203+
DisableHostLookup bool
204204

205205
// Configure events the driver will register for
206206
Events struct {

conn.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1898,20 +1898,20 @@ func (c *Conn) querySystemLocal(ctx context.Context) *Iter {
18981898
func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
18991899
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
19001900

1901-
var versions map[string]struct{}
1901+
versions := make(map[string]struct{})
19021902
var schemaVersion string
19031903

19041904
endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
19051905

19061906
for time.Now().Before(endDeadline) {
1907-
iter := c.querySystemPeers(ctx, c.host.version)
1907+
iter := &Iter{}
1908+
if !c.session.cfg.DisableHostLookup {
1909+
iter = c.querySystemPeers(ctx, c.host.version)
19081910

1909-
versions = make(map[string]struct{})
1910-
1911-
rows, err := iter.SliceMap()
1912-
if err != nil {
1913-
goto cont
1914-
}
1911+
rows, err := iter.SliceMap()
1912+
if err != nil {
1913+
goto cont
1914+
}
19151915

19161916
for _, row := range rows {
19171917
h, err := NewHostInfo(c.host.ConnectAddress(), c.session.cfg.Port)
@@ -1927,11 +1927,12 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
19271927
continue
19281928
}
19291929

1930-
versions[host.schemaVersion] = struct{}{}
1931-
}
1930+
versions[host.schemaVersion] = struct{}{}
1931+
}
19321932

1933-
if err = iter.Close(); err != nil {
1934-
goto cont
1933+
if err = iter.Close(); err != nil {
1934+
goto cont
1935+
}
19351936
}
19361937

19371938
iter = c.query(ctx, localSchemas)

disable_host_lookup_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package gocql
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestDisableHostLookup(t *testing.T) {
11+
cluster := createCluster()
12+
cluster.DisableHostLookup = true
13+
14+
cluster.NumConns = 1
15+
session := createSessionFromCluster(cluster, t)
16+
defer session.Close()
17+
18+
if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.test_table (id int primary key)"); err != nil {
19+
t.Fatal(err)
20+
}
21+
if err := session.Query("insert into gocql_test.test_table (id) values (?)", 123).Exec(); err != nil {
22+
t.Error(err)
23+
return
24+
}
25+
26+
go simulateHeartbeatFailure(session)
27+
28+
queryCycles := 20
29+
for ; queryCycles > 0; queryCycles-- {
30+
time.Sleep(2 * time.Second)
31+
queryTestKeyspace(session)
32+
triggerSchemaChange(session)
33+
}
34+
}
35+
36+
func queryTestKeyspace(session *Session) {
37+
iter := session.Query("SELECT * FROM gocql_test.test_table").Iter()
38+
39+
var id string
40+
for iter.Scan(&id) {
41+
fmt.Printf("id: %s\n", id)
42+
}
43+
if err := iter.Close(); err != nil {
44+
log.Printf("error querying: %v", err)
45+
}
46+
}
47+
48+
func triggerSchemaChange(session *Session) {
49+
// Create a keyspace to trigger schema agreement
50+
err := session.Query(`CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}`).Exec()
51+
if err != nil {
52+
log.Printf("unable to create keyspace: %v", err)
53+
}
54+
55+
// Alter the keyspace to trigger schema agreement
56+
err = session.Query(`ALTER KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2}`).Exec()
57+
if err != nil {
58+
log.Printf("unable to alter keyspace: %v", err)
59+
}
60+
61+
// Drop the keyspace after schema agreement
62+
err = session.Query(`DROP KEYSPACE IF EXISTS test_keyspace`).Exec()
63+
if err != nil {
64+
log.Printf("unable to drop keyspace: %v", err)
65+
}
66+
}
67+
68+
func simulateHeartbeatFailure(session *Session) {
69+
time.Sleep(20 * time.Second) // simulate 20 seconds of normal operation
70+
log.Println("Simulating heartbeat failure...")
71+
cluster := session.cfg
72+
session.Close() // close the session to simulate a heartbeat failure
73+
74+
time.Sleep(10 * time.Second) // wait for 10 seconds to simulate downtime
75+
76+
// Reconnect
77+
log.Println("Reconnecting...")
78+
newSession, err := cluster.CreateSession()
79+
if err != nil {
80+
log.Fatalf("unable to create session: %v", err)
81+
}
82+
*session = *newSession
83+
log.Println("Reconnected")
84+
}

host_source.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,11 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
665665
return nil, errNoControl
666666
}
667667

668+
// Check if host lookup is disabled
669+
if r.session.cfg.DisableHostLookup {
670+
return []*HostInfo{}, nil
671+
}
672+
668673
var peers []*HostInfo
669674
iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
670675
return ch.conn.querySystemPeers(context.TODO(), localHost.version)

session.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (s *Session) init() error {
241241
return err
242242
}
243243

244-
if !s.cfg.DisableInitialHostLookup {
244+
if !s.cfg.DisableHostLookup {
245245
var partitioner string
246246
newHosts, partitioner, err := s.hostSource.GetHosts()
247247
if err != nil {
@@ -340,11 +340,11 @@ func (s *Session) init() error {
340340
go s.reconnectDownedHosts(s.cfg.ReconnectInterval)
341341
}
342342

343-
// If we disable the initial host lookup, we need to still check if the
343+
// If we disable the host lookup, we need to still check if the
344344
// cluster is using the newer system schema or not... however, if control
345345
// connection is disable, we really have no choice, so we just make our
346346
// best guess...
347-
if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup {
347+
if !s.cfg.disableControlConn && s.cfg.DisableHostLookup {
348348
newer, _ := checkSystemSchema(s.control)
349349
s.useSystemSchema = newer
350350
} else {

0 commit comments

Comments
 (0)