Skip to content

Commit 80f392a

Browse files
authored
[feature][connector-v2] add xugudb connector (#6561)
1 parent fe33422 commit 80f392a

File tree

19 files changed

+2241
-1
lines changed

19 files changed

+2241
-1
lines changed

Diff for: .github/workflows/backend.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,7 @@ jobs:
997997
java-version: ${{ matrix.java }}
998998
distribution: 'temurin'
999999
cache: 'maven'
1000-
- name: run jdbc connectors integration test (part-6)
1000+
- name: run jdbc connectors integration test (part-7)
10011001
if: needs.changes.outputs.api == 'true'
10021002
run: |
10031003
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci

Diff for: docs/en/connector-v2/sink/Jdbc.md

+1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ there are some reference value for params above.
235235
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
236236
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
237237
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |
238+
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |
238239

239240
## Example
240241

Diff for: docs/en/connector-v2/source/Jdbc.md

+1
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ there are some reference value for params above.
227227
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
228228
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |
229229
| Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar |
230+
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |
230231

231232
## Example
232233

Diff for: seatunnel-connectors-v2/connector-jdbc/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<kingbase8.version>8.6.0</kingbase8.version>
5151
<hive.jdbc.version>3.1.3</hive.jdbc.version>
5252
<oceanbase.jdbc.version>2.4.3</oceanbase.jdbc.version>
53+
<xugu.jdbc.version>12.2.0</xugu.jdbc.version>
5354
</properties>
5455

5556
<dependencyManagement>
@@ -188,6 +189,12 @@
188189
<version>${oceanbase.jdbc.version}</version>
189190
<scope>provided</scope>
190191
</dependency>
192+
<dependency>
193+
<groupId>com.xugudb</groupId>
194+
<artifactId>xugu-jdbc</artifactId>
195+
<version>${xugu.jdbc.version}</version>
196+
<scope>provided</scope>
197+
</dependency>
191198
</dependencies>
192199
</dependencyManagement>
193200

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
19+
20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.catalog.Column;
22+
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
23+
import org.apache.seatunnel.api.table.catalog.TablePath;
24+
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
25+
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
26+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
27+
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
28+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
29+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
30+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter;
31+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
32+
33+
import org.apache.commons.lang3.StringUtils;
34+
35+
import lombok.extern.slf4j.Slf4j;
36+
37+
import java.sql.Connection;
38+
import java.sql.DatabaseMetaData;
39+
import java.sql.ResultSet;
40+
import java.sql.SQLException;
41+
import java.util.ArrayList;
42+
import java.util.Arrays;
43+
import java.util.Collections;
44+
import java.util.List;
45+
46+
@Slf4j
47+
public class XuguCatalog extends AbstractJdbcCatalog {
48+
49+
protected static List<String> EXCLUDED_SCHEMAS =
50+
Collections.unmodifiableList(Arrays.asList("GUEST", "SYSAUDITOR", "SYSSSO"));
51+
52+
private static final String SELECT_COLUMNS_SQL_TEMPLATE =
53+
"SELECT\n"
54+
+ " dc.COLUMN_NAME,\n"
55+
+ " CASE\n"
56+
+ " WHEN dc.TYPE_NAME LIKE 'INTERVAL%%' THEN 'INTERVAL' ELSE REGEXP_SUBSTR(dc.TYPE_NAME, '^[^(]+')\n"
57+
+ " END AS TYPE_NAME,\n"
58+
+ " dc.TYPE_NAME ||\n"
59+
+ " CASE\n"
60+
+ " WHEN dc.TYPE_NAME IN ('VARCHAR', 'CHAR') THEN '(' || dc.COLUMN_LENGTH || ')'\n"
61+
+ " WHEN dc.TYPE_NAME IN ('NUMERIC') AND dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NOT NULL THEN '(' || dc.COLUMN_PRECISION || ', ' || dc.COLUMN_SCALE || ')'\n"
62+
+ " WHEN dc.TYPE_NAME IN ('NUMERIC') AND dc.COLUMN_PRECISION IS NOT NULL AND dc.COLUMN_SCALE IS NULL THEN '(' || dc.COLUMN_PRECISION || ')'\n"
63+
+ " WHEN dc.TYPE_NAME IN ('TIMESTAMP') THEN '(' || dc.COLUMN_SCALE || ')'\n"
64+
+ " END AS FULL_TYPE_NAME,\n"
65+
+ " dc.COLUMN_LENGTH,\n"
66+
+ " dc.COLUMN_PRECISION,\n"
67+
+ " dc.COLUMN_SCALE,\n"
68+
+ " dc.COLUMN_COMMENT,\n"
69+
+ " dc.DEFAULT_VALUE,\n"
70+
+ " CASE\n"
71+
+ " dc.IS_NULLABLE WHEN TRUE THEN 'NO' ELSE 'YES'\n"
72+
+ " END AS IS_NULLABLE\n"
73+
+ "FROM\n"
74+
+ " (\n"
75+
+ " SELECT\n"
76+
+ " c.col_name AS COLUMN_NAME,\n"
77+
+ " CASE\n"
78+
+ " WHEN c.type_name = 'CHAR' AND c.\"VARYING\" = TRUE THEN 'VARCHAR'\n"
79+
+ " WHEN c.type_name = 'DATETIME' AND c.TIMESTAMP_T = 'i' THEN 'TIMESTAMP' ELSE c.type_name\n"
80+
+ " END AS TYPE_NAME,\n"
81+
+ " DECODE(c.type_name,\n"
82+
+ " 'TINYINT', 1, 'SMALLINT', 2,\n"
83+
+ " 'INTEGER', 4, 'BIGINT', 8,\n"
84+
+ " 'FLOAT', 4, 'DOUBLE', 8,\n"
85+
+ " 'NUMERIC', 17,\n"
86+
+ " 'CHAR', DECODE(c.scale, -1, 60000, c.scale),\n"
87+
+ " 'DATE', 4, 'DATETIME', 8,\n"
88+
+ " 'TIMESTAMP', 8, 'DATETIME WITH TIME ZONE', 8,\n"
89+
+ " 'TIME', 4, 'TIME WITH TIME ZONE', 4,\n"
90+
+ " 'INTERVAL YEAR', 4, 'INTERVAL MONTH', 4,\n"
91+
+ " 'INTERVAL DAY', 4, 'INTERVAL HOUR', 4,\n"
92+
+ " 'INTERVAL MINUTE', 4, 'INTERVAL SECOND', 8,\n"
93+
+ " 'INTERVAL YEAR TO MONTH', 4,\n"
94+
+ " 'INTERVAL DAY TO HOUR', 4,\n"
95+
+ " 'INTERVAL DAY TO MINUTE', 4,\n"
96+
+ " 'INTERVAL DAY TO SECOND', 8,\n"
97+
+ " 'INTERVAL HOUR TO MINUTE', 4,\n"
98+
+ " 'INTERVAL HOUR TO SECOND', 8,\n"
99+
+ " 'INTERVAL MINUTE TO SECOND', 8,\n"
100+
+ " 'CLOB', 2147483648,\n"
101+
+ " 'BLOB', 2147483648, 'BINARY', 2147483648,\n"
102+
+ " 'GUID', 2, 'BOOLEAN', 1,\n"
103+
+ " 'ROWVERSION', 8, 'ROWID', 10, NULL) AS COLUMN_LENGTH,\n"
104+
+ " DECODE(TRUNC(c.scale / 65536), 0, NULL, TRUNC(c.scale / 65536)::INTEGER) AS COLUMN_PRECISION,\n"
105+
+ " DECODE(DECODE(c.type_name, 'CHAR',-1, c.scale),-1, NULL, MOD(c.scale, 65536)) AS COLUMN_SCALE,\n"
106+
+ " c.comments AS COLUMN_COMMENT,\n"
107+
+ " c.DEF_VAL AS DEFAULT_VALUE,\n"
108+
+ " c.NOT_NULl AS IS_NULLABLE\n"
109+
+ " FROM\n"
110+
+ " dba_columns c\n"
111+
+ " LEFT JOIN dba_tables tab ON\n"
112+
+ " c.db_id = tab.db_id\n"
113+
+ " AND c.table_id = tab.table_id\n"
114+
+ " LEFT JOIN dba_schemas sc ON\n"
115+
+ " tab.schema_id = sc.schema_id\n"
116+
+ " AND tab.db_id = sc.db_id\n"
117+
+ " WHERE\n"
118+
+ " sc.schema_name = '%s'\n"
119+
+ " AND tab.table_name = '%s'\n"
120+
+ ") AS dc \n";
121+
122+
public XuguCatalog(
123+
String catalogName,
124+
String username,
125+
String pwd,
126+
JdbcUrlUtil.UrlInfo urlInfo,
127+
String defaultSchema) {
128+
super(catalogName, username, pwd, urlInfo, defaultSchema);
129+
}
130+
131+
@Override
132+
protected String getListDatabaseSql() {
133+
return "SELECT DB_NAME FROM dba_databases";
134+
}
135+
136+
@Override
137+
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
138+
return new XuguCreateTableSqlBuilder(table).build(tablePath);
139+
}
140+
141+
@Override
142+
protected String getDropTableSql(TablePath tablePath) {
143+
return String.format("DROP TABLE %s", tablePath.getSchemaAndTableName("\""));
144+
}
145+
146+
@Override
147+
protected String getCreateDatabaseSql(String databaseName) {
148+
return String.format("CREATE DATABASE \"%s\"", databaseName);
149+
}
150+
151+
@Override
152+
protected String getDropDatabaseSql(String databaseName) {
153+
return String.format("DROP DATABASE \"%s\"", databaseName);
154+
}
155+
156+
@Override
157+
protected String getListTableSql(String databaseName) {
158+
return "SELECT user_name ,table_name FROM all_users au \n"
159+
+ "INNER JOIN all_tables at ON au.user_id=at.user_id AND au.db_id=at.db_id";
160+
}
161+
162+
@Override
163+
protected String getTableName(ResultSet rs) throws SQLException {
164+
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
165+
return null;
166+
}
167+
return rs.getString(1) + "." + rs.getString(2);
168+
}
169+
170+
@Override
171+
protected String getSelectColumnsSql(TablePath tablePath) {
172+
return String.format(
173+
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), tablePath.getTableName());
174+
}
175+
176+
@Override
177+
protected Column buildColumn(ResultSet resultSet) throws SQLException {
178+
String columnName = resultSet.getString("COLUMN_NAME");
179+
String typeName = resultSet.getString("TYPE_NAME");
180+
String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
181+
long columnLength = resultSet.getLong("COLUMN_LENGTH");
182+
Long columnPrecision = resultSet.getObject("COLUMN_PRECISION", Long.class);
183+
Integer columnScale = resultSet.getObject("COLUMN_SCALE", Integer.class);
184+
String columnComment = resultSet.getString("COLUMN_COMMENT");
185+
Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
186+
boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
187+
188+
BasicTypeDefine typeDefine =
189+
BasicTypeDefine.builder()
190+
.name(columnName)
191+
.columnType(fullTypeName)
192+
.dataType(typeName)
193+
.length(columnLength)
194+
.precision(columnPrecision)
195+
.scale(columnScale)
196+
.nullable(isNullable)
197+
.defaultValue(defaultValue)
198+
.comment(columnComment)
199+
.build();
200+
return XuguTypeConverter.INSTANCE.convert(typeDefine);
201+
}
202+
203+
@Override
204+
protected String getUrlFromDatabaseName(String databaseName) {
205+
return defaultUrl;
206+
}
207+
208+
@Override
209+
protected String getOptionTableName(TablePath tablePath) {
210+
return tablePath.getSchemaAndTableName();
211+
}
212+
213+
@Override
214+
public boolean tableExists(TablePath tablePath) throws CatalogException {
215+
try {
216+
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
217+
return databaseExists(tablePath.getDatabaseName())
218+
&& listTables(tablePath.getDatabaseName())
219+
.contains(tablePath.getSchemaAndTableName());
220+
}
221+
return listTables().contains(tablePath.getSchemaAndTableName());
222+
} catch (DatabaseNotExistException e) {
223+
return false;
224+
}
225+
}
226+
227+
private List<String> listTables() {
228+
List<String> databases = listDatabases();
229+
return listTables(databases.get(0));
230+
}
231+
232+
@Override
233+
public CatalogTable getTable(String sqlQuery) throws SQLException {
234+
Connection defaultConnection = getConnection(defaultUrl);
235+
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new XuguTypeMapper());
236+
}
237+
238+
@Override
239+
protected String getTruncateTableSql(TablePath tablePath) {
240+
return String.format(
241+
"TRUNCATE TABLE \"%s\".\"%s\"",
242+
tablePath.getSchemaName(), tablePath.getTableName());
243+
}
244+
245+
@Override
246+
protected String getExistDataSql(TablePath tablePath) {
247+
return String.format(
248+
"SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM = 1",
249+
tablePath.getSchemaName(), tablePath.getTableName());
250+
}
251+
252+
@Override
253+
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath)
254+
throws SQLException {
255+
try {
256+
return getConstraintKeys(
257+
metaData,
258+
tablePath.getDatabaseName(),
259+
tablePath.getSchemaName(),
260+
tablePath.getTableName());
261+
} catch (SQLException e) {
262+
log.info("Obtain constraint failure", e);
263+
return new ArrayList<>();
264+
}
265+
}
266+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.xugu;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.configuration.util.OptionValidationException;
23+
import org.apache.seatunnel.api.table.catalog.Catalog;
24+
import org.apache.seatunnel.api.table.factory.CatalogFactory;
25+
import org.apache.seatunnel.api.table.factory.Factory;
26+
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
27+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
28+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser;
29+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
30+
31+
import com.google.auto.service.AutoService;
32+
33+
import java.util.Optional;
34+
35+
@AutoService(Factory.class)
36+
public class XuguCatalogFactory implements CatalogFactory {
37+
38+
@Override
39+
public String factoryIdentifier() {
40+
return DatabaseIdentifier.XUGU;
41+
}
42+
43+
@Override
44+
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
45+
String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
46+
JdbcUrlUtil.UrlInfo urlInfo = OracleURLParser.parse(urlWithDatabase);
47+
Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
48+
if (!defaultDatabase.isPresent()) {
49+
throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
50+
}
51+
return new XuguCatalog(
52+
catalogName,
53+
options.get(JdbcCatalogOptions.USERNAME),
54+
options.get(JdbcCatalogOptions.PASSWORD),
55+
urlInfo,
56+
options.get(JdbcCatalogOptions.SCHEMA));
57+
}
58+
59+
@Override
60+
public OptionRule optionRule() {
61+
return JdbcCatalogOptions.BASE_RULE.build();
62+
}
63+
}

0 commit comments

Comments
 (0)