Skip to content

Commit 372bc17

Browse files
hailin0hawk9821
authored andcommitted
[Feature][Transform] Support define sink column type (#9114)
1 parent 8e9aeec commit 372bc17

File tree

45 files changed

+1367
-54
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1367
-54
lines changed
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Define Sink Type
2+
3+
> Define sink type transform plugin
4+
5+
## Description
6+
7+
Used to define the storage type of sink field. This is effective when the savemode enables automatic table creation.
8+
9+
## Options
10+
11+
| name | type | required | default value | Description |
12+
|:-------:|---------------------------|----------|---------------|------------------------------------------------------------------------|
13+
| columns | list<map<string, string>> | yes | | The columns to be defined, the name and type of the column must be set |
14+
15+
## Examples
16+
17+
### Define sink columns type for savemode
18+
19+
```
20+
transform {
21+
DefineSinkType {
22+
columns = [
23+
{
24+
column = "c1"
25+
type = "nvarchar2(10)"
26+
}
27+
{
28+
column = "c2"
29+
type = "datetime(6)"
30+
}
31+
{
32+
column = "c3"
33+
type = "your target type"
34+
}
35+
]
36+
}
37+
}
38+
```
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Define Sink Type
2+
3+
> Define sink type transform plugin
4+
5+
## Description
6+
7+
用于定义 sink 字段存储类型,对于 savemode 开启自动建表时有效
8+
9+
## Options
10+
11+
| name | type | required | default value | Description |
12+
|:-------:|---------------------------|----------|---------------|--------------------|
13+
| columns | list<map<string, string>> | yes | | 需要定义的列,必须设置列的名称和类型 |
14+
15+
## Examples
16+
17+
### 指定部分字段的建表类型
18+
19+
```
20+
transform {
21+
DefineSinkType {
22+
columns = [
23+
{
24+
column = "c1"
25+
type = "nvarchar2(10)"
26+
}
27+
{
28+
column = "c2"
29+
type = "datetime(6)"
30+
}
31+
{
32+
column = "c3"
33+
type = "your target type"
34+
}
35+
]
36+
}
37+
}
38+
```

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java

+6
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ public abstract class Column implements Serializable {
8282
*/
8383
protected final String sourceType;
8484

85+
/**
86+
* The data type used to store the target database, typically specified in transform or sink
87+
* scenarios.
88+
*/
89+
protected String sinkType;
90+
8591
/** your options * */
8692
protected final Map<String, Object> options;
8793

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java

+6
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public PhysicalColumn(
163163
Object defaultValue,
164164
String comment,
165165
String sourceType,
166+
String sinkType,
166167
Map<String, Object> options,
167168
boolean isUnsigned,
168169
boolean isZeroFill,
@@ -177,6 +178,7 @@ public PhysicalColumn(
177178
defaultValue,
178179
comment,
179180
sourceType,
181+
sinkType,
180182
options,
181183
isUnsigned,
182184
isZeroFill,
@@ -297,6 +299,7 @@ public Column copy(SeaTunnelDataType<?> newType) {
297299
defaultValue,
298300
comment,
299301
sourceType,
302+
sinkType,
300303
options,
301304
isUnsigned,
302305
isZeroFill,
@@ -315,6 +318,7 @@ public Column copy() {
315318
defaultValue,
316319
comment,
317320
sourceType,
321+
sinkType,
318322
options,
319323
isUnsigned,
320324
isZeroFill,
@@ -333,6 +337,7 @@ public Column rename(String newColumnName) {
333337
defaultValue,
334338
comment,
335339
sourceType,
340+
sinkType,
336341
options,
337342
isUnsigned,
338343
isZeroFill,
@@ -351,6 +356,7 @@ public Column reSourceType(String newSourceType) {
351356
defaultValue,
352357
comment,
353358
newSourceType,
359+
sinkType,
354360
options,
355361
isUnsigned,
356362
isZeroFill,

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,16 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
3232

3333
public String columnToConnectorType(Column column) {
3434
checkNotNull(column, "The column is required.");
35+
String columnType;
36+
if (column.getSinkType() != null) {
37+
columnType = column.getSinkType();
38+
} else {
39+
columnType = ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType();
40+
}
3541
return String.format(
3642
"`%s` %s %s",
3743
column.getName(),
38-
ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType(),
44+
columnType,
3945
StringUtils.isEmpty(column.getComment())
4046
? ""
4147
: "COMMENT '"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
19+
20+
import org.apache.seatunnel.api.table.catalog.Column;
21+
import org.apache.seatunnel.api.table.type.BasicType;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.when;
29+
30+
public class ClickhouseCatalogUtilTest {
31+
@Test
32+
void returnsReconvertedTypeWhenSinkTypeNotNull() {
33+
Column column = mock(Column.class);
34+
when(column.getName()).thenReturn("col1");
35+
when(column.getSinkType()).thenReturn("String");
36+
37+
String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
38+
39+
assertEquals("`col1` String ", result);
40+
}
41+
42+
@Test
43+
void returnsReconvertedTypeWhenSinkTypeIsNull() {
44+
Column column = mock(Column.class);
45+
when(column.getName()).thenReturn("col1");
46+
when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE);
47+
48+
String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
49+
50+
assertEquals("`col1` Int32 ", result);
51+
}
52+
53+
@Test
54+
void returnsReconvertedTypeWhenTypesNotNull() {
55+
Column column = mock(Column.class);
56+
when(column.getName()).thenReturn("col1");
57+
when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE);
58+
when(column.getSinkType()).thenReturn("String");
59+
60+
String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
61+
62+
assertEquals("`col1` String ", result);
63+
}
64+
}

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,18 @@ private static String mergeColumnInTemplate(
258258
return template;
259259
}
260260

261-
private static String columnToDorisType(
262-
Column column, TypeConverter<BasicTypeDefine> typeConverter) {
261+
static String columnToDorisType(Column column, TypeConverter<BasicTypeDefine> typeConverter) {
263262
checkNotNull(column, "The column is required.");
263+
String columnType;
264+
if (column.getSinkType() != null) {
265+
columnType = column.getSinkType();
266+
} else {
267+
columnType = typeConverter.reconvert(column).getColumnType();
268+
}
264269
return String.format(
265270
"`%s` %s %s %s",
266271
column.getName(),
267-
typeConverter.reconvert(column).getColumnType(),
272+
columnType,
268273
column.isNullable() ? "NULL" : "NOT NULL",
269274
StringUtils.isEmpty(column.getComment())
270275
? ""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.doris.util;
19+
20+
import org.apache.seatunnel.api.table.catalog.Column;
21+
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
22+
import org.apache.seatunnel.api.table.converter.TypeConverter;
23+
import org.apache.seatunnel.api.table.type.BasicType;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
25+
import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterFactory;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.when;
32+
33+
public class DorisCatalogUtilTest {
34+
35+
@Test
36+
void returnsReconvertedTypeWhenSinkTypeNotNull() {
37+
Column column = mock(Column.class);
38+
when(column.getName()).thenReturn("col1");
39+
when(column.getSinkType()).thenReturn("VARCHAR");
40+
41+
String result = DorisCatalogUtil.columnToDorisType(column, mock(TypeConverter.class));
42+
43+
assertEquals("`col1` VARCHAR NOT NULL ", result);
44+
}
45+
46+
@Test
47+
void returnsReconvertedTypeWhenSinkTypeIsNull() {
48+
Column column = mock(Column.class);
49+
when(column.getSinkType()).thenReturn(null);
50+
when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE);
51+
when(column.getName()).thenReturn("col1");
52+
TypeConverter<BasicTypeDefine> typeConverter =
53+
DorisTypeConverterFactory.getTypeConverter("Doris version Doris-2.0.0");
54+
String result = DorisCatalogUtil.columnToDorisType(column, typeConverter);
55+
56+
assertEquals("`col1` INT NOT NULL ", result);
57+
}
58+
59+
@Test
60+
void returnsReconvertedTypeWhenTypesNotNull() {
61+
Column column = mock(Column.class);
62+
when(column.getSinkType()).thenReturn("VARCHAR");
63+
when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE);
64+
when(column.getName()).thenReturn("col1");
65+
when(column.isNullable()).thenReturn(false);
66+
TypeConverter<BasicTypeDefine> typeConverter =
67+
DorisTypeConverterFactory.getTypeConverter("Doris version Doris-2.0.0");
68+
String result = DorisCatalogUtil.columnToDorisType(column, typeConverter);
69+
70+
assertEquals("`col1` VARCHAR NOT NULL ", result);
71+
}
72+
}

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iceberg.data;
1919

20+
import org.apache.seatunnel.api.table.catalog.Column;
2021
import org.apache.seatunnel.api.table.type.ArrayType;
2122
import org.apache.seatunnel.api.table.type.BasicType;
2223
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -115,6 +116,13 @@ public static Type toIcebergType(SeaTunnelDataType dataType) {
115116
return toIcebergType(dataType, new AtomicInteger(1));
116117
}
117118

119+
public static Type toIcebergType(Column column, AtomicInteger nextId) {
120+
if (column.getSinkType() != null) {
121+
return Types.fromPrimitiveString(column.getSinkType());
122+
}
123+
return toIcebergType(column.getDataType(), nextId);
124+
}
125+
118126
public static Type toIcebergType(SeaTunnelDataType dataType, AtomicInteger nextId) {
119127
switch (dataType.getSqlType()) {
120128
case BOOLEAN:

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public static Types.StructType toIcebergType(TableSchema tableSchema) {
285285
idIncrementer.getAndIncrement(),
286286
column.isNullable(),
287287
column.getName(),
288-
IcebergTypeMapper.toIcebergType(column.getDataType(), idIncrementer),
288+
IcebergTypeMapper.toIcebergType(column, idIncrementer),
289289
column.getComment());
290290
structFields.add(icebergField);
291291
}

0 commit comments

Comments
 (0)