From 29c8c44467c5ce9d2214bbe70ed326088c74e268 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Putra?= Date: Wed, 31 Aug 2022 15:14:14 +0200 Subject: [PATCH] session: allow log configuration using Logger interface. The driver now provides a Logger interface allowing to supply a custom logger as part of (Session)ConnConfig. Driver also provides three implementations of Logger by itself: - DefaultLogger, logging only warnings - DebugLogger, logging everything that can be useful for debugging, it is used by default in tests - NopLogger, logging nothing Fixes #271 --- log/logger.go | 69 ++++++++++++++++++++++++++++++ session.go | 3 +- session_integration_test.go | 7 ++- transport/cluster.go | 37 ++++++++-------- transport/conn.go | 40 ++++++++++------- transport/conn_integration_test.go | 19 +++++--- transport/node.go | 3 +- transport/observer.go | 13 +++--- transport/policy.go | 7 +-- transport/policy_test.go | 5 ++- transport/pool.go | 2 +- transport/pool_integration_test.go | 2 +- 12 files changed, 148 insertions(+), 59 deletions(-) create mode 100644 log/logger.go diff --git a/log/logger.go b/log/logger.go new file mode 100644 index 00000000..81f72482 --- /dev/null +++ b/log/logger.go @@ -0,0 +1,69 @@ +package log + +import ( + "log" + "os" +) + +type Logger interface { + Info(v ...any) + Infof(format string, v ...any) + Infoln(v ...any) + + Warn(v ...any) + Warnf(format string, v ...any) + Warnln(v ...any) +} + +// DefaultLogger only logs warnings and critical errors. +type DefaultLogger struct { + warn *log.Logger +} + +func NewDefaultLogger() *DefaultLogger { + res := &DefaultLogger{ + warn: log.New(os.Stderr, "WARNING ", log.LstdFlags), + } + return res +} + +func (logger *DefaultLogger) Info(v ...any) {} +func (logger *DefaultLogger) Infof(format string, v ...any) {} +func (logger *DefaultLogger) Infoln(v ...any) {} + +func (logger *DefaultLogger) Warn(v ...any) { logger.warn.Print(v...) } +func (logger *DefaultLogger) Warnf(format string, v ...any) { logger.warn.Printf(format, v...) } +func (logger *DefaultLogger) Warnln(v ...any) { logger.warn.Println(v...) } + +// DebugLogger logs both warnings and information about important events in driver's runtime. +type DebugLogger struct { + info *log.Logger + warn *log.Logger +} + +func NewDebugLogger() *DebugLogger { + res := &DebugLogger{ + info: log.New(os.Stderr, "INFO ", log.LstdFlags), + warn: log.New(os.Stderr, "WARNING ", log.LstdFlags), + } + return res +} + +func (logger *DebugLogger) Info(v ...any) { logger.info.Print(v...) } +func (logger *DebugLogger) Infof(format string, v ...any) { logger.info.Printf(format, v...) } +func (logger *DebugLogger) Infoln(v ...any) { logger.info.Println(v...) } + +func (logger *DebugLogger) Warn(v ...any) { logger.warn.Print(v...) } +func (logger *DebugLogger) Warnf(format string, v ...any) { logger.warn.Printf(format, v...) } +func (logger *DebugLogger) Warnln(v ...any) { logger.warn.Println(v...) } + +// NopLogger doesn't log anything. +type NopLogger struct{} + +func (NopLogger) Info(v ...any) {} +func (NopLogger) Infof(format string, v ...any) {} +func (NopLogger) Infoln(v ...any) {} + +func (NopLogger) Warn(v ...any) {} +func (NopLogger) Warnf(format string, v ...any) {} +func (NopLogger) Warnln(v ...any) {} diff --git a/session.go b/session.go index 21a50425..3ded4165 100644 --- a/session.go +++ b/session.go @@ -3,7 +3,6 @@ package scylla import ( "context" "fmt" - "log" "sync" "time" @@ -271,6 +270,6 @@ func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelec } func (s *Session) Close() { - log.Println("session: close") + s.cfg.Logger.Info("session: close") s.cluster.Close() } diff --git a/session_integration_test.go b/session_integration_test.go index 626e6f1b..9b6c4f41 100644 --- a/session_integration_test.go +++ b/session_integration_test.go @@ -18,13 +18,18 @@ import ( "github.com/scylladb/scylla-go-driver/frame" "github.com/scylladb/scylla-go-driver/frame/response" + "github.com/scylladb/scylla-go-driver/log" "github.com/scylladb/scylla-go-driver/transport" "go.uber.org/goleak" ) const TestHost = "192.168.100.100" -var testingSessionConfig = DefaultSessionConfig("mykeyspace", TestHost) +var testingSessionConfig = func() SessionConfig { + cfg := DefaultSessionConfig("mykeyspace", TestHost) + cfg.Logger = log.NewDebugLogger() + return cfg +}() func initKeyspace(ctx context.Context, t testing.TB) { t.Helper() diff --git a/transport/cluster.go b/transport/cluster.go index 022a22bd..15f94530 100644 --- a/transport/cluster.go +++ b/transport/cluster.go @@ -3,7 +3,6 @@ package transport import ( "context" "fmt" - "log" "net" "sort" "strconv" @@ -160,7 +159,7 @@ func NewCluster(ctx context.Context, cfg ConnConfig, p HostSelectionPolicy, e [] } func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) { - log.Printf("cluster: open control connection") + c.cfg.Logger.Info("cluster: open control connection") var errs []string for addr := range c.knownHosts { conn, err := OpenConn(ctx, addr, nil, c.cfg) @@ -184,7 +183,7 @@ func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) { // refreshTopology creates new topology filled with the result of keyspaceQuery, localQuery and peerQuery. // Old topology is replaced with the new one atomically to prevent dirty reads. func (c *Cluster) refreshTopology(ctx context.Context) error { - log.Printf("cluster: refresh topology") + c.cfg.Logger.Infoln("cluster: refresh topology") rows, err := c.getAllNodesInfo(ctx) if err != nil { return fmt.Errorf("query info about nodes in cluster: %w", err) @@ -236,9 +235,9 @@ func (c *Cluster) refreshTopology(ctx context.Context) error { } if ks, ok := t.keyspaces[c.cfg.Keyspace]; ok { - t.policyInfo.Preprocess(t, ks) + t.policyInfo.Preprocess(t, ks, c.cfg.Logger) } else { - t.policyInfo.Preprocess(t, keyspace{}) + t.policyInfo.Preprocess(t, keyspace{}, c.cfg.Logger) } c.setTopology(t) @@ -450,7 +449,7 @@ func (c *Cluster) setTopology(t *topology) { // of registering handlers for them. func (c *Cluster) handleEvent(ctx context.Context, r response) { if r.Err != nil { - log.Printf("cluster: received event with error: %v", r.Err) + c.cfg.Logger.Infoln("cluster: received event with error: %v", r.Err) c.RequestReopenControl() return } @@ -462,17 +461,17 @@ func (c *Cluster) handleEvent(ctx context.Context, r response) { case *SchemaChange: // TODO: add schema change. default: - log.Printf("cluster: unsupported event type: %v", r.Response) + c.cfg.Logger.Warnf("cluster: unsupported event type: %v", r.Response) } } func (c *Cluster) handleTopologyChange(v *TopologyChange) { - log.Printf("cluster: handle topology change: %+#v", v) + c.cfg.Logger.Infof("cluster: handle topology change: %+#v", v) c.RequestRefresh() } func (c *Cluster) handleStatusChange(ctx context.Context, v *StatusChange) { - log.Printf("cluster: handle status change: %+#v", v) + c.cfg.Logger.Infof("cluster: handle status change: %+#v", v) m := c.Topology().peers addr := v.Address.String() if n, ok := m[addr]; ok { @@ -482,10 +481,10 @@ func (c *Cluster) handleStatusChange(ctx context.Context, v *StatusChange) { case frame.Down: n.setStatus(statusDown) default: - log.Printf("cluster: status change not supported: %+#v", v) + c.cfg.Logger.Warnf("cluster: status change not supported: %+#v", v) } } else { - log.Printf("cluster: unknown node %s received status change: %+#v in topology %v", addr, v, m) + c.cfg.Logger.Infof("cluster: unknown node %s received status change: %+#v in topology %v, requesting topology refresh", addr, v, m) c.RequestRefresh() } } @@ -503,7 +502,7 @@ func (c *Cluster) loop(ctx context.Context) { case <-c.reopenControlChan: c.tryReopenControl(ctx) case <-ctx.Done(): - log.Printf("cluster closing due to: %v", ctx.Err()) + c.cfg.Logger.Infof("cluster closing due to: %v", ctx.Err()) c.handleClose() return case <-c.closeChan: @@ -523,17 +522,17 @@ func (c *Cluster) tryRefresh(ctx context.Context) { if err := c.refreshTopology(ctx); err != nil { c.RequestReopenControl() time.AfterFunc(tryRefreshInterval, c.RequestRefresh) - log.Printf("cluster: refresh topology: %v", err) + c.cfg.Logger.Infof("cluster: refresh topology: %v", err) } } const tryReopenControlInterval = time.Second func (c *Cluster) tryReopenControl(ctx context.Context) { - log.Printf("cluster: reopen control connection") + c.cfg.Logger.Infoln("cluster: reopen control connection") if control, err := c.NewControl(ctx); err != nil { time.AfterFunc(tryReopenControlInterval, c.RequestReopenControl) - log.Printf("cluster: failed to reopen control connection: %v", err) + c.cfg.Logger.Infof("cluster: failed to reopen control connection: %v", err) } else { c.control.Close() c.control = control @@ -542,7 +541,7 @@ func (c *Cluster) tryReopenControl(ctx context.Context) { } func (c *Cluster) handleClose() { - log.Printf("cluster: handle cluster close") + c.cfg.Logger.Infoln("cluster: handle cluster close") c.control.Close() m := c.Topology().peers for _, n := range m { @@ -551,7 +550,7 @@ func (c *Cluster) handleClose() { } func (c *Cluster) RequestRefresh() { - log.Printf("cluster: requested to refresh cluster topology") + c.cfg.Logger.Infoln("cluster: requested to refresh cluster topology") select { case c.refreshChan <- struct{}{}: default: @@ -559,7 +558,7 @@ func (c *Cluster) RequestRefresh() { } func (c *Cluster) RequestReopenControl() { - log.Printf("cluster: requested to reopen control connection") + c.cfg.Logger.Infoln("cluster: requested to reopen control connection") select { case c.reopenControlChan <- struct{}{}: default: @@ -567,7 +566,7 @@ func (c *Cluster) RequestReopenControl() { } func (c *Cluster) Close() { - log.Printf("cluster: requested to close cluster") + c.cfg.Logger.Infoln("cluster: requested to close cluster") select { case c.closeChan <- struct{}{}: default: diff --git a/transport/conn.go b/transport/conn.go index 67c6acc2..290fefb1 100644 --- a/transport/conn.go +++ b/transport/conn.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "log" "net" "strconv" "strings" @@ -19,6 +18,7 @@ import ( "github.com/scylladb/scylla-go-driver/frame" . "github.com/scylladb/scylla-go-driver/frame/request" . "github.com/scylladb/scylla-go-driver/frame/response" + "github.com/scylladb/scylla-go-driver/log" "go.uber.org/atomic" ) @@ -71,6 +71,7 @@ type connWriter struct { // For use only when skipping sending a request. freeStream func(frame.StreamID) + log log.Logger } func (c *connWriter) submit(r request) { @@ -105,14 +106,14 @@ func (c *connWriter) loop(ctx context.Context) { c.freeStream(r.StreamID) continue } - log.Printf("%s fatal send error, closing connection due to %s", c.connString(), err) + c.log.Infof("%s fatal send error, closing connection due to %s", c.connString(), err) c.connClose() return } c.stats.inFlight.Inc() } if err := c.conn.Flush(); err != nil { - log.Printf("%s fatal flush error, closing connection due to %s", c.connString(), err) + c.log.Infof("%s fatal flush error, closing connection due to %s", c.connString(), err) c.connClose() return } @@ -172,6 +173,8 @@ type connReader struct { s streamIDAllocator closed bool mu sync.Mutex // mu guards h, s and closed + + log log.Logger } func (c *connReader) setHandler(h ResponseHandler) (frame.StreamID, error) { @@ -221,7 +224,7 @@ func (c *connReader) loop(ctx context.Context) { } if resp.Err != nil { - log.Printf("%s fatal receive error, closing connection due to %s", c.connString(), resp.Err) + c.log.Infof("%s fatal receive error, closing connection due to %s", c.connString(), resp.Err) c.connClose() c.drainHandlers() return @@ -232,7 +235,7 @@ func (c *connReader) loop(ctx context.Context) { if h := c.handler(resp.StreamID); h != nil { h <- resp } else { - log.Printf("%s received unknown stream ID %d, closing connection", c.connString(), resp.StreamID) + c.log.Warnf("%s received unknown stream ID %d, closing connection", c.connString(), resp.StreamID) c.connClose() c.drainHandlers() return @@ -313,7 +316,7 @@ func (c *connReader) parse(op frame.OpCode) frame.Response { case frame.OpAuthChallenge: return ParseAuthChallenge(&c.buf) default: - log.Fatalf("not supported") + c.log.Warnf("not supported") return nil } } @@ -347,9 +350,11 @@ type ConnConfig struct { ComprBufferSize int ConnObserver ConnObserver + Logger log.Logger } func DefaultConnConfig(keyspace string) ConnConfig { + l := log.NewDefaultLogger() return ConnConfig{ Username: "cassandra", Password: "cassandra", @@ -358,8 +363,9 @@ func DefaultConnConfig(keyspace string) ConnConfig { Timeout: 500 * time.Millisecond, DefaultConsistency: frame.LOCALQUORUM, DefaultPort: "9042", - ConnObserver: LoggingConnObserver{}, + ConnObserver: LoggingConnObserver{l}, ComprBufferSize: comprBufferSize, + Logger: l, } } @@ -378,7 +384,7 @@ func OpenShardConn(ctx context.Context, addr string, si ShardInfo, cfg ConnConfi for i := 0; i < maxTries; i++ { conn, err := OpenLocalPortConn(ctx, addr, it(), cfg) if err != nil { - log.Printf("%s dial error: %s (try %d/%d)", addr, err, i, maxTries) + cfg.Logger.Infof("%s dial error: %s (try %d/%d)", addr, err, i, maxTries) if conn != nil { conn.Close() } @@ -422,7 +428,7 @@ func OpenConn(ctx context.Context, addr string, localAddr *net.TCPAddr, cfg Conn } if cfg.TLSConfig != nil { - tConn, err := WrapTLS(ctx, tcpConn, cfg.TLSConfig) + tConn, err := WrapTLS(ctx, tcpConn, cfg) if err != nil { return nil, err } @@ -433,14 +439,14 @@ func OpenConn(ctx context.Context, addr string, localAddr *net.TCPAddr, cfg Conn return WrapConn(ctx, tcpConn, cfg) } -func WrapTLS(ctx context.Context, conn *net.TCPConn, cfg *tls.Config) (net.Conn, error) { - cfg = cfg.Clone() - tconn := tls.Client(conn, cfg) +func WrapTLS(ctx context.Context, conn *net.TCPConn, cfg ConnConfig) (net.Conn, error) { + tlsConfig := cfg.TLSConfig.Clone() + tconn := tls.Client(conn, tlsConfig) if err := tconn.HandshakeContext(ctx); err != nil { if err := tconn.Close(); err != nil { - log.Printf("%s failed to close: %s", tconn.RemoteAddr(), err) + cfg.Logger.Warnf("%s failed to close: %s", tconn.RemoteAddr(), err) } else { - log.Printf("%s closed", tconn.RemoteAddr()) + cfg.Logger.Infof("%s closed", tconn.RemoteAddr()) } return nil, err @@ -467,6 +473,7 @@ func WrapConn(ctx context.Context, conn net.Conn, cfg ConnConfig) (*Conn, error) stats: s, connString: c.String, connClose: c.Close, + log: cfg.Logger, }, r: connReader{ conn: io.LimitedReader{ @@ -476,6 +483,7 @@ func WrapConn(ctx context.Context, conn net.Conn, cfg ConnConfig) (*Conn, error) h: make(map[frame.StreamID]ResponseHandler), connString: c.String, connClose: c.Close, + log: cfg.Logger, }, stats: s, } @@ -793,9 +801,9 @@ func (c *Conn) Shard() int { func (c *Conn) Close() { c.closeOnce.Do(func() { if err := c.conn.Close(); err != nil { - log.Printf("%s failed to close: %s", c, err) + c.cfg.Logger.Warnf("%s failed to close: %s", c, err) } else { - log.Printf("%s closed", c) + c.cfg.Logger.Infof("%s closed", c) } c.w.requestCh <- _connCloseRequest if c.onClose != nil { diff --git a/transport/conn_integration_test.go b/transport/conn_integration_test.go index 90269d5f..a161ca61 100644 --- a/transport/conn_integration_test.go +++ b/transport/conn_integration_test.go @@ -5,7 +5,6 @@ package transport import ( "context" "fmt" - "log" "math/rand" "os/signal" "strconv" @@ -13,11 +12,17 @@ import ( "syscall" "testing" - "github.com/scylladb/scylla-go-driver/frame" - "github.com/google/go-cmp/cmp" + "github.com/scylladb/scylla-go-driver/frame" + "github.com/scylladb/scylla-go-driver/log" ) +var testingConnConfig = func() ConnConfig { + cfg := DefaultConnConfig("") + cfg.Logger = log.NewDebugLogger() + return cfg +}() + func TestOpenShardConnIntegration(t *testing.T) { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM) defer cancel() @@ -28,7 +33,7 @@ func TestOpenShardConnIntegration(t *testing.T) { for i := uint16(0); i < si.NrShards; i++ { si.Shard = i - c, err := OpenShardConn(ctx, TestHost+":19042", si, DefaultConnConfig("")) + c, err := OpenShardConn(ctx, TestHost+":19042", si, testingConnConfig) if err != nil { t.Fatal(err) } @@ -59,7 +64,7 @@ func (h *connTestHelper) applyFixture(ctx context.Context) { h.exec(ctx, "INSERT INTO mykeyspace.users(user_id, fname, lname) VALUES (1, 'rick', 'sanchez')") h.exec(ctx, "INSERT INTO mykeyspace.users(user_id, fname, lname) VALUES (4, 'rust', 'cohle')") if err := h.conn.UseKeyspace(ctx, "mykeyspace"); err != nil { - log.Fatalf("use keyspace %v", err) + h.t.Fatalf("use keyspace %v", err) } } @@ -195,7 +200,7 @@ func (h *connTestHelper) applyCompressionFixture(ctx context.Context, toSend []b h.execCompression(ctx, fmt.Sprintf("INSERT INTO mykeyspace.users(user_id, fname, lname) VALUES (1, '%s', 'sanchez')", toSend)) h.execCompression(ctx, "INSERT INTO mykeyspace.users(user_id, fname, lname) VALUES (4, 'rust', 'cohle')") if err := h.conn.UseKeyspace(ctx, "mykeyspace"); err != nil { - log.Fatalf("use keyspace %v", err) + h.t.Fatalf("use keyspace %v", err) } } @@ -232,7 +237,7 @@ func TestCompressionIntegration(t *testing.T) { func testCompression(ctx context.Context, t *testing.T, c frame.Compression, toSend []byte) { t.Helper() - cfg := DefaultConnConfig("") + cfg := testingConnConfig cfg.Compression = c conn, err := OpenConn(ctx, TestHost, nil, cfg) if err != nil { diff --git a/transport/node.go b/transport/node.go index e701446a..254b1951 100644 --- a/transport/node.go +++ b/transport/node.go @@ -3,7 +3,6 @@ package transport import ( "context" "fmt" - "log" "github.com/scylladb/scylla-go-driver/frame" "go.uber.org/atomic" @@ -40,7 +39,7 @@ func (n *Node) Init(ctx context.Context, cfg ConnConfig) { if err == nil { n.setStatus(statusUP) } else { - log.Printf("couldn't create a connection pool to node %v: %v;setting node status to DOWN", n, err) + cfg.Logger.Infof("couldn't create a connection pool to node %v: %v;setting node status to DOWN", n, err) n.setStatus(statusDown) } } diff --git a/transport/observer.go b/transport/observer.go index 895e00e8..5158182a 100644 --- a/transport/observer.go +++ b/transport/observer.go @@ -2,8 +2,9 @@ package transport import ( "fmt" - "log" "time" + + "github.com/scylladb/scylla-go-driver/log" ) var Now = time.Now @@ -51,18 +52,20 @@ type ConnObserver interface { OnPickReplacedWithLessBusyConn(ev ConnEvent) } -type LoggingConnObserver struct{} +type LoggingConnObserver struct { + log log.Logger +} var _ ConnObserver = LoggingConnObserver{} func (o LoggingConnObserver) OnConnect(ev ConnectEvent) { if ev.Err != nil { - log.Printf("%s failed to open connection after %s: %s", ev, ev.Duration(), ev.Err) + o.log.Infof("%s failed to open connection after %s: %s", ev, ev.Duration(), ev.Err) } else { - log.Printf("%s connected in %s", ev, ev.Duration()) + o.log.Infof("%s connected in %s", ev, ev.Duration()) } } func (o LoggingConnObserver) OnPickReplacedWithLessBusyConn(ev ConnEvent) { - log.Printf("%s pick replaced with less busy conn", ev) + o.log.Infof("%s pick replaced with less busy conn", ev) } diff --git a/transport/policy.go b/transport/policy.go index 937112a8..4dc84646 100644 --- a/transport/policy.go +++ b/transport/policy.go @@ -1,8 +1,9 @@ package transport import ( - "log" "sort" + + "github.com/scylladb/scylla-go-driver/log" ) // HostSelectionPolicy decides which node the query should be routed to. @@ -69,14 +70,14 @@ type policyInfo struct { remoteNodes []*Node } -func (pi *policyInfo) Preprocess(t *topology, ks keyspace) { +func (pi *policyInfo) Preprocess(t *topology, ks keyspace, logger log.Logger) { switch ks.strategy.class { case simpleStrategy, localStrategy: pi.preprocessSimpleStrategy(t, ks.strategy) case networkTopologyStrategy: pi.preprocessNetworkTopologyStrategy(t, ks.strategy) default: - log.Println("policyInfo: unknown strategy, defaulting to round robin") + logger.Warnf("policyInfo: keyspace %v has unknown strategy, defaulting to round robin") if t.localDC == "" { pi.preprocessRoundRobinStrategy(t) } else { diff --git a/transport/policy_test.go b/transport/policy_test.go index d00f0d2a..db272366 100644 --- a/transport/policy_test.go +++ b/transport/policy_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/scylladb/scylla-go-driver/frame" + "github.com/scylladb/scylla-go-driver/log" ) // Round-Robin tests can't be run in parallel because @@ -29,9 +30,9 @@ func mockCluster(t *topology, ks, localDC string) *Cluster { t.localDC = localDC if k, ok := t.keyspaces[ks]; ok { - t.policyInfo.Preprocess(t, k) + t.policyInfo.Preprocess(t, k, log.NewDebugLogger()) } else { - t.policyInfo.Preprocess(t, keyspace{}) + t.policyInfo.Preprocess(t, keyspace{}, log.NewDebugLogger()) } c.setTopology(t) diff --git a/transport/pool.go b/transport/pool.go index e081781d..b15ec62a 100644 --- a/transport/pool.go +++ b/transport/pool.go @@ -191,7 +191,7 @@ func (r *PoolRefiller) onConnClose(conn *Conn) { select { case r.pool.connClosedCh <- conn.Shard(): default: - log.Printf("conn pool: ignoring conn %s close", conn) + r.cfg.Logger.Infof("conn pool: ignoring conn %s close", conn) } } diff --git a/transport/pool_integration_test.go b/transport/pool_integration_test.go index 452e4684..91fc1a81 100644 --- a/transport/pool_integration_test.go +++ b/transport/pool_integration_test.go @@ -14,7 +14,7 @@ import ( const refillerBackoff = 500 * time.Millisecond func newTestConnPool(ctx context.Context, t *testing.T) *ConnPool { - p, err := NewConnPool(ctx, TestHost, DefaultConnConfig("")) + p, err := NewConnPool(ctx, TestHost, testingConnConfig) if err != nil { t.Fatal(err) }