Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Removed

- Drop support for old CQL protocol versions: 1 and 2 (CASSGO-75)
- Cleanup of deprecated elements (CASSGO-12)
- Remove global NewBatch function (CASSGO-15)
- Remove deprecated global logger (CASSGO-24)

### Added

Expand All @@ -27,26 +30,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Move lz4 compressor to lz4 package within the gocql module (CASSGO-32)
- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)
- Cleanup of deprecated elements (CASSGO-12)
- Remove global NewBatch function (CASSGO-15)
- Detailed description for NumConns (CASSGO-3)
- Change Batch API to be consistent with Query() (CASSGO-7)
- Added Cassandra 4.0 table options support(CASSGO-13)
- Remove deprecated global logger (CASSGO-24)
- Added Cassandra 4.0 table options support (CASSGO-13)
- Bumped actions/upload-artifact and actions/cache versions to v4 in CI workflow (CASSGO-48)
- Keep nil slices in MapScan (CASSGO-44)
- Improve error messages for marshalling (CASSGO-38)
- Remove HostPoolHostPolicy from gocql package (CASSGO-21)
- Standardized spelling of datacenter (CASSGO-35)
- Refactor HostInfo creation and ConnectAddress() method (CASSGO-45)
- gocql.Compressor interface changes to follow append-like design. Bumped Go version to 1.19 (CASSGO-1)
- gocql.Compressor interface changes to follow append-like design (CASSGO-1)
- Refactoring hostpool package test and Expose HostInfo creation (CASSGO-59)
- Move "execute batch" methods to Batch type (CASSGO-57)
- Make `Session` immutable by removing setters and associated mutex (CASSGO-23)
- inet columns default to net.IP when using MapScan or SliceMap (CASSGO-43)
- NativeType removed (CASSGO-43)
- `New` and `NewWithError` removed and replaced with `Zero` (CASSGO-43)
- Changes to Query and Batch to make them safely reusable (CASSGO-22)
- Change logger interface so it supports structured logging and log levels (CASSGO-9)

### Fixed

Expand Down
3 changes: 3 additions & 0 deletions address_translators_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build all || unit
// +build all unit

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down
28 changes: 17 additions & 11 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,15 @@ type ClusterConfig struct {
// If not provided, Dialer will be used instead.
HostDialer HostDialer

// Logger for this ClusterConfig.
// If not specified, defaults to the gocql.defaultLogger.
Logger StdLogger
// StructuredLogger for this ClusterConfig.
//
// There are 3 built in implementations of StructuredLogger:
// - std library "log" package: gocql.NewLogger
// - zerolog: gocqlzerolog.NewZerologLogger
// - zap: gocqlzap.NewZapLogger
//
// You can also provide your own logger implementation of the StructuredLogger interface.
Logger StructuredLogger

// Tracer will be used for all queries. Alternatively it can be set of on a
// per query basis.
Expand Down Expand Up @@ -318,11 +324,11 @@ func NewCluster(hosts ...string) *ClusterConfig {
return cfg
}

func (cfg *ClusterConfig) logger() StdLogger {
if cfg.Logger == nil {
return &defaultLogger{}
func (cfg *ClusterConfig) newLogger() StructuredLogger {
if cfg.Logger != nil {
return cfg.Logger
}
return cfg.Logger
return NewLogger(LogLevelNone)
}

// CreateSession initializes the cluster based on this config and returns a
Expand All @@ -335,14 +341,14 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
// if defined, to translate the given address and port into a possibly new address
// and port, If no AddressTranslator or if an error occurs, the given address and
// port will be returned.
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) {
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger StructuredLogger) (net.IP, int) {
if cfg.AddressTranslator == nil || len(addr) == 0 {
return addr, port
}
newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
if gocqlDebug {
cfg.logger().Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort)
}
logger.Debug("Translating address.",
newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port),
newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort))
return newAddr, newPort
}

Expand Down
9 changes: 6 additions & 3 deletions cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build all || unit
// +build all unit

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -60,23 +63,23 @@ func TestNewCluster_WithHosts(t *testing.T) {
func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) {
cfg := NewCluster()
assertNil(t, "cluster config address translator", cfg.AddressTranslator)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234, nopLoggerSingleton)
assertTrue(t, "same address as provided", net.ParseIP("10.0.0.1").Equal(newAddr))
assertEqual(t, "translated host and port", 1234, newPort)
}

func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) {
cfg := NewCluster()
cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432)
newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0)
newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0, nopLoggerSingleton)
assertTrue(t, "translated address is still empty", len(newAddr) == 0)
assertEqual(t, "translated port", 0, newPort)
}

func TestClusterConfig_translateAddressAndPort_Success(t *testing.T) {
cfg := NewCluster()
cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345)
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345, nopLoggerSingleton)
assertTrue(t, "translated address", net.ParseIP("10.10.10.10").Equal(newAddr))
assertEqual(t, "translated port", 5432, newPort)
}
5 changes: 4 additions & 1 deletion common_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build all || unit || integration || ccm || cassandra
// +build all unit integration ccm cassandra

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -85,7 +88,7 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
var initOnce sync.Once

