Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if c.SyncerConfig.QueueSize == 0 {
c.SyncerConfig.QueueSize = defaultQueueSize
}
if c.SyncerConfig.EventCacheCount == 0 {
c.SyncerConfig.EventCacheCount = defaultEventCacheCount
}
if c.SyncerConfig.CheckpointFlushInterval == 0 {
c.SyncerConfig.CheckpointFlushInterval = defaultCheckpointFlushInterval
}
Expand Down
25 changes: 19 additions & 6 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ var (
defaultWorkerCount = 16
defaultBatch = 100
defaultQueueSize = 1024 // do not give too large default value to avoid OOM
defaultCheckpointFlushInterval = 30 // in seconds
defaultEventCacheCount = 10240
defaultCheckpointFlushInterval = 30 // in seconds
defaultSafeModeDuration = strconv.Itoa(2*defaultCheckpointFlushInterval) + "s"

// TargetDBConfig.
Expand Down Expand Up @@ -394,10 +395,11 @@ func (m *LoaderConfig) adjust() error {

// SyncerConfig represents syncer process unit's specific config.
type SyncerConfig struct {
MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
Batch int `yaml:"batch" toml:"batch" json:"batch"`
QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly
WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
Batch int `yaml:"batch" toml:"batch" json:"batch"`
QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`
// checkpoint flush interval in seconds.
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
// TODO: add this two new config items for openapi.
Expand All @@ -424,6 +426,7 @@ func DefaultSyncerConfig() SyncerConfig {
WorkerCount: defaultWorkerCount,
Batch: defaultBatch,
QueueSize: defaultQueueSize,
EventCacheCount: defaultEventCacheCount,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To centralize default value handling, consider setting the default for EventCacheCount only within the Adjust methods (TaskConfig.adjust and SubTaskConfig.Adjust), and removing it from here. This avoids redundancy and makes it clearer where defaults are applied.

CheckpointFlushInterval: defaultCheckpointFlushInterval,
SafeModeDuration: defaultSafeModeDuration,
}
Expand Down Expand Up @@ -452,6 +455,7 @@ type ValidatorConfig struct {
BatchQuerySize int `yaml:"batch-query-size" toml:"batch-query-size" json:"batch-query-size"`
MaxPendingRowSize string `yaml:"max-pending-row-size" toml:"max-pending-row-size" json:"max-pending-row-size"`
MaxPendingRowCount int `yaml:"max-pending-row-count" toml:"max-pending-row-count" json:"max-pending-row-count"`
EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"`
StartTime string `yaml:"-" toml:"start-time" json:"-"`
}

Expand Down Expand Up @@ -491,12 +495,16 @@ func (v *ValidatorConfig) Adjust() error {
if v.MaxPendingRowCount == 0 {
v.MaxPendingRowCount = DefaultValidatorMaxPendingRow
}
if v.EventCacheCount == 0 {
v.EventCacheCount = defaultEventCacheCount
}
return nil
}

func defaultValidatorConfig() ValidatorConfig {
return ValidatorConfig{
Mode: ValidationNone,
Mode: ValidationNone,
EventCacheCount: defaultEventCacheCount,
Comment on lines +506 to +507

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To centralize default value handling, consider setting the default for EventCacheCount only within ValidatorConfig.Adjust, and removing it from here. This avoids redundancy and makes it clearer where defaults are applied.

Suggested change
Mode: ValidationNone,
EventCacheCount: defaultEventCacheCount,
Mode: ValidationNone,

}
}

Expand Down Expand Up @@ -887,6 +895,9 @@ func (c *TaskConfig) adjust() error {
if inst.Syncer.QueueSize == 0 {
inst.Syncer.QueueSize = defaultQueueSize
}
if inst.Syncer.EventCacheCount == 0 {
inst.Syncer.EventCacheCount = defaultEventCacheCount
}
Comment on lines +898 to +900

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check appears to be redundant. SyncerConfig instances are initialized with default values from DefaultSyncerConfig through UnmarshalYAML. Consequently, this check will likely always evaluate to false. It would be cleaner to rely on DefaultSyncerConfig as the single source for defaults and remove this check.

if inst.Syncer.CheckpointFlushInterval == 0 {
inst.Syncer.CheckpointFlushInterval = defaultCheckpointFlushInterval
}
Expand Down Expand Up @@ -1165,6 +1176,7 @@ type SyncerConfigForDowngrade struct {
WorkerCount int `yaml:"worker-count"`
Batch int `yaml:"batch"`
QueueSize int `yaml:"queue-size"`
EventCacheCount int `yaml:"event-cache-count"`
CheckpointFlushInterval int `yaml:"checkpoint-flush-interval"`
MaxRetry int `yaml:"max-retry"`
EnableGTID bool `yaml:"enable-gtid"`
Expand All @@ -1186,6 +1198,7 @@ func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[st
WorkerCount: syncerConfig.WorkerCount,
Batch: syncerConfig.Batch,
QueueSize: syncerConfig.QueueSize,
EventCacheCount: syncerConfig.EventCacheCount,
CheckpointFlushInterval: syncerConfig.CheckpointFlushInterval,
MaxRetry: syncerConfig.MaxRetry,
EnableGTID: syncerConfig.EnableGTID,
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (v *DataValidator) initialize() error {
return err
}

v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList)
v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList, v.cfg.ValidatorCfg.EventCacheCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use the same config as syncer, validator shares many config as syncer

if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
return terror.ErrSyncerUnitGenBAList.Delegate(err)
}

s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList)
s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList, s.cfg.SyncerConfig.EventCacheCount)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func str2TimezoneOrFromDB(tctx *tcontext.Context, tzStr string, dbCfg conn.Scope
return loc, tzStr, nil
}

func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error) {
func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter, eventCacheCount int) (replication.BinlogSyncerConfig, error) {
var tlsConfig *tls.Config
var err error
if cfg.From.Security != nil {
Expand Down Expand Up @@ -213,6 +213,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
TLSConfig: tlsConfig,
RowsEventDecodeFunc: rowsEventDecodeFunc,
Localhost: h,
EventCacheCount: eventCacheCount,
}
// when retry count > 1, go-mysql will retry sync from the previous GTID set in GTID mode,
// which may get duplicate binlog event after retry success. so just set retry count = 1, and task
Expand Down
15 changes: 15 additions & 0 deletions dm/syncer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
Expand Down Expand Up @@ -128,6 +130,19 @@ func TestRecordSourceTbls(t *testing.T) {
require.Len(t, sourceTbls, 0)
}

func TestSubtaskCfg2BinlogSyncerCfgEventCacheCount(t *testing.T) {
cfg := &config.SubTaskConfig{
ServerID: 1234,
Flavor: mysql.MySQLFlavor,
WorkerName: "worker-01",
From: config.GetDBConfigForTest(),
}

syncCfg, err := subtaskCfg2BinlogSyncerCfg(cfg, time.UTC, nil, 2048)
require.NoError(t, err)
require.Equal(t, 2048, syncCfg.EventCacheCount)
}

func TestGetDDLStatusFromTiDB(t *testing.T) {
var (
cfg = genDefaultSubTaskConfig4Test()
Expand Down