Skip to content

Commit 8408080

Browse files

33 files changed

+1692
-1662
lines changed

batch_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestBatch_Errors(t *testing.T) {
3939
session := createSession(t)
4040
defer session.Close()
4141

42-
if session.cfg.ProtoVersion < protoVersion2 {
42+
if session.cfg.ProtoVersion < internal.ProtoVersion2 {
4343
t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
4444
}
4545

@@ -58,7 +58,7 @@ func TestBatch_WithTimestamp(t *testing.T) {
5858
session := createSession(t)
5959
defer session.Close()
6060

61-
if session.cfg.ProtoVersion < protoVersion3 {
61+
if session.cfg.ProtoVersion < internal.ProtoVersion3 {
6262
t.Skip("Batch timestamps are only available on protocol >= 3")
6363
}
6464

cassandra_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,7 +1120,7 @@ func TestSmallInt(t *testing.T) {
11201120
session := createSession(t)
11211121
defer session.Close()
11221122

1123-
if session.cfg.ProtoVersion < protoVersion4 {
1123+
if session.cfg.ProtoVersion < internal.ProtoVersion4 {
11241124
t.Skip("smallint is only supported in cassandra 2.2+")
11251125
}
11261126

@@ -2146,7 +2146,7 @@ func TestGetTableMetadata(t *testing.T) {
21462146
if testTable == nil {
21472147
t.Fatal("Expected table metadata for name 'test_table_metadata'")
21482148
}
2149-
if session.cfg.ProtoVersion == protoVersion1 {
2149+
if session.cfg.ProtoVersion == internal.ProtoVersion1 {
21502150
if testTable.KeyValidator != "org.apache.cassandra.db.marshal.Int32Type" {
21512151
t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.Int32Type' but was '%s'", testTable.KeyValidator)
21522152
}
@@ -2813,7 +2813,7 @@ func TestNegativeStream(t *testing.T) {
28132813

28142814
const stream = -50
28152815
writer := frameWriterFunc(func(f *framer, streamID int) error {
2816-
f.writeHeader(0, opOptions, stream)
2816+
f.writeHeader(0, internal.OpOptions, stream)
28172817
return f.finish()
28182818
})
28192819

@@ -3116,7 +3116,7 @@ func TestUnmarshallNestedTypes(t *testing.T) {
31163116
session := createSession(t)
31173117
defer session.Close()
31183118

3119-
if session.cfg.ProtoVersion < protoVersion3 {
3119+
if session.cfg.ProtoVersion < internal.ProtoVersion3 {
31203120
t.Skip("can not have frozen types in cassandra < 2.1.3")
31213121
}
31223122

cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ type ClusterConfig struct {
174174
ReconnectInterval time.Duration
175175

176176
// The maximum amount of time to wait for schema agreement in a cluster after
177-
// receiving a schema change frame. (default: 60s)
177+
// receiving a schema change internal.Frame. (default: 60s)
178178
MaxWaitSchemaAgreement time.Duration
179179

180180
// HostFilter will filter all incoming events for host, any which don't pass

conn.go

Lines changed: 32 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,11 @@ import (
3939
"sync/atomic"
4040
"time"
4141

42+
"github.com/gocql/gocql/internal"
4243
"github.com/gocql/gocql/internal/lru"
4344
"github.com/gocql/gocql/internal/streams"
4445
)
4546

46-
// approve the authenticator with the list of allowed authenticators. If the provided list is empty,
47-
// the given authenticator is allowed.
48-
func approve(authenticator string, approvedAuthenticators []string) bool {
49-
if len(approvedAuthenticators) == 0 {
50-
return true
51-
}
52-
for _, s := range approvedAuthenticators {
53-
if authenticator == s {
54-
return true
55-
}
56-
}
57-
return false
58-
}
59-
6047
// JoinHostPort is a utility to return an address string that can be used
6148
// by `gocql.Conn` to form a connection with a host.
6249
func JoinHostPort(addr string, port int) string {
@@ -85,7 +72,7 @@ type PasswordAuthenticator struct {
8572
}
8673

8774
func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
88-
if !approve(string(req), p.AllowedAuthenticators) {
75+
if !internal.Approve(string(req), p.AllowedAuthenticators) {
8976
return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
9077
}
9178
resp := make([]byte, 2+len(p.Username)+len(p.Password))
@@ -188,7 +175,7 @@ type Conn struct {
188175
frameObserver FrameHeaderObserver
189176
streamObserver StreamObserver
190177

191-
headerBuf [maxFrameHeaderSize]byte
178+
headerBuf [internal.MaxFrameHeaderSize]byte
192179

193180
streams *streams.IDGenerator
194181
mu sync.Mutex
@@ -406,7 +393,7 @@ func (s *startupCoordinator) setupConn(ctx context.Context) error {
406393
return nil
407394
}
408395

409-
func (s *startupCoordinator) write(ctx context.Context, frame frameBuilder) (frame, error) {
396+
func (s *startupCoordinator) write(ctx context.Context, frame frameBuilder) (internal.Frame, error) {
410397
select {
411398
case s.frameTicker <- struct{}{}:
412399
case <-ctx.Done():
@@ -585,23 +572,23 @@ func (c *Conn) serve(ctx context.Context) {
585572
c.closeWithError(err)
586573
}
587574

588-
func (c *Conn) discardFrame(head frameHeader) error {
589-
_, err := io.CopyN(ioutil.Discard, c, int64(head.length))
575+
func (c *Conn) discardFrame(head internal.FrameHeader) error {
576+
_, err := io.CopyN(ioutil.Discard, c, int64(head.Length))
590577
if err != nil {
591578
return err
592579
}
593580
return nil
594581
}
595582

596583
type protocolError struct {
597-
frame frame
584+
frame internal.Frame
598585
}
599586

600587
func (p *protocolError) Error() string {
601588
if err, ok := p.frame.(error); ok {
602589
return err.Error()
603590
}
604-
return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().stream, p.frame)
591+
return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().Stream, p.frame)
605592
}
606593

607594
func (c *Conn) heartBeat(ctx context.Context) {
@@ -670,28 +657,28 @@ func (c *Conn) recv(ctx context.Context) error {
670657

671658
if c.frameObserver != nil {
672659
c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
673-
Version: protoVersion(head.version),
674-
Flags: head.flags,
675-
Stream: int16(head.stream),
676-
Opcode: frameOp(head.op),
677-
Length: int32(head.length),
660+
Version: internal.ProtoVersion(head.Version),
661+
Flags: head.Flags,
662+
Stream: int16(head.Stream),
663+
Opcode: internal.FrameOp(head.Op),
664+
Length: int32(head.Length),
678665
Start: headStartTime,
679666
End: headEndTime,
680667
Host: c.host,
681668
})
682669
}
683670

684-
if head.stream > c.streams.NumStreams {
685-
return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.stream)
686-
} else if head.stream == -1 {
671+
if head.Stream > c.streams.NumStreams {
672+
return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.Stream)
673+
} else if head.Stream == -1 {
687674
// TODO: handle cassandra event frames, we shouldnt get any currently
688675
framer := newFramer(c.compressor, c.version)
689676
if err := framer.readFrame(c, &head); err != nil {
690677
return err
691678
}
692679
go c.session.handleEvent(framer)
693680
return nil
694-
} else if head.stream <= 0 {
681+
} else if head.Stream <= 0 {
695682
// reserved stream that we dont use, probably due to a protocol error
696683
// or a bug in Cassandra, this should be an error, parse it and return.
697684
framer := newFramer(c.compressor, c.version)
@@ -714,14 +701,14 @@ func (c *Conn) recv(ctx context.Context) error {
714701
c.mu.Unlock()
715702
return ErrConnectionClosed
716703
}
717-
call, ok := c.calls[head.stream]
718-
delete(c.calls, head.stream)
704+
call, ok := c.calls[head.Stream]
705+
delete(c.calls, head.Stream)
719706
c.mu.Unlock()
720707
if call == nil || !ok {
721708
c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
722709
return c.discardFrame(head)
723-
} else if head.stream != call.streamID {
724-
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
710+
} else if head.Stream != call.streamID {
711+
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.Stream))
725712
}
726713

727714
framer := newFramer(c.compressor, c.version)
@@ -1150,7 +1137,7 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram
11501137
// requests on the stream to prevent nil pointer dereferences in recv().
11511138
defer c.releaseStream(call)
11521139

1153-
if v := resp.framer.header.version.version(); v != c.version {
1140+
if v := resp.framer.header.Version.Version(); v != c.version {
11541141
return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
11551142
}
11561143

@@ -1244,7 +1231,7 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer)
12441231
prep := &writePrepareFrame{
12451232
statement: stmt,
12461233
}
1247-
if c.version > protoVersion4 {
1234+
if c.version > internal.ProtoVersion4 {
12481235
prep.keyspace = c.currentKeyspace
12491236
}
12501237

@@ -1276,7 +1263,7 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer)
12761263
flight.preparedStatment = &preparedStatment{
12771264
// defensively copy as we will recycle the underlying buffer after we
12781265
// return.
1279-
id: copyBytes(x.preparedID),
1266+
id: internal.CopyBytes(x.preparedID),
12801267
// the type info's should _not_ have a reference to the framers read buffer,
12811268
// therefore we can just copy them directly.
12821269
request: x.reqMeta,
@@ -1303,12 +1290,12 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer)
13031290
}
13041291

13051292
func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error {
1306-
if named, ok := value.(*namedValue); ok {
1307-
dst.name = named.name
1308-
value = named.value
1293+
if named, ok := value.(*internal.NamedValue); ok {
1294+
dst.name = named.Name
1295+
value = named.Value
13091296
}
13101297

1311-
if _, ok := value.(unsetColumn); !ok {
1298+
if _, ok := value.(internal.UnsetColumn); !ok {
13121299
val, err := Marshal(typ, value)
13131300
if err != nil {
13141301
return err
@@ -1338,7 +1325,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
13381325
if qry.pageSize > 0 {
13391326
params.pageSize = qry.pageSize
13401327
}
1341-
if c.version > protoVersion4 {
1328+
if c.version > internal.ProtoVersion4 {
13421329
params.keyspace = c.currentKeyspace
13431330
}
13441331

@@ -1431,7 +1418,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
14311418
if params.skipMeta {
14321419
if info != nil {
14331420
iter.meta = info.response
1434-
iter.meta.pagingState = copyBytes(x.meta.pagingState)
1421+
iter.meta.pagingState = internal.CopyBytes(x.meta.pagingState)
14351422
} else {
14361423
return &Iter{framer: framer, err: errors.New("gocql: did not receive metadata but prepared info is nil")}
14371424
}
@@ -1442,7 +1429,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
14421429
if x.meta.morePages() && !qry.disableAutoPage {
14431430
newQry := new(Query)
14441431
*newQry = *qry
1445-
newQry.pageState = copyBytes(x.meta.pagingState)
1432+
newQry.pageState = internal.CopyBytes(x.meta.pagingState)
14461433
newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
14471434

14481435
iter.next = &nextIter{
@@ -1531,7 +1518,7 @@ func (c *Conn) UseKeyspace(keyspace string) error {
15311518
}
15321519

15331520
func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter {
1534-
if c.version == protoVersion1 {
1521+
if c.version == internal.ProtoVersion1 {
15351522
return &Iter{err: ErrUnsupported}
15361523
}
15371524

0 commit comments

Comments
 (0)