Skip to content

Commit bb416bb

Browse files
committed
Support for Native Protocol 5 new frames format
1 parent 6f2487e commit bb416bb

File tree

11 files changed

+568
-25
lines changed

11 files changed

+568
-25
lines changed

.github/workflows/main.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
go: [ '1.19', '1.20' ]
3838
cassandra_version: [ '4.0.8', '4.1.1' ]
3939
auth: [ "false" ]
40-
compressor: [ "snappy" ]
40+
compressor: [ "lz4" ]
4141
tags: [ "cassandra", "integration", "ccm" ]
4242
steps:
4343
- uses: actions/checkout@v2
@@ -101,7 +101,7 @@ jobs:
101101
ccm status
102102
ccm node1 nodetool status
103103
104-
args="-gocql.timeout=60s -runssl -proto=4 -rf=3 -clusterSize=3 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
104+
args="-gocql.timeout=60s -runssl -proto=5 -rf=3 -clusterSize=3 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
105105
106106
echo "args=$args" >> $GITHUB_ENV
107107
echo "JVM_EXTRA_OPTS=$JVM_EXTRA_OPTS" >> $GITHUB_ENV
@@ -127,7 +127,7 @@ jobs:
127127
matrix:
128128
go: [ '1.19', '1.20' ]
129129
cassandra_version: [ '4.0.8' ]
130-
compressor: [ "snappy" ]
130+
compressor: [ "lz4" ]
131131
tags: [ "integration" ]
132132

133133
steps:
@@ -190,7 +190,7 @@ jobs:
190190
ccm status
191191
ccm node1 nodetool status
192192
193-
args="-gocql.timeout=60s -runssl -proto=4 -rf=3 -clusterSize=1 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
193+
args="-gocql.timeout=60s -runssl -proto=5 -rf=3 -clusterSize=1 -autowait=2000ms -compressor=${{ matrix.compressor }} -gocql.cversion=$VERSION -cluster=$(ccm liveset) ./..."
194194
195195
echo "args=$args" >> $GITHUB_ENV
196196
echo "JVM_EXTRA_OPTS=$JVM_EXTRA_OPTS" >> $GITHUB_ENV

cassandra_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"io"
3636
"math"
3737
"math/big"
38+
"math/rand"
3839
"net"
3940
"reflect"
4041
"strconv"
@@ -3288,3 +3289,40 @@ func TestQuery_NamedValues(t *testing.T) {
32883289
t.Fatal(err)
32893290
}
32903291
}
3292+
3293+
func TestLargeSizeQuery(t *testing.T) {
3294+
session := createSession(t)
3295+
defer session.Close()
3296+
3297+
if err := createTable(session, "CREATE TABLE gocql_test.large_size_query(id int, text_col text, PRIMARY KEY (id))"); err != nil {
3298+
t.Fatal(err)
3299+
}
3300+
3301+
defer session.Close()
3302+
3303+
longString := randomString(2_000_000)
3304+
3305+
err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec()
3306+
if err != nil {
3307+
t.Fatal(err)
3308+
}
3309+
3310+
var result string
3311+
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
3312+
if err != nil {
3313+
t.Fatal(err)
3314+
}
3315+
3316+
assertEqual(t, "result should equal inserted longString", longString, result)
3317+
}
3318+
3319+
func randomString(n int) string {
3320+
source := rand.NewSource(time.Now().UnixMilli())
3321+
r := rand.New(source)
3322+
var aplhabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
3323+
buf := make([]byte, n)
3324+
for i := 0; i < n; i++ {
3325+
buf[i] = aplhabet[r.Intn(len(aplhabet))]
3326+
}
3327+
return string(buf)
3328+
}

common_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
111111
switch *flagCompressTest {
112112
case "snappy":
113113
cluster.Compressor = &SnappyCompressor{}
114+
case "lz4":
115+
cluster.Compressor = &LZ4Compressor{}
114116
case "":
115117
default:
116118
panic("invalid compressor: " + *flagCompressTest)

compressor.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
package gocql
2626

2727
import (
28+
"encoding/binary"
29+
"fmt"
2830
"github.com/golang/snappy"
31+
"github.com/pierrec/lz4/v4"
2932
)
3033

3134
type Compressor interface {
3235
Name() string
3336
Encode(data []byte) ([]byte, error)
3437
Decode(data []byte) ([]byte, error)
38+
DecodeSized(data []byte, size uint32) ([]byte, error)
3539
}
3640

3741
// SnappyCompressor implements the Compressor interface and can be used to
@@ -50,3 +54,51 @@ func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
5054
func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
5155
return snappy.Decode(nil, data)
5256
}
57+
58+
func (s SnappyCompressor) DecodeSized(data []byte, size uint32) ([]byte, error) {
59+
buf := make([]byte, size)
60+
return snappy.Decode(buf, data)
61+
}
62+
63+
type LZ4Compressor struct{}
64+
65+
func (s LZ4Compressor) Name() string {
66+
return "lz4"
67+
}
68+
69+
func (s LZ4Compressor) Encode(data []byte) ([]byte, error) {
70+
buf := make([]byte, lz4.CompressBlockBound(len(data)+4))
71+
var compressor lz4.Compressor
72+
n, err := compressor.CompressBlock(data, buf[4:])
73+
// According to lz4.CompressBlock doc, it doesn't fail as long as the dst
74+
// buffer length is at least lz4.CompressBlockBound(len(data))) bytes, but
75+
// we check for error anyway just to be thorough.
76+
if err != nil {
77+
return nil, err
78+
}
79+
binary.BigEndian.PutUint32(buf, uint32(len(data)))
80+
return buf[:n+4], nil
81+
}
82+
83+
func (s LZ4Compressor) Decode(data []byte) ([]byte, error) {
84+
if len(data) < 4 {
85+
return nil, fmt.Errorf("cassandra lz4 block size should be >4, got=%d", len(data))
86+
}
87+
uncompressedLength := binary.BigEndian.Uint32(data)
88+
if uncompressedLength == 0 {
89+
return nil, nil
90+
}
91+
buf := make([]byte, uncompressedLength)
92+
n, err := lz4.UncompressBlock(data[4:], buf)
93+
return buf[:n], err
94+
}
95+
96+
func (s LZ4Compressor) DecodeSized(data []byte, size uint32) ([]byte, error) {
97+
buf := make([]byte, size)
98+
_, err := lz4.UncompressBlock(data, buf)
99+
if err != nil {
100+
return nil, err
101+
}
102+
103+
return buf, nil
104+
}

0 commit comments

Comments
 (0)