Skip to content
This repository was archived by the owner on Jul 21, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions log/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package log

import (
"log"
"os"
)

type Logger interface {
Info(v ...any)
Infof(format string, v ...any)
Infoln(v ...any)

Warn(v ...any)
Warnf(format string, v ...any)
Warnln(v ...any)
}

// DefaultLogger only logs warnings and critical errors.
type DefaultLogger struct {
warn *log.Logger
}

func NewDefaultLogger() *DefaultLogger {
res := &DefaultLogger{
warn: log.New(os.Stderr, "WARNING ", log.LstdFlags),
}
return res
}

func (logger *DefaultLogger) Info(v ...any) {}
func (logger *DefaultLogger) Infof(format string, v ...any) {}
func (logger *DefaultLogger) Infoln(v ...any) {}

func (logger *DefaultLogger) Warn(v ...any) { logger.warn.Print(v...) }
func (logger *DefaultLogger) Warnf(format string, v ...any) { logger.warn.Printf(format, v...) }
func (logger *DefaultLogger) Warnln(v ...any) { logger.warn.Println(v...) }

// DebugLogger logs both warnings and information about important events in driver's runtime.
type DebugLogger struct {
info *log.Logger
warn *log.Logger
}

func NewDebugLogger() *DebugLogger {
res := &DebugLogger{
info: log.New(os.Stderr, "INFO ", log.LstdFlags),
warn: log.New(os.Stderr, "WARNING ", log.LstdFlags),
}
return res
}

func (logger *DebugLogger) Info(v ...any) { logger.info.Print(v...) }
func (logger *DebugLogger) Infof(format string, v ...any) { logger.info.Printf(format, v...) }
func (logger *DebugLogger) Infoln(v ...any) { logger.info.Println(v...) }

func (logger *DebugLogger) Warn(v ...any) { logger.warn.Print(v...) }
func (logger *DebugLogger) Warnf(format string, v ...any) { logger.warn.Printf(format, v...) }
func (logger *DebugLogger) Warnln(v ...any) { logger.warn.Println(v...) }

// NopLogger doesn't log anything.
type NopLogger struct{}

func (NopLogger) Info(v ...any) {}
func (NopLogger) Infof(format string, v ...any) {}
func (NopLogger) Infoln(v ...any) {}

func (NopLogger) Warn(v ...any) {}
func (NopLogger) Warnf(format string, v ...any) {}
func (NopLogger) Warnln(v ...any) {}
3 changes: 1 addition & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scylla
import (
"context"
"fmt"
"log"
"sync"
"time"

Expand Down Expand Up @@ -271,6 +270,6 @@ func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelec
}

func (s *Session) Close() {
log.Println("session: close")
s.cfg.Logger.Info("session: close")
s.cluster.Close()
}
7 changes: 6 additions & 1 deletion session_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ import (

"github.com/scylladb/scylla-go-driver/frame"
"github.com/scylladb/scylla-go-driver/frame/response"
"github.com/scylladb/scylla-go-driver/log"
"github.com/scylladb/scylla-go-driver/transport"
"go.uber.org/goleak"
)

const TestHost = "192.168.100.100"

var testingSessionConfig = DefaultSessionConfig("mykeyspace", TestHost)
var testingSessionConfig = func() SessionConfig {
cfg := DefaultSessionConfig("mykeyspace", TestHost)
cfg.Logger = log.NewDebugLogger()
return cfg
}()

func initKeyspace(ctx context.Context, t testing.TB) {
t.Helper()
Expand Down
37 changes: 18 additions & 19 deletions transport/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package transport
import (
"context"
"fmt"
"log"
"net"
"sort"
"strconv"
Expand Down Expand Up @@ -160,7 +159,7 @@ func NewCluster(ctx context.Context, cfg ConnConfig, p HostSelectionPolicy, e []
}

func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
log.Printf("cluster: open control connection")
c.cfg.Logger.Info("cluster: open control connection")
var errs []string
for addr := range c.knownHosts {
conn, err := OpenConn(ctx, addr, nil, c.cfg)
Expand All @@ -184,7 +183,7 @@ func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
// refreshTopology creates new topology filled with the result of keyspaceQuery, localQuery and peerQuery.
// Old topology is replaced with the new one atomically to prevent dirty reads.
func (c *Cluster) refreshTopology(ctx context.Context) error {
log.Printf("cluster: refresh topology")
c.cfg.Logger.Infoln("cluster: refresh topology")
rows, err := c.getAllNodesInfo(ctx)
if err != nil {
return fmt.Errorf("query info about nodes in cluster: %w", err)
Expand Down Expand Up @@ -236,9 +235,9 @@ func (c *Cluster) refreshTopology(ctx context.Context) error {
}

if ks, ok := t.keyspaces[c.cfg.Keyspace]; ok {
t.policyInfo.Preprocess(t, ks)
t.policyInfo.Preprocess(t, ks, c.cfg.Logger)
} else {
t.policyInfo.Preprocess(t, keyspace{})
t.policyInfo.Preprocess(t, keyspace{}, c.cfg.Logger)
}

c.setTopology(t)
Expand Down Expand Up @@ -450,7 +449,7 @@ func (c *Cluster) setTopology(t *topology) {
// of registering handlers for them.
func (c *Cluster) handleEvent(ctx context.Context, r response) {
if r.Err != nil {
log.Printf("cluster: received event with error: %v", r.Err)
c.cfg.Logger.Infoln("cluster: received event with error: %v", r.Err)
c.RequestReopenControl()
return
}
Expand All @@ -462,17 +461,17 @@ func (c *Cluster) handleEvent(ctx context.Context, r response) {
case *SchemaChange:
// TODO: add schema change.
default:
log.Printf("cluster: unsupported event type: %v", r.Response)
c.cfg.Logger.Warnf("cluster: unsupported event type: %v", r.Response)
}
}

func (c *Cluster) handleTopologyChange(v *TopologyChange) {
log.Printf("cluster: handle topology change: %+#v", v)
c.cfg.Logger.Infof("cluster: handle topology change: %+#v", v)
c.RequestRefresh()
}

func (c *Cluster) handleStatusChange(ctx context.Context, v *StatusChange) {
log.Printf("cluster: handle status change: %+#v", v)
c.cfg.Logger.Infof("cluster: handle status change: %+#v", v)
m := c.Topology().peers
addr := v.Address.String()
if n, ok := m[addr]; ok {
Expand All @@ -482,10 +481,10 @@ func (c *Cluster) handleStatusChange(ctx context.Context, v *StatusChange) {
case frame.Down:
n.setStatus(statusDown)
default:
log.Printf("cluster: status change not supported: %+#v", v)
c.cfg.Logger.Warnf("cluster: status change not supported: %+#v", v)
}
} else {
log.Printf("cluster: unknown node %s received status change: %+#v in topology %v", addr, v, m)
c.cfg.Logger.Infof("cluster: unknown node %s received status change: %+#v in topology %v, requesting topology refresh", addr, v, m)
c.RequestRefresh()
}
}
Expand All @@ -503,7 +502,7 @@ func (c *Cluster) loop(ctx context.Context) {
case <-c.reopenControlChan:
c.tryReopenControl(ctx)
case <-ctx.Done():
log.Printf("cluster closing due to: %v", ctx.Err())
c.cfg.Logger.Infof("cluster closing due to: %v", ctx.Err())
c.handleClose()
return
case <-c.closeChan:
Expand All @@ -523,17 +522,17 @@ func (c *Cluster) tryRefresh(ctx context.Context) {
if err := c.refreshTopology(ctx); err != nil {
c.RequestReopenControl()
time.AfterFunc(tryRefreshInterval, c.RequestRefresh)
log.Printf("cluster: refresh topology: %v", err)
c.cfg.Logger.Infof("cluster: refresh topology: %v", err)
}
}

const tryReopenControlInterval = time.Second

func (c *Cluster) tryReopenControl(ctx context.Context) {
log.Printf("cluster: reopen control connection")
c.cfg.Logger.Infoln("cluster: reopen control connection")
if control, err := c.NewControl(ctx); err != nil {
time.AfterFunc(tryReopenControlInterval, c.RequestReopenControl)
log.Printf("cluster: failed to reopen control connection: %v", err)
c.cfg.Logger.Infof("cluster: failed to reopen control connection: %v", err)
} else {
c.control.Close()
c.control = control
Expand All @@ -542,7 +541,7 @@ func (c *Cluster) tryReopenControl(ctx context.Context) {
}

func (c *Cluster) handleClose() {
log.Printf("cluster: handle cluster close")
c.cfg.Logger.Infoln("cluster: handle cluster close")
c.control.Close()
m := c.Topology().peers
for _, n := range m {
Expand All @@ -551,23 +550,23 @@ func (c *Cluster) handleClose() {
}

func (c *Cluster) RequestRefresh() {
log.Printf("cluster: requested to refresh cluster topology")
c.cfg.Logger.Infoln("cluster: requested to refresh cluster topology")
select {
case c.refreshChan <- struct{}{}:
default:
}
}

func (c *Cluster) RequestReopenControl() {
log.Printf("cluster: requested to reopen control connection")
c.cfg.Logger.Infoln("cluster: requested to reopen control connection")
select {
case c.reopenControlChan <- struct{}{}:
default:
}
}

func (c *Cluster) Close() {
log.Printf("cluster: requested to close cluster")
c.cfg.Logger.Infoln("cluster: requested to close cluster")
select {
case c.closeChan <- struct{}{}:
default:
Expand Down
Loading