Skip to content

Commit b7b796e

Browse files
committed
Add event bus
Create a bus that will allow consume different events from the Session. Targeted event types: 1. Cluster events that comes from Control connection, like: alter schema, topology change, node status change 2. Session events: Control Connection being recreated Session API should allow to subscribe to events, and unsubscribe, it should allow event filtering.
1 parent c2337f9 commit b7b796e

14 files changed

+2311
-42
lines changed

cluster.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"time"
3838

3939
"github.com/gocql/gocql/internal/debug"
40+
"github.com/gocql/gocql/internal/eventbus"
4041
)
4142

4243
const defaultDriverName = "ScyllaDB GoCQL Driver"
@@ -283,6 +284,8 @@ type ClusterConfig struct {
283284
// address in system.local or system.peers returns 127.0.0.1, the peer will be
284285
// set to 10.0.0.1 which is what will be used to connect to.
285286
IgnorePeerAddr bool
287+
// An event bus configuration
288+
EventBusConfig eventbus.EventBusConfig
286289
}
287290

288291
type DNSResolver interface {
@@ -365,6 +368,7 @@ type Dialer interface {
365368
// resolves to more than 1 IP address then the driver may connect multiple times to
366369
// the same host, and will not mark the node being down or up from events.
367370
func NewCluster(hosts ...string) *ClusterConfig {
371+
logger := &defaultLogger{}
368372
cfg := &ClusterConfig{
369373
Hosts: hosts,
370374
CQLVersion: "3.0.0",
@@ -392,8 +396,11 @@ func NewCluster(hosts ...string) *ClusterConfig {
392396
MetadataSchemaRequestTimeout: 60 * time.Second,
393397
DisableSkipMetadata: true,
394398
WarningsHandlerBuilder: DefaultWarningHandlerBuilder,
395-
Logger: &defaultLogger{},
399+
Logger: logger,
396400
DNSResolver: defaultDnsResolver,
401+
EventBusConfig: eventbus.EventBusConfig{
402+
InputEventsQueueSize: 10240,
403+
},
397404
}
398405

399406
return cfg

control.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"sync/atomic"
3838
"time"
3939

40+
"github.com/gocql/gocql/events"
4041
"github.com/gocql/gocql/internal/debug"
4142
frm "github.com/gocql/gocql/internal/frame"
4243
)
@@ -337,8 +338,21 @@ func (c *controlConn) setupConn(conn *Conn) error {
337338
conn: conn,
338339
host: host,
339340
}
340-
341-
c.conn.Store(ch)
341+
old, _ := c.conn.Swap(ch).(*connHost)
342+
var oldHost events.HostInfo
343+
if old != nil && old.host != nil {
344+
oldHost.HostID = old.host.HostID()
345+
oldHost.Host = old.host.ConnectAddress()
346+
oldHost.Port = old.host.Port()
347+
}
348+
c.session.publishEvent(&events.ControlConnectionRecreatedEvent{
349+
OldHost: oldHost,
350+
NewHost: events.HostInfo{
351+
HostID: host.HostID(),
352+
Host: host.ConnectAddress(),
353+
Port: host.Port(),
354+
},
355+
})
342356
if c.session.initialized() {
343357
// We connected to control conn, so add the connect the host in pool as well.
344358
// Notify session we can start trying to connect to the node.
@@ -465,6 +479,13 @@ func (c *controlConn) attemptReconnectToAnyOfHosts(hosts []*HostInfo) error {
465479
continue
466480
}
467481
conn.finalizeConnection()
482+
c.session.publishEvent(&events.ControlConnectionRecreatedEvent{
483+
NewHost: events.HostInfo{
484+
Host: host.ConnectAddress(),
485+
Port: host.Port(),
486+
HostID: host.HostID(),
487+
},
488+
})
468489
return nil
469490
}
470491
return fmt.Errorf("unable to connect to any known node: %v", hosts)

events.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/gocql/gocql/events"
3233
"github.com/gocql/gocql/internal/debug"
3334
frm "github.com/gocql/gocql/internal/frame"
3435
)
@@ -107,6 +108,16 @@ func (e *eventDebouncer) debounce(frame frame) {
107108
e.mu.Unlock()
108109
}
109110

111+
func (s *Session) publishEvent(event events.Event) {
112+
if s.eventBus == nil {
113+
return
114+
}
115+
116+
if !s.eventBus.PublishEvent(event) {
117+
s.logger.Printf("can't publish event, eventbus is full, increase Cluster.EventBusConfig.InputEventsQueueSize; event is dropped")
118+
}
119+
}
120+
110121
func (s *Session) handleEvent(framer *framer) {
111122
frame, err := framer.parseFrame()
112123
if err != nil {
@@ -118,6 +129,10 @@ func (s *Session) handleEvent(framer *framer) {
118129
s.logger.Printf("gocql: handling frame: %v\n", frame)
119130
}
120131

132+
if event := events.FrameToEvent(frame); event != nil {
133+
s.publishEvent(event)
134+
}
135+
121136
switch f := frame.(type) {
122137
case *frm.SchemaChangeKeyspace, *frm.SchemaChangeFunction,
123138
*frm.SchemaChangeTable, *frm.SchemaChangeAggregate, *frm.SchemaChangeType:

events/event_converter.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package events
20+
21+
import (
22+
frm "github.com/gocql/gocql/internal/frame"
23+
)
24+
25+
// FrameToEvent converts an internal frame to a public Event interface.
26+
// This function has access to internal frame types and can perform
27+
// type-safe conversions.
28+
// Returns nil if the frame is not an event frame.
29+
func FrameToEvent(f interface{}) Event {
30+
if f == nil {
31+
return nil
32+
}
33+
34+
switch frame := f.(type) {
35+
case *frm.TopologyChangeEventFrame:
36+
return &TopologyChangeEvent{
37+
Change: frame.Change,
38+
Host: frame.Host,
39+
Port: frame.Port,
40+
}
41+
42+
case *frm.StatusChangeEventFrame:
43+
return &StatusChangeEvent{
44+
Change: frame.Change,
45+
Host: frame.Host,
46+
Port: frame.Port,
47+
}
48+
49+
case *frm.SchemaChangeKeyspace:
50+
return &SchemaChangeKeyspaceEvent{
51+
Change: frame.Change,
52+
Keyspace: frame.Keyspace,
53+
}
54+
55+
case *frm.SchemaChangeTable:
56+
return &SchemaChangeTableEvent{
57+
Change: frame.Change,
58+
Keyspace: frame.Keyspace,
59+
Table: frame.Object,
60+
}
61+
62+
case *frm.SchemaChangeType:
63+
return &SchemaChangeTypeEvent{
64+
Change: frame.Change,
65+
Keyspace: frame.Keyspace,
66+
TypeName: frame.Object,
67+
}
68+
69+
case *frm.SchemaChangeFunction:
70+
return &SchemaChangeFunctionEvent{
71+
Change: frame.Change,
72+
Keyspace: frame.Keyspace,
73+
Function: frame.Name,
74+
Arguments: frame.Args,
75+
}
76+
77+
case *frm.SchemaChangeAggregate:
78+
return &SchemaChangeAggregateEvent{
79+
Change: frame.Change,
80+
Keyspace: frame.Keyspace,
81+
Aggregate: frame.Name,
82+
Arguments: frame.Args,
83+
}
84+
85+
default:
86+
return nil
87+
}
88+
}

0 commit comments

Comments
 (0)