Skip to content

Commit 51bfbfd

Browse files
authored
Merge pull request #9254 from dolthub/fulghum/mariadb-gtid
Add support for MariaDB's GTID format
2 parents b9b5176 + 834405a commit 51bfbfd

File tree

2 files changed

+42
-19
lines changed

2 files changed

+42
-19
lines changed

go/libraries/doltcore/sqle/binlogreplication/binlog_position_store.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
const binlogPositionDirectory = ".doltcfg"
3232
const binlogPositionFilename = "binlog-position"
3333
const mysqlFlavor = "MySQL56"
34+
const mariadbFlavor = "MariaDB"
3435

3536
// binlogPositionStore manages loading and saving data to the binlog position file stored on disk. This provides
3637
// durable storage for the set of GTIDs that have been successfully executed on the replica, so that the replica
@@ -70,13 +71,19 @@ func (store *binlogPositionStore) Load(filesys filesys.Filesys) (*mysql.Position
7071
}
7172
positionString := string(bytes)
7273

73-
// Strip off the "MySQL56/" prefix
74-
prefix := "MySQL56/"
75-
if strings.HasPrefix(positionString, prefix) {
76-
positionString = string(bytes[len(prefix):])
74+
// Determine the flavor
75+
flavor := ""
76+
if strings.HasPrefix(positionString, mysqlFlavor) {
77+
positionString = string(bytes[len(mysqlFlavor)+1:])
78+
flavor = mysqlFlavor
79+
} else if strings.HasPrefix(positionString, mariadbFlavor) {
80+
positionString = string(bytes[len(mariadbFlavor)+1:])
81+
flavor = mariadbFlavor
82+
} else {
83+
return nil, fmt.Errorf("unknown binlog position flavor: %s", positionString)
7784
}
7885

79-
position, err := mysql.ParsePosition(mysqlFlavor, positionString)
86+
position, err := mysql.ParsePosition(flavor, positionString)
8087
if err != nil {
8188
return nil, err
8289
}

go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,15 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
235235
return err
236236
}
237237

238-
if position == nil {
238+
if position != nil {
239+
ctx.GetLogger().Debugf("Loaded position from position store: %s", position.String())
240+
} else {
239241
// If the positionStore doesn't have a record of executed GTIDs, check to see if the gtid_purged system
240242
// variable is set. If it holds a GTIDSet, then we use that as our starting position. As part of loading
241243
// a mysqldump onto a replica, gtid_purged will be set to indicate where to start replication.
242244
_, value, ok := sql.SystemVariables.GetGlobal("gtid_purged")
243245
gtidPurged, isString := value.(string)
244-
if ok && value != nil && isString {
246+
if ok && value != nil && isString && gtidPurged != "" {
245247
// Starting in MySQL 8.0, when setting the GTID_PURGED sys variable, if the new value starts with '+', then
246248
// the specified GTID Set value is added to the current GTID Set value to get a new GTID Set that contains
247249
// all the previous GTIDs, plus the new ones from the current assignment. Dolt doesn't support this
@@ -253,25 +255,31 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
253255
gtidPurged = gtidPurged[1:]
254256
}
255257

256-
purged, err := mysql.ParsePosition(mysqlFlavor, gtidPurged)
258+
flavor := mysqlFlavor
259+
if conn.IsMariaDB() {
260+
flavor = mariadbFlavor
261+
}
262+
purged, err := mysql.ParsePosition(flavor, gtidPurged)
257263
if err != nil {
264+
ctx.GetLogger().Errorf("unable to parse gtid_purged: %s", err.Error())
258265
return err
259266
}
260-
position = &purged
267+
268+
if !purged.IsZero() {
269+
position = &purged
270+
}
261271
}
262272
}
263273

274+
// If we don't have a position that we previously stored, then initialize an empty position.
264275
if position == nil {
265-
// If we still don't have any record of executed GTIDs, we create a GTIDSet with just one transaction ID
266-
// for the 0000 server ID. There doesn't seem to be a cleaner way of saying "start at the very beginning".
267-
//
268-
// Also... "starting position" is a bit of a misnomer – it's actually the processed GTIDs, which
269-
// indicate the NEXT GTID where replication should start, but it's not as direct as specifying
270-
// a starting position, like the Vitess function signature seems to suggest.
271-
gtid := mysql.Mysql56GTID{
272-
Sequence: 1,
276+
if conn.IsMariaDB() {
277+
ctx.GetLogger().Infof("Initializing empty GTID set (for MariaDB)")
278+
position = &mysql.Position{GTIDSet: mysql.MariadbGTIDSet{}}
279+
} else {
280+
ctx.GetLogger().Infof("Initializing empty GTID set")
281+
position = &mysql.Position{GTIDSet: mysql.Mysql56GTIDSet{}}
273282
}
274-
position = &mysql.Position{GTIDSet: gtid.GTIDSet()}
275283
}
276284

277285
a.currentPosition = position
@@ -485,7 +493,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
485493
return err
486494
}
487495
if isBegin {
488-
ctx.GetLogger().Errorf("unsupported binlog protocol message: GTID event with 'isBegin' set to true")
496+
ctx.GetLogger().Warnf("unsupported binlog protocol message: GTID event with 'isBegin' set to true")
489497
}
490498
ctx.GetLogger().WithFields(logrus.Fields{
491499
"gtid": gtid,
@@ -550,6 +558,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
550558
return err
551559
}
552560

561+
case event.Bytes()[4] == 0xA1:
562+
// https://mariadb.com/kb/en/binlog_checkpoint_event/
563+
ctx.GetLogger().Warnf("received unsupported event: CHECKPOINT_EVENT")
564+
565+
case event.Bytes()[4] == 0xA3:
566+
// https://mariadb.com/kb/en/gtid_list_event/
567+
ctx.GetLogger().Warnf("received unsupported event: GTID_LIST_EVENT")
568+
553569
default:
554570
// We can't access the bytes directly because these non-interface types in Vitess are not exposed.
555571
// Having a Bytes() or Type() method on the Vitess interface would let us clean this up.

0 commit comments

Comments
 (0)