diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index 86410b573fe..3fd7735907f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -147,6 +147,11 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
String comment = resultSet.getString("COLUMN_COMMENT");
Object defaultValue = resultSet.getObject("COLUMN_DEFAULT");
String isNullableStr = resultSet.getString("IS_NULLABLE");
+
+ if (dataType.toUpperCase().startsWith("VECTOR")) {
+ dataType = "VECTOR";
+ }
+
boolean isNullable = isNullableStr.equals("YES");
// e.g. `decimal(10, 2)` is 10
long numberPrecision = resultSet.getInt("NUMERIC_PRECISION");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
index 9707ff23acc..1100b30d8fb 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMysqlCreateTableSqlBuilder.java
@@ -265,6 +265,12 @@ private String buildConstraintKeySql(
keyName = "FOREIGN KEY";
// todo:
break;
+ case VECTOR_INDEX_KEY:
+ keyName = "VECTOR INDEX";
+ return String.format(
+ "%s `%s` (%s)",
+ keyName, constraintKey.getConstraintName(), indexColumns)
+ + " WITH (distance=L2, type=hnsw)";
default:
throw new UnsupportedOperationException(
"Unsupported constraint type: " + constraintType);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index f7790ff178c..f1357018791 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -102,7 +102,6 @@ public class OceanBaseMySqlTypeConverter
public static final long POWER_2_32 = (long) Math.pow(2, 32);
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
- private static final String VECTOR_TYPE_NAME = "";
private static final String VECTOR_NAME = "VECTOR";
public static final OceanBaseMySqlTypeConverter INSTANCE = new OceanBaseMySqlTypeConverter();
@@ -296,9 +295,9 @@ public Column convert(BasicTypeDefine typeDefine) {
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
builder.scale(typeDefine.getScale());
break;
- case VECTOR_TYPE_NAME:
- String columnType = typeDefine.getColumnType();
- if (columnType.startsWith("vector(") && columnType.endsWith(")")) {
+ case VECTOR_NAME:
+ String columnType = typeDefine.getColumnType().toUpperCase();
+ if (columnType.startsWith("VECTOR(") && columnType.endsWith(")")) {
Integer number =
Integer.parseInt(
columnType.substring(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
index 7a0b89ce13d..4363f49848e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
@@ -69,7 +69,14 @@
mysql
mysql-connector-java
test
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
com.dameng
DmJdbcDriver18
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
index e91eaed2de1..58380bc6e76 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
@@ -66,6 +66,7 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
@@ -90,7 +91,7 @@
disabledReason = "Currently SPARK and FLINK not support adapt")
public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements TestResource {
- private static final String IMAGE = "oceanbase/oceanbase-ce:vector";
+ private static final String IMAGE = "oceanbase/oceanbase-ce:4.3.5.1-101000042025031818";
private static final String HOSTNAME = "e2e_oceanbase_vector";
private static final int PORT = 2881;
@@ -145,7 +146,7 @@ public void startUp() throws Exception {
.await()
.atMost(360, TimeUnit.SECONDS)
.untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
-
+ setObVectorMemory();
createSchemaIfNeeded();
createNeededTables();
this.container =
@@ -275,6 +276,20 @@ public void testMilvusToOceanBase(TestContainer container) throws Exception {
}
}
+ @TestTemplate
+ public void testMilvusToOceanBaseNotTable(TestContainer container) throws Exception {
+ try {
+ dropOceanBaseTable();
+ checkTableNotExist();
+ Container.ExecResult execResult =
+ container.executeJob("/jdbc_milvus_source_and_oceanbase_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ checkCreateTableSql();
+ } finally {
+ clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable());
+ }
+ }
+
@TestTemplate
public void testFakeToOceanBase(TestContainer container)
throws IOException, InterruptedException {
@@ -407,6 +422,12 @@ private void initializeJdbcConnection(String jdbcUrl)
connection.setAutoCommit(false);
}
+ /** This parameter is required for OceanBase 4.3.x to enable vector indexing */
+ public void setObVectorMemory() {
+ String sql = "ALTER SYSTEM SET ob_vector_memory_limit_percentage = 30";
+ executeSql(sql);
+ }
+
private Class> loadDriverClass() {
try {
return Class.forName(jdbcCase.getDriverClass());
@@ -418,13 +439,17 @@ private Class> loadDriverClass() {
private void createSchemaIfNeeded() {
String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+ executeSql(sql);
+ }
+
+ private void executeSql(String sql) {
try {
connection.prepareStatement(sql).executeUpdate();
} catch (Exception e) {
throw new SeaTunnelRuntimeException(
JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e);
}
- log.info("oceanbase schema created,sql is" + sql);
+ log.info("oceanbase execute sql,sql is:{}", sql);
}
String createSqlTemplate() {
@@ -466,7 +491,7 @@ private void createNeededTables() {
jdbcCase.getSchema(),
jdbcCase.getSinkTable()));
statement.execute(createSink);
- log.info("oceanbase table created,sql is" + createSink);
+ log.info("oceanbase table created,sql is:{}", createSink);
}
connection.commit();
@@ -493,6 +518,65 @@ public String buildTableInfoWithSchema(String schema, String table) {
}
}
+ private void dropOceanBaseTable() {
+ String sql =
+ String.format("drop table IF EXISTS %s.%s", OCEANBASE_DATABASE, OCEANBASE_SINK);
+ executeSql(sql);
+ }
+
+ private void checkTableNotExist() {
+ String sql =
+ String.format(
+ "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'",
+ OCEANBASE_DATABASE, OCEANBASE_SINK);
+
+ boolean isExist = false;
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+
+ if (resultSet.next()) {
+ isExist = resultSet.getInt(1) > 0;
+ }
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(
+ JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql: " + sql, e);
+ }
+ Assertions.assertFalse(isExist);
+ }
+
+ private void checkCreateTableSql() {
+ String sql = String.format("SHOW CREATE TABLE %s.%s;", OCEANBASE_DATABASE, OCEANBASE_SINK);
+ String createTableSql = "";
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+
+ if (resultSet.next()) {
+ createTableSql = resultSet.getString(2);
+ }
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(
+ JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql: " + sql, e);
+ }
+ // Removed the column store compression configuration that is automatically set by oceanbase
+ String startToken = "VECTOR KEY `vector_index` (`book_intro`) WITH (DISTANCE=L2, TYPE=HNSW";
+ int startIndex = createTableSql.indexOf(startToken);
+
+ if (startIndex != -1) {
+ String part1 = createTableSql.substring(0, startIndex + startToken.length());
+ createTableSql = part1 + "));";
+ }
+ Assertions.assertEquals(expectationSql(), createTableSql);
+ }
+
+ private String expectationSql() {
+ return "CREATE TABLE `simple_example` (\n"
+ + " `book_id` bigint(20) NOT NULL,\n"
+ + " `book_intro` VECTOR(4) NOT NULL,\n"
+ + " `book_title` text NOT NULL,\n"
+ + " PRIMARY KEY (`book_id`),\n"
+ + " VECTOR KEY `vector_index` (`book_intro`) WITH (DISTANCE=L2, TYPE=HNSW));";
+ }
+
private String[] getFieldNames() {
return new String[] {
"book_id", "book_intro", "book_title",