@@ -91,47 +91,35 @@ func (r *Input) prepareCanal() (err error) {
9191
9292 addr := fmt .Sprintf ("%s:%s" , pipe .Mysql .Address , strconv .Itoa (int (pipe .Mysql .Port )))
9393 cfg := & canal.Config {
94- Addr : addr ,
95- User : pipe .Mysql .User ,
96- Password : pipe .Mysql .Password ,
97- ServerID : pipe .Mysql .ServerId ,
98- Flavor : pipe .Mysql .Flavor .YaString (),
94+ Addr : addr ,
95+ User : pipe .Mysql .User ,
96+ Password : pipe .Mysql .Password ,
97+ ServerID : pipe .Mysql .ServerId ,
98+ Flavor : pipe .Mysql .Flavor .YaString (),
99+ MaxReconnectAttempts : 3 ,
99100 }
100101 r .canal , err = canal .NewCanal (cfg )
101102 return
102103}
103104
104105func (r * Input ) runCanal () (err error ) {
106+ defer func () {
107+ if err != nil {
108+ logrus .Errorln ("canal run failed with error, " , err .Error ())
109+ }
110+ }()
105111 record , err := dao_pipe .GetRecord (r .Options .PipeName )
106112 if err != nil {
107113 return
108114 }
109115 if r .pipe .Mysql .Mode == pipeline .MODE_GTID {
110116 var canGTID mysql.GTIDSet
111- if record != nil {
112- if record .Pre != nil {
113- if record .Pre .GTIDSet != "" {
114- canGTID , err = mysql .ParseGTIDSet (r .pipe .Mysql .Flavor .YaString (), record .Pre .GTIDSet )
115- if err != nil {
116- return
117- }
118- }
119- }
120- }
121-
122- if canGTID == nil {
123- canGTID , err = r .canal .GetMasterGTIDSet ()
124- if err != nil {
117+ if record != nil && record .Pre != nil && record .Pre .GTIDSet != "" {
118+ if canGTID , err = mysql .ParseGTIDSet (r .pipe .Mysql .Flavor .YaString (), record .Pre .GTIDSet ); err != nil {
125119 return
126120 }
127- err = dao_pipe .UpdateRecord (& pipeline.RecordPosition {
128- PipelineName : r .Options .PipeName ,
129- Pre : & pipeline.Position {
130- GTIDSet : canGTID .String (),
131- PipelineName : r .Options .PipeName ,
132- },
133- })
134- if err != nil {
121+ } else {
122+ if canGTID , err = r .storeNewestGTID (); err != nil {
135123 return
136124 }
137125 }
@@ -141,41 +129,40 @@ func (r *Input) runCanal() (err error) {
141129 })
142130 //go r.canal.StartFromGTID(canGTID)
143131 go func () {
144- startErr := r .canal .StartFromGTID (canGTID )
145- if startErr != nil {
146- event .Event (event2 .NewErrorPipeline (r .Options .PipeName , "Start mysql replication error: " + startErr .Error ()))
132+ var startErr error
133+ defer func () {
134+ if startErr != nil {
135+ event .Event (event2 .NewErrorPipeline (r .Options .PipeName , "Start mysql replication error: " + startErr .Error ()))
136+ }
137+ }()
138+ if startErr = r .canal .StartFromGTID (canGTID ); startErr != nil {
139+ logrus .WithField ("mode" , "GTID" ).Errorln (startErr .Error ())
140+ errCode := mysql .ErrorCode (startErr .Error ())
141+ if errCode == 1236 && r .pipe .FixPosNewest {
142+ if canGTID , startErr = r .storeNewestGTID (); startErr != nil {
143+ return
144+ } else {
145+ event .Event (event2 .NewErrorPipeline (r .Options .PipeName , "Start mysql replication could not find first log file name in binary log index file, set current pipeline binlog postion to newest" ))
146+ }
147+ if startErr = r .canal .StartFromGTID (canGTID ); startErr != nil {
148+ return
149+ }
150+ }
147151 }
148152 }()
149153 return
150154 }
151155
152156 if r .pipe .Mysql .Mode == pipeline .MODE_POSITION {
153157 logrus .Debugln ("Run pipeline in mode position" , r .Options .PipeName )
154- var canPos * mysql.Position
155- if record != nil {
156- if record .Pre != nil {
157- canPos = & mysql.Position {
158- Name : record .Pre .BinlogFile ,
159- Pos : record .Pre .BinlogPosition ,
160- }
158+ var canPos mysql.Position
159+ if record != nil && record .Pre != nil {
160+ canPos = mysql.Position {
161+ Name : record .Pre .BinlogFile ,
162+ Pos : record .Pre .BinlogPosition ,
161163 }
162- }
163- if canPos == nil {
164- canPos = & mysql.Position {}
165- * canPos , err = r .canal .GetMasterPos ()
166- if err != nil {
167- logrus .Errorln (err )
168- return
169- }
170- err = dao_pipe .UpdateRecord (& pipeline.RecordPosition {
171- PipelineName : r .Options .PipeName ,
172- Pre : & pipeline.Position {
173- BinlogFile : canPos .Name ,
174- BinlogPosition : canPos .Pos ,
175- PipelineName : r .Options .PipeName ,
176- },
177- })
178- if err != nil {
164+ } else {
165+ if canPos , err = r .storeNewestPosition (); err != nil {
179166 return
180167 }
181168 }
@@ -186,10 +173,25 @@ func (r *Input) runCanal() (err error) {
186173 })
187174 //go r.canal.RunFrom(canPos)
188175 go func () {
189- startErr := r .canal .RunFrom (* canPos )
190- if startErr != nil {
191- fmt .Println (startErr )
192- event .Event (event2 .NewErrorPipeline (r .Options .PipeName , "Start mysql replication error: " + startErr .Error ()))
176+ var startErr error
177+ defer func () {
178+ if startErr != nil {
179+ event .Event (event2 .NewErrorPipeline (r .Options .PipeName , "Start mysql replication error: " + startErr .Error ()))
180+ }
181+ }()
182+ if startErr = r .canal .RunFrom (canPos ); startErr != nil {
183+ logrus .WithField ("mode" , "file index" ).Errorln (startErr .Error ())
184+ errCode := mysql .ErrorCode (startErr .Error ())
185+ if errCode == 1236 && r .pipe .FixPosNewest {
186+ if canPos , startErr = r .storeNewestPosition (); startErr != nil {
187+ return
188+ } else {
189+ event .Event (event2 .NewErrorPipeline (r .Options .PipeName , "Start mysql replication could not find first log file name in binary log index file, set current pipeline binlog postion to newest" ))
190+ }
191+ if startErr = r .canal .RunFrom (canPos ); startErr != nil {
192+ return
193+ }
194+ }
193195 }
194196 }()
195197 return
@@ -202,6 +204,37 @@ func (r *Input) runCanal() (err error) {
202204 return
203205}
204206
207+ func (r * Input ) storeNewestGTID () (gtidSet mysql.GTIDSet , err error ) {
208+ if gtidSet , err = r .canal .GetMasterGTIDSet (); err != nil {
209+ return
210+ }
211+ err = dao_pipe .UpdateRecord (& pipeline.RecordPosition {
212+ PipelineName : r .Options .PipeName ,
213+ Pre : & pipeline.Position {
214+ GTIDSet : gtidSet .String (),
215+ PipelineName : r .Options .PipeName ,
216+ },
217+ })
218+ return
219+ }
220+
221+ func (r * Input ) storeNewestPosition () (pos mysql.Position , err error ) {
222+ pos , err = r .canal .GetMasterPos ()
223+ if err != nil {
224+ logrus .Errorln (err )
225+ return
226+ }
227+ err = dao_pipe .UpdateRecord (& pipeline.RecordPosition {
228+ PipelineName : r .Options .PipeName ,
229+ Pre : & pipeline.Position {
230+ BinlogFile : pos .Name ,
231+ BinlogPosition : pos .Pos ,
232+ PipelineName : r .Options .PipeName ,
233+ },
234+ })
235+ return
236+ }
237+
205238// Context returns Input's context
206239func (r * Input ) Context () context.Context {
207240 return r .ctx
0 commit comments