diff --git a/CHANGELOG.md b/CHANGELOG.md index d8a3da0a7..2fa50aa00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Driver closes connection when timeout occurs (CASSGO-87) - Do not set beta protocol flag when using v5 (CASSGO-88) +- Driver is using system table ip addresses over the connection address (CASSGO-91) #### 2.0.0-rc1 diff --git a/control.go b/control.go index 25957da10..de374c589 100644 --- a/control.go +++ b/control.go @@ -343,7 +343,7 @@ type connHost struct { func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error { // we need up-to-date host info for the filterHost call below iter := conn.querySystemLocal(context.TODO()) - host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port) + host, err := c.session.hostInfoFromIter(iter, conn.host.ConnectAddress(), conn.r.RemoteAddr().(*net.TCPAddr).Port) if err != nil { // just cleanup iter.Close() diff --git a/docker_test.go b/docker_test.go new file mode 100644 index 000000000..3d6947510 --- /dev/null +++ b/docker_test.go @@ -0,0 +1,82 @@ +//go:build all || cassandra +// +build all cassandra + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 + * Copyright (c) 2016, The Gocql authors, + * provided under the BSD-3-Clause License. + * See the NOTICE file distributed with this work for additional information. + */ + +package gocql + +import ( + "fmt" + "os/exec" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// This test tests that gocql is able to connect to a C* node provisioned with Docker +// This is useful to make sure we don't break common testing configurations +func TestDocker(t *testing.T) { + version := "3.11.11" + randomUuid := MustRandomUUID().String() + err := exec.Command("docker", "run", "-d", "--name", randomUuid, "-p", "9080:9042", fmt.Sprintf("cassandra:%s", version)).Run() + defer exec.Command("docker", "rm", "-f", randomUuid).Run() + + if err != nil { + t.Fatal(err) + } + + cluster := NewCluster("localhost:9080") + cluster.Logger = NewLogger(LogLevelDebug) + + timer := time.After(60 * time.Second) + var session *Session + done := false + for !done { + select { + case <-timer: + t.Fatalf("timed out, last err: %v", err) + default: + session, err = cluster.CreateSession() + if err == nil { + done = true + } else if strings.Contains(err.Error(), "unable to discover protocol version") { + time.Sleep(5 * time.Second) + } else { + t.Fatal(err) + } + } + } + + defer session.Close() + var parsedVersion string + err = session.Query("SELECT release_version FROM system.local").Scan(&parsedVersion) + if err != nil { + t.Fatalf("failed to query: %s", err) + } + + assert.Equal(t, version, parsedVersion) +} diff --git a/host_source.go b/host_source.go index 3ee3dd07d..ab6537409 100644 --- a/host_source.go +++ b/host_source.go @@ -267,6 +267,13 @@ func (h *HostInfo) ConnectAddress() net.IP { return addr } +// actualConnectAddress can be used to access the connectAddress field with the lock (mu). +func (h *HostInfo) actualConnectAddress() net.IP { + h.mu.RLock() + defer h.mu.RUnlock() + return h.connectAddress +} + func (h *HostInfo) BroadcastAddress() net.IP { h.mu.RLock() defer h.mu.RUnlock() @@ -680,16 +687,10 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map } } - // Determine the connect address from available addresses - if validIpAddr(host.rpcAddress) { - host.connectAddress = host.rpcAddress - } else if validIpAddr(host.preferredIP) { - host.connectAddress = host.preferredIP - } else if validIpAddr(host.broadcastAddress) { - host.connectAddress = host.broadcastAddress - } else if validIpAddr(host.peer) { - host.connectAddress = host.peer - } + // this ensures that connectAddress gets a valid IP starting with host.connectAddress and if it's not valid + // then falls back to an address read from the system table + // it is important that a system table address is not picked up UNLESS connectAddress is nil or not valid + host.connectAddress, _ = host.connectAddressLocked() if s != nil && s.cfg.AddressTranslator != nil { ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port, s.logger) @@ -745,7 +746,8 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) { return nil, errNoControl } - host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port) + // keep connect address for local host, ignore address from system.local + host, err := r.session.hostInfoFromIter(iter, iter.host.actualConnectAddress(), r.session.cfg.Port) if err != nil { // just cleanup iter.Close() @@ -873,7 +875,7 @@ func refreshRing(r *ringDescriber) error { if !ok { return fmt.Errorf("get existing host=%s from prevHosts: %w", h, ErrCannotFindHost) } - if h.connectAddress.Equal(existing.connectAddress) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) { + if h.actualConnectAddress().Equal(existing.actualConnectAddress()) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) { // no host IP change host.update(h) } else { diff --git a/session.go b/session.go index 722161f12..111c7d01c 100644 --- a/session.go +++ b/session.go @@ -1333,7 +1333,12 @@ func (q *Query) iterInternal(c *Conn, ctx context.Context) *Iter { internalQry := newInternalQuery(q, ctx) internalQry.conn = c - return c.executeQuery(internalQry.Context(), internalQry) + iter := c.executeQuery(internalQry.Context(), internalQry) + if iter != nil { + // set iter.host so that the caller can retrieve the connect address which should be preferable (if valid) for the local host + iter.host = c.host + } + return iter } // MapScan executes the query, copies the columns of the first selected