diff --git a/canal/canal.go b/canal/canal.go index 67c61ddec..4bb992915 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -54,6 +54,8 @@ type Canal struct { ctx context.Context cancel context.CancelFunc + + binFileDownloader BinlogFileDownloader } // canal will retry fetching unknown table's meta after UnknownTableRetryPeriod diff --git a/canal/local.go b/canal/local.go new file mode 100644 index 000000000..45729e292 --- /dev/null +++ b/canal/local.go @@ -0,0 +1,117 @@ +package canal + +import ( + "context" + + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" +) + +// BinlogFileDownloader downloads the binlog file and return the path to it. It's often used to download binlog backup from RDS service. +type BinlogFileDownloader func(mysql.Position) (localBinFilePath string, err error) + +// WithLocalBinlogDownloader registers the local bin file downloader, +// that allows download the backup binlog file from RDS service to local +func (c *Canal) WithLocalBinlogDownloader(d BinlogFileDownloader) { + c.binFileDownloader = d +} + +func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.BinlogStreamer, err error) (*localBinFileAdapterStreamer, error) { + return &localBinFileAdapterStreamer{ + BinlogStreamer: remoteBinlogStreamer, + syncMasterStreamer: remoteBinlogStreamer, + canal: c, + binFileDownloader: c.binFileDownloader, + }, err +} + +// localBinFileAdapterStreamer will support to download flushed binlog file for continuous sync in cloud computing platform +type localBinFileAdapterStreamer struct { + *replication.BinlogStreamer // the running streamer, it will be localStreamer or sync master streamer + syncMasterStreamer *replication.BinlogStreamer // syncMasterStreamer is the streamer from canal startSyncer + canal *Canal + binFileDownloader BinlogFileDownloader +} + +// GetEvent will auto switch the local and remote streamer to get binlog event if possible. +func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if s.binFileDownloader == nil { // not support to use local bin file + return s.BinlogStreamer.GetEvent(ctx) + } + + ev, err := s.BinlogStreamer.GetEvent(ctx) + + if err == nil { + switch ev.Event.(type) { + case *replication.RotateEvent: // RotateEvent means need to change steamer back to sync master to retry sync + s.BinlogStreamer = s.syncMasterStreamer + } + return ev, err + } + + if err == replication.ErrNeedSyncAgain { // restart master if last sync master syncer has error + s.canal.syncer.Close() + _ = s.canal.prepareSyncer() + + newStreamer, startErr := s.canal.startSyncer() + if startErr != nil { + return nil, startErr + } + // set all streamer to the new sync master streamer + s.BinlogStreamer = newStreamer + s.syncMasterStreamer = newStreamer + + ev, err = newStreamer.GetEvent(ctx) + } + + mysqlErr, ok := err.(*mysql.MyError) + // only 'Could not find first log' can create local streamer, ignore other errors + if !ok || mysqlErr.Code != mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG || + mysqlErr.Message != "Could not find first log file name in binary log index file" { + return ev, err + } + + s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry") + + // local binlog need next position to find binlog file and begin event + pos := s.canal.master.Position() + newStreamer := s.newLocalBinFileStreamer(s.binFileDownloader, pos) + s.BinlogStreamer = newStreamer + + return newStreamer.GetEvent(ctx) +} + +func (s *localBinFileAdapterStreamer) newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer { + streamer := replication.NewBinlogStreamer() + binFilePath, err := download(position) + if err != nil { + streamer.CloseWithError(errors.New("local binlog file not exist")) + return streamer + } + + go func(binFilePath string, streamer *replication.BinlogStreamer) { + beginFromHere := false + err := s.canal.syncer.GetBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error { + if position.Pos < 4 { // binlog first pos is 4, if pos < 4 means canal gives error position info + return nil + } + if be.Header.LogPos == position.Pos || position.Pos == 4 { // go ahead to check if begin + beginFromHere = true + } + if beginFromHere { + if err := s.canal.syncer.StorePosAndGTID(be); err != nil { + streamer.CloseWithError(err) + return nil + } + streamer.PutEvent(be) + } + return nil + }) + if err != nil { + streamer.CloseWithError(err) + } + }(binFilePath, streamer) + + return streamer +} diff --git a/canal/sync.go b/canal/sync.go index abd835c96..7deec9463 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -35,7 +35,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { } func (c *Canal) runSyncBinlog() error { - s, err := c.startSyncer() + s, err := c.adaptLocalBinFileStreamer(c.startSyncer()) if err != nil { return err } diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 56b8622a8..93e96f52a 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -92,3 +92,16 @@ func newBinlogStreamer() *BinlogStreamer { return s } + +// PutEvent puts event to BinlogStreamer +func (s *BinlogStreamer) PutEvent(ev *BinlogEvent) { + s.ch <- ev +} + +func (s *BinlogStreamer) CloseWithError(err error) { + s.closeWithError(err) +} + +func NewBinlogStreamer() *BinlogStreamer { + return newBinlogStreamer() +} diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 7421e102b..08e17deef 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -794,6 +794,32 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { return errors.Trace(err) } + if err := b.StorePosAndGTID(e); err != nil { + return errors.Trace(err) + } + + needStop := false + select { + case s.ch <- e: + case <-b.ctx.Done(): + needStop = true + } + + if needACK { + err := b.replySemiSyncACK(b.nextPos) + if err != nil { + return errors.Trace(err) + } + } + + if needStop { + return errors.New("sync is been closing...") + } + + return nil +} + +func (b *BinlogSyncer) StorePosAndGTID(e *BinlogEvent) error { if e.Header.LogPos > 0 { // Some events like FormatDescriptionEvent return 0, ignore. b.nextPos.Pos = e.Header.LogPos @@ -830,7 +856,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { break } prev := b.currGset.Clone() - err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) + err := b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID) if err != nil { return errors.Trace(err) } @@ -847,25 +873,6 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { event.GSet = getCurrentGtidSet() } } - - needStop := false - select { - case s.ch <- e: - case <-b.ctx.Done(): - needStop = true - } - - if needACK { - err := b.replySemiSyncACK(b.nextPos) - if err != nil { - return errors.Trace(err) - } - } - - if needStop { - return errors.New("sync is been closing...") - } - return nil } @@ -902,3 +909,7 @@ func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) { } b.cfg.Logger.Infof("kill last connection id %d", id) } + +func (b *BinlogSyncer) GetBinlogParser() *BinlogParser { + return b.parser +}