Skip to content

Commit 9519c1e

Browse files
committed
Merge branch 'trunk' into cassgo-22-prototype
# Conflicts: # cassandra_test.go # session.go
2 parents 34a1890 + b251492 commit 9519c1e

19 files changed

+984
-200
lines changed

.github/workflows/main.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ on:
88
types: [ opened, synchronize, reopened ]
99

1010
env:
11-
CCM_VERSION: "6e71061146f7ae67b84ccd2b1d90d7319b640e4c"
11+
CCM_VERSION: "4621dfee5ad73956b831091a8b863d100d25c610"
1212

1313
jobs:
1414
build:
@@ -35,7 +35,7 @@ jobs:
3535
fail-fast: false
3636
matrix:
3737
go: [ '1.22', '1.23' ]
38-
cassandra_version: [ '4.0.13', '4.1.6' ]
38+
cassandra_version: [ '4.1.6', '5.0.3' ]
3939
auth: [ "false" ]
4040
compressor: [ "no-compression", "snappy", "lz4" ]
4141
tags: [ "cassandra", "integration", "ccm" ]

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Support vector type [CASSGO-11](https://issues.apache.org/jira/browse/CASSGO-11)
13+
1214
- Allow SERIAL and LOCAL_SERIAL on SELECT statements [CASSGO-26](https://issues.apache.org/jira/browse/CASSGO-26)
1315

1416
- Support of sending queries to the specific node with Query.SetHostID() (CASSGO-4)
@@ -50,6 +52,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5052

5153
- Refactoring hostpool package test and Expose HostInfo creation (CASSGO-59)
5254

55+
- Move "execute batch" methods to Batch type (CASSGO-57)
56+
57+
- Make `Session` immutable by removing setters and associated mutex (CASSGO-23)
58+
5359
### Fixed
5460
- Cassandra version unmarshal fix (CASSGO-49)
5561

cassandra_test.go

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,10 @@ func TestTracing(t *testing.T) {
152152
}
153153

154154
// also works from session tracer
155-
session.SetTrace(trace)
156155
trace.mu.Lock()
157156
buf.Reset()
158157
trace.mu.Unlock()
159-
if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Scan(&value); err != nil {
158+
if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
160159
t.Fatal("select:", err)
161160
}
162161
if buf.Len() == 0 {
@@ -457,7 +456,7 @@ func TestCAS(t *testing.T) {
457456

458457
successBatch := session.Batch(LoggedBatch)
459458
successBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title, revid, modified)
460-
if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
459+
if applied, _, err := successBatch.ExecCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
461460
t.Fatal("insert:", err)
462461
} else if !applied {
463462
t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
@@ -466,31 +465,31 @@ func TestCAS(t *testing.T) {
466465
successBatch = session.Batch(LoggedBatch)
467466
successBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title+"_foo", revid, modified)
468467
casMap := make(map[string]interface{})
469-
if applied, _, err := session.MapExecuteBatchCAS(successBatch, casMap); err != nil {
468+
if applied, _, err := successBatch.MapExecCAS(casMap); err != nil {
470469
t.Fatal("insert:", err)
471470
} else if !applied {
472471
t.Fatal("insert should have been applied")
473472
}
474473

475474
failBatch := session.Batch(LoggedBatch)
476475
failBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title, revid, modified)
477-
if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
476+
if applied, _, err := successBatch.ExecCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
478477
t.Fatal("insert:", err)
479478
} else if applied {
480479
t.Fatalf("insert should have not been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
481480
}
482481

483482
insertBatch := session.Batch(LoggedBatch)
484-
insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 2c3af400-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
485-
insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 3e4ad2f1-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
486-
if err := session.ExecuteBatch(insertBatch); err != nil {
483+
insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 2c3af400-73a4-11e5-9381-29463d90c3f0, TOTIMESTAMP(NOW()))")
484+
insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 3e4ad2f1-73a4-11e5-9381-29463d90c3f0, TOTIMESTAMP(NOW()))")
485+
if err := insertBatch.Exec(); err != nil {
487486
t.Fatal("insert:", err)
488487
}
489488

490489
failBatch = session.Batch(LoggedBatch)
491-
failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=2c3af400-73a4-11e5-9381-29463d90c3f0 IF last_modified=DATEOF(NOW());")
492-
failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified=DATEOF(NOW());")
493-
if applied, iter, err := session.ExecuteBatchCAS(failBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
490+
failBatch.Query("UPDATE cas_table SET last_modified = TOTIMESTAMP(NOW()) WHERE title='_foo' AND revid=2c3af400-73a4-11e5-9381-29463d90c3f0 IF last_modified=TOTIMESTAMP(NOW());")
491+
failBatch.Query("UPDATE cas_table SET last_modified = TOTIMESTAMP(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified=TOTIMESTAMP(NOW());")
492+
if applied, iter, err := failBatch.ExecCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
494493
t.Fatal("insert:", err)
495494
} else if applied {
496495
t.Fatalf("insert should have not been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
@@ -521,36 +520,36 @@ func TestCAS(t *testing.T) {
521520
notCASBatch := session.Batch(LoggedBatch)
522521
notCASBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?)", title+"_baz", revid, modified)
523522
casMap = make(map[string]interface{})
524-
if _, _, err := session.MapExecuteBatchCAS(notCASBatch, casMap); err != ErrNotFound {
523+
if _, _, err := notCASBatch.MapExecCAS(casMap); err != ErrNotFound {
525524
t.Fatal("insert should have returned not found:", err)
526525
}
527526

528527
notCASBatch = session.Batch(LoggedBatch)
529528
notCASBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?)", title+"_baz", revid, modified)
530529
casMap = make(map[string]interface{})
531-
if _, _, err := session.ExecuteBatchCAS(notCASBatch, &revidCAS); err != ErrNotFound {
530+
if _, _, err := notCASBatch.ExecCAS(&revidCAS); err != ErrNotFound {
532531
t.Fatal("insert should have returned not found:", err)
533532
}
534533

535534
failBatch = session.Batch(LoggedBatch)
536-
failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified = ?", modified)
537-
if _, _, err := session.ExecuteBatchCAS(failBatch, new(bool)); err == nil {
535+
failBatch.Query("UPDATE cas_table SET last_modified = TOTIMESTAMP(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified = ?", modified)
536+
if _, _, err := failBatch.ExecCAS(new(bool)); err == nil {
538537
t.Fatal("update should have errored")
539538
}
540539
// make sure MapScanCAS does not panic when MapScan fails
541540
casMap = make(map[string]interface{})
542541
casMap["last_modified"] = false
543-
if _, err := session.Query(`UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified = ?`,
542+
if _, err := session.Query(`UPDATE cas_table SET last_modified = TOTIMESTAMP(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified = ?`,
544543
modified).MapScanCAS(casMap); err == nil {
545544
t.Fatal("update should hvae errored", err)
546545
}
547546

548547
// make sure MapExecuteBatchCAS does not panic when MapScan fails
549548
failBatch = session.Batch(LoggedBatch)
550-
failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified = ?", modified)
549+
failBatch.Query("UPDATE cas_table SET last_modified = TOTIMESTAMP(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified = ?", modified)
551550
casMap = make(map[string]interface{})
552551
casMap["last_modified"] = false
553-
if _, _, err := session.MapExecuteBatchCAS(failBatch, casMap); err == nil {
552+
if _, _, err := failBatch.MapExecCAS(casMap); err == nil {
554553
t.Fatal("update should have errored")
555554
}
556555
}
@@ -752,7 +751,7 @@ func TestBatch(t *testing.T) {
752751
batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
753752
}
754753

755-
if err := session.ExecuteBatch(batch); err != nil {
754+
if err := batch.Exec(); err != nil {
756755
t.Fatal("execute batch:", err)
757756
}
758757

@@ -788,7 +787,7 @@ func TestUnpreparedBatch(t *testing.T) {
788787
batch.Query(`UPDATE batch_unprepared SET c = c + 1 WHERE id = 1`)
789788
}
790789

791-
if err := session.ExecuteBatch(batch); err != nil {
790+
if err := batch.Exec(); err != nil {
792791
t.Fatal("execute batch:", err)
793792
}
794793

@@ -824,8 +823,8 @@ func TestBatchLimit(t *testing.T) {
824823
for i := 0; i < 65537; i++ {
825824
batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
826825
}
827-
if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
828-
t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
826+
if err := batch.Exec(); err != ErrTooManyStmts {
827+
t.Fatalf("gocql attempted to execute a batch larger than the support limit of statements: expected %v, got %v", ErrTooManyStmts, err)
829828
}
830829

831830
}
@@ -876,7 +875,7 @@ func TestTooManyQueryArgs(t *testing.T) {
876875

877876
batch := session.Batch(UnloggedBatch)
878877
batch.Query("INSERT INTO too_many_query_args (id, value) VALUES (?, ?)", 1, 2, 3)
879-
err = session.ExecuteBatch(batch)
878+
err = batch.Exec()
880879

881880
if err == nil {
882881
t.Fatal("'`INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an error")
@@ -908,7 +907,7 @@ func TestNotEnoughQueryArgs(t *testing.T) {
908907

909908
batch := session.Batch(UnloggedBatch)
910909
batch.Query("INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)", 1, 2)
911-
err = session.ExecuteBatch(batch)
910+
err = batch.Exec()
912911

913912
if err == nil {
914913
t.Fatal("'`INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an error")
@@ -1534,7 +1533,7 @@ func TestBatchQueryInfo(t *testing.T) {
15341533
batch := session.Batch(LoggedBatch)
15351534
batch.Bind("INSERT INTO batch_query_info (id, cluster, value) VALUES (?, ?,?)", write)
15361535

1537-
if err := session.ExecuteBatch(batch); err != nil {
1536+
if err := batch.Exec(); err != nil {
15381537
t.Fatalf("batch insert into batch_query_info failed, err '%v'", err)
15391538
}
15401539

@@ -2107,7 +2106,7 @@ func TestBatchObserve(t *testing.T) {
21072106
batch.Query(fmt.Sprintf(`INSERT INTO batch_observe_table (id,other) VALUES (?,%d)`, i), i)
21082107
}
21092108

2110-
if err := session.ExecuteBatch(batch); err != nil {
2109+
if err := batch.Exec(); err != nil {
21112110
t.Fatal("execute batch:", err)
21122111
}
21132112
if observedBatch == nil {
@@ -2519,18 +2518,19 @@ func TestAggregateMetadata(t *testing.T) {
25192518
t.Fatal("expected two aggregates")
25202519
}
25212520

2521+
protoVer := byte(session.cfg.ProtoVersion)
25222522
expectedAggregrate := AggregateMetadata{
25232523
Keyspace: "gocql_test",
25242524
Name: "average",
2525-
ArgumentTypes: []TypeInfo{NativeType{typ: TypeInt}},
2525+
ArgumentTypes: []TypeInfo{NativeType{typ: TypeInt, proto: protoVer}},
25262526
InitCond: "(0, 0)",
2527-
ReturnType: NativeType{typ: TypeDouble},
2527+
ReturnType: NativeType{typ: TypeDouble, proto: protoVer},
25282528
StateType: TupleTypeInfo{
2529-
NativeType: NativeType{typ: TypeTuple},
2529+
NativeType: NativeType{typ: TypeTuple, proto: protoVer},
25302530

25312531
Elems: []TypeInfo{
2532-
NativeType{typ: TypeInt},
2533-
NativeType{typ: TypeBigInt},
2532+
NativeType{typ: TypeInt, proto: protoVer},
2533+
NativeType{typ: TypeBigInt, proto: protoVer},
25342534
},
25352535
},
25362536
stateFunc: "avgstate",
@@ -2569,28 +2569,29 @@ func TestFunctionMetadata(t *testing.T) {
25692569
avgState := functions[1]
25702570
avgFinal := functions[0]
25712571

2572+
protoVer := byte(session.cfg.ProtoVersion)
25722573
avgStateBody := "if (val !=null) {state.setInt(0, state.getInt(0)+1); state.setLong(1, state.getLong(1)+val.intValue());}return state;"
25732574
expectedAvgState := FunctionMetadata{
25742575
Keyspace: "gocql_test",
25752576
Name: "avgstate",
25762577
ArgumentTypes: []TypeInfo{
25772578
TupleTypeInfo{
2578-
NativeType: NativeType{typ: TypeTuple},
2579+
NativeType: NativeType{typ: TypeTuple, proto: protoVer},
25792580

25802581
Elems: []TypeInfo{
2581-
NativeType{typ: TypeInt},
2582-
NativeType{typ: TypeBigInt},
2582+
NativeType{typ: TypeInt, proto: protoVer},
2583+
NativeType{typ: TypeBigInt, proto: protoVer},
25832584
},
25842585
},
2585-
NativeType{typ: TypeInt},
2586+
NativeType{typ: TypeInt, proto: protoVer},
25862587
},
25872588
ArgumentNames: []string{"state", "val"},
25882589
ReturnType: TupleTypeInfo{
2589-
NativeType: NativeType{typ: TypeTuple},
2590+
NativeType: NativeType{typ: TypeTuple, proto: protoVer},
25902591

25912592
Elems: []TypeInfo{
2592-
NativeType{typ: TypeInt},
2593-
NativeType{typ: TypeBigInt},
2593+
NativeType{typ: TypeInt, proto: protoVer},
2594+
NativeType{typ: TypeBigInt, proto: protoVer},
25942595
},
25952596
},
25962597
CalledOnNullInput: true,
@@ -2607,16 +2608,16 @@ func TestFunctionMetadata(t *testing.T) {
26072608
Name: "avgfinal",
26082609
ArgumentTypes: []TypeInfo{
26092610
TupleTypeInfo{
2610-
NativeType: NativeType{typ: TypeTuple},
2611+
NativeType: NativeType{typ: TypeTuple, proto: protoVer},
26112612

26122613
Elems: []TypeInfo{
2613-
NativeType{typ: TypeInt},
2614-
NativeType{typ: TypeBigInt},
2614+
NativeType{typ: TypeInt, proto: protoVer},
2615+
NativeType{typ: TypeBigInt, proto: protoVer},
26152616
},
26162617
},
26172618
},
26182619
ArgumentNames: []string{"state"},
2619-
ReturnType: NativeType{typ: TypeDouble},
2620+
ReturnType: NativeType{typ: TypeDouble, proto: protoVer},
26202621
CalledOnNullInput: true,
26212622
Language: "java",
26222623
Body: finalStateBody,
@@ -2720,15 +2721,16 @@ func TestKeyspaceMetadata(t *testing.T) {
27202721
if flagCassVersion.Before(3, 0, 0) {
27212722
textType = TypeVarchar
27222723
}
2724+
protoVer := byte(session.cfg.ProtoVersion)
27232725
expectedType := UserTypeMetadata{
27242726
Keyspace: "gocql_test",
27252727
Name: "basicview",
27262728
FieldNames: []string{"birthday", "nationality", "weight", "height"},
27272729
FieldTypes: []TypeInfo{
2728-
NativeType{typ: TypeTimestamp},
2729-
NativeType{typ: textType},
2730-
NativeType{typ: textType},
2731-
NativeType{typ: textType},
2730+
NativeType{typ: TypeTimestamp, proto: protoVer},
2731+
NativeType{typ: textType, proto: protoVer},
2732+
NativeType{typ: textType, proto: protoVer},
2733+
NativeType{typ: textType, proto: protoVer},
27322734
},
27332735
}
27342736
if !reflect.DeepEqual(*keyspaceMetadata.UserTypes["basicview"], expectedType) {

cluster.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,17 @@ type ClusterConfig struct {
263263
// If not specified, defaults to the gocql.defaultLogger.
264264
Logger StdLogger
265265

266+
// Tracer will be used for all queries. Alternatively it can be set of on a
267+
// per query basis.
268+
// default: nil
269+
Tracer Tracer
270+
271+
// NextPagePrefetch sets the default threshold for pre-fetching new pages. If
272+
// there are only p*pageSize rows remaining, the next page will be requested
273+
// automatically. This value can also be changed on a per-query basis.
274+
// default: 0.25.
275+
NextPagePrefetch float64
276+
266277
// internal config for testing
267278
disableControlConn bool
268279
}
@@ -298,6 +309,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
298309
ConvictionPolicy: &SimpleConvictionPolicy{},
299310
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
300311
WriteCoalesceWaitTime: 200 * time.Microsecond,
312+
NextPagePrefetch: 0.25,
301313
}
302314
return cfg
303315
}

common_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"flag"
2929
"fmt"
3030
"log"
31+
"math/rand"
3132
"net"
3233
"reflect"
3334
"strings"
@@ -54,6 +55,10 @@ var (
5455
flagCassVersion cassVersion
5556
)
5657

58+
var seededRand *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano()))
59+
60+
const randCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
61+
5762
func init() {
5863
flag.Var(&flagCassVersion, "gocql.cversion", "the cassandra version being tested against")
5964

@@ -281,6 +286,14 @@ func assertTrue(t *testing.T, description string, value bool) {
281286
}
282287
}
283288

289+
func randomText(size int) string {
290+
result := make([]byte, size)
291+
for i := range result {
292+
result[i] = randCharset[rand.Intn(len(randCharset))]
293+
}
294+
return string(result)
295+
}
296+
284297
func assertEqual(t *testing.T, description string, expected, actual interface{}) {
285298
t.Helper()
286299
if expected != actual {

0 commit comments

Comments
 (0)