Skip to content

Commit 9c1971d

Browse files
authored
[ISSUE apache#5040] Support gtid mode for sync data with mysql (apache#5041)
* [ISSUE apache#5040] Support gtid mode for sync data with mysql * fix conflicts with master * fix checkstyle error
1 parent 9d93400 commit 9c1971d

File tree

19 files changed

+511
-170
lines changed

19 files changed

+511
-170
lines changed

eventmesh-admin-server/conf/eventmesh.sql

+3
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
7171
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
7272
`id` int unsigned NOT NULL AUTO_INCREMENT,
7373
`jobID` int unsigned NOT NULL,
74+
`serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
7475
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
7576
`position` bigint DEFAULT NULL,
77+
`gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
78+
`currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
7679
`timestamp` bigint DEFAULT NULL,
7780
`journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
7881
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml

+16-12
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,28 @@
1616
limitations under the License.
1717
-->
1818
<!DOCTYPE mapper
19-
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
20-
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
19+
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
20+
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
2121
<mapper namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper">
2222

2323
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition">
24-
<id property="id" column="id" jdbcType="INTEGER"/>
25-
<result property="jobID" column="jobID" jdbcType="INTEGER"/>
26-
<result property="address" column="address" jdbcType="VARCHAR"/>
27-
<result property="position" column="position" jdbcType="BIGINT"/>
28-
<result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
29-
<result property="journalName" column="journalName" jdbcType="VARCHAR"/>
30-
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
31-
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
24+
<id property="id" column="id" jdbcType="INTEGER"/>
25+
<result property="jobID" column="jobID" jdbcType="INTEGER"/>
26+
<result property="serverUUID" column="serverUUID" jdbcType="VARCHAR"/>
27+
<result property="address" column="address" jdbcType="VARCHAR"/>
28+
<result property="position" column="position" jdbcType="BIGINT"/>
29+
<result property="gtid" column="gtid" jdbcType="VARCHAR"/>
30+
<result property="currentGtid" column="currentGtid" jdbcType="VARCHAR"/>
31+
<result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
32+
<result property="journalName" column="journalName" jdbcType="VARCHAR"/>
33+
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
34+
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
3235
</resultMap>
3336

3437
<sql id="Base_Column_List">
35-
id,jobID,address,
36-
position,timestamp,journalName,
38+
id
39+
,jobID,serverUUID,address,
40+
position,gtid,currentGtid,timestamp,journalName,
3741
createTime,updateTime
3842
</sql>
3943
</mapper>

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java

+6
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,16 @@ public class EventMeshMysqlPosition implements Serializable {
3838

3939
private Integer jobID;
4040

41+
private String serverUUID;
42+
4143
private String address;
4244

4345
private Long position;
4446

47+
private String gtid;
48+
49+
private String currentGtid;
50+
4551
private Long timestamp;
4652

4753
private String journalName;

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,12 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
115115
CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset();
116116
if (offset != null) {
117117
position.setPosition(offset.getOffset());
118+
position.setGtid(offset.getGtid());
119+
position.setCurrentGtid(offset.getCurrentGtid());
118120
}
119121
CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition();
120122
if (partition != null) {
123+
position.setServerUUID(partition.getServerUUID());
121124
position.setTimestamp(partition.getTimeStamp());
122125
position.setJournalName(partition.getJournalName());
123126
}
@@ -148,13 +151,16 @@ public List<RecordPosition> handler(FetchPositionRequest request, Metadata metad
148151
request.getJobID()));
149152
List<RecordPosition> recordPositionList = new ArrayList<>();
150153
for (EventMeshMysqlPosition position : positionList) {
151-
RecordPosition recordPosition = new RecordPosition();
152154
CanalRecordPartition partition = new CanalRecordPartition();
153155
partition.setTimeStamp(position.getTimestamp());
154156
partition.setJournalName(position.getJournalName());
157+
partition.setServerUUID(position.getServerUUID());
158+
RecordPosition recordPosition = new RecordPosition();
155159
recordPosition.setRecordPartition(partition);
156160
CanalRecordOffset offset = new CanalRecordOffset();
157161
offset.setOffset(position.getPosition());
162+
offset.setGtid(position.getGtid());
163+
offset.setCurrentGtid(position.getCurrentGtid());
158164
recordPosition.setRecordOffset(offset);
159165
recordPositionList.add(recordPosition);
160166
}

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class CanalSinkConfig extends SinkConfig {
3939
// sync mode: field/row
4040
private SyncMode syncMode;
4141

42+
private boolean isGTIDMode = true;
43+
4244
// skip sink process exception
4345
private Boolean skipException = false;
4446

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public class CanalSourceConfig extends SourceConfig {
4545

4646
private Short clientId;
4747

48+
private String serverUUID;
49+
50+
private boolean isGTIDMode = true;
51+
4852
private Integer batchSize = 10000;
4953

5054
private Long batchTimeout = -1L;

eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordOffset.java

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ public class CanalRecordOffset extends RecordOffset {
3030

3131
private Long offset;
3232

33+
// mysql instance gtid range
34+
private String gtid;
35+
36+
private String currentGtid;
37+
3338
@Override
3439
public Class<? extends RecordOffset> getRecordOffsetClass() {
3540
return CanalRecordOffset.class;

eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalRecordPartition.java

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
@ToString
3030
public class CanalRecordPartition extends RecordPartition {
3131

32+
private String serverUUID;
33+
3234
private String journalName;
3335

3436
private Long timeStamp;

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java

+6
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@
3131
public class CanalConnectRecord {
3232

3333
private String schemaName;
34+
3435
private String tableName;
3536

37+
// mysql instance gtid range
38+
private String gtid;
39+
40+
private String currentGtid;
41+
3642
/**
3743
* The business type of the changed data (I/U/D/C/A/E), consistent with the EventType defined in EntryProtocol in canal.
3844
*/

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/AbstractDbDialect.java

-4
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ public SqlTemplate getSqlTemplate() {
9797
return sqlTemplate;
9898
}
9999

100-
public boolean isDRDS() {
101-
return false;
102-
}
103-
104100
public String getShardColumns(String schema, String table) {
105101
return null;
106102
}

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/DbDialect.java

-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public interface DbDialect {
4848

4949
public boolean isSupportMergeSql();
5050

51-
public boolean isDRDS();
52-
5351
public LobHandler getLobHandler();
5452

5553
public JdbcTemplate getJdbcTemplate();

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/dialect/MysqlDialect.java

-4
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ public String getDefaultSchema() {
5050
return null;
5151
}
5252

53-
public boolean isDRDS() {
54-
return false;
55-
}
56-
5753
public String getDefaultCatalog() {
5854
return jdbcTemplate.queryForObject("select database()", String.class);
5955
}

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java

+5-19
Original file line numberDiff line numberDiff line change
@@ -51,35 +51,21 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
5151
String shardColumns = null;
5252

5353
if (type.isInsert()) {
54-
if (CollectionUtils.isEmpty(record.getColumns())
55-
&& (dbDialect.isDRDS())) {
56-
// sql
57-
sql = sqlTemplate.getInsertSql(schemaName,
58-
record.getTableName(),
59-
buildColumnNames(record.getKeys()),
60-
buildColumnNames(record.getColumns()));
61-
} else {
62-
sql = sqlTemplate.getMergeSql(schemaName,
54+
sql = sqlTemplate.getMergeSql(schemaName,
6355
record.getTableName(),
6456
buildColumnNames(record.getKeys()),
6557
buildColumnNames(record.getColumns()),
6658
new String[] {},
67-
!dbDialect.isDRDS(),
59+
true,
6860
shardColumns);
69-
}
7061
} else if (type.isUpdate()) {
71-
7262
boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
7363
boolean rowMode = sinkConfig.getSyncMode().isRow();
7464
String[] keyColumns = null;
7565
String[] otherColumns = null;
7666
if (existOldKeys) {
7767
keyColumns = buildColumnNames(record.getOldKeys());
78-
if (dbDialect.isDRDS()) {
79-
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getUpdatedKeys());
80-
} else {
81-
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
82-
}
68+
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
8369
} else {
8470
keyColumns = buildColumnNames(record.getKeys());
8571
otherColumns = buildColumnNames(record.getUpdatedColumns());
@@ -91,10 +77,10 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
9177
keyColumns,
9278
otherColumns,
9379
new String[] {},
94-
!dbDialect.isDRDS(),
80+
true,
9581
shardColumns);
9682
} else {
97-
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns);
83+
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns);
9884
}
9985
} else if (type.isDelete()) {
10086
sql = sqlTemplate.getDeleteSql(schemaName,

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/DbLoadContext.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
@Data
2929
public class DbLoadContext {
3030

31+
private String gtid;
32+
3133
private List<CanalConnectRecord> lastProcessedRecords;
3234

3335
private List<CanalConnectRecord> prepareRecords;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.canal.sink;
19+
20+
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
21+
22+
import java.util.List;
23+
import java.util.concurrent.CopyOnWriteArrayList;
24+
25+
public class GtidBatch {
26+
private int totalBatches;
27+
private List<List<CanalConnectRecord>> batches;
28+
private int receivedBatchCount;
29+
30+
public GtidBatch(int totalBatches) {
31+
this.totalBatches = totalBatches;
32+
this.batches = new CopyOnWriteArrayList<>(new List[totalBatches]);
33+
this.receivedBatchCount = 0;
34+
}
35+
36+
public void addBatch(int batchIndex, List<CanalConnectRecord> batchRecords) {
37+
batches.set(batchIndex, batchRecords);
38+
receivedBatchCount++;
39+
}
40+
41+
public List<List<CanalConnectRecord>> getBatches() {
42+
return batches;
43+
}
44+
45+
public boolean isComplete() {
46+
return receivedBatchCount == totalBatches;
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.canal.sink;
19+
20+
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
21+
22+
import java.util.List;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
25+
public class GtidBatchManager {
26+
27+
private static ConcurrentHashMap<String, GtidBatch> gtidBatchMap = new ConcurrentHashMap<>();
28+
29+
public static void addBatch(String gtid, int batchIndex, int totalBatches, List<CanalConnectRecord> batchRecords) {
30+
gtidBatchMap.computeIfAbsent(gtid, k -> new GtidBatch(totalBatches)).addBatch(batchIndex, batchRecords);
31+
}
32+
33+
public static GtidBatch getGtidBatch(String gtid) {
34+
return gtidBatchMap.get(gtid);
35+
}
36+
37+
public static boolean isComplete(String gtid) {
38+
GtidBatch batch = gtidBatchMap.get(gtid);
39+
return batch != null && batch.isComplete();
40+
}
41+
42+
public static void removeGtidBatch(String gtid) {
43+
gtidBatchMap.remove(gtid);
44+
}
45+
}

0 commit comments

Comments
 (0)