Skip to content

Commit 2ddebec

Browse files
authored
checker: get incremental data without list | pd=release-8.5-20260121-v8.5.5 tikv=release-8.5-20260121-v8.5.5 tidb=release-8.5-20260121-v8.5.5 (#4778)
close #4244
1 parent 161519c commit 2ddebec

File tree

18 files changed

+1021
-390
lines changed

18 files changed

+1021
-390
lines changed

api/v2/model.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -497,14 +497,15 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
497497
var cloudStorageConfig *config.CloudStorageConfig
498498
if c.Sink.CloudStorageConfig != nil {
499499
cloudStorageConfig = &config.CloudStorageConfig{
500-
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
501-
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
502-
FileSize: c.Sink.CloudStorageConfig.FileSize,
503-
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
504-
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
505-
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
506-
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
507-
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
500+
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
501+
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
502+
FileSize: c.Sink.CloudStorageConfig.FileSize,
503+
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
504+
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
505+
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
506+
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
507+
EnableSchemaIndexByGetObject: c.Sink.CloudStorageConfig.EnableSchemaIndexByGetObject,
508+
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
508509
}
509510
}
510511
var debeziumConfig *config.DebeziumConfig
@@ -854,14 +855,15 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
854855
var cloudStorageConfig *CloudStorageConfig
855856
if cloned.Sink.CloudStorageConfig != nil {
856857
cloudStorageConfig = &CloudStorageConfig{
857-
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
858-
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
859-
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
860-
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
861-
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
862-
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
863-
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
864-
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
858+
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
859+
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
860+
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
861+
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
862+
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
863+
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
864+
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
865+
EnableSchemaIndexByGetObject: cloned.Sink.CloudStorageConfig.EnableSchemaIndexByGetObject,
866+
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
865867
}
866868
}
867869
var debeziumConfig *DebeziumConfig
@@ -1490,14 +1492,15 @@ type MySQLConfig struct {
14901492

14911493
// CloudStorageConfig represents a cloud storage sink configuration
14921494
type CloudStorageConfig struct {
1493-
WorkerCount *int `json:"worker_count,omitempty"`
1494-
FlushInterval *string `json:"flush_interval,omitempty"`
1495-
FileSize *int `json:"file_size,omitempty"`
1496-
OutputColumnID *bool `json:"output_column_id,omitempty"`
1497-
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
1498-
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
1499-
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
1500-
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
1495+
WorkerCount *int `json:"worker_count,omitempty"`
1496+
FlushInterval *string `json:"flush_interval,omitempty"`
1497+
FileSize *int `json:"file_size,omitempty"`
1498+
OutputColumnID *bool `json:"output_column_id,omitempty"`
1499+
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
1500+
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
1501+
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
1502+
EnableSchemaIndexByGetObject *bool `json:"enable_schema_index_by_get_object,omitempty"`
1503+
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
15011504
}
15021505

15031506
// ChangefeedStatus holds common information of a changefeed in cdc

api/v2/model_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,22 @@ func TestReplicaConfigConversion(t *testing.T) {
9090
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)
9191
require.Equal(t, "eventual", *apiCfgBack.Consistent.Level)
9292
}
93+
94+
func TestCloudStorageConfigEnableSchemaIndexByGetObjectConversion(t *testing.T) {
95+
t.Parallel()
96+
97+
apiCfg := &ReplicaConfig{
98+
Sink: &SinkConfig{
99+
CloudStorageConfig: &CloudStorageConfig{
100+
EnableSchemaIndexByGetObject: util.AddressOf(true),
101+
},
102+
},
103+
}
104+
internalCfg := apiCfg.ToInternalReplicaConfig()
105+
require.NotNil(t, internalCfg.Sink.CloudStorageConfig)
106+
require.True(t, util.GetOrZero(internalCfg.Sink.CloudStorageConfig.EnableSchemaIndexByGetObject))
107+
108+
apiCfgBack := ToAPIReplicaConfig(internalCfg)
109+
require.NotNil(t, apiCfgBack.Sink.CloudStorageConfig)
110+
require.True(t, util.GetOrZero(apiCfgBack.Sink.CloudStorageConfig.EnableSchemaIndexByGetObject))
111+
}

cmd/multi-cluster-consistency-checker/advancer/time_window_advancer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ func TestNewTimeWindowAdvancerInitializeFromCheckpointMissingClusterInfo(t *test
100100
"cluster2": {},
101101
}
102102
s3Watchers := map[string]*watcher.S3Watcher{
103-
"cluster1": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil),
104-
"cluster2": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil),
103+
"cluster1": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil, false),
104+
"cluster2": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil, false),
105105
}
106106
pdClients := map[string]pd.Client{
107107
"cluster1": &mockPDClient{},
@@ -159,8 +159,8 @@ func TestTimeWindowAdvancer_AdvanceMultipleRounds(t *testing.T) {
159159
s3WatcherMockC1 := &mockAdvancerWatcher{delta: 50}
160160
s3WatcherMockC2 := &mockAdvancerWatcher{delta: 50}
161161
s3Watchers := map[string]*watcher.S3Watcher{
162-
"c1": watcher.NewS3Watcher(s3WatcherMockC1, storage.NewMemStorage(), nil),
163-
"c2": watcher.NewS3Watcher(s3WatcherMockC2, storage.NewMemStorage(), nil),
162+
"c1": watcher.NewS3Watcher(s3WatcherMockC1, storage.NewMemStorage(), nil, false),
163+
"c2": watcher.NewS3Watcher(s3WatcherMockC2, storage.NewMemStorage(), nil, false),
164164
}
165165

166166
advancer, _, err := NewTimeWindowAdvancer(ctx, checkpointWatchers, s3Watchers, pdClients, nil)

cmd/multi-cluster-consistency-checker/config/config.example.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ log-level = "info"
99
# Data directory configuration, contains report and checkpoint data
1010
data-dir = "/tmp/multi-cluster-consistency-checker-data"
1111

12+
# Maximum number of report files to keep in data directory
13+
max-report-files = 1000
14+
15+
# Enable list by file index
16+
enable-list-by-file-index = false
17+
1218
# Tables configuration
1319
[global.tables]
1420
schema1 = ["table1", "table2"]

cmd/multi-cluster-consistency-checker/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ type GlobalConfig struct {
3838
DataDir string `toml:"data-dir" json:"data-dir"`
3939
MaxReportFiles int `toml:"max-report-files" json:"max-report-files"`
4040
Tables map[string][]string `toml:"tables" json:"tables"`
41+
42+
// EnableListByFileIndex is the flag to enable list by file index
43+
// If true, the consistency checker will use the file index to list the files
44+
// If false, the consistency checker will use list directory to list the files
45+
EnableListByFileIndex bool `toml:"enable-list-by-file-index" json:"enable-list-by-file-index"`
4146
}
4247

4348
type PeerClusterChangefeedConfig struct {

0 commit comments

Comments
 (0)