Skip to content

Commit f9a0f63

Browse files
committed
Changes to Query and Batch API
Before this change queries were mutated while being executed (the query metrics and the consistency for example). Instead copy query properties to an internal query object and move query metrics to Iter. This allows users to reuse Query and Batch objects. Query object pooling was also removed. Some query and batch properties were not accessible via ObservedBatch and ObservedQuery. Added the original Batch and Query objects to ObservedBatch and ObservedQuery to fix this. Patch by João Reis; reviewed by James Hartig and Stanislav Bychkov for CASSGO-22 and CASSGO-73
1 parent 1fd2cba commit f9a0f63

16 files changed

+942
-563
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919
- Support for Native Protocol 5. Following protocol changes exposed new API
2020
Query.SetKeyspace(), Query.WithNowInSeconds(), Batch.SetKeyspace(), Batch.WithNowInSeconds() (CASSGO-1)
2121
- Externally-defined type registration (CASSGO-43)
22+
- Add Query and Batch to ObservedQuery and ObservedBatch (CASSGO-73)
2223

2324
### Changed
2425

@@ -43,6 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4344
- inet columns default to net.IP when using MapScan or SliceMap (CASSGO-43)
4445
- NativeType removed (CASSGO-43)
4546
- `New` and `NewWithError` removed and replaced with `Zero` (CASSGO-43)
47+
- Changes to Query and Batch to make them safely reusable (CASSGO-22)
4648

4749
### Fixed
4850

batch_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@
2828
package gocql
2929

3030
import (
31-
"github.com/stretchr/testify/require"
3231
"testing"
3332
"time"
33+
34+
"github.com/stretchr/testify/require"
3435
)
3536

