Skip to content

Commit cf12fa1

Browse files
committed
Prefer connection address over system table addresses
There is an unintended change in 2.0-rc1 causing the driver to prefer system table addresses over the connection address which breaks deployments that rely on this behavior from 1.x. This patch fixes this and keeps the behavior the same as it was in 1.x. It also fixes an issue where connection address was not being used when a full ring refresh was triggered. Patch by João Reis; reviewed by James Hartig for CASSGO-91
1 parent 5c60751 commit cf12fa1

File tree

5 files changed

+98
-14
lines changed

5 files changed

+98
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7676

7777
- Driver closes connection when timeout occurs (CASSGO-87)
7878
- Do not set beta protocol flag when using v5 (CASSGO-88)
79+
- Driver is using system table ip addresses over the connection address (CASSGO-91)
7980

8081
#### 2.0.0-rc1
8182

control.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ type connHost struct {
343343
func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error {
344344
// we need up-to-date host info for the filterHost call below
345345
iter := conn.querySystemLocal(context.TODO())
346-
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port)
346+
host, err := c.session.hostInfoFromIter(iter, conn.host.ConnectAddress(), conn.r.RemoteAddr().(*net.TCPAddr).Port)
347347
if err != nil {
348348
// just cleanup
349349
iter.Close()

docker_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
//go:build all || cassandra
2+
// +build all cassandra
3+
4+
/*
5+
* Licensed to the Apache Software Foundation (ASF) under one
6+
* or more contributor license agreements. See the NOTICE file
7+
* distributed with this work for additional information
8+
* regarding copyright ownership. The ASF licenses this file
9+
* to you under the Apache License, Version 2.0 (the
10+
* "License"); you may not use this file except in compliance
11+
* with the License. You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
*/
21+
/*
22+
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
23+
* Copyright (c) 2016, The Gocql authors,
24+
* provided under the BSD-3-Clause License.
25+
* See the NOTICE file distributed with this work for additional information.
26+
*/
27+
28+
package gocql
29+
30+
import (
31+
"fmt"
32+
"os/exec"
33+
"strings"
34+
"testing"
35+
"time"
36+
37+
"github.com/stretchr/testify/assert"
38+
)
39+
40+
// This test tests that gocql is able to connect to a C* node provisioned with Docker
41+
// This is useful to make sure we don't break common testing configurations
42+
func TestDocker(t *testing.T) {
43+
version := "3.11.11"
44+
randomUuid := MustRandomUUID().String()
45+
err := exec.Command("docker", "run", "-d", "--name", randomUuid, "-p", "9080:9042", fmt.Sprintf("cassandra:%s", version)).Run()
46+
defer exec.Command("docker", "rm", "-f", randomUuid).Run()
47+
48+
if err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
cluster := NewCluster("localhost:9080")
53+
cluster.Logger = NewLogger(LogLevelDebug)
54+
55+
timer := time.After(60 * time.Second)
56+
var session *Session
57+
done := false
58+
for !done {
59+
select {
60+
case <-timer:
61+
t.Fatalf("timed out, last err: %v", err)
62+
default:
63+
session, err = cluster.CreateSession()
64+
if err == nil {
65+
done = true
66+
} else if strings.Contains(err.Error(), "unable to discover protocol version") {
67+
time.Sleep(5 * time.Second)
68+
} else {
69+
t.Fatal(err)
70+
}
71+
}
72+
}
73+
74+
defer session.Close()
75+
var parsedVersion string
76+
err = session.Query("SELECT release_version FROM system.local").Scan(&parsedVersion)
77+
if err != nil {
78+
t.Fatalf("failed to query: %s", err)
79+
}
80+
81+
assert.Equal(t, version, parsedVersion)
82+
}

host_source.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,12 @@ func (h *HostInfo) ConnectAddress() net.IP {
267267
return addr
268268
}
269269

270+
func (h *HostInfo) actualConnectAddress() net.IP {
271+
h.mu.RLock()
272+
defer h.mu.RUnlock()
273+
return h.connectAddress
274+
}
275+
270276
func (h *HostInfo) BroadcastAddress() net.IP {
271277
h.mu.RLock()
272278
defer h.mu.RUnlock()
@@ -680,16 +686,7 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
680686
}
681687
}
682688

683-
// Determine the connect address from available addresses
684-
if validIpAddr(host.rpcAddress) {
685-
host.connectAddress = host.rpcAddress
686-
} else if validIpAddr(host.preferredIP) {
687-
host.connectAddress = host.preferredIP
688-
} else if validIpAddr(host.broadcastAddress) {
689-
host.connectAddress = host.broadcastAddress
690-
} else if validIpAddr(host.peer) {
691-
host.connectAddress = host.peer
692-
}
689+
host.connectAddress, _ = host.connectAddressLocked()
693690

694691
if s != nil && s.cfg.AddressTranslator != nil {
695692
ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port, s.logger)
@@ -745,7 +742,7 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
745742
return nil, errNoControl
746743
}
747744

748-
host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
745+
host, err := r.session.hostInfoFromIter(iter, iter.host.actualConnectAddress(), r.session.cfg.Port)
749746
if err != nil {
750747
// just cleanup
751748
iter.Close()
@@ -873,7 +870,7 @@ func refreshRing(r *ringDescriber) error {
873870
if !ok {
874871
return fmt.Errorf("get existing host=%s from prevHosts: %w", h, ErrCannotFindHost)
875872
}
876-
if h.connectAddress.Equal(existing.connectAddress) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
873+
if h.actualConnectAddress().Equal(existing.actualConnectAddress()) && h.nodeToNodeAddress().Equal(existing.nodeToNodeAddress()) {
877874
// no host IP change
878875
host.update(h)
879876
} else {

session.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,11 @@ func (q *Query) iterInternal(c *Conn, ctx context.Context) *Iter {
13331333
internalQry := newInternalQuery(q, ctx)
13341334
internalQry.conn = c
13351335

1336-
return c.executeQuery(internalQry.Context(), internalQry)
1336+
iter := c.executeQuery(internalQry.Context(), internalQry)
1337+
if iter != nil {
1338+
iter.host = c.host
1339+
}
1340+
return iter
13371341
}
13381342

13391343
// MapScan executes the query, copies the columns of the first selected

0 commit comments

Comments
 (0)