-
Notifications
You must be signed in to change notification settings - Fork 298
dm: make binlog event cache configurable #12411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -103,7 +103,8 @@ var ( | |||||||
| defaultWorkerCount = 16 | ||||||||
| defaultBatch = 100 | ||||||||
| defaultQueueSize = 1024 // do not give too large default value to avoid OOM | ||||||||
| defaultCheckpointFlushInterval = 30 // in seconds | ||||||||
| defaultEventCacheCount = 10240 | ||||||||
| defaultCheckpointFlushInterval = 30 // in seconds | ||||||||
| defaultSafeModeDuration = strconv.Itoa(2*defaultCheckpointFlushInterval) + "s" | ||||||||
|
|
||||||||
| // TargetDBConfig. | ||||||||
|
|
@@ -394,10 +395,11 @@ func (m *LoaderConfig) adjust() error { | |||||||
|
|
||||||||
| // SyncerConfig represents syncer process unit's specific config. | ||||||||
| type SyncerConfig struct { | ||||||||
| MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly | ||||||||
| WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` | ||||||||
| Batch int `yaml:"batch" toml:"batch" json:"batch"` | ||||||||
| QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` | ||||||||
| MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly | ||||||||
| WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` | ||||||||
| Batch int `yaml:"batch" toml:"batch" json:"batch"` | ||||||||
| QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` | ||||||||
| EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"` | ||||||||
| // checkpoint flush interval in seconds. | ||||||||
| CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"` | ||||||||
| // TODO: add this two new config items for openapi. | ||||||||
|
|
@@ -424,6 +426,7 @@ func DefaultSyncerConfig() SyncerConfig { | |||||||
| WorkerCount: defaultWorkerCount, | ||||||||
| Batch: defaultBatch, | ||||||||
| QueueSize: defaultQueueSize, | ||||||||
| EventCacheCount: defaultEventCacheCount, | ||||||||
| CheckpointFlushInterval: defaultCheckpointFlushInterval, | ||||||||
| SafeModeDuration: defaultSafeModeDuration, | ||||||||
| } | ||||||||
|
|
@@ -452,6 +455,7 @@ type ValidatorConfig struct { | |||||||
| BatchQuerySize int `yaml:"batch-query-size" toml:"batch-query-size" json:"batch-query-size"` | ||||||||
| MaxPendingRowSize string `yaml:"max-pending-row-size" toml:"max-pending-row-size" json:"max-pending-row-size"` | ||||||||
| MaxPendingRowCount int `yaml:"max-pending-row-count" toml:"max-pending-row-count" json:"max-pending-row-count"` | ||||||||
| EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"` | ||||||||
| StartTime string `yaml:"-" toml:"start-time" json:"-"` | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -491,12 +495,16 @@ func (v *ValidatorConfig) Adjust() error { | |||||||
| if v.MaxPendingRowCount == 0 { | ||||||||
| v.MaxPendingRowCount = DefaultValidatorMaxPendingRow | ||||||||
| } | ||||||||
| if v.EventCacheCount == 0 { | ||||||||
| v.EventCacheCount = defaultEventCacheCount | ||||||||
| } | ||||||||
| return nil | ||||||||
| } | ||||||||
|
|
||||||||
| func defaultValidatorConfig() ValidatorConfig { | ||||||||
| return ValidatorConfig{ | ||||||||
| Mode: ValidationNone, | ||||||||
| Mode: ValidationNone, | ||||||||
| EventCacheCount: defaultEventCacheCount, | ||||||||
|
Comment on lines
+506
to
+507
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To centralize default value handling, consider setting the default for
Suggested change
|
||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
|
|
@@ -887,6 +895,9 @@ func (c *TaskConfig) adjust() error { | |||||||
| if inst.Syncer.QueueSize == 0 { | ||||||||
| inst.Syncer.QueueSize = defaultQueueSize | ||||||||
| } | ||||||||
| if inst.Syncer.EventCacheCount == 0 { | ||||||||
| inst.Syncer.EventCacheCount = defaultEventCacheCount | ||||||||
| } | ||||||||
|
Comment on lines
+898
to
+900
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check appears to be redundant. |
||||||||
| if inst.Syncer.CheckpointFlushInterval == 0 { | ||||||||
| inst.Syncer.CheckpointFlushInterval = defaultCheckpointFlushInterval | ||||||||
| } | ||||||||
|
|
@@ -1165,6 +1176,7 @@ type SyncerConfigForDowngrade struct { | |||||||
| WorkerCount int `yaml:"worker-count"` | ||||||||
| Batch int `yaml:"batch"` | ||||||||
| QueueSize int `yaml:"queue-size"` | ||||||||
| EventCacheCount int `yaml:"event-cache-count"` | ||||||||
| CheckpointFlushInterval int `yaml:"checkpoint-flush-interval"` | ||||||||
| MaxRetry int `yaml:"max-retry"` | ||||||||
| EnableGTID bool `yaml:"enable-gtid"` | ||||||||
|
|
@@ -1186,6 +1198,7 @@ func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[st | |||||||
| WorkerCount: syncerConfig.WorkerCount, | ||||||||
| Batch: syncerConfig.Batch, | ||||||||
| QueueSize: syncerConfig.QueueSize, | ||||||||
| EventCacheCount: syncerConfig.EventCacheCount, | ||||||||
| CheckpointFlushInterval: syncerConfig.CheckpointFlushInterval, | ||||||||
| MaxRetry: syncerConfig.MaxRetry, | ||||||||
| EnableGTID: syncerConfig.EnableGTID, | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -314,7 +314,7 @@ func (v *DataValidator) initialize() error { | |
| return err | ||
| } | ||
|
|
||
| v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList) | ||
| v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList, v.cfg.ValidatorCfg.EventCacheCount) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe use the same config as syncer, validator shares many config as syncer |
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To centralize default value handling, consider setting the default for
EventCacheCountonly within theAdjustmethods (TaskConfig.adjustandSubTaskConfig.Adjust), and removing it from here. This avoids redundancy and makes it clearer where defaults are applied.