Skip to content

Commit e92bfe8

Browse files
committed
feat: mysql-oceanbase performance
2 parents 4392549 + 2965d48 commit e92bfe8

File tree

11 files changed

+115
-7
lines changed

11 files changed

+115
-7
lines changed

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/PostgresSqlMaker.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ protected void buildNullDefinition(StringBuilder builder, TapField tapField) {
3838
}
3939

4040
protected void buildDefaultDefinition(StringBuilder builder, TapField tapField) {
41+
if (EmptyKit.isNotNull(dbVersion) && Integer.parseInt(dbVersion) < 100000 && EmptyKit.isNotBlank(tapField.getSequenceName())) {
42+
return;
43+
}
4144
if (EmptyKit.isNotNull(tapField.getDefaultValue())) {
4245
builder.append("DEFAULT").append(' ');
4346
if (EmptyKit.isNotNull(tapField.getDefaultFunction())) {
@@ -62,9 +65,17 @@ protected void buildAutoIncDefinition(StringBuilder builder, TapField tapField)
6265
} else {
6366
startValue = 1;
6467
}
68+
long incrementValue;
69+
if (EmptyKit.isNotNull(tapField.getAutoIncrementValue())) {
70+
incrementValue = tapField.getAutoIncrementValue();
71+
} else {
72+
incrementValue = 1;
73+
}
6574
builder.append("GENERATED BY DEFAULT AS IDENTITY (START WITH ")
6675
.append(startValue)
67-
.append(" INCREMENT BY 1 CACHE ")
76+
.append(" INCREMENT BY ")
77+
.append(incrementValue)
78+
.append(" CACHE ")
6879
.append(autoIncCacheValue)
6980
.append(") ");
7081
}

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class CommonDbConfig implements Serializable {
4747
private long autoIncJumpValue = 1000000L;
4848
private long autoIncCacheValue = 100;
4949
private Boolean applyDefault = false;
50+
private Boolean applyForeignKey = false;
5051
private Integer writeThreadSize = 15;
5152
protected String timezone = "+00:00";
5253
protected ZoneId zoneId;
@@ -322,6 +323,14 @@ public void setApplyDefault(Boolean applyDefault) {
322323
this.applyDefault = applyDefault;
323324
}
324325

326+
public Boolean getApplyForeignKey() {
327+
return applyForeignKey;
328+
}
329+
330+
public void setApplyForeignKey(Boolean applyForeignKey) {
331+
this.applyForeignKey = applyForeignKey;
332+
}
333+
325334
public Integer getWriteThreadSize() {
326335
return writeThreadSize;
327336
}

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ protected void createIndex(TapConnectorContext connectorContext, TapTable tapTab
450450
}
451451

452452
protected void createConstraint(TapConnectorContext connectorContext, TapTable tapTable, TapCreateConstraintEvent createConstraintEvent, boolean create) {
453+
if (!Boolean.TRUE.equals(commonDbConfig.getApplyForeignKey())) {
454+
return;
455+
}
453456
List<TapConstraint> constraintList = createConstraintEvent.getConstraintList();
454457
if (EmptyKit.isNotEmpty(constraintList)) {
455458
List<String> constraintSqlList = new ArrayList<>();

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonSqlMaker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class CommonSqlMaker {
3737
protected long autoIncCacheValue = 1;
3838
protected Boolean applyDefault = false;
3939
protected String schema;
40+
protected String dbVersion;
4041

4142
public CommonSqlMaker() {
4243

@@ -71,6 +72,11 @@ public <T extends CommonSqlMaker> T schema(String schema) {
7172
return (T) this;
7273
}
7374

75+
public <T extends CommonSqlMaker> T dbVersion(String dbVersion) {
76+
this.dbVersion = dbVersion;
77+
return (T) this;
78+
}
79+
7480
public char getEscapeChar() {
7581
return escapeChar;
7682
}

connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalRecordWriter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class NormalRecordWriter {
4242
protected Map<String, Object> autoIncMap = new HashMap<>();
4343
protected boolean largeSql = false;
4444
protected CommonDbConfig commonDbConfig;
45+
protected boolean needCloseIdentity = false;
4546

4647
public NormalRecordWriter(JdbcContext jdbcContext, TapTable tapTable) throws SQLException {
4748
this.commonDbConfig = jdbcContext.getConfig();
@@ -147,6 +148,9 @@ public void write(List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult
147148
updateRecorder.releaseResource();
148149
deleteRecorder.releaseResource();
149150
if (!isTransaction) {
151+
if (needCloseIdentity) {
152+
openIdentity();
153+
}
150154
connection.close();
151155
}
152156
writeListResultConsumer.accept(listResult
@@ -217,9 +221,23 @@ public void closeIdentity() throws SQLException {
217221
statement.execute(sql);
218222
}
219223
}
224+
needCloseIdentity = true;
220225
}
221226

222227
protected String getIdentitySql() {
223228
return null;
224229
}
230+
231+
public void openIdentity() throws SQLException {
232+
String sql = getOpenIdentitySql();
233+
if (EmptyKit.isNotBlank(sql)) {
234+
try (Statement statement = connection.createStatement()) {
235+
statement.execute(sql);
236+
}
237+
}
238+
}
239+
240+
protected String getOpenIdentitySql() {
241+
return null;
242+
}
225243
}

connectors/greenplum-connector/src/main/java/io/tapdata/connector/greenplum/GreenplumWriteRecorder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected void upsert(Map<String, Object> after, WriteListResult<TapRecordEvent>
3737
}
3838
try (ResultSet resultSet = queryStatement.executeQuery()) {
3939
if (resultSet.next()) {
40-
justUpdate(after, DbKit.getBeforeForUpdate(after, new HashMap<>(), allColumn, uniqueCondition), listResult);
40+
justUpdate(DbKit.getAfterForUpdate(after, new HashMap<>(), allColumn, uniqueCondition), DbKit.getBeforeForUpdate(after, new HashMap<>(), allColumn, uniqueCondition), listResult);
4141
} else {
4242
justInsert(after);
4343
}

connectors/mysql-connector/src/main/java/io/tapdata/connector/mysql/MysqlConnector.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,15 @@ protected void createConstraint(TapConnectorContext connectorContext, TapTable t
971971
} catch (Exception e1) {
972972
exception.addException(c, sql, e1);
973973
}
974+
} else if (e instanceof SQLException && (((SQLException) e).getErrorCode() == 1826 || ((SQLException) e).getErrorCode() == 1823)) {
975+
String rename = c.getName() + "_" + UUID.randomUUID().toString().replaceAll("-", "").substring(28);
976+
c.setName(rename);
977+
sql = getCreateConstraintSql(tapTable, c);
978+
try {
979+
jdbcContext.execute(sql);
980+
} catch (Exception e1) {
981+
exception.addException(c, sql, e1);
982+
}
974983
} else {
975984
exception.addException(c, sql, e);
976985
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.tapdata.oceanbase;
2+
3+
import com.oceanbase.jdbc.OceanBaseStatement;
4+
import com.zaxxer.hikari.pool.HikariProxyStatement;
5+
import io.tapdata.common.CommonDbConfig;
6+
import io.tapdata.common.ResultSetConsumer;
7+
import io.tapdata.connector.mysql.MysqlJdbcContextV2;
8+
9+
import java.sql.Connection;
10+
import java.sql.ResultSet;
11+
import java.sql.SQLException;
12+
import java.sql.Statement;
13+
14+
public class OceanbaseJdbcContext extends MysqlJdbcContextV2 {
15+
16+
public OceanbaseJdbcContext(CommonDbConfig config) {
17+
super(config);
18+
}
19+
20+
public void queryWithStream(String sql, ResultSetConsumer resultSetConsumer) throws Throwable {
21+
try (
22+
Connection connection = getConnection();
23+
Statement statement = connection.createStatement()
24+
) {
25+
if (statement instanceof HikariProxyStatement) {
26+
OceanBaseStatement statementImpl = statement.unwrap(OceanBaseStatement.class);
27+
if (null != statementImpl) {
28+
statementImpl.setFetchSize(Integer.MIN_VALUE);
29+
}
30+
}
31+
try (
32+
ResultSet resultSet = statement.executeQuery(sql)
33+
) {
34+
if (null != resultSet) {
35+
resultSetConsumer.accept(resultSet);
36+
}
37+
}
38+
} catch (SQLException e) {
39+
throw new Exception("Execute steaming query failed, sql: " + sql + ", code: " + e.getSQLState() + "(" + e.getErrorCode() + "), error: " + e.getMessage(), e);
40+
}
41+
}
42+
}

connectors/oceanbase-mysql-connector/src/main/java/io/tapdata/oceanbase/cdc/OceanbaseReader.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,19 @@ public void start(BooleanSupplier isAlive) throws Throwable {
7777
@Override
7878
public void notify(LogMessage message) {
7979
try {
80+
String op = message.getOpt().name();
8081
if (!tableList.contains(message.getTableName())) {
81-
return;
82+
switch (op) {
83+
case "INSERT":
84+
case "UPDATE":
85+
case "DELETE":
86+
return;
87+
}
8288
}
8389
Map<String, Object> after = DataMap.create();
8490
Map<String, Object> before = DataMap.create();
8591
analyzeMessage(message, after, before);
86-
switch (message.getOpt().name()) {
92+
switch (op) {
8793
case "INSERT":
8894
eventList.get().add(new TapInsertRecordEvent().init().table(message.getTableName()).after(after).referenceTime(Long.parseLong(message.getTimestamp()) * 1000));
8995
break;

connectors/oceanbase-mysql-connector/src/main/java/io/tapdata/oceanbase/connector/OceanbaseConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.tapdata.common.SqlExecuteCommandFunction;
55
import io.tapdata.connector.mysql.MysqlConnector;
66
import io.tapdata.connector.mysql.MysqlExceptionCollector;
7-
import io.tapdata.connector.mysql.MysqlJdbcContextV2;
87
import io.tapdata.connector.mysql.ddl.sqlmaker.MysqlDDLSqlGenerator;
98
import io.tapdata.connector.mysql.dml.MysqlRecordWriter;
109
import io.tapdata.entity.codec.TapCodecsRegistry;
@@ -21,6 +20,7 @@
2120
import io.tapdata.kit.DbKit;
2221
import io.tapdata.kit.EmptyKit;
2322
import io.tapdata.kit.StringKit;
23+
import io.tapdata.oceanbase.OceanbaseJdbcContext;
2424
import io.tapdata.oceanbase.OceanbaseTest;
2525
import io.tapdata.oceanbase.bean.OceanbaseConfig;
2626
import io.tapdata.oceanbase.cdc.OceanbaseReader;
@@ -195,7 +195,7 @@ public void onStart(TapConnectionContext tapConnectionContext) throws Throwable
195195

196196
mysqlConfig = new OceanbaseConfig().load(tapConnectionContext.getConnectionConfig());
197197
mysqlConfig.load(tapConnectionContext.getNodeConfig());
198-
mysqlJdbcContext = new MysqlJdbcContextV2(mysqlConfig);
198+
mysqlJdbcContext = new OceanbaseJdbcContext(mysqlConfig);
199199
commonDbConfig = mysqlConfig;
200200

201201
jdbcContext = mysqlJdbcContext;

0 commit comments

Comments
 (0)