Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion dm/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/dumpling"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
Expand All @@ -45,14 +46,26 @@ func percent(a int64, b int64, finish bool) string {
}

func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) (string, string, error) {
flavor := ""
baseDB, err := conn.GetUpstreamDB(&cfg.From)
if err != nil {
log.L().Warn("set up db connect failed", zap.Any("db", cfg.From), zap.Error(err))
} else {
flavor, err = conn.GetFlavor(ctx, baseDB)
if err != nil {
log.L().Warn("failed to get database flavor", zap.Any("db", cfg.From), zap.Error(err))
}
baseDB.Close()
}

metafile := "metadata"
failpoint.Inject("TestRemoveMetaFile", func() {
err := storage.RemoveAll(ctx, cfg.LoaderConfig.Dir, nil)
if err != nil {
log.L().Warn("TestRemoveMetaFile Error", log.ShortError(err))
}
})
loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage)
loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage, flavor)
if err == nil {
return loc.Position.String(), loc.GTIDSetStr(), nil
}
Expand Down
7 changes: 4 additions & 3 deletions dm/pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@ func ParseMetaData(
dir string,
filename string,
extStorage brstorage.ExternalStorage,
flavor string,
) (*binlog.Location, *binlog.Location, error) {
fd, err := storage.OpenFile(ctx, dir, filename, extStorage)
if err != nil {
return nil, nil, err
}
defer fd.Close()

return ParseMetaDataByReader(filename, fd)
return ParseMetaDataByReader(filename, fd, flavor)
}

// ParseMetaDataByReader parses mydumper's output meta file by created reader and returns binlog location.
func ParseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *binlog.Location, error) {
func ParseMetaDataByReader(filename string, rd io.Reader, flavor string) (*binlog.Location, *binlog.Location, error) {
invalidErr := fmt.Errorf("file %s invalid format", filename)

var (
Expand Down Expand Up @@ -156,7 +157,7 @@ func ParseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *bi
return nil, nil, terror.ErrMetadataNoBinlogLoc.Generate(filename)
}

gset, err := gtid.ParserGTID("", gtidStr)
gset, err := gtid.ParserGTID(flavor, gtidStr)
if err != nil {
return nil, nil, invalidErr
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/dumpling/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ Finished dump at: 2020-09-30 12:16:49
for _, tc := range testCases {
err2 := os.WriteFile(f.Name(), []byte(tc.source), 0o644)
require.NoError(t, err2)
loc, loc2, err2 := ParseMetaData(ctx, fdir, fname, nil)
loc, loc2, err2 := ParseMetaData(ctx, fdir, fname, nil, "mysql")
require.NoError(t, err2)
require.Equal(t, tc.pos, loc.Position)
gs, _ := gtid.ParserGTID("mysql", tc.gsetStr)
Expand All @@ -274,7 +274,7 @@ Finished dump at: 2020-12-02 17:13:56
`
err = os.WriteFile(f.Name(), []byte(noBinlogLoc), 0o644)
require.NoError(t, err)
_, _, err = ParseMetaData(ctx, fdir, fname, nil)
_, _, err = ParseMetaData(ctx, fdir, fname, nil, "mysql")
require.True(t, terror.ErrMetadataNoBinlogLoc.Equal(err))
}

Expand Down
15 changes: 14 additions & 1 deletion dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,20 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl
func (cp *RemoteCheckPoint) parseMetaData(ctx context.Context) (*binlog.Location, *binlog.Location, error) {
// `metadata` is mydumper's output meta file name
filename := "metadata"
loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage)

flavor := ""
baseDB, err := conn.GetUpstreamDB(&cp.cfg.From)
if err != nil {
log.L().Warn("set up db connect failed", zap.Any("db", cp.cfg.From), zap.Error(err))
} else {
flavor, err = conn.GetFlavor(ctx, baseDB)
if err != nil {
log.L().Warn("failed to get database flavor", zap.Any("db", cp.cfg.From), zap.Error(err))
}
baseDB.Close()
}

loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage, flavor)
if err != nil {
toPrint, err2 := storage.ReadFile(ctx, cp.cfg.LoaderConfig.Dir, filename, nil)
if err2 != nil {
Expand Down