@@ -18,6 +18,12 @@ import (
1818 "time"
1919)
2020
21+ const (
22+ START_POS_BY_NONE uint8 = iota
23+ START_POS_BY_POS
24+ START_POS_BY_TIME
25+ )
26+
2127type Creator struct {
2228 CC * config.CreateConfig
2329 DBC * config.DBConfig
@@ -26,8 +32,11 @@ type Creator struct {
2632 StartPosition * models.Position
2733 EndPosition * models.Position
2834 CurrentPosition * models.Position
35+ GetStartPosType uint8
2936 CurrentTimestamp uint32
3037 CurrentThreadID uint32
38+ StartTime time.Time
39+ StartTimestamp uint32
3140 HaveEndPosition bool
3241 EndTime time.Time
3342 HaveEndTime bool
@@ -62,6 +71,23 @@ func NewFlashback(sc *config.CreateConfig, dbc *config.DBConfig, mTables []*visi
6271 if err != nil {
6372 return nil , err
6473 }
74+
75+ // 获取获取位点的类型, 是通过 位点获取, 还是通过时间
76+ if ct .CC .HaveStartPosInfo () { // 通过指定位点获取开始位点
77+ ct .GetStartPosType = START_POS_BY_POS
78+ seelog .Infof ("解析binglog开始位点通过指定 开始binlog 位点获取. 开始位点: %s" , ct .StartPosition .String ())
79+ } else if ct .CC .HaveStartTime () { // 通过时间获取开始位点
80+ ct .GetStartPosType = START_POS_BY_TIME
81+ seelog .Infof ("解析binglog开始位点通过指定 开始时间 获取. 开始位点: %s" , ct .StartPosition .String ())
82+ ct .StartTime , err = utils .NewTime (ct .CC .StartTime )
83+ if err != nil {
84+ return nil , fmt .Errorf ("输入的开始时间有问题. %v" , err )
85+ }
86+ ct .StartTimestamp = uint32 (ct .StartTime .Unix ())
87+ } else {
88+ return nil , fmt .Errorf ("无法获取" )
89+ }
90+
6591 // 原sql文件
6692 fileName := ct .getSqlFileName ("origin_sql" )
6793 ct .OriSQLFile = fmt .Sprintf ("%s/%s" , ct .CC .GetSaveDir (), fileName )
@@ -202,6 +228,13 @@ func (this *Creator) runProduceEvent(wg *sync.WaitGroup) {
202228 defer wg .Done ()
203229 defer this .Syncer .Close ()
204230
231+ // 判断是否需要跳过, 位点
232+ var isSkip bool
233+ if this .GetStartPosType == START_POS_BY_TIME { // 如果开始位点是通过时间获取的就需要执行跳过
234+ isSkip = true
235+ }
236+ seelog .Debugf ("是否是需要跳过事件: %v" , isSkip )
237+
205238 pos := mysql.Position {this .StartPosition .File , this .StartPosition .Position }
206239 streamer , err := this .Syncer .StartSync (pos )
207240 if err != nil {
@@ -222,6 +255,19 @@ produceLoop:
222255 seelog .Error (err .Error ())
223256 this .quit ()
224257 }
258+
259+ // 过去掉还没到开始时间的事件
260+ if isSkip {
261+ // 判断是否到了开始时间
262+ if ev .Header .Timestamp < this .StartTimestamp {
263+ continue
264+ } else {
265+ isSkip = false
266+ seelog .Infof ("停止跳过, 开始生成回滚sql. 时间戳: %d, 时间: %s, 位点: %s:%d" , ev .Header .Timestamp ,
267+ utils .TS2String (int64 (ev .Header .Timestamp ), utils .TIME_FORMAT ), this .StartPosition .File , ev .Header .LogPos )
268+ }
269+ }
270+
225271 if err = this .handleEvent (ev ); err != nil {
226272 seelog .Error (err .Error ())
227273 this .quit ()
0 commit comments