func createTable(s *Session, table string) error {
// lets just be really sure
// let's just be really sure
if err := s.control.awaitSchemaAgreement(); err != nil {
log.Printf("error waiting for schema agreement pre create table=%q err=%v\n", table, err)
return err
Expand Down
24 changes: 12 additions & 12 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,12 @@ type ConnConfig struct {
Authenticator Authenticator
AuthProvider func(h *HostInfo) (Authenticator, error)
Keepalive time.Duration
Logger StdLogger
Logger StructuredLogger

tlsConfig *tls.Config
disableCoalesce bool
}

func (c *ConnConfig) logger() StdLogger {
if c.Logger == nil {
return &defaultLogger{}
}
return c.Logger
}

type ConnErrorHandler interface {
HandleError(conn *Conn, err error, closed bool)
}
Expand Down Expand Up @@ -208,7 +201,7 @@ type Conn struct {

timeouts int64

logger StdLogger
logger StructuredLogger
}

// connect establishes a connection to a Cassandra node using session's connection config.
Expand Down Expand Up @@ -715,7 +708,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
delete(c.calls, head.stream)
c.mu.Unlock()
if call == nil || !ok {
c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
c.logger.Warning("Received response for stream which has no handler.", newLogFieldString("header", head.String()))
return c.discardFrame(r, head)
} else if head.stream != call.streamID {
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
Expand Down Expand Up @@ -1330,12 +1323,19 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
return resp.framer, nil
case <-timeoutCh:
close(call.timeout)
c.logger.Debug("Request timed out on connection.",
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
c.handleTimeout()
return nil, ErrTimeoutNoResponse
case <-ctxDone:
c.logger.Debug("Request failed because context elapsed out on connection.",
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()),
newLogFieldError("ctx_err", ctx.Err()))
close(call.timeout)
return nil, ctx.Err()
case <-c.ctx.Done():
c.logger.Debug("Request failed because connection closed.",
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
close(call.timeout)
return nil, ErrConnectionClosed
}
Expand Down Expand Up @@ -1685,7 +1685,7 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
iter.framer = framer
if err := c.awaitSchemaAgreement(ctx); err != nil {
// TODO: should have this behind a flag
c.logger.Println(err)
c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", newLogFieldError("err", err))
}
// dont return an error from this, might be a good idea to give a warning
// though. The impact of this returning an error would be that the cluster
Expand Down Expand Up @@ -1947,7 +1947,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
goto cont
}
if !isValidPeer(host) || host.schemaVersion == "" {
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
c.logger.Warning("Invalid peer or peer with empty schema_version.", newLogFieldIp("peer", host.ConnectAddress()))
continue
}

Expand Down
34 changes: 19 additions & 15 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"math/rand"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -184,7 +185,7 @@ func newTestSession(proto protoVersion, addresses ...string) (*Session, error) {
}

func TestDNSLookupConnected(t *testing.T) {
log := &testLogger{}
log := newTestLogger(LogLevelDebug)

// Override the defaul DNS resolver and restore at the end
failDNS = true
Expand All @@ -205,13 +206,13 @@ func TestDNSLookupConnected(t *testing.T) {
t.Fatal("CreateSession() should have connected")
}

if !strings.Contains(log.String(), "gocql: dns error") {
if !strings.Contains(log.String(), "gocql: DNS error") {
t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String())
}
}

func TestDNSLookupError(t *testing.T) {
log := &testLogger{}
log := newTestLogger(LogLevelDebug)

// Override the defaul DNS resolver and restore at the end
failDNS = true
Expand All @@ -229,7 +230,7 @@ func TestDNSLookupError(t *testing.T) {
t.Fatal("CreateSession() should have returned an error")
}

if !strings.Contains(log.String(), "gocql: dns error") {
if !strings.Contains(log.String(), "gocql: DNS error") {
t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String())
}

Expand All @@ -240,7 +241,7 @@ func TestDNSLookupError(t *testing.T) {

func TestStartupTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
log := &testLogger{}
log := newTestLogger(LogLevelDebug)

srv := NewTestServer(t, defaultProto, ctx)
defer srv.Stop()
Expand Down Expand Up @@ -348,17 +349,20 @@ func TestCancel(t *testing.T) {

type testQueryObserver struct {
metrics map[string]*hostMetrics
verbose bool
logger StdLogger
logger StructuredLogger
}

func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
host := q.Host.ConnectAddress().String()
o.metrics[host] = q.Metrics
if o.verbose {
o.logger.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n",
q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
}
o.logger.Debug("Observed query.",
newLogFieldString("stmt", q.Statement),
newLogFieldInt("rows", q.Rows),
newLogFieldString("duration", q.End.Sub(q.Start).String()),
newLogFieldString("host", host),
newLogFieldInt("attempts", q.Metrics.Attempts),
newLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)),
newLogFieldError("err", q.Err))
}

func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics {
Expand Down Expand Up @@ -411,7 +415,7 @@ func TestQueryRetry(t *testing.T) {
}

func TestQueryMultinodeWithMetrics(t *testing.T) {
log := &testLogger{}
log := newTestLogger(LogLevelNone)
defer func() {
os.Stdout.WriteString(log.String())
}()
Expand Down Expand Up @@ -439,7 +443,7 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {

// 1 retry per host
rt := &SimpleRetryPolicy{NumRetries: 3}
observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false, logger: log}
observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), logger: log}
qry := db.Query("kill").RetryPolicy(rt).Observer(observer).Idempotent(true)
iter := qry.Iter()
err = iter.Close()
Expand Down Expand Up @@ -488,7 +492,7 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType {
}

func TestSpeculativeExecution(t *testing.T) {
log := &testLogger{}
log := newTestLogger(LogLevelDebug)
defer func() {
os.Stdout.WriteString(log.String())
}()
Expand Down Expand Up @@ -724,7 +728,7 @@ func TestStream0(t *testing.T) {
session: &Session{
types: GlobalTypes,
},
logger: &defaultLogger{},
logger: NewLogger(LogLevelNone),
}

err := conn.recv(context.Background(), false)
Expand Down
Loading