Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func TestNewTimeWindowAdvancerInitializeFromCheckpointMissingClusterInfo(t *test
"cluster2": {},
}
s3Watchers := map[string]*watcher.S3Watcher{
"cluster1": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil),
"cluster2": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil),
"cluster1": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil, false),
"cluster2": watcher.NewS3Watcher(&mockAdvancerWatcher{delta: 1}, storage.NewMemStorage(), nil, false),
}
pdClients := map[string]pd.Client{
"cluster1": &mockPDClient{},
Expand Down Expand Up @@ -159,8 +159,8 @@ func TestTimeWindowAdvancer_AdvanceMultipleRounds(t *testing.T) {
s3WatcherMockC1 := &mockAdvancerWatcher{delta: 50}
s3WatcherMockC2 := &mockAdvancerWatcher{delta: 50}
s3Watchers := map[string]*watcher.S3Watcher{
"c1": watcher.NewS3Watcher(s3WatcherMockC1, storage.NewMemStorage(), nil),
"c2": watcher.NewS3Watcher(s3WatcherMockC2, storage.NewMemStorage(), nil),
"c1": watcher.NewS3Watcher(s3WatcherMockC1, storage.NewMemStorage(), nil, false),
"c2": watcher.NewS3Watcher(s3WatcherMockC2, storage.NewMemStorage(), nil, false),
}

advancer, _, err := NewTimeWindowAdvancer(ctx, checkpointWatchers, s3Watchers, pdClients, nil)
Expand Down
5 changes: 5 additions & 0 deletions cmd/multi-cluster-consistency-checker/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type GlobalConfig struct {
DataDir string `toml:"data-dir" json:"data-dir"`
MaxReportFiles int `toml:"max-report-files" json:"max-report-files"`
Tables map[string][]string `toml:"tables" json:"tables"`

// EnableListByFileIndex is the flag to enable list by file index
// If true, the consistency checker will use the file index to list the files
// If false, the consistency checker will use list directory to list the files
EnableListByFileIndex bool `toml:"enable-list-by-file-index" json:"enable-list-by-file-index"`
}

type PeerClusterChangefeedConfig struct {
Expand Down
255 changes: 10 additions & 245 deletions cmd/multi-cluster-consistency-checker/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ package consumer
import (
"context"
"encoding/json"
"fmt"
"path"
"strings"
"sync"

perrors "github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cmd/multi-cluster-consistency-checker/recorder"
"github.com/pingcap/ticdc/cmd/multi-cluster-consistency-checker/types"
Expand Down Expand Up @@ -75,8 +71,6 @@ type schemaKey struct {
table string
}

// ErrWalkDirEnd is a sentinel error returned from WalkDir callbacks to stop
// a walk early once the requested end of the scan range has been passed.
// Callers treat it as a normal early-exit signal, not a failure.
var ErrWalkDirEnd = perrors.Normalize("walk dir end", perrors.RFCCodeText("CDC:ErrWalkDirEnd"))

type CurrentTableVersion struct {
mu sync.RWMutex
currentTableVersionMap map[schemaKey]types.VersionKey
Expand Down Expand Up @@ -242,13 +236,16 @@ type S3Consumer struct {
currentTableVersion *CurrentTableVersion
tableDMLIdx *TableDMLIdx
schemaDefinitions *SchemaDefinitions

newFileDiscoverer newFileDiscoverer
}

func NewS3Consumer(
s3Storage storage.ExternalStorage,
tables map[string][]string,
enableSchemaIndexByGetObject bool,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The parameter name enableSchemaIndexByGetObject is inconsistent with the configuration option enable-list-by-file-index and the parameter name used in the NewNewFileDiscoverer factory function. For better code clarity and maintainability, consider renaming it to enableListByFileIndex.

Suggested change
enableSchemaIndexByGetObject bool,
enableListByFileIndex bool,

) *S3Consumer {
return &S3Consumer{
c := &S3Consumer{
s3Storage: s3Storage,
fileExtension: ".json",
dateSeparator: config.DateSeparatorDay.String(),
Expand All @@ -270,6 +267,8 @@ func NewS3Consumer(
tableDMLIdx: NewTableDMLIdx(),
schemaDefinitions: NewSchemaDefinitions(),
}
c.newFileDiscoverer = NewNewFileDiscoverer(c, enableSchemaIndexByGetObject)
return c
}

func (c *S3Consumer) acquireReadSlot(ctx context.Context) error {
Expand Down Expand Up @@ -331,7 +330,7 @@ func (c *S3Consumer) InitializeFromCheckpoint(
eg.SetLimit(c.tableWorkerConcurrencyLimit)
for schemaTableKey, scanRange := range scanRanges {
eg.Go(func() error {
scanVersions, err := c.downloadSchemaFilesWithScanRange(
scanVersions, err := c.newFileDiscoverer.downloadSchemaFilesWithScanRange(
egCtx, schemaTableKey.Schema, schemaTableKey.Table, scanRange.StartVersionKey, scanRange.EndVersionKey, scanRange.EndDataPath)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -363,85 +362,6 @@ func (c *S3Consumer) InitializeFromCheckpoint(
return result, nil
}

// downloadSchemaFilesWithScanRange discovers and downloads the schema files
// of the given table whose paths lie in (startVersionKey, endVersionKey],
// always including the start version itself, and finally records
// endVersionKey/endDataPath as the table's current version.
//
// It returns the discovered version keys with the start version first and
// the remainder in the order they were encountered during the walk.
// Both range bounds must be parseable schema file paths; otherwise an error
// is returned before any walking happens.
func (c *S3Consumer) downloadSchemaFilesWithScanRange(
	ctx context.Context,
	schema, table string,
	startVersionKey string,
	endVersionKey string,
	endDataPath string,
) ([]types.VersionKey, error) {
	// Schema files live under "<schema>/<table>/meta/" with a "schema_" prefix.
	metaSubDir := fmt.Sprintf("%s/%s/meta/", schema, table)
	opt := &storage.WalkOption{
		SubDir:     metaSubDir,
		ObjPrefix:  "schema_",
		StartAfter: startVersionKey, // resume strictly after the start key
	}

	var startSchemaKey, endSchemaKey cloudstorage.SchemaPathKey
	_, err := startSchemaKey.ParseSchemaFilePath(startVersionKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	_, err = endSchemaKey.ParseSchemaFilePath(endVersionKey)
	if err != nil {
		return nil, errors.Trace(err)
	}

	var scanVersions []types.VersionKey
	newVersionPaths := make(map[cloudstorage.SchemaPathKey]string)
	// The start version is always part of the result, even if the walk
	// finds nothing newer.
	scanVersions = append(scanVersions, types.VersionKey{
		Version:     startSchemaKey.TableVersion,
		VersionPath: startVersionKey,
	})
	newVersionPaths[startSchemaKey] = startVersionKey
	if err := func() error {
		// Bound the number of concurrent WalkDir calls across the consumer.
		if err := c.acquireWalkSlot(ctx); err != nil {
			return errors.Trace(err)
		}
		defer c.releaseWalkSlot()
		return c.s3Storage.WalkDir(ctx, opt, func(filePath string, size int64) error {
			// Stop once past the end key via the sentinel error.
			// NOTE(review): relies on WalkDir visiting keys in lexicographic
			// order so that a later path implies the range is exhausted.
			if endVersionKey < filePath {
				return ErrWalkDirEnd
			}
			if !cloudstorage.IsSchemaFile(filePath) {
				return nil
			}
			var schemaKey cloudstorage.SchemaPathKey
			_, err := schemaKey.ParseSchemaFilePath(filePath)
			if err != nil {
				// Unparseable paths are logged and skipped rather than
				// aborting the whole walk.
				log.Error("failed to parse schema file path, skipping",
					zap.String("path", filePath),
					zap.Error(err))
				return nil
			}
			if schemaKey.TableVersion > startSchemaKey.TableVersion {
				// Append each distinct version once to scanVersions, but keep
				// the latest path observed for the key in newVersionPaths.
				if _, exists := newVersionPaths[schemaKey]; !exists {
					scanVersions = append(scanVersions, types.VersionKey{
						Version:     schemaKey.TableVersion,
						VersionPath: filePath,
					})
				}
				newVersionPaths[schemaKey] = filePath
			}
			return nil
		})
	}(); err != nil && !errors.Is(err, ErrWalkDirEnd) {
		// ErrWalkDirEnd is the expected early-termination signal; every other
		// error is propagated.
		return nil, errors.Trace(err)
	}

	// Fetch the contents of all discovered schema files concurrently.
	if err := c.downloadSchemaFiles(ctx, newVersionPaths); err != nil {
		return nil, errors.Trace(err)
	}

	// Advance the in-memory current version to the end of the scan range.
	c.currentTableVersion.UpdateCurrentTableVersion(schema, table, types.VersionKey{
		Version:     endSchemaKey.TableVersion,
		VersionPath: endVersionKey,
		DataPath:    endDataPath,
	})

	return scanVersions, nil
}

// downloadDataFilesWithScanRange downloads data files for a given scan range.
// consumeFunc is called from multiple goroutines concurrently and must be goroutine-safe.
func (c *S3Consumer) downloadDataFilesWithScanRange(
Expand All @@ -458,7 +378,7 @@ func (c *S3Consumer) downloadDataFilesWithScanRange(
eg, egCtx := errgroup.WithContext(ctx)
for _, version := range scanVersions {
eg.Go(func() error {
newFiles, err := c.getNewFilesForSchemaPathKeyWithEndPath(egCtx, schema, table, version.Version, scanRange.StartDataPath, scanRange.EndDataPath)
newFiles, err := c.newFileDiscoverer.getNewFilesForSchemaPathKeyWithEndPath(egCtx, schema, table, version.Version, scanRange.StartDataPath, scanRange.EndDataPath)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -482,60 +402,6 @@ func (c *S3Consumer) downloadDataFilesWithScanRange(
return nil
}

// getNewFilesForSchemaPathKeyWithEndPath walks the data directory of one
// table version ("<schema>/<table>/<version>") and collects the DML file
// index ranges that appeared after startDataPath, stopping once the walk
// passes endDataPath.
//
// Before walking, it seeds the shared tableDMLIdx with the index parsed from
// startDataPath so files at or before the start are not re-consumed. The
// returned map contains only the index ranges that are new relative to the
// shared tableDMLIdx state.
func (c *S3Consumer) getNewFilesForSchemaPathKeyWithEndPath(
	ctx context.Context,
	schema, table string,
	version uint64,
	startDataPath string,
	endDataPath string,
) (map[cloudstorage.DmlPathKey]fileIndexRange, error) {
	schemaPrefix := path.Join(schema, table, fmt.Sprintf("%d", version))
	opt := &storage.WalkOption{
		SubDir:     schemaPrefix,
		StartAfter: startDataPath, // resume strictly after the start path
	}
	{
		// Record the start position in the shared DML index so the diff
		// below excludes everything already consumed. A parse failure here
		// is tolerated: the walk still runs, it may just redo some work.
		var startDmlkey cloudstorage.DmlPathKey
		startFileIdx, err := startDmlkey.ParseDMLFilePath(c.dateSeparator, startDataPath)
		if err != nil {
			log.Error("failed to parse start dml file path, skipping",
				zap.String("path", startDataPath),
				zap.Error(err))
		} else {
			c.tableDMLIdx.UpdateDMLIdxMapByStartPath(startDmlkey, startFileIdx)
		}
	}

	newTableDMLIdxMap := make(map[cloudstorage.DmlPathKey]fileIndexKeyMap)
	if err := func() error {
		// Bound the number of concurrent WalkDir calls across the consumer.
		if err := c.acquireWalkSlot(ctx); err != nil {
			return errors.Trace(err)
		}
		defer c.releaseWalkSlot()
		return c.s3Storage.WalkDir(ctx, opt, func(filePath string, size int64) error {
			// Stop once past the end path via the sentinel error.
			// NOTE(review): relies on WalkDir visiting keys in lexicographic
			// order so that a later path implies the range is exhausted.
			if endDataPath < filePath {
				return ErrWalkDirEnd
			}
			// Try to parse DML file path if it matches the expected extension
			if strings.HasSuffix(filePath, c.fileExtension) {
				var dmlkey cloudstorage.DmlPathKey
				fileIdx, err := dmlkey.ParseDMLFilePath(c.dateSeparator, filePath)
				if err != nil {
					// Unparseable paths are logged and skipped rather than
					// aborting the whole walk.
					log.Error("failed to parse dml file path, skipping",
						zap.String("path", filePath),
						zap.Error(err))
					return nil
				}
				updateTableDMLIdxMap(newTableDMLIdxMap, dmlkey, fileIdx)
			}
			return nil
		})
	}(); err != nil && !errors.Is(err, ErrWalkDirEnd) {
		// ErrWalkDirEnd is the expected early-termination signal; every other
		// error is propagated.
		return nil, errors.Trace(err)
	}
	// Return only the index ranges not yet present in the shared state.
	return c.tableDMLIdx.DiffNewTableDMLIdxMap(newTableDMLIdxMap), nil
}

// downloadSchemaFiles downloads schema files concurrently for given schema path keys
func (c *S3Consumer) downloadSchemaFiles(
ctx context.Context,
Expand Down Expand Up @@ -584,107 +450,6 @@ func (c *S3Consumer) readAndParseSchemaFile(
return nil
}

// discoverAndDownloadNewTableVersions walks the table's meta directory for
// schema files newer than the table's current version, downloads them, and
// returns the version keys that should be scanned for data files.
//
// The walk starts after the current version's path and has no end bound: it
// lists everything newer. If the table already has a non-zero current
// version, that version is appended to the result so its data directory is
// re-scanned as well.
func (c *S3Consumer) discoverAndDownloadNewTableVersions(
	ctx context.Context,
	schema, table string,
) ([]types.VersionKey, error) {
	currentVersion := c.currentTableVersion.GetCurrentTableVersion(schema, table)
	// Schema files live under "<schema>/<table>/meta/" with a "schema_" prefix.
	metaSubDir := fmt.Sprintf("%s/%s/meta/", schema, table)
	opt := &storage.WalkOption{
		SubDir:     metaSubDir,
		ObjPrefix:  "schema_",
		StartAfter: currentVersion.VersionPath, // resume strictly after the known version
	}

	var scanVersions []types.VersionKey
	newVersionPaths := make(map[cloudstorage.SchemaPathKey]string)
	if err := func() error {
		// Bound the number of concurrent WalkDir calls across the consumer.
		if err := c.acquireWalkSlot(ctx); err != nil {
			return errors.Trace(err)
		}
		defer c.releaseWalkSlot()
		return c.s3Storage.WalkDir(ctx, opt, func(filePath string, size int64) error {
			if !cloudstorage.IsSchemaFile(filePath) {
				return nil
			}
			var schemaKey cloudstorage.SchemaPathKey
			_, err := schemaKey.ParseSchemaFilePath(filePath)
			if err != nil {
				// Unparseable paths are logged and skipped rather than
				// aborting the whole walk.
				log.Error("failed to parse schema file path, skipping",
					zap.String("path", filePath),
					zap.Error(err))
				return nil
			}
			version := schemaKey.TableVersion
			if version > currentVersion.Version {
				// Append each distinct version once to scanVersions, but keep
				// the latest path observed for the key in newVersionPaths.
				if _, exists := newVersionPaths[schemaKey]; !exists {
					scanVersions = append(scanVersions, types.VersionKey{
						Version:     version,
						VersionPath: filePath,
					})
				}
				newVersionPaths[schemaKey] = filePath
			}
			return nil
		})
	}(); err != nil {
		return nil, errors.Trace(err)
	}

	// download new version schema files concurrently
	if err := c.downloadSchemaFiles(ctx, newVersionPaths); err != nil {
		return nil, errors.Trace(err)
	}

	// Include the current version (when initialized) so its data directory
	// keeps being scanned for new files.
	if currentVersion.Version > 0 {
		scanVersions = append(scanVersions, currentVersion)
	}
	return scanVersions, nil
}

// getNewFilesForSchemaPathKey walks the data directory of one table version
// ("<schema>/<table>/<version>"), starting after version.DataPath, and
// collects the DML file index ranges that are new relative to the shared
// tableDMLIdx state.
//
// Side effect: version.DataPath is advanced in place to the last DML file
// path seen during the walk, so subsequent calls resume from there.
func (c *S3Consumer) getNewFilesForSchemaPathKey(
	ctx context.Context,
	schema, table string,
	version *types.VersionKey,
) (map[cloudstorage.DmlPathKey]fileIndexRange, error) {
	schemaPrefix := path.Join(schema, table, fmt.Sprintf("%d", version.Version))
	opt := &storage.WalkOption{
		SubDir:     schemaPrefix,
		StartAfter: version.DataPath, // resume strictly after the last consumed path
	}

	newTableDMLIdxMap := make(map[cloudstorage.DmlPathKey]fileIndexKeyMap)
	// Tracks the last DML file path visited; used to advance version.DataPath.
	// NOTE(review): "max" relies on WalkDir visiting keys in lexicographic
	// order, so the last visited path is also the greatest.
	maxFilePath := ""
	if err := func() error {
		// Bound the number of concurrent WalkDir calls across the consumer.
		if err := c.acquireWalkSlot(ctx); err != nil {
			return errors.Trace(err)
		}
		defer c.releaseWalkSlot()
		return c.s3Storage.WalkDir(ctx, opt, func(filePath string, size int64) error {
			// Try to parse DML file path if it matches the expected extension
			if strings.HasSuffix(filePath, c.fileExtension) {
				var dmlkey cloudstorage.DmlPathKey
				fileIdx, err := dmlkey.ParseDMLFilePath(c.dateSeparator, filePath)
				if err != nil {
					// Unparseable paths are logged and skipped rather than
					// aborting the whole walk.
					log.Error("failed to parse dml file path, skipping",
						zap.String("path", filePath),
						zap.Error(err))
					return nil
				}
				updateTableDMLIdxMap(newTableDMLIdxMap, dmlkey, fileIdx)
				maxFilePath = filePath
			}
			return nil
		})
	}(); err != nil {
		return nil, errors.Trace(err)
	}
	// Persist the resume point only if at least one DML file was seen.
	if len(maxFilePath) > 0 {
		version.DataPath = maxFilePath
	}
	// Return only the index ranges not yet present in the shared state.
	return c.tableDMLIdx.DiffNewTableDMLIdxMap(newTableDMLIdxMap), nil
}

func (c *S3Consumer) downloadDMLFiles(
ctx context.Context,
newFiles map[cloudstorage.DmlPathKey]fileIndexRange,
Expand Down Expand Up @@ -805,7 +570,7 @@ func (c *S3Consumer) downloadNewFilesWithVersions(
maxVersion = versionp
}
eg.Go(func() error {
newFiles, err := c.getNewFilesForSchemaPathKey(egCtx, schema, table, versionp)
newFiles, err := c.newFileDiscoverer.getNewFilesForSchemaPathKey(egCtx, schema, table, versionp)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -845,7 +610,7 @@ func (c *S3Consumer) ConsumeNewFiles(
for schema, tables := range c.tables {
for _, table := range tables {
eg.Go(func() error {
scanVersions, err := c.discoverAndDownloadNewTableVersions(egCtx, schema, table)
scanVersions, err := c.newFileDiscoverer.discoverAndDownloadNewTableVersions(egCtx, schema, table)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading
Loading