Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if c.SyncerConfig.QueueSize == 0 {
c.SyncerConfig.QueueSize = defaultQueueSize
}
if c.SyncerConfig.EventCacheCount == 0 {
c.SyncerConfig.EventCacheCount = defaultEventCacheCount
}
if c.SyncerConfig.CheckpointFlushInterval == 0 {
c.SyncerConfig.CheckpointFlushInterval = defaultCheckpointFlushInterval
}
Expand Down
25 changes: 19 additions & 6 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ var (
defaultWorkerCount = 16
defaultBatch = 100
defaultQueueSize = 1024 // do not give too large default value to avoid OOM
defaultCheckpointFlushInterval = 30 // in seconds
defaultEventCacheCount = 10240

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It's good practice to add a comment explaining the purpose and impact of this default value, similar to defaultQueueSize. This helps users understand the configuration better.

Suggested change
defaultEventCacheCount = 10240
defaultEventCacheCount = 10240 // the count of binlog event cache. If you have a large transaction, you may need to enlarge it.

defaultCheckpointFlushInterval = 30 // in seconds
defaultSafeModeDuration = strconv.Itoa(2*defaultCheckpointFlushInterval) + "s"

// TargetDBConfig.
Expand Down Expand Up @@ -390,10 +391,11 @@ func (m *LoaderConfig) adjust() error {

// SyncerConfig represents syncer process unit's specific config.
type SyncerConfig struct {
MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
Batch int `yaml:"batch" toml:"batch" json:"batch"`
QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
Batch int `yaml:"batch" toml:"batch" json:"batch"`
QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better maintainability and user-friendliness, please add a comment explaining what EventCacheCount is for, similar to how other fields in this struct are commented.

Suggested change
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`
// EventCacheCount is the count of binlog event cache. If you have a large transaction, you may need to enlarge it.
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`

// checkpoint flush interval in seconds.
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
// TODO: add this two new config items for openapi.
Expand All @@ -420,6 +422,7 @@ func DefaultSyncerConfig() SyncerConfig {
WorkerCount: defaultWorkerCount,
Batch: defaultBatch,
QueueSize: defaultQueueSize,
EventCacheCount: defaultEventCacheCount,
CheckpointFlushInterval: defaultCheckpointFlushInterval,
SafeModeDuration: defaultSafeModeDuration,
}
Expand Down Expand Up @@ -448,6 +451,7 @@ type ValidatorConfig struct {
BatchQuerySize int `yaml:"batch-query-size" toml:"batch-query-size" json:"batch-query-size"`
MaxPendingRowSize string `yaml:"max-pending-row-size" toml:"max-pending-row-size" json:"max-pending-row-size"`
MaxPendingRowCount int `yaml:"max-pending-row-count" toml:"max-pending-row-count" json:"max-pending-row-count"`
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve configuration clarity for users, please add a comment for EventCacheCount explaining its purpose.

Suggested change
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`
// EventCacheCount is the count of binlog event cache. If you have a large transaction, you may need to enlarge it.
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`

StartTime string `yaml:"-" toml:"start-time" json:"-"`
}

Expand Down Expand Up @@ -487,12 +491,16 @@ func (v *ValidatorConfig) Adjust() error {
if v.MaxPendingRowCount == 0 {
v.MaxPendingRowCount = DefaultValidatorMaxPendingRow
}
if v.EventCacheCount == 0 {
v.EventCacheCount = defaultEventCacheCount
}
return nil
}

func defaultValidatorConfig() ValidatorConfig {
return ValidatorConfig{
Mode: ValidationNone,
Mode: ValidationNone,
EventCacheCount: defaultEventCacheCount,
}
}

Expand Down Expand Up @@ -883,6 +891,9 @@ func (c *TaskConfig) adjust() error {
if inst.Syncer.QueueSize == 0 {
inst.Syncer.QueueSize = defaultQueueSize
}
if inst.Syncer.EventCacheCount == 0 {
inst.Syncer.EventCacheCount = defaultEventCacheCount
}
if inst.Syncer.CheckpointFlushInterval == 0 {
inst.Syncer.CheckpointFlushInterval = defaultCheckpointFlushInterval
}
Expand Down Expand Up @@ -1161,6 +1172,7 @@ type SyncerConfigForDowngrade struct {
WorkerCount int `yaml:"worker-count"`
Batch int `yaml:"batch"`
QueueSize int `yaml:"queue-size"`
EventCacheCount int `yaml:"event-cache-count"`
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval"`
MaxRetry int `yaml:"max-retry"`
EnableGTID bool `yaml:"enable-gtid"`
Expand All @@ -1182,6 +1194,7 @@ func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[st
WorkerCount: syncerConfig.WorkerCount,
Batch: syncerConfig.Batch,
QueueSize: syncerConfig.QueueSize,
EventCacheCount: syncerConfig.EventCacheCount,
CheckpointFlushInterval: syncerConfig.CheckpointFlushInterval,
MaxRetry: syncerConfig.MaxRetry,
EnableGTID: syncerConfig.EnableGTID,
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (v *DataValidator) initialize() error {
return err
}

v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList)
v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList, v.cfg.ValidatorCfg.EventCacheCount)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
return terror.ErrSyncerUnitGenBAList.Delegate(err)
}

s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList)
s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList, s.cfg.SyncerConfig.EventCacheCount)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func str2TimezoneOrFromDB(tctx *tcontext.Context, tzStr string, dbCfg conn.Scope
return loc, tzStr, nil
}

func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error) {
func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter, eventCacheCount int) (replication.BinlogSyncerConfig, error) {
var tlsConfig *tls.Config
var err error
if cfg.From.Security != nil {
Expand Down Expand Up @@ -213,6 +213,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
TLSConfig: tlsConfig,
RowsEventDecodeFunc: rowsEventDecodeFunc,
Localhost: h,
EventCacheCount: eventCacheCount,
}
// when retry count > 1, go-mysql will retry sync from the previous GTID set in GTID mode,
// which may get duplicate binlog event after retry success. so just set retry count = 1, and task
Expand Down
15 changes: 15 additions & 0 deletions dm/syncer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
Expand Down Expand Up @@ -129,6 +131,19 @@ func TestRecordSourceTbls(t *testing.T) {
require.Len(t, sourceTbls, 0)
}

func TestSubtaskCfg2BinlogSyncerCfgEventCacheCount(t *testing.T) {
cfg := &config.SubTaskConfig{
ServerID: 1234,
Flavor: mysql.MySQLFlavor,
WorkerName: "worker-01",
From: config.GetDBConfigForTest(),
}

syncCfg, err := subtaskCfg2BinlogSyncerCfg(cfg, time.UTC, nil, 2048)
require.NoError(t, err)
require.Equal(t, 2048, syncCfg.EventCacheCount)
}

func TestGetDDLStatusFromTiDB(t *testing.T) {
var (
cfg = genDefaultSubTaskConfig4Test()
Expand Down
Loading