Skip to content

Commit c2337f9

Browse files
committed
Evacuate some frames to a separate package
We need that to be able to implement code that target these frames on a separate package.
1 parent d7223bb commit c2337f9

File tree

16 files changed

+630
-593
lines changed

16 files changed

+630
-593
lines changed

cassandra_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"time"
4444
"unicode"
4545

46+
frm "github.com/gocql/gocql/internal/frame"
4647
"github.com/gocql/gocql/internal/tests"
4748

4849
"github.com/stretchr/testify/require"
@@ -2484,7 +2485,7 @@ func TestNegativeStream(t *testing.T) {
24842485

24852486
const stream = -50
24862487
writer := frameWriterFunc(func(f *framer, streamID int) error {
2487-
f.writeHeader(0, opOptions, stream)
2488+
f.writeHeader(0, frm.OpOptions, stream)
24882489
return f.finish()
24892490
})
24902491

conn.go

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"sync/atomic"
4040
"time"
4141

42+
frm "github.com/gocql/gocql/internal/frame"
4243
"github.com/gocql/gocql/tablets"
4344

4445
"github.com/gocql/gocql/internal/lru"
@@ -509,12 +510,12 @@ func (s *startupCoordinator) options(ctx context.Context) error {
509510
return err
510511
}
511512

512-
v, ok := frame.(*supportedFrame)
513+
v, ok := frame.(*frm.SupportedFrame)
513514
if !ok {
514515
return NewErrProtocol("Unknown type of response to startup frame: %T", frame)
515516
}
516517
// Keep raw supported multimap for debug purposes
517-
s.conn.supported = v.supported
518+
s.conn.supported = v.Supported
518519
s.conn.scyllaSupported = parseSupported(s.conn.supported, s.conn.logger)
519520
s.conn.recalculateSystemRequestTimeout()
520521
s.conn.host.setScyllaSupported(s.conn.scyllaSupported)
@@ -564,21 +565,21 @@ func (s *startupCoordinator) startup(ctx context.Context) error {
564565
switch v := frame.(type) {
565566
case error:
566567
return v
567-
case *readyFrame:
568+
case *frm.ReadyFrame:
568569
return nil
569-
case *authenticateFrame:
570+
case *frm.AuthenticateFrame:
570571
return s.authenticateHandshake(ctx, v)
571572
default:
572573
return NewErrProtocol("Unknown type of response to startup frame: %s", v)
573574
}
574575
}
575576

576-
func (s *startupCoordinator) authenticateHandshake(ctx context.Context, authFrame *authenticateFrame) error {
577+
func (s *startupCoordinator) authenticateHandshake(ctx context.Context, authFrame *frm.AuthenticateFrame) error {
577578
if s.conn.auth == nil {
578-
return fmt.Errorf("authentication required (using %q)", authFrame.class)
579+
return fmt.Errorf("authentication required (using %q)", authFrame.Class)
579580
}
580581

581-
resp, challenger, err := s.conn.auth.Challenge([]byte(authFrame.class))
582+
resp, challenger, err := s.conn.auth.Challenge([]byte(authFrame.Class))
582583
if err != nil {
583584
return err
584585
}
@@ -593,13 +594,13 @@ func (s *startupCoordinator) authenticateHandshake(ctx context.Context, authFram
593594
switch v := frame.(type) {
594595
case error:
595596
return v
596-
case *authSuccessFrame:
597+
case *frm.AuthSuccessFrame:
597598
if challenger != nil {
598-
return challenger.Success(v.data)
599+
return challenger.Success(v.Data)
599600
}
600601
return nil
601-
case *authChallengeFrame:
602-
resp, challenger, err = challenger.Challenge(v.data)
602+
case *frm.AuthChallengeFrame:
603+
resp, challenger, err = challenger.Challenge(v.Data)
603604
if err != nil {
604605
return err
605606
}
@@ -696,8 +697,8 @@ func (c *Conn) serve(ctx context.Context) {
696697
c.closeWithError(err)
697698
}
698699

699-
func (c *Conn) discardFrame(head frameHeader) error {
700-
_, err := io.CopyN(ioutil.Discard, c, int64(head.length))
700+
func (c *Conn) discardFrame(head frm.FrameHeader) error {
701+
_, err := io.CopyN(ioutil.Discard, c, int64(head.Length))
701702
if err != nil {
702703
return err
703704
}
@@ -712,7 +713,7 @@ func (p *protocolError) Error() string {
712713
if err, ok := p.frame.(error); ok {
713714
return err.Error()
714715
}
715-
return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().stream, p.frame)
716+
return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().Stream, p.frame)
716717
}
717718

718719
func (c *Conn) heartBeat(ctx context.Context) {
@@ -750,7 +751,7 @@ func (c *Conn) heartBeat(ctx context.Context) {
750751
}
751752

752753
switch resp.(type) {
753-
case *supportedFrame:
754+
case *frm.SupportedFrame:
754755
// Everything ok
755756
sleepTime = 30 * time.Second
756757
failures = 0
@@ -781,20 +782,20 @@ func (c *Conn) recv(ctx context.Context) error {
781782

782783
if c.frameObserver != nil {
783784
c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
784-
Version: protoVersion(head.version),
785-
Flags: head.flags,
786-
Stream: int16(head.stream),
787-
Opcode: frameOp(head.op),
788-
Length: int32(head.length),
785+
Version: head.Version,
786+
Flags: head.Flags,
787+
Stream: int16(head.Stream),
788+
Opcode: head.Op,
789+
Length: int32(head.Length),
789790
Start: headStartTime,
790791
End: headEndTime,
791792
Host: c.host,
792793
})
793794
}
794795

795-
if head.stream > c.streams.NumStreams {
796-
return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.stream)
797-
} else if head.stream == -1 {
796+
if head.Stream > c.streams.NumStreams {
797+
return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.Stream)
798+
} else if head.Stream == -1 {
798799
// TODO: handle cassandra event frames, we shouldnt get any currently
799800
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts, c.logger)
800801
c.setTabletSupported(framer.tabletsRoutingV1)
@@ -803,7 +804,7 @@ func (c *Conn) recv(ctx context.Context) error {
803804
}
804805
go c.session.handleEvent(framer)
805806
return nil
806-
} else if head.stream <= 0 {
807+
} else if head.Stream <= 0 {
807808
// reserved stream that we dont use, probably due to a protocol error
808809
// or a bug in Cassandra, this should be an error, parse it and return.
809810
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts, c.logger)
@@ -827,14 +828,14 @@ func (c *Conn) recv(ctx context.Context) error {
827828
c.mu.Unlock()
828829
return ErrConnectionClosed
829830
}
830-
call, ok := c.calls[head.stream]
831-
delete(c.calls, head.stream)
831+
call, ok := c.calls[head.Stream]
832+
delete(c.calls, head.Stream)
832833
c.mu.Unlock()
833834
if call == nil || !ok {
834835
c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
835836
return c.discardFrame(head)
836-
} else if head.stream != call.streamID {
837-
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
837+
} else if head.Stream != call.streamID {
838+
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.Stream))
838839
}
839840

840841
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts, c.logger)
@@ -1262,7 +1263,7 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer, reques
12621263
// requests on the stream to prevent nil pointer dereferences in recv().
12631264
defer c.releaseStream(call)
12641265

1265-
if v := resp.framer.header.version.version(); v != c.version {
1266+
if v := resp.framer.header.Version.Version(); v != c.version {
12661267
return nil, &QueryError{err: NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version), potentiallyExecuted: true}
12671268
}
12681269

@@ -1610,7 +1611,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
16101611
return iter
16111612
case *resultKeyspaceFrame:
16121613
return &Iter{framer: framer}
1613-
case *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction, *schemaChangeAggregate, *schemaChangeType:
1614+
case *frm.SchemaChangeKeyspace, *frm.SchemaChangeTable, *frm.SchemaChangeFunction, *frm.SchemaChangeAggregate, *frm.SchemaChangeType:
16141615
iter := &Iter{framer: framer}
16151616
if err := c.awaitSchemaAgreement(ctx); err != nil {
16161617
// TODO: should have this behind a flag

0 commit comments

Comments
 (0)