Skip to content

Commit 7d10585

Browse files
committed
add log field value types instead of relying on interface{} for everything which leads to extra heap allocations
1 parent e302a52 commit 7d10585

File tree

16 files changed

+288
-86
lines changed

16 files changed

+288
-86
lines changed

cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,8 @@ func (cfg *ClusterConfig) translateAddressPort(addr net.IP, port int, logger int
340340
}
341341
newAddr, newPort := cfg.AddressTranslator.Translate(addr, port)
342342
logger.Debug("translating address '%v:%d' to '%v:%d'",
343-
NewLogField("old_addr", addr), NewLogField("old_port", port),
344-
NewLogField("new_addr", newAddr), NewLogField("new_port", newPort))
343+
newLogFieldIp("old_addr", addr), newLogFieldInt("old_port", port),
344+
newLogFieldIp("new_addr", newAddr), newLogFieldInt("new_port", newPort))
345345
return newAddr, newPort
346346
}
347347

conn.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
702702
delete(c.calls, head.stream)
703703
c.mu.Unlock()
704704
if call == nil || !ok {
705-
c.logger.Warning("received response for stream which has no handler: header=%v", NewLogField("header", head))
705+
c.logger.Warning("received response for stream which has no handler: header=%v", newLogFieldString("header", head.String()))
706706
return c.discardFrame(r, head)
707707
} else if head.stream != call.streamID {
708708
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
@@ -1319,18 +1319,18 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
13191319
case <-timeoutCh:
13201320
close(call.timeout)
13211321
c.logger.Debug("Request timed out on connection %v (%v)",
1322-
NewLogField("host_id", c.host.HostID()), NewLogField("addr", c.host.ConnectAddress()))
1322+
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
13231323
c.handleTimeout()
13241324
return nil, ErrTimeoutNoResponse
13251325
case <-ctxDone:
13261326
c.logger.Debug("Request failed because context elapsed out on connection %v (%v): %v",
1327-
NewLogField("host_id", c.host.HostID()), NewLogField("addr", c.host.ConnectAddress()),
1328-
NewLogField("ctx_err", ctx.Err().Error()))
1327+
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()),
1328+
newLogFieldError("ctx_err", ctx.Err()))
13291329
close(call.timeout)
13301330
return nil, ctx.Err()
13311331
case <-c.ctx.Done():
13321332
c.logger.Debug("Request failed because connection closed %v (%v).",
1333-
NewLogField("host_id", c.host.HostID()), NewLogField("addr", c.host.ConnectAddress()))
1333+
newLogFieldString("host_id", c.host.HostID()), newLogFieldIp("addr", c.host.ConnectAddress()))
13341334
close(call.timeout)
13351335
return nil, ErrConnectionClosed
13361336
}
@@ -1671,7 +1671,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
16711671
iter := &Iter{framer: framer}
16721672
if err := c.awaitSchemaAgreement(ctx); err != nil {
16731673
// TODO: should have this behind a flag
1674-
c.logger.Warning("error while awaiting for schema agreement after a schema change event: %v", NewLogField("err", err.Error()))
1674+
c.logger.Warning("error while awaiting for schema agreement after a schema change event: %v", newLogFieldError("err", err))
16751675
}
16761676
// dont return an error from this, might be a good idea to give a warning
16771677
// though. The impact of this returning an error would be that the cluster
@@ -1925,7 +1925,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
19251925
goto cont
19261926
}
19271927
if !isValidPeer(host) || host.schemaVersion == "" {
1928-
c.logger.Warning("invalid peer or peer with empty schema_version: peer=%s", NewLogField("peer", host.ConnectAddress()))
1928+
c.logger.Warning("invalid peer or peer with empty schema_version: peer=%s", newLogFieldIp("peer", host.ConnectAddress()))
19291929
continue
19301930
}
19311931

connectionpool.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -496,19 +496,19 @@ func (pool *hostConnPool) logConnectErr(err error) {
496496
// connection refused
497497
// these are typical during a node outage so avoid log spam.
498498
pool.logger.Debug("unable to dial %s (%s): %v",
499-
NewLogField("host_addr", pool.host.ConnectAddress()), NewLogField("host_id", pool.host.HostID()), NewLogField("err", err.Error()))
499+
newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err))
500500
} else if err != nil {
501501
// unexpected error
502502
pool.logger.Debug("failed to connect to %s (%s) due to error: %v",
503-
NewLogField("host_addr", pool.host.ConnectAddress()), NewLogField("host_id", pool.host.HostID()), NewLogField("err", err.Error()))
503+
newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err))
504504
}
505505
}
506506

507507
// transition back to a not-filling state.
508508
func (pool *hostConnPool) fillingStopped(err error) {
509509
if err != nil {
510510
pool.logger.Warning("connection pool filling failed %s (%s): %v",
511-
NewLogField("host_addr", pool.host.ConnectAddress()), NewLogField("host_id", pool.host.HostID()), NewLogField("err", err.Error()))
511+
newLogFieldIp("host_addr", pool.host.ConnectAddress()), newLogFieldString("host_id", pool.host.HostID()), newLogFieldError("err", err))
512512
// wait for some time to avoid back-to-back filling
513513
// this provides some time between failed attempts
514514
// to fill the pool for the host to recover
@@ -525,7 +525,7 @@ func (pool *hostConnPool) fillingStopped(err error) {
525525
// if we errored and the size is now zero, make sure the host is marked as down
526526
// see https://github.com/apache/cassandra-gocql-driver/issues/1614
527527
pool.logger.Debug("conns of pool after stopped %s (%s): %v",
528-
NewLogField("host_addr", host.ConnectAddress()), NewLogField("host_id", host.HostID()), NewLogField("count", count))
528+
newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()), newLogFieldInt("count", count))
529529
if err != nil && count == 0 {
530530
if pool.session.cfg.ConvictionPolicy.AddFailure(err, host) {
531531
pool.session.handleNodeDown(host.ConnectAddress(), port)
@@ -582,10 +582,10 @@ func (pool *hostConnPool) connect() (err error) {
582582
}
583583
}
584584
pool.logger.Warning("connection failed %s (%s): %v, reconnecting with %v",
585-
NewLogField("host", pool.host.ConnectAddress()),
586-
NewLogField("host_id", pool.host.HostID()),
587-
NewLogField("err", err.Error()),
588-
NewLogField("reconnectionPolicy", fmt.Sprintf("%T", reconnectionPolicy)))
585+
newLogFieldIp("host", pool.host.ConnectAddress()),
586+
newLogFieldString("host_id", pool.host.HostID()),
587+
newLogFieldError("err", err),
588+
newLogFieldString("reconnectionPolicy", fmt.Sprintf("%T", reconnectionPolicy)))
589589
time.Sleep(reconnectionPolicy.GetInterval(i))
590590
}
591591

@@ -633,7 +633,7 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
633633
}
634634

635635
pool.logger.Info("pool connection error %v: %v",
636-
NewLogField("addr", conn.addr), NewLogField("err", err.Error()))
636+
newLogFieldString("addr", conn.addr), newLogFieldError("err", err))
637637

638638
// find the connection index
639639
for i, candidate := range pool.conns {

control.go

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (c *controlConn) heartBeat() {
105105

106106
resp, err := c.writeFrame(&writeOptionsFrame{})
107107
if err != nil {
108-
c.session.logger.Debug("control connection heartbeat failed: %v.", NewLogField("err", err.Error()))
108+
c.session.logger.Debug("control connection heartbeat failed: %v.", newLogFieldError("err", err))
109109
goto reconn
110110
}
111111

@@ -115,10 +115,10 @@ func (c *controlConn) heartBeat() {
115115
sleepTime = 5 * time.Second
116116
continue
117117
case error:
118-
c.session.logger.Debug("control connection heartbeat failed: %v.", NewLogField("err", actualResp.Error()))
118+
c.session.logger.Debug("control connection heartbeat failed: %v.", newLogFieldError("err", actualResp))
119119
goto reconn
120120
default:
121-
c.session.logger.Error("unknown frame in response to options: %v", NewLogField("frame_type", fmt.Sprintf("%T", resp)))
121+
c.session.logger.Error("unknown frame in response to options: %v", newLogFieldString("frame_type", fmt.Sprintf("%T", resp)))
122122
}
123123

124124
reconn:
@@ -247,18 +247,18 @@ func (c *controlConn) discoverProtocol(hosts []*HostInfo) (int, error) {
247247

248248
if err == nil {
249249
c.session.logger.Debug("discovered protocol version %v using host %v (%s).",
250-
NewLogField("protocol_version", connCfg.ProtoVersion), NewLogField("host_addr", host.ConnectAddress()), NewLogField("host_id", host.HostID()))
250+
newLogFieldInt("protocol_version", connCfg.ProtoVersion), newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()))
251251
return connCfg.ProtoVersion, nil
252252
}
253253

254254
if proto := parseProtocolFromError(err); proto > 0 {
255255
c.session.logger.Debug("discovered protocol version %v using host %v (%s).",
256-
NewLogField("protocol_version", proto), NewLogField("host_addr", host.ConnectAddress()), NewLogField("host_id", host.HostID()))
256+
newLogFieldInt("protocol_version", proto), newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()))
257257
return proto, nil
258258
}
259259

260260
c.session.logger.Debug("failed to discover protocol version using host %v (%v): %v.",
261-
NewLogField("host_addr", host.ConnectAddress()), NewLogField("host_id", host.HostID()), NewLogField("err", err.Error()))
261+
newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()), newLogFieldError("err", err))
262262
}
263263

264264
return 0, err
@@ -282,21 +282,21 @@ func (c *controlConn) connect(hosts []*HostInfo) error {
282282
conn, err = c.session.dial(c.session.ctx, host, &cfg, c)
283283
if err != nil {
284284
c.session.logger.Info("unable to dial control conn %s:%v (%s): %v",
285-
NewLogField("host_addr", host.ConnectAddress()),
286-
NewLogField("port", host.Port()),
287-
NewLogField("host_id", host.HostID()),
288-
NewLogField("err", err.Error()))
285+
newLogFieldIp("host_addr", host.ConnectAddress()),
286+
newLogFieldInt("port", host.Port()),
287+
newLogFieldString("host_id", host.HostID()),
288+
newLogFieldError("err", err))
289289
continue
290290
}
291291
err = c.setupConn(conn)
292292
if err == nil {
293293
break
294294
}
295295
c.session.logger.Info("unable setup control conn %v:%v (%s): %v",
296-
NewLogField("host_addr", host.ConnectAddress()),
297-
NewLogField("port", host.Port()),
298-
NewLogField("host_id", host.HostID()),
299-
NewLogField("err", err.Error()))
296+
newLogFieldIp("host_addr", host.ConnectAddress()),
297+
newLogFieldInt("port", host.Port()),
298+
newLogFieldString("host_id", host.HostID()),
299+
newLogFieldError("err", err))
300300
conn.Close()
301301
conn = nil
302302
}
@@ -334,7 +334,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
334334

335335
if !exists {
336336
c.session.logger.Info("adding host %v (%v).",
337-
NewLogField("host_addr", host.ConnectAddress().String()), NewLogField("host_id", host.HostID()))
337+
newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()))
338338
}
339339

340340
if err := c.registerEvents(conn); err != nil {
@@ -349,7 +349,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
349349
c.conn.Store(ch)
350350

351351
c.session.logger.Info("control connection connected to %v (%s).",
352-
NewLogField("host_addr", host.ConnectAddress().String()), NewLogField("host_id", host.HostID()))
352+
newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldString("host_id", host.HostID()))
353353

354354
if c.session.initialized() {
355355
// We connected to control conn, so add the connect the host in pool as well.
@@ -411,14 +411,14 @@ func (c *controlConn) reconnect() {
411411

412412
if err != nil {
413413
c.session.logger.Error("unable to reconnect control connection: %v",
414-
NewLogField("err", err.Error()))
414+
newLogFieldError("err", err))
415415
return
416416
}
417417

418418
err = c.session.refreshRing()
419419
if err != nil {
420420
c.session.logger.Warning("unable to refresh ring: %v",
421-
NewLogField("err", err.Error()))
421+
newLogFieldError("err", err))
422422
}
423423
}
424424

@@ -448,7 +448,7 @@ func (c *controlConn) attemptReconnect() (*Conn, error) {
448448
return conn, err
449449
}
450450

451-
c.session.logger.Error("unable to connect to any ring node, control connection falling back to initial contact points: %v", NewLogField("err", err.Error()))
451+
c.session.logger.Error("unable to connect to any ring node, control connection falling back to initial contact points: %v", newLogFieldError("err", err))
452452
// Fallback to initial contact points, as it may be the case that all known initialHosts
453453
// changed their IPs while keeping the same hostname(s).
454454
initialHosts, resolvErr := addrsToHosts(c.session.cfg.Hosts, c.session.cfg.Port, c.session.logger)
@@ -466,21 +466,21 @@ func (c *controlConn) attemptReconnectToAnyOfHosts(hosts []*HostInfo) (*Conn, er
466466
conn, err = c.session.connect(c.session.ctx, host, c)
467467
if err != nil {
468468
c.session.logger.Info("unable to dial control conn %s:%v (%s): %v",
469-
NewLogField("host_addr", host.ConnectAddress()),
470-
NewLogField("port", host.Port()),
471-
NewLogField("host_id", host.HostID()),
472-
NewLogField("err", err.Error()))
469+
newLogFieldIp("host_addr", host.ConnectAddress()),
470+
newLogFieldInt("port", host.Port()),
471+
newLogFieldString("host_id", host.HostID()),
472+
newLogFieldError("err", err))
473473
continue
474474
}
475475
err = c.setupConn(conn)
476476
if err == nil {
477477
break
478478
}
479479
c.session.logger.Info("unable setup control conn %v:%v (%s): %v",
480-
NewLogField("host_addr", host.ConnectAddress()),
481-
NewLogField("port", host.Port()),
482-
NewLogField("host_id", host.HostID()),
483-
NewLogField("err", err.Error()))
480+
newLogFieldIp("host_addr", host.ConnectAddress()),
481+
newLogFieldInt("port", host.Port()),
482+
newLogFieldString("host_id", host.HostID()),
483+
newLogFieldError("err", err))
484484
conn.Close()
485485
conn = nil
486486
}
@@ -501,9 +501,9 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) {
501501
}
502502

503503
c.session.logger.Info("control connection error %v (%s): %v",
504-
NewLogField("host_addr", conn.host.ConnectAddress().String()),
505-
NewLogField("host_id", conn.host.HostID()),
506-
NewLogField("err", err.Error()))
504+
newLogFieldIp("host_addr", conn.host.ConnectAddress()),
505+
newLogFieldString("host_id", conn.host.HostID()),
506+
newLogFieldError("err", err))
507507

508508
c.reconnect()
509509
}
@@ -568,7 +568,7 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
568568

569569
if iter.err != nil {
570570
c.session.logger.Warning("control: error executing %v: %v",
571-
NewLogField("statement", statement), NewLogField("err", iter.err.Error()))
571+
newLogFieldString("statement", statement), newLogFieldError("err", iter.err))
572572
}
573573

574574
q.AddAttempts(1, c.getConn().host)

events.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (e *eventDebouncer) debounce(frame frame) {
104104
e.events = append(e.events, frame)
105105
} else {
106106
e.logger.Warning("%s: buffer full, dropping event frame: %s",
107-
NewLogField("event_name", e.name), NewLogField("frame", frame))
107+
newLogFieldString("event_name", e.name), newLogFieldStringer("frame", frame))
108108
}
109109

110110
e.mu.Unlock()
@@ -113,11 +113,11 @@ func (e *eventDebouncer) debounce(frame frame) {
113113
func (s *Session) handleEvent(framer *framer) {
114114
frame, err := framer.parseFrame()
115115
if err != nil {
116-
s.logger.Error("unable to parse event frame: %v", NewLogField("err", err.Error()))
116+
s.logger.Error("unable to parse event frame: %v", newLogFieldError("err", err))
117117
return
118118
}
119119

120-
s.logger.Debug("handling event frame: %v", NewLogField("frame", frame.String()))
120+
s.logger.Debug("handling event frame: %v", newLogFieldStringer("frame", frame))
121121

122122
switch f := frame.(type) {
123123
case *schemaChangeKeyspace, *schemaChangeFunction,
@@ -128,7 +128,7 @@ func (s *Session) handleEvent(framer *framer) {
128128
s.nodeEvents.debounce(frame)
129129
default:
130130
s.logger.Error("invalid event frame (%v): %v",
131-
NewLogField("frame_type", fmt.Sprintf("%T", f)), NewLogField("frame", f.String()))
131+
newLogFieldString("frame_type", fmt.Sprintf("%T", f)), newLogFieldStringer("frame", f))
132132
}
133133
}
134134

@@ -181,7 +181,7 @@ func (s *Session) handleNodeEvent(frames []frame) {
181181
switch f := frame.(type) {
182182
case *topologyChangeEventFrame:
183183
s.logger.Warning("received topology change event: %v",
184-
NewLogField("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, "")))
184+
newLogFieldString("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, "")))
185185
topologyEventReceived = true
186186
case *statusChangeEventFrame:
187187
event, ok := sEvents[f.host.String()]
@@ -199,7 +199,7 @@ func (s *Session) handleNodeEvent(frames []frame) {
199199

200200
for _, f := range sEvents {
201201
s.logger.Info("dispatching status change event: %v",
202-
NewLogField("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, "")))
202+
newLogFieldString("frame", strings.Join([]string{f.change, "->", f.host.String(), ":", strconv.Itoa(f.port)}, "")))
203203

204204
// ignore events we received if they were disabled
205205
// see https://github.com/apache/cassandra-gocql-driver/issues/1591
@@ -218,7 +218,7 @@ func (s *Session) handleNodeEvent(frames []frame) {
218218

219219
func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
220220
s.logger.Info("node is UP: %s:%d",
221-
NewLogField("event_ip", eventIp.String()), NewLogField("event_port", eventPort))
221+
newLogFieldStringer("event_ip", eventIp), newLogFieldInt("event_port", eventPort))
222222

223223
host, ok := s.ring.getHostByIP(eventIp.String())
224224
if !ok {
@@ -244,7 +244,7 @@ func (s *Session) startPoolFill(host *HostInfo) {
244244

245245
func (s *Session) handleNodeConnected(host *HostInfo) {
246246
s.logger.Debug("connected to node: %s:%d (%s)",
247-
NewLogField("host_addr", host.ConnectAddress()), NewLogField("port", host.Port()), NewLogField("host_id", host.HostID()))
247+
newLogFieldIp("host_addr", host.ConnectAddress()), newLogFieldInt("port", host.Port()), newLogFieldString("host_id", host.HostID()))
248248

249249
host.setState(NodeUp)
250250

@@ -255,7 +255,7 @@ func (s *Session) handleNodeConnected(host *HostInfo) {
255255

256256
func (s *Session) handleNodeDown(ip net.IP, port int) {
257257
s.logger.Warning("node is DOWN: %s:%d",
258-
NewLogField("host_addr", ip.String()), NewLogField("port", port))
258+
newLogFieldIp("host_addr", ip), newLogFieldInt("port", port))
259259

260260
host, ok := s.ring.getHostByIP(ip.String())
261261
if ok {

extensions/gocqlzap/zap.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,24 @@ func (rec *logger) ZapLogger() *zap.Logger {
3434
func (rec *logger) log(fields []gocql.LogField) *zap.Logger {
3535
childLogger := rec.zapLogger
3636
for _, field := range fields {
37-
childLogger = childLogger.WithLazy(zap.Any(field.Name, field.Value))
37+
childLogger = childLogger.WithLazy(zapField(field))
3838
}
3939
return childLogger
4040
}
4141

42+
func zapField(field gocql.LogField) zap.Field {
43+
switch field.Value.LogFieldValueType() {
44+
case gocql.LogFieldTypeBool:
45+
return zap.Bool(field.Name, field.Value.Bool())
46+
case gocql.LogFieldTypeInt64:
47+
return zap.Int64(field.Name, field.Value.Int64())
48+
case gocql.LogFieldTypeString:
49+
return zap.String(field.Name, field.Value.String())
50+
default:
51+
return zap.Any(field.Name, field.Value.Any())
52+
}
53+
}
54+
4255
func (rec *logger) Error(msg string, fields ...gocql.LogField) {
4356
rec.log(fields).Error(msg)
4457
}

0 commit comments

Comments
 (0)