Skip to content

Commit 7e4069e

Browse files
committed
Prefer connection address over system table addresses
There is an unintended change in 2.0-rc1 causing the driver to prefer system table addresses over the connection address which breaks deployments that rely on this behavior from 1.x. This patch fixes this and keeps the behavior the same as it was in 1.x. It also fixes an issue where connection address was not being used when a full ring refresh was triggered. Patch by João Reis; reviewed by James Hartig for CASSGO-91
1 parent 5c60751 commit 7e4069e

File tree

5 files changed

+104
-14
lines changed

5 files changed

+104
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7676

7777
- Driver closes connection when timeout occurs (CASSGO-87)
7878
- Do not set beta protocol flag when using v5 (CASSGO-88)
79+
- Driver is using system table ip addresses over the connection address (CASSGO-91)
7980

8081
#### 2.0.0-rc1
8182

control.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ type connHost struct {
343343
func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error {
344344
// we need up-to-date host info for the filterHost call below
345345
iter := conn.querySystemLocal(context.TODO())
346-
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port)
346+
host, err := c.session.hostInfoFromIter(iter, conn.host.ConnectAddress(), conn.r.RemoteAddr().(*net.TCPAddr).Port)
347347
if err != nil {
348348
// just cleanup
349349
iter.Close()

docker_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
//go:build all || cassandra
2+
// +build all cassandra
3+
4+
/*
5+
* Licensed to the Apache Software Foundation (ASF) under one
6+
* or more contributor license agreements. See the NOTICE file
7+
* distributed with this work for additional information
8+
* regarding copyright ownership. The ASF licenses this file
9+
* to you under the Apache License, Version 2.0 (the
10+
* "License"); you may not use this file except in compliance
11+
* with the License. You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
*/
21+
/*
22+
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
23+
* Copyright (c) 2016, The Gocql authors,
24+
* provided under the BSD-3-Clause License.
25+
* See the NOTICE file distributed with this work for additional information.
26+
*/
27+
28+
package gocql
29+
30+
import (
31+
"fmt"
32+
"os/exec"
33+
"strings"
34+
"testing"
35+
"time"
36+
37+
"github.com/stretchr/testify/assert"
38+
)
39+
40+
// This test tests that gocql is able to connect to a C* node provisioned with Docker
41+
// This is useful to make sure we don't break common testing configurations
42+
func TestDocker(t *testing.T) {
43+
version := "3.11.11"
44+
randomUuid := MustRandomUUID().String()
45+
err := exec.Command("docker", "run", "-d", "--name", randomUuid, "-p", "9080:9042", fmt.Sprintf("cassandra:%s", version)).Run()
46+
defer exec.Command("docker", "rm", "-f", randomUuid).Run()
47+
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
cluster := NewCluster("localhost:9080")
53+
cluster.Logger = NewLogger(LogLevelDebug)
54+
55+
timer := time.After(60 * time.Second)
56+
var session *Session
57+
done := false
58+
for !done {
59+
select {
60+
case <-timer:
61+
t.Fatalf("timed out, last err: %v", err)
62+
default:
63+
session, err = cluster.CreateSession()
64+
if err == nil {
65+
done = true
66+
} else if strings.Contains(err.Error(), "unable to discover protocol version") {
67+
time.Sleep(5 * time.Second)
68+
} else {
69+
t.Fatal(err)
70+
}
71+
}
72+
}
73+
74+
defer session.Close()
75+
var parsedVersion string
76+
err = session.Query("SELECT release_version FROM system.local").Scan(&parsedVersion)
77+
if err != nil {
78+
t.Fatalf("failed to query: %s", err)
79+
}
80+
81+
assert.Equal(t, version, parsedVersion)
82+
}

host_source.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,13 @@ func (h *HostInfo) ConnectAddress() net.IP {
267267
return addr
268268
}
269269

270+
// actualConnectAddress can be used to access the connectAddress field with the lock (mu).
271+
func (h *HostInfo) actualConnectAddress() net.IP {
272+
h.mu.RLock()
273+
defer h.mu.RUnlock()
274+
return h.connectAddress
275+
}
276+
270277
func (h *HostInfo) BroadcastAddress() net.IP {
271278
h.mu.RLock()
272279
defer h.mu.RUnlock()
@@ -680,16 +687,10 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
680687
}
681688
}
682689

683-
// Determine the connect address from available addresses
684-
if validIpAddr(host.rpcAddress) {
685-
host.connectAddress = host.rpcAddress
686-
} else if validIpAddr(host.preferredIP) {
687-
host.connectAddress = host.preferredIP
688-
} else if validIpAddr(host.broadcastAddress) {
689-
host.connectAddress = host.broadcastAddress
690-
} else if validIpAddr(host.peer) {
691-
host.connectAddress = host.peer
692-
}
690+
// this ensures that connectAddress gets a valid IP starting with host.connectAddress and if it's not valid
691+
// then falls back to an address read from the system table
692+
// it is important that a system table address is not picked up UNLESS connectAddress is nil or not valid
693+
host.connectAddress, _ = host.connectAddressLocked()
693694

694695
if s != nil && s.cfg.AddressTranslator != nil {
695696
ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port, s.logger)
@@ -745,7 +746,8 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
745746
return nil, errNoControl
746747
}
747748

748-
host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
749+
// keep connect address for local host, ignore address from system.local
750+
host, err := r.session.hostInfoFromIter(iter, iter.host.actualConnectAddress(), r.session.cfg.Port)
749751
if err != nil {
750752
// just cleanup
751753
iter.Close()
@@ -873,7 +875,7 @@ func refreshRing(r *ringDescriber) error {
873875
if !ok {
874876
return fmt.Errorf("get existing host=%s from prevHosts: %w", h, ErrCannotFindHost)
875877
}
876-
if h.connectAddress.Equal(existing.connectAddress) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
878+
if h.actualConnectAddress().Equal(existing.actualConnectAddress()) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
877879
// no host IP change
878880
host.update(h)
879881
} else {

session.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,12 @@ func (q *Query) iterInternal(c *Conn, ctx context.Context) *Iter {
13331333
internalQry := newInternalQuery(q, ctx)
13341334
internalQry.conn = c
13351335

1336-
return c.executeQuery(internalQry.Context(), internalQry)
1336+
iter := c.executeQuery(internalQry.Context(), internalQry)
1337+
if iter != nil {
1338+
// set iter.host so that the caller can retrieve the connect address which should be preferable (if valid) for the local host
1339+
iter.host = c.host
1340+
}
1341+
return iter
13371342
}
13381343

13391344
// MapScan executes the query, copies the columns of the first selected

0 commit comments

Comments
 (0)