Skip to content

Commit c8a682c

Browse files
authored
[Fix][SQLTransform] fix the scale loss for the sql transform (#6553)
1 parent 5f3c9c3 commit c8a682c

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java

+1
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ protected TableSchema transformTableSchema() {
167167
fieldNames[i],
168168
fieldTypes[i],
169169
simpleColumn.getColumnLength(),
170+
simpleColumn.getScale(),
170171
simpleColumn.isNullable(),
171172
simpleColumn.getDefaultValue(),
172173
simpleColumn.getComment());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.transform.sql;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
23+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
24+
import org.apache.seatunnel.api.table.catalog.TableSchema;
25+
import org.apache.seatunnel.api.table.type.BasicType;
26+
import org.apache.seatunnel.api.table.type.LocalTimeType;
27+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
28+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29+
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.Test;
32+
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.Objects;
36+
37+
public class SQLTransformTest {
38+
39+
private static final String TEST_NAME = "test";
40+
private static final String TIMESTAMP_FILEDNAME = "create_time";
41+
private static final String[] FILED_NAMES =
42+
new String[] {"id", "name", "age", TIMESTAMP_FILEDNAME};
43+
private static final String GENERATE_PARTITION_KEY = "dt";
44+
private static final ReadonlyConfig READONLY_CONFIG =
45+
ReadonlyConfig.fromMap(
46+
new HashMap() {
47+
{
48+
put(
49+
"query",
50+
"select *,FORMATDATETIME(create_time,'yyyy-MM-dd HH:mm') as dt from test");
51+
}
52+
});
53+
54+
@Test
55+
public void testScaleSupport() {
56+
SQLTransform sqlTransform = new SQLTransform(READONLY_CONFIG, getCatalogTable());
57+
TableSchema tableSchema = sqlTransform.transformTableSchema();
58+
tableSchema
59+
.getColumns()
60+
.forEach(
61+
column -> {
62+
if (column.getName().equals(TIMESTAMP_FILEDNAME)) {
63+
Assertions.assertEquals(9, column.getScale());
64+
} else if (column.getName().equals(GENERATE_PARTITION_KEY)) {
65+
Assertions.assertTrue(Objects.isNull(column.getScale()));
66+
} else {
67+
Assertions.assertEquals(3, column.getColumnLength());
68+
}
69+
});
70+
}
71+
72+
private CatalogTable getCatalogTable() {
73+
SeaTunnelRowType rowType =
74+
new SeaTunnelRowType(
75+
FILED_NAMES,
76+
new SeaTunnelDataType[] {
77+
BasicType.INT_TYPE,
78+
BasicType.STRING_TYPE,
79+
BasicType.INT_TYPE,
80+
LocalTimeType.LOCAL_DATE_TIME_TYPE
81+
});
82+
TableSchema.Builder schemaBuilder = TableSchema.builder();
83+
for (int i = 0; i < rowType.getTotalFields(); i++) {
84+
Integer scale = null;
85+
Long columnLength = null;
86+
if (rowType.getFieldName(i).equals(TIMESTAMP_FILEDNAME)) {
87+
scale = 9;
88+
} else {
89+
columnLength = 3L;
90+
}
91+
PhysicalColumn column =
92+
PhysicalColumn.of(
93+
rowType.getFieldName(i),
94+
rowType.getFieldType(i),
95+
columnLength,
96+
scale,
97+
true,
98+
null,
99+
null);
100+
schemaBuilder.column(column);
101+
}
102+
return CatalogTable.of(
103+
TableIdentifier.of(TEST_NAME, TEST_NAME, null, TEST_NAME),
104+
schemaBuilder.build(),
105+
new HashMap<>(),
106+
new ArrayList<>(),
107+
"It has column information.");
108+
}
109+
}

0 commit comments

Comments
 (0)