Skip to content

Commit acec4c5

Browse files
committed
Change logger so it supports structured logging and log levels
The logger interface uses printf semantics and lacks the notion of log levels. This commit changes (breaking change) the logger interface so printf semantics are removed in favor of structured logging and adds log levels. It provides 3 built in log implementations: - Default logger - uses log standard library - gocqlzap - uses zap library - gocqlzerolog - uses zerolog library Users can use these implementations as is or they can implement their own using these as examples. Patch by João Reis; reviewed by James Hartig, Bohdan Siryk for CASSGO-9
1 parent f5fe483 commit acec4c5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1034
-220
lines changed

CHANGELOG.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Removed
1111

1212
- Drop support for old CQL protocol versions: 1 and 2 (CASSGO-75)
13+
- Cleanup of deprecated elements (CASSGO-12)
14+
- Remove global NewBatch function (CASSGO-15)
15+
- Remove deprecated global logger (CASSGO-24)
1316

1417
### Added
1518

@@ -27,26 +30,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2730

2831
- Move lz4 compressor to lz4 package within the gocql module (CASSGO-32)
2932
- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)
30-
- Cleanup of deprecated elements (CASSGO-12)
31-
- Remove global NewBatch function (CASSGO-15)
3233
- Detailed description for NumConns (CASSGO-3)
3334
- Change Batch API to be consistent with Query() (CASSGO-7)
34-
- Added Cassandra 4.0 table options support(CASSGO-13)
35-
- Remove deprecated global logger (CASSGO-24)
35+
- Added Cassandra 4.0 table options support (CASSGO-13)
3636
- Bumped actions/upload-artifact and actions/cache versions to v4 in CI workflow (CASSGO-48)
3737
- Keep nil slices in MapScan (CASSGO-44)
3838
- Improve error messages for marshalling (CASSGO-38)
3939
- Remove HostPoolHostPolicy from gocql package (CASSGO-21)
4040
- Standardized spelling of datacenter (CASSGO-35)
4141
- Refactor HostInfo creation and ConnectAddress() method (CASSGO-45)
42-
- gocql.Compressor interface changes to follow append-like design. Bumped Go version to 1.19 (CASSGO-1)
42+
- gocql.Compressor interface changes to follow append-like design (CASSGO-1)
4343
- Refactoring hostpool package test and Expose HostInfo creation (CASSGO-59)
4444
- Move "execute batch" methods to Batch type (CASSGO-57)
4545
- Make `Session` immutable by removing setters and associated mutex (CASSGO-23)
4646
- inet columns default to net.IP when using MapScan or SliceMap (CASSGO-43)
4747
- NativeType removed (CASSGO-43)
4848
- `New` and `NewWithError` removed and replaced with `Zero` (CASSGO-43)
4949
- Changes to Query and Batch to make them safely reusable (CASSGO-22)
50+
- Change logger interface so it supports structured logging and log levels (CASSGO-9)
5051

5152
### Fixed
5253

address_translators_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
//go:build all || unit
2+
// +build all unit
3+
14
/*
25
* Licensed to the Apache Software Foundation (ASF) under one
36
* or more contributor license agreements. See the NOTICE file

cluster.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -259,9 +259,15 @@ type ClusterConfig struct {
259259
// If not provided, Dialer will be used instead.
260260
HostDialer HostDialer
261261

262-
// Logger for this ClusterConfig.
263-
// If not specified, defaults to the gocql.defaultLogger.
264-
Logger StdLogger
262+
// StructuredLogger for this ClusterConfig.
263+
//
264+
// There are 3 built in implementations of StructuredLogger:
265+
// - std library "log" package: gocql.NewLogger
266+
// - zerolog: gocqlzerolog.NewZerologLogger
267+
// - zap: gocqlzap.NewZapLogger
268+
//
269+
// You can also provide your own logger implementation of the StructuredLogger interface.
270+
Logger StructuredLogger
265271

266272
// Tracer will be used for all queries. Alternatively it can be set of on a
267273
// per query basis.
@@ -318,11 +324,11 @@ func NewCluster(hosts ...string) *ClusterConfig {
318324
return cfg
319325
}
320326

321-
func (cfg *ClusterConfig) logger() StdLogger {
322-
if cfg.Logger == nil {
323-
return &defaultLogger{}
327+
func (cfg *ClusterConfig) newLogger() StructuredLogger {
328+
if cfg.Logger != nil {
329+
return cfg.Logger
324330
}
325-
return cfg.Logger
331+
return NewLogger(LogLevelNone)
326332
}
327333

328334
// CreateSession initializes the cluster based on this config and returns a
@@ -335,14 +341,14 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
335341
// if defined, to translate the given address and port into a possibly new address
336342
// and port, If no AddressTranslator or if an error occurs, the given address and
337343
// port will be returned.
338-
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int) (net.IP, int) {
344+
func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger StructuredLogger) (net.IP, int) {
339345
if cfg.AddressTranslator == nil || len(addr) == 0 {
340346
return addr, port
341347
}
342348
newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
343-
if gocqlDebug {
344-
cfg.logger().Printf("gocql: translating address '%v:%d' to '%v:%d'", addr, port, newAddr, newPort)
345-
}
349+
logger.Debug("Translating address.",
350+
newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port),
351+
newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort))
346352
return newAddr, newPort
347353
}
348354

cluster_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
//go:build all || unit
2+
// +build all unit
3+
14
/*
25
* Licensed to the Apache Software Foundation (ASF) under one
36
* or more contributor license agreements. See the NOTICE file
@@ -60,23 +63,23 @@ func TestNewCluster_WithHosts(t *testing.T) {
6063
func TestClusterConfig_translateAddressAndPort_NilTranslator(t *testing.T) {
6164
cfg := NewCluster()
6265
assertNil(t, "cluster config address translator", cfg.AddressTranslator)
63-
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234)
66+
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 1234, nopLoggerSingleton)
6467
assertTrue(t, "same address as provided", net.ParseIP("10.0.0.1").Equal(newAddr))
6568
assertEqual(t, "translated host and port", 1234, newPort)
6669
}
6770

6871
func TestClusterConfig_translateAddressAndPort_EmptyAddr(t *testing.T) {
6972
cfg := NewCluster()
7073
cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432)
71-
newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0)
74+
newAddr, newPort := cfg.translateAddressPort(net.IP([]byte{}), 0, nopLoggerSingleton)
7275
assertTrue(t, "translated address is still empty", len(newAddr) == 0)
7376
assertEqual(t, "translated port", 0, newPort)
7477
}
7578

7679
func TestClusterConfig_translateAddressAndPort_Success(t *testing.T) {
7780
cfg := NewCluster()
7881
cfg.AddressTranslator = staticAddressTranslator(net.ParseIP("10.10.10.10"), 5432)
79-
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345)
82+
newAddr, newPort := cfg.translateAddressPort(net.ParseIP("10.0.0.1"), 2345, nopLoggerSingleton)
8083
assertTrue(t, "translated address", net.ParseIP("10.10.10.10").Equal(newAddr))
8184
assertEqual(t, "translated port", 5432, newPort)
8285
}

common_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
//go:build all || unit || integration || ccm || cassandra
2+
// +build all unit integration ccm cassandra
3+
14
/*
25
* Licensed to the Apache Software Foundation (ASF) under one
36
* or more contributor license agreements. See the NOTICE file
@@ -85,7 +88,7 @@ func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
8588
var initOnce sync.Once
8689

8790
func createTable(s *Session, table string) error {
88-
// lets just be really sure
91+
// let's just be really sure
8992
if err := s.control.awaitSchemaAgreement(); err != nil {
9093
log.Printf("error waiting for schema agreement pre create table=%q err=%v\n", table, err)
9194
return err

conn.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,19 +144,12 @@ type ConnConfig struct {
144144
Authenticator Authenticator
145145
AuthProvider func(h *HostInfo) (Authenticator, error)
146146
Keepalive time.Duration
147-
Logger StdLogger
147+
Logger StructuredLogger
148148

149149
tlsConfig *tls.Config
150150
disableCoalesce bool
151151
}
152152

153-
func (c *ConnConfig) logger() StdLogger {
154-
if c.Logger == nil {
155-
return &defaultLogger{}
156-
}
157-
return c.Logger
158-
}
159-
160153
type ConnErrorHandler interface {
161154
HandleError(conn *Conn, err error, closed bool)
162155
}
@@ -208,7 +201,7 @@ type Conn struct {
208201

209202
timeouts int64
210203

211-
logger StdLogger
204+
logger StructuredLogger
212205
}
213206

214207
// connect establishes a connection to a Cassandra node using session's connection config.
@@ -715,7 +708,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
715708
delete(c.calls, head.stream)
716709
c.mu.Unlock()
717710
if call == nil || !ok {
718-
c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
711+
c.logger.Warning("Received response for stream which has no handler.", newLogFieldString("header", head.String()))
719712
return c.discardFrame(r, head)
720713
} else if head.stream != call.streamID {
721714
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
@@ -1330,12 +1323,19 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
13301323
return resp.framer, nil
13311324
case <-timeoutCh:
13321325
close(call.timeout)
1326+
c.logger.Debug("Request timed out on connection.",
1327+
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
13331328
c.handleTimeout()
13341329
return nil, ErrTimeoutNoResponse
13351330
case <-ctxDone:
1331+
c.logger.Debug("Request failed because context elapsed out on connection.",
1332+
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()),
1333+
newLogFieldError("ctx_err", ctx.Err()))
13361334
close(call.timeout)
13371335
return nil, ctx.Err()
13381336
case <-c.ctx.Done():
1337+
c.logger.Debug("Request failed because connection closed.",
1338+
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
13391339
close(call.timeout)
13401340
return nil, ErrConnectionClosed
13411341
}
@@ -1685,7 +1685,7 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
16851685
iter.framer = framer
16861686
if err := c.awaitSchemaAgreement(ctx); err != nil {
16871687
// TODO: should have this behind a flag
1688-
c.logger.Println(err)
1688+
c.logger.Warning("Error while awaiting for schema agreement after a schema change event.", newLogFieldError("err", err))
16891689
}
16901690
// dont return an error from this, might be a good idea to give a warning
16911691
// though. The impact of this returning an error would be that the cluster
@@ -1947,7 +1947,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
19471947
goto cont
19481948
}
19491949
if !isValidPeer(host) || host.schemaVersion == "" {
1950-
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
1950+
c.logger.Warning("Invalid peer or peer with empty schema_version.", newLogFieldIp("peer", host.ConnectAddress()))
19511951
continue
19521952
}
19531953

conn_test.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"math/rand"
4242
"net"
4343
"os"
44+
"strconv"
4445
"strings"
4546
"sync"
4647
"sync/atomic"
@@ -184,7 +185,7 @@ func newTestSession(proto protoVersion, addresses ...string) (*Session, error) {
184185
}
185186

186187
func TestDNSLookupConnected(t *testing.T) {
187-
log := &testLogger{}
188+
log := newTestLogger(LogLevelDebug)
188189

189190
// Override the defaul DNS resolver and restore at the end
190191
failDNS = true
@@ -205,13 +206,13 @@ func TestDNSLookupConnected(t *testing.T) {
205206
t.Fatal("CreateSession() should have connected")
206207
}
207208

208-
if !strings.Contains(log.String(), "gocql: dns error") {
209+
if !strings.Contains(log.String(), "gocql: DNS error") {
209210
t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String())
210211
}
211212
}
212213

213214
func TestDNSLookupError(t *testing.T) {
214-
log := &testLogger{}
215+
log := newTestLogger(LogLevelDebug)
215216

216217
// Override the defaul DNS resolver and restore at the end
217218
failDNS = true
@@ -229,7 +230,7 @@ func TestDNSLookupError(t *testing.T) {
229230
t.Fatal("CreateSession() should have returned an error")
230231
}
231232

232-
if !strings.Contains(log.String(), "gocql: dns error") {
233+
if !strings.Contains(log.String(), "gocql: DNS error") {
233234
t.Fatalf("Expected to receive dns error log message - got '%s' instead", log.String())
234235
}
235236

@@ -240,7 +241,7 @@ func TestDNSLookupError(t *testing.T) {
240241

241242
func TestStartupTimeout(t *testing.T) {
242243
ctx, cancel := context.WithCancel(context.Background())
243-
log := &testLogger{}
244+
log := newTestLogger(LogLevelDebug)
244245

245246
srv := NewTestServer(t, defaultProto, ctx)
246247
defer srv.Stop()
@@ -348,17 +349,20 @@ func TestCancel(t *testing.T) {
348349

349350
type testQueryObserver struct {
350351
metrics map[string]*hostMetrics
351-
verbose bool
352-
logger StdLogger
352+
logger StructuredLogger
353353
}
354354

355355
func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
356356
host := q.Host.ConnectAddress().String()
357357
o.metrics[host] = q.Metrics
358-
if o.verbose {
359-
o.logger.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n",
360-
q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
361-
}
358+
o.logger.Debug("Observed query.",
359+
newLogFieldString("stmt", q.Statement),
360+
newLogFieldInt("rows", q.Rows),
361+
newLogFieldString("duration", q.End.Sub(q.Start).String()),
362+
newLogFieldString("host", host),
363+
newLogFieldInt("attempts", q.Metrics.Attempts),
364+
newLogFieldString("latency", strconv.FormatInt(q.Metrics.TotalLatency, 10)),
365+
newLogFieldError("err", q.Err))
362366
}
363367

364368
func (o *testQueryObserver) GetMetrics(host *HostInfo) *hostMetrics {
@@ -411,7 +415,7 @@ func TestQueryRetry(t *testing.T) {
411415
}
412416

413417
func TestQueryMultinodeWithMetrics(t *testing.T) {
414-
log := &testLogger{}
418+
log := newTestLogger(LogLevelNone)
415419
defer func() {
416420
os.Stdout.WriteString(log.String())
417421
}()
@@ -439,7 +443,7 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
439443

440444
// 1 retry per host
441445
rt := &SimpleRetryPolicy{NumRetries: 3}
442-
observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false, logger: log}
446+
observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), logger: log}
443447
qry := db.Query("kill").RetryPolicy(rt).Observer(observer).Idempotent(true)
444448
iter := qry.Iter()
445449
err = iter.Close()
@@ -488,7 +492,7 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType {
488492
}
489493

490494
func TestSpeculativeExecution(t *testing.T) {
491-
log := &testLogger{}
495+
log := newTestLogger(LogLevelDebug)
492496
defer func() {
493497
os.Stdout.WriteString(log.String())
494498
}()
@@ -724,7 +728,7 @@ func TestStream0(t *testing.T) {
724728
session: &Session{
725729
types: GlobalTypes,
726730
},
727-
logger: &defaultLogger{},
731+
logger: NewLogger(LogLevelNone),
728732
}
729733

730734
err := conn.recv(context.Background(), false)

0 commit comments

Comments
 (0)