Skip to content

Commit fbc529b

Browse files
committed
update
Signed-off-by: wk989898 <nhsmwk@gmail.com>
1 parent 50763c0 commit fbc529b

File tree

13 files changed

+263
-91
lines changed

13 files changed

+263
-91
lines changed

downstreamadapter/dispatcher/event_dispatcher.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback
109109
d.cacheEvents.Lock()
110110
defer d.cacheEvents.Unlock()
111111
if d.GetRemovingStatus() {
112-
log.Warn("dispatcher has removed", zap.Any("id", d.id))
112+
if shouldLogDispatcherWarning(&d.lastRemovingLogTime, dispatcherWarnLogInterval) {
113+
log.Warn("dispatcher has removed", zap.Any("id", d.id))
114+
}
113115
return
114116
}
115117
// Here we have to create a new event slice, because dispatcherEvents will be cleaned up in dynamic stream
@@ -174,14 +176,23 @@ func (d *EventDispatcher) EmitBootstrap() bool {
174176
ts := d.GetStartTs()
175177
schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore)
176178
currentTables := make([]*common.TableInfo, 0, len(tables))
179+
lastBootstrapWarnLogTime := time.Time{}
180+
logBootstrapWarning := func(msg string, fields ...zap.Field) {
181+
now := time.Now()
182+
if !lastBootstrapWarnLogTime.IsZero() && now.Sub(lastBootstrapWarnLogTime) < dispatcherWarnLogInterval {
183+
return
184+
}
185+
lastBootstrapWarnLogTime = now
186+
log.Warn(msg, fields...)
187+
}
177188
meta := common.KeyspaceMeta{
178189
ID: d.tableSpan.KeyspaceID,
179190
Name: d.sharedInfo.changefeedID.Keyspace(),
180191
}
181192
for _, table := range tables {
182193
err := schemaStore.RegisterTable(meta, table, ts)
183194
if err != nil {
184-
log.Warn("register table to schemaStore failed",
195+
logBootstrapWarning("register table to schemaStore failed",
185196
zap.Int64("tableID", table),
186197
zap.Uint64("startTs", ts),
187198
zap.Error(err),
@@ -190,7 +201,7 @@ func (d *EventDispatcher) EmitBootstrap() bool {
190201
}
191202
tableInfo, err := schemaStore.GetTableInfo(meta, table, ts)
192203
if err != nil {
193-
log.Warn("get table info failed, just ignore",
204+
logBootstrapWarning("get table info failed, just ignore",
194205
zap.Stringer("changefeed", d.sharedInfo.changefeedID),
195206
zap.Error(err))
196207
continue

downstreamadapter/dispatchermanager/dispatcher_manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,10 @@ type DispatcherManager struct {
151151
cancel context.CancelFunc
152152
wg sync.WaitGroup
153153

154-
lastErrorChannelFullLogTime atomic.Int64
155-
lastCollectErrLogTime atomic.Int64
154+
lastErrorChannelFullLogTime atomic.Int64
155+
lastCollectErrLogTime atomic.Int64
156+
lastRedoMetaErrLogTime atomic.Int64
157+
lastRedoMetaInvariantLogTime atomic.Int64
156158

157159
// removeTaskHandles stores the task handles for async dispatcher removal
158160
// map[common.DispatcherID]*threadpool.TaskHandle

downstreamadapter/dispatchermanager/dispatcher_manager_redo.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,9 @@ func (e *DispatcherManager) UpdateRedoMeta(checkpointTs, resolvedTs uint64) {
288288
// only update meta on the one node
289289
d := e.GetTableTriggerRedoDispatcher()
290290
if d == nil {
291-
log.Warn("should not reach here. only update redo meta on the tableTriggerRedoDispatcher")
291+
if shouldLogDispatcherManagerWarning(&e.lastRedoMetaInvariantLogTime, dispatcherManagerWarnLogInterval) {
292+
log.Warn("should not reach here. only update redo meta on the tableTriggerRedoDispatcher")
293+
}
292294
return
293295
}
294296
d.UpdateMeta(checkpointTs, resolvedTs)
@@ -309,7 +311,9 @@ func (e *DispatcherManager) collectRedoMeta(ctx context.Context) error {
309311
return ctx.Err()
310312
case <-ticker.C:
311313
if e.GetTableTriggerRedoDispatcher() == nil {
312-
log.Error("should not reach here. only collect redo meta on the tableTriggerRedoDispatcher")
314+
if shouldLogDispatcherManagerWarning(&e.lastRedoMetaInvariantLogTime, dispatcherManagerWarnLogInterval) {
315+
log.Error("should not reach here. only collect redo meta on the tableTriggerRedoDispatcher")
316+
}
313317
continue
314318
}
315319
logMeta := e.GetTableTriggerRedoDispatcher().GetFlushedMeta()
@@ -326,7 +330,9 @@ func (e *DispatcherManager) collectRedoMeta(ctx context.Context) error {
326330
},
327331
))
328332
if err != nil {
329-
log.Error("failed to send redo request message", zap.Error(err))
333+
if shouldLogDispatcherManagerWarning(&e.lastRedoMetaErrLogTime, dispatcherManagerWarnLogInterval) {
334+
log.Error("failed to send redo request message", zap.Error(err))
335+
}
330336
}
331337
preResolvedTs = logMeta.ResolvedTs
332338
}

downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,21 @@ import (
3535
"go.uber.org/zap"
3636
)
3737

38+
const dispatcherOrchestratorWarnLogInterval = 10 * time.Second
39+
40+
func shouldLogDispatcherOrchestratorWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
41+
now := time.Now().UnixNano()
42+
for {
43+
last := lastLogTime.Load()
44+
if last != 0 && now-last < interval.Nanoseconds() {
45+
return false
46+
}
47+
if lastLogTime.CompareAndSwap(last, now) {
48+
return true
49+
}
50+
}
51+
}
52+
3853
// DispatcherOrchestrator coordinates the creation, deletion, and management of event dispatcher managers
3954
// for different change feeds based on maintainer bootstrap messages.
4055
type DispatcherOrchestrator struct {
@@ -50,6 +65,11 @@ type DispatcherOrchestrator struct {
5065
closed atomic.Bool
5166
// msgGuardWaitGroup waits for in-flight RecvMaintainerRequest handlers before shutdown.
5267
msgGuardWaitGroup util.GuardedWaitGroup
68+
69+
lastUnknownMessageLogTime atomic.Int64
70+
lastHandleMessageErrLogTime atomic.Int64
71+
lastSendResponseErrLogTime atomic.Int64
72+
lastCreateManagerErrLogTime atomic.Int64
5373
}
5474

5575
func New() *DispatcherOrchestrator {
@@ -87,9 +107,11 @@ func (m *DispatcherOrchestrator) RecvMaintainerRequest(
87107

88108
key, ok := getPendingMessageKey(msg)
89109
if !ok {
90-
log.Warn("unknown message type, drop message",
91-
zap.String("type", msg.Type.String()),
92-
zap.Any("message", msg.Message))
110+
if shouldLogDispatcherOrchestratorWarning(&m.lastUnknownMessageLogTime, dispatcherOrchestratorWarnLogInterval) {
111+
log.Warn("unknown message type, drop message",
112+
zap.String("type", msg.Type.String()),
113+
zap.Any("message", msg.Message))
114+
}
93115
return nil
94116
}
95117

@@ -130,21 +152,29 @@ func (m *DispatcherOrchestrator) handleMessages() {
130152
switch req := msg.Message[0].(type) {
131153
case *heartbeatpb.MaintainerBootstrapRequest:
132154
if err := m.handleBootstrapRequest(msg.From, req); err != nil {
133-
log.Error("failed to handle bootstrap request", zap.Error(err))
155+
if shouldLogDispatcherOrchestratorWarning(&m.lastHandleMessageErrLogTime, dispatcherOrchestratorWarnLogInterval) {
156+
log.Error("failed to handle bootstrap request", zap.Error(err))
157+
}
134158
}
135159
case *heartbeatpb.MaintainerPostBootstrapRequest:
136160
// Only the event dispatcher manager with table trigger event dispatcher will receive the post bootstrap request
137161
if err := m.handlePostBootstrapRequest(msg.From, req); err != nil {
138-
log.Error("failed to handle post bootstrap request", zap.Error(err))
162+
if shouldLogDispatcherOrchestratorWarning(&m.lastHandleMessageErrLogTime, dispatcherOrchestratorWarnLogInterval) {
163+
log.Error("failed to handle post bootstrap request", zap.Error(err))
164+
}
139165
}
140166
case *heartbeatpb.MaintainerCloseRequest:
141167
if err := m.handleCloseRequest(msg.From, req); err != nil {
142-
log.Error("failed to handle close request", zap.Error(err))
168+
if shouldLogDispatcherOrchestratorWarning(&m.lastHandleMessageErrLogTime, dispatcherOrchestratorWarnLogInterval) {
169+
log.Error("failed to handle close request", zap.Error(err))
170+
}
143171
}
144172
default:
145-
log.Warn("unknown message type, ignore it",
146-
zap.String("type", msg.Type.String()),
147-
zap.Any("message", msg.Message))
173+
if shouldLogDispatcherOrchestratorWarning(&m.lastUnknownMessageLogTime, dispatcherOrchestratorWarnLogInterval) {
174+
log.Warn("unknown message type, ignore it",
175+
zap.String("type", msg.Type.String()),
176+
zap.Any("message", msg.Message))
177+
}
148178
}
149179

150180
m.msgQueue.Done(key)
@@ -182,8 +212,10 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
182212
)
183213
// Fast return the error to maintainer.
184214
if err != nil {
185-
log.Error("failed to create new dispatcher manager",
186-
zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err))
215+
if shouldLogDispatcherOrchestratorWarning(&m.lastCreateManagerErrLogTime, dispatcherOrchestratorWarnLogInterval) {
216+
log.Error("failed to create new dispatcher manager",
217+
zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err))
218+
}
187219

188220
appcontext.GetService[*dispatchermanager.HeartBeatCollector](appcontext.HeartbeatCollector).RemoveDispatcherManager(cfId)
189221

@@ -196,8 +228,10 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
196228
Message: err.Error(),
197229
},
198230
}
199-
log.Error("create new dispatcher manager failed",
200-
zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err))
231+
if shouldLogDispatcherOrchestratorWarning(&m.lastCreateManagerErrLogTime, dispatcherOrchestratorWarnLogInterval) {
232+
log.Error("create new dispatcher manager failed",
233+
zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err))
234+
}
201235

202236
return m.sendResponse(from, messaging.MaintainerManagerTopic, response)
203237
}
@@ -396,7 +430,9 @@ func createBootstrapResponse(
396430
func (m *DispatcherOrchestrator) sendResponse(to node.ID, topic string, msg messaging.IOTypeT) error {
397431
message := messaging.NewSingleTargetMessage(to, topic, msg)
398432
if err := m.mc.SendCommand(message); err != nil {
399-
log.Error("failed to send response", zap.Error(err))
433+
if shouldLogDispatcherOrchestratorWarning(&m.lastSendResponseErrLogTime, dispatcherOrchestratorWarnLogInterval) {
434+
log.Error("failed to send response", zap.Error(err))
435+
}
400436
return err
401437
}
402438
return nil

downstreamadapter/sink/kafka/sink.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,23 @@ import (
3636

3737
const (
3838
// batchSize is the maximum size of the number of messages in a batch.
39-
batchSize = 2048
39+
batchSize = 2048
40+
kafkaSinkWarnLogInterval = 10 * time.Second
4041
)
4142

43+
func shouldLogKafkaSinkWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
44+
now := time.Now().UnixNano()
45+
for {
46+
last := lastLogTime.Load()
47+
if last != 0 && now-last < interval.Nanoseconds() {
48+
return false
49+
}
50+
if lastLogTime.CAS(last, now) {
51+
return true
52+
}
53+
}
54+
}
55+
4256
type sink struct {
4357
changefeedID commonType.ChangeFeedID
4458

@@ -61,6 +75,8 @@ type sink struct {
6175
// isNormal indicate whether the sink is in the normal state.
6276
isNormal *atomic.Bool
6377
ctx context.Context
78+
79+
lastSendErrLogTime atomic.Int64
6480
}
6581

6682
func (s *sink) SinkType() commonType.SinkType {
@@ -406,10 +422,12 @@ func (s *sink) sendMessages(ctx context.Context) error {
406422
future.Key.Topic,
407423
future.Key.Partition,
408424
message); err != nil {
409-
log.Error("kafka sink send message failed",
410-
zap.String("keyspace", s.changefeedID.Keyspace()),
411-
zap.String("changefeed", s.changefeedID.Name()),
412-
zap.Error(err))
425+
if shouldLogKafkaSinkWarning(&s.lastSendErrLogTime, kafkaSinkWarnLogInterval) {
426+
log.Error("kafka sink send message failed",
427+
zap.String("keyspace", s.changefeedID.Keyspace()),
428+
zap.String("changefeed", s.changefeedID.Name()),
429+
zap.Error(err))
430+
}
413431
return 0, 0, err
414432
}
415433
return message.GetRowsCount(), int64(message.Length()), nil

downstreamadapter/sink/mysql/sink.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,22 @@ import (
3737
const (
3838
// defaultConflictDetectorSlots indicates the default slot count of conflict detector. TODO:check this
3939
defaultConflictDetectorSlots uint64 = 16 * 1024
40+
mysqlSinkWarnLogInterval = 10 * time.Second
4041
)
4142

43+
func shouldLogMySQLSinkWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
44+
now := time.Now().UnixNano()
45+
for {
46+
last := lastLogTime.Load()
47+
if last != 0 && now-last < interval.Nanoseconds() {
48+
return false
49+
}
50+
if lastLogTime.CAS(last, now) {
51+
return true
52+
}
53+
}
54+
}
55+
4256
// Sink is responsible for writing data to mysql downstream.
4357
// Including DDL and DML.
4458
type Sink struct {
@@ -68,6 +82,8 @@ type Sink struct {
6882
// variable @@tidb_cdc_active_active_sync_stats and is shared by all DML writers.
6983
// It is nil when disabled or unsupported by downstream.
7084
activeActiveSyncStatsCollector *mysql.ActiveActiveSyncStatsCollector
85+
86+
lastProgressUpdateErrLogTime atomic.Int64
7187
}
7288

7389
// Verify is used to verify the sink uri and config is valid
@@ -323,9 +339,11 @@ func (s *Sink) AddCheckpointTs(ts uint64) {
323339
}
324340

325341
if err := s.progressTableWriter.Flush(ts); err != nil {
326-
log.Warn("failed to update active active progress table",
327-
zap.String("changefeed", s.changefeedID.DisplayName.String()),
328-
zap.Error(err))
342+
if shouldLogMySQLSinkWarning(&s.lastProgressUpdateErrLogTime, mysqlSinkWarnLogInterval) {
343+
log.Warn("failed to update active active progress table",
344+
zap.String("changefeed", s.changefeedID.DisplayName.String()),
345+
zap.Error(err))
346+
}
329347
return
330348
}
331349
}

downstreamadapter/sink/pulsar/dml_producer.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package pulsar
1616
import (
1717
"context"
1818
"sync"
19+
"sync/atomic"
1920
"time"
2021

2122
pulsarClient "github.com/apache/pulsar-client-go/pulsar"
@@ -30,6 +31,21 @@ import (
3031
"go.uber.org/zap"
3132
)
3233

34+
const pulsarProducerWarnLogInterval = 10 * time.Second
35+
36+
func shouldLogPulsarProducerWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
37+
now := time.Now().UnixNano()
38+
for {
39+
last := lastLogTime.Load()
40+
if last != 0 && now-last < interval.Nanoseconds() {
41+
return false
42+
}
43+
if lastLogTime.CompareAndSwap(last, now) {
44+
return true
45+
}
46+
}
47+
}
48+
3349
// dmlProducer is the interface for the pulsar DML message producer.
3450
type dmlProducer interface {
3551
// AsyncSendMessage sends a message asynchronously.
@@ -65,6 +81,9 @@ type dmlProducers struct {
6581
failpointCh chan error
6682
// closeCh is send error
6783
errChan chan error
84+
85+
lastAsyncSendErrLogTime atomic.Int64
86+
lastErrChanFullLogTime atomic.Int64
6887
}
6988

7089
// newDMLProducers creates a new pulsar producer.
@@ -165,12 +184,14 @@ func (p *dmlProducers) asyncSendMessage(
165184
// fail
166185
if err != nil {
167186
e := errors.WrapError(errors.ErrPulsarAsyncSendMessage, err)
168-
log.Error("Pulsar DML producer async send error",
169-
zap.String("keyspace", p.changefeedID.Keyspace()),
170-
zap.String("changefeed", p.changefeedID.ID().String()),
171-
zap.Int("messageSize", len(m.Payload)),
172-
zap.String("topic", topic),
173-
zap.Error(err))
187+
if shouldLogPulsarProducerWarning(&p.lastAsyncSendErrLogTime, pulsarProducerWarnLogInterval) {
188+
log.Error("Pulsar DML producer async send error",
189+
zap.String("keyspace", p.changefeedID.Keyspace()),
190+
zap.String("changefeed", p.changefeedID.ID().String()),
191+
zap.Int("messageSize", len(m.Payload)),
192+
zap.String("topic", topic),
193+
zap.Error(err))
194+
}
174195
pulsar.IncPublishedDMLFail(topic, p.changefeedID.String())
175196
// use this select to avoid send error to a closed channel
176197
// the ctx will always be called before the errChan is closed
@@ -179,8 +200,10 @@ func (p *dmlProducers) asyncSendMessage(
179200
return
180201
case p.errChan <- e:
181202
default:
182-
log.Warn("Error channel is full in pulsar DML producer",
183-
zap.Stringer("changefeed", p.changefeedID), zap.Error(e))
203+
if shouldLogPulsarProducerWarning(&p.lastErrChanFullLogTime, pulsarProducerWarnLogInterval) {
204+
log.Warn("Error channel is full in pulsar DML producer",
205+
zap.Stringer("changefeed", p.changefeedID), zap.Error(e))
206+
}
184207
}
185208
} else if message.Callback != nil {
186209
// success

0 commit comments

Comments
 (0)