Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Support for Native Protocol 5. Following protocol changes exposed new API
Query.SetKeyspace(), Query.WithNowInSeconds(), Batch.SetKeyspace(), Batch.WithNowInSeconds() (CASSGO-1)
- Externally-defined type registration (CASSGO-43)
- Add Query and Batch to ObservedQuery and ObservedBatch (CASSGO-73)

### Changed

Expand All @@ -43,6 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- inet columns default to net.IP when using MapScan or SliceMap (CASSGO-43)
- NativeType removed (CASSGO-43)
- `New` and `NewWithError` removed and replaced with `Zero` (CASSGO-43)
- Changes to Query and Batch to make them safely reusable (CASSGO-22)

### Fixed

Expand Down
3 changes: 2 additions & 1 deletion batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
package gocql

import (
"github.com/stretchr/testify/require"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestBatch_Errors(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cass1batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestShouldPrepareFunction(t *testing.T) {
}

for _, test := range shouldPrepareTests {
q := &Query{stmt: test.Stmt, routingInfo: &queryRoutingInfo{}}
q := &Query{stmt: test.Stmt}
if got := q.shouldPrepare(); got != test.Result {
t.Fatalf("%q: got %v, expected %v\n", test.Stmt, got, test.Result)
}
Expand Down
103 changes: 62 additions & 41 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,8 +1302,8 @@ func Test_RetryPolicyIdempotence(t *testing.T) {
q.RetryPolicy(&MyRetryPolicy{})
q.Consistency(All)

_ = q.Exec()
require.Equal(t, tc.expectedNumberOfTries, q.Attempts())
it := q.Iter()
require.Equal(t, tc.expectedNumberOfTries, it.Attempts())
})
}
}
Expand Down Expand Up @@ -1673,15 +1673,15 @@ func TestPrepare_MissingSchemaPrepare(t *testing.T) {
defer s.Close()

insertQry := s.Query("INSERT INTO invalidschemaprep (val) VALUES (?)", 5)
if err := conn.executeQuery(ctx, insertQry).err; err == nil {
if err := conn.executeQuery(ctx, newInternalQuery(insertQry, nil)).err; err == nil {
t.Fatal("expected error, but got nil.")
}

if err := createTable(s, "CREATE TABLE gocql_test.invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
t.Fatal("create table:", err)
}

if err := conn.executeQuery(ctx, insertQry).err; err != nil {
if err := conn.executeQuery(ctx, newInternalQuery(insertQry, nil)).err; err != nil {
t.Fatal(err) // unconfigured columnfamily
}
}
Expand All @@ -1695,7 +1695,7 @@ func TestPrepare_ReprepareStatement(t *testing.T) {

stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement")
query := session.Query(stmt, "bar")
if err := conn.executeQuery(ctx, query).Close(); err != nil {
if err := conn.executeQuery(ctx, newInternalQuery(query, nil)).Close(); err != nil {
t.Fatalf("Failed to execute query for reprepare statement: %v", err)
}
}
Expand All @@ -1714,7 +1714,7 @@ func TestPrepare_ReprepareBatch(t *testing.T) {
stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement_batch")
batch := session.Batch(UnloggedBatch)
batch.Query(stmt, "bar")
if err := conn.executeBatch(ctx, batch).Close(); err != nil {
if err := conn.executeBatch(ctx, newInternalBatch(batch, nil)).Close(); err != nil {
t.Fatalf("Failed to execute query for reprepare statement: %v", err)
}
}
Expand Down Expand Up @@ -2059,14 +2059,16 @@ func TestQueryStats(t *testing.T) {
session := createSession(t)
defer session.Close()
qry := session.Query("SELECT * FROM system.peers")
if err := qry.Exec(); err != nil {
iter := qry.Iter()
err := iter.Close()
if err != nil {
t.Fatalf("query failed. %v", err)
} else {
if qry.Attempts() < 1 {
if iter.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if qry.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
}
}
}
Expand Down Expand Up @@ -2099,15 +2101,16 @@ func TestBatchStats(t *testing.T) {
b := session.Batch(LoggedBatch)
b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)

if err := b.Exec(); err != nil {
iter := b.Iter()
err := iter.Close()
if err != nil {
t.Fatalf("query failed. %v", err)
} else {
if b.Attempts() < 1 {
if iter.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if b.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
}
}
}
Expand Down Expand Up @@ -2850,7 +2853,7 @@ func TestRoutingKey(t *testing.T) {
t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
}

query := session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2)
query := newInternalQuery(session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
routingKey, err := query.GetRoutingKey()
if err != nil {
t.Fatalf("Failed to get routing key due to error: %v", err)
Expand Down Expand Up @@ -2892,7 +2895,7 @@ func TestRoutingKey(t *testing.T) {
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[1].Type())
}

query = session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2)
query = newInternalQuery(session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
routingKey, err = query.GetRoutingKey()
if err != nil {
t.Fatalf("Failed to get routing key due to error: %v", err)
Expand Down Expand Up @@ -3441,15 +3444,16 @@ func TestUnsetColBatch(t *testing.T) {
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, 1, UnsetValue)
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, UnsetValue, "")
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 2, 2, UnsetValue)

if err := b.Exec(); err != nil {
iter := b.Iter()
err := iter.Close()
if err != nil {
t.Fatalf("query failed. %v", err)
} else {
if b.Attempts() < 1 {
if iter.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if b.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
}
}
var id, mInt, count int
Expand Down Expand Up @@ -3702,7 +3706,9 @@ func TestQueryCompressionNotWorthIt(t *testing.T) {
// The driver should handle this by updating its prepared statement inside the cache
// when it receives RESULT/ROWS with Metadata_changed flag
func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
session := createSession(t)
session := createSession(t, func(config *ClusterConfig) {
config.NumConns = 1
})
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
Expand All @@ -3726,13 +3732,17 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
t.Fatal(err)
}

// We have to specify conn for all queries to ensure that
// We have to specify host for all queries to ensure that
// all queries are running on the same node
conn := session.getConn()
hosts := session.GetHosts()
if len(hosts) == 0 {
t.Fatal("no hosts found")
}
hostid := hosts[0].HostID()

const selectStmt = "SELECT * FROM gocql_test.metadata_changed"
queryBeforeTableAltering := session.Query(selectStmt)
queryBeforeTableAltering.conn = conn
queryBeforeTableAltering.SetHostID(hostid)
row := make(map[string]interface{})
err = queryBeforeTableAltering.MapScan(row)
if err != nil {
Expand All @@ -3742,13 +3752,16 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
require.Len(t, row, 1, "Expected to retrieve a single column")
require.Equal(t, 1, row["id"])

stmtCacheKey := session.stmtsLRU.keyFor(conn.host.HostID(), conn.currentKeyspace, queryBeforeTableAltering.stmt)
inflight, _ := session.stmtsLRU.get(stmtCacheKey)
stmtCacheKey := session.stmtsLRU.keyFor(hostid, "gocql_test", queryBeforeTableAltering.stmt)
inflight, ok := session.stmtsLRU.get(stmtCacheKey)
if !ok {
t.Fatalf("failed to find inflight entry for key %v", stmtCacheKey)
}
preparedStatementBeforeTableAltering := inflight.preparedStatment

// Changing table schema in order to cause C* to return RESULT/ROWS Metadata_changed
alteringTableQuery := session.Query("ALTER TABLE gocql_test.metadata_changed ADD new_col int")
alteringTableQuery.conn = conn
alteringTableQuery.SetHostID(hostid)
err = alteringTableQuery.Exec()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3800,7 +3813,7 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
// Expecting C* will return RESULT/ROWS Metadata_changed
// and it will be properly handled
queryAfterTableAltering := session.Query(selectStmt)
queryAfterTableAltering.conn = conn
queryAfterTableAltering.SetHostID(hostid)
iter := queryAfterTableAltering.Iter()
handleRows(iter)

Expand All @@ -3825,7 +3838,7 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
defer cancel()

queryAfterTableAltering2 := session.Query(selectStmt).WithContext(ctx)
queryAfterTableAltering2.conn = conn
queryAfterTableAltering2.SetHostID(hostid)
iter = queryAfterTableAltering2.Iter()
handleRows(iter)
err = iter.Close()
Expand All @@ -3842,7 +3855,7 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
// Executing prepared stmt and expecting that C* won't return
// Metadata_changed because the table is not being changed.
queryAfterTableAltering3 := session.Query(selectStmt).WithContext(ctx)
queryAfterTableAltering3.conn = conn
queryAfterTableAltering3.SetHostID(hostid)
iter = queryAfterTableAltering2.Iter()
handleRows(iter)

Expand Down Expand Up @@ -3946,7 +3959,10 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
getRoutingKeyInfo := func(key string) *routingKeyInfo {
t.Helper()
session.routingKeyInfoCache.mu.Lock()
value, _ := session.routingKeyInfoCache.lru.Get(key)
value, ok := session.routingKeyInfoCache.lru.Get(key)
if !ok {
t.Fatalf("routing key not found in cache for key %v", key)
}
session.routingKeyInfoCache.mu.Unlock()

inflight := value.(*inflightCachedEntry)
Expand All @@ -3956,20 +3972,22 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
const insertQuery = "INSERT INTO routing_key_cache_uses_overridden_ks (id) VALUES (?)"

// Running batch in default ks gocql_test
b1 := session.NewBatch(LoggedBatch)
b1 := session.Batch(LoggedBatch)
b1.Query(insertQuery, 1)
_, err = b1.GetRoutingKey()
internalB := newInternalBatch(b1, nil)
_, err = internalB.GetRoutingKey()
require.NoError(t, err)

// Ensuring that the cache contains the query with default ks
routingKeyInfo1 := getRoutingKeyInfo("gocql_test" + b1.Entries[0].Stmt)
require.Equal(t, "gocql_test", routingKeyInfo1.keyspace)

// Running batch in gocql_test_routing_key_cache ks
b2 := session.NewBatch(LoggedBatch)
b2 := session.Batch(LoggedBatch)
b2.SetKeyspace("gocql_test_routing_key_cache")
b2.Query(insertQuery, 2)
_, err = b2.GetRoutingKey()
internalB2 := newInternalBatch(b2, nil)
_, err = internalB2.GetRoutingKey()
require.NoError(t, err)

// Ensuring that the cache contains the query with gocql_test_routing_key_cache ks
Expand All @@ -3980,15 +3998,18 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {

// Running query in default ks gocql_test
q1 := session.Query(selectStmt, 1)
_, err = q1.GetRoutingKey()
iter := q1.Iter()
err = iter.Close()
require.NoError(t, err)
require.Equal(t, "gocql_test", q1.routingInfo.keyspace)
require.Equal(t, "gocql_test", iter.Keyspace())

// Running query in gocql_test_routing_key_cache ks
q2 := session.Query(selectStmt, 1)
_, err = q2.SetKeyspace("gocql_test_routing_key_cache").GetRoutingKey()
q2.SetKeyspace("gocql_test_routing_key_cache")
iter = q2.Iter()
err = iter.Close()
require.NoError(t, err)
require.Equal(t, "gocql_test_routing_key_cache", q2.routingInfo.keyspace)
require.Equal(t, "gocql_test_routing_key_cache", iter.Keyspace())

session.Query("DROP KEYSPACE IF EXISTS gocql_test_routing_key_cache").Exec()
}
Loading