3637
func TestBatch_Errors(t *testing.T) {

cass1batch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestShouldPrepareFunction(t *testing.T) {
7777
}
7878

7979
for _, test := range shouldPrepareTests {
80-
q := &Query{stmt: test.Stmt, routingInfo: &queryRoutingInfo{}}
80+
q := &Query{stmt: test.Stmt}
8181
if got := q.shouldPrepare(); got != test.Result {
8282
t.Fatalf("%q: got %v, expected %v\n", test.Stmt, got, test.Result)
8383
}

cassandra_test.go

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,8 +1302,8 @@ func Test_RetryPolicyIdempotence(t *testing.T) {
13021302
q.RetryPolicy(&MyRetryPolicy{})
13031303
q.Consistency(All)
13041304

1305-
_ = q.Exec()
1306-
require.Equal(t, tc.expectedNumberOfTries, q.Attempts())
1305+
it := q.Iter()
1306+
require.Equal(t, tc.expectedNumberOfTries, it.Attempts())
13071307
})
13081308
}
13091309
}
@@ -1673,15 +1673,15 @@ func TestPrepare_MissingSchemaPrepare(t *testing.T) {
16731673
defer s.Close()
16741674

16751675
insertQry := s.Query("INSERT INTO invalidschemaprep (val) VALUES (?)", 5)
1676-
if err := conn.executeQuery(ctx, insertQry).err; err == nil {
1676+
if err := conn.executeQuery(ctx, newInternalQuery(insertQry, nil)).err; err == nil {
16771677
t.Fatal("expected error, but got nil.")
16781678
}
16791679

16801680
if err := createTable(s, "CREATE TABLE gocql_test.invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
16811681
t.Fatal("create table:", err)
16821682
}
16831683

1684-
if err := conn.executeQuery(ctx, insertQry).err; err != nil {
1684+
if err := conn.executeQuery(ctx, newInternalQuery(insertQry, nil)).err; err != nil {
16851685
t.Fatal(err) // unconfigured columnfamily
16861686
}
16871687
}
@@ -1695,7 +1695,7 @@ func TestPrepare_ReprepareStatement(t *testing.T) {
16951695

16961696
stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement")
16971697
query := session.Query(stmt, "bar")
1698-
if err := conn.executeQuery(ctx, query).Close(); err != nil {
1698+
if err := conn.executeQuery(ctx, newInternalQuery(query, nil)).Close(); err != nil {
16991699
t.Fatalf("Failed to execute query for reprepare statement: %v", err)
17001700
}
17011701
}
@@ -1714,7 +1714,7 @@ func TestPrepare_ReprepareBatch(t *testing.T) {
17141714
stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement_batch")
17151715
batch := session.Batch(UnloggedBatch)
17161716
batch.Query(stmt, "bar")
1717-
if err := conn.executeBatch(ctx, batch).Close(); err != nil {
1717+
if err := conn.executeBatch(ctx, newInternalBatch(batch, nil)).Close(); err != nil {
17181718
t.Fatalf("Failed to execute query for reprepare statement: %v", err)
17191719
}
17201720
}
@@ -2059,14 +2059,16 @@ func TestQueryStats(t *testing.T) {
20592059
session := createSession(t)
20602060
defer session.Close()
20612061
qry := session.Query("SELECT * FROM system.peers")
2062-
if err := qry.Exec(); err != nil {
2062+
iter := qry.Iter()
2063+
err := iter.Close()
2064+
if err != nil {
20632065
t.Fatalf("query failed. %v", err)
20642066
} else {
2065-
if qry.Attempts() < 1 {
2067+
if iter.Attempts() < 1 {
20662068
t.Fatal("expected at least 1 attempt, but got 0")
20672069
}
2068-
if qry.Latency() <= 0 {
2069-
t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
2070+
if iter.Latency() <= 0 {
2071+
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
20702072
}
20712073
}
20722074
}
@@ -2099,15 +2101,16 @@ func TestBatchStats(t *testing.T) {
20992101
b := session.Batch(LoggedBatch)
21002102
b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
21012103
b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)
2102-
2103-
if err := b.Exec(); err != nil {
2104+
iter := b.Iter()
2105+
err := iter.Close()
2106+
if err != nil {
21042107
t.Fatalf("query failed. %v", err)
21052108
} else {
2106-
if b.Attempts() < 1 {
2109+
if iter.Attempts() < 1 {
21072110
t.Fatal("expected at least 1 attempt, but got 0")
21082111
}
2109-
if b.Latency() <= 0 {
2110-
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
2112+
if iter.Latency() <= 0 {
2113+
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
21112114
}
21122115
}
21132116
}
@@ -2850,7 +2853,7 @@ func TestRoutingKey(t *testing.T) {
28502853
t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
28512854
}
28522855

2853-
query := session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2)
2856+
query := newInternalQuery(session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
28542857
routingKey, err := query.GetRoutingKey()
28552858
if err != nil {
28562859
t.Fatalf("Failed to get routing key due to error: %v", err)
@@ -2892,7 +2895,7 @@ func TestRoutingKey(t *testing.T) {
28922895
t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[1].Type())
28932896
}
28942897

2895-
query = session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2)
2898+
query = newInternalQuery(session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2), nil)
28962899
routingKey, err = query.GetRoutingKey()
28972900
if err != nil {
28982901
t.Fatalf("Failed to get routing key due to error: %v", err)
@@ -3441,15 +3444,16 @@ func TestUnsetColBatch(t *testing.T) {
34413444
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, 1, UnsetValue)
34423445
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, UnsetValue, "")
34433446
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 2, 2, UnsetValue)
3444-
3445-
if err := b.Exec(); err != nil {
3447+
iter := b.Iter()
3448+
err := iter.Close()
3449+
if err != nil {
34463450
t.Fatalf("query failed. %v", err)
34473451
} else {
3448-
if b.Attempts() < 1 {
3452+
if iter.Attempts() < 1 {
34493453
t.Fatal("expected at least 1 attempt, but got 0")
34503454
}
3451-
if b.Latency() <= 0 {
3452-
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
3455+
if iter.Latency() <= 0 {
3456+
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
34533457
}
34543458
}
34553459
var id, mInt, count int
@@ -3702,7 +3706,9 @@ func TestQueryCompressionNotWorthIt(t *testing.T) {
37023706
// The driver should handle this by updating its prepared statement inside the cache
37033707
// when it receives RESULT/ROWS with Metadata_changed flag
37043708
func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
3705-
session := createSession(t)
3709+
session := createSession(t, func(config *ClusterConfig) {
3710+
config.NumConns = 1
3711+
})
37063712
defer session.Close()
37073713

37083714
if session.cfg.ProtoVersion < protoVersion5 {
@@ -3726,13 +3732,17 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
37263732
t.Fatal(err)
37273733
}
37283734

3729-
// We have to specify conn for all queries to ensure that
3735+
// We have to specify host for all queries to ensure that
37303736
// all queries are running on the same node
3731-
conn := session.getConn()
3737+
hosts := session.GetHosts()
3738+
if len(hosts) == 0 {
3739+
t.Fatal("no hosts found")
3740+
}
3741+
hostid := hosts[0].HostID()
37323742

37333743
const selectStmt = "SELECT * FROM gocql_test.metadata_changed"
37343744
queryBeforeTableAltering := session.Query(selectStmt)
3735-
queryBeforeTableAltering.conn = conn
3745+
queryBeforeTableAltering.SetHostID(hostid)
37363746
row := make(map[string]interface{})
37373747
err = queryBeforeTableAltering.MapScan(row)
37383748
if err != nil {
@@ -3742,13 +3752,16 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
37423752
require.Len(t, row, 1, "Expected to retrieve a single column")
37433753
require.Equal(t, 1, row["id"])
37443754

3745-
stmtCacheKey := session.stmtsLRU.keyFor(conn.host.HostID(), conn.currentKeyspace, queryBeforeTableAltering.stmt)
3746-
inflight, _ := session.stmtsLRU.get(stmtCacheKey)
3755+
stmtCacheKey := session.stmtsLRU.keyFor(hostid, "gocql_test", queryBeforeTableAltering.stmt)
3756+
inflight, ok := session.stmtsLRU.get(stmtCacheKey)
3757+
if !ok {
3758+
t.Fatalf("failed to find inflight entry for key %v", stmtCacheKey)
3759+
}
37473760
preparedStatementBeforeTableAltering := inflight.preparedStatment
37483761

37493762
// Changing table schema in order to cause C* to return RESULT/ROWS Metadata_changed
37503763
alteringTableQuery := session.Query("ALTER TABLE gocql_test.metadata_changed ADD new_col int")
3751-
alteringTableQuery.conn = conn
3764+
alteringTableQuery.SetHostID(hostid)
37523765
err = alteringTableQuery.Exec()
37533766
if err != nil {
37543767
t.Fatal(err)
@@ -3800,7 +3813,7 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
38003813
// Expecting C* will return RESULT/ROWS Metadata_changed
38013814
// and it will be properly handled
38023815
queryAfterTableAltering := session.Query(selectStmt)
3803-
queryAfterTableAltering.conn = conn
3816+
queryAfterTableAltering.SetHostID(hostid)
38043817
iter := queryAfterTableAltering.Iter()
38053818
handleRows(iter)
38063819

@@ -3825,7 +3838,7 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
38253838
defer cancel()
38263839

38273840
queryAfterTableAltering2 := session.Query(selectStmt).WithContext(ctx)
3828-
queryAfterTableAltering2.conn = conn
3841+
queryAfterTableAltering2.SetHostID(hostid)
38293842
iter = queryAfterTableAltering2.Iter()
38303843
handleRows(iter)
38313844
err = iter.Close()
@@ -3842,7 +3855,7 @@ func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
38423855
// Executing prepared stmt and expecting that C* won't return
38433856
// Metadata_changed because the table is not being changed.
38443857
queryAfterTableAltering3 := session.Query(selectStmt).WithContext(ctx)
3845-
queryAfterTableAltering3.conn = conn
3858+
queryAfterTableAltering3.SetHostID(hostid)
38463859
iter = queryAfterTableAltering2.Iter()
38473860
handleRows(iter)
38483861

@@ -3946,7 +3959,10 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39463959
getRoutingKeyInfo := func(key string) *routingKeyInfo {
39473960
t.Helper()
39483961
session.routingKeyInfoCache.mu.Lock()
3949-
value, _ := session.routingKeyInfoCache.lru.Get(key)
3962+
value, ok := session.routingKeyInfoCache.lru.Get(key)
3963+
if !ok {
3964+
t.Fatalf("routing key not found in cache for key %v", key)
3965+
}
39503966
session.routingKeyInfoCache.mu.Unlock()
39513967

39523968
inflight := value.(*inflightCachedEntry)
@@ -3956,20 +3972,22 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39563972
const insertQuery = "INSERT INTO routing_key_cache_uses_overridden_ks (id) VALUES (?)"
39573973

39583974
// Running batch in default ks gocql_test
3959-
b1 := session.NewBatch(LoggedBatch)
3975+
b1 := session.Batch(LoggedBatch)
39603976
b1.Query(insertQuery, 1)
3961-
_, err = b1.GetRoutingKey()
3977+
internalB := newInternalBatch(b1, nil)
3978+
_, err = internalB.GetRoutingKey()
39623979
require.NoError(t, err)
39633980

39643981
// Ensuring that the cache contains the query with default ks
39653982
routingKeyInfo1 := getRoutingKeyInfo("gocql_test" + b1.Entries[0].Stmt)
39663983
require.Equal(t, "gocql_test", routingKeyInfo1.keyspace)
39673984

39683985
// Running batch in gocql_test_routing_key_cache ks
3969-
b2 := session.NewBatch(LoggedBatch)
3986+
b2 := session.Batch(LoggedBatch)
39703987
b2.SetKeyspace("gocql_test_routing_key_cache")
39713988
b2.Query(insertQuery, 2)
3972-
_, err = b2.GetRoutingKey()
3989+
internalB2 := newInternalBatch(b2, nil)
3990+
_, err = internalB2.GetRoutingKey()
39733991
require.NoError(t, err)
39743992

39753993
// Ensuring that the cache contains the query with gocql_test_routing_key_cache ks
@@ -3980,15 +3998,18 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
39803998

39813999
// Running query in default ks gocql_test
39824000
q1 := session.Query(selectStmt, 1)
3983-
_, err = q1.GetRoutingKey()
4001+
iter := q1.Iter()
4002+
err = iter.Close()
39844003
require.NoError(t, err)
3985-
require.Equal(t, "gocql_test", q1.routingInfo.keyspace)
4004+
require.Equal(t, "gocql_test", iter.Keyspace())
39864005

39874006
// Running query in gocql_test_routing_key_cache ks
39884007
q2 := session.Query(selectStmt, 1)
3989-
_, err = q2.SetKeyspace("gocql_test_routing_key_cache").GetRoutingKey()
4008+
q2.SetKeyspace("gocql_test_routing_key_cache")
4009+
iter = q2.Iter()
4010+
err = iter.Close()
39904011
require.NoError(t, err)
3991-
require.Equal(t, "gocql_test_routing_key_cache", q2.routingInfo.keyspace)
4012+
require.Equal(t, "gocql_test_routing_key_cache", iter.Keyspace())
39924013

39934014
session.Query("DROP KEYSPACE IF EXISTS gocql_test_routing_key_cache").Exec()
39944015
}

0 commit comments

Comments
 (0)