@@ -140,20 +140,19 @@ type SslOptions struct {
140140}
141141
142142type ConnConfig struct {
143- ProtoVersion int
144- CQLVersion string
145- WriteTimeout time.Duration
146- ReadTimeout time.Duration
147- ConnectTimeout time.Duration
148- Dialer Dialer
149- HostDialer HostDialer
150- Compressor Compressor
151- Authenticator Authenticator
152- AuthProvider func (h * HostInfo ) (Authenticator , error )
153- Keepalive time.Duration
154- Logger StdLogger
155-
143+ Dialer Dialer
144+ Logger StdLogger
145+ Authenticator Authenticator
146+ Compressor Compressor
147+ HostDialer HostDialer
148+ AuthProvider func (h * HostInfo ) (Authenticator , error )
156149 tlsConfig * tls.Config
150+ CQLVersion string
151+ ConnectTimeout time.Duration
152+ ReadTimeout time.Duration
153+ WriteTimeout time.Duration
154+ ProtoVersion int
155+ Keepalive time.Duration
157156 disableCoalesce bool
158157}
159158
@@ -189,52 +188,43 @@ type ConnInterface interface {
189188// queries, but users are usually advised to use a more reliable, higher
190189// level API.
191190type Conn struct {
192- conn net.Conn
193- r * bufio.Reader
194- w contextWriter
195-
196- systemRequestTimeout time.Duration
197- usingTimeoutClause string
198- readTimeout atomic.Int64
199- writeTimeout atomic.Int64
200- cfg * ConnConfig
201- frameObserver FrameHeaderObserver
202- streamObserver StreamObserver
203-
204- headerBuf [headSize ]byte
205-
206- streams * streams.IDGenerator
207- mu sync.Mutex
191+ auth Authenticator
192+ streamObserver StreamObserver
193+ w contextWriter
194+ logger StdLogger
195+ frameObserver FrameHeaderObserver
196+ ctx context.Context
197+ errorHandler ConnErrorHandler
198+ compressor Compressor
199+ conn net.Conn
200+ cfg * ConnConfig
201+ supported map [string ][]string
202+ streams * streams.IDGenerator
203+ host * HostInfo
208204 // calls stores a map from stream ID to callReq.
209205 // This map is protected by mu.
210206 // calls should not be used when closed is true, calls is set to nil when closed=true.
211- calls map [int ]* callReq
212-
213- errorHandler ConnErrorHandler
214- compressor Compressor
215- auth Authenticator
216- addr string
217-
218- version uint8
219- currentKeyspace string
220- host * HostInfo
221- supported map [string ][]string
222- scyllaSupported scyllaSupported
223- cqlProtoExts []cqlProtocolExtension
224- isSchemaV2 bool
225-
226- session * Session
227-
207+ calls map [int ]* callReq
208+ r * bufio.Reader
209+ session * Session
210+ cancel context.CancelFunc
211+ addr string
212+ usingTimeoutClause string
213+ currentKeyspace string
214+ cqlProtoExts []cqlProtocolExtension
215+ scyllaSupported scyllaSupported
216+ systemRequestTimeout time.Duration
217+ writeTimeout atomic.Int64
218+ timeouts int64
219+ readTimeout atomic.Int64
220+ mu sync.Mutex
221+ tabletsRoutingV1 int32
222+ headerBuf [headSize ]byte
228223 // true if connection close process for the connection started.
229224 // closed is protected by mu.
230- closed bool
231- ctx context.Context
232- cancel context.CancelFunc
233-
234- timeouts int64
235-
236- logger StdLogger
237- tabletsRoutingV1 int32
225+ closed bool
226+ isSchemaV2 bool
227+ version uint8
238228}
239229
240230func (c * Conn ) getIsSchemaV2 () bool {
@@ -887,16 +877,13 @@ func (c *Conn) releaseStream(call *callReq) {
887877}
888878
889879type callReq struct {
880+ // streamObserverContext is notified about events regarding this stream
881+ streamObserverContext StreamObserverContext
890882 // resp will receive the frame that was sent as a response to this stream.
891883 resp chan callResp
892884 timeout chan struct {} // indicates to recv() that a call has timed out
893- streamID int // current stream in use
894-
895- timer * time.Timer
896-
897- // streamObserverContext is notified about events regarding this stream
898- streamObserverContext StreamObserverContext
899-
885+ timer * time.Timer
886+ streamID int // current stream in use
900887 // streamObserverEndOnce ensures that either StreamAbandoned or StreamFinished is called,
901888 // but not both.
902889 streamObserverEndOnce sync.Once
@@ -932,14 +919,13 @@ type deadlineWriter interface {
932919}
933920
934921type deadlineContextWriter struct {
935- w deadlineWriter
936- timeout atomic.Int64
922+ w deadlineWriter
937923 // semaphore protects critical section for SetWriteDeadline/Write.
938924 // It is a channel with capacity 1.
939925 semaphore chan struct {}
940-
941926 // quit closed once the connection is closed.
942- quit chan struct {}
927+ quit chan struct {}
928+ timeout atomic.Int64
943929}
944930
945931func (c * deadlineContextWriter ) setWriteTimeout (timeout time.Duration ) {
@@ -985,17 +971,13 @@ func newWriteCoalescer(conn deadlineWriter, writeTimeout, coalesceDuration time.
985971}
986972
987973type writeCoalescer struct {
988- c deadlineWriter
989-
990- mu sync.Mutex
991-
992- quit <- chan struct {}
993- writeCh chan writeRequest
994-
995- timeout atomic.Int64
996-
974+ c deadlineWriter
975+ quit <- chan struct {}
976+ writeCh chan writeRequest
997977 testEnqueuedHook func ()
998978 testFlushedHook func ()
979+ timeout atomic.Int64
980+ mu sync.Mutex
999981}
1000982
1001983func (w * writeCoalescer ) setWriteTimeout (timeout time.Duration ) {
@@ -1010,8 +992,8 @@ type writeRequest struct {
1010992}
1011993
1012994type writeResult struct {
1013- n int
1014995 err error
996+ n int
1015997}
1016998
1017999// writeContext implements contextWriter.
@@ -1345,8 +1327,8 @@ type StreamObserverContext interface {
13451327
13461328type preparedStatment struct {
13471329 id []byte
1348- request preparedMetadata
13491330 response resultMetadata
1331+ request preparedMetadata
13501332}
13511333
13521334type inflightPrepare struct {
@@ -1869,9 +1851,9 @@ func getSchemaAgreement(queryLocalSchemasRows []string, querySystemPeersRows []s
18691851type schemaAgreementHost struct {
18701852 DataCenter string
18711853 Rack string
1854+ RPCAddress string
18721855 HostID UUID
18731856 SchemaVersion UUID
1874- RPCAddress string
18751857}
18761858
18771859func (h * schemaAgreementHost ) IsValid () bool {
0 commit comments