Skip to content
This repository was archived by the owner on Jul 21, 2025. It is now read-only.

Commit bcb363a

Browse files
committed
session: allow log configuration using Logger interface.
The driver now provides a Logger interface allowing to supply a custom logger as part of (Session)ConnConfig. Driver also provides three implementations of Logger by itself: - DefaultLogger, logging only warnings - DebugLogger, logging everything that can be useful for debugging, it is used by default in tests - NopLogger, logging nothing Fixes #271
1 parent 885964b commit bcb363a

File tree

11 files changed

+167
-72
lines changed

11 files changed

+167
-72
lines changed

session.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package scylla
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"sync"
87
"time"
98

@@ -66,6 +65,16 @@ var (
6665
Lz4 Compression = frame.Lz4
6766
)
6867

68+
type Logger = transport.Logger
69+
70+
func NewDefaultLogger() Logger {
71+
return transport.NewDefaultLogger()
72+
}
73+
74+
func NewDebugLogger() Logger {
75+
return transport.NewDebugLogger()
76+
}
77+
6978
type SessionConfig struct {
7079
Hosts []string
7180
Events []EventType
@@ -271,6 +280,6 @@ func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelec
271280
}
272281

273282
func (s *Session) Close() {
274-
log.Println("session: close")
283+
s.cfg.Logger.Info("session: close")
275284
s.cluster.Close()
276285
}

session_integration_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ import (
2424

2525
const TestHost = "192.168.100.100"
2626

27-
var testingSessionConfig = DefaultSessionConfig("mykeyspace", TestHost)
27+
var testingSessionConfig = func() SessionConfig {
28+
cfg := DefaultSessionConfig("mykeyspace", TestHost)
29+
cfg.Logger = NewDebugLogger()
30+
return cfg
31+
}()
2832

2933
func initKeyspace(ctx context.Context, t testing.TB) {
3034
t.Helper()

transport/cluster.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package transport
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"net"
87
"sort"
98
"strconv"
@@ -160,7 +159,7 @@ func NewCluster(ctx context.Context, cfg ConnConfig, p HostSelectionPolicy, e []
160159
}
161160

162161
func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
163-
log.Printf("cluster: open control connection")
162+
c.cfg.Logger.Info("cluster: open control connection")
164163
var errs []string
165164
for addr := range c.knownHosts {
166165
conn, err := OpenConn(ctx, addr, nil, c.cfg)
@@ -184,7 +183,7 @@ func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
184183
// refreshTopology creates new topology filled with the result of keyspaceQuery, localQuery and peerQuery.
185184
// Old topology is replaced with the new one atomically to prevent dirty reads.
186185
func (c *Cluster) refreshTopology(ctx context.Context) error {
187-
log.Printf("cluster: refresh topology")
186+
c.cfg.Logger.Infoln("cluster: refresh topology")
188187
rows, err := c.getAllNodesInfo(ctx)
189188
if err != nil {
190189
return fmt.Errorf("query info about nodes in cluster: %w", err)
@@ -242,9 +241,9 @@ func (c *Cluster) refreshTopology(ctx context.Context) error {
242241
}
243242

244243
if ks, ok := t.keyspaces[c.cfg.Keyspace]; ok {
245-
t.policyInfo.Preprocess(t, ks)
244+
t.policyInfo.Preprocess(t, ks, c.cfg.Logger)
246245
} else {
247-
t.policyInfo.Preprocess(t, keyspace{})
246+
t.policyInfo.Preprocess(t, keyspace{}, c.cfg.Logger)
248247
}
249248

250249
c.setTopology(t)
@@ -455,7 +454,7 @@ func (c *Cluster) setTopology(t *topology) {
455454
// of registering handlers for them.
456455
func (c *Cluster) handleEvent(r response) {
457456
if r.Err != nil {
458-
log.Printf("cluster: received event with error: %v", r.Err)
457+
c.cfg.Logger.Infoln("cluster: received event with error: %v", r.Err)
459458
c.RequestReopenControl()
460459
return
461460
}
@@ -467,17 +466,17 @@ func (c *Cluster) handleEvent(r response) {
467466
case *SchemaChange:
468467
// TODO: add schema change.
469468
default:
470-
log.Printf("cluster: unsupported event type: %v", r.Response)
469+
c.cfg.Logger.Warnf("cluster: unsupported event type: %v", r.Response)
471470
}
472471
}
473472

474473
func (c *Cluster) handleTopologyChange(v *TopologyChange) {
475-
log.Printf("cluster: handle topology change: %+#v", v)
474+
c.cfg.Logger.Infof("cluster: handle topology change: %+#v", v)
476475
c.RequestRefresh()
477476
}
478477

479478
func (c *Cluster) handleStatusChange(v *StatusChange) {
480-
log.Printf("cluster: handle status change: %+#v", v)
479+
c.cfg.Logger.Infof("cluster: handle status change: %+#v", v)
481480
m := c.Topology().peers
482481
addr := v.Address.String()
483482
if n, ok := m[addr]; ok {
@@ -487,10 +486,10 @@ func (c *Cluster) handleStatusChange(v *StatusChange) {
487486
case frame.Down:
488487
n.setStatus(statusDown)
489488
default:
490-
log.Printf("cluster: status change not supported: %+#v", v)
489+
c.cfg.Logger.Warnf("cluster: status change not supported: %+#v", v)
491490
}
492491
} else {
493-
log.Printf("cluster: unknown node %s received status change: %+#v in topology %v", addr, v, m)
492+
c.cfg.Logger.Infof("cluster: unknown node %s received status change: %+#v in topology %v, requesting topology refresh", addr, v, m)
494493
c.RequestRefresh()
495494
}
496495
}
@@ -508,7 +507,7 @@ func (c *Cluster) loop(ctx context.Context) {
508507
case <-c.reopenControlChan:
509508
c.tryReopenControl(ctx)
510509
case <-ctx.Done():
511-
log.Printf("cluster closing due to: %v", ctx.Err())
510+
c.cfg.Logger.Infof("cluster closing due to: %v", ctx.Err())
512511
c.handleClose()
513512
return
514513
case <-c.closeChan:
@@ -528,17 +527,17 @@ func (c *Cluster) tryRefresh(ctx context.Context) {
528527
if err := c.refreshTopology(ctx); err != nil {
529528
c.RequestReopenControl()
530529
time.AfterFunc(tryRefreshInterval, c.RequestRefresh)
531-
log.Printf("cluster: refresh topology: %v", err)
530+
c.cfg.Logger.Infof("cluster: refresh topology: %v", err)
532531
}
533532
}
534533

535534
const tryReopenControlInterval = time.Second
536535

537536
func (c *Cluster) tryReopenControl(ctx context.Context) {
538-
log.Printf("cluster: reopen control connection")
537+
c.cfg.Logger.Infoln("cluster: reopen control connection")
539538
if control, err := c.NewControl(ctx); err != nil {
540539
time.AfterFunc(tryReopenControlInterval, c.RequestReopenControl)
541-
log.Printf("cluster: failed to reopen control connection: %v", err)
540+
c.cfg.Logger.Infof("cluster: failed to reopen control connection: %v", err)
542541
} else {
543542
c.control.Close()
544543
c.control = control
@@ -547,7 +546,7 @@ func (c *Cluster) tryReopenControl(ctx context.Context) {
547546
}
548547

549548
func (c *Cluster) handleClose() {
550-
log.Printf("cluster: handle cluster close")
549+
c.cfg.Logger.Infoln("cluster: handle cluster close")
551550
c.control.Close()
552551
m := c.Topology().peers
553552
for _, v := range m {
@@ -558,23 +557,23 @@ func (c *Cluster) handleClose() {
558557
}
559558

560559
func (c *Cluster) RequestRefresh() {
561-
log.Printf("cluster: requested to refresh cluster topology")
560+
c.cfg.Logger.Infoln("cluster: requested to refresh cluster topology")
562561
select {
563562
case c.refreshChan <- struct{}{}:
564563
default:
565564
}
566565
}
567566

568567
func (c *Cluster) RequestReopenControl() {
569-
log.Printf("cluster: requested to reopen control connection")
568+
c.cfg.Logger.Infoln("cluster: requested to reopen control connection")
570569
select {
571570
case c.reopenControlChan <- struct{}{}:
572571
default:
573572
}
574573
}
575574

576575
func (c *Cluster) Close() {
577-
log.Printf("cluster: requested to close cluster")
576+
c.cfg.Logger.Infoln("cluster: requested to close cluster")
578577
select {
579578
case c.closeChan <- struct{}{}:
580579
default:

transport/conn.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type connWriter struct {
7171

7272
// For use only when skipping sending a request.
7373
freeStream func(frame.StreamID)
74+
log Logger
7475
}
7576

7677
func (c *connWriter) submit(r request) {
@@ -105,14 +106,14 @@ func (c *connWriter) loop(ctx context.Context) {
105106
c.freeStream(r.StreamID)
106107
continue
107108
}
108-
log.Printf("%s fatal send error, closing connection due to %s", c.connString(), err)
109+
c.log.Infof("%s fatal send error, closing connection due to %s", c.connString(), err)
109110
c.connClose()
110111
return
111112
}
112113
c.stats.inFlight.Inc()
113114
}
114115
if err := c.conn.Flush(); err != nil {
115-
log.Printf("%s fatal flush error, closing connection due to %s", c.connString(), err)
116+
c.log.Infof("%s fatal flush error, closing connection due to %s", c.connString(), err)
116117
c.connClose()
117118
return
118119
}
@@ -172,6 +173,8 @@ type connReader struct {
172173
s streamIDAllocator
173174
closed bool
174175
mu sync.Mutex // mu guards h, s and closed
176+
177+
log Logger
175178
}
176179

177180
func (c *connReader) setHandler(h ResponseHandler) (frame.StreamID, error) {
@@ -221,7 +224,7 @@ func (c *connReader) loop() {
221224
}
222225

223226
if resp.Err != nil {
224-
log.Printf("%s fatal receive error, closing connection due to %s", c.connString(), resp.Err)
227+
c.log.Infof("%s fatal receive error, closing connection due to %s", c.connString(), resp.Err)
225228
c.connClose()
226229
c.drainHandlers()
227230
return
@@ -232,7 +235,7 @@ func (c *connReader) loop() {
232235
if h := c.handler(resp.StreamID); h != nil {
233236
h <- resp
234237
} else {
235-
log.Printf("%s received unknown stream ID %d, closing connection", c.connString(), resp.StreamID)
238+
c.log.Warnf("%s received unknown stream ID %d, closing connection", c.connString(), resp.StreamID)
236239
c.connClose()
237240
c.drainHandlers()
238241
return
@@ -347,9 +350,11 @@ type ConnConfig struct {
347350
ComprBufferSize int
348351

349352
ConnObserver ConnObserver
353+
Logger Logger
350354
}
351355

352356
func DefaultConnConfig(keyspace string) ConnConfig {
357+
logger := NewDefaultLogger()
353358
return ConnConfig{
354359
Username: "cassandra",
355360
Password: "cassandra",
@@ -358,8 +363,9 @@ func DefaultConnConfig(keyspace string) ConnConfig {
358363
Timeout: 500 * time.Millisecond,
359364
DefaultConsistency: frame.LOCALQUORUM,
360365
DefaultPort: "9042",
361-
ConnObserver: LoggingConnObserver{},
366+
ConnObserver: LoggingConnObserver{logger},
362367
ComprBufferSize: comprBufferSize,
368+
Logger: logger,
363369
}
364370
}
365371

@@ -378,7 +384,7 @@ func OpenShardConn(ctx context.Context, addr string, si ShardInfo, cfg ConnConfi
378384
for i := 0; i < maxTries; i++ {
379385
conn, err := OpenLocalPortConn(ctx, addr, it(), cfg)
380386
if err != nil {
381-
log.Printf("%s dial error: %s (try %d/%d)", addr, err, i, maxTries)
387+
cfg.Logger.Infof("%s dial error: %s (try %d/%d)", addr, err, i, maxTries)
382388
if conn != nil {
383389
conn.Close()
384390
}
@@ -422,7 +428,7 @@ func OpenConn(ctx context.Context, addr string, localAddr *net.TCPAddr, cfg Conn
422428
}
423429

424430
if cfg.TLSConfig != nil {
425-
tConn, err := WrapTLS(ctx, tcpConn, cfg.TLSConfig)
431+
tConn, err := WrapTLS(ctx, tcpConn, cfg)
426432
if err != nil {
427433
return nil, err
428434
}
@@ -433,14 +439,14 @@ func OpenConn(ctx context.Context, addr string, localAddr *net.TCPAddr, cfg Conn
433439
return WrapConn(ctx, tcpConn, cfg)
434440
}
435441

436-
func WrapTLS(ctx context.Context, conn *net.TCPConn, cfg *tls.Config) (net.Conn, error) {
437-
cfg = cfg.Clone()
438-
tconn := tls.Client(conn, cfg)
442+
func WrapTLS(ctx context.Context, conn *net.TCPConn, cfg ConnConfig) (net.Conn, error) {
443+
tlsConfig := cfg.TLSConfig.Clone()
444+
tconn := tls.Client(conn, tlsConfig)
439445
if err := tconn.HandshakeContext(ctx); err != nil {
440446
if err := tconn.Close(); err != nil {
441-
log.Printf("%s failed to close: %s", tconn.RemoteAddr(), err)
447+
cfg.Logger.Warnf("%s failed to close: %s", tconn.RemoteAddr(), err)
442448
} else {
443-
log.Printf("%s closed", tconn.RemoteAddr())
449+
cfg.Logger.Infof("%s closed", tconn.RemoteAddr())
444450
}
445451

446452
return nil, err
@@ -467,6 +473,7 @@ func WrapConn(ctx context.Context, conn net.Conn, cfg ConnConfig) (*Conn, error)
467473
stats: s,
468474
connString: c.String,
469475
connClose: c.Close,
476+
log: cfg.Logger,
470477
},
471478
r: connReader{
472479
conn: io.LimitedReader{
@@ -476,6 +483,7 @@ func WrapConn(ctx context.Context, conn net.Conn, cfg ConnConfig) (*Conn, error)
476483
h: make(map[frame.StreamID]ResponseHandler),
477484
connString: c.String,
478485
connClose: c.Close,
486+
log: cfg.Logger,
479487
},
480488
stats: s,
481489
}
@@ -793,9 +801,9 @@ func (c *Conn) Shard() int {
793801
func (c *Conn) Close() {
794802
c.closeOnce.Do(func() {
795803
if err := c.conn.Close(); err != nil {
796-
log.Printf("%s failed to close: %s", c, err)
804+
c.cfg.Logger.Warnf("%s failed to close: %s", c, err)
797805
} else {
798-
log.Printf("%s closed", c)
806+
c.cfg.Logger.Infof("%s closed", c)
799807
}
800808
c.w.requestCh <- _connCloseRequest
801809
if c.onClose != nil {

transport/conn_integration_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ import (
1818
"github.com/google/go-cmp/cmp"
1919
)
2020

21+
var testingConnConfig = func() ConnConfig {
22+
cfg := DefaultConnConfig("")
23+
cfg.Logger = NewDebugLogger()
24+
return cfg
25+
}()
26+
2127
func TestOpenShardConnIntegration(t *testing.T) {
2228
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)
2329
defer cancel()
@@ -28,7 +34,7 @@ func TestOpenShardConnIntegration(t *testing.T) {
2834

2935
for i := uint16(0); i < si.NrShards; i++ {
3036
si.Shard = i
31-
c, err := OpenShardConn(ctx, TestHost+":19042", si, DefaultConnConfig(""))
37+
c, err := OpenShardConn(ctx, TestHost+":19042", si, testingConnConfig)
3238
if err != nil {
3339
t.Fatal(err)
3440
}
@@ -232,7 +238,7 @@ func TestCompressionIntegration(t *testing.T) {
232238
func testCompression(ctx context.Context, t *testing.T, c frame.Compression, toSend []byte) {
233239
t.Helper()
234240

235-
cfg := DefaultConnConfig("")
241+
cfg := testingConnConfig
236242
cfg.Compression = c
237243
conn, err := OpenConn(ctx, TestHost, nil, cfg)
238244
if err != nil {

0 commit comments

Comments
 (0)