Skip to content

Commit 8486495

Browse files
committed
Add event bus
Create a bus that allows consuming different events from the Session. Targeted event types: 1. Cluster events that come from the control connection, such as schema changes, topology changes, and node status changes. 2. Session events: the control connection being recreated. The Session API should allow subscribing to and unsubscribing from events, and it should support event filtering.
1 parent c2337f9 commit 8486495

15 files changed

+2431
-261
lines changed

cluster.go

Lines changed: 66 additions & 220 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"time"
3838

3939
"github.com/gocql/gocql/internal/debug"
40+
"github.com/gocql/gocql/internal/eventbus"
4041
)
4142

4243
const defaultDriverName = "ScyllaDB GoCQL Driver"
@@ -61,228 +62,68 @@ func (p PoolConfig) buildPool(session *Session) *policyConnPool {
6162
// behavior to fit the most common use cases. Applications that require a
6263
// different setup must implement their own cluster.
6364
type ClusterConfig struct {
64-
// BatchObserver will set the provided batch observer on all queries created from this session.
65-
// Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver.
66-
BatchObserver BatchObserver
67-
// Dialer will be used to establish all connections created for this Cluster.
68-
// If not provided, a default dialer configured with ConnectTimeout will be used.
69-
// Dialer is ignored if HostDialer is provided.
70-
Dialer Dialer
71-
// ApplicationInfo reports application information to the server by inserting it into options of the STARTUP frame
72-
ApplicationInfo ApplicationInfo
73-
// DNSResolver Resolves DNS names to IP addresses
74-
DNSResolver DNSResolver
75-
// Logger for this ClusterConfig.
76-
// If not specified, defaults to the gocql.defaultLogger.
77-
Logger StdLogger
78-
// HostDialer will be used to establish all connections for this Cluster.
79-
// Unlike Dialer, HostDialer is responsible for setting up the entire connection, including the TLS session.
80-
// To support shard-aware port, HostDialer should implement ShardDialer.
81-
// If not provided, Dialer will be used instead.
82-
HostDialer HostDialer
83-
// StreamObserver will be notified of stream state changes.
84-
// This can be used to track in-flight protocol requests and responses.
85-
StreamObserver StreamObserver
86-
// FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session.
87-
// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
88-
FrameHeaderObserver FrameHeaderObserver
89-
// ConnectObserver will set the provided connect observer on all queries
90-
// created from this session.
91-
ConnectObserver ConnectObserver
92-
// QueryObserver will set the provided query observer on all queries created from this session.
93-
// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
94-
QueryObserver QueryObserver
95-
// AddressTranslator will translate addresses found on peer discovery and/or
96-
// node change events.
97-
AddressTranslator AddressTranslator
98-
// HostFilter will filter all incoming events for host, any which don't pass
99-
// the filter will be ignored. If set will take precedence over any options set
100-
// via Discovery
101-
HostFilter HostFilter
102-
// Compression algorithm.
103-
// Default: nil
104-
Compressor Compressor
105-
// Default: nil
106-
Authenticator Authenticator
107-
actualSslOpts atomic.Value
108-
// PoolConfig configures the underlying connection pool, allowing the
109-
// configuration of host selection and connection selection policies.
110-
PoolConfig PoolConfig
111-
// Default retry policy to use for queries.
112-
// Default: SimpleRetryPolicy{NumRetries: 3}.
113-
RetryPolicy RetryPolicy
114-
// ConvictionPolicy decides whether to mark host as down based on the error and host info.
115-
// Default: SimpleConvictionPolicy
116-
ConvictionPolicy ConvictionPolicy
117-
// Default reconnection policy to use for reconnecting before trying to mark host as down.
118-
ReconnectionPolicy ReconnectionPolicy
119-
// A reconnection policy to use for reconnecting when connecting to the cluster first time.
120-
InitialReconnectionPolicy ReconnectionPolicy
121-
WarningsHandlerBuilder WarningHandlerBuilder
122-
// SslOpts configures TLS use when HostDialer is not set.
123-
// SslOpts is ignored if HostDialer is set.
124-
SslOpts *SslOptions
125-
// An Authenticator factory. Can be used to create alternative authenticators.
126-
// Default: nil
127-
AuthProvider func(h *HostInfo) (Authenticator, error)
128-
// The version of the driver that is going to be reported to the server.
129-
// Defaulted to current library version
130-
DriverVersion string
131-
// The name of the driver that is going to be reported to the server.
132-
// Default: "ScyllaDB GoCQL Driver"
133-
DriverName string
134-
// Initial keyspace. Optional.
135-
Keyspace string
136-
// CQL version (default: 3.0.0)
137-
CQLVersion string
138-
// addresses for the initial connections. It is recommended to use the value set in
139-
// the Cassandra config for broadcast_address or listen_address, an IP address not
140-
// a domain name. This is because events from Cassandra will use the configured IP
141-
// address, which is used to index connected hosts. If the domain name specified
142-
// resolves to more than 1 IP address then the driver may connect multiple times to
143-
// the same host, and will not mark the node being down or up from events.
144-
Hosts []string
145-
// The time to wait for frames before flushing the frames connection to Cassandra.
146-
// Can help reduce syscall overhead by making less calls to write. Set to 0 to
147-
// disable.
148-
//
149-
// (default: 200 microseconds)
150-
WriteCoalesceWaitTime time.Duration
151-
// WriteTimeout limits the time the driver waits to write a request to a network connection.
152-
// WriteTimeout should be lower than or equal to Timeout.
153-
// WriteTimeout defaults to the value of Timeout.
154-
WriteTimeout time.Duration
155-
// The keepalive period to use, enabled if > 0 (default: 15 seconds)
156-
// SocketKeepalive is used to set up the default dialer and is ignored if Dialer or HostDialer is provided.
157-
SocketKeepalive time.Duration
158-
// If not zero, gocql attempts to reconnect known DOWN nodes every ReconnectInterval.
159-
ReconnectInterval time.Duration
160-
// The maximum amount of time to wait for schema agreement in a cluster after
161-
// receiving a schema change frame. (default: 60s)
162-
MaxWaitSchemaAgreement time.Duration
163-
// ProtoVersion sets the version of the native protocol to use, this will
164-
// enable features in the driver for specific protocol versions, generally this
165-
// should be set to a known version (2,3,4) for the cluster being connected to.
166-
//
167-
// If it is 0 or unset (the default) then the driver will attempt to discover the
168-
// highest supported protocol for the cluster. In clusters with nodes of different
169-
// versions the protocol selected is not defined (ie, it can be any of the supported in the cluster)
170-
ProtoVersion int
171-
// Maximum number of inflight requests allowed per connection.
172-
// Default: 32768 for CQL v3 and newer
173-
// Default: 128 for older CQL versions
174-
MaxRequestsPerConn int
175-
// Timeout defines the maximum time to wait for a single server response.
176-
// The default is 11 seconds, which is slightly higher than the default
177-
// server-side timeout for most query types.
178-
//
179-
// When a session creates a Query or Batch, it inherits this timeout as
180-
// the request timeout.
181-
//
182-
// Important notes:
183-
// 1. This value should be greater than the server timeout for all queries
184-
// you execute. Otherwise, you risk creating retry storms: the server
185-
// may still be processing the request while the client times out and retries.
186-
// 2. This timeout does not apply during initial connection setup.
187-
// For that, see ConnectTimeout.
188-
Timeout time.Duration
189-
// The timeout for the requests to the schema tables. (default: 60s)
190-
MetadataSchemaRequestTimeout time.Duration
191-
// ConnectTimeout limits the time spent during connection setup.
192-
// During initial connection setup, internal queries, AUTH requests will return an error if the client
193-
// does not receive a response within the ConnectTimeout period.
194-
// ConnectTimeout is applied to the connection setup queries independently.
195-
// ConnectTimeout also limits the duration of dialing a new TCP connection
196-
// in case there is no Dialer nor HostDialer configured.
197-
// ConnectTimeout has a default value of 11 seconds.
198-
ConnectTimeout time.Duration
199-
// Port used when dialing.
200-
// Default: 9042
201-
Port int
202-
// The size of the connection pool for each host.
203-
// The pool filling runs in a separate goroutine during the session initialization phase.
204-
// gocql will always try to get 1 connection on each host pool
205-
// during session initialization AND it will attempt
206-
// to fill each pool afterward asynchronously if NumConns > 1.
207-
// Notice: There is no guarantee that pool filling will be finished in the initialization phase.
208-
// Also, it describes a maximum number of connections at the same time.
209-
// Default: 2
210-
NumConns int
211-
// The gocql driver may hold excess shard connections to reuse them when existing connections are dropped.
212-
// This configuration variable defines the limit for such excess connections. Once the limit is reached,
213-
// gocql starts dropping any additional excess connections.
214-
// The limit is computed as `MaxExcessShardConnectionsRate` * <number_of_shards>.
65+
EventBusConfig eventbus.EventBusConfig
66+
BatchObserver BatchObserver
67+
Dialer Dialer
68+
ApplicationInfo ApplicationInfo
69+
DNSResolver DNSResolver
70+
Logger StdLogger
71+
HostDialer HostDialer
72+
StreamObserver StreamObserver
73+
FrameHeaderObserver FrameHeaderObserver
74+
ConnectObserver ConnectObserver
75+
QueryObserver QueryObserver
76+
AddressTranslator AddressTranslator
77+
HostFilter HostFilter
78+
Compressor Compressor
79+
Authenticator Authenticator
80+
actualSslOpts atomic.Value
81+
PoolConfig PoolConfig
82+
RetryPolicy RetryPolicy
83+
ConvictionPolicy ConvictionPolicy
84+
ReconnectionPolicy ReconnectionPolicy
85+
InitialReconnectionPolicy ReconnectionPolicy
86+
WarningsHandlerBuilder WarningHandlerBuilder
87+
SslOpts *SslOptions
88+
AuthProvider func(h *HostInfo) (Authenticator, error)
89+
DriverVersion string
90+
DriverName string
91+
Keyspace string
92+
CQLVersion string
93+
Hosts []string
94+
WriteCoalesceWaitTime time.Duration
95+
WriteTimeout time.Duration
96+
SocketKeepalive time.Duration
97+
ReconnectInterval time.Duration
98+
MaxWaitSchemaAgreement time.Duration
99+
ProtoVersion int
100+
MaxRequestsPerConn int
101+
Timeout time.Duration
102+
MetadataSchemaRequestTimeout time.Duration
103+
ConnectTimeout time.Duration
104+
Port int
105+
NumConns int
106+
MaxPreparedStmts int
107+
PageSize int
108+
MaxRoutingKeyInfo int
109+
ReadTimeout time.Duration
215110
MaxExcessShardConnectionsRate float32
216-
// Maximum cache size for prepared statements globally for gocql.
217-
// Default: 1000
218-
MaxPreparedStmts int
219-
// Default page size to use for created sessions.
220-
// Default: 5000
221-
PageSize int
222-
// Maximum cache size for query info about statements for each session.
223-
// Default: 1000
224-
MaxRoutingKeyInfo int
225-
// ReadTimeout limits the time the driver waits for data from the connection.
226-
// It has only one purpose, identify faulty connection early and drop it.
227-
// Default: 11 Seconds
228-
ReadTimeout time.Duration
229-
// Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL.
230-
// Default: unset
231-
SerialConsistency Consistency
232-
// Default consistency level.
233-
// Default: Quorum
234-
Consistency Consistency
235-
// Configure events the driver will register for
236-
Events struct {
237-
// disable registering for status events (node up/down)
111+
SerialConsistency Consistency
112+
Consistency Consistency
113+
Events struct {
238114
DisableNodeStatusEvents bool
239-
// disable registering for topology events (node added/removed/moved)
240-
DisableTopologyEvents bool
241-
// disable registering for schema events (keyspace/table/function removed/created/updated)
242-
DisableSchemaEvents bool
243-
}
244-
// Default idempotence for queries
245-
DefaultIdempotence bool
246-
// Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server.
247-
// Default: true, only enabled for protocol 3 and above.
248-
DefaultTimestamp bool
249-
// DisableSkipMetadata will override the internal result metadata cache so that the driver does not
250-
// send skip_metadata for queries, this means that the result will always contain
251-
// the metadata to parse the rows and will not reuse the metadata from the prepared
252-
// statement.
253-
//
254-
// See https://issues.apache.org/jira/browse/CASSANDRA-10786
255-
// See https://github.com/scylladb/scylladb/issues/20860
256-
//
257-
// Default: true
258-
DisableSkipMetadata bool
259-
// DisableShardAwarePort will prevent the driver from connecting to Scylla's shard-aware port,
260-
// even if there are nodes in the cluster that support it.
261-
//
262-
// It is generally recommended to leave this option turned off because gocql can use
263-
// the shard-aware port to make the process of establishing more robust.
264-
// However, if you have a cluster with nodes which expose shard-aware port
265-
// but the port is unreachable due to network configuration issues, you can use
266-
// this option to work around the issue. Set it to true only if you neither can fix
267-
// your network nor disable shard-aware port on your nodes.
268-
DisableShardAwarePort bool
269-
// If DisableInitialHostLookup then the driver will not attempt to get host info
270-
// from the system.peers table, this will mean that the driver will connect to
271-
// hosts supplied and will not attempt to lookup the hosts information, this will
272-
// mean that data_center, rack and token information will not be available and as
273-
// such host filtering and token aware query routing will not be available.
115+
DisableTopologyEvents bool
116+
DisableSchemaEvents bool
117+
}
118+
DefaultIdempotence bool
119+
DefaultTimestamp bool
120+
DisableSkipMetadata bool
121+
DisableShardAwarePort bool
274122
DisableInitialHostLookup bool
275-
// internal config for testing
276-
disableControlConn bool
277-
disableInit bool
278-
// If IgnorePeerAddr is true and the address in system.peers does not match
279-
// the supplied host by either initial hosts or discovered via events then the
280-
// host will be replaced with the supplied address.
281-
//
282-
// For example if an event comes in with host=10.0.0.1 but when looking up that
283-
// address in system.local or system.peers returns 127.0.0.1, the peer will be
284-
// set to 10.0.0.1 which is what will be used to connect to.
285-
IgnorePeerAddr bool
123+
disableControlConn bool
124+
disableInit bool
125+
IgnorePeerAddr bool // if true, a host whose system.peers address does not match the supplied address is replaced with the supplied address
126+
// disable registering for schema events (keyspace/table/function removed/created/updated)
286127
}
287128

288129
type DNSResolver interface {
@@ -365,6 +206,7 @@ type Dialer interface {
365206
// resolves to more than 1 IP address then the driver may connect multiple times to
366207
// the same host, and will not mark the node being down or up from events.
367208
func NewCluster(hosts ...string) *ClusterConfig {
209+
logger := &defaultLogger{}
368210
cfg := &ClusterConfig{
369211
Hosts: hosts,
370212
CQLVersion: "3.0.0",
@@ -392,8 +234,12 @@ func NewCluster(hosts ...string) *ClusterConfig {
392234
MetadataSchemaRequestTimeout: 60 * time.Second,
393235
DisableSkipMetadata: true,
394236
WarningsHandlerBuilder: DefaultWarningHandlerBuilder,
395-
Logger: &defaultLogger{},
237+
Logger: logger,
396238
DNSResolver: defaultDnsResolver,
239+
EventBusConfig: eventbus.EventBusConfig{
240+
InputEventsQueueSize: 10240,
241+
Logger: logger,
242+
},
397243
}
398244

399245
return cfg

control.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"sync/atomic"
3838
"time"
3939

40+
"github.com/gocql/gocql/events"
4041
"github.com/gocql/gocql/internal/debug"
4142
frm "github.com/gocql/gocql/internal/frame"
4243
)
@@ -337,8 +338,21 @@ func (c *controlConn) setupConn(conn *Conn) error {
337338
conn: conn,
338339
host: host,
339340
}
340-
341-
c.conn.Store(ch)
341+
old, _ := c.conn.Swap(ch).(*connHost)
342+
var oldHost events.HostInfo
343+
if old != nil && old.host != nil {
344+
oldHost.HostID = old.host.HostID()
345+
oldHost.Host = old.host.ConnectAddress()
346+
oldHost.Port = old.host.Port()
347+
}
348+
c.session.publishEvent(&events.ControlConnectionRecreatedEvent{
349+
OldHost: oldHost,
350+
NewHost: events.HostInfo{
351+
HostID: host.HostID(),
352+
Host: host.ConnectAddress(),
353+
Port: host.Port(),
354+
},
355+
})
342356
if c.session.initialized() {
343357
// We connected to the control conn, so connect the host in the pool as well.
344358
// Notify session we can start trying to connect to the node.
@@ -465,6 +479,13 @@ func (c *controlConn) attemptReconnectToAnyOfHosts(hosts []*HostInfo) error {
465479
continue
466480
}
467481
conn.finalizeConnection()
482+
c.session.publishEvent(&events.ControlConnectionRecreatedEvent{
483+
NewHost: events.HostInfo{
484+
Host: host.ConnectAddress(),
485+
Port: host.Port(),
486+
HostID: host.HostID(),
487+
},
488+
})
468489
return nil
469490
}
470491
return fmt.Errorf("unable to connect to any known node: %v", hosts)

events.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/gocql/gocql/events"
3233
"github.com/gocql/gocql/internal/debug"
3334
frm "github.com/gocql/gocql/internal/frame"
3435
)
@@ -107,6 +108,14 @@ func (e *eventDebouncer) debounce(frame frame) {
107108
e.mu.Unlock()
108109
}
109110

111+
func (s *Session) publishEvent(event events.Event) {
112+
if s.eventBus == nil {
113+
return
114+
}
115+
116+
s.eventBus.PublishEvent(event)
117+
}
118+
110119
func (s *Session) handleEvent(framer *framer) {
111120
frame, err := framer.parseFrame()
112121
if err != nil {
@@ -118,6 +127,10 @@ func (s *Session) handleEvent(framer *framer) {
118127
s.logger.Printf("gocql: handling frame: %v\n", frame)
119128
}
120129

130+
if event := events.FrameToEvent(frame); event != nil {
131+
s.publishEvent(event)
132+
}
133+
121134
switch f := frame.(type) {
122135
case *frm.SchemaChangeKeyspace, *frm.SchemaChangeFunction,
123136
*frm.SchemaChangeTable, *frm.SchemaChangeAggregate, *frm.SchemaChangeType:

0 commit comments

Comments
 (0)