Skip to content

Commit d4a0970

Browse files
committed
optimize gtid
1 parent 34e9440 commit d4a0970

File tree

6 files changed

+20
-22
lines changed

6 files changed

+20
-22
lines changed

dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ public void putGtid(GTIDSet gtidSet, LogEvent gtidEvent) {
326326
if (gtidSet != null) {
327327
gtidMap.put(GTID_SET_STRING, gtidSet.toString());
328328
if (gtidEvent != null && gtidEvent instanceof GtidLogEvent) {
329-
GtidLogEvent event = (GtidLogEvent)gtidEvent;
329+
GtidLogEvent event = (GtidLogEvent) gtidEvent;
330330
gtidMap.put(CURRENT_GTID_STRING, event.getGtidStr());
331331
gtidMap.put(CURRENT_GTID_SN, String.valueOf(event.getSequenceNumber()));
332332
gtidMap.put(CURRENT_GTID_LAST_COMMIT, String.valueOf(event.getLastCommitted()));

driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MariaGTIDSet.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
package com.alibaba.otter.canal.parse.driver.mysql.packets;
22

3-
import org.apache.commons.lang.StringUtils;
4-
53
import java.io.IOException;
64
import java.util.HashMap;
75
import java.util.Map;
86

7+
import org.apache.commons.lang.StringUtils;
8+
99
/**
1010
* 类 MariaGTIDSet.java 的实现
1111
*
1212
* @author winger 2020/9/24 10:31 上午
1313
* @version 1.0.0
1414
*/
1515
public class MariaGTIDSet implements GTIDSet {
16+
1617
//MariaDB 10.0.2+ representation of Gtid
17-
Map<Long, MariaGtid> gtidMap = new HashMap<>();
18+
private Map<Long, MariaGtid> gtidMap = new HashMap<>();
1819

1920
@Override
2021
public byte[] encode() throws IOException {

driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/GtidUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ public class GtidUtil {
1515
public static GTIDSet parseGtidSet(String gtid, boolean isMariaDB) {
1616
if (isMariaDB) {
1717
return MariaGTIDSet.parse(gtid);
18+
} else {
19+
return MysqlGTIDSet.parse(gtid);
1820
}
19-
return MysqlGTIDSet.parse(gtid);
2021
}
2122
}

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package com.alibaba.otter.canal.parse.inbound;
22

3+
import static com.alibaba.otter.canal.parse.driver.mysql.utils.GtidUtil.parseGtidSet;
4+
35
import java.io.IOException;
4-
import java.util.Arrays;
5-
import java.util.HashMap;
6-
import java.util.List;
7-
import java.util.Map;
8-
import java.util.Timer;
9-
import java.util.TimerTask;
6+
import java.util.*;
107
import java.util.concurrent.atomic.AtomicBoolean;
118
import java.util.concurrent.atomic.AtomicLong;
129

13-
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
1410
import org.apache.commons.lang.StringUtils;
1511
import org.apache.commons.lang.exception.ExceptionUtils;
1612
import org.apache.commons.lang.math.RandomUtils;
@@ -25,7 +21,7 @@
2521
import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
2622
import com.alibaba.otter.canal.parse.exception.CanalParseException;
2723
import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
28-
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
24+
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
2925
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor;
3026
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
3127
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
@@ -38,8 +34,7 @@
3834
import com.alibaba.otter.canal.protocol.position.LogPosition;
3935
import com.alibaba.otter.canal.sink.CanalEventSink;
4036
import com.alibaba.otter.canal.sink.exception.CanalSinkException;
41-
42-
import static com.alibaba.otter.canal.parse.driver.mysql.utils.GtidUtil.parseGtidSet;
37+
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
4338

4439
/**
4540
* 抽象的EventParser, 最大化共用mysql/oracle版本的实现
@@ -183,7 +178,7 @@ public void run() {
183178
}
184179

185180
if (erosaConnection instanceof MysqlConnection) {
186-
isMariaDB = ((MysqlConnection)erosaConnection).isMariaDB();
181+
isMariaDB = ((MysqlConnection) erosaConnection).isMariaDB();
187182
}
188183
// 4. 获取最后的位置信息
189184
long start = System.currentTimeMillis();
@@ -249,7 +244,7 @@ public boolean sink(EVENT event) {
249244
multiStageCoprocessor = buildMultiStageCoprocessor();
250245
if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
251246
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
252-
GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(),isMariaDB);
247+
GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(), isMariaDB);
253248
((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
254249
multiStageCoprocessor.start();
255250
erosaConnection.dump(gtidSet, multiStageCoprocessor);

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,9 @@ public void sendSemiAck(String binlogfilename, Long binlogPosition) throws IOExc
350350
private void sendBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
351351
if (isMariaDB()) {
352352
sendMariaBinlogDumpGTID(gtidSet);
353-
return;
353+
} else {
354+
sendMySQLBinlogDumpGTID(gtidSet);
354355
}
355-
sendMySQLBinlogDumpGTID(gtidSet);
356356
}
357357

358358
private void sendMySQLBinlogDumpGTID(GTIDSet gtidSet) throws IOException {

parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ private final long generateUniqueServerId() {
343343

344344
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
345345
if (isGTIDMode()) {
346-
// GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的
346+
// GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instance配置中的
347347
LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
348348
if (logPosition != null) {
349349
// 如果以前是非GTID模式,后来调整为了GTID模式,那么为了保持兼容,需要判断gtid是否为空
@@ -381,7 +381,7 @@ protected EntryPosition findEndPosition(ErosaConnection connection) throws IOExc
381381
protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
382382
MysqlConnection mysqlConnection = (MysqlConnection) connection;
383383
final EntryPosition endPosition = findEndPosition(mysqlConnection);
384-
if (tableMetaTSDB != null) {
384+
if (tableMetaTSDB != null || isGTIDMode()) {
385385
long startTimestamp = System.currentTimeMillis();
386386
return findAsPerTimestampInSpecificLogFile(mysqlConnection,
387387
startTimestamp,
@@ -426,7 +426,8 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
426426
}
427427

428428
if (entryPosition == null) {
429-
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
429+
entryPosition =
430+
findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
430431
}
431432

432433
// 判断一下是否需要按时间订阅

0 commit comments

Comments
 (0)