Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Driver closes connection when timeout occurs (CASSGO-87)
- Do not set beta protocol flag when using v5 (CASSGO-88)
- Driver is using system table ip addresses over the connection address (CASSGO-91)

#### 2.0.0-rc1

Expand Down
2 changes: 1 addition & 1 deletion control.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ type connHost struct {
func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error {
// we need up-to-date host info for the filterHost call below
iter := conn.querySystemLocal(context.TODO())
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port)
host, err := c.session.hostInfoFromIter(iter, conn.host.ConnectAddress(), conn.r.RemoteAddr().(*net.TCPAddr).Port)
if err != nil {
// just cleanup
iter.Close()
Expand Down
82 changes: 82 additions & 0 deletions docker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//go:build all || cassandra
// +build all cassandra

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
* Copyright (c) 2016, The Gocql authors,
* provided under the BSD-3-Clause License.
* See the NOTICE file distributed with this work for additional information.
*/

package gocql

import (
"fmt"
"os/exec"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// This test tests that gocql is able to connect to a C* node provisioned with Docker
// This is useful to make sure we don't break common testing configurations
func TestDocker(t *testing.T) {
version := "3.11.11"
randomUuid := MustRandomUUID().String()
err := exec.Command("docker", "run", "-d", "--name", randomUuid, "-p", "9080:9042", fmt.Sprintf("cassandra:%s", version)).Run()
defer exec.Command("docker", "rm", "-f", randomUuid).Run()

if err != nil {
t.Fatal(err)
}

cluster := NewCluster("localhost:9080")
cluster.Logger = NewLogger(LogLevelDebug)

timer := time.After(60 * time.Second)
var session *Session
done := false
for !done {
select {
case <-timer:
t.Fatalf("timed out, last err: %v", err)
default:
session, err = cluster.CreateSession()
if err == nil {
done = true
} else if strings.Contains(err.Error(), "unable to discover protocol version") {
time.Sleep(5 * time.Second)
} else {
t.Fatal(err)
}
}
}

defer session.Close()
var parsedVersion string
err = session.Query("SELECT release_version FROM system.local").Scan(&parsedVersion)
if err != nil {
t.Fatalf("failed to query: %s", err)
}

assert.Equal(t, version, parsedVersion)
}
26 changes: 14 additions & 12 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ func (h *HostInfo) ConnectAddress() net.IP {
return addr
}

// actualConnectAddress can be used to access the connectAddress field with the lock (mu).
func (h *HostInfo) actualConnectAddress() net.IP {
h.mu.RLock()
defer h.mu.RUnlock()
return h.connectAddress
}

func (h *HostInfo) BroadcastAddress() net.IP {
h.mu.RLock()
defer h.mu.RUnlock()
Expand Down Expand Up @@ -680,16 +687,10 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
}
}

// Determine the connect address from available addresses
if validIpAddr(host.rpcAddress) {
host.connectAddress = host.rpcAddress
} else if validIpAddr(host.preferredIP) {
host.connectAddress = host.preferredIP
} else if validIpAddr(host.broadcastAddress) {
host.connectAddress = host.broadcastAddress
} else if validIpAddr(host.peer) {
host.connectAddress = host.peer
}
// this ensures that connectAddress gets a valid IP starting with host.connectAddress and if it's not valid
// then falls back to an address read from the system table
// it is important that a system table address is not picked up UNLESS connectAddress is nil or not valid
host.connectAddress, _ = host.connectAddressLocked()

if s != nil && s.cfg.AddressTranslator != nil {
ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port, s.logger)
Expand Down Expand Up @@ -745,7 +746,8 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
return nil, errNoControl
}

host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
// keep connect address for local host, ignore address from system.local
host, err := r.session.hostInfoFromIter(iter, iter.host.actualConnectAddress(), r.session.cfg.Port)
if err != nil {
// just cleanup
iter.Close()
Expand Down Expand Up @@ -873,7 +875,7 @@ func refreshRing(r *ringDescriber) error {
if !ok {
return fmt.Errorf("get existing host=%s from prevHosts: %w", h, ErrCannotFindHost)
}
if h.connectAddress.Equal(existing.connectAddress) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
if h.actualConnectAddress().Equal(existing.actualConnectAddress()) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
// no host IP change
host.update(h)
} else {
Expand Down
7 changes: 6 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,12 @@ func (q *Query) iterInternal(c *Conn, ctx context.Context) *Iter {
internalQry := newInternalQuery(q, ctx)
internalQry.conn = c

return c.executeQuery(internalQry.Context(), internalQry)
iter := c.executeQuery(internalQry.Context(), internalQry)
if iter != nil {
// set iter.host so that the caller can retrieve the connect address which should be preferable (if valid) for the local host
iter.host = c.host
}
return iter
}

// MapScan executes the query, copies the columns of the first selected
Expand Down