
Commit 7e0d4db

log(dm): sample repeated retry logs in retry loops (#12597)

ref #12499

1 parent: 9fbde6e

8 files changed: 111 additions, 21 deletions

dm/master/scheduler/scheduler.go (7 additions, 4 deletions)

@@ -96,7 +96,8 @@ const (
 type Scheduler struct {
     mu sync.RWMutex

-    logger log.Logger
+    logger      log.Logger
+    retryLogger *zap.Logger

     started atomic.Bool // whether the scheduler already started for work.
     cancel  context.CancelFunc
@@ -203,8 +204,10 @@ type Scheduler struct {

 // NewScheduler creates a new scheduler instance.
 func NewScheduler(pLogger *log.Logger, securityCfg security.Security) *Scheduler {
+    logger := pLogger.WithFields(zap.String("component", "scheduler"))
     return &Scheduler{
-        logger:       pLogger.WithFields(zap.String("component", "scheduler")),
+        logger:       logger,
+        retryLogger:  log.NewRetrySampleLogger(logger),
         subtaskLatch: newLatches(),
         sourceCfgs:   make(map[string]*config.SourceConfig),
         workers:      make(map[string]*Worker),
@@ -2108,7 +2111,7 @@ func (s *Scheduler) observeWorkerEvent(ctx context.Context, rev int64) error {
         case <-time.After(500 * time.Millisecond):
             rev, err = s.resetWorkerEv()
             if err != nil {
-                log.L().Error("resetWorkerEv is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
+                s.retryLogger.Error("resetWorkerEv is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
             }
         }
         retryNum++
@@ -2563,7 +2566,7 @@ func (s *Scheduler) observeLoadTask(ctx context.Context, rev int64) error {
         case <-time.After(500 * time.Millisecond):
             rev, err = s.recoverLoadTasks(true)
             if err != nil {
-                log.L().Error("resetLoadTask is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
+                s.retryLogger.Error("resetLoadTask is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
             }
         }
         retryNum++
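All of the call sites in this commit share one shape: the sampled logger is built once when the component is constructed, then used inside a 500 ms retry loop in place of the global log.L(). Below is a minimal self-contained sketch of that pattern; watcher, newWatcher, and doRetryableOp are hypothetical stand-ins, and the sampler is wired up the same way dm/pkg/log does further down in this diff.

// Minimal sketch of the retry-loop pattern this commit applies.
// watcher and doRetryableOp are hypothetical stand-ins for the real
// components and callees.
package main

import (
    "context"
    "errors"
    "time"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

type watcher struct {
    logger      *zap.Logger
    retryLogger *zap.Logger // sampled; safe to call twice a second
}

func newWatcher(logger *zap.Logger) *watcher {
    sampled := logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
        // At most 10 identical entries per minute; thereafter=0 drops the rest.
        return zapcore.NewSamplerWithOptions(c, time.Minute, 10, 0)
    }))
    return &watcher{logger: logger, retryLogger: sampled}
}

func (w *watcher) observe(ctx context.Context) error {
    retryNum := 0
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond):
            if err := doRetryableOp(); err != nil {
                // Without sampling this fires up to twice a second for as
                // long as the dependency stays down.
                w.retryLogger.Error("op failed, will retry later",
                    zap.Error(err), zap.Int("retryNum", retryNum))
            }
        }
        retryNum++
    }
}

func doRetryableOp() error { return errors.New("etcd unreachable") }

func main() {
    logger, _ := zap.NewDevelopment()
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    _ = newWatcher(logger).observe(ctx)
}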

dm/master/shardddl/optimist.go (9 additions, 6 deletions)

@@ -40,7 +40,8 @@ import (
 type Optimist struct {
     mu sync.Mutex

-    logger log.Logger
+    logger      log.Logger
+    retryLogger *zap.Logger

     closed bool
     cancel context.CancelFunc
@@ -53,11 +54,13 @@ type Optimist struct {

 // NewOptimist creates a new Optimist instance.
 func NewOptimist(pLogger *log.Logger, getDownstreamMetaFunc func(string) (*dbconfig.DBConfig, string)) *Optimist {
+    logger := pLogger.WithFields(zap.String("component", "shard DDL optimist"))
     return &Optimist{
-        logger: pLogger.WithFields(zap.String("component", "shard DDL optimist")),
-        closed: true,
-        lk:     optimism.NewLockKeeper(getDownstreamMetaFunc),
-        tk:     optimism.NewTableKeeper(),
+        logger:      logger,
+        retryLogger: log.NewRetrySampleLogger(logger),
+        closed:      true,
+        lk:          optimism.NewLockKeeper(getDownstreamMetaFunc),
+        tk:          optimism.NewTableKeeper(),
     }
 }

@@ -377,7 +380,7 @@ func (o *Optimist) run(ctx context.Context, revSource, revInfo, revOperation int
         case <-time.After(500 * time.Millisecond):
             revSource, revInfo, revOperation, err = o.rebuildLocks()
             if err != nil {
-                o.logger.Error("fail to rebuild shard DDL lock, will retry",
+                o.retryLogger.Error("fail to rebuild shard DDL lock, will retry",
                     zap.Int("retryNum", retryNum), zap.Error(err))
                 continue
             }

dm/master/shardddl/pessimist.go (6 additions, 3 deletions)

@@ -41,7 +41,8 @@ var (
 type Pessimist struct {
     mu sync.Mutex

-    logger log.Logger
+    logger      log.Logger
+    retryLogger *zap.Logger

     closed bool
     cancel context.CancelFunc
@@ -58,8 +59,10 @@ type Pessimist struct {

 // NewPessimist creates a new Pessimist instance.
 func NewPessimist(pLogger *log.Logger, taskSources func(task string) []string) *Pessimist {
+    logger := pLogger.WithFields(zap.String("component", "shard DDL pessimist"))
     return &Pessimist{
-        logger:      pLogger.WithFields(zap.String("component", "shard DDL pessimist")),
+        logger:      logger,
+        retryLogger: log.NewRetrySampleLogger(logger),
         closed:      true, // mark as closed before started.
         lk:          pessimism.NewLockKeeper(),
         taskSources: taskSources,
@@ -107,7 +110,7 @@ func (p *Pessimist) run(ctx context.Context, etcdCli *clientv3.Client, rev1, rev
         case <-time.After(500 * time.Millisecond):
             rev1, rev2, err = p.buildLocks(etcdCli)
             if err != nil {
-                log.L().Error("resetWorkerEv is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
+                p.retryLogger.Error("build shard DDL locks failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
             } else {
                 succeed = true
             }
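Note that the last hunk also corrects a copy-paste mistake: the Pessimist's old message, "resetWorkerEv is failed, will retry later", was lifted from the scheduler and now reads "build shard DDL locks failed". A similar fix appears in dm/worker/source_worker.go below, where "get source bound from etcd failed" becomes "get relay stage from etcd failed".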

dm/pkg/log/log.go (32 additions, 0 deletions)

@@ -17,7 +17,9 @@ import (
     "context"
     "fmt"
     "strings"
+    "sync"
     "sync/atomic"
+    "time"

     "github.com/pingcap/errors"
     pclog "github.com/pingcap/log"
@@ -33,6 +35,9 @@ const (
     defaultLogLevel   = "info"
     defaultLogMaxDays = 7
     defaultLogMaxSize = 512 // MB
+
+    retryLogSampleInterval = time.Minute
+    retryLogSampleFirst    = 10
 )

 // Config serializes log related config in toml/json.
@@ -150,6 +155,33 @@ func With(fields ...zap.Field) Logger {
     return Logger{appLogger.With(fields...)}
 }

+func sampleLoggerFactory(base *zap.Logger, tick time.Duration, first int, fields ...zap.Field) func() *zap.Logger {
+    if base == nil {
+        base = zap.NewNop()
+    }
+
+    var (
+        once   sync.Once
+        logger *zap.Logger
+    )
+
+    return func() *zap.Logger {
+        once.Do(func() {
+            sampleCore := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
+                return zapcore.NewSamplerWithOptions(core, tick, first, 0)
+            })
+            logger = base.With(fields...).With(zap.String("sampled", "")).WithOptions(sampleCore)
+        })
+        return logger
+    }
+}
+
+// NewRetrySampleLogger creates a logger that caps repeated retry logs with the
+// same level and message to retryLogSampleFirst entries per sampling window.
+func NewRetrySampleLogger(base Logger, fields ...zap.Field) *zap.Logger {
+    return sampleLoggerFactory(base.Logger, retryLogSampleInterval, retryLogSampleFirst, fields...)()
+}
+
 // SetLevel modifies the log level of the global logger. Returns the previous
 // level.
 func SetLevel(level zapcore.Level) zapcore.Level {
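The heavy lifting here is done by zap's own sampler. zapcore.NewSamplerWithOptions(core, tick, first, thereafter) keeps a counter per (level, message) pair; within each tick window the first `first` matching entries pass through, and with thereafter set to 0 everything after that is dropped until the window resets. Because the counting key is the message itself (fields are not part of it), the changing retryNum field does not defeat the sampling. A standalone sketch of that behavior, outside DM:

// Demonstrates the sampling semantics NewRetrySampleLogger relies on:
// at most `first` entries per identical (level, message) per tick window.
package main

import (
    "fmt"
    "time"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "go.uber.org/zap/zaptest/observer"
)

func main() {
    core, observed := observer.New(zap.InfoLevel)
    sampled := zap.New(zapcore.NewSamplerWithOptions(core, time.Minute, 10, 0))

    for i := 0; i < 100; i++ {
        // Same level and message every time; the varying field is not part
        // of the sampler's counting key.
        sampled.Error("resetWorkerEv is failed, will retry later", zap.Int("retryNum", i))
    }
    sampled.Warn("resetWorkerEv is failed, will retry later") // different level: separate counter
    sampled.Error("some other message")                       // different message: separate counter

    fmt.Println(observed.Len()) // 12: the 10 sampled errors, the warn, and the other message
}

The constant zap.String("sampled", "") field that sampleLoggerFactory attaches is presumably a breadcrumb: a reader of a single log line carrying it can tell that sibling repetitions may have been dropped.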

dm/pkg/log/log_test.go (36 additions, 0 deletions)

@@ -32,6 +32,7 @@ import (
     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
     "go.uber.org/zap/zaptest"
+    "go.uber.org/zap/zaptest/observer"
 )

 func TestTestLogger(t *testing.T) {
@@ -48,6 +49,41 @@ func TestTestLogger(t *testing.T) {
     require.Empty(t, buffer.Stripped())
 }

+func TestNewRetrySampleLogger(t *testing.T) {
+    core, observed := observer.New(zap.InfoLevel)
+    base := Logger{zap.New(core).With(zap.String("base", "logger"))}
+    logger := NewRetrySampleLogger(base, zap.String("component", "retry-unit-test"))
+
+    for i := 1; i <= retryLogSampleFirst+1; i++ {
+        logger.Error("retryable operation failed", zap.Int("retryNum", i))
+    }
+    logger.Warn("retryable operation failed", zap.Int("retryNum", 3))
+    logger.Error("another retryable operation failed", zap.Int("retryNum", retryLogSampleFirst+2))
+
+    entries := observed.All()
+    require.Len(t, entries, retryLogSampleFirst+2)
+
+    require.Equal(t, "retryable operation failed", entries[0].Message)
+    require.Equal(t, zap.ErrorLevel, entries[0].Level)
+    require.Equal(t, map[string]any{
+        "base":      "logger",
+        "component": "retry-unit-test",
+        "retryNum":  int64(1),
+        "sampled":   "",
+    }, entries[0].ContextMap())
+
+    require.Equal(t, "retryable operation failed", entries[retryLogSampleFirst-1].Message)
+    require.Equal(t, zap.ErrorLevel, entries[retryLogSampleFirst-1].Level)
+    require.Equal(t, int64(retryLogSampleFirst), entries[retryLogSampleFirst-1].ContextMap()["retryNum"])
+
+    require.Equal(t, "retryable operation failed", entries[retryLogSampleFirst].Message)
+    require.Equal(t, zap.WarnLevel, entries[retryLogSampleFirst].Level)
+
+    require.Equal(t, "another retryable operation failed", entries[retryLogSampleFirst+1].Message)
+    require.Equal(t, zap.ErrorLevel, entries[retryLogSampleFirst+1].Level)
+    require.Equal(t, int64(retryLogSampleFirst+2), entries[retryLogSampleFirst+1].ContextMap()["retryNum"])
+}
+
 // makeTestLogger creates a Logger instance which produces JSON logs.
 func makeTestLogger() (Logger, *zaptest.Buffer) {
     buffer := new(zaptest.Buffer)
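The assertions in the new test double as documentation of the sampling key: the eleventh Error with the same message is dropped, while a Warn with the identical message and an Error with a different message both land, because the sampler counts per (level, message) pair. The first entry's context map also pins down the injected base and component fields plus the constant "sampled" marker.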

dm/syncer/syncer.go (3 additions, 1 deletion)

@@ -260,6 +260,7 @@ type Syncer struct {
     idAndCollationMap map[int]string

     ddlWorker            *DDLWorker
+    fetchBinlogLogger    *zap.Logger
     unhandledEventLogger *zap.Logger
 }

@@ -296,6 +297,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel
     syncer.checkpoint = NewRemoteCheckPoint(syncer.tctx, cfg, syncer.metricsProxies, syncer.checkpointID())

     syncer.binlogType = binlogstream.RelayToBinlogType(relay)
+    syncer.fetchBinlogLogger = log.NewRetrySampleLogger(logger, zap.String("binlogType", syncer.binlogType.String()))
     syncer.readerHub = streamer.GetReaderHub()

     if cfg.ShardMode == config.ShardPessimistic {
@@ -2214,7 +2216,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
         }

         if err != nil {
-            s.tctx.L().Error("fail to fetch binlog", log.ShortError(err))
+            s.fetchBinlogLogger.Error("fail to fetch binlog", log.ShortError(err))

             if isConnectionRefusedError(err) {
                 return err

dm/worker/server.go (10 additions, 3 deletions)

@@ -92,12 +92,19 @@ type Server struct {
     worker *SourceWorker
     // relay status will never be put in server.sourceStatus
     sourceStatus pb.SourceStatus
+
+    retryLogger *zap.Logger
 }

 // NewServer creates a new Server.
 func NewServer(cfg *Config) *Server {
+    retryLogger := log.With(
+        zap.String("component", "worker server"),
+        zap.String("worker", cfg.Name),
+    )
     s := Server{
-        cfg: cfg,
+        cfg:         cfg,
+        retryLogger: log.NewRetrySampleLogger(retryLogger),
     }
     s.ctx, s.cancel = context.WithCancel(context.Background())
     s.closed.Store(true) // not start yet
@@ -323,7 +330,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
         case <-time.After(500 * time.Millisecond):
             relaySource, rev1, err1 := ha.GetRelayConfig(s.etcdClient, s.cfg.Name)
             if err1 != nil {
-                log.L().Error("get relay config from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
+                s.retryLogger.Error("get relay config from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
                 retryNum++
                 if retryNum > retryGetRelayConfig && etcdutil.IsLimitedRetryableError(err1) {
                     return err1
@@ -416,7 +423,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
         case <-time.After(500 * time.Millisecond):
             bound, cfg, rev1, err1 := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
             if err1 != nil {
-                log.L().Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
+                s.retryLogger.Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
                 retryNum++
                 if retryNum > retryGetSourceBoundConfig && etcdutil.IsLimitedRetryableError(err1) {
                     return err1
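Unlike the master-side loops above, these two observers already bound their retries: once retryNum passes retryGetRelayConfig (or retryGetSourceBoundConfig) and etcdutil.IsLimitedRetryableError reports the error as one that should only be retried a limited number of times, the loop returns it instead of spinning. Sampling still pays off for the retries below that threshold and for errors that stay retryable indefinitely.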

dm/worker/source_worker.go (8 additions, 4 deletions)

@@ -66,6 +66,8 @@ type SourceWorker struct {

     l log.Logger

+    retryLogger *zap.Logger
+
     sourceStatus atomic.Value // stores a pointer to SourceStatus
     // sourceStatusErr stores latest error when updating sourceStatus (if any).
     sourceStatusErr atomic.Pointer[sourceStatusErrState]
@@ -108,10 +110,12 @@ func NewSourceWorker(
     name string,
     relayDir string,
 ) (w *SourceWorker, err error) {
+    logger := log.With(zap.String("component", "worker controller"))
     w = &SourceWorker{
         cfg:           cfg,
         subTaskHolder: newSubTaskHolder(),
-        l:             log.With(zap.String("component", "worker controller")),
+        l:             logger,
+        retryLogger:   log.NewRetrySampleLogger(logger, zap.String("worker", name), zap.String("sourceID", cfg.SourceID)),
         etcdClient:    etcdClient,
         name:          name,
         relayDir:      relayDir,
@@ -833,7 +837,7 @@ func (w *SourceWorker) observeSubtaskStage(ctx context.Context, etcdCli *clientv
         case <-time.After(500 * time.Millisecond):
             rev, err = w.resetSubtaskStage()
             if err != nil {
-                log.L().Error("resetSubtaskStage is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
+                w.retryLogger.Error("resetSubtaskStage is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
             }
         }
         retryNum++
@@ -973,7 +977,7 @@ func (w *SourceWorker) observeRelayStage(ctx context.Context, etcdCli *clientv3.
         case <-time.After(500 * time.Millisecond):
             stage, rev1, err1 := ha.GetRelayStage(etcdCli, w.cfg.SourceID)
             if err1 != nil {
-                log.L().Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
+                w.retryLogger.Error("get relay stage from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
                 break
             }
             rev = rev1
@@ -1252,7 +1256,7 @@ func (w *SourceWorker) observeValidatorStage(ctx context.Context, lastUsedRev in
             w.RUnlock()
             startRevision, err = w.getCurrentValidatorRevision(sourceID)
             if err != nil {
-                log.L().Error("reset validator stage failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
+                w.retryLogger.Error("reset validator stage failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
             }
         }
         retryNum++
