Skip to content

Commit 9a33e98

Browse files
committed
feat: mysql-connector remove debezium
1 parent dc5a9ee commit 9a33e98

File tree

6 files changed

+259
-82
lines changed

6 files changed

+259
-82
lines changed

connectors-common/mysql-core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@
123123
<artifactId>fastjson</artifactId>
124124
<version>1.2.83</version>
125125
</dependency>
126+
<dependency>
127+
<groupId>io.tapdata</groupId>
128+
<artifactId>tapdata-common</artifactId>
129+
<version>0.2.23-SNAPSHOT</version>
130+
<scope>provided</scope>
131+
</dependency>
126132
<dependency>
127133
<groupId>org.mockito</groupId>
128134
<artifactId>mockito-core</artifactId>
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.tapdata.connector.mysql;
2+
3+
import com.github.shyiko.mysql.binlog.BinaryLogClient;
4+
import com.github.shyiko.mysql.binlog.event.Event;
5+
import io.tapdata.common.concurrent.ConcurrentProcessor;
6+
import io.tapdata.common.concurrent.TapExecutors;
7+
import io.tapdata.connector.mysql.config.MysqlConfig;
8+
import io.tapdata.connector.mysql.entity.MysqlBinlogPosition;
9+
import io.tapdata.entity.event.TapEvent;
10+
import io.tapdata.entity.logger.Log;
11+
import io.tapdata.entity.schema.TapTable;
12+
import io.tapdata.entity.utils.cache.KVReadOnlyMap;
13+
import io.tapdata.kit.EmptyKit;
14+
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.Random;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.Supplier;
22+
23+
import static io.tapdata.base.ConnectorBase.list;
24+
25+
public class MysqlReaderV2 {
26+
27+
private MysqlJdbcContextV2 mysqlJdbcContext;
28+
private MysqlConfig mysqlConfig;
29+
private String connectorId;
30+
private Log tapLogger;
31+
private List<String> tableList;
32+
private KVReadOnlyMap<TapTable> tableMap;
33+
private Object offsetState;
34+
private int recordSize;
35+
private StreamReadConsumer consumer;
36+
37+
public MysqlReaderV2(MysqlJdbcContextV2 mysqlJdbcContext, String connectorId, Log tapLogger) {
38+
this.mysqlJdbcContext = mysqlJdbcContext;
39+
this.connectorId = connectorId;
40+
this.tapLogger = tapLogger;
41+
mysqlConfig = (MysqlConfig) mysqlJdbcContext.getConfig();
42+
}
43+
44+
public void init(List<String> tableList, KVReadOnlyMap<TapTable> tableMap, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable {
45+
this.tableList = tableList;
46+
this.tableMap = tableMap;
47+
this.offsetState = offsetState;
48+
this.recordSize = recordSize;
49+
this.consumer = consumer;
50+
}
51+
52+
public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
53+
BinaryLogClient client = new BinaryLogClient(mysqlConfig.getHost(), mysqlConfig.getPort(), mysqlConfig.getUser(), mysqlConfig.getPassword());
54+
AtomicReference<List<TapEvent>> events = new AtomicReference<>(list());
55+
try (ConcurrentProcessor<Event, OffsetEvent> concurrentProcessor = TapExecutors.createSimple(8, 32, "MysqlReader-Processor")) {
56+
client.setServerId(randomServerId());
57+
client.registerEventListener(event -> concurrentProcessor.runAsync(event, this::emit));
58+
consumer.streamReadStarted();
59+
client.connect();
60+
MysqlBinlogPosition lastOffset;
61+
while (isAlive.get()) {
62+
OffsetEvent offsetEvent = concurrentProcessor.get(2, TimeUnit.SECONDS);
63+
if (EmptyKit.isNotNull(offsetEvent)) {
64+
events.get().add(offsetEvent.getTapEvent());
65+
lastOffset = offsetEvent.getMysqlBinlogPosition();
66+
if (events.get().size() >= recordSize) {
67+
consumer.accept(events.get(), lastOffset);
68+
events.set(new ArrayList<>());
69+
}
70+
} else {
71+
if (!events.get().isEmpty()) {
72+
consumer.accept(events.get(), offsetState);
73+
events.set(list());
74+
}
75+
}
76+
}
77+
} finally {
78+
consumer.streamReadEnded();
79+
client.disconnect();
80+
}
81+
}
82+
83+
private OffsetEvent emit(Event event) {
84+
tapLogger.info("Emitting event {}", event);
85+
return null;
86+
}
87+
88+
public int randomServerId() {
89+
int lowestServerId = 5400;
90+
int highestServerId = Integer.MAX_VALUE;
91+
return lowestServerId + new Random().nextInt(highestServerId - lowestServerId);
92+
}
93+
94+
static class OffsetEvent {
95+
96+
private TapEvent tapEvent;
97+
private MysqlBinlogPosition mysqlBinlogPosition;
98+
99+
public OffsetEvent(TapEvent tapEvent, MysqlBinlogPosition mysqlBinlogPosition) {
100+
this.tapEvent = tapEvent;
101+
this.mysqlBinlogPosition = mysqlBinlogPosition;
102+
}
103+
104+
public TapEvent getTapEvent() {
105+
return tapEvent;
106+
}
107+
108+
public MysqlBinlogPosition getMysqlBinlogPosition() {
109+
return mysqlBinlogPosition;
110+
}
111+
112+
public void setTapEvent(TapEvent tapEvent) {
113+
this.tapEvent = tapEvent;
114+
}
115+
116+
public void setMysqlBinlogPosition(MysqlBinlogPosition mysqlBinlogPosition) {
117+
this.mysqlBinlogPosition = mysqlBinlogPosition;
118+
}
119+
}
120+
}

connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/config/MysqlConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ public void generateSSlFile() throws IOException, InterruptedException {
162162
private ArrayList<LinkedHashMap<String, Object>> availableMasterSlaveAddress;
163163
private LinkedHashMap<String, Object> masterNode;
164164
private Integer maximumQueueSize = 800;
165+
private Boolean highPerformance;
165166

166167
public String getDeploymentMode() {
167168
return deploymentMode;
@@ -203,4 +204,12 @@ public void setMaximumQueueSize(Integer maximumQueueSize) {
203204
this.maximumQueueSize = maximumQueueSize;
204205
}
205206

207+
public Boolean getHighPerformance() {
208+
return highPerformance;
209+
}
210+
211+
public void setHighPerformance(Boolean highPerformance) {
212+
this.highPerformance = highPerformance;
213+
}
214+
206215
}

connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.tapdata.connector.mysql.dml.MysqlWriteRecorder;
1616
import io.tapdata.connector.mysql.dml.sqlmaker.MysqlSqlMaker;
1717
import io.tapdata.connector.mysql.entity.MysqlBinlogPosition;
18+
import io.tapdata.connector.mysql.util.MysqlBinlogPositionUtil;
1819
import io.tapdata.connector.mysql.util.MysqlUtil;
1920
import io.tapdata.connector.mysql.writer.MysqlSqlBatchWriter;
2021
import io.tapdata.connector.mysql.writer.MysqlWriter;
@@ -848,7 +849,13 @@ protected Set<String> dateFields(TapTable tapTable) {
848849

849850
private void streamRead(TapConnectorContext tapConnectorContext, List<String> tables, Object offset, int batchSize, StreamReadConsumer consumer) throws Throwable {
850851
throwNonSupportWhenLightInit();
851-
mysqlReader.readBinlog(tapConnectorContext, tables, offset, batchSize, DDLParserType.MYSQL_CCJ_SQL_PARSER, consumer, contextMapForMasterSlave);
852+
if (mysqlConfig.getHighPerformance()) {
853+
MysqlReaderV2 mysqlReaderV2 = new MysqlReaderV2(mysqlJdbcContext, firstConnectorId, tapLogger);
854+
mysqlReaderV2.init(tables, tapConnectorContext.getTableMap(), offset, batchSize, consumer);
855+
mysqlReaderV2.startMiner(this::isAlive);
856+
} else {
857+
mysqlReader.readBinlog(tapConnectorContext, tables, offset, batchSize, DDLParserType.MYSQL_CCJ_SQL_PARSER, consumer, contextMapForMasterSlave);
858+
}
852859
}
853860

854861

@@ -877,7 +884,21 @@ private Object timestampToStreamOffset(TapConnectorContext tapConnectorContext,
877884
}
878885
return this.mysqlJdbcContext.readBinlogPosition();
879886
}
880-
return startTime;
887+
if (mysqlConfig.getHighPerformance()) {
888+
try (MysqlBinlogPositionUtil ins = new MysqlBinlogPositionUtil(
889+
mysqlConfig.getHost(),
890+
mysqlConfig.getPort(),
891+
mysqlConfig.getUser(),
892+
mysqlConfig.getPassword())) {
893+
MysqlBinlogPosition mysqlBinlogPosition = ins.findByLessTimestamp(startTime, true);
894+
if (null == mysqlBinlogPosition) {
895+
throw new RuntimeException("Not found binlog of sync time: " + startTime);
896+
}
897+
return mysqlBinlogPosition;
898+
}
899+
} else {
900+
return startTime;
901+
}
881902
}
882903

883904

connectors/mysql-connector/src/main/resources/mysql-spec.json

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,24 @@
339339
"node": {
340340
"type": "object",
341341
"properties": {
342+
"highPerformance": {
343+
"type": "boolean",
344+
"title": "${highPerformance}",
345+
"default": false,
346+
"x-index": 4,
347+
"x-decorator": "FormItem",
348+
"x-component": "Switch",
349+
"x-reactions": [
350+
{
351+
"dependencies": ["$inputs"],
352+
"fulfill": {
353+
"state": {
354+
"display": "{{!$deps[0].length ? \"visible\":\"hidden\"}}"
355+
}
356+
}
357+
}
358+
]
359+
},
342360
"createAutoInc": {
343361
"type": "boolean",
344362
"title": "${createAutoInc}",
@@ -528,7 +546,8 @@
528546
"batchReadThreadSize": "Batch read thread size",
529547
"maximumQueueSize": "Maximum queue size",
530548
"maximumQueueSizeTip": "The queue size for reading incremental data in MySQL. If the downstream synchronization is slow or individual records in the table are too large, please lower this setting.",
531-
"lowerCaseTableNames": "Lower Case TableNames"
549+
"lowerCaseTableNames": "Lower Case TableNames",
550+
"highPerformance": "High Performance Mode"
532551
},
533552
"zh_CN": {
534553
"host": "地址",
@@ -565,7 +584,8 @@
565584
"batchReadThreadSize": "批量读取线程数",
566585
"maximumQueueSize": "最大队列大小",
567586
"maximumQueueSizeTip": "MySql读取增量数据队列大小,如果下游同步较慢或表的单条数据过大,请调低此配置。",
568-
"lowerCaseTableNames": "大小写敏感"
587+
"lowerCaseTableNames": "大小写敏感",
588+
"highPerformance": "高性能模式"
569589
},
570590
"zh_TW": {
571591
"host": "地址",
@@ -602,7 +622,8 @@
602622
"batchReadThreadSize": "批量讀取線程數",
603623
"maximumQueueSize": "最大隊列大小",
604624
"maximumQueueSizeTip": "MySql 讀取增量數據隊列大小。如果下游同步較慢或表的單條數據過大,請調低此配置。",
605-
"lowerCaseTableNames": "大小寫敏感"
625+
"lowerCaseTableNames": "大小寫敏感",
626+
"highPerformance": "高性能模式"
606627
}
607628
},
608629
"dataTypes": {

0 commit comments

Comments
 (0)