From 30207cae0c2580df081a160feab7939122385ac0 Mon Sep 17 00:00:00 2001 From: Rostislav Porohnya Date: Thu, 25 Jul 2024 18:15:08 +0300 Subject: [PATCH 1/2] Modified DisableInitialHostLookup flag name and logic --- cluster.go | 4 +- conn.go | 25 +++++------ disable_host_lookup_test.go | 84 +++++++++++++++++++++++++++++++++++++ host_source.go | 5 +++ session.go | 6 +-- 5 files changed, 107 insertions(+), 17 deletions(-) create mode 100644 disable_host_lookup_test.go diff --git a/cluster.go b/cluster.go index 2f9619134..be60f9613 100644 --- a/cluster.go +++ b/cluster.go @@ -195,12 +195,12 @@ type ClusterConfig struct { // set to 10.0.0.1 which is what will be used to connect to. IgnorePeerAddr bool - // If DisableInitialHostLookup then the driver will not attempt to get host info + // If DisableHostLookup then the driver will not attempt to get host info // from the system.peers table, this will mean that the driver will connect to // hosts supplied and will not attempt to lookup the hosts information, this will // mean that data_center, rack and token information will not be available and as // such host filtering and token aware query routing will not be available. - DisableInitialHostLookup bool + DisableHostLookup bool // Configure events the driver will register for Events struct { diff --git a/conn.go b/conn.go index 114f159d8..af8fe6ca9 100644 --- a/conn.go +++ b/conn.go @@ -1898,20 +1898,20 @@ func (c *Conn) querySystemLocal(ctx context.Context) *Iter { func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" - var versions map[string]struct{} + versions := make(map[string]struct{}) var schemaVersion string endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement) for time.Now().Before(endDeadline) { - iter := c.querySystemPeers(ctx, c.host.version) + iter := &Iter{} + if !c.session.cfg.DisableHostLookup { + iter = c.querySystemPeers(ctx, c.host.version) - versions = make(map[string]struct{}) - - rows, err := iter.SliceMap() - if err != nil { - goto cont - } + rows, err := iter.SliceMap() + if err != nil { + goto cont + } for _, row := range rows { h, err := NewHostInfo(c.host.ConnectAddress(), c.session.cfg.Port) @@ -1927,11 +1927,12 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { continue } - versions[host.schemaVersion] = struct{}{} - } + versions[host.schemaVersion] = struct{}{} + } - if err = iter.Close(); err != nil { - goto cont + if err = iter.Close(); err != nil { + goto cont + } } iter = c.query(ctx, localSchemas) diff --git a/disable_host_lookup_test.go b/disable_host_lookup_test.go new file mode 100644 index 000000000..eaaa40031 --- /dev/null +++ b/disable_host_lookup_test.go @@ -0,0 +1,84 @@ +package gocql + +import ( + "fmt" + "log" + "testing" + "time" +) + +func TestDisableHostLookup(t *testing.T) { + cluster := createCluster() + cluster.DisableHostLookup = true + + cluster.NumConns = 1 + session := createSessionFromCluster(cluster, t) + defer session.Close() + + if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.test_table (id int primary key)"); err != nil { + t.Fatal(err) + } + if err := session.Query("insert into gocql_test.test_table (id) values (?)", 123).Exec(); err != nil { + t.Error(err) + return + } + + go simulateHeartbeatFailure(session) + + queryCycles := 20 + for ; queryCycles > 0; queryCycles-- { + time.Sleep(2 * time.Second) + queryTestKeyspace(session) + triggerSchemaChange(session) + } +} + +func queryTestKeyspace(session *Session) { + iter := session.Query("SELECT * FROM gocql_test.test_table").Iter() + + var id string + for iter.Scan(&id) { + fmt.Printf("id: %s\n", id) + } + if err := iter.Close(); err != nil { + log.Printf("error querying: %v", err) + } +} + +func triggerSchemaChange(session *Session) { + // Create a keyspace to trigger schema agreement + err := session.Query(`CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}`).Exec() + if err != nil { + log.Printf("unable to create keyspace: %v", err) + } + + // Alter the keyspace to trigger schema agreement + err = session.Query(`ALTER KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2}`).Exec() + if err != nil { + log.Printf("unable to alter keyspace: %v", err) + } + + // Drop the keyspace after schema agreement + err = session.Query(`DROP KEYSPACE IF EXISTS test_keyspace`).Exec() + if err != nil { + log.Printf("unable to drop keyspace: %v", err) + } +} + +func simulateHeartbeatFailure(session *Session) { + time.Sleep(20 * time.Second) // simulate 20 seconds of normal operation + log.Println("Simulating heartbeat failure...") + cluster := session.cfg + session.Close() // close the session to simulate a heartbeat failure + + time.Sleep(10 * time.Second) // wait for 10 seconds to simulate downtime + + // Reconnect + log.Println("Reconnecting...") + newSession, err := cluster.CreateSession() + if err != nil { + log.Fatalf("unable to create session: %v", err) + } + *session = *newSession + log.Println("Reconnected") +} diff --git a/host_source.go b/host_source.go index adcf1a729..1f3d7e3ac 100644 --- a/host_source.go +++ b/host_source.go @@ -665,6 +665,11 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er return nil, errNoControl } + // Check if host lookup is disabled + if r.session.cfg.DisableHostLookup { + return []*HostInfo{}, nil + } + var peers []*HostInfo iter := r.session.control.withConnHost(func(ch *connHost) *Iter { return ch.conn.querySystemPeers(context.TODO(), localHost.version) diff --git a/session.go b/session.go index ed1a078d3..c4201d437 100644 --- a/session.go +++ b/session.go @@ -241,7 +241,7 @@ func (s *Session) init() error { return err } - if !s.cfg.DisableInitialHostLookup { + if !s.cfg.DisableHostLookup { var partitioner string newHosts, partitioner, err := s.hostSource.GetHosts() if err != nil { @@ -340,11 +340,11 @@ func (s *Session) init() error { go s.reconnectDownedHosts(s.cfg.ReconnectInterval) } - // If we disable the initial host lookup, we need to still check if the + // If we disable the host lookup, we need to still check if the // cluster is using the newer system schema or not... however, if control // connection is disable, we really have no choice, so we just make our // best guess... - if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup { + if !s.cfg.disableControlConn && s.cfg.DisableHostLookup { newer, _ := checkSystemSchema(s.control) s.useSystemSchema = newer } else { From f5b6f8acfd8d3223207b011b39c0177c7847364e Mon Sep 17 00:00:00 2001 From: tengu-alt Date: Thu, 24 Apr 2025 13:59:34 +0300 Subject: [PATCH 2/2] Fix DisableInitialHostLookup flag ignored when querying system.peers(PR update) This commit provides update for the 'Modified DisableInitialHostLookup flag name and logic' refreshRing() does NOOP when 'DisableInitialHostLookup' is true. system.peers can be queried when flag is true for the schema version retrieving. patch by Oleksandr Luzhniy; for CASSGO-5 --- CHANGELOG.md | 2 + cassandra_test.go | 36 +++++++++++++++- cluster.go | 4 +- conn.go | 25 ++++++----- disable_host_lookup_test.go | 84 ------------------------------------- host_source.go | 8 ++-- session.go | 4 +- 7 files changed, 56 insertions(+), 107 deletions(-) delete mode 100644 disable_host_lookup_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 37ae55e3e..59a54d724 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Refactoring hostpool package test and Expose HostInfo creation (CASSGO-59) +- Fix DisableInitialHostLookup flag ignored when querying system.peers (CASSGO-5) + ### Fixed - Cassandra version unmarshal fix (CASSGO-49) diff --git a/cassandra_test.go b/cassandra_test.go index 54a54f426..2c9ad9583 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -44,7 +44,7 @@ import ( "time" "unicode" - inf "gopkg.in/inf.v0" + "gopkg.in/inf.v0" "github.com/stretchr/testify/require" ) @@ -108,6 +108,40 @@ func TestUseStatementError(t *testing.T) { } } +// TestDisableHostLookupRingRefresh checks that session.ring will not be updated if cluster.DisableInitialHostLookup == true +func TestDisableHostLookupRingRefresh(t *testing.T) { + cluster := createCluster() + cluster.DisableInitialHostLookup = true + + cluster.NumConns = 1 + session := createSessionFromCluster(cluster, t) + defer session.Close() + + oldHosts := make(map[string]*HostInfo) + + for key, host := range session.ring.hosts { + host.broadcastAddress = net.ParseIP("10.10.10.10") + oldHosts[key] = host + } + + // if DisableInitialHostLookup == true - host.broadcastAddress must not be updated. + err := session.refreshRing() + if err != nil { + t.Fatal(err) + } + + for key, host := range session.ring.hosts { + oldHost, ok := oldHosts[key] + if !ok { + t.Fatalf("old host not found for key: %s", key) + } + + if !oldHost.broadcastAddress.Equal(host.broadcastAddress) { + t.Fatalf("broadcast addresses do not match for key: %s", key) + } + } +} + // TestInvalidKeyspace checks that an invalid keyspace will return promptly and without a flood of connections func TestInvalidKeyspace(t *testing.T) { cluster := createCluster() diff --git a/cluster.go b/cluster.go index be60f9613..2f9619134 100644 --- a/cluster.go +++ b/cluster.go @@ -195,12 +195,12 @@ type ClusterConfig struct { // set to 10.0.0.1 which is what will be used to connect to. IgnorePeerAddr bool - // If DisableHostLookup then the driver will not attempt to get host info + // If DisableInitialHostLookup then the driver will not attempt to get host info // from the system.peers table, this will mean that the driver will connect to // hosts supplied and will not attempt to lookup the hosts information, this will // mean that data_center, rack and token information will not be available and as // such host filtering and token aware query routing will not be available. - DisableHostLookup bool + DisableInitialHostLookup bool // Configure events the driver will register for Events struct { diff --git a/conn.go b/conn.go index af8fe6ca9..114f159d8 100644 --- a/conn.go +++ b/conn.go @@ -1898,20 +1898,20 @@ func (c *Conn) querySystemLocal(ctx context.Context) *Iter { func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" - versions := make(map[string]struct{}) + var versions map[string]struct{} var schemaVersion string endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement) for time.Now().Before(endDeadline) { - iter := &Iter{} - if !c.session.cfg.DisableHostLookup { - iter = c.querySystemPeers(ctx, c.host.version) + iter := c.querySystemPeers(ctx, c.host.version) - rows, err := iter.SliceMap() - if err != nil { - goto cont - } + versions = make(map[string]struct{}) + + rows, err := iter.SliceMap() + if err != nil { + goto cont + } for _, row := range rows { h, err := NewHostInfo(c.host.ConnectAddress(), c.session.cfg.Port) @@ -1927,12 +1927,11 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { continue } - versions[host.schemaVersion] = struct{}{} - } + versions[host.schemaVersion] = struct{}{} + } - if err = iter.Close(); err != nil { - goto cont - } + if err = iter.Close(); err != nil { + goto cont } iter = c.query(ctx, localSchemas) diff --git a/disable_host_lookup_test.go b/disable_host_lookup_test.go deleted file mode 100644 index eaaa40031..000000000 --- a/disable_host_lookup_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package gocql - -import ( - "fmt" - "log" - "testing" - "time" -) - -func TestDisableHostLookup(t *testing.T) { - cluster := createCluster() - cluster.DisableHostLookup = true - - cluster.NumConns = 1 - session := createSessionFromCluster(cluster, t) - defer session.Close() - - if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.test_table (id int primary key)"); err != nil { - t.Fatal(err) - } - if err := session.Query("insert into gocql_test.test_table (id) values (?)", 123).Exec(); err != nil { - t.Error(err) - return - } - - go simulateHeartbeatFailure(session) - - queryCycles := 20 - for ; queryCycles > 0; queryCycles-- { - time.Sleep(2 * time.Second) - queryTestKeyspace(session) - triggerSchemaChange(session) - } -} - -func queryTestKeyspace(session *Session) { - iter := session.Query("SELECT * FROM gocql_test.test_table").Iter() - - var id string - for iter.Scan(&id) { - fmt.Printf("id: %s\n", id) - } - if err := iter.Close(); err != nil { - log.Printf("error querying: %v", err) - } -} - -func triggerSchemaChange(session *Session) { - // Create a keyspace to trigger schema agreement - err := session.Query(`CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}`).Exec() - if err != nil { - log.Printf("unable to create keyspace: %v", err) - } - - // Alter the keyspace to trigger schema agreement - err = session.Query(`ALTER KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2}`).Exec() - if err != nil { - log.Printf("unable to alter keyspace: %v", err) - } - - // Drop the keyspace after schema agreement - err = session.Query(`DROP KEYSPACE IF EXISTS test_keyspace`).Exec() - if err != nil { - log.Printf("unable to drop keyspace: %v", err) - } -} - -func simulateHeartbeatFailure(session *Session) { - time.Sleep(20 * time.Second) // simulate 20 seconds of normal operation - log.Println("Simulating heartbeat failure...") - cluster := session.cfg - session.Close() // close the session to simulate a heartbeat failure - - time.Sleep(10 * time.Second) // wait for 10 seconds to simulate downtime - - // Reconnect - log.Println("Reconnecting...") - newSession, err := cluster.CreateSession() - if err != nil { - log.Fatalf("unable to create session: %v", err) - } - *session = *newSession - log.Println("Reconnected") -} diff --git a/host_source.go b/host_source.go index 1f3d7e3ac..914d4ecb0 100644 --- a/host_source.go +++ b/host_source.go @@ -665,11 +665,6 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er return nil, errNoControl } - // Check if host lookup is disabled - if r.session.cfg.DisableHostLookup { - return []*HostInfo{}, nil - } - var peers []*HostInfo iter := r.session.control.withConnHost(func(ch *connHost) *Iter { return ch.conn.querySystemPeers(context.TODO(), localHost.version) @@ -752,6 +747,9 @@ func (s *Session) refreshRing() error { } func refreshRing(r *ringDescriber) error { + if r.session.cfg.DisableInitialHostLookup { + return nil + } hosts, partitioner, err := r.GetHosts() if err != nil { return err diff --git a/session.go b/session.go index c4201d437..1fb3be593 100644 --- a/session.go +++ b/session.go @@ -241,7 +241,7 @@ func (s *Session) init() error { return err } - if !s.cfg.DisableHostLookup { + if !s.cfg.DisableInitialHostLookup { var partitioner string newHosts, partitioner, err := s.hostSource.GetHosts() if err != nil { @@ -344,7 +344,7 @@ func (s *Session) init() error { // cluster is using the newer system schema or not... however, if control // connection is disable, we really have no choice, so we just make our // best guess... - if !s.cfg.disableControlConn && s.cfg.DisableHostLookup { + if !s.cfg.disableControlConn && s.cfg.DisableInitialHostLookup { newer, _ := checkSystemSchema(s.control) s.useSystemSchema = newer } else {