Skip to content

Commit f47bf6a

Browse files
CASSGO-26 consistency serial was added
The user should be able to set consistency to SERIAL or LOCAL_SERIAL for Paxos reads, but the previous implementation doesn't support such a feature. patch by Mykyta Oleksiienko; reviewed by João Reis and James Harting for CASSGO-26
1 parent f3d13d4 commit f47bf6a

File tree

5 files changed

+125
-45
lines changed

5 files changed

+125
-45
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Allow SERIAL and LOCAL_SERIAL on SELECT statements [CASSGO-26](https://issues.apache.org/jira/browse/CASSGO-26)
13+
1214
### Changed
1315

1416
- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)

cassandra_test.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ import (
4444
"time"
4545
"unicode"
4646

47-
"github.com/stretchr/testify/require"
47+
inf "gopkg.in/inf.v0"
4848

49-
"gopkg.in/inf.v0"
49+
"github.com/stretchr/testify/require"
5050
)
5151

5252
func TestEmptyHosts(t *testing.T) {
@@ -555,6 +555,92 @@ func TestCAS(t *testing.T) {
555555
}
556556
}
557557

558+
func TestConsistencySerial(t *testing.T) {
559+
session := createSession(t)
560+
defer session.Close()
561+
562+
type testStruct struct {
563+
name string
564+
id int
565+
consistency Consistency
566+
expectedPanicValue string
567+
}
568+
569+
testCases := []testStruct{
570+
{
571+
name: "Any",
572+
consistency: Any,
573+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got ANY",
574+
}, {
575+
name: "One",
576+
consistency: One,
577+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got ONE",
578+
}, {
579+
name: "Two",
580+
consistency: Two,
581+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got TWO",
582+
}, {
583+
name: "Three",
584+
consistency: Three,
585+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got THREE",
586+
}, {
587+
name: "Quorum",
588+
consistency: Quorum,
589+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got QUORUM",
590+
}, {
591+
name: "LocalQuorum",
592+
consistency: LocalQuorum,
593+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_QUORUM",
594+
}, {
595+
name: "EachQuorum",
596+
consistency: EachQuorum,
597+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got EACH_QUORUM",
598+
}, {
599+
name: "Serial",
600+
id: 8,
601+
consistency: Serial,
602+
expectedPanicValue: "",
603+
}, {
604+
name: "LocalSerial",
605+
id: 9,
606+
consistency: LocalSerial,
607+
expectedPanicValue: "",
608+
}, {
609+
name: "LocalOne",
610+
consistency: LocalOne,
611+
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_ONE",
612+
},
613+
}
614+
615+
err := session.Query("CREATE TABLE IF NOT EXISTS gocql_test.consistency_serial (id int PRIMARY KEY)").Exec()
616+
if err != nil {
617+
t.Fatalf("can't create consistency_serial table:%v", err)
618+
}
619+
620+
for _, tc := range testCases {
621+
t.Run(tc.name, func(t *testing.T) {
622+
if tc.expectedPanicValue == "" {
623+
err = session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency).Exec()
624+
if err != nil {
625+
t.Fatal(err)
626+
}
627+
628+
var receivedID int
629+
err = session.Query("SELECT * FROM gocql_test.consistency_serial WHERE id=?", tc.id).Scan(&receivedID)
630+
if err != nil {
631+
t.Fatal(err)
632+
}
633+
634+
require.Equal(t, tc.id, receivedID)
635+
} else {
636+
require.PanicsWithValue(t, tc.expectedPanicValue, func() {
637+
session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency)
638+
})
639+
}
640+
})
641+
}
642+
}
643+
558644
func TestDurationType(t *testing.T) {
559645
session := createSession(t)
560646
defer session.Close()

cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ type ClusterConfig struct {
156156

157157
// Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL.
158158
// Default: unset
159-
SerialConsistency SerialConsistency
159+
SerialConsistency Consistency
160160

161161
// SslOpts configures TLS use when HostDialer is not set.
162162
// SslOpts is ignored if HostDialer is set.

frame.go

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ const (
192192

193193
type Consistency uint16
194194

195+
// SerialConsistency is deprecated. Use Consistency instead.
196+
type SerialConsistency = Consistency
197+
195198
const (
196199
Any Consistency = 0x00
197200
One Consistency = 0x01
@@ -201,6 +204,8 @@ const (
201204
All Consistency = 0x05
202205
LocalQuorum Consistency = 0x06
203206
EachQuorum Consistency = 0x07
207+
Serial Consistency = 0x08
208+
LocalSerial Consistency = 0x09
204209
LocalOne Consistency = 0x0A
205210
)
206211

@@ -224,6 +229,10 @@ func (c Consistency) String() string {
224229
return "EACH_QUORUM"
225230
case LocalOne:
226231
return "LOCAL_ONE"
232+
case Serial:
233+
return "SERIAL"
234+
case LocalSerial:
235+
return "LOCAL_SERIAL"
227236
default:
228237
return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
229238
}
@@ -253,13 +262,21 @@ func (c *Consistency) UnmarshalText(text []byte) error {
253262
*c = EachQuorum
254263
case "LOCAL_ONE":
255264
*c = LocalOne
265+
case "SERIAL":
266+
*c = Serial
267+
case "LOCAL_SERIAL":
268+
*c = LocalSerial
256269
default:
257270
return fmt.Errorf("invalid consistency %q", string(text))
258271
}
259272

260273
return nil
261274
}
262275

276+
func (c Consistency) isSerial() bool {
277+
return c == Serial || c == LocalSerial
278+
279+
}
263280
func ParseConsistency(s string) Consistency {
264281
var c Consistency
265282
if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
@@ -275,41 +292,6 @@ func ParseConsistencyWrapper(s string) (consistency Consistency, err error) {
275292
return
276293
}
277294

278-
type SerialConsistency uint16
279-
280-
const (
281-
Serial SerialConsistency = 0x08
282-
LocalSerial SerialConsistency = 0x09
283-
)
284-
285-
func (s SerialConsistency) String() string {
286-
switch s {
287-
case Serial:
288-
return "SERIAL"
289-
case LocalSerial:
290-
return "LOCAL_SERIAL"
291-
default:
292-
return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
293-
}
294-
}
295-
296-
func (s SerialConsistency) MarshalText() (text []byte, err error) {
297-
return []byte(s.String()), nil
298-
}
299-
300-
func (s *SerialConsistency) UnmarshalText(text []byte) error {
301-
switch string(text) {
302-
case "SERIAL":
303-
*s = Serial
304-
case "LOCAL_SERIAL":
305-
*s = LocalSerial
306-
default:
307-
return fmt.Errorf("invalid consistency %q", string(text))
308-
}
309-
310-
return nil
311-
}
312-
313295
const (
314296
apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
315297
)
@@ -1441,7 +1423,7 @@ type queryParams struct {
14411423
values []queryValues
14421424
pageSize int
14431425
pagingState []byte
1444-
serialConsistency SerialConsistency
1426+
serialConsistency Consistency
14451427
// v3+
14461428
defaultTimestamp bool
14471429
defaultTimestampValue int64
@@ -1530,7 +1512,7 @@ func (f *framer) writeQueryParams(opts *queryParams) {
15301512
}
15311513

15321514
if opts.serialConsistency > 0 {
1533-
f.writeConsistency(Consistency(opts.serialConsistency))
1515+
f.writeConsistency(opts.serialConsistency)
15341516
}
15351517

15361518
if f.proto > protoVersion2 && opts.defaultTimestamp {
@@ -1642,7 +1624,7 @@ type writeBatchFrame struct {
16421624
consistency Consistency
16431625

16441626
// v3+
1645-
serialConsistency SerialConsistency
1627+
serialConsistency Consistency
16461628
defaultTimestamp bool
16471629
defaultTimestampValue int64
16481630

session.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
144144
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
145145
}
146146

147+
if cfg.SerialConsistency > 0 && !cfg.SerialConsistency.isSerial() {
148+
return nil, fmt.Errorf("the default SerialConsistency level is not allowed to be anything else but SERIAL or LOCAL_SERIAL. Recived value: %v", cfg.SerialConsistency)
149+
}
150+
147151
// TODO: we should take a context in here at some point
148152
ctx, cancel := context.WithCancel(context.TODO())
149153

@@ -928,7 +932,7 @@ type Query struct {
928932
rt RetryPolicy
929933
spec SpeculativeExecutionPolicy
930934
binding func(q *QueryInfo) ([]interface{}, error)
931-
serialCons SerialConsistency
935+
serialCons Consistency
932936
defaultTimestamp bool
933937
defaultTimestampValue int64
934938
disableSkipMetadata bool
@@ -1277,7 +1281,10 @@ func (q *Query) Bind(v ...interface{}) *Query {
12771281
// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
12781282
// SERIAL. This option will be ignored for anything else that a
12791283
// conditional update/insert.
1280-
func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
1284+
func (q *Query) SerialConsistency(cons Consistency) *Query {
1285+
if !cons.isSerial() {
1286+
panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
1287+
}
12811288
q.serialCons = cons
12821289
return q
12831290
}
@@ -1754,7 +1761,7 @@ type Batch struct {
17541761
trace Tracer
17551762
observer BatchObserver
17561763
session *Session
1757-
serialCons SerialConsistency
1764+
serialCons Consistency
17581765
defaultTimestamp bool
17591766
defaultTimestampValue int64
17601767
context context.Context
@@ -1929,7 +1936,10 @@ func (b *Batch) Size() int {
19291936
// conditional update/insert.
19301937
//
19311938
// Only available for protocol 3 and above
1932-
func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
1939+
func (b *Batch) SerialConsistency(cons Consistency) *Batch {
1940+
if !cons.isSerial() {
1941+
panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
1942+
}
19331943
b.serialCons = cons
19341944
return b
19351945
}

0 commit comments

Comments
 (0)