Skip to content

Commit cc66020

Browse files
authored
[Improve][Jdbc] Increase tyepe converter when auto creating tables (#6617)
1 parent 01159ec commit cc66020

File tree

18 files changed

+4044
-608
lines changed

18 files changed

+4044
-608
lines changed

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public class OracleCatalog extends AbstractJdbcCatalog {
7979
+ " END as TYPE_NAME,\n"
8080
+ " cols.data_type || \n"
8181
+ " CASE \n"
82-
+ " WHEN cols.data_type IN ('VARCHAR2', 'NVARCHAR2', 'CHAR', 'NCHAR') THEN '(' || cols.data_length || ')'\n"
82+
+ " WHEN cols.data_type IN ('VARCHAR', 'VARCHAR2', 'CHAR') THEN '(' || cols.data_length || ')'\n"
83+
+ " WHEN cols.data_type IN ('NVARCHAR2', 'NCHAR') THEN '(' || cols.char_length || ')'\n"
8384
+ " WHEN cols.data_type IN ('NUMBER') AND cols.data_precision IS NOT NULL AND cols.data_scale IS NOT NULL THEN '(' || cols.data_precision || ', ' || cols.data_scale || ')'\n"
8485
+ " WHEN cols.data_type IN ('NUMBER') AND cols.data_precision IS NOT NULL AND cols.data_scale IS NULL THEN '(' || cols.data_precision || ')'\n"
8586
+ " WHEN cols.data_type IN ('RAW') THEN '(' || cols.data_length || ')'\n"

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeConverter.java

+422
Large diffs are not rendered by default.

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2TypeMapper.java

+22-114
Original file line numberDiff line numberDiff line change
@@ -17,129 +17,37 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2;
1919

20-
import org.apache.seatunnel.api.table.type.BasicType;
21-
import org.apache.seatunnel.api.table.type.DecimalType;
22-
import org.apache.seatunnel.api.table.type.LocalTimeType;
23-
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
24-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
25-
import org.apache.seatunnel.common.exception.CommonError;
26-
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
27-
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
20+
import org.apache.seatunnel.api.table.catalog.Column;
21+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2822
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
2923

30-
import org.slf4j.Logger;
31-
import org.slf4j.LoggerFactory;
32-
3324
import java.sql.ResultSetMetaData;
3425
import java.sql.SQLException;
3526

3627
public class DB2TypeMapper implements JdbcDialectTypeMapper {
3728

38-
private static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class);
39-
40-
// reference https://www.ibm.com/docs/en/ssw_ibm_i_75/pdf/rbafzpdf.pdf
41-
// ============================data types=====================
42-
private static final String DB2_BOOLEAN = "BOOLEAN";
43-
44-
private static final String DB2_ROWID = "ROWID";
45-
private static final String DB2_SMALLINT = "SMALLINT";
46-
private static final String DB2_INTEGER = "INTEGER";
47-
private static final String DB2_INT = "INT";
48-
private static final String DB2_BIGINT = "BIGINT";
49-
// exact
50-
private static final String DB2_DECIMAL = "DECIMAL";
51-
private static final String DB2_DEC = "DEC";
52-
private static final String DB2_NUMERIC = "NUMERIC";
53-
private static final String DB2_NUM = "NUM";
54-
// float
55-
private static final String DB2_REAL = "REAL";
56-
private static final String DB2_FLOAT = "FLOAT";
57-
private static final String DB2_DOUBLE = "DOUBLE";
58-
private static final String DB2_DOUBLE_PRECISION = "DOUBLE PRECISION";
59-
private static final String DB2_DECFLOAT = "DECFLOAT";
60-
// string
61-
private static final String DB2_CHAR = "CHAR";
62-
private static final String DB2_VARCHAR = "VARCHAR";
63-
private static final String DB2_LONG_VARCHAR = "LONG VARCHAR";
64-
private static final String DB2_CLOB = "CLOB";
65-
// graphic
66-
private static final String DB2_GRAPHIC = "GRAPHIC";
67-
private static final String DB2_VARGRAPHIC = "VARGRAPHIC";
68-
private static final String DB2_LONG_VARGRAPHIC = "LONG VARGRAPHIC";
69-
private static final String DB2_DBCLOB = "DBCLOB";
70-
71-
// ---------------------------binary---------------------------
72-
private static final String DB2_BINARY = "BINARY";
73-
private static final String DB2_VARBINARY = "VARBINARY";
74-
75-
// ------------------------------time-------------------------
76-
private static final String DB2_DATE = "DATE";
77-
private static final String DB2_TIME = "TIME";
78-
private static final String DB2_TIMESTAMP = "TIMESTAMP";
79-
80-
// ------------------------------blob-------------------------
81-
private static final String DB2_BLOB = "BLOB";
82-
83-
// other
84-
private static final String DB2_XML = "XML";
29+
@Override
30+
public Column mappingColumn(BasicTypeDefine typeDefine) {
31+
return DB2TypeConverter.INSTANCE.convert(typeDefine);
32+
}
8533

8634
@Override
87-
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
88-
throws SQLException {
89-
String columnType = metadata.getColumnTypeName(colIndex).toUpperCase();
35+
public Column mappingColumn(ResultSetMetaData metadata, int colIndex) throws SQLException {
36+
String columnName = metadata.getColumnLabel(colIndex);
37+
String nativeType = metadata.getColumnTypeName(colIndex);
38+
int isNullable = metadata.isNullable(colIndex);
9039
int precision = metadata.getPrecision(colIndex);
91-
switch (columnType) {
92-
case DB2_BOOLEAN:
93-
return BasicType.BOOLEAN_TYPE;
94-
case DB2_SMALLINT:
95-
return BasicType.SHORT_TYPE;
96-
case DB2_INT:
97-
case DB2_INTEGER:
98-
return BasicType.INT_TYPE;
99-
case DB2_BIGINT:
100-
return BasicType.LONG_TYPE;
101-
case DB2_DECIMAL:
102-
case DB2_DEC:
103-
case DB2_NUMERIC:
104-
case DB2_NUM:
105-
if (precision > 0) {
106-
return new DecimalType(precision, metadata.getScale(colIndex));
107-
}
108-
LOG.warn("decimal did define precision,scale, will be Decimal(38,18)");
109-
return new DecimalType(38, 18);
110-
case DB2_REAL:
111-
return BasicType.FLOAT_TYPE;
112-
case DB2_FLOAT:
113-
case DB2_DOUBLE:
114-
case DB2_DOUBLE_PRECISION:
115-
case DB2_DECFLOAT:
116-
return BasicType.DOUBLE_TYPE;
117-
case DB2_CHAR:
118-
case DB2_VARCHAR:
119-
case DB2_LONG_VARCHAR:
120-
case DB2_CLOB:
121-
case DB2_GRAPHIC:
122-
case DB2_VARGRAPHIC:
123-
case DB2_LONG_VARGRAPHIC:
124-
case DB2_DBCLOB:
125-
return BasicType.STRING_TYPE;
126-
case DB2_BINARY:
127-
case DB2_VARBINARY:
128-
case DB2_BLOB:
129-
return PrimitiveByteArrayType.INSTANCE;
130-
case DB2_DATE:
131-
return LocalTimeType.LOCAL_DATE_TYPE;
132-
case DB2_TIME:
133-
return LocalTimeType.LOCAL_TIME_TYPE;
134-
case DB2_TIMESTAMP:
135-
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
136-
case DB2_ROWID:
137-
// maybe should support
138-
case DB2_XML:
139-
default:
140-
final String jdbcColumnName = metadata.getColumnName(colIndex);
141-
throw CommonError.convertToSeaTunnelTypeError(
142-
DatabaseIdentifier.DB_2, columnType, jdbcColumnName);
143-
}
40+
int scale = metadata.getScale(colIndex);
41+
BasicTypeDefine typeDefine =
42+
BasicTypeDefine.builder()
43+
.name(columnName)
44+
.columnType(nativeType)
45+
.dataType(nativeType)
46+
.nullable(isNullable == ResultSetMetaData.columnNullable)
47+
.length((long) precision)
48+
.precision((long) precision)
49+
.scale(scale)
50+
.build();
51+
return mappingColumn(typeDefine);
14452
}
14553
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
19+
20+
import org.apache.seatunnel.api.table.catalog.Column;
21+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
22+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
23+
import org.apache.seatunnel.api.table.converter.TypeConverter;
24+
import org.apache.seatunnel.api.table.type.BasicType;
25+
import org.apache.seatunnel.api.table.type.DecimalType;
26+
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
27+
import org.apache.seatunnel.common.exception.CommonError;
28+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
29+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
30+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter;
31+
32+
import com.google.auto.service.AutoService;
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
// reference https://help.kingbase.com.cn/v8/development/sql-plsql/sql/datatype.html#id2
36+
@Slf4j
37+
@AutoService(TypeConverter.class)
38+
public class KingbaseTypeConverter extends PostgresTypeConverter {
39+
public static final String KB_TINYINT = "TINYINT";
40+
public static final String KB_MONEY = "MONEY";
41+
public static final String KB_BLOB = "BLOB";
42+
public static final String KB_CLOB = "CLOB";
43+
public static final String KB_BIT = "BIT";
44+
45+
public static final KingbaseTypeConverter INSTANCE = new KingbaseTypeConverter();
46+
47+
@Override
48+
public String identifier() {
49+
return DatabaseIdentifier.KINGBASE;
50+
}
51+
52+
@Override
53+
public Column convert(BasicTypeDefine typeDefine) {
54+
try {
55+
return super.convert(typeDefine);
56+
} catch (SeaTunnelRuntimeException e) {
57+
PhysicalColumn.PhysicalColumnBuilder builder =
58+
PhysicalColumn.builder()
59+
.name(typeDefine.getName())
60+
.sourceType(typeDefine.getColumnType())
61+
.nullable(typeDefine.isNullable())
62+
.defaultValue(typeDefine.getDefaultValue())
63+
.comment(typeDefine.getComment());
64+
65+
String kingbaseDataType = typeDefine.getDataType().toUpperCase();
66+
switch (kingbaseDataType) {
67+
case KB_TINYINT:
68+
builder.dataType(BasicType.BYTE_TYPE);
69+
break;
70+
case KB_MONEY:
71+
builder.dataType(new DecimalType(38, 18));
72+
builder.columnLength(38L);
73+
builder.scale(18);
74+
break;
75+
case KB_BLOB:
76+
builder.dataType(PrimitiveByteArrayType.INSTANCE);
77+
builder.columnLength((long) (1024 * 1024 * 1024));
78+
break;
79+
case KB_CLOB:
80+
builder.dataType(BasicType.STRING_TYPE);
81+
builder.columnLength(typeDefine.getLength());
82+
builder.columnLength((long) (1024 * 1024 * 1024));
83+
break;
84+
case KB_BIT:
85+
builder.dataType(PrimitiveByteArrayType.INSTANCE);
86+
// BIT(M) -> BYTE(M/8)
87+
long byteLength = typeDefine.getLength() / 8;
88+
byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
89+
builder.columnLength(byteLength);
90+
break;
91+
default:
92+
throw CommonError.convertToSeaTunnelTypeError(
93+
DatabaseIdentifier.KINGBASE,
94+
typeDefine.getDataType(),
95+
typeDefine.getName());
96+
}
97+
return builder.build();
98+
}
99+
}
100+
101+
@Override
102+
public BasicTypeDefine reconvert(Column column) {
103+
try {
104+
return super.reconvert(column);
105+
} catch (SeaTunnelRuntimeException e) {
106+
throw CommonError.convertToConnectorTypeError(
107+
DatabaseIdentifier.KINGBASE,
108+
column.getDataType().getSqlType().name(),
109+
column.getName());
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)