Skip to content

Commit 402ef56

Browse files
committed
DM: Get DB flavor from source instead of guessing it
1 parent d105a2b commit 402ef56

File tree

3 files changed

+32
-5
lines changed

3 files changed

+32
-5
lines changed

dm/loader/util.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/pingcap/failpoint"
2525
"github.com/pingcap/tiflow/dm/config"
26+
"github.com/pingcap/tiflow/dm/pkg/conn"
2627
"github.com/pingcap/tiflow/dm/pkg/dumpling"
2728
"github.com/pingcap/tiflow/dm/pkg/ha"
2829
"github.com/pingcap/tiflow/dm/pkg/log"
@@ -45,14 +46,26 @@ func percent(a int64, b int64, finish bool) string {
4546
}
4647

4748
func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) (string, string, error) {
49+
flavor := ""
50+
baseDB, err := conn.GetUpstreamDB(&cfg.From)
51+
if err != nil {
52+
log.L().Warn("set up db connect failed", zap.Any("db", cfg.From), zap.Error(err))
53+
} else {
54+
flavor, err = conn.GetFlavor(ctx, baseDB)
55+
if err != nil {
56+
log.L().Warn("failed to get database flavor", zap.Any("db", cfg.From), zap.Error(err))
57+
}
58+
baseDB.Close()
59+
}
60+
4861
metafile := "metadata"
4962
failpoint.Inject("TestRemoveMetaFile", func() {
5063
err := storage.RemoveAll(ctx, cfg.LoaderConfig.Dir, nil)
5164
if err != nil {
5265
log.L().Warn("TestRemoveMetaFile Error", log.ShortError(err))
5366
}
5467
})
55-
loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage)
68+
loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage, flavor)
5669
if err == nil {
5770
return loc.Position.String(), loc.GTIDSetStr(), nil
5871
}

dm/pkg/dumpling/utils.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,19 @@ func ParseMetaData(
4848
dir string,
4949
filename string,
5050
extStorage brstorage.ExternalStorage,
51+
flavor string,
5152
) (*binlog.Location, *binlog.Location, error) {
5253
fd, err := storage.OpenFile(ctx, dir, filename, extStorage)
5354
if err != nil {
5455
return nil, nil, err
5556
}
5657
defer fd.Close()
5758

58-
return ParseMetaDataByReader(filename, fd)
59+
return ParseMetaDataByReader(filename, fd, flavor)
5960
}
6061

6162
// ParseMetaDataByReader parses mydumper's output meta file by created reader and returns binlog location.
62-
func ParseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *binlog.Location, error) {
63+
func ParseMetaDataByReader(filename string, rd io.Reader, flavor string) (*binlog.Location, *binlog.Location, error) {
6364
invalidErr := fmt.Errorf("file %s invalid format", filename)
6465

6566
var (
@@ -156,7 +157,7 @@ func ParseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *bi
156157
return nil, nil, terror.ErrMetadataNoBinlogLoc.Generate(filename)
157158
}
158159

159-
gset, err := gtid.ParserGTID("", gtidStr)
160+
gset, err := gtid.ParserGTID(flavor, gtidStr)
160161
if err != nil {
161162
return nil, nil, invalidErr
162163
}

dm/syncer/checkpoint.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,20 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl
13021302
func (cp *RemoteCheckPoint) parseMetaData(ctx context.Context) (*binlog.Location, *binlog.Location, error) {
13031303
// `metadata` is mydumper's output meta file name
13041304
filename := "metadata"
1305-
loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage)
1305+
1306+
flavor := ""
1307+
baseDB, err := conn.GetUpstreamDB(&cp.cfg.From)
1308+
if err != nil {
1309+
log.L().Warn("set up db connect failed", zap.Any("db", cp.cfg.From), zap.Error(err))
1310+
} else {
1311+
flavor, err = conn.GetFlavor(ctx, baseDB)
1312+
if err != nil {
1313+
log.L().Warn("failed to get database flavor", zap.Any("db", cp.cfg.From), zap.Error(err))
1314+
}
1315+
baseDB.Close()
1316+
}
1317+
1318+
loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage, flavor)
13061319
if err != nil {
13071320
toPrint, err2 := storage.ReadFile(ctx, cp.cfg.LoaderConfig.Dir, filename, nil)
13081321
if err2 != nil {

0 commit comments

Comments
 (0)