diff --git a/CHANGELOG.md b/CHANGELOG.md index ea0d600c5..2a1c888bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed - Drop support for old CQL protocol versions: 1 and 2 (CASSGO-75) +- Cleanup of deprecated elements (CASSGO-12) +- Remove global NewBatch function (CASSGO-15) +- Remove deprecated global logger (CASSGO-24) ### Added @@ -27,19 +30,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Move lz4 compressor to lz4 package within the gocql module (CASSGO-32) - Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19) -- Cleanup of deprecated elements (CASSGO-12) -- Remove global NewBatch function (CASSGO-15) - Detailed description for NumConns (CASSGO-3) - Change Batch API to be consistent with Query() (CASSGO-7) -- Added Cassandra 4.0 table options support(CASSGO-13) -- Remove deprecated global logger (CASSGO-24) +- Added Cassandra 4.0 table options support (CASSGO-13) - Bumped actions/upload-artifact and actions/cache versions to v4 in CI workflow (CASSGO-48) - Keep nil slices in MapScan (CASSGO-44) - Improve error messages for marshalling (CASSGO-38) - Remove HostPoolHostPolicy from gocql package (CASSGO-21) - Standardized spelling of datacenter (CASSGO-35) - Refactor HostInfo creation and ConnectAddress() method (CASSGO-45) -- gocql.Compressor interface changes to follow append-like design. Bumped Go version to 1.19 (CASSGO-1) +- gocql.Compressor interface changes to follow append-like design (CASSGO-1) - Refactoring hostpool package test and Expose HostInfo creation (CASSGO-59) - Move "execute batch" methods to Batch type (CASSGO-57) - Make `Session` immutable by removing setters and associated mutex (CASSGO-23) @@ -47,6 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - NativeType removed (CASSGO-43) - `New` and `NewWithError` removed and replaced with `Zero` (CASSGO-43) - Changes to Query and Batch to make them safely reusable (CASSGO-22) +- Change logger interface so it supports structured logging and log levels (CASSGO-9) ### Fixed diff --git a/address_translators_test.go b/address_translators_test.go index eb96a700a..a1e4100a8 100644 --- a/address_translators_test.go +++ b/address_translators_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/cluster.go b/cluster.go index f18f72978..812e459fc 100644 --- a/cluster.go +++ b/cluster.go @@ -259,9 +259,15 @@ type ClusterConfig struct { // If not provided, Dialer will be used instead. HostDialer HostDialer - // Logger for this ClusterConfig. - // If not specified, defaults to the gocql.defaultLogger. - Logger StdLogger + // StructuredLogger for this ClusterConfig. + // + // There are 3 built in implementations of StructuredLogger: + // - std library "log" package: gocql.NewLogger + // - zerolog: gocqlzerolog.NewZerologLogger + // - zap: gocqlzap.NewZapLogger + // + // You can also provide your own logger implementation of the StructuredLogger interface. + Logger StructuredLogger // Tracer will be used for all queries. Alternatively it can be set of on a // per query basis. @@ -318,11 +324,11 @@ func NewCluster(hosts ...string) *ClusterConfig { return cfg } -func (cfg *ClusterConfig) logger() StdLogger { - if cfg.Logger == nil { - return &defaultLogger{} +func (cfg *ClusterConfig) newLogger() StructuredLogger { + if cfg.Logger != nil { + return cfg.Logger } - return cfg.Logger + return NewLogger(LogLevelNone) } // CreateSession initializes the cluster based on this config and returns a @@ -335,14 +341,14 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) { // if defined, to translate the given address and port into a possibly new address // and port, If no AddressTranslator or if an error occurs, the given address and // port will be returned. -func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) { +func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger StructuredLogger) (net.IP, int) { if cfg.AddressTranslator == nil || len(addr) == 0 { return addr, port } newAddr, newPort := cfg.AddressTranslator.Translate(addr, port) - if gocqlDebug { - cfg.logger().Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort) - } + logger.Debug("Translating address.", + newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port), + newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort)) return newAddr, newPort } diff --git a/cluster_test.go b/cluster_test.go index adc21fd05..16d94ebeb 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -60,7 +63,7 @@ func TestNewCluster_WithHosts(t *testing.T) { func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) { cfg := NewCluster() assertNil(t, "cluster config address translator", cfg.AddressTranslator) - newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234) + newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234, nopLoggerSingleton) assertTrue(t, "same address as provided", net.ParseIP("10.0.0.1").Equal(newAddr)) assertEqual(t, "translated host and port", 1234, newPort) } @@ -68,7 +71,7 @@ func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) { func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) { cfg := NewCluster() cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432) - newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0) + newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0, nopLoggerSingleton) assertTrue(t, "translated address is still empty", len(newAddr) == 0) assertEqual(t, "translated port", 0, newPort) } @@ -76,7 +79,7 @@ func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) { func TestClusterConfig_translateAddressAndPort_Success(t *testing.T) { cfg := NewCluster() cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432) - newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345) + newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345, nopLoggerSingleton) assertTrue(t, "translated address", net.ParseIP("10.10.10.10").Equal(newAddr)) assertEqual(t, "translated port", 5432, newPort) } diff --git a/common_test.go b/common_test.go index 1aa4dba21..27f5b9595 100644 --- a/common_test.go +++ b/common_test.go @@ -1,3 +1,6 @@ +//go:build all || unit || integration || ccm || cassandra +// +build all unit integration ccm cassandra + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -85,7 +88,7 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig { var initOnce sync.Once func createTable(s *Session, table string) error { - // lets just be really sure + // let's just be really sure if err := s.control.awaitSchemaAgreement(); err != nil { log.Printf("error waiting for schema agreement pre create table=%q err=%v\n", table, err) return err diff --git a/conn.go b/conn.go index 4e1bb4f2e..41f7db41a 100644 --- a/conn.go +++ b/conn.go @@ -144,19 +144,12 @@ type ConnConfig struct { Authenticator Authenticator AuthProvider func(h *HostInfo) (Authenticator, error) Keepalive time.Duration - Logger StdLogger + Logger StructuredLogger tlsConfig *tls.Config disableCoalesce bool } -func (c *ConnConfig) logger() StdLogger { - if c.Logger == nil { - return &defaultLogger{} - } - return c.Logger -} - type ConnErrorHandler interface { HandleError(conn *Conn, err error, closed bool) } @@ -208,7 +201,7 @@ type Conn struct { timeouts int64 - logger StdLogger + logger StructuredLogger } // connect establishes a connection to a Cassandra node using session's connection config. @@ -715,7 +708,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error { delete(c.calls, head.stream) c.mu.Unlock() if call == nil || !ok { - c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head) + c.logger.Warning("Received response for stream which has no handler.", newLogFieldString("header", head.String())) return c.discardFrame(r, head) } else if head.stream != call.streamID { panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream)) @@ -1330,12 +1323,19 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer return resp.framer, nil case <-timeoutCh: close(call.timeout) + c.logger.Debug("Request timed out on connection.", + newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress())) c.handleTimeout() return nil, ErrTimeoutNoResponse case <-ctxDone: + c.logger.Debug("Request failed because context elapsed out on connection.", + newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()), + newLogFieldError("ctx_err", ctx.Err())) close(call.timeout) return nil, ctx.Err() case <-c.ctx.Done(): + c.logger.Debug("Request failed because connection closed.", + newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress())) close(call.timeout) return nil, ErrConnectionClosed } @@ -1685,7 +1685,7 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter { iter.framer = framer if err := c.awaitSchemaAgreement(ctx); err != nil { // TODO: should have this behind a flag - c.logger.Println(err) + c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", newLogFieldError("err", err)) } // dont return an error from this, might be a good idea to give a warning // though. The impact of this returning an error would be that the cluster @@ -1947,7 +1947,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { goto cont } if !isValidPeer(host) || host.schemaVersion == "" { - c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host) + c.logger.Warning("Invalid peer or peer with empty schema_version.", newLogFieldIp("peer", host.ConnectAddress())) continue } diff --git a/conn_test.go b/conn_test.go index 9a3586196..0a3bc69bb 100644 --- a/conn_test.go +++ b/conn_test.go @@ -41,6 +41,7 @@ import ( "math/rand" "net" "os" + "strconv" "strings" "sync" "sync/atomic" @@ -184,7 +185,7 @@ func newTestSession(proto protoVersion, addresses ...string) (*Session, error) { } func TestDNSLookupConnected(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelDebug) // Override the defaul DNS resolver and restore at the end failDNS = true @@ -205,13 +206,13 @@ func TestDNSLookupConnected(t *testing.T) { t.Fatal("CreateSession() should have connected") } - if !strings.Contains(log.String(), "gocql: dns error") { + if !strings.Contains(log.String(), "gocql: DNS error") { t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String()) } } func TestDNSLookupError(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelDebug) // Override the defaul DNS resolver and restore at the end failDNS = true @@ -229,7 +230,7 @@ func TestDNSLookupError(t *testing.T) { t.Fatal("CreateSession() should have returned an error") } - if !strings.Contains(log.String(), "gocql: dns error") { + if !strings.Contains(log.String(), "gocql: DNS error") { t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String()) } @@ -240,7 +241,7 @@ func TestDNSLookupError(t *testing.T) { func TestStartupTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - log := &testLogger{} + log := newTestLogger(LogLevelDebug) srv := NewTestServer(t, defaultProto, ctx) defer srv.Stop() @@ -348,17 +349,20 @@ func TestCancel(t *testing.T) { type testQueryObserver struct { metrics map[string]*hostMetrics - verbose bool - logger StdLogger + logger StructuredLogger } func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) { host := q.Host.ConnectAddress().String() o.metrics[host] = q.Metrics - if o.verbose { - o.logger.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n", - q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err) - } + o.logger.Debug("Observed query.", + newLogFieldString("stmt", q.Statement), + newLogFieldInt("rows", q.Rows), + newLogFieldString("duration", q.End.Sub(q.Start).String()), + newLogFieldString("host", host), + newLogFieldInt("attempts", q.Metrics.Attempts), + newLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)), + newLogFieldError("err", q.Err)) } func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics { @@ -411,7 +415,7 @@ func TestQueryRetry(t *testing.T) { } func TestQueryMultinodeWithMetrics(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelNone) defer func() { os.Stdout.WriteString(log.String()) }() @@ -439,7 +443,7 @@ func TestQueryMultinodeWithMetrics(t *testing.T) { // 1 retry per host rt := &SimpleRetryPolicy{NumRetries: 3} - observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false, logger: log} + observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), logger: log} qry := db.Query("kill").RetryPolicy(rt).Observer(observer).Idempotent(true) iter := qry.Iter() err = iter.Close() @@ -488,7 +492,7 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType { } func TestSpeculativeExecution(t *testing.T) { - log := &testLogger{} + log := newTestLogger(LogLevelDebug) defer func() { os.Stdout.WriteString(log.String()) }() @@ -724,7 +728,7 @@ func TestStream0(t *testing.T) { session: &Session{ types: GlobalTypes, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), } err := conn.recv(context.Background(), false) diff --git a/connectionpool.go b/connectionpool.go index 9b8295e70..f316b5695 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -154,7 +154,7 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) { Authenticator: cfg.Authenticator, AuthProvider: cfg.AuthProvider, Keepalive: cfg.SocketKeepalive, - Logger: cfg.logger(), + Logger: cfg.Logger, }, nil } @@ -310,7 +310,7 @@ type hostConnPool struct { filling bool pos uint32 - logger StdLogger + logger StructuredLogger } func (h *hostConnPool) String() string { @@ -493,21 +493,20 @@ func (pool *hostConnPool) logConnectErr(err error) { if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") { // connection refused // these are typical during a node outage so avoid log spam. - if gocqlDebug { - pool.logger.Printf("gocql: unable to dial %q: %v\n", pool.host, err) - } + pool.logger.Debug("Pool unable to establish a connection to host.", + newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err)) } else if err != nil { // unexpected error - pool.logger.Printf("error: failed to connect to %q due to error: %v", pool.host, err) + pool.logger.Debug("Pool failed to connect to host due to error.", + newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err)) } } // transition back to a not-filling state. func (pool *hostConnPool) fillingStopped(err error) { if err != nil { - if gocqlDebug { - pool.logger.Printf("gocql: filling stopped %q: %v\n", pool.host.ConnectAddress(), err) - } + pool.logger.Warning("Connection pool filling failed.", + newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err)) // wait for some time to avoid back-to-back filling // this provides some time between failed attempts // to fill the pool for the host to recover @@ -523,9 +522,8 @@ func (pool *hostConnPool) fillingStopped(err error) { // if we errored and the size is now zero, make sure the host is marked as down // see https://github.com/apache/cassandra-gocql-driver/issues/1614 - if gocqlDebug { - pool.logger.Printf("gocql: conns of pool after stopped %q: %v\n", host.ConnectAddress(), count) - } + pool.logger.Debug("Logging number of connections of pool after filling stopped.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()), newLogFieldInt("count", count)) if err != nil && count == 0 { if pool.session.cfg.ConvictionPolicy.AddFailure(err, host) { pool.session.handleNodeDown(host.ConnectAddress(), port) @@ -581,10 +579,11 @@ func (pool *hostConnPool) connect() (err error) { break } } - if gocqlDebug { - pool.logger.Printf("gocql: connection failed %q: %v, reconnecting with %T\n", - pool.host.ConnectAddress(), err, reconnectionPolicy) - } + pool.logger.Warning("Pool failed to connect to host. Reconnecting according to the reconnection policy.", + newLogFieldIp("host", pool.host.ConnectAddress()), + newLogFieldString("host_id", pool.host.HostID()), + newLogFieldError("err", err), + newLogFieldString("reconnectionPolicy", fmt.Sprintf("%T", reconnectionPolicy))) time.Sleep(reconnectionPolicy.GetInterval(i)) } @@ -631,9 +630,8 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) { return } - if gocqlDebug { - pool.logger.Printf("gocql: pool connection error %q: %v\n", conn.addr, err) - } + pool.logger.Info("Pool connection error.", + newLogFieldString("addr", conn.addr), newLogFieldError("err", err)) // find the connection index for i, candidate := range pool.conns { diff --git a/control.go b/control.go index dfc7dc021..c2cb4cb13 100644 --- a/control.go +++ b/control.go @@ -105,18 +105,20 @@ func (c *controlConn) heartBeat() { resp, err := c.writeFrame(&writeOptionsFrame{}) if err != nil { + c.session.logger.Debug("Control connection failed to send heartbeat.", newLogFieldError("err", err)) goto reconn } - switch resp.(type) { + switch actualResp := resp.(type) { case *supportedFrame: // Everything ok sleepTime = 5 * time.Second continue case error: + c.session.logger.Debug("Control connection heartbeat failed.", newLogFieldError("err", actualResp)) goto reconn default: - panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp)) + c.session.logger.Error("Unknown frame in response to options.", newLogFieldString("frame_type", fmt.Sprintf("%T", resp))) } reconn: @@ -244,18 +246,25 @@ func (c *controlConn) discoverProtocol(hosts []*HostInfo) (int, error) { } if err == nil { + c.session.logger.Debug("Discovered protocol version using host.", + newLogFieldInt("protocol_version", connCfg.ProtoVersion), newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) return connCfg.ProtoVersion, nil } if proto := parseProtocolFromError(err); proto > 0 { + c.session.logger.Debug("Discovered protocol version using host after parsing protocol error.", + newLogFieldInt("protocol_version", proto), newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) return proto, nil } + + c.session.logger.Debug("Failed to discover protocol version using host.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()), newLogFieldError("err", err)) } return 0, err } -func (c *controlConn) connect(hosts []*HostInfo) error { +func (c *controlConn) connect(hosts []*HostInfo, sessionInit bool) error { if len(hosts) == 0 { return errors.New("control: no endpoints specified") } @@ -272,14 +281,22 @@ func (c *controlConn) connect(hosts []*HostInfo) error { for _, host := range hosts { conn, err = c.session.dial(c.session.ctx, host, &cfg, c) if err != nil { - c.session.logger.Printf("gocql: unable to dial control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("Control connection failed to establish a connection to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) continue } - err = c.setupConn(conn) + err = c.setupConn(conn, sessionInit) if err == nil { break } - c.session.logger.Printf("gocql: unable setup control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("Control connection setup failed after connecting to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) conn.Close() conn = nil } @@ -300,7 +317,7 @@ type connHost struct { host *HostInfo } -func (c *controlConn) setupConn(conn *Conn) error { +func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error { // we need up-to-date host info for the filterHost call below iter := conn.querySystemLocal(context.TODO()) host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port) @@ -308,10 +325,22 @@ func (c *controlConn) setupConn(conn *Conn) error { return err } - host = c.session.ring.addOrUpdate(host) + var exists bool + host, exists = c.session.ring.addOrUpdate(host) if c.session.cfg.filterHost(host) { - return fmt.Errorf("host was filtered: %v", host.ConnectAddress()) + return fmt.Errorf("host was filtered: %v (%s)", host.ConnectAddress(), host.HostID()) + } + + if !exists { + logLevel := LogLevelInfo + msg := "Added control host." + if sessionInit { + logLevel = LogLevelDebug + msg = "Added control host (session initialization)." + } + logHelper(c.session.logger, logLevel, msg, + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) } if err := c.registerEvents(conn); err != nil { @@ -324,6 +353,10 @@ func (c *controlConn) setupConn(conn *Conn) error { } c.conn.Store(ch) + + c.session.logger.Info("Control connection connected to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) + if c.session.initialized() { // We connected to control conn, so add the connect the host in pool as well. // Notify session we can start trying to connect to the node. @@ -365,7 +398,7 @@ func (c *controlConn) registerEvents(conn *Conn) error { if err != nil { return err } else if _, ok := frame.(*readyFrame); !ok { - return fmt.Errorf("unexpected frame in response to register: got %T: %v\n", frame, frame) + return fmt.Errorf("unexpected frame in response to register: got %T: %v", frame, frame) } return nil @@ -380,20 +413,25 @@ func (c *controlConn) reconnect() { } defer atomic.StoreInt32(&c.reconnecting, 0) - conn, err := c.attemptReconnect() + _, err := c.attemptReconnect() - if conn == nil { - c.session.logger.Printf("gocql: unable to reconnect control connection: %v\n", err) + if err != nil { + c.session.logger.Error("Unable to reconnect control connection.", + newLogFieldError("err", err)) return } err = c.session.refreshRing() if err != nil { - c.session.logger.Printf("gocql: unable to refresh ring: %v\n", err) + c.session.logger.Warning("Unable to refresh ring.", + newLogFieldError("err", err)) } } func (c *controlConn) attemptReconnect() (*Conn, error) { + + c.session.logger.Debug("Reconnecting the control connection.") + hosts := c.session.ring.allHosts() hosts = shuffleHosts(hosts) @@ -416,8 +454,7 @@ func (c *controlConn) attemptReconnect() (*Conn, error) { return conn, err } - c.session.logger.Printf("gocql: unable to connect to any ring node: %v\n", err) - c.session.logger.Printf("gocql: control falling back to initial contact points.\n") + c.session.logger.Error("Unable to connect to any ring node, control connection falling back to initial contact points.", newLogFieldError("err", err)) // Fallback to initial contact points, as it may be the case that all known initialHosts // changed their IPs while keeping the same hostname(s). initialHosts, resolvErr := addrsToHosts(c.session.cfg.Hosts, c.session.cfg.Port, c.session.logger) @@ -434,14 +471,22 @@ func (c *controlConn) attemptReconnectToAnyOfHosts(hosts []*HostInfo) (*Conn, er for _, host := range hosts { conn, err = c.session.connect(c.session.ctx, host, c) if err != nil { - c.session.logger.Printf("gocql: unable to dial control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("During reconnection, control connection failed to establish a connection to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) continue } - err = c.setupConn(conn) + err = c.setupConn(conn, false) if err == nil { break } - c.session.logger.Printf("gocql: unable setup control conn %v:%v: %v\n", host.ConnectAddress(), host.Port(), err) + c.session.logger.Info("During reconnection, control connection setup failed after connecting to host.", + newLogFieldIp("host_addr", host.ConnectAddress()), + newLogFieldInt("port", host.Port()), + newLogFieldString("host_id", host.HostID()), + newLogFieldError("err", err)) conn.Close() conn = nil } @@ -461,6 +506,11 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) { return } + c.session.logger.Warning("Control connection error.", + newLogFieldIp("host_addr", conn.host.ConnectAddress()), + newLogFieldString("host_id", conn.host.HostID()), + newLogFieldError("err", err)) + c.reconnect() } @@ -522,8 +572,9 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter return conn.executeQuery(qry.Context(), qry) }) - if gocqlDebug && iter.err != nil { - c.session.logger.Printf("control: error executing %q: %v\n", statement, iter.err) + if iter.err != nil { + c.session.logger.Warning("Error executing control connection statement.", + newLogFieldString("statement", statement), newLogFieldError("err", iter.err)) } iter.metrics.attempt(1, 0, c.getConn().host, false) diff --git a/control_test.go b/control_test.go index 9713718e6..9f83ec955 100644 --- a/control_test.go +++ b/control_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/crc_test.go b/crc_test.go index cf5e40a35..2556f2599 100644 --- a/crc_test.go +++ b/crc_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/debug_off.go b/debug_off.go deleted file mode 100644 index 75b0b0cec..000000000 --- a/debug_off.go +++ /dev/null @@ -1,30 +0,0 @@ -//go:build !gocql_debug -// +build !gocql_debug - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* - * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 - * Copyright (c) 2016, The Gocql authors, - * provided under the BSD-3-Clause License. - * See the NOTICE file distributed with this work for additional information. - */ - -package gocql - -const gocqlDebug = false diff --git a/debug_on.go b/debug_on.go deleted file mode 100644 index 424394c28..000000000 --- a/debug_on.go +++ /dev/null @@ -1,30 +0,0 @@ -//go:build gocql_debug -// +build gocql_debug - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* - * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 - * Copyright (c) 2016, The Gocql authors, - * provided under the BSD-3-Clause License. - * See the NOTICE file distributed with this work for additional information. - */ - -package gocql - -const gocqlDebug = true diff --git a/events.go b/events.go index 93b001acc..8f4bd1db8 100644 --- a/events.go +++ b/events.go @@ -25,7 +25,10 @@ package gocql import ( + "fmt" "net" + "strconv" + "strings" "sync" "time" ) @@ -39,10 +42,10 @@ type eventDebouncer struct { callback func([]frame) quit chan struct{} - logger StdLogger + logger StructuredLogger } -func newEventDebouncer(name string, eventHandler func([]frame), logger StdLogger) *eventDebouncer { +func newEventDebouncer(name string, eventHandler func([]frame), logger StructuredLogger) *eventDebouncer { e := &eventDebouncer{ name: name, quit: make(chan struct{}), @@ -100,7 +103,8 @@ func (e *eventDebouncer) debounce(frame frame) { if len(e.events) < eventBufferSize { e.events = append(e.events, frame) } else { - e.logger.Printf("%s: buffer full, dropping event frame: %s", e.name, frame) + e.logger.Warning("Event buffer full, dropping event frame.", + newLogFieldString("event_name", e.name), newLogFieldStringer("frame", frame)) } e.mu.Unlock() @@ -109,13 +113,11 @@ func (e *eventDebouncer) debounce(frame frame) { func (s *Session) handleEvent(framer *framer) { frame, err := framer.parseFrame() if err != nil { - s.logger.Printf("gocql: unable to parse event frame: %v\n", err) + s.logger.Error("Unable to parse event frame.", newLogFieldError("err", err)) return } - if gocqlDebug { - s.logger.Printf("gocql: handling frame: %v\n", frame) - } + s.logger.Debug("Handling event frame.", newLogFieldStringer("frame", frame)) switch f := frame.(type) { case *schemaChangeKeyspace, *schemaChangeFunction, @@ -125,7 +127,8 @@ func (s *Session) handleEvent(framer *framer) { case *topologyChangeEventFrame, *statusChangeEventFrame: s.nodeEvents.debounce(frame) default: - s.logger.Printf("gocql: invalid event frame (%T): %v\n", f, f) + s.logger.Error("Invalid event frame.", + newLogFieldString("frame_type", fmt.Sprintf("%T", f)), newLogFieldStringer("frame", f)) } } @@ -177,6 +180,8 @@ func (s *Session) handleNodeEvent(frames []frame) { for _, frame := range frames { switch f := frame.(type) { case *topologyChangeEventFrame: + s.logger.Info("Received topology change event.", + newLogFieldString("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, ""))) topologyEventReceived = true case *statusChangeEventFrame: event, ok := sEvents[f.host.String()] @@ -193,9 +198,8 @@ func (s *Session) handleNodeEvent(frames []frame) { } for _, f := range sEvents { - if gocqlDebug { - s.logger.Printf("gocql: dispatching status change event: %+v\n", f) - } + s.logger.Info("Dispatching status change event.", + newLogFieldString("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, ""))) // ignore events we received if they were disabled // see https://github.com/apache/cassandra-gocql-driver/issues/1591 @@ -213,9 +217,8 @@ func (s *Session) handleNodeEvent(frames []frame) { } func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) { - if gocqlDebug { - s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort) - } + s.logger.Info("Node is UP.", + newLogFieldStringer("event_ip", eventIp), newLogFieldInt("event_port", eventPort)) host, ok := s.ring.getHostByIP(eventIp.String()) if !ok { @@ -240,9 +243,8 @@ func (s *Session) startPoolFill(host *HostInfo) { } func (s *Session) handleNodeConnected(host *HostInfo) { - if gocqlDebug { - s.logger.Printf("gocql: Session.handleNodeConnected: %s:%d\n", host.ConnectAddress(), host.Port()) - } + s.logger.Debug("Pool connected to node.", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldInt("port", host.Port()), newLogFieldString("host_id", host.HostID())) host.setState(NodeUp) @@ -252,9 +254,8 @@ func (s *Session) handleNodeConnected(host *HostInfo) { } func (s *Session) handleNodeDown(ip net.IP, port int) { - if gocqlDebug { - s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port) - } + s.logger.Warning("Node is DOWN.", + newLogFieldIp("host_addr", ip), newLogFieldInt("port", port)) host, ok := s.ring.getHostByIP(ip.String()) if ok { diff --git a/events_test.go b/events_test.go index 537c51885..cf088e3d1 100644 --- a/events_test.go +++ b/events_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/filters.go b/filters.go index 312bd0d1a..c258dadda 100644 --- a/filters.go +++ b/filters.go @@ -70,7 +70,7 @@ func DataCentreHostFilter(dataCenter string) HostFilter { // WhiteListHostFilter filters incoming hosts by checking that their address is // in the initial hosts whitelist. func WhiteListHostFilter(hosts ...string) HostFilter { - hostInfos, err := addrsToHosts(hosts, 9042, nopLogger{}) + hostInfos, err := addrsToHosts(hosts, 9042, nopLoggerSingleton) if err != nil { // dont want to panic here, but rather not break the API panic(fmt.Errorf("unable to lookup host info from address: %v", err)) diff --git a/filters_test.go b/filters_test.go index a1abec207..2469422b6 100644 --- a/filters_test.go +++ b/filters_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/frame.go b/frame.go index a48279d3e..93af2c673 100644 --- a/frame.go +++ b/frame.go @@ -411,6 +411,7 @@ func newFramer(compressor Compressor, version byte, r *RegisteredTypes) *framer type frame interface { Header() frameHeader + String() string } func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { diff --git a/frame_test.go b/frame_test.go index e2270d2c2..e7a8bb4bf 100644 --- a/frame_test.go +++ b/frame_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/go.mod b/go.mod index c4505dc41..93a5ca659 100644 --- a/go.mod +++ b/go.mod @@ -21,14 +21,16 @@ require ( github.com/golang/snappy v0.0.3 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/pierrec/lz4/v4 v4.1.8 + github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.9.0 + go.uber.org/zap v1.27.0 gopkg.in/inf.v0 v0.9.1 ) require ( github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect - github.com/kr/pretty v0.1.0 // indirect + github.com/kr/pretty v0.3.1 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index 14c301fe6..b0a2adec7 100644 --- a/go.sum +++ b/go.sum @@ -2,33 +2,63 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gocqlzap/zap.go b/gocqlzap/zap.go new file mode 100644 index 000000000..0b7dbfcd1 --- /dev/null +++ b/gocqlzap/zap.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzap + +import ( + "go.uber.org/zap" + + "github.com/gocql/gocql" +) + +const DefaultName = "gocql" + +type Logger interface { + gocql.StructuredLogger + ZapLogger() *zap.Logger +} + +type logger struct { + zapLogger *zap.Logger +} + +// NewZapLogger creates a new zap based logger with the logger name set to DefaultName +func NewZapLogger(l *zap.Logger) Logger { + return &logger{zapLogger: l.Named(DefaultName)} +} + +// NewUnnamedZapLogger doesn't set the logger name so the user can set the name of the logger +// before providing it to this function (or just leave it unset) +func NewUnnamedZapLogger(l *zap.Logger) Logger { + return &logger{zapLogger: l} +} + +func (rec *logger) ZapLogger() *zap.Logger { + return rec.zapLogger +} + +func (rec *logger) log(fields []gocql.LogField) *zap.Logger { + childLogger := rec.zapLogger + for _, field := range fields { + childLogger = childLogger.WithLazy(zapField(field)) + } + return childLogger +} + +func zapField(field gocql.LogField) zap.Field { + switch field.Value.LogFieldValueType() { + case gocql.LogFieldTypeBool: + return zap.Bool(field.Name, field.Value.Bool()) + case gocql.LogFieldTypeInt64: + return zap.Int64(field.Name, field.Value.Int64()) + case gocql.LogFieldTypeString: + return zap.String(field.Name, field.Value.String()) + default: + return zap.Any(field.Name, field.Value.Any()) + } +} + +func (rec *logger) Error(msg string, fields ...gocql.LogField) { + rec.log(fields).Error(msg) +} + +func (rec *logger) Warning(msg string, fields ...gocql.LogField) { + rec.log(fields).Warn(msg) +} + +func (rec *logger) Info(msg string, fields ...gocql.LogField) { + rec.log(fields).Info(msg) +} + +func (rec *logger) Debug(msg string, fields ...gocql.LogField) { + rec.log(fields).Debug(msg) +} diff --git a/gocqlzap/zap_test.go b/gocqlzap/zap_test.go new file mode 100644 index 000000000..7ac0fb6d3 --- /dev/null +++ b/gocqlzap/zap_test.go @@ -0,0 +1,80 @@ +//go:build all || unit +// +build all unit + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzap + +import ( + "bytes" + "io" + "strings" + "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/gocql/gocql" +) + +const logLineEnding = "%%%\n%%%" + +func NewCustomLogger(pipeTo io.Writer) zapcore.Core { + cfg := zap.NewProductionEncoderConfig() + cfg.LineEnding = logLineEnding + return zapcore.NewCore( + zapcore.NewConsoleEncoder(cfg), + zapcore.AddSync(pipeTo), + zapcore.DebugLevel, + ) +} + +func TestGocqlZapLog(t *testing.T) { + b := &bytes.Buffer{} + logger := zap.New(NewCustomLogger(b)) + clusterCfg := gocql.NewCluster("0.0.0.1") + clusterCfg.Logger = NewZapLogger(logger) + clusterCfg.ProtoVersion = 4 + session, err := clusterCfg.CreateSession() + if err == nil { + session.Close() + t.Fatal("expected error creating session") + } + err = logger.Sync() + if err != nil { + t.Fatal("logger sync failed") + } + logOutput := strings.Split(b.String(), logLineEnding) + found := false + for _, logEntry := range logOutput { + if len(logEntry) == 0 { + continue + } + if !strings.Contains(logEntry, "info\tgocql\tControl connection failed to establish a connection to host.\t{\"host_addr\": "+ + "\"0.0.0.1\", \"port\": 9042, \"host_id\": \"\", \"err\": \"dial tcp 0.0.0.1:9042:") { + continue + } else { + found = true + break + } + } + if !found { + t.Fatal("log output didn't match expectations: ", strings.Join(logOutput, "\n")) + } +} diff --git a/gocqlzerolog/zerolog.go b/gocqlzerolog/zerolog.go new file mode 100644 index 000000000..ee2e70213 --- /dev/null +++ b/gocqlzerolog/zerolog.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzerolog + +import ( + "github.com/rs/zerolog" + + "github.com/gocql/gocql" +) + +const DefaultName = "gocql" +const DefaultNameField = "logger" + +type Logger interface { + gocql.StructuredLogger + ZerologLogger() zerolog.Logger +} + +type logger struct { + zerologLogger zerolog.Logger +} + +// NewZerologLogger creates a new zerolog based logger with a global context containing a field +// with name "logger" and value "gocql", i.e.: +// +// l.With().Str("logger", "gocql").Logger() +func NewZerologLogger(l zerolog.Logger) Logger { + return &logger{zerologLogger: l.With().Str(DefaultNameField, DefaultName).Logger()} +} + +// NewUnnamedZerologLogger creates a new zerolog based logger without modifying its context like +// NewZerologLogger does. +func NewUnnamedZerologLogger(l zerolog.Logger) Logger { + return &logger{zerologLogger: l} +} + +func (rec *logger) ZerologLogger() zerolog.Logger { + return rec.zerologLogger +} + +func (rec *logger) log(event *zerolog.Event, fields ...gocql.LogField) *zerolog.Event { + for _, field := range fields { + event = zerologEvent(event, field) + } + return event +} + +func zerologEvent(event *zerolog.Event, field gocql.LogField) *zerolog.Event { + switch field.Value.LogFieldValueType() { + case gocql.LogFieldTypeBool: + return event.Bool(field.Name, field.Value.Bool()) + case gocql.LogFieldTypeInt64: + return event.Int64(field.Name, field.Value.Int64()) + case gocql.LogFieldTypeString: + return event.Str(field.Name, field.Value.String()) + default: + return event.Any(field.Name, field.Value.Any()) + } +} + +func (rec *logger) Error(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Error(), fields...).Msg(msg) +} + +func (rec *logger) Warning(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Warn(), fields...).Msg(msg) +} + +func (rec *logger) Info(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Info(), fields...).Msg(msg) +} + +func (rec *logger) Debug(msg string, fields ...gocql.LogField) { + rec.log(rec.zerologLogger.Debug(), fields...).Msg(msg) +} diff --git a/gocqlzerolog/zerolog_test.go b/gocqlzerolog/zerolog_test.go new file mode 100644 index 000000000..66c54961a --- /dev/null +++ b/gocqlzerolog/zerolog_test.go @@ -0,0 +1,70 @@ +//go:build all || unit +// +build all unit + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocqlzerolog + +import ( + "bytes" + "strings" + "testing" + + "github.com/rs/zerolog" + + "github.com/gocql/gocql" +) + +const logLineEnding = "%%%\n%%%" + +func TestGocqlZeroLog(t *testing.T) { + b := &bytes.Buffer{} + output := zerolog.ConsoleWriter{Out: b} + output.NoColor = true + output.FormatExtra = func(m map[string]interface{}, buffer *bytes.Buffer) error { + buffer.WriteString(logLineEnding) + return nil + } + logger := zerolog.New(output).Level(zerolog.DebugLevel) + clusterCfg := gocql.NewCluster("0.0.0.1") + clusterCfg.Logger = NewZerologLogger(logger) + clusterCfg.ProtoVersion = 4 + session, err := clusterCfg.CreateSession() + if err == nil { + session.Close() + t.Fatal("expected error creating session") + } + logOutput := strings.Split(b.String(), logLineEnding+"\n") + found := false + for _, logEntry := range logOutput { + if len(logEntry) == 0 { + continue + } + if !strings.Contains(logEntry, "Control connection failed to establish a connection to host.") || + !strings.Contains(logEntry, "host_addr=0.0.0.1 host_id= logger=gocql port=9042") { + continue + } else { + found = true + break + } + } + if !found { + t.Fatal("log output didn't match expectations: ", strings.Join(logOutput, "\n")) + } +} diff --git a/helpers.go b/helpers.go index a391a11ef..baae5ce05 100644 --- a/helpers.go +++ b/helpers.go @@ -25,6 +25,7 @@ package gocql import ( + "bytes" "fmt" "net" "reflect" @@ -202,3 +203,11 @@ func LookupIP(host string) ([]net.IP, error) { return net.LookupIP(host) } + +func ringString(hosts []*HostInfo) string { + buf := new(bytes.Buffer) + for _, h := range hosts { + buf.WriteString("[" + h.ConnectAddress().String() + "-" + h.HostID() + ":" + h.State().String() + "]") + } + return buf.String() +} diff --git a/helpers_test.go b/helpers_test.go index 45903a695..bcd727c50 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/host_source.go b/host_source.go index ece5d3e5a..7e88cf9da 100644 --- a/host_source.go +++ b/host_source.go @@ -610,7 +610,7 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (* // Not sure what the port field will be called until the JIRA issue is complete } - ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port) + ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port, s.logger) if !validIpAddr(ip) { return nil, fmt.Errorf("invalid host address (before translation: %v:%v, after translation: %v:%v)", host.ConnectAddress(), host.port, ip.String(), port) } @@ -687,8 +687,8 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er return nil, err } else if !isValidPeer(host) { // If it's not a valid peer - r.session.logger.Printf("Found invalid peer '%s' "+ - "Likely due to a gossip or snitch issue, this host will be ignored", host) + r.session.logger.Warning("Found invalid peer "+ + "likely due to a gossip or snitch issue, this host will be ignored.", newLogFieldStringer("host", host)) continue } @@ -760,6 +760,7 @@ func refreshRing(r *ringDescriber) error { } if host, ok := r.session.ring.addHostIfMissing(h); !ok { + r.session.logger.Info("Adding host.", newLogFieldIp("host_addr", h.ConnectAddress()), newLogFieldString("host_id", h.HostID())) r.session.startPoolFill(h) } else { // host (by hostID) already exists; determine if IP has changed @@ -778,6 +779,7 @@ func refreshRing(r *ringDescriber) error { if _, alreadyExists := r.session.ring.addHostIfMissing(h); alreadyExists { return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists) } + r.session.logger.Info("Adding host with new IP after removing old host.", newLogFieldIp("host_addr", h.ConnectAddress()), newLogFieldString("host_id", h.HostID())) // add new HostInfo (same hostID, new IP) r.session.startPoolFill(h) } @@ -791,6 +793,7 @@ func refreshRing(r *ringDescriber) error { r.session.metadata.setPartitioner(partitioner) r.session.policy.SetPartitioner(partitioner) + r.session.logger.Info("Refreshed ring.", newLogFieldString("ring", ringString(r.session.ring.allHosts()))) return nil } diff --git a/hostpool/hostpool.go b/hostpool/hostpool.go index f3a3d0f6f..c821a1c61 100644 --- a/hostpool/hostpool.go +++ b/hostpool/hostpool.go @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package hostpool import ( diff --git a/hostpool/hostpool_test.go b/hostpool/hostpool_test.go index 10064339e..75e3cc563 100644 --- a/hostpool/hostpool_test.go +++ b/hostpool/hostpool_test.go @@ -1,6 +1,24 @@ //go:build all || unit // +build all unit +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package hostpool import ( diff --git a/logger.go b/logger.go index 1520111fc..e01bf6848 100644 --- a/logger.go +++ b/logger.go @@ -28,33 +28,379 @@ import ( "bytes" "fmt" "log" + "net" + "strconv" + "strings" + "sync" ) -type StdLogger interface { - Print(v ...interface{}) - Printf(format string, v ...interface{}) - Println(v ...interface{}) +// Deprecated: use StructuredLogger instead +type StdLogger interface{} + +func logHelper(logger StructuredLogger, level LogLevel, msg string, fields ...LogField) { + switch level { + case LogLevelDebug: + logger.Debug(msg, fields...) + case LogLevelInfo: + logger.Info(msg, fields...) + case LogLevelWarn: + logger.Warning(msg, fields...) + case LogLevelError: + logger.Error(msg, fields...) + default: + logger.Error("Unknown log level", newLogFieldInt("level", int(level)), newLogFieldString("msg", msg)) + } } type nopLogger struct{} -func (n nopLogger) Print(_ ...interface{}) {} +func (n nopLogger) Error(_ string, _ ...LogField) {} + +func (n nopLogger) Warning(_ string, _ ...LogField) {} + +func (n nopLogger) Info(_ string, _ ...LogField) {} -func (n nopLogger) Printf(_ string, _ ...interface{}) {} +func (n nopLogger) Debug(_ string, _ ...LogField) {} -func (n nopLogger) Println(_ ...interface{}) {} +var nopLoggerSingleton = &nopLogger{} type testLogger struct { - capture bytes.Buffer + logLevel LogLevel + capture bytes.Buffer + mu sync.Mutex +} + +func newTestLogger(logLevel LogLevel) *testLogger { + return &testLogger{logLevel: logLevel} +} + +func (l *testLogger) Error(msg string, fields ...LogField) { + if LogLevelError <= l.logLevel { + l.write("ERR gocql: ", msg, fields) + } +} + +func (l *testLogger) Warning(msg string, fields ...LogField) { + if LogLevelWarn <= l.logLevel { + l.write("WRN gocql: ", msg, fields) + } +} + +func (l *testLogger) Info(msg string, fields ...LogField) { + if LogLevelInfo <= l.logLevel { + l.write("INF gocql: ", msg, fields) + } +} + +func (l *testLogger) Debug(msg string, fields ...LogField) { + if LogLevelDebug <= l.logLevel { + l.write("DBG gocql: ", msg, fields) + } +} + +func (l *testLogger) write(prefix string, msg string, fields []LogField) { + buf := bytes.Buffer{} + writeLogMsg(&buf, prefix, msg, fields) + l.mu.Lock() + defer l.mu.Unlock() + l.capture.WriteString(buf.String() + "\n") +} + +func (l *testLogger) String() string { + l.mu.Lock() + defer l.mu.Unlock() + return l.capture.String() +} + +type defaultLogger struct { + logLevel LogLevel +} + +// NewLogger creates a StructuredLogger that uses the standard library log package. +// +// This logger will write log messages in the following format: +// +// gocql: = = +// +// LOG_LEVEL is always a 3 letter string: +// - DEBUG -> DBG +// - INFO -> INF +// - WARNING -> WRN +// - ERROR -> ERR +// +// Example: +// +// INF gocql: Adding host (session initialization). host_addr=127.0.0.1 host_id=a21dd06e-9e7e-4528-8ad7-039604e25e73 +func NewLogger(logLevel LogLevel) StructuredLogger { + return &defaultLogger{logLevel: logLevel} +} + +func (l *defaultLogger) Error(msg string, fields ...LogField) { + if LogLevelError <= l.logLevel { + l.write("ERR gocql: ", msg, fields) + } +} + +func (l *defaultLogger) Warning(msg string, fields ...LogField) { + if LogLevelWarn <= l.logLevel { + l.write("WRN gocql: ", msg, fields) + } +} + +func (l *defaultLogger) Info(msg string, fields ...LogField) { + if LogLevelInfo <= l.logLevel { + l.write("INF gocql: ", msg, fields) + } +} + +func (l *defaultLogger) Debug(msg string, fields ...LogField) { + if LogLevelDebug <= l.logLevel { + l.write("DBG gocql: ", msg, fields) + } +} + +func (l *defaultLogger) write(prefix string, msg string, fields []LogField) { + buf := bytes.Buffer{} + writeLogMsg(&buf, prefix, msg, fields) + log.Println(buf.String()) +} + +func writeFields(buf *bytes.Buffer, fields []LogField) { + for i, field := range fields { + if i > 0 { + buf.WriteRune(' ') + } + buf.WriteString(field.Name) + buf.WriteRune('=') + buf.WriteString(field.Value.String()) + } +} + +func writeLogMsg(buf *bytes.Buffer, prefix string, msg string, fields []LogField) { + buf.WriteString(prefix) + buf.WriteString(msg) + buf.WriteRune(' ') + writeFields(buf, fields) +} + +type LogLevel int + +const ( + LogLevelDebug = LogLevel(5) + LogLevelInfo = LogLevel(4) + LogLevelWarn = LogLevel(3) + LogLevelError = LogLevel(2) + LogLevelNone = LogLevel(0) +) + +func (recv LogLevel) String() string { + switch recv { + case LogLevelDebug: + return "debug" + case LogLevelInfo: + return "info" + case LogLevelWarn: + return "warn" + case LogLevelError: + return "error" + case LogLevelNone: + return "none" + default: + // fmt.sprintf allocates so use strings.Join instead + temp := [2]string{"invalid level ", strconv.Itoa(int(recv))} + return strings.Join(temp[:], "") + } +} + +type LogField struct { + Name string + Value LogFieldValue +} + +func newLogField(name string, value LogFieldValue) LogField { + return LogField{ + Name: name, + Value: value, + } } -func (l *testLogger) Print(v ...interface{}) { fmt.Fprint(&l.capture, v...) } -func (l *testLogger) Printf(format string, v ...interface{}) { fmt.Fprintf(&l.capture, format, v...) } -func (l *testLogger) Println(v ...interface{}) { fmt.Fprintln(&l.capture, v...) } -func (l *testLogger) String() string { return l.capture.String() } +func newLogFieldIp(name string, value net.IP) LogField { + var str string + if value == nil { + str = "" + } else { + str = value.String() + } + return newLogField(name, logFieldValueString(str)) +} + +func newLogFieldError(name string, value error) LogField { + var str string + if value != nil { + str = value.Error() + } + return newLogField(name, logFieldValueString(str)) +} + +func newLogFieldStringer(name string, value fmt.Stringer) LogField { + var str string + if value != nil { + str = value.String() + } + return newLogField(name, logFieldValueString(str)) +} + +func newLogFieldString(name string, value string) LogField { + return newLogField(name, logFieldValueString(value)) +} + +func newLogFieldInt(name string, value int) LogField { + return newLogField(name, logFieldValueInt64(int64(value))) +} + +func newLogFieldBool(name string, value bool) LogField { + return newLogField(name, logFieldValueBool(value)) +} + +type StructuredLogger interface { + Error(msg string, fields ...LogField) + Warning(msg string, fields ...LogField) + Info(msg string, fields ...LogField) + Debug(msg string, fields ...LogField) +} + +// A LogFieldValue can represent any Go value, but unlike type any, +// it can represent most small values without an allocation. +// The zero Value corresponds to nil. +type LogFieldValue struct { + num uint64 + any interface{} +} + +// LogFieldValueType is the type of a LogFieldValue. +type LogFieldValueType int + +// It's important that LogFieldTypeAny is 0 so that a zero Value represents nil. +const ( + LogFieldTypeAny LogFieldValueType = iota + LogFieldTypeBool + LogFieldTypeInt64 + LogFieldTypeString +) + +// LogFieldValueType returns v's LogFieldValueType. +func (v LogFieldValue) LogFieldValueType() LogFieldValueType { + switch x := v.any.(type) { + case LogFieldValueType: + return x + case string: + return LogFieldTypeString + default: + return LogFieldTypeAny + } +} + +func logFieldValueString(value string) LogFieldValue { + return LogFieldValue{any: value} +} -type defaultLogger struct{} +func logFieldValueInt(v int) LogFieldValue { + return logFieldValueInt64(int64(v)) +} + +func logFieldValueInt64(v int64) LogFieldValue { + return LogFieldValue{num: uint64(v), any: LogFieldTypeInt64} +} + +func logFieldValueBool(v bool) LogFieldValue { + u := uint64(0) + if v { + u = 1 + } + return LogFieldValue{num: u, any: LogFieldTypeBool} +} + +// Any returns v's value as an interface. +func (v LogFieldValue) Any() interface{} { + switch v.LogFieldValueType() { + case LogFieldTypeAny: + if k, ok := v.any.(LogFieldValueType); ok { + return k + } + return v.any + case LogFieldTypeInt64: + return int64(v.num) + case LogFieldTypeString: + return v.str() + case LogFieldTypeBool: + return v.bool() + default: + panic(fmt.Sprintf("bad value type: %s", v.LogFieldValueType())) + } +} + +// String returns LogFieldValue's value as a string, formatted like fmt.Sprint. +// +// Unlike the methods Int64 and Bool which panic if v is of the +// wrong LogFieldValueType, String never panics +// (i.e. it can be called for any LogFieldValueType, not just LogFieldTypeString) +func (v LogFieldValue) String() string { + return v.stringValue() +} + +func (v LogFieldValue) str() string { + return v.any.(string) +} -func (l *defaultLogger) Print(v ...interface{}) { log.Print(v...) } -func (l *defaultLogger) Printf(format string, v ...interface{}) { log.Printf(format, v...) } -func (l *defaultLogger) Println(v ...interface{}) { log.Println(v...) } +// Int64 returns v's value as an int64. It panics +// if v is not a signed integer. +func (v LogFieldValue) Int64() int64 { + if g, w := v.LogFieldValueType(), LogFieldTypeInt64; g != w { + panic(fmt.Sprintf("Value type is %s, not %s", g, w)) + } + return int64(v.num) +} + +// Bool returns v's value as a bool. It panics +// if v is not a bool. +func (v LogFieldValue) Bool() bool { + if g, w := v.LogFieldValueType(), LogFieldTypeBool; g != w { + panic(fmt.Sprintf("Value type is %s, not %s", g, w)) + } + return v.bool() +} + +func (v LogFieldValue) bool() bool { + return v.num == 1 +} + +// stringValue returns a text representation of v. +// v is formatted as with fmt.Sprint. +func (v LogFieldValue) stringValue() string { + switch v.LogFieldValueType() { + case LogFieldTypeString: + return v.str() + case LogFieldTypeInt64: + return strconv.FormatInt(int64(v.num), 10) + case LogFieldTypeBool: + return strconv.FormatBool(v.bool()) + case LogFieldTypeAny: + return fmt.Sprint(v.any) + default: + panic(fmt.Sprintf("bad value type: %s", v.LogFieldValueType())) + } +} + +var logFieldValueTypeStrings = []string{ + "Any", + "Bool", + "Int64", + "String", +} + +func (t LogFieldValueType) String() string { + if t >= 0 && int(t) < len(logFieldValueTypeStrings) { + return logFieldValueTypeStrings[t] + } + return "" +} diff --git a/metadata_test.go b/metadata_test.go index b6a7a88f2..e2af014ec 100644 --- a/metadata_test.go +++ b/metadata_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + // Copyright (c) 2015 The gocql Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -40,7 +43,7 @@ func TestCompileMetadata(t *testing.T) { cfg: ClusterConfig{ ProtoVersion: 1, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), types: GlobalTypes, } // V2 test - V2+ protocol is simpler so here are some toy examples to verify that the mapping works @@ -431,7 +434,7 @@ func assertParseNonCompositeType( cfg: ClusterConfig{ ProtoVersion: 4, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), types: GlobalTypes, } result, err := parseType(session, def) @@ -472,7 +475,7 @@ func assertParseCompositeType( cfg: ClusterConfig{ ProtoVersion: 4, }, - logger: &defaultLogger{}, + logger: NewLogger(LogLevelNone), types: GlobalTypes, } result, err := parseType(session, def) diff --git a/policies.go b/policies.go index c9f3399fd..8db55bf2d 100644 --- a/policies.go +++ b/policies.go @@ -417,7 +417,7 @@ type tokenAwareHostPolicy struct { partitioner string metadata atomic.Value // *clusterMeta - logger StdLogger + logger StructuredLogger } func (t *tokenAwareHostPolicy) Init(s *Session) { @@ -560,7 +560,7 @@ func (t *tokenAwareHostPolicy) getMetadataForUpdate() *clusterMeta { // resetTokenRing creates a new tokenRing. // It must be called with t.mu locked. -func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logger StdLogger) { +func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logger StructuredLogger) { if partitioner == "" { // partitioner not yet set return @@ -569,7 +569,7 @@ func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logg // create a new token ring tokenRing, err := newTokenRing(partitioner, hosts) if err != nil { - logger.Printf("Unable to update the token ring due to error: %s", err) + logger.Warning("Unable to update the token ring due to error.", newLogFieldError("err", err)) return } diff --git a/policies_test.go b/policies_test.go index 540742a0f..ab40a62a6 100644 --- a/policies_test.go +++ b/policies_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + // Copyright (c) 2015 The gocql Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/ring.go b/ring.go index 1dbf07c48..d11162264 100644 --- a/ring.go +++ b/ring.go @@ -94,12 +94,12 @@ func (r *ring) currentHosts() map[string]*HostInfo { return hosts } -func (r *ring) addOrUpdate(host *HostInfo) *HostInfo { - if existingHost, ok := r.addHostIfMissing(host); ok { +func (r *ring) addOrUpdate(host *HostInfo) (*HostInfo, bool) { + existingHost, ok := r.addHostIfMissing(host) + if ok { existingHost.update(host) - host = existingHost } - return host + return existingHost, ok } func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { diff --git a/ring_test.go b/ring_test.go index 3e9533ecd..8d1c094e8 100644 --- a/ring_test.go +++ b/ring_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/session.go b/session.go index dcd0b6207..45206cd04 100644 --- a/session.go +++ b/session.go @@ -101,17 +101,17 @@ type Session struct { // you can use initialized() to read the value. isInitialized bool - logger StdLogger + logger StructuredLogger } -func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInfo, error) { +func addrsToHosts(addrs []string, defaultPort int, logger StructuredLogger) ([]*HostInfo, error) { var hosts []*HostInfo for _, hostaddr := range addrs { resolvedHosts, err := hostInfo(hostaddr, defaultPort) if err != nil { // Try other hosts if unable to resolve DNS name if _, ok := err.(*net.DNSError); ok { - logger.Printf("gocql: dns error: %v\n", err) + logger.Error("DNS error.", newLogFieldError("err", err)) continue } return nil, err @@ -153,7 +153,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { connectObserver: cfg.ConnectObserver, ctx: ctx, cancel: cancel, - logger: cfg.logger(), + logger: cfg.newLogger(), trace: cfg.Tracer, } if cfg.RegisteredTypes == nil { @@ -234,9 +234,10 @@ func (s *Session) init() error { // TODO(zariel): we really only need this in 1 place s.cfg.ProtoVersion = proto s.connCfg.ProtoVersion = proto + s.logger.Info("Discovered protocol version.", newLogFieldInt("protocol_version", proto)) } - if err := s.control.connect(hosts); err != nil { + if err := s.control.connect(hosts, true); err != nil { return err } @@ -255,6 +256,9 @@ func (s *Session) init() error { } hosts = filteredHosts + s.logger.Info("Refreshed ring.", newLogFieldString("ring", ringString(hosts))) + } else { + s.logger.Info("Not performing a ring refresh because DisableInitialHostLookup is true.") } } @@ -285,10 +289,14 @@ func (s *Session) init() error { // again atomic.AddInt64(&left, 1) for _, host := range hostMap { - host := s.ring.addOrUpdate(host) + host, exists := s.ring.addOrUpdate(host) if s.cfg.filterHost(host) { continue } + if !exists { + s.logger.Info("Adding host (session initialization).", + newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID())) + } atomic.AddInt64(&left, 1) go func() { @@ -366,6 +374,7 @@ func (s *Session) init() error { s.isInitialized = true s.sessionStateMu.Unlock() + s.logger.Info("Session initialized successfully.") return nil } @@ -391,21 +400,20 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) { for { select { case <-reconnectTicker.C: + s.logger.Debug("Connecting to downed hosts if there is any.") hosts := s.ring.allHosts() // Print session.ring for debug. - if gocqlDebug { - buf := bytes.NewBufferString("Session.ring:") - for _, h := range hosts { - buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]") - } - s.logger.Println(buf.String()) - } + s.logger.Debug("Logging current ring state.", newLogFieldString("ring", ringString(hosts))) for _, h := range hosts { if h.IsUp() { continue } + s.logger.Debug("Reconnecting to downed host.", + newLogFieldIp("host_addr", h.ConnectAddress()), + newLogFieldInt("host_port", h.Port()), + newLogFieldString("host_id", h.HostID())) // we let the pool call handleNodeConnected to change the host state s.pool.addHost(h) } @@ -524,6 +532,7 @@ func (s *Session) executeQuery(qry *internalQuery) (it *Iter) { } func (s *Session) removeHost(h *HostInfo) { + s.logger.Warning("Removing host.", newLogFieldIp("host_addr", h.ConnectAddress()), newLogFieldString("host_id", h.HostID())) s.policy.RemoveHost(h) hostID := h.HostID() s.pool.removeHost(hostID) diff --git a/session_connect_test.go b/session_connect_test.go index d2097b637..83214b5e7 100644 --- a/session_connect_test.go +++ b/session_connect_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/session_test.go b/session_test.go index d414d6f41..3fcb21630 100644 --- a/session_test.go +++ b/session_test.go @@ -40,7 +40,7 @@ func TestSessionAPI(t *testing.T) { cfg: *cfg, cons: Quorum, policy: RoundRobinHostPolicy(), - logger: cfg.logger(), + logger: cfg.newLogger(), } defer s.Close() @@ -168,7 +168,7 @@ func TestBatchBasicAPI(t *testing.T) { s := &Session{ cfg: *cfg, cons: Quorum, - logger: cfg.logger(), + logger: cfg.newLogger(), } defer s.Close() diff --git a/snappy/compressor_test.go b/snappy/compressor_test.go index 3efe3fa70..b87eb5bf2 100644 --- a/snappy/compressor_test.go +++ b/snappy/compressor_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/topology.go b/topology.go index 2fc38a887..5a9b50fa3 100644 --- a/topology.go +++ b/topology.go @@ -90,12 +90,13 @@ func getReplicationFactorFromOpts(val interface{}) (int, error) { } } -func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { +func getStrategy(ks *KeyspaceMetadata, logger StructuredLogger) placementStrategy { switch { case strings.Contains(ks.StrategyClass, "SimpleStrategy"): rf, err := getReplicationFactorFromOpts(ks.StrategyOptions["replication_factor"]) if err != nil { - logger.Printf("parse rf for keyspace %q: %v", ks.Name, err) + logger.Warning("Failed to parse replication factor of keyspace configured with SimpleStrategy.", + newLogFieldString("keyspace", ks.Name), newLogFieldError("err", err)) return nil } return &simpleStrategy{rf: rf} @@ -108,7 +109,8 @@ func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { rf, err := getReplicationFactorFromOpts(rf) if err != nil { - logger.Println("parse rf for keyspace %q, dc %q: %v", err) + logger.Warning("Failed to parse replication factors of keyspace configured with NetworkTopologyStrategy.", + newLogFieldString("keyspace", ks.Name), newLogFieldString("dc", dc), newLogFieldError("err", err)) // skip DC if the rf is invalid/unsupported, so that we can at least work with other working DCs. continue } @@ -119,7 +121,8 @@ func getStrategy(ks *KeyspaceMetadata, logger StdLogger) placementStrategy { case strings.Contains(ks.StrategyClass, "LocalStrategy"): return nil default: - logger.Printf("parse rf for keyspace %q: unsupported strategy class: %v", ks.StrategyClass) + logger.Warning("Failed to parse replication factor of keyspace due to unknown strategy class.", + newLogFieldString("keyspace", ks.Name), newLogFieldString("strategy_class", ks.StrategyClass)) return nil } } diff --git a/topology_test.go b/topology_test.go index fe8473e98..9a5cf1c10 100644 --- a/topology_test.go +++ b/topology_test.go @@ -1,3 +1,6 @@ +//go:build all || unit +// +build all unit + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file