Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 94181a6

Browse files
authored
Add deduplicated log line printing to quesma_logger, use it in places that logged most often (#1266)
This PR introduces `logger.DeduplicatedWarn()` and `logger.DeduplicatedInfo()` methods to `quesma_logger`, which automatically deduplicate log messages. For example, `Ingesting via _bulk API, batch size=2 lines` is a log line that the code tries to print very often, but with `DeduplicatedInfo()` that exact log line is printed only once per expiry window. An expirable LRU cache with a 1-minute expiry time is used, so the log line is still printed occasionally, but at most once per minute. Additionally, a couple of places that produced a lot of logs were migrated to use `DeduplicatedWarn()`/`DeduplicatedInfo()`. --------- Signed-off-by: Piotr Grabowski <[email protected]>
1 parent 83609fd commit 94181a6

File tree

7 files changed

+114
-24
lines changed

7 files changed

+114
-24
lines changed

quesma/clickhouse/schema.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,13 +280,13 @@ func NewType(value any, valueOrigin string) (Type, error) {
280280
cols = append(cols, &Column{Name: k, Type: innerType, Codec: Codec{Name: ""}})
281281
}
282282
if len(cols) == 0 {
283-
logger.Warn().Msgf("Empty map type (origin: %s).", valueOrigin)
283+
logger.DeduplicatedWarn().Msgf("Empty map type (origin: %s).", valueOrigin)
284284
return nil, fmt.Errorf("empty map type (origin: %s)", valueOrigin)
285285
}
286286
return MultiValueType{Name: "Tuple", Cols: cols}, nil
287287
case []interface{}:
288288
if len(valueCasted) == 0 {
289-
logger.Warn().Msgf("Empty array type (origin: %s).", valueOrigin)
289+
logger.DeduplicatedWarn().Msgf("Empty array type (origin: %s).", valueOrigin)
290290
return nil, fmt.Errorf("empty array type (origin: %s)", valueOrigin)
291291
}
292292
innerName := fmt.Sprintf("%s[0]", valueOrigin)
@@ -296,11 +296,11 @@ func NewType(value any, valueOrigin string) (Type, error) {
296296
}
297297
return CompoundType{Name: "Array", BaseType: innerType}, nil
298298
case nil:
299-
logger.Warn().Msgf("Nil type (origin: %s).", valueOrigin)
299+
logger.DeduplicatedWarn().Msgf("Nil type (origin: %s).", valueOrigin)
300300
return nil, fmt.Errorf("nil type (origin: %s)", valueOrigin)
301301
}
302302

303-
logger.Warn().Msgf("Unsupported type '%T' of value: %v (origin: %s).", value, value, valueOrigin)
303+
logger.DeduplicatedWarn().Msgf("Unsupported type '%T' of value: %v (origin: %s).", value, value, valueOrigin)
304304
return nil, fmt.Errorf("unsupported type '%T' of value: %v (origin: %s)", value, value, valueOrigin)
305305
}
306306

quesma/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/gorilla/securecookie v1.1.2
1919
github.com/gorilla/sessions v1.4.0
2020
github.com/hashicorp/go-multierror v1.1.1
21+
github.com/hashicorp/golang-lru/v2 v2.0.7
2122
github.com/jackc/pgx/v5 v5.7.2
2223
github.com/k0kubun/pp v3.0.1+incompatible
2324
github.com/knadh/koanf/parsers/json v0.1.0

quesma/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
6565
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
6666
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
6767
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
68+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
69+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
6870
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
6971
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
7072
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=

quesma/logger/logger.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,11 @@ func Panic() *zerolog.Event {
268268
// ReasonUnsupportedQuery builds a reason string by appending queryType to
// ReasonPrefixUnsupportedQueryType.
func ReasonUnsupportedQuery(queryType string) string {
269269
return ReasonPrefixUnsupportedQueryType + queryType
270270
}
271+
272+
// DeduplicatedInfo is a package-level convenience wrapper: it returns the
// DeduplicatedEvent produced by calling DeduplicatedInfo on this package's
// `logger` value.
func DeduplicatedInfo() quesma_v2.DeduplicatedEvent {
273+
return logger.DeduplicatedInfo()
274+
}
275+
276+
// DeduplicatedWarn is a package-level convenience wrapper: it returns the
// DeduplicatedEvent produced by calling DeduplicatedWarn on this package's
// `logger` value.
func DeduplicatedWarn() quesma_v2.DeduplicatedEvent {
277+
return logger.DeduplicatedWarn()
278+
}

quesma/quesma/functionality/bulk/bulk.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"net/http"
2323
"sort"
2424
"strings"
25-
"sync"
2625
)
2726

2827
type (
@@ -72,7 +71,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
7271
defer recovery.LogPanic()
7372

7473
maxBulkSize := len(bulk)
75-
maybeLogBatchSize(maxBulkSize)
74+
logger.DeduplicatedInfo().Msgf("Ingesting via _bulk API, batch size=%d lines", maxBulkSize)
7675

7776
// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
7877
// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
@@ -333,17 +332,3 @@ func sendToClickhouse(ctx context.Context, clickhouseBulkEntries map[string][]Bu
333332
}
334333
}
335334
}
336-
337-
// Global set to keep track of logged batch sizes.
// Every line of this hunk carries a '-' gutter marker, i.e. the commit deletes
// this code in favour of logger.DeduplicatedInfo().
338-
var loggedBatchSizes = make(map[int]struct{})
339-
// mutex is locked by maybeLogBatchSize around every access to loggedBatchSizes.
var mutex sync.Mutex
340-
341-
// maybeLogBatchSize logs only unique batch sizes: the first call for a given
// batchSize emits the "Ingesting via _bulk API" line through logger.Info() and
// records the size in loggedBatchSizes; later calls with the same size log
// nothing. This function never removes entries from the set.
342-
func maybeLogBatchSize(batchSize int) {
343-
mutex.Lock()
344-
defer mutex.Unlock()
345-
if _, alreadyLogged := loggedBatchSizes[batchSize]; !alreadyLogged {
346-
logger.Info().Msgf("Ingesting via _bulk API, batch size=%d lines", batchSize)
347-
loggedBatchSizes[batchSize] = struct{}{}
348-
}
349-
}

quesma/v2/core/quesma_logger.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ package quesma_api
44

55
import (
66
"context"
7+
"fmt"
78
"github.com/QuesmaOrg/quesma/quesma/v2/core/tracing"
9+
"github.com/hashicorp/golang-lru/v2/expirable"
810
"github.com/rs/zerolog"
11+
"hash/fnv"
912
"os"
1013
"time"
1114
)
@@ -20,6 +23,9 @@ const (
2023
DefaultBurstSamplerPeriodSeconds = 20 // burst up to 600 lines of logs per 20 seconds period
2124
DefaultBurstSamplerMaxLogsPerSecond = 30 // ~100k lines of logs per hour
2225
DefaultSheddingFrequency = 100 // when the limit is exhausted, log every ~ 100 log lines
26+
27+
DeduplicatedLogsCacheSize = 1000
28+
DeduplicatedLogsExpiryTime = 1 * time.Minute
2329
)
2430

2531
type QuesmaLogger interface {
@@ -30,6 +36,10 @@ type QuesmaLogger interface {
3036
Fatal() *zerolog.Event
3137
Panic() *zerolog.Event
3238

39+
// TODO: Add similar for other log levels
40+
DeduplicatedInfo() DeduplicatedEvent
41+
DeduplicatedWarn() DeduplicatedEvent
42+
3343
DebugWithCtx(ctx context.Context) *zerolog.Event
3444
InfoWithCtx(ctx context.Context) *zerolog.Event
3545
WarnWithCtx(ctx context.Context) *zerolog.Event
@@ -45,18 +55,19 @@ type QuesmaLogger interface {
4555

4656
// QuesmaLoggerImpl embeds a zerolog.Logger and, as added by this change, keeps
// a per-instance cache of keys for messages logged through the Deduplicated*
// methods.
type QuesmaLoggerImpl struct {
4757
zerolog.Logger
58+
59+
// deduplicatedLogs is the expiring LRU that DeduplicatedEvent.Msgf checks
// and populates; the struct{} values carry no data, only key presence matters.
deduplicatedLogs *expirable.LRU[any, struct{}]
4860
}
4961

5062
// NewQuesmaLogger wraps log in a QuesmaLoggerImpl and returns it as a
// QuesmaLogger. In the added ('+') version it also creates a fresh expirable
// LRU for deduplication, sized by DeduplicatedLogsCacheSize, with entries
// expiring after DeduplicatedLogsExpiryTime; nil is passed as the second
// argument of expirable.NewLRU.
func NewQuesmaLogger(log zerolog.Logger) QuesmaLogger {
5163
return &QuesmaLoggerImpl{
52-
Logger: log,
64+
Logger: log,
65+
deduplicatedLogs: expirable.NewLRU[any, struct{}](DeduplicatedLogsCacheSize, nil, DeduplicatedLogsExpiryTime),
5366
}
5467
}
5568

5669
// WithComponent returns a logger whose zerolog.Logger is derived from l's and
// carries a "component" string field set to name.
//
// The added ('+') version goes through NewQuesmaLogger, so the returned logger
// gets its own, initially empty deduplication cache instead of sharing l's.
// NOTE(review): expirable.NewLRU with a non-zero TTL appears to start a
// background cleanup goroutine per cache — confirm this is acceptable if
// WithComponent is called frequently.
func (l *QuesmaLoggerImpl) WithComponent(name string) QuesmaLogger {
57-
return &QuesmaLoggerImpl{
58-
Logger: l.Logger.With().Str("component", name).Logger(),
59-
}
70+
return NewQuesmaLogger(l.Logger.With().Str("component", name).Logger())
6071
}
6172

6273
func (l *QuesmaLoggerImpl) MarkTraceEndWithCtx(ctx context.Context) *zerolog.Event {
@@ -124,6 +135,45 @@ func (l *QuesmaLoggerImpl) ErrorWithCtx(ctx context.Context) *zerolog.Event {
124135
return event
125136
}
126137

138+
// DeduplicatedInfo returns a DeduplicatedEvent that pairs the event obtained
// from l.Info() with l itself, so that the event's Msgf can consult l's
// deduplication cache. The underlying event is created immediately, before it
// is known whether the message is a duplicate.
func (l *QuesmaLoggerImpl) DeduplicatedInfo() DeduplicatedEvent {
139+
return DeduplicatedEvent{
140+
event: l.Info(),
141+
l: l,
142+
}
143+
}
144+
145+
// DeduplicatedWarn returns a DeduplicatedEvent that pairs the event obtained
// from l.Warn() with l itself, so that the event's Msgf can consult l's
// deduplication cache. The underlying event is created immediately, before it
// is known whether the message is a duplicate.
func (l *QuesmaLoggerImpl) DeduplicatedWarn() DeduplicatedEvent {
146+
return DeduplicatedEvent{
147+
event: l.Warn(),
148+
l: l,
149+
}
150+
}
151+
152+
// DeduplicatedEvent couples a pending zerolog event with the logger whose
// deduplication cache decides, in Msgf, whether that event is emitted.
type DeduplicatedEvent struct {
153+
// event is the zerolog event that Msgf writes when the message is not a duplicate.
event *zerolog.Event
154+
// l is the logger whose deduplicatedLogs cache Msgf checks and updates.
l *QuesmaLoggerImpl
155+
}
156+
157+
// hashMsgf returns a 32-bit FNV-1a hash of the message that format and v render
// to under fmt's Printf rules. The result is intended as a deduplication key.
//
// The rendered text is hashed rather than the default print form of the
// argument slice, so that argument boundaries stay significant:
// ("%s-%s", "a b", "c") and ("%s-%s", "a", "b c") render different messages and
// therefore hash differently, whereas printing the slice yields "[a b c]" for
// both. Calls that render the same text share a key. Because the key is only
// 32 bits wide, unrelated messages can still collide.
func hashMsgf(format string, v ...interface{}) uint32 {
	h := fnv.New32a()
	// Stream the formatted text straight into the hash instead of building an
	// intermediate string. A hash.Hash Write never returns an error, so the
	// result is deliberately discarded.
	_, _ = fmt.Fprintf(h, format, v...)
	return h.Sum32()
}
164+
165+
// TODO: Add wrappers for other *zerolog.Event methods
166+
func (m DeduplicatedEvent) Msgf(format string, v ...interface{}) {
167+
hash := hashMsgf(format, v)
168+
169+
if m.l.deduplicatedLogs.Contains(hash) {
170+
return
171+
}
172+
173+
m.l.deduplicatedLogs.Add(hash, struct{}{})
174+
m.event.Msgf(format, v...)
175+
}
176+
127177
func EmptyQuesmaLogger() QuesmaLogger {
128178
// not so empty :D
129179
return NewQuesmaLogger(zerolog.New(
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import (
6+
"bytes"
7+
"github.com/rs/zerolog"
8+
"github.com/stretchr/testify/assert"
9+
"strings"
10+
"testing"
11+
)
12+
13+
// TestDeduplicatedEvents logs through a QuesmaLogger backed by an in-memory
// buffer, repeating some DeduplicatedInfo/DeduplicatedWarn calls verbatim and
// varying the format string or a single argument in others. It then checks
// that every distinct rendered message occurs exactly once in the output.
func TestDeduplicatedEvents(t *testing.T) {
14+
var buf bytes.Buffer
15+
16+
log := zerolog.New(&buf).Level(zerolog.DebugLevel)
17+
logger := NewQuesmaLogger(log)
18+
19+
logger.DeduplicatedInfo().Msgf("info test %d", 42)
20+
logger.DeduplicatedInfo().Msgf("info test %d", 42) // duplicate should be skipped
21+
22+
logger.DeduplicatedWarn().Msgf("warn test %d", 42)
23+
logger.DeduplicatedWarn().Msgf("warn test %d", 42) // duplicate should be skipped
24+
logger.DeduplicatedWarn().Msgf("warn test %d", 1000) // distinct argument
25+
logger.DeduplicatedWarn().Msgf("WARN test %d", 42) // distinct format string
26+
27+
// Multiple arguments, including a byte slice, to cover changes in each position.
logger.DeduplicatedWarn().Msgf("two args %d %s %v", 42, "asda", []byte{1, 2, 3})
28+
logger.DeduplicatedWarn().Msgf("two args %d %s %v", 42, "asda", []byte{1, 2, 3}) // duplicate should be skipped
29+
logger.DeduplicatedWarn().Msgf("two args %d %s %v", 45, "asda", []byte{1, 2, 3}) // distinct argument
30+
logger.DeduplicatedWarn().Msgf("two args %d %s %v", 42, "asda2", []byte{1, 2, 3}) // distinct argument
31+
logger.DeduplicatedWarn().Msgf("two args %d %s %v", 42, "asda", []byte{5, 2, 3}) // distinct argument
32+
logger.DeduplicatedWarn().Msgf("two args %d %s %v", 42, "asda2", []byte{1, 2, 3}) // duplicate should be skipped
33+
34+
output := buf.String()
35+
// Each rendered message must occur exactly once in the captured output.
36+
assert.Equal(t, 1, strings.Count(output, "info test 42"))
37+
assert.Equal(t, 1, strings.Count(output, "warn test 42"))
38+
assert.Equal(t, 1, strings.Count(output, "warn test 1000"))
39+
assert.Equal(t, 1, strings.Count(output, "WARN test 42"))
40+
assert.Equal(t, 1, strings.Count(output, "two args 42 asda [1 2 3]"))
41+
assert.Equal(t, 1, strings.Count(output, "two args 45 asda [1 2 3]"))
42+
assert.Equal(t, 1, strings.Count(output, "two args 42 asda2 [1 2 3]"))
43+
assert.Equal(t, 1, strings.Count(output, "two args 42 asda [5 2 3]"))
44+
}

0 commit comments

Comments
 (0)