Skip to content

Commit f09faa0

Browse files
authored
[ISSUE #5139] update canal connector module (#5140)
* [ISSUE #5137] update connector runtime v2 module * fix checkStyle error * [ISSUE #5139] update canal connector module
1 parent 293a61e commit f09faa0

24 files changed

+1673
-786
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@
2828
public class CanalSinkFullConfig extends SinkConfig {
2929
private SinkConnectorConfig sinkConnectorConfig;
3030
private String zeroDate;
31+
private int parallel = 2;
3132
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public class CanalSinkIncrementConfig extends CanalSinkConfig {
3636
private Integer poolSize = 5;
3737

3838
// sync mode: field/row
39-
private SyncMode syncMode;
39+
private SyncMode syncMode = SyncMode.ROW;
4040

41-
private boolean isGTIDMode = true;
41+
private boolean isGTIDMode = false;
4242

4343
private boolean isMariaDB = true;
4444

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.rdb.canal;
19+
20+
import org.apache.eventmesh.common.config.connector.SourceConfig;
21+
import org.apache.eventmesh.common.remote.offset.RecordPosition;
22+
23+
import java.util.List;
24+
25+
import lombok.Data;
26+
import lombok.EqualsAndHashCode;
27+
28+
@Data
29+
@EqualsAndHashCode(callSuper = true)
30+
public class CanalSourceCheckConfig extends SourceConfig {
31+
private SourceConnectorConfig sourceConnectorConfig;
32+
private List<RecordPosition> startPosition;
33+
private int parallel;
34+
private int flushSize;
35+
private int executePeriod = 3600;
36+
private Integer pagePerSecond = 1;
37+
private Integer recordPerSecond = 100;
38+
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
public class CanalSourceFullConfig extends SourceConfig {
3131
private SourceConnectorConfig sourceConnectorConfig;
3232
private List<RecordPosition> startPosition;
33-
private int parallel;
34-
private int flushSize;
33+
private int parallel = 2;
34+
private int flushSize = 20;
35+
private Integer pagePerSecond = 1;
36+
private Integer recordPerSecond = 100;
3537
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,17 @@ public class CanalSourceIncrementConfig extends CanalSourceConfig {
3232

3333
private String destination;
3434

35-
private Long canalInstanceId;
35+
private Long canalInstanceId = 1L;
3636

37-
private String desc;
37+
private String desc = "canalSourceInstance";
3838

39-
private boolean ddlSync = true;
39+
private boolean ddlSync = false;
4040

4141
private boolean filterTableError = false;
4242

4343
private Long slaveId;
4444

45-
private Short clientId;
45+
private Short clientId = 1;
4646

4747
private String serverUUID;
4848

@@ -67,19 +67,19 @@ public class CanalSourceIncrementConfig extends CanalSourceConfig {
6767
private Boolean enableRemedy = false;
6868

6969
// sync mode: field/row
70-
private SyncMode syncMode;
70+
private SyncMode syncMode = SyncMode.ROW;
7171

7272
// sync consistency
73-
private SyncConsistency syncConsistency;
73+
private SyncConsistency syncConsistency = SyncConsistency.BASE;
7474

7575
// ================================= system parameter
7676
// ================================
7777

7878
// Column name of the bidirectional synchronization mark
79-
private String needSyncMarkTableColumnName = "needSync";
79+
private String needSyncMarkTableColumnName;
8080

8181
// Column value of the bidirectional synchronization mark
82-
private String needSyncMarkTableColumnValue = "needSync";
82+
private String needSyncMarkTableColumnValue;
8383

8484
private SourceConnectorConfig sourceConnectorConfig;
8585

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class JobRdbFullPosition {
3030
private String tableName;
3131
private String primaryKeyRecords;
3232
private long maxCount;
33+
private long handledRecordCount = 0;
3334
private boolean finished;
3435
private BigDecimal percent;
3536
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
2121

22+
import java.util.List;
2223
import java.util.Map;
23-
import java.util.Set;
2424

2525
import lombok.Data;
2626
import lombok.EqualsAndHashCode;
@@ -31,6 +31,6 @@
3131
@Data
3232
@EqualsAndHashCode(callSuper = true)
3333
public class MySQLTableDef extends RdbTableDefinition {
34-
private Set<String> primaryKeys;
34+
private List<String> primaryKeys;
3535
private Map<String, MySQLColumnDef> columnDefinitions;
3636
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
public enum DataSourceDriverType {
2121
MYSQL,
22+
MariaDB,
2223
REDIS,
2324
ROCKETMQ,
2425
HTTP;

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
@ToString
2828
public enum DataSourceType {
2929
MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB),
30+
MariaDB("MariaDB", DataSourceDriverType.MariaDB, DataSourceClassify.RDB),
3031
REDIS("Redis", DataSourceDriverType.REDIS, DataSourceClassify.CACHE),
3132
ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ),
3233
HTTP("HTTP", DataSourceDriverType.HTTP, DataSourceClassify.TUNNEL);

Diff for: eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -780,8 +780,8 @@ public static LocalDateTime toLocalDateTime(Object value) {
780780
long nanos = ((Timestamp) value).getNanos();
781781
return Instant.ofEpochMilli(((Timestamp) value).getTime() - (nanos / 1000000)).plusNanos(nanos).atZone(ZoneId.systemDefault())
782782
.toLocalDateTime();
783-
} else if (value instanceof java.sql.Date) {
784-
return ((java.sql.Date) value).toLocalDate().atTime(0, 0);
783+
} else if (value instanceof Date) {
784+
return ((Date) value).toLocalDate().atTime(0, 0);
785785
} else {
786786
if (!(value instanceof Time)) {
787787
return ((java.util.Date) value).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();

Diff for: eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -64,24 +64,27 @@ public boolean before(CanalSinkIncrementConfig sinkConfig, CanalConnectRecord re
6464
String[] keyColumns = null;
6565
String[] otherColumns = null;
6666
if (existOldKeys) {
67+
// update table xxx set pk = newPK where pk = oldPk
6768
keyColumns = buildColumnNames(record.getOldKeys());
6869
otherColumns = buildColumnNames(record.getUpdatedColumns(), record.getKeys());
6970
} else {
7071
keyColumns = buildColumnNames(record.getKeys());
7172
otherColumns = buildColumnNames(record.getUpdatedColumns());
7273
}
7374

74-
if (rowMode && !existOldKeys) {
75-
sql = sqlTemplate.getMergeSql(schemaName,
76-
record.getTableName(),
77-
keyColumns,
78-
otherColumns,
79-
new String[] {},
80-
true,
81-
shardColumns);
82-
} else {
83-
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns);
84-
}
75+
// not support the column default not null for merge sql
76+
// if (rowMode && !existOldKeys) {
77+
// sql = sqlTemplate.getMergeSql(schemaName,
78+
// record.getTableName(),
79+
// keyColumns,
80+
// otherColumns,
81+
// new String[] {},
82+
// true,
83+
// shardColumns);
84+
// } else {
85+
// sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns);
86+
// }
87+
sql = sqlTemplate.getUpdateSql(schemaName, record.getTableName(), keyColumns, otherColumns, true, shardColumns);
8588
} else if (type.isDelete()) {
8689
sql = sqlTemplate.getDeleteSql(schemaName,
8790
record.getTableName(),

0 commit comments

Comments
 (0)