columnMap = columns.stream().collect(Collectors.toMap(Column::getName, item -> item));
+ assembler.appendSqlSliceLists(", ", columnNames, (columnName) -> {
+ StringBuilder builder = new StringBuilder();
+ // assemble column name
+ builder.append(((AbstractGeneralDatabaseDialect, ?>) databaseDialect).getQualifiedText(columnName));
+ // assemble column type
+ Column column = columnMap.get(columnName);
+ String typeName = this.databaseDialect.getTypeName(hibernateDialect, column);
+ builder.append(" ").append(typeName);
+
+ builder.append(" ").append(this.databaseDialect.getCharsetOrCollateFormatted(column));
+ if (Optional.ofNullable(table.getPrimaryKey().getColumnNames()).orElse(new ArrayList<>(0)).contains(columnName)) {
+ builder.append(" NOT NULL ");
+ if (column.isAutoIncremented()) {
+ builder.append(this.databaseDialect.getAutoIncrementFormatted(column));
+ }
+ } else {
+ if (column.isNotNull()) {
+ builder.append(" NOT NULL ");
+ }
+ }
+ addColumnDefaultValue(column, builder);
+ builder.append(" ").append(this.databaseDialect.getCommentFormatted(column));
+ // assemble column default value
+ return builder.toString();
+ });
+ // assemble primary key and others key
+ assembler.appendSqlSlice(", PRIMARY KEY(");
+ assembler.appendSqlSliceLists(",", catalogChanges.getTable().getPrimaryKey().getColumnNames(),
+ (columnName) -> ((AbstractGeneralDatabaseDialect, ?>) databaseDialect).getQualifiedText(columnName));
+ assembler.appendSqlSlice(")");
+ assembler.appendSqlSlice(")");
+ assembler.appendSqlSlice(this.databaseDialect.getTableOptionsFormatted(catalogChanges.getTable()));
+ return assembler.build();
+ }
+
+ private void addColumnDefaultValue(Column> column, StringBuilder builder) {
+ if (column.isNotNull() && column.getDefaultValue() == null) {
+ return;
+ }
+ final String defaultValueFormatted = this.databaseDialect.getDefaultValueFormatted(column);
+ if (StringUtils.isNotEmpty(defaultValueFormatted)) {
+ builder.append(" DEFAULT ").append(defaultValueFormatted);
+ }
+ }
+
+ private String assembleDropTableSql(CatalogChanges catalogChanges) {
+ SqlStatementAssembler assembler = new SqlStatementAssembler();
+ assembler.appendSqlSlice("DROP TABLE IF EXISTS ");
+ assembler.appendSqlSlice(
+ ((AbstractGeneralDatabaseDialect, ?>) databaseDialect).getQualifiedTableName(catalogChanges.getTable().getTableId()));
+ return assembler.build();
+ }
+
+ private String assembleAlertTableSql(CatalogChanges catalogChanges) {
+ SqlStatementAssembler assembler = new SqlStatementAssembler();
+ return assembler.build();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/SinkRecordHandler.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/SinkRecordHandler.java
new file mode 100644
index 0000000000..fa6a5b841c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/SinkRecordHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.sink.handle;
+
+import org.apache.eventmesh.connector.jdbc.JdbcConnectData;
+
+/**
+ * This interface represents a schema change handler.
+ */
+public interface SinkRecordHandler {
+
+ /**
+ * Handles a schema change using the specified JDBC connection data.
+ *
+ * @param connectData the JDBC connection data
+ */
+ void handle(JdbcConnectData connectData) throws Exception;
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/hibernate/DruidConnectionProvider.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/hibernate/DruidConnectionProvider.java
new file mode 100644
index 0000000000..e261978b7a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/hibernate/DruidConnectionProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.sink.hibernate;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.hibernate.HibernateException;
+import org.hibernate.cfg.Environment;
+import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider;
+import org.hibernate.service.spi.Configurable;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+
+public class DruidConnectionProvider implements ConnectionProvider, Configurable {
+
+ private DruidDataSource dataSource = new DruidDataSource();
+
+ /**
+ * Obtains a connection for Hibernate use according to the underlying strategy of this provider.
+ *
+ * @return The obtained JDBC connection
+ * @throws SQLException Indicates a problem opening a connection
+ * @throws HibernateException Indicates a problem otherwise obtaining a connection.
+ */
+ @Override
+ public Connection getConnection() throws SQLException {
+ return dataSource.getConnection();
+ }
+
+ /**
+ * Release a connection from Hibernate use.
+ *
+ * @param conn The JDBC connection to release
+ * @throws SQLException Indicates a problem closing the connection
+ * @throws HibernateException Indicates a problem otherwise releasing a connection.
+ */
+ @Override
+ public void closeConnection(Connection conn) throws SQLException {
+ conn.close();
+ }
+
+ /**
+ *
+ * Does this connection provider support aggressive release of JDBC connections and re-acquisition of those connections (if need be) later?
+ *
+ *
+ * This is used in conjunction with {@link Environment#RELEASE_CONNECTIONS} to aggressively release JDBC connections. However, the configured
+ * ConnectionProvider must support re-acquisition of the same underlying connection for that semantic to work.
+ *
+ * Typically, this is only true in managed environments where a container tracks connections by transaction or thread.
+ *
+ * Note that JTA semantic depends on the fact that the underlying connection provider does support aggressive release.
+ *
+ * @return {@code true} if aggressive releasing is supported; {@code false} otherwise.
+ */
+ @Override
+ public boolean supportsAggressiveRelease() {
+ return false;
+ }
+
+ /**
+ * Configure the service.
+ *
+ * @param configurationValues The configuration properties.
+ */
+ @Override
+ public void configure(Map configurationValues) {
+ try {
+ DruidDataSourceFactory.config(dataSource, configurationValues);
+ } catch (SQLException e) {
+ throw new IllegalArgumentException("Config druid error", e);
+ }
+ }
+
+ /**
+ * Can this wrapped service be unwrapped as the indicated type?
+ *
+ * @param unwrapType The type to check.
+ * @return True/false.
+ */
+ @Override
+ public boolean isUnwrappableAs(Class unwrapType) {
+ return dataSource.isWrapperFor(unwrapType);
+ }
+
+ /**
+ * Unproxy the service proxy
+ *
+ * @param unwrapType The java type as which to unwrap this instance.
+ * @return The unwrapped reference
+ */
+ @Override
+ public T unwrap(Class unwrapType) {
+ return dataSource.unwrap(unwrapType);
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/hibernate/HibernateConfiguration.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/hibernate/HibernateConfiguration.java
new file mode 100644
index 0000000000..fd5d49de74
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/hibernate/HibernateConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.sink.hibernate;
+
+import org.hibernate.SessionFactory;
+import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.service.ServiceRegistry;
+
+public final class HibernateConfiguration {
+
+ public static HibernateConfigurationBuilder newBuilder() {
+ return new HibernateConfigurationBuilder();
+ }
+
+ public static class HibernateConfigurationBuilder {
+
+ private Configuration configuration;
+
+ public HibernateConfigurationBuilder() {
+ this.configuration = new Configuration();
+ this.configuration.setProperty("hibernate.connection.provider_class", DruidConnectionProvider.class.getName());
+ }
+
+ public HibernateConfigurationBuilder withUser(String username) {
+ configuration.setProperty("username", username);
+ return this;
+ }
+
+ public HibernateConfigurationBuilder withPassword(String password) {
+ configuration.setProperty("password", password);
+ return this;
+ }
+
+ public HibernateConfigurationBuilder withUrl(String url) {
+ configuration.setProperty("url", url);
+ return this;
+ }
+
+ public HibernateConfigurationBuilder withDruidMaxActive(String maxActive) {
+ configuration.setProperty("maxActive", maxActive);
+ return this;
+ }
+
+ public HibernateConfigurationBuilder withShowSql(boolean showSql) {
+ configuration.setProperty("hibernate.show_sql", Boolean.toString(showSql));
+ return this;
+ }
+
+ public SessionFactory build() {
+ ServiceRegistry serviceRegistry = new StandardServiceRegistryBuilder()
+ .applySettings(configuration.getProperties())
+ .build();
+ return configuration.buildSessionFactory(serviceRegistry);
+ }
+
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/mysql/MysqlDialectAssemblyLine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/mysql/MysqlDialectAssemblyLine.java
new file mode 100644
index 0000000000..670a2d0258
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/mysql/MysqlDialectAssemblyLine.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.sink.mysql;
+
+import org.apache.eventmesh.connector.jdbc.Field;
+import org.apache.eventmesh.connector.jdbc.Schema;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.SqlStatementAssembler;
+import org.apache.eventmesh.connector.jdbc.sink.handle.GeneralDialectAssemblyLine;
+import org.apache.eventmesh.connector.jdbc.source.SourceMateData;
+import org.apache.eventmesh.connector.jdbc.table.catalog.Column;
+import org.apache.eventmesh.connector.jdbc.utils.JdbcStringUtils;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.hibernate.dialect.Dialect;
+
+public class MysqlDialectAssemblyLine extends GeneralDialectAssemblyLine {
+
+ public MysqlDialectAssemblyLine(DatabaseDialect> databaseDialect, Dialect hibernateDialect) {
+ super(databaseDialect, hibernateDialect);
+ }
+
+ /**
+ * Generates an upsert statement using the given sourceMateData, schema, and originStatement.
+ *
+ * @param sourceMateData The metadata of the data source.
+ * @param schema The schema to upsert into.
+ * @param originStatement The original upsert statement.
+ * @return The upsert statement as a string.
+ */
+ @Override
+ public String getUpsertStatement(SourceMateData sourceMateData, Schema schema, String originStatement) {
+ final SqlStatementAssembler sqlStatementAssembler = new SqlStatementAssembler();
+ sqlStatementAssembler.appendSqlSlice(getInsertStatement(sourceMateData, schema, originStatement));
+ Field afterField = schema.getFields().get(0);
+ List> columns = afterField.getFields().stream().map(item -> item.getColumn()).sorted(Comparator.comparingInt(Column::getOrder))
+ .collect(Collectors.toList());
+ if (JdbcStringUtils.compareVersion(getDatabaseDialect().getJdbcDriverMetaData().getDatabaseProductVersion(), "8.0.20") >= 0) {
+ // mysql doc:https://dev.mysql.com/doc/refman/8.0/en/insert-on-duplicate.html
+ // Beginning with MySQL 8.0.20, an INSERT ... SELECT ... ON DUPLICATE KEY UPDATE statement that uses VALUES() in the UPDATE clause
+ sqlStatementAssembler.appendSqlSlice("AS new ON DUPLICATE KEY UPDATE ");
+ sqlStatementAssembler.appendSqlSliceOfColumns(",", columns, column -> {
+ final String columnName = column.getName();
+ return columnName + "=new." + columnName;
+ });
+
+ } else {
+ sqlStatementAssembler.appendSqlSlice(" ON DUPLICATE KEY UPDATE ");
+ sqlStatementAssembler.appendSqlSliceOfColumns(",", columns, column -> {
+ final String columnName = column.getName();
+ return columnName + "=VALUES(" + columnName + ")";
+ });
+
+ }
+ return sqlStatementAssembler.build();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/AbstractEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/AbstractEngine.java
index 2d1080e05b..08c3823725 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/AbstractEngine.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/AbstractEngine.java
@@ -18,7 +18,7 @@
package org.apache.eventmesh.connector.jdbc.source;
import org.apache.eventmesh.common.ThreadWrapper;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.connector.jdbc.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcAllFactoryLoader.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcAllFactoryLoader.java
index f3c09adb53..2529d9a5a8 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcAllFactoryLoader.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcAllFactoryLoader.java
@@ -19,7 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.eventmesh.connector.jdbc.source.dialect.DatabaseDialectFactory;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialectFactory;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.CdcEngineFactory;
import org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.SnapshotEngineFactory;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
index 202d7b9ca4..8d7d9cb66d 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
@@ -17,10 +17,10 @@
package org.apache.eventmesh.connector.jdbc.source;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialectFactory;
import org.apache.eventmesh.connector.jdbc.event.Event;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
-import org.apache.eventmesh.connector.jdbc.source.dialect.DatabaseDialectFactory;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.CdcEngine;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.CdcEngineFactory;
import org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.SnapshotEngine;
@@ -111,7 +111,7 @@ private void doInit() {
// Get the database dialect factory and create the database dialect.
final DatabaseDialectFactory databaseDialectFactory = JdbcAllFactoryLoader.getDatabaseDialectFactory(databaseType);
- this.databaseDialect = databaseDialectFactory.createDatabaseDialect(sourceConfig);
+ this.databaseDialect = databaseDialectFactory.createDatabaseDialect(this.sourceConfig.getSourceConnectorConfig().getJdbcConfig());
this.databaseDialect.init();
// Get the snapshot engine factory and create the snapshot engine
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/SourceMateData.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/SourceMateData.java
index e1f6f28fd7..bf12a36d97 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/SourceMateData.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/SourceMateData.java
@@ -17,12 +17,20 @@
package org.apache.eventmesh.connector.jdbc.source;
+import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlSourceMateData;
+import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
import lombok.Data;
/**
* Represents metadata related to a data source.
*/
@Data
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "connector")
+@JsonSubTypes({@JsonSubTypes.Type(value = MysqlSourceMateData.class, name = "mysql")})
public class SourceMateData {
/**
@@ -60,4 +68,13 @@ public class SourceMateData {
*/
private String tableName;
+ /**
+ * This method returns the TableId object with the specified catalog name, schema name, and table name.
+ *
+ * @return the TableId object
+ */
+ public TableId ofTableId() {
+ return new TableId(catalogName, schemaName, tableName);
+ }
+
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
index a1572787d7..c299fbc531 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.connector.jdbc.source;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.commons.collections4.CollectionUtils;
@@ -102,6 +103,9 @@ public List poll() {
if (Objects.isNull(record)) {
break;
}
+ if (log.isDebugEnabled()) {
+ log.debug("record:{}", JsonUtils.toJSONString(record));
+ }
records.add(record);
} catch (InterruptedException e) {
break;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/AbstractGeneralDatabaseDialect.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/AbstractGeneralDatabaseDialect.java
deleted file mode 100644
index ae497fce5c..0000000000
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/AbstractGeneralDatabaseDialect.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.jdbc.source.dialect;
-
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
-import org.apache.eventmesh.connector.jdbc.connection.JdbcConnection;
-import org.apache.eventmesh.connector.jdbc.exception.JdbcConnectionException;
-import org.apache.eventmesh.connector.jdbc.source.config.SourceConnectorConfig;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-public abstract class AbstractGeneralDatabaseDialect implements DatabaseDialect {
-
- private static final int DEFAULT_BATCH_MAX_ROWS = 20;
-
- private SourceConnectorConfig config;
-
- private int batchMaxRows = DEFAULT_BATCH_MAX_ROWS;
-
- public AbstractGeneralDatabaseDialect(SourceConnectorConfig config) {
- this.config = config;
- }
-
- @Override
- public boolean isValid(Connection connection, int timeout) throws JdbcConnectionException, SQLException {
- return connection == null ? false : connection.isValid(timeout);
- }
-
- @Override
- public PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException {
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- if (batchMaxRows > 0) {
- preparedStatement.setFetchSize(batchMaxRows);
- }
- return preparedStatement;
- }
-
-}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/MysqlAntlr4DdlParser.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/MysqlAntlr4DdlParser.java
index 1cf69bde2f..9bb110815f 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/MysqlAntlr4DdlParser.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/MysqlAntlr4DdlParser.java
@@ -20,12 +20,15 @@
import org.apache.eventmesh.connector.jdbc.antlr4.Antlr4DdlParser;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlLexer;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.CharsetNameContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.CollationNameContext;
import org.apache.eventmesh.connector.jdbc.antlr4.listener.Antlr4DdlParserListener;
import org.apache.eventmesh.connector.jdbc.ddl.DdlParserCallback;
import org.apache.eventmesh.connector.jdbc.event.Event;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.connector.jdbc.source.dialect.antlr4.mysql.listener.MySqlAntlr4DdlParserListener;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
+import org.apache.eventmesh.connector.jdbc.utils.JdbcStringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -55,6 +58,10 @@ public MysqlAntlr4DdlParser(boolean skipViews, boolean skipComments, JdbcSourceC
this(skipViews, skipComments, new HashSet<>(), sourceConfig);
}
+ public MysqlAntlr4DdlParser(boolean skipViews, boolean skipComments) {
+ this(skipViews, skipComments, new HashSet<>(), null);
+ }
+
@Override
protected MySqlLexer buildLexerInstance(CharStream charStreams) {
return new MySqlLexer(charStreams);
@@ -149,4 +156,25 @@ public void addTableIdSet(Set tableIdSet) {
public JdbcSourceConfig getSourceConfig() {
return sourceConfig;
}
+
+ public String parseCharset(CharsetNameContext charsetNameContext) {
+ String charsetName = null;
+ if (charsetNameContext != null && charsetNameContext.getText() != null) {
+ charsetName = JdbcStringUtils.withoutWrapper(charsetNameContext.getText());
+ }
+ return charsetName;
+ }
+
+ public String parseCollation(CollationNameContext collationNameContext) {
+ String collationName = null;
+ if (collationNameContext != null && collationNameContext.getText() != null) {
+ collationName = JdbcStringUtils.withoutWrapper(collationNameContext.getText()).toLowerCase();
+ /*
+ * for (int index = 0; index < CharsetMapping.MAP_SIZE; index++) { if
+ * (collationName.equals(CharsetMapping.getStaticCollationNameForCollationIndex(index))) { collationName =
+ * CharsetMapping.getStaticMysqlCharsetNameForCollationIndex(index); break; } }
+ */
+ }
+ return collationName;
+ }
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/ColumnDefinitionParserListener.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/ColumnDefinitionParserListener.java
index 9be9b42ddd..23e3728d47 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/ColumnDefinitionParserListener.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/ColumnDefinitionParserListener.java
@@ -19,29 +19,46 @@
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.AutoIncrementColumnConstraintContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.CollateColumnConstraintContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.CollectionDataTypeContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.ColumnDefinitionContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.CommentColumnConstraintContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.DataTypeContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.DecimalLiteralContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.DimensionDataTypeContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.LongVarcharDataTypeContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.NationalStringDataTypeContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.NationalVaryingStringDataTypeContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.NullNotnullContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.PrimaryKeyColumnConstraintContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.SimpleDataTypeContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.SpatialDataTypeContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.StringDataTypeContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.UniqueKeyColumnConstraintContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParserBaseListener;
import org.apache.eventmesh.connector.jdbc.source.dialect.antlr4.mysql.MysqlAntlr4DdlParser;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDataTypeConvertor;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableEditor;
import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlColumnEditor;
+import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlOptions.MysqlColumnOptions;
import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
import org.apache.eventmesh.connector.jdbc.utils.JdbcStringUtils;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
+@Slf4j
public class ColumnDefinitionParserListener extends MySqlParserBaseListener {
private DefaultValueParserListener defaultValueParserListener;
@@ -72,11 +89,108 @@ public ColumnDefinitionParserListener(List listeners, TableEd
public void enterColumnDefinition(ColumnDefinitionContext ctx) {
// parse Column data type
this.parser.runIfAllNotNull(() -> {
- String dataTypeString = ctx.dataType().getText();
- EventMeshDataType> eventMeshType = this.dataTypeConvertor.toEventMeshType(dataTypeString);
- this.columnEditor.withEventMeshType(eventMeshType);
- this.columnEditor.withJdbcType(this.dataTypeConvertor.toJDBCType(dataTypeString));
- this.columnEditor.withType(dataTypeString);
+ DataTypeContext dataTypeContext = ctx.dataType();
+ String dataTypeString = null;
+ if (dataTypeContext instanceof StringDataTypeContext) {
+ StringDataTypeContext stringDataTypeCtx = (StringDataTypeContext) dataTypeContext;
+ dataTypeString = stringDataTypeCtx.typeName.getText();
+ // parse data type length
+ if (stringDataTypeCtx.lengthOneDimension() != null) {
+ this.columnEditor.length(Integer.parseInt(stringDataTypeCtx.lengthOneDimension().decimalLiteral().getText()));
+ }
+ // parse data type charset and collation
+ String charsetName = parser.parseCharset(stringDataTypeCtx.charsetName());
+ String collationName = parser.parseCollation(stringDataTypeCtx.collationName());
+ columnEditor.charsetName(charsetName);
+ columnEditor.collation(collationName);
+ } else if (dataTypeContext instanceof NationalStringDataTypeContext) {
+ NationalStringDataTypeContext nationalStringDataTypeCtx = (NationalStringDataTypeContext) dataTypeContext;
+ dataTypeString = nationalStringDataTypeCtx.typeName.getText();
+ if (nationalStringDataTypeCtx.lengthOneDimension() != null) {
+ this.columnEditor.length(Integer.parseInt(nationalStringDataTypeCtx.lengthOneDimension().decimalLiteral().getText()));
+ }
+ } else if (dataTypeContext instanceof NationalVaryingStringDataTypeContext) {
+ NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeCtx = (NationalVaryingStringDataTypeContext) dataTypeContext;
+ dataTypeString = nationalVaryingStringDataTypeCtx.typeName.getText();
+ if (nationalVaryingStringDataTypeCtx.lengthOneDimension() != null) {
+ this.columnEditor.length(Integer.parseInt(nationalVaryingStringDataTypeCtx.lengthOneDimension().decimalLiteral().getText()));
+ }
+ } else if (dataTypeContext instanceof DimensionDataTypeContext) {
+ DimensionDataTypeContext dimensionDataTypeCtx = (DimensionDataTypeContext) dataTypeContext;
+ dataTypeString = dimensionDataTypeCtx.typeName.getText();
+ // parse column length
+ if (dimensionDataTypeCtx.lengthOneDimension() != null) {
+ this.columnEditor.length(Integer.parseInt(dimensionDataTypeCtx.lengthOneDimension().decimalLiteral().getText()));
+ }
+ // parse column scale if has scale
+ if (dimensionDataTypeCtx.lengthTwoDimension() != null) {
+ List decimalLiteralContexts = dimensionDataTypeCtx.lengthTwoDimension().decimalLiteral();
+ this.columnEditor.length(Integer.parseInt(decimalLiteralContexts.get(0).getText()));
+ this.columnEditor.scale(Integer.parseInt(decimalLiteralContexts.get(1).getText()));
+ }
+
+ if (dimensionDataTypeCtx.lengthTwoOptionalDimension() != null) {
+ List decimalLiteralContexts = dimensionDataTypeCtx.lengthTwoOptionalDimension().decimalLiteral();
+ if (decimalLiteralContexts.get(0).REAL_LITERAL() != null) {
+ String[] digits = decimalLiteralContexts.get(0).getText().split(".");
+ if (StringUtils.isBlank(digits[0]) || Integer.valueOf(digits[0]) == 0) {
+ this.columnEditor.length(10);
+ } else {
+ this.columnEditor.length(Integer.valueOf(digits[0]));
+ }
+ } else {
+ this.columnEditor.length(Integer.parseInt(decimalLiteralContexts.get(0).getText()));
+ }
+ if (decimalLiteralContexts.size() > 1) {
+ this.columnEditor.scale(Integer.parseInt(decimalLiteralContexts.get(1).getText()));
+ }
+ }
+ if (CollectionUtils.isNotEmpty(dimensionDataTypeCtx.SIGNED())) {
+ this.columnEditor.withOption(MysqlColumnOptions.SIGNED, dimensionDataTypeCtx.SIGNED().get(0).getText());
+ }
+ if (CollectionUtils.isNotEmpty(dimensionDataTypeCtx.UNSIGNED())) {
+ this.columnEditor.withOption(MysqlColumnOptions.UNSIGNED, dimensionDataTypeCtx.UNSIGNED().get(0).getText());
+ }
+ if (CollectionUtils.isNotEmpty(dimensionDataTypeCtx.ZEROFILL())) {
+ this.columnEditor.withOption(MysqlColumnOptions.ZEROFILL, dimensionDataTypeCtx.ZEROFILL().get(0).getText());
+ }
+ } else if (dataTypeContext instanceof SimpleDataTypeContext) {
+ // Do nothing for example: DATE, TINYBLOB, etc.
+ SimpleDataTypeContext simpleDataTypeCtx = (SimpleDataTypeContext) dataTypeContext;
+ dataTypeString = simpleDataTypeCtx.typeName.getText();
+ } else if (dataTypeContext instanceof CollectionDataTypeContext) {
+ CollectionDataTypeContext collectionDataTypeContext = (CollectionDataTypeContext) dataTypeContext;
+ dataTypeString = collectionDataTypeContext.typeName.getText();
+ if (collectionDataTypeContext.charsetName() != null) {
+ String charsetName = collectionDataTypeContext.charsetName().getText();
+ columnEditor.charsetName(charsetName);
+ }
+ } else if (dataTypeContext instanceof SpatialDataTypeContext) {
+ // do nothing
+ SpatialDataTypeContext spatialDataTypeCtx = (SpatialDataTypeContext) dataTypeContext;
+ dataTypeString = spatialDataTypeCtx.typeName.getText();
+ } else if (dataTypeContext instanceof LongVarcharDataTypeContext) {
+ LongVarcharDataTypeContext longVarcharDataTypeCtx = (LongVarcharDataTypeContext) dataTypeContext;
+ dataTypeString = longVarcharDataTypeCtx.typeName.getText();
+ String charsetName = parser.parseCharset(longVarcharDataTypeCtx.charsetName());
+ String collationName = parser.parseCollation(longVarcharDataTypeCtx.collationName());
+ columnEditor.charsetName(charsetName);
+ columnEditor.collation(collationName);
+ }
+ // handle enum and set type values
+ if (StringUtils.equalsAnyIgnoreCase(dataTypeString, "ENUM", "SET")) {
+ CollectionDataTypeContext collectionDataTypeContext = (CollectionDataTypeContext) dataTypeContext;
+ List values = collectionDataTypeContext.collectionOptions().STRING_LITERAL().stream()
+ .map(node -> JdbcStringUtils.withoutWrapper(node.getText())).collect(Collectors.toList());
+ columnEditor.enumValues(values);
+ }
+
+ if (StringUtils.isNotBlank(dataTypeString)) {
+ EventMeshDataType eventMeshType = this.dataTypeConvertor.toEventMeshType(dataTypeString);
+ this.columnEditor.withEventMeshType(eventMeshType);
+ this.columnEditor.withJdbcType(this.dataTypeConvertor.toJDBCType(dataTypeString));
+ this.columnEditor.withType(dataTypeString);
+ }
}, columnEditor);
this.parser.runIfAllNotNull(() -> {
@@ -140,7 +254,7 @@ public void exitColumnDefinition(ColumnDefinitionContext ctx) {
@Override
public void enterCollateColumnConstraint(CollateColumnConstraintContext ctx) {
if (ctx.COLLATE() != null) {
- columnEditor.collate(ctx.collationName().getText());
+ columnEditor.collation(ctx.collationName().getText());
}
super.enterCollateColumnConstraint(ctx);
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java
index 5a72c4449b..c24f885d75 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/CreateTableParserListener.java
@@ -21,7 +21,9 @@
import org.apache.eventmesh.connector.jdbc.Payload;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.ColumnCreateTableContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.CopyCreateTableContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.DecimalLiteralContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.QueryCreateTableContext;
+import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.TableOptionAutoIncrementContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.TableOptionCharsetContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.TableOptionCollateContext;
import org.apache.eventmesh.connector.jdbc.antlr4.autogeneration.MySqlParser.TableOptionEngineContext;
@@ -32,9 +34,10 @@
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlSourceMateData;
import org.apache.eventmesh.connector.jdbc.table.catalog.Table;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
+import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlOptions.MysqlTableOptions;
import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlTableEditor;
-import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlTableOptions;
import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlTableSchema;
+import org.apache.eventmesh.connector.jdbc.utils.Antlr4Utils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -98,7 +101,7 @@ public void enterColumnCreateTable(ColumnCreateTableContext ctx) {
@Override
public void exitColumnCreateTable(ColumnCreateTableContext ctx) {
- String ddl = ctx.getText();
+ String ddl = Antlr4Utils.getText(ctx);
parser.runIfAllNotNull(() -> {
listeners.remove(columnDefinitionListener);
// help JVM GC
@@ -114,7 +117,12 @@ public void exitColumnCreateTable(ColumnCreateTableContext ctx) {
.catalogName(currentDatabase)
.serverId(sourceConnectorConfig.getMysqlConfig().getServerId())
.build();
- Table table = new Table(tableSchema.getSimpleName(), tableSchema.getPrimaryKey(), tableSchema.getUniqueKeys(), tableSchema.getComment());
+ Table table = Table.newBuilder().withTableId(tableSchema.getTableId())
+ .withPrimaryKey(tableSchema.getPrimaryKey())
+ .withUniqueKeys(tableSchema.getUniqueKeys())
+ .withComment(tableSchema.getComment())
+ .withOptions(tableSchema.getTableOptions())
+ .build();
CatalogChanges changes = CatalogChanges.newBuilder().operationType(SchemaChangeEventType.TABLE_CREATE).table(table)
.columns(tableSchema.getColumns()).build();
payload.withSource(sourceMateData).withDdl(ddl).withCatalogChanges(changes);
@@ -136,7 +144,7 @@ private MysqlTableEditor createTableEditor(String tableName) {
@Override
public void enterTableOptionEngine(TableOptionEngineContext ctx) {
if (ctx.ENGINE() != null) {
- this.tableEditor.withOption(MysqlTableOptions.ENGINE, ctx.ENGINE().getText());
+ this.tableEditor.withOption(MysqlTableOptions.ENGINE, ctx.engineName().getText());
}
super.enterTableOptionEngine(ctx);
}
@@ -155,6 +163,16 @@ public void enterTableOptionCharset(TableOptionCharsetContext ctx) {
super.enterTableOptionCharset(ctx);
}
+ @Override
+ public void enterTableOptionAutoIncrement(TableOptionAutoIncrementContext ctx) {
+ DecimalLiteralContext decimalLiteralContext = ctx.decimalLiteral();
+ if (null != decimalLiteralContext) {
+ String autoIncrementNumber = Antlr4Utils.getText(decimalLiteralContext);
+ this.tableEditor.withOption(MysqlTableOptions.AUTO_INCREMENT, autoIncrementNumber);
+ }
+ super.enterTableOptionAutoIncrement(ctx);
+ }
+
@Override
public void enterTableOptionCollate(TableOptionCollateContext ctx) {
if (ctx.COLLATE() != null) {
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/DefaultValueParserListener.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/DefaultValueParserListener.java
index aad1dc65ad..51bb5fe579 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/DefaultValueParserListener.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/antlr4/mysql/listener/DefaultValueParserListener.java
@@ -83,7 +83,7 @@ public void enterDefaultValue(DefaultValueContext ctx) {
if (stringLiteralContext.COLLATE() == null) {
columnEditor.defaultValueExpression(sign + unquote(stringLiteralContext.getText()));
} else {
- columnEditor.collate(sign + unquote(stringLiteralContext.STRING_LITERAL(0).getText()));
+ columnEditor.collation(sign + unquote(stringLiteralContext.STRING_LITERAL(0).getText()));
}
} else if (ctx.constant().decimalLiteral() != null) {
columnEditor.defaultValueExpression(sign + ctx.constant().decimalLiteral().getText());
@@ -117,14 +117,14 @@ public void enterDefaultValue(DefaultValueContext ctx) {
* ;
*/
List currentTimestampContexts = ctx.currentTimestamp();
- if (currentTimestampContexts.size() > 1 || (ctx.ON() == null && ctx.UPDATE() == null)) {
+ if (currentTimestampContexts.size() > 1 && (ctx.ON() != null && ctx.UPDATE() != null)) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(currentTimestampContexts.get(0).getText()).append(" ").append(ctx.ON().getText()).append(" ").append(ctx.UPDATE())
+ .append(" ").append(currentTimestampContexts.get(1).getText());
+ columnEditor.defaultValueExpression(builder.toString());
+ } else if (currentTimestampContexts.size() == 1) {
CurrentTimestampContext currentTimestampContext = currentTimestampContexts.get(0);
- //
- if (currentTimestampContext.CURRENT_TIMESTAMP() != null || currentTimestampContext.NOW() != null) {
- columnEditor.defaultValueExpression("1970-01-01 00:00:00");
- } else {
- columnEditor.defaultValueExpression(currentTimestampContext.getText());
- }
+ columnEditor.defaultValueExpression(currentTimestampContext.getText());
}
} else if (ctx.expression() != null) {
// e.g. CREATE TABLE t2 (b BLOB DEFAULT ('abc'));
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/AbstractCdcEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/AbstractCdcEngine.java
index fc9d2f3724..1fb1a95579 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/AbstractCdcEngine.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/AbstractCdcEngine.java
@@ -18,9 +18,9 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.cdc;
import org.apache.eventmesh.common.ThreadWrapper;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
import org.apache.eventmesh.connector.jdbc.JdbcContext;
import org.apache.eventmesh.connector.jdbc.ddl.DdlParser;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.connector.jdbc.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/CdcEngineFactory.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/CdcEngineFactory.java
index 33e5f555b6..e08e1c8216 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/CdcEngineFactory.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/CdcEngineFactory.java
@@ -17,7 +17,7 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.cdc;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java
index 67c837caf7..147509e12d 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java
@@ -18,6 +18,7 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.cdc.mysql;
import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.connector.jdbc.CatalogChanges;
import org.apache.eventmesh.connector.jdbc.DataChanges;
import org.apache.eventmesh.connector.jdbc.DataChanges.Builder;
import org.apache.eventmesh.connector.jdbc.Field;
@@ -25,10 +26,12 @@
import org.apache.eventmesh.connector.jdbc.Schema;
import org.apache.eventmesh.connector.jdbc.config.JdbcConfig;
import org.apache.eventmesh.connector.jdbc.connection.mysql.MysqlJdbcConnection;
+import org.apache.eventmesh.connector.jdbc.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.event.DeleteDataEvent;
import org.apache.eventmesh.connector.jdbc.event.EventConsumer;
import org.apache.eventmesh.connector.jdbc.event.GeneralDataChangeEvent;
import org.apache.eventmesh.connector.jdbc.event.InsertDataEvent;
+import org.apache.eventmesh.connector.jdbc.event.SchemaChangeEventType;
import org.apache.eventmesh.connector.jdbc.event.UpdateDataEvent;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.connector.jdbc.source.config.MysqlConfig;
@@ -40,12 +43,13 @@
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.EventDataDeserializationExceptionData;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.EventMeshGtidSet;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlConstants;
-import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlJdbcContext;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlSourceMateData;
import org.apache.eventmesh.connector.jdbc.table.catalog.Column;
+import org.apache.eventmesh.connector.jdbc.table.catalog.DefaultValueConvertor;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableSchema;
+import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlDefaultValueConvertorImpl;
import org.apache.eventmesh.connector.jdbc.table.type.Pair;
import org.apache.eventmesh.openconnect.api.config.Config;
@@ -113,6 +117,8 @@ public class MysqlCdcEngine extends AbstractCdcEngine {
+ column.setDefaultValue(defaultValueConvertor.parseDefaultValue(column, column.getDefaultValueExpression()));
+ });
+ }
+ }
event.getJdbcConnectData().getPayload().ofSourceMateData().setSnapshot(false);
consumers.stream().forEach(consumer -> consumer.accept(event));
}
@@ -583,7 +601,7 @@ private MysqlSourceMateData buildMysqlSourceMateData(MysqlJdbcContext context, E
return sourceMateData;
}
- private enum CdcDmlType {
+ public enum CdcDmlType {
INSERT,
UPDATE,
DELETE
@@ -606,20 +624,25 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour
List, Pair>> rows, CdcDmlType type) {
TableSchema tableSchema = context.getCatalogTableSet().getTableSchema(tableId);
- Map orderColumnMap = tableSchema.getOrderColumnMap();
- List extends Column> columns = tableSchema.getColumns();
+ Map> orderColumnMap = tableSchema.getOrderColumnMap();
+ List extends Column>> columns = tableSchema.getColumns();
List fields = null;
Builder builder = DataChanges.newBuilder();
if (CollectionUtils.isNotEmpty(columns)) {
fields = columns.stream()
- .map(col -> new Field(col.getDataType().getName(), col.isNotNull(), col.getName(), tableId.toString()))
- .collect(Collectors.toList());
+ .map(col -> {
+ Column> rebuild = Column.newBuilder().withName(col.getName()).withDataType(col.getDataType()).withJdbcType(col.getJdbcType())
+ .withNativeType(col.getNativeType()).withOrder(col.getOrder()).build();
+ return new Field(rebuild, col.isNotNull(), col.getName(), tableId.toString());
+ }).collect(Collectors.toList());
}
int columnsSize = orderColumnMap.size();
for (Pair, Pair> pair : rows) {
GeneralDataChangeEvent dataEvent = buildEvent(type, tableId);
- Payload payload = dataEvent.getJdbcConnectData().getPayload();
+ builder.withType(dataEvent.getDataChangeEventType().ofCode());
Schema schema = new Schema();
+ // set primary key
+ schema.addKeys(tableSchema.getPrimaryKey().getColumnNames());
Pair beforePair = Optional.ofNullable(pair.getLeft()).orElse(new Pair<>());
Serializable[] beforeRows = beforePair.getLeft();
if (null != beforeRows && beforeRows.length != 0) {
@@ -633,7 +656,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour
beforeValues.put(orderColumnMap.get(index + 1).getName(), beforeRows[index]);
}
builder.withBefore(beforeValues);
- Field beforeField = new Field().withField(Payload.BEFORE_FIELD).withType("field").withName("payload.before").withRequired(false);
+ Field beforeField = new Field().withField(Payload.BEFORE_FIELD).withName(Payload.PAYLOAD_BEFORE).withRequired(false);
beforeField.withRequired(true).withFields(fields);
schema.add(beforeField);
}
@@ -650,15 +673,15 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour
}
afterValues.put(orderColumnMap.get(index + 1).getName(), afterRows[index]);
}
- builder.withBefore(afterValues);
- Field afterField = new Field().withField(Payload.AFTER_FIELD).withType("field").withName("payload.after").withRequired(false);
+ builder.withAfter(afterValues);
+ Field afterField = new Field().withField(Payload.AFTER_FIELD).withName(Payload.PAYLOAD_AFTER).withRequired(false);
afterField.withRequired(true).withFields(fields);
schema.add(afterField);
}
+ Payload payload = dataEvent.getJdbcConnectData().getPayload();
payload.withSource(sourceMateData).withDataChanges(builder.build());
dataEvent.getJdbcConnectData().setSchema(schema);
consumers.stream().forEach(consumer -> consumer.accept(dataEvent));
-
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngineFactory.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngineFactory.java
index 60c3c7b7c1..35e722fe12 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngineFactory.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngineFactory.java
@@ -17,11 +17,11 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.cdc.mysql;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.AbstractCdcEngineFactory;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.CdcEngine;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.CdcEngineFactory;
-import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
import org.apache.commons.lang3.StringUtils;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/RowDeserializers.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/RowDeserializers.java
index f605e95bde..e23839563f 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/RowDeserializers.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/RowDeserializers.java
@@ -23,9 +23,9 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.time.Year;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
+import java.util.BitSet;
import java.util.Map;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -99,6 +99,11 @@ protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inp
protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeYear(inputStream);
}
+
+ @Override
+ protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException {
+ return ((BitSet) super.deserializeBit(meta, inputStream)).toByteArray();
+ }
}
public static class UpdateRowsEventMeshDeserializer extends UpdateRowsEventDataDeserializer {
@@ -156,6 +161,12 @@ protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inp
protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeYear(inputStream);
}
+
+ @Override
+ protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException {
+ return ((BitSet) super.deserializeBit(meta, inputStream)).toByteArray();
+
+ }
}
public static class DeleteRowsEventMeshDeserializer extends DeleteRowsEventDataDeserializer {
@@ -213,6 +224,10 @@ protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inp
protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeYear(inputStream);
}
+
+ protected Serializable deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException {
+ return ((BitSet) super.deserializeBit(meta, inputStream)).toByteArray();
+ }
}
protected static Serializable deserializeTimestamp(ByteArrayInputStream inputStream) throws IOException {
@@ -228,7 +243,7 @@ protected static Serializable deserializeTimestampV2(int meta, ByteArrayInputStr
}
protected static Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException {
- return Year.of(1900 + inputStream.readInteger(1));
+ return LocalDate.parse(String.format("%d-01-01", 1900 + inputStream.readInteger(1)));
}
/**
@@ -448,4 +463,8 @@ protected static int deserializeFractionalSecondsInNanos(int fsp, ByteArrayInput
return 0;
}
+ protected static byte[] deserializeBit(int meta, ByteArrayInputStream inputStream) throws IOException {
+ return inputStream.read(meta);
+ }
+
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/mysql/MysqlDataTypeConvertor.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/mysql/MysqlDataTypeConvertor.java
index 33b26ebe72..f187e71fbd 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/mysql/MysqlDataTypeConvertor.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/mysql/MysqlDataTypeConvertor.java
@@ -19,12 +19,23 @@
import org.apache.eventmesh.connector.jdbc.DataTypeConvertor;
import org.apache.eventmesh.connector.jdbc.exception.DataTypeConvertException;
-import org.apache.eventmesh.connector.jdbc.table.type.CalendarType;
-import org.apache.eventmesh.connector.jdbc.table.type.DecimalType;
import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
-import org.apache.eventmesh.connector.jdbc.table.type.PrimitiveByteArrayType;
-import org.apache.eventmesh.connector.jdbc.table.type.PrimitiveType;
import org.apache.eventmesh.connector.jdbc.table.type.SQLType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.BooleanEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.BytesEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.DateEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.DateTimeEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.DecimalEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Float32EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Float64EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int16EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int32EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int64EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int8EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.NullEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.StringEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.TimeEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.YearEventMeshDataType;
import java.sql.JDBCType;
import java.util.Map;
@@ -98,49 +109,50 @@ public EventMeshDataType> toEventMeshType(MysqlType connectorDataType, MapMysql doc
*/
if (null == dataTypeProperties) {
- return PrimitiveByteArrayType.BYTES_TYPE;
+ return BytesEventMeshDataType.INSTANCE;
}
Integer precision = (Integer) dataTypeProperties.get(MysqlDataTypeConvertor.PRECISION);
if (precision != null && precision == 1) {
- return PrimitiveType.BOOLEAN_TYPE;
+ return BooleanEventMeshDataType.INSTANCE;
}
- return PrimitiveByteArrayType.BYTES_TYPE;
+ return BytesEventMeshDataType.INSTANCE;
}
case TINYINT:
- return PrimitiveType.BYTE_TYPE;
+ return Int8EventMeshDataType.INSTANCE;
case TINYINT_UNSIGNED:
case SMALLINT:
- return PrimitiveType.SHORT_TYPE;
+ return Int16EventMeshDataType.INSTANCE;
case SMALLINT_UNSIGNED:
case INT:
case MEDIUMINT:
case MEDIUMINT_UNSIGNED:
- return PrimitiveType.INT_TYPE;
+ return Int32EventMeshDataType.INSTANCE;
case INT_UNSIGNED:
case BIGINT:
- return PrimitiveType.LONG_TYPE;
+ return Int64EventMeshDataType.INSTANCE;
case FLOAT:
case FLOAT_UNSIGNED:
- return PrimitiveType.FLOAT_TYPE;
+ return Float32EventMeshDataType.INSTANCE;
case DOUBLE:
case DOUBLE_UNSIGNED:
- return PrimitiveType.DOUBLE_TYPE;
+ return Float64EventMeshDataType.INSTANCE;
case TIME:
- return CalendarType.LOCAL_TIME_TYPE;
+ return TimeEventMeshDataType.INSTANCE;
case YEAR:
+ return YearEventMeshDataType.INSTANCE;
case DATE:
- return CalendarType.LOCAL_DATE_TYPE;
+ return DateEventMeshDataType.INSTANCE;
case TIMESTAMP:
case DATETIME:
- return CalendarType.LOCAL_DATE_TIME_TYPE;
+ return DateTimeEventMeshDataType.INSTANCE;
case CHAR:
case VARCHAR:
case TINYTEXT:
@@ -149,7 +161,8 @@ public EventMeshDataType> toEventMeshType(MysqlType connectorDataType, Map toEventMeshType(MysqlType connectorDataType, Map toEventMeshType(MysqlType connectorDataType, MapMysql doc-DECIMAL, NUMERIC
*/
if (dataTypeProperties == null) {
- return new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+ return new DecimalEventMeshDataType(DEFAULT_PRECISION, DEFAULT_SCALE);
}
Integer precision = (Integer) dataTypeProperties.getOrDefault(PRECISION, DEFAULT_PRECISION);
Integer scale = (Integer) dataTypeProperties.getOrDefault(SCALE, DEFAULT_SCALE);
- return new DecimalType(precision, scale);
+ return new DecimalEventMeshDataType(precision, scale);
}
default:
throw new DataTypeConvertException(String.format("%s type is not supported", connectorDataType.getName()));
@@ -218,8 +231,6 @@ public MysqlType toConnectorType(EventMeshDataType> eventMeshDataType, Map snapshot
private Callable createSnapshotDataEvent4TableCallable(Jc context, SnapshotContext snapshotContext,
Queue connectionPool, String sql, TableId tableId) {
UniversalJdbcContext, ?, ?> universalJdbcContext = (UniversalJdbcContext, ?, ?>) context;
- universalJdbcContext.withTableId(tableId);
+ //universalJdbcContext.withTableId(tableId);
return () -> {
JdbcConnection connection = connectionPool.poll();
- MysqlSourceMateData sourceMateData = MysqlSourceMateData.newBuilder()
- .name(sourceConnectorConfig.getName())
- .snapshot(true)
- .withTableId(tableId)
- .serverId(sourceConnectorConfig.getMysqlConfig().getServerId())
- .build();
+ SourceMateData sourceMateData = buildSourceMateData(context, snapshotContext, tableId);
TableSchema tableSchema = universalJdbcContext.getCatalogTableSet().getTableSchema(tableId);
- Field field = new Field().withField("after").withType("field").withName("payload.after").withRequired(false);
+ Field field = new Field().withField("after").withName("payload.after").withRequired(false);
List extends Column> columns = tableSchema.getColumns();
if (CollectionUtils.isNotEmpty(columns)) {
- List fields = columns.stream()
- .map(col -> new Field(col.getDataType().getName(), col.isNotNull(), col.getName(), tableId.toString()))
- .collect(Collectors.toList());
+ List fields = columns.stream().map(col -> {
+ Column> rebuild = Column.newBuilder().withName(col.getName()).withDataType(col.getDataType())
+ .withJdbcType(col.getJdbcType()).withNativeType(col.getNativeType()).withOrder(col.getOrder()).build();
+ return new Field(rebuild, col.isNotNull(), col.getName(), tableId.toString());
+ }).collect(Collectors.toList());
field.withRequired(true).withFields(fields);
}
try (Statement statement = connection.createStatement(jdbcSourceConfig.getSourceConnectorConfig().getSnapshotFetchSize(), 100)) {
@@ -239,13 +236,14 @@ private Callable createSnapshotDataEvent4TableCallable(Jc context, Snapsho
while (resultSet.next()) {
int columnCount = resultSet.getMetaData().getColumnCount();
InsertDataEvent event = new InsertDataEvent(tableId);
- Payload payload = event.getJdbcConnectData().getPayload();
Map values = new HashMap<>(columnCount);
for (int index = 1; index <= columnCount; ++index) {
values.put(resultSet.getMetaData().getColumnName(index), resultSet.getObject(index));
}
Builder builder = DataChanges.newBuilder();
builder.withAfter(values);
+ builder.withType(event.getDataChangeEventType().ofCode());
+ final Payload payload = event.getJdbcConnectData().getPayload();
payload.withDataChanges(builder.build());
payload.withSource(sourceMateData);
event.getJdbcConnectData().setSchema(new Schema(Collections.singletonList(field)));
@@ -273,6 +271,16 @@ private Queue createConnectionPool(final SnapshotContext snapshotContext, TableId tableId);
+
/**
* Pre-snapshot preparations.
*
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/SnapshotEngineFactory.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/SnapshotEngineFactory.java
index 164b5b97b4..d573d7081a 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/SnapshotEngineFactory.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/SnapshotEngineFactory.java
@@ -17,8 +17,8 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.snapshot;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
import org.apache.eventmesh.connector.jdbc.JdbcContext;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java
index 2d4a7c2d31..a707029012 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java
@@ -17,19 +17,25 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.mysql;
+import org.apache.eventmesh.connector.jdbc.CatalogChanges;
import org.apache.eventmesh.connector.jdbc.connection.mysql.MysqlJdbcConnection;
import org.apache.eventmesh.connector.jdbc.context.mysql.MysqlOffsetContext;
import org.apache.eventmesh.connector.jdbc.context.mysql.MysqlPartition;
+import org.apache.eventmesh.connector.jdbc.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.event.Event;
import org.apache.eventmesh.connector.jdbc.event.EventConsumer;
+import org.apache.eventmesh.connector.jdbc.event.SchemaChangeEventType;
+import org.apache.eventmesh.connector.jdbc.source.SourceMateData;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.connector.jdbc.source.config.MysqlConfig;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlConstants;
-import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDialectSql;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlJdbcContext;
+import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlSourceMateData;
import org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.AbstractSnapshotEngine;
+import org.apache.eventmesh.connector.jdbc.table.catalog.DefaultValueConvertor;
import org.apache.eventmesh.connector.jdbc.table.catalog.TableId;
+import org.apache.eventmesh.connector.jdbc.table.catalog.mysql.MysqlDefaultValueConvertorImpl;
import org.apache.eventmesh.connector.jdbc.utils.MysqlUtils;
import org.apache.commons.collections4.CollectionUtils;
@@ -60,6 +66,8 @@ public class MysqlSnapshotEngine extends
private MysqlJdbcConnection connection;
+ private DefaultValueConvertor defaultValueConvertor = new MysqlDefaultValueConvertorImpl();
+
public MysqlSnapshotEngine(JdbcSourceConfig jdbcSourceConfig, MysqlDatabaseDialect databaseDialect, MysqlJdbcContext jdbcContext) {
super(jdbcSourceConfig, databaseDialect, jdbcContext, jdbcContext.getPartition(), jdbcContext.getOffsetContext());
this.connection = databaseDialect.getConnection();
@@ -76,6 +84,29 @@ public void close() throws Exception {
shutdown();
}
+ /**
+ * Builds the source metadata.
+ *
+ * @param context The context.
+ * @param snapshotContext The snapshot context.
+ * @param tableId The table id
+ * @return The source metadata.
+ */
+ @Override
+ protected SourceMateData buildSourceMateData(MysqlJdbcContext context, SnapshotContext snapshotContext,
+ TableId tableId) {
+
+ MysqlSourceMateData sourceMateData = MysqlSourceMateData.newBuilder()
+ .name(sourceConnectorConfig.getName())
+ .withTableId(tableId)
+ .serverId(sourceConnectorConfig.getMysqlConfig().getServerId())
+ .snapshot(true)
+ .position(context.getSourceInfo().getCurrentBinlogPosition())
+ .build();
+
+ return sourceMateData;
+ }
+
@Override
protected void preSnapshot(MysqlJdbcContext jdbcContext, SnapshotContext snapshotContext) {
// nothing to do
@@ -185,7 +216,17 @@ private void addParseDdlAndEvent(MysqlJdbcContext jdbcContext, String ddl, Table
if (event == null) {
return;
}
- event.getJdbcConnectData().getPayload().ofSourceMateData().setSnapshot(true);
+ // handle default value expression
+ if (event.getJdbcConnectData().isSchemaChanges()) {
+ CatalogChanges catalogChanges = event.getJdbcConnectData().getPayload().getCatalogChanges();
+ SchemaChangeEventType schemaChangeEventType = SchemaChangeEventType.ofSchemaChangeEventType(catalogChanges.getType(),
+ catalogChanges.getOperationType());
+ if (SchemaChangeEventType.TABLE_CREATE == schemaChangeEventType || SchemaChangeEventType.TABLE_ALERT == schemaChangeEventType) {
+ catalogChanges.getColumns().forEach(
+ column -> column.setDefaultValue(defaultValueConvertor.parseDefaultValue(column, column.getDefaultValueExpression())));
+ }
+ }
+ event.getJdbcConnectData().getPayload().withDdl(ddl).ofSourceMateData().setSnapshot(true);
eventQueue.put(event);
} catch (InterruptedException e) {
throw new RuntimeException(e);
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngineFactory.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngineFactory.java
index 53c5f993da..210ded0edf 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngineFactory.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngineFactory.java
@@ -17,10 +17,10 @@
package org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.mysql;
-import org.apache.eventmesh.connector.jdbc.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
+import org.apache.eventmesh.connector.jdbc.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.config.JdbcSourceConfig;
import org.apache.eventmesh.connector.jdbc.source.dialect.antlr4.mysql.MysqlAntlr4DdlParser;
-import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDatabaseDialect;
import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlJdbcContext;
import org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.SnapshotEngine;
import org.apache.eventmesh.connector.jdbc.source.dialect.snapshot.SnapshotEngineFactory;
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractColumnEditorImpl.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractColumnEditorImpl.java
index 146d232a2d..a3bcc97f8e 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractColumnEditorImpl.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractColumnEditorImpl.java
@@ -20,6 +20,7 @@
import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
import java.sql.JDBCType;
+import java.util.List;
public abstract class AbstractColumnEditorImpl implements ColumnEditor {
@@ -36,7 +37,7 @@ public abstract class AbstractColumnEditorImpl enumValues;
+
+ private Options options;
+
public AbstractColumnEditorImpl(String name) {
this.name = name;
}
@@ -132,7 +146,7 @@ public CE withJdbcType(JDBCType jdbcType) {
*/
@SuppressWarnings("unchecked")
@Override
- public CE withEventMeshType(EventMeshDataType> eventMeshType) {
+ public CE withEventMeshType(EventMeshDataType eventMeshType) {
this.eventMeshDataType = eventMeshType;
return (CE) this;
}
@@ -158,7 +172,7 @@ public CE withOrder(int order) {
*/
@SuppressWarnings("unchecked")
@Override
- public CE length(int length) {
+ public CE length(long length) {
this.columnLength = length;
return (CE) this;
}
@@ -241,11 +255,47 @@ public CE notNull(boolean notNull) {
return (CE) this;
}
- public EventMeshDataType> ofEventMeshDataType() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public CE charsetName(String charsetName) {
+ this.charsetName = charsetName;
+ return (CE) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CE enumValues(List enumValues) {
+ this.enumValues = enumValues;
+ return (CE) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CE withOption(String key, Object value) {
+ if (options == null) {
+ this.options = new Options();
+ }
+ this.options.put(key, value);
+ return (CE) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CE withOptions(Options options) {
+ if (options != null) {
+ if (this.options == null) {
+ this.options = new Options();
+ }
+ this.options.putAll(options);
+ }
+ return (CE) this;
+ }
+
+ public EventMeshDataType ofEventMeshDataType() {
return eventMeshDataType;
}
- public Integer ofColumnLength() {
+ public Long ofColumnLength() {
return columnLength;
}
@@ -284,4 +334,16 @@ public boolean isOptional() {
public int ofOrder() {
return this.order;
}
+
+ public String ofCharsetName() {
+ return this.charsetName;
+ }
+
+ public List ofEnumValues() {
+ return this.enumValues;
+ }
+
+ public Options ofOptions() {
+ return this.options;
+ }
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractTableEditorImpl.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractTableEditorImpl.java
index 61a7c7bf3f..4d26b3cb85 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractTableEditorImpl.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/AbstractTableEditorImpl.java
@@ -174,11 +174,6 @@ public TE withUniqueKeys(UniqueKey... uniqueKeys) {
return (TE) this;
}
- /**
- * @param key
- * @param value
- * @return
- */
@Override
@SuppressWarnings("unchecked")
public TE withOption(String key, Object value) {
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/CatalogTableSet.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/CatalogTableSet.java
index ed2b6dc6e4..16c083f06a 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/CatalogTableSet.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/CatalogTableSet.java
@@ -51,6 +51,7 @@ public void overrideTable(TableSchema tableSchema) {
return;
}
tableSchemaMap.putTableSchema(tableSchema);
+ tableIdSet.addTableId(tableSchema.getTableId());
}
public TableSchema getTableSchema(TableId tableId) {
@@ -65,6 +66,10 @@ public TableIdSet() {
values = new HashSet<>(32);
}
+ public void addTableId(TableId tableId) {
+ values.add(tableId);
+ }
+
public void removeDatabase(String catalogName, String schemaName) {
values.removeIf(
entry -> StringUtils.equals(entry.getCatalogName(), catalogName) && StringUtils.equals(entry.getSchemaName(), schemaName));
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Column.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Column.java
index 94ed25f24e..363a50c147 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Column.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Column.java
@@ -22,6 +22,12 @@
import java.io.Serializable;
import java.sql.JDBCType;
import java.sql.Types;
+import java.util.List;
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -33,7 +39,7 @@
@Data
@AllArgsConstructor
@NoArgsConstructor
-public abstract class Column implements Serializable {
+public class Column implements Serializable {
/**
* Name of the column
@@ -43,7 +49,9 @@ public abstract class Column implements Serializable {
/**
* Data type of the column
*/
- protected EventMeshDataType> dataType;
+ @JsonSerialize(using = EventMeshDataTypeJsonSerializer.class)
+ @JsonDeserialize(using = EventMeshDataTypeJsonDeserializer.class)
+ protected EventMeshDataType dataType;
/**
* {@link Types JDBC type}
@@ -53,7 +61,7 @@ public abstract class Column implements Serializable {
/**
* Length of the column
*/
- protected Integer columnLength;
+ protected Long columnLength;
/**
* Decimal point of the column
@@ -63,7 +71,7 @@ public abstract class Column implements Serializable {
/**
* Indicates if the column can be null or not
*/
- protected boolean notNull;
+ protected boolean notNull = false;
/**
* Comment for the column
@@ -75,15 +83,202 @@ public abstract class Column implements Serializable {
*/
protected Object defaultValue;
+ @JsonIgnore
protected String defaultValueExpression;
- protected int order;
+ // order of the column in the table
+ protected int order = 1;
+
+ protected String charsetName;
+
+ // Use wrapper types to reduce data transmission during serialization
+ protected Boolean autoIncremented;
+
+ protected Boolean generated;
+
+ protected String collationName;
+
+ protected List enumValues;
+
+ // for mysql: varchar or json
+ protected String nativeType;
+
+ protected Options options;
+
+ public Column(String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull, String comment,
+ Object defaultValue, String defaultValueExpression, int order, String charsetName, boolean autoIncremented, boolean generated,
+ String collationName) {
+ this.name = name;
+ this.dataType = dataType;
+ this.jdbcType = jdbcType;
+ this.columnLength = columnLength;
+ this.decimal = decimal;
+ this.notNull = notNull;
+ this.comment = comment;
+ this.defaultValue = defaultValue;
+ this.defaultValueExpression = defaultValueExpression;
+ this.order = order;
+ this.charsetName = charsetName;
+ this.autoIncremented = autoIncremented;
+ this.generated = generated;
+ this.collationName = collationName;
+ }
+
+ private Column(Builder builder) {
+ this.name = builder.name;
+ this.dataType = builder.dataType;
+ this.jdbcType = builder.jdbcType;
+ this.columnLength = builder.columnLength;
+ this.decimal = builder.decimal;
+ this.notNull = builder.notNull;
+ this.comment = builder.comment;
+ this.defaultValue = builder.defaultValue;
+ this.defaultValueExpression = builder.defaultValueExpression;
+ this.order = builder.order;
+ this.charsetName = builder.charsetName;
+ this.autoIncremented = builder.autoIncremented;
+ this.generated = builder.generated;
+ this.collationName = builder.collationName;
+ this.enumValues = builder.enumValues;
+ this.nativeType = builder.nativeType;
+ this.options = builder.options;
+ }
+
+ public boolean isAutoIncremented() {
+ return Optional.ofNullable(this.autoIncremented).orElse(false);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
/**
* creates a clone of the Column
*
* @return clone of column
*/
- public abstract Col clone();
+ public Col clone() {
+ return null;
+ }
+
+ /**
+ * Builder for the Column class
+ */
+ public static class Builder {
+
+ protected String name;
+ protected EventMeshDataType dataType;
+ protected JDBCType jdbcType;
+ protected Long columnLength;
+ protected Integer decimal;
+ protected boolean notNull = false;
+ protected String comment;
+ protected Object defaultValue;
+ protected String defaultValueExpression;
+ protected int order = 1;
+ protected String charsetName;
+ protected Boolean autoIncremented;
+ protected Boolean generated;
+ protected String collationName;
+ protected List enumValues;
+ // for mysql: varchar or json
+ protected String nativeType;
+
+ protected Options options;
+
+ public Builder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder withDataType(EventMeshDataType dataType) {
+ this.dataType = dataType;
+ return this;
+ }
+
+ public Builder withJdbcType(JDBCType jdbcType) {
+ this.jdbcType = jdbcType;
+ return this;
+ }
+
+ public Builder withColumnLength(Long columnLength) {
+ this.columnLength = columnLength;
+ return this;
+ }
+
+ public Builder withDecimal(Integer decimal) {
+ this.decimal = decimal;
+ return this;
+ }
+
+ public Builder withNotNull(boolean notNull) {
+ this.notNull = notNull;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder withDefaultValue(Object defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+
+ public Builder withDefaultValueExpression(String defaultValueExpression) {
+ this.defaultValueExpression = defaultValueExpression;
+ return this;
+ }
+
+ public Builder withOrder(int order) {
+ this.order = order;
+ return this;
+ }
+
+ public Builder withCharsetName(String charsetName) {
+ this.charsetName = charsetName;
+ return this;
+ }
+
+ public Builder withAutoIncremented(boolean autoIncremented) {
+ this.autoIncremented = autoIncremented;
+ return this;
+ }
+
+ public Builder withGenerated(boolean generated) {
+ this.generated = generated;
+ return this;
+ }
+
+ public Builder withCollationName(String collationName) {
+ this.collationName = collationName;
+ return this;
+ }
+
+ public Builder withEnumValues(List enumValues) {
+ this.enumValues = enumValues;
+ return this;
+ }
+
+ public Builder withNativeType(String nativeType) {
+ this.nativeType = nativeType;
+ return this;
+ }
+
+ public Builder withOptions(Options options) {
+ this.options = options;
+ return this;
+ }
+
+ /**
+ * Builds the Column instance.
+ *
+ * @return Column instance
+ */
+ public Column build() {
+ return new Column(this);
+ }
+ }
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/ColumnEditor.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/ColumnEditor.java
index 2661c7db27..e99abaccd8 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/ColumnEditor.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/ColumnEditor.java
@@ -20,6 +20,7 @@
import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
import java.sql.JDBCType;
+import java.util.List;
/**
* An interface for building and configuring columns in a database table.
@@ -66,7 +67,7 @@ public interface ColumnEditor {
* @param eventMeshType The EventMesh data type of the column.
* @return The column editor instance.
*/
- CE withEventMeshType(EventMeshDataType> eventMeshType);
+ CE withEventMeshType(EventMeshDataType eventMeshType);
/**
* Sets the order or position of the column within a table.
@@ -82,7 +83,7 @@ public interface ColumnEditor {
* @param length The length of the column.
* @return The column editor instance.
*/
- CE length(int length);
+ CE length(long length);
/**
* Sets the scale of the column (if applicable).
@@ -132,10 +133,19 @@ public interface ColumnEditor {
*/
CE notNull(boolean notNull);
+ CE charsetName(String charsetName);
+
+ CE enumValues(List enumValues);
+
+ CE withOption(String key, Object value);
+
+ CE withOptions(Options options);
+
/**
* Builds and returns the configured column.
*
* @return The configured column.
*/
Col build();
+
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultColumn.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultColumn.java
index f682ea9438..e0dae1e617 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultColumn.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultColumn.java
@@ -28,26 +28,34 @@
@NoArgsConstructor
public class DefaultColumn extends Column {
- public DefaultColumn(String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
- String comment, Object defaultValue, String defaultValueExpression) {
- super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, 0);
+ public DefaultColumn(String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
+ String comment, Object defaultValue, String defaultValueExpression, String charsetName, boolean autoIncremented, boolean generated,
+ String collationName) {
+ super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, 0, charsetName,
+ autoIncremented, generated, collationName);
}
- public DefaultColumn(String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
- String comment, Object defaultValue, String defaultValueExpression, int order) {
- super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order);
+ public DefaultColumn(String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
+ String comment, Object defaultValue, String defaultValueExpression, int order, String charsetName, boolean autoIncremented, boolean generated,
+ String collationName) {
+ super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order, charsetName,
+ autoIncremented, generated, collationName);
}
public static DefaultColumn of(
- String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
- String comment, Object defaultValue, String defaultValueExpression) {
- return new DefaultColumn(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression);
+ String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
+ String comment, Object defaultValue, String defaultValueExpression, String charsetName, boolean autoIncremented, boolean generated,
+ String collationName) {
+ return new DefaultColumn(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression,
+ charsetName, autoIncremented, generated, collationName);
}
public static DefaultColumn of(
- String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
- String comment, Object defaultValue, String defaultValueExpression, int order) {
- return new DefaultColumn(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order);
+ String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
+ String comment, Object defaultValue, String defaultValueExpression, int order, String charsetName, boolean autoIncremented, boolean generated,
+ String collationName) {
+ return new DefaultColumn(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order,
+ charsetName, autoIncremented, generated, collationName);
}
/**
@@ -57,6 +65,7 @@ public static DefaultColumn of(
*/
@Override
public DefaultColumn clone() {
- return DefaultColumn.of(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order);
+ return DefaultColumn.of(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order,
+ charsetName, autoIncremented, generated, collationName);
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultValueConvertor.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultValueConvertor.java
new file mode 100644
index 0000000000..9c3e6d4185
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/DefaultValueConvertor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.table.catalog;
+
+/**
+ * Functional interface for converting default values.
+ */
+@FunctionalInterface
+public interface DefaultValueConvertor {
+
+ /**
+ * Parses the default value expression for a column.
+ *
+ * @param column The column for which the default value is being parsed.
+ * @param defaultValueExpression The expression representing the default value.
+ * @return The parsed default value.
+ */
+ Object parseDefaultValue(Column> column, String defaultValueExpression);
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/EventMeshDataTypeJsonDeserializer.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/EventMeshDataTypeJsonDeserializer.java
new file mode 100644
index 0000000000..86b5530614
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/EventMeshDataTypeJsonDeserializer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.table.catalog;
+
+import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.table.type.EventMeshTypeNameConverter;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.NullEventMeshDataType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class EventMeshDataTypeJsonDeserializer extends JsonDeserializer {
+
+ @Override
+ public EventMeshDataType deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+ throws IOException, JacksonException {
+ JsonNode treeNode = jsonParser.readValueAsTree();
+ Iterator> fields = treeNode.fields();
+ while (fields.hasNext()) {
+ Map.Entry field = fields.next();
+ if (StringUtils.equals("eventMeshDataType", field.getKey())) {
+ String value = field.getValue().asText();
+ return EventMeshTypeNameConverter.ofEventMeshDataType(value);
+ }
+ }
+ return NullEventMeshDataType.INSTANCE;
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/EventMeshDataTypeJsonSerializer.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/EventMeshDataTypeJsonSerializer.java
new file mode 100644
index 0000000000..68fd9a6954
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/EventMeshDataTypeJsonSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.table.catalog;
+
+import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+public class EventMeshDataTypeJsonSerializer extends JsonSerializer {
+
+ @Override
+ public void serialize(EventMeshDataType value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("eventMeshDataType", value.getName());
+ gen.writeEndObject();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Index.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Index.java
new file mode 100644
index 0000000000..751d26cb13
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Index.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.table.catalog;
+
+import java.io.Serializable;
+import java.util.List;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * This class represents an Index object with attributes such as name, column names, index type, index method, and comment.
+ * It provides a Builder pattern for creating Index objects.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class Index implements Serializable {
+
+ // Name of the index
+ private String name;
+
+ // List of column names included in the index
+ private List columnNames;
+
+ // Type of the index (e.g., unique, normal, etc.)
+ private String indexType;
+
+ // Method used for the index (e.g., B-tree, Hash, etc.)
+ private String indexMethod;
+
+ // Comment associated with the index
+ private String comment;
+
+ protected Index(Builder builder) {
+ this.name = builder.name;
+ this.columnNames = builder.columnNames;
+ this.indexType = builder.indexType;
+ this.indexMethod = builder.indexMethod;
+ this.comment = builder.comment;
+ }
+
+ public Index(List columnNames) {
+ this.columnNames = columnNames;
+ }
+
+ public Index(String name, List columnNames) {
+ this.name = name;
+ this.columnNames = columnNames;
+ }
+
+ public Index(String name, List columnNames, String comment) {
+ this.name = name;
+ this.columnNames = columnNames;
+ this.comment = comment;
+ }
+
+ public void addColumnNames(String... columnNames) {
+ if (columnNames != null && columnNames.length > 0) {
+ for (String columnName : columnNames) {
+ this.columnNames.add(columnName);
+ }
+ }
+ }
+
+ public Index(String name, List columnNames, String indexType, String indexMethod, String comment) {
+ this.name = name;
+ this.columnNames = columnNames;
+ this.indexType = indexType;
+ this.indexMethod = indexMethod;
+ this.comment = comment;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ protected String name;
+ protected List columnNames;
+ protected String indexType;
+ protected String indexMethod;
+ protected String comment;
+
+ public static Builder newIndex() {
+ return new Builder();
+ }
+
+ public Builder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder withColumnNames(List columnNames) {
+ this.columnNames = columnNames;
+ return this;
+ }
+
+ public Builder withIndexType(String indexType) {
+ this.indexType = indexType;
+ return this;
+ }
+
+ public Builder withIndexMethod(String indexMethod) {
+ this.indexMethod = indexMethod;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Index build() {
+ return new Index(this);
+ }
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/PrimaryKey.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/PrimaryKey.java
index 2095b30429..48fa27d936 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/PrimaryKey.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/PrimaryKey.java
@@ -22,6 +22,7 @@
public class PrimaryKey extends UniqueKey {
public PrimaryKey() {
+
}
public PrimaryKey(String name, List columnNames, String comment) {
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java
index 34a44eedff..c475227248 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/Table.java
@@ -19,16 +19,14 @@
import java.util.List;
-import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
-@AllArgsConstructor
@NoArgsConstructor
public class Table {
- private String name;
+ private TableId tableId;
private PrimaryKey primaryKey;
@@ -36,4 +34,72 @@ public class Table {
private String comment;
+ private Options options = new Options();
+
+ public Table(TableId tableId, PrimaryKey primaryKey, List uniqueKeys, String comment) {
+ this.tableId = tableId;
+ this.primaryKey = primaryKey;
+ this.uniqueKeys = uniqueKeys;
+ this.comment = comment;
+ }
+
+ public Table(TableId tableId, PrimaryKey primaryKey, List uniqueKeys, String comment, Options options) {
+ this.tableId = tableId;
+ this.primaryKey = primaryKey;
+ this.uniqueKeys = uniqueKeys;
+ this.comment = comment;
+ if (null != options) {
+ this.options.putAll(options);
+ }
+ }
+
+ public void put(String key, Object value) {
+ options.put(key, value);
+ }
+
+ public void putAll(Options options) {
+ this.options.putAll(options);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private TableId tableId;
+ private PrimaryKey primaryKey;
+ private List uniqueKeys;
+ private String comment;
+ private Options options;
+
+ public Builder withTableId(TableId tableId) {
+ this.tableId = tableId;
+ return this;
+ }
+
+ public Builder withPrimaryKey(PrimaryKey primaryKey) {
+ this.primaryKey = primaryKey;
+ return this;
+ }
+
+ public Builder withUniqueKeys(List uniqueKeys) {
+ this.uniqueKeys = uniqueKeys;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder withOptions(Options options) {
+ this.options = options;
+ return this;
+ }
+
+ public Table build() {
+ return new Table(tableId, primaryKey, uniqueKeys, comment, options);
+ }
+ }
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/TableSchema.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/TableSchema.java
index 3eadd78d56..fa4ed243d8 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/TableSchema.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/TableSchema.java
@@ -40,14 +40,14 @@ public class TableSchema implements Serializable {
/**
* A map of column names to their respective column objects.
*/
- private Map columnMap;
+ private Map> columnMap;
/**
* A list of columns in the table.
*/
- private List extends Column> columns;
+ private List extends Column>> columns;
- private Map orderColumnMap;
+ private Map> orderColumnMap;
/**
* The primary key of the table.
@@ -58,8 +58,8 @@ public class TableSchema implements Serializable {
private String comment;
- public TableSchema(TableId tableId, Map columnMap, List extends Column> columns,
- Map orderColumnMap, PrimaryKey primaryKey, List uniqueKeys, String comment) {
+ public TableSchema(TableId tableId, Map> columnMap, List extends Column>> columns,
+ Map> orderColumnMap, PrimaryKey primaryKey, List uniqueKeys, String comment) {
this.tableId = tableId;
this.columnMap = columnMap;
this.columns = columns;
@@ -88,9 +88,9 @@ public static TableSchemaBuilder newTableSchemaBuilder() {
public static class TableSchemaBuilder {
private TableId tableId;
- private Map columnMap;
- private Map orderColumnMap;
- private List columns;
+ private Map> columnMap;
+ private Map> orderColumnMap;
+ private List> columns;
private PrimaryKey primaryKey;
private List uniqueKeys;
private String comment;
@@ -104,12 +104,12 @@ public TableSchemaBuilder withTableId(TableId tableId) {
return this;
}
- public TableSchemaBuilder withColumns(Map columnMap) {
+ public TableSchemaBuilder withColumns(Map> columnMap) {
this.columnMap = columnMap;
return this;
}
- public TableSchemaBuilder withColumns(List columns) {
+ public TableSchemaBuilder withColumns(List> columns) {
this.columns = columns;
return this;
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/UniqueKey.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/UniqueKey.java
index f44ed561f4..4677d43c62 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/UniqueKey.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/UniqueKey.java
@@ -17,7 +17,6 @@
package org.apache.eventmesh.connector.jdbc.table.catalog;
-import java.io.Serializable;
import java.util.List;
import lombok.Getter;
@@ -29,44 +28,26 @@
*/
@Setter
@Getter
-public class UniqueKey implements Serializable {
+public class UniqueKey extends Index {
- // The name of the unique key, if specified.
- private String name;
-
- // The list of column names that make up the unique/primary key.
- private List columnNames;
-
- // An optional comment or description for the unique/primary key.
- private String comment;
+ private static final String INDEX_TYPE = "UNIQUE";
public UniqueKey() {
}
public UniqueKey(String name, List columnNames, String comment) {
- this.name = name;
- this.columnNames = columnNames;
- this.comment = comment;
+ super(name, columnNames, INDEX_TYPE, null, comment);
}
public UniqueKey(String name, List columnNames) {
- this.name = name;
- this.columnNames = columnNames;
+ super(name, columnNames);
}
public UniqueKey(List columnNames) {
- this.columnNames = columnNames;
+ super(columnNames);
}
public UniqueKey copy() {
- return new UniqueKey(name, columnNames, comment);
- }
-
- public void addColumnNames(String... columnNames) {
- if (columnNames != null && columnNames.length > 0) {
- for (String columnName : columnNames) {
- this.columnNames.add(columnName);
- }
- }
+ return new UniqueKey(getName(), getColumnNames(), getComment());
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumn.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumn.java
index c0440a4d14..424f70f3a6 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumn.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumn.java
@@ -18,36 +18,34 @@
package org.apache.eventmesh.connector.jdbc.table.catalog.mysql;
import org.apache.eventmesh.connector.jdbc.table.catalog.Column;
+import org.apache.eventmesh.connector.jdbc.table.catalog.Options;
import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
import java.sql.JDBCType;
+import java.util.List;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
/**
* Represents a MySQL column in a database table.
*/
+@Data
+@EqualsAndHashCode(callSuper = true)
public class MysqlColumn extends Column {
- private boolean autoIncremented;
-
- private boolean generated;
-
- private String collationName;
-
- public MysqlColumn(String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
- String comment, Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName) {
- super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, 0);
- this.autoIncremented = autoIncremented;
- this.generated = generated;
- this.collationName = collationName;
+ public MysqlColumn(String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
+ String comment, Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName,
+ String charsetName, List enumValues, String nativeType, Options options) {
+ super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, 0, charsetName,
+ autoIncremented, generated, collationName, enumValues, nativeType, options);
}
- public MysqlColumn(String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
+ public MysqlColumn(String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
String comment, Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName,
- int order) {
- super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order);
- this.autoIncremented = autoIncremented;
- this.generated = generated;
- this.collationName = collationName;
+ int order, String charsetName, List enumValues, String nativeType, Options options) {
+ super(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression, order, charsetName,
+ autoIncremented, generated, collationName, enumValues, nativeType, options);
}
public MysqlColumn() {
@@ -55,17 +53,19 @@ public MysqlColumn() {
}
public static MysqlColumn of(
- String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull,
- String comment, Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName) {
+ String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull,
+ String comment, Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName,
+ String charsetName, List enumValues, String nativeType, Options options) {
return new MysqlColumn(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression,
- autoIncremented, generated, collationName);
+ autoIncremented, generated, collationName, charsetName, enumValues, nativeType, options);
}
public static MysqlColumn of(
- String name, EventMeshDataType> dataType, JDBCType jdbcType, Integer columnLength, Integer decimal, boolean notNull, String comment,
- Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName, int order) {
+ String name, EventMeshDataType dataType, JDBCType jdbcType, Long columnLength, Integer decimal, boolean notNull, String comment,
+ Object defaultValue, String defaultValueExpression, boolean autoIncremented, boolean generated, String collationName, int order,
+ String charsetName, List enumValues, String nativeType, Options options) {
return new MysqlColumn(name, dataType, jdbcType, columnLength, decimal, notNull, comment, defaultValue, defaultValueExpression,
- autoIncremented, generated, collationName, order);
+ autoIncremented, generated, collationName, order, charsetName, enumValues, nativeType, options);
}
/**
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditor.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditor.java
index cea5ea9ceb..470e31affd 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditor.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditor.java
@@ -65,6 +65,6 @@ static MysqlColumnEditor ofEditor() {
* @param collationName The name of the collation to set.
* @return The column editor with the collation set.
*/
- MysqlColumnEditor collate(String collationName);
+ MysqlColumnEditor collation(String collationName);
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditorImpl.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditorImpl.java
index c7b7648af2..9f6ac950d0 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditorImpl.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlColumnEditorImpl.java
@@ -17,7 +17,12 @@
package org.apache.eventmesh.connector.jdbc.table.catalog.mysql;
+import org.apache.eventmesh.connector.jdbc.source.dialect.mysql.MysqlDataTypeConvertor;
import org.apache.eventmesh.connector.jdbc.table.catalog.AbstractColumnEditorImpl;
+import org.apache.eventmesh.connector.jdbc.table.type.EventMeshDataType;
+
+import java.util.HashMap;
+import java.util.Map;
public class MysqlColumnEditorImpl extends AbstractColumnEditorImpl implements MysqlColumnEditor {
@@ -27,6 +32,8 @@ public class MysqlColumnEditorImpl extends AbstractColumnEditorImpl dataTypeProperties = new HashMap<>();
+ if (ofColumnLength() != null) {
+ dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, ofColumnLength().intValue());
+ }
+ dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, ofScale());
+ EventMeshDataType> eventMeshType = convertor.toEventMeshType(ofJdbcType(), dataTypeProperties);
+ withEventMeshType(eventMeshType);
+
return MysqlColumn.of(ofName(), ofEventMeshDataType(), ofJdbcType(), ofColumnLength(), ofScale(), isNotNull(), ofComment(), ofDefaultValue(),
- ofDefaultValueExpression(), autoIncremented, generated, collationName, ofOrder());
+ ofDefaultValueExpression(), autoIncremented, generated, collationName, ofOrder(), ofCharsetName(), ofEnumValues(), ofTypeName(),
+ ofOptions());
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlDefaultValueConvertorImpl.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlDefaultValueConvertorImpl.java
new file mode 100644
index 0000000000..699f4fd4f5
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlDefaultValueConvertorImpl.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.table.catalog.mysql;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.jdbc.table.catalog.Column;
+import org.apache.eventmesh.connector.jdbc.table.catalog.DefaultValueConvertor;
+import org.apache.eventmesh.connector.jdbc.utils.ByteArrayUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.JDBCType;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class MysqlDefaultValueConvertorImpl implements DefaultValueConvertor {
+
+ private static final String EPOCH_DATE = "1970-01-01";
+
+ // Time The range is '-838:59:59.000000' to '838:59:59.000000'
+ private static final Pattern TIME_PATTERN = Pattern.compile("(\\-?[0-9]*):([0-9]*)(:([0-9]*))?(\\.([0-9]*))?");
+
+ private static final Set NUMBER_DATA_TYPES = Collections.unmodifiableSet(new HashSet<>(
+ Arrays.asList(JDBCType.TINYINT, JDBCType.INTEGER, JDBCType.DATE, JDBCType.TIMESTAMP, JDBCType.TIMESTAMP_WITH_TIMEZONE, JDBCType.TIME,
+ JDBCType.BOOLEAN, JDBCType.BIT, JDBCType.NUMERIC, JDBCType.DECIMAL, JDBCType.FLOAT, JDBCType.DOUBLE, JDBCType.REAL)));
+
+ private static final Set BINARY_DATA_TYPES = Collections.unmodifiableSet(new HashSet<>(
+ Arrays.asList(JDBCType.BINARY, JDBCType.VARBINARY)));
+
+ @Override
+ public Object parseDefaultValue(Column> column, String defaultValueExpression) {
+ if (null == defaultValueExpression) {
+ return null;
+ }
+ defaultValueExpression = defaultValueExpression.trim();
+
+ if (NUMBER_DATA_TYPES.contains(column.getJdbcType()) && StringUtils.equalsAnyIgnoreCase(defaultValueExpression, Boolean.TRUE.toString(),
+ Boolean.FALSE.toString())) {
+ /*
+ * These types are synonyms for DECIMAL: DEC[(M[,D])] [UNSIGNED] [ZEROFILL], NUMERIC[(M[,D])] [UNSIGNED] [ZEROFILL], FIXED[(M[,D])]
+ * [UNSIGNED] [ZEROFILL]
+ */
+ if (column.getJdbcType() == JDBCType.DECIMAL || column.getJdbcType() == JDBCType.NUMERIC) {
+ return convert2Decimal(column, defaultValueExpression);
+ }
+ return StringUtils.equalsIgnoreCase(Boolean.TRUE.toString(), defaultValueExpression) ? 1 : 0;
+ }
+
+ if (BINARY_DATA_TYPES.contains(column.getJdbcType()) && column.getDefaultValueExpression() != null) {
+ // https://dev.mysql.com/doc/refman/8.0/en/binary-varbinary.html
+ String cleanedDefaultValueExpression = StringUtils.replace(column.getDefaultValueExpression(), "\\0", "");
+ return ByteArrayUtils.bytesToHexString(cleanedDefaultValueExpression.getBytes(Constants.DEFAULT_CHARSET));
+ }
+
+ switch (column.getDataType().getSQLType()) {
+ case DATE:
+ return convert2LocalDate(column, defaultValueExpression);
+ case TIMESTAMP:
+ return convertToLocalDateTime(column, defaultValueExpression);
+ case TIMESTAMP_WITH_TIMEZONE:
+ return convertToTimestamp(column, defaultValueExpression);
+ case TIME:
+ return convertToLocalTime(column, defaultValueExpression);
+ case BOOLEAN:
+ return convert2Boolean(column, defaultValueExpression);
+ case BIT:
+ return convertToBits(column, defaultValueExpression);
+
+ case NUMERIC:
+ case DECIMAL:
+ return convert2Decimal(column, defaultValueExpression);
+
+ case FLOAT:
+ case DOUBLE:
+ case REAL:
+ return Double.parseDouble(defaultValueExpression);
+ default:
+ }
+ return defaultValueExpression;
+ }
+
+ private Object convert2Boolean(Column> column, String value) {
+ // value maybe is numeric or string
+ if (StringUtils.isNumeric(value)) {
+ return Integer.parseInt(value) != 0;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
+ private Object convert2Decimal(Column> column, String value) {
+ return Optional.ofNullable(column.getDecimal()).isPresent() ? new BigDecimal(value).setScale(column.getDecimal(), RoundingMode.HALF_UP)
+ : new BigDecimal(value);
+ }
+
+ private Object convertToBits(Column> column, String value) {
+ // value: '101010111'
+ if (column.getColumnLength() > 1) {
+ int nums = value.length() / Byte.SIZE + (value.length() % Byte.SIZE == 0 ? 0 : 1);
+ byte[] bytes = new byte[nums];
+ int length = value.length();
+ for (int i = 0; i < nums; i++) {
+ int size = value.length() - Byte.SIZE < 0 ? 0 : value.length() - Byte.SIZE;
+ bytes[nums - i - 1] = (byte) Integer.parseInt(value.substring(size, length), 2);
+ value = value.substring(0, size);
+ }
+ return bytes;
+ }
+
+ // value: '1' or '0' parse to boolean
+ return Short.parseShort(value) != 0;
+ }
+
+ private Object convertToLocalTime(Column> column, String value) {
+
+ Matcher matcher = TIME_PATTERN.matcher(value);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Unexpected format for TIME column: " + value);
+ }
+
+ final int hours = Integer.parseInt(matcher.group(1));
+ final int minutes = Integer.parseInt(matcher.group(2));
+ final String secondsGroup = matcher.group(4);
+ int seconds = 0;
+ int nanoSeconds = 0;
+
+ if (secondsGroup != null) {
+ seconds = Integer.parseInt(secondsGroup);
+ String microSecondsString = matcher.group(6);
+ if (microSecondsString != null) {
+ nanoSeconds = Integer.parseInt(microSecondsString) * 1000;
+ }
+ }
+ return LocalTime.of(hours, minutes, seconds, nanoSeconds);
+ }
+
+ private Object convertToTimestamp(Column> column, String value) {
+ // Mysql not support
+ return null;
+ }
+
+ private Object convertToLocalDateTime(Column> column, String value) {
+ if (StringUtils.containsAny(value, "CURRENT_TIMESTAMP", "current_timestamp")) {
+ return value;
+ }
+ // The TIMESTAMP data type is used for values that contain both date and time parts.
+ // TIMESTAMP has a range of '1970-01-01 00:00:01' UTC to '2038-01-19 03:14:07' UTC.
+ return LocalDateTime.from(timestampFormat(Optional.ofNullable(column.getColumnLength()).orElse(0L).intValue()).parse(value));
+ }
+
+ private Object convert2LocalDate(Column> column, String value) {
+ // The DATE type is used for values with a date part but no time part.
+ // MySQL retrieves and displays DATE values in 'YYYY-MM-DD' format.
+ // The supported range is '1000-01-01' to '9999-12-31'.
+
+ try {
+ if (StringUtils.contains(value, "-")) {
+ return LocalDate.parse(value);
+ }
+ // maybe is year, e.g. 2020
+ if (StringUtils.isNumeric(value)) {
+ return LocalDate.parse(value + "-01-01");
+ }
+ // format: 20200101
+ return LocalDate.from(dateFormat().parse(value));
+ } catch (Exception e) {
+ log.warn("Convert date error[value={}]", value);
+ return LocalDate.parse(EPOCH_DATE);
+ }
+ }
+
+ private DateTimeFormatter timestampFormat(int length) {
+ final DateTimeFormatterBuilder dtf = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd").optionalStart().appendLiteral(" ")
+ .append(DateTimeFormatter.ISO_LOCAL_TIME).optionalEnd().parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+ .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0).parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0);
+ if (length > 0) {
+ dtf.appendFraction(ChronoField.MICRO_OF_SECOND, 0, length, true);
+ }
+ return dtf.toFormatter();
+ }
+
+ private DateTimeFormatter dateFormat() {
+ final DateTimeFormatterBuilder dtf = new DateTimeFormatterBuilder().appendValue(ChronoField.YEAR, 4).appendValue(ChronoField.MONTH_OF_YEAR, 2)
+ .optionalStart().appendValue(ChronoField.DAY_OF_YEAR, 2);
+ return dtf.toFormatter();
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlOptions.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlOptions.java
new file mode 100644
index 0000000000..dc82e6ee51
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.jdbc.table.catalog.mysql;
+
+/**
+ * The MysqlOptions class provides the constants for configuring options in MySQL tables and columns.
+ */
+public class MysqlOptions {
+
+ public static final class MysqlTableOptions {
+
+ public static String ENGINE = "ENGINE";
+
+ public static String AUTO_INCREMENT = "AUTO_INCREMENT";
+
+ public static String CHARSET = "CHARSET";
+
+ public static String COLLATE = "COLLATE";
+
+ public static String COMMENT = "COMMENT";
+ }
+
+ public static final class MysqlColumnOptions {
+
+ public static String SIGNED = "SIGNED";
+
+ public static String UNSIGNED = "UNSIGNED";
+
+ public static String ZEROFILL = "ZEROFILL";
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlTableSchema.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlTableSchema.java
index c26ca5c3ce..4cddc03a65 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlTableSchema.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/catalog/mysql/MysqlTableSchema.java
@@ -26,8 +26,11 @@
import java.util.List;
import java.util.Map;
+import lombok.Getter;
+
public class MysqlTableSchema extends TableSchema {
+ @Getter
private Options tableOptions;
public MysqlTableSchema(TableId tableId, Map columnMap, List columns, Map orderColumnMap,
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/CalendarType.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/CalendarType.java
deleted file mode 100644
index 788b85ecd7..0000000000
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/CalendarType.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.jdbc.table.type;
-
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.temporal.Temporal;
-import java.util.Objects;
-
-public class CalendarType implements EventMeshDataType {
-
- // Constants for LocalDate, LocalTime, and LocalDateTime types
- public static final CalendarType LOCAL_DATE_TYPE = new CalendarType<>(LocalDate.class, SQLType.DATE);
-
- public static final CalendarType LOCAL_TIME_TYPE = new CalendarType<>(LocalTime.class, SQLType.TIME);
-
- public static final CalendarType LOCAL_DATE_TIME_TYPE = new CalendarType<>(LocalDateTime.class, SQLType.TIMESTAMP);
-
- private final Class typeClass;
- private final SQLType sqlType;
-
- private CalendarType(Class typeClass, SQLType sqlType) {
- this.typeClass = typeClass;
- this.sqlType = sqlType;
- }
-
- /**
- * Returns the type class of the data.
- *
- * @return the type class of the data.
- */
- @Override
- public Class getTypeClass() {
- return typeClass;
- }
-
- /**
- * Returns the SQL type of the data.
- *
- * @return the SQL type of the data.
- */
- @Override
- public SQLType getSQLType() {
- return sqlType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof CalendarType)) {
- return false;
- }
- CalendarType> that = (CalendarType>) o;
- return Objects.equals(getTypeClass(), that.getTypeClass()) && sqlType == that.sqlType;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getTypeClass(), sqlType);
- }
-
- @Override
- public String toString() {
- return typeClass.getName();
- }
-}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshDataType.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshDataType.java
index 07da72bea7..110ef499df 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshDataType.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshDataType.java
@@ -17,22 +17,26 @@
package org.apache.eventmesh.connector.jdbc.table.type;
+import org.apache.eventmesh.connector.jdbc.type.Type;
+
/**
- * Defines Event Mesh data type with methods to get the type class and SQL type of the data.
+ * An interface representing a data type used in an EventMesh.
+ *
+ * @param The type of the data.
*/
-public interface EventMeshDataType {
+public interface EventMeshDataType extends Type {
/**
- * Gets the type class of the data.
+ * Gets the class representing the type of the data.
*
- * @return the type class of the data.
+ * @return The class representing the type of the data.
*/
Class getTypeClass();
/**
* Gets the SQL type of the data.
*
- * @return the SQL type of the data.
+ * @return The SQL type of the data.
*/
SQLType getSQLType();
@@ -41,7 +45,5 @@ public interface EventMeshDataType {
*
* @return The name of the data type.
*/
- default String getName() {
- return EventMeshTypeNameConverter.ofName(this);
- }
+ String getName();
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshTypeNameConverter.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshTypeNameConverter.java
index 181e735a91..f78386d37e 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshTypeNameConverter.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/EventMeshTypeNameConverter.java
@@ -17,45 +17,45 @@
package org.apache.eventmesh.connector.jdbc.table.type;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.BooleanEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.BytesEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.DateEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.DateTimeEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.DecimalEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Float32EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Float64EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int16EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int32EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int64EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.Int8EventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.StringEventMeshDataType;
+import org.apache.eventmesh.connector.jdbc.type.eventmesh.TimeEventMeshDataType;
+
import java.util.HashMap;
import java.util.Map;
public final class EventMeshTypeNameConverter {
- private static Map, String> PRIMITIVE_TYPE_MAP = new HashMap<>(32);
+ private static Map PRIMITIVE_TYPE_MAP = new HashMap<>(32);
static {
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.STRING_TYPE, "string");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.BOOLEAN_TYPE, "bool");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.BYTE_TYPE, "byte");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.SHORT_TYPE, "short");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.INT_TYPE, "int");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.LONG_TYPE, "long");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.FLOAT_TYPE, "float");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.DOUBLE_TYPE, "double");
- PRIMITIVE_TYPE_MAP.put(PrimitiveType.VOID_TYPE, "void");
- PRIMITIVE_TYPE_MAP.put(CalendarType.LOCAL_DATE_TYPE, "LocalDate");
- PRIMITIVE_TYPE_MAP.put(CalendarType.LOCAL_TIME_TYPE, "LocalTime");
- PRIMITIVE_TYPE_MAP.put(CalendarType.LOCAL_DATE_TIME_TYPE, "LocalDateTime");
- PRIMITIVE_TYPE_MAP.put(PrimitiveByteArrayType.BYTES_TYPE, "bytes");
-
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.STRING_ARRAY_TYPE, "string-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.BOOLEAN_ARRAY_TYPE, "bool-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.BYTE_ARRAY_TYPE, "byte-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.SHORT_ARRAY_TYPE, "short-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.INT_ARRAY_TYPE, "int-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.LONG_ARRAY_TYPE, "long-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.FLOAT_ARRAY_TYPE, "float-array");
- PRIMITIVE_TYPE_MAP.put(PrimitiveArrayType.DOUBLE_ARRAY_TYPE, "double-array");
+ PRIMITIVE_TYPE_MAP.put(BooleanEventMeshDataType.INSTANCE.getName(), BooleanEventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(Float32EventMeshDataType.INSTANCE.getName(), Float32EventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(Float64EventMeshDataType.INSTANCE.getName(), Float64EventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(Int8EventMeshDataType.INSTANCE.getName(), Int8EventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(Int16EventMeshDataType.INSTANCE.getName(), Int16EventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(Int32EventMeshDataType.INSTANCE.getName(), Int32EventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(Int64EventMeshDataType.INSTANCE.getName(), Int64EventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(StringEventMeshDataType.INSTANCE.getName(), StringEventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(BytesEventMeshDataType.INSTANCE.getName(), BytesEventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(DateEventMeshDataType.INSTANCE.getName(), DateEventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(TimeEventMeshDataType.INSTANCE.getName(), TimeEventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(DateTimeEventMeshDataType.INSTANCE.getName(), DateTimeEventMeshDataType.INSTANCE);
+ PRIMITIVE_TYPE_MAP.put(DecimalEventMeshDataType.INSTANCE.getName(), DecimalEventMeshDataType.INSTANCE);
}
- public static String ofName(EventMeshDataType> type) {
- String typeName = PRIMITIVE_TYPE_MAP.get(type);
- if (typeName == null && (type instanceof DecimalType)) {
- DecimalType decimalType = (DecimalType) type;
- return String.format("decimal(%s,%s)", decimalType.getScale(), decimalType.getPrecision());
- }
- return typeName;
+ public static EventMeshDataType ofEventMeshDataType(String dataType) {
+ return PRIMITIVE_TYPE_MAP.get(dataType);
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/MapType.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/MapType.java
index 5e33d818bf..f8a5fb530f 100644
--- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/MapType.java
+++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/table/type/MapType.java
@@ -19,10 +19,9 @@
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-public class MapType implements EventMeshDataType