diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java index 80aec7bfe9..026f33f4fc 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java @@ -41,6 +41,8 @@ public class CanalSinkConfig extends SinkConfig { private boolean isGTIDMode = true; + private boolean isMariaDB = true; + // skip sink process exception private Boolean skipException = false; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java index 707f102901..8331d32cb7 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java @@ -47,6 +47,8 @@ public class CanalSourceConfig extends SourceConfig { private String serverUUID; + private boolean isMariaDB = true; + private boolean isGTIDMode = true; private Integer batchSize = 10000; diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index 5f3c0a2bca..8ecda8e125 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -476,9 +476,11 @@ private Exception doCall() { } JdbcTemplate template = dbDialect.getJdbcTemplate(); String sourceGtid = context.getGtid(); - if (StringUtils.isNotEmpty(sourceGtid)) { - String setGtid = "SET @@session.gtid_next = '" + sourceGtid + "';"; - template.execute(setGtid); + if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) { + String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';"; + template.execute(setMySQLGtid); + } else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) { + throw new RuntimeException("unsupport gtid mode for mariaDB"); } else { log.error("gtid is empty in gtid mode"); throw new RuntimeException("gtid is empty in gtid mode"); @@ -510,8 +512,13 @@ public void setValues(PreparedStatement ps) throws SQLException { }); // reset gtid - String resetGtid = "SET @@session.gtid_next = AUTOMATIC;"; - dbDialect.getJdbcTemplate().execute(resetGtid); + if (sinkConfig.isMariaDB()) { + throw new RuntimeException("unsupport gtid mode for mariaDB"); + } else { + String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';"; + dbDialect.getJdbcTemplate().execute(resetMySQLGtid); + } + error = null; exeResult = ExecuteResult.SUCCESS; } catch (DeadlockLoserDataAccessException ex) { diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java index 708d5d120c..5c4303588d 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java @@ -49,7 +49,7 @@ public class EntryParser { public static Map> parse(CanalSourceConfig sourceConfig, List datas, - RdbTableMgr tables) { + RdbTableMgr tables) { List recordList = new ArrayList<>(); List transactionDataBuffer = new ArrayList<>(); // need check weather the entry is loopback @@ -60,9 +60,9 @@ public static Map> parse(CanalSourceConfig source switch (entry.getEntryType()) { case ROWDATA: RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); - if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode()) { - String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); - if (currentGtid.contains(sourceConfig.getServerUUID())) { + // don't support gtid for mariadb + if (sourceConfig.getServerUUID() != null && sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) { + if (checkGtidForEntry(entry, sourceConfig)) { transactionDataBuffer.add(entry); } } else { @@ -90,9 +90,14 @@ public static Map> parse(CanalSourceConfig source return recordMap; } + private static boolean checkGtidForEntry(Entry entry, CanalSourceConfig sourceConfig) { + String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); + return currentGtid.contains(sourceConfig.getServerUUID()); + } + private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfig, - List recordList, - List transactionDataBuffer, RdbTableMgr tables) { + List recordList, + List transactionDataBuffer, RdbTableMgr tables) { for (Entry bufferEntry : transactionDataBuffer) { List recordParsedList = internParse(sourceConfig, bufferEntry, tables); if (CollectionUtils.isEmpty(recordParsedList)) { @@ -130,7 +135,7 @@ private static Column getColumnIgnoreCase(List columns, String columName } private static List internParse(CanalSourceConfig sourceConfig, Entry entry, - RdbTableMgr tableMgr) { + RdbTableMgr tableMgr) { String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName(); if (tableMgr.getTable(schemaName, tableName) == null) { @@ -169,7 +174,7 @@ private static List internParse(CanalSourceConfig sourceConf } private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, - RowChange rowChange, RowData rowData) { + RowChange rowChange, RowData rowData) { CanalConnectRecord canalConnectRecord = new CanalConnectRecord(); canalConnectRecord.setTableName(entry.getHeader().getTableName()); canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName()); @@ -179,10 +184,16 @@ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfi canalConnectRecord.setBinLogOffset(entry.getHeader().getLogfileOffset()); // if enabled gtid mode, gtid not null if (canalSourceConfig.isGTIDMode()) { - String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); - String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID()); - canalConnectRecord.setGtid(gtidRange); - canalConnectRecord.setCurrentGtid(currentGtid); + if (canalSourceConfig.isMariaDB()) { + String currentGtid = entry.getHeader().getGtid(); + canalConnectRecord.setGtid(currentGtid); + canalConnectRecord.setCurrentGtid(currentGtid); + } else { + String currentGtid = entry.getHeader().getPropsList().get(0).getValue(); + String gtidRange = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID()); + canalConnectRecord.setGtid(gtidRange); + canalConnectRecord.setCurrentGtid(currentGtid); + } } EventType eventType = canalConnectRecord.getEventType(); @@ -276,7 +287,7 @@ public static String replaceGtidRange(String gtid, String currentGtid, String se } private static void checkUpdateKeyColumns(Map oldKeyColumns, - Map keyColumns) { + Map keyColumns) { if (oldKeyColumns.isEmpty()) { return; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index 6cd575cb77..f3f8b2e160 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -198,13 +198,16 @@ private Canal buildCanal(CanalSourceConfig sourceConfig) { recordPositionMap.put("journalName", canalRecordPartition.getJournalName()); recordPositionMap.put("timestamp", canalRecordPartition.getTimeStamp()); recordPositionMap.put("position", canalRecordOffset.getOffset()); - String gtidRange = canalRecordOffset.getGtid(); - if (gtidRange != null) { - if (canalRecordOffset.getCurrentGtid() != null) { - gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(), - sourceConfig.getServerUUID()); + // for mariaDB not support gtid mode + if (sourceConfig.isGTIDMode() && !sourceConfig.isMariaDB()) { + String gtidRange = canalRecordOffset.getGtid(); + if (gtidRange != null) { + if (canalRecordOffset.getCurrentGtid() != null) { + gtidRange = EntryParser.replaceGtidRange(canalRecordOffset.getGtid(), canalRecordOffset.getCurrentGtid(), + sourceConfig.getServerUUID()); + } + recordPositionMap.put("gtid", gtidRange); } - recordPositionMap.put("gtid", gtidRange); } positions.add(JsonUtils.toJSONString(recordPositionMap)); });