Skip to content

Commit 88263cd

Browse files
authored
[Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679)
1 parent b7480e1 commit 88263cd

File tree

12 files changed

+303
-66
lines changed

12 files changed

+303
-66
lines changed

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java

-29
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink;
19+
20+
import java.util.Arrays;
21+
import java.util.Optional;
22+
23+
public enum SaveModePlaceHolder {
24+
ROWTYPE_PRIMARY_KEY("rowtype_primary_key", "primary keys"),
25+
ROWTYPE_UNIQUE_KEY("rowtype_unique_key", "unique keys"),
26+
ROWTYPE_FIELDS("rowtype_fields", "fields"),
27+
TABLE_NAME("table_name", "table name"),
28+
DATABASE("database", "database");
29+
30+
private String keyValue;
31+
private String display;
32+
33+
private static final String REPLACE_PLACE_HOLDER = "\\$\\{%s\\}";
34+
private static final String PLACE_HOLDER = "${%s}";
35+
36+
SaveModePlaceHolder(String keyValue, String display) {
37+
this.keyValue = keyValue;
38+
this.display = display;
39+
}
40+
41+
public static String getDisplay(String placeholder) {
42+
Optional<SaveModePlaceHolder> saveModePlaceHolderEnumOptional =
43+
Arrays.stream(SaveModePlaceHolder.values())
44+
.filter(
45+
saveModePlaceHolderEnum ->
46+
placeholder.equals(
47+
saveModePlaceHolderEnum.getPlaceHolder()))
48+
.findFirst();
49+
if (saveModePlaceHolderEnumOptional.isPresent()) {
50+
return saveModePlaceHolderEnumOptional.get().display;
51+
}
52+
throw new RuntimeException(String.format("Not support the placeholder: %s", placeholder));
53+
}
54+
55+
public String getPlaceHolderKey() {
56+
return this.keyValue;
57+
}
58+
59+
public String getPlaceHolder() {
60+
return String.format(PLACE_HOLDER, getPlaceHolderKey());
61+
}
62+
63+
public String getReplacePlaceHolder() {
64+
return String.format(REPLACE_PLACE_HOLDER, getPlaceHolderKey());
65+
}
66+
}

Diff for: seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java

+16
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
3737
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
3838
import static org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
39+
import static org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
3940
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
4041
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
4142
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
@@ -183,4 +184,19 @@ public static SeaTunnelRuntimeException jsonOperationError(
183184
return new SeaTunnelRuntimeException(code, params);
184185
}
185186
}
187+
188+
public static SeaTunnelRuntimeException sqlTemplateHandledError(
189+
String tableName,
190+
String keyName,
191+
String template,
192+
String placeholder,
193+
String optionName) {
194+
Map<String, String> params = new HashMap<>();
195+
params.put("tableName", tableName);
196+
params.put("keyName", keyName);
197+
params.put("template", template);
198+
params.put("placeholder", placeholder);
199+
params.put("optionName", optionName);
200+
return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR, params);
201+
}
186202
}

Diff for: seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
4848
"<identifier> <operation> file '<fileName>' failed, because it not existed."),
4949
WRITE_SEATUNNEL_ROW_ERROR(
5050
"COMMON-23",
51-
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'.");
51+
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),
52+
SQL_TEMPLATE_HANDLED_ERROR(
53+
"COMMON-24",
54+
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template");
5255

5356
private final String code;
5457
private final String description;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.common.sql.template;
19+
20+
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
21+
import org.apache.seatunnel.common.exception.CommonError;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
25+
public class SqlTemplate {
26+
public static void canHandledByTemplateWithPlaceholder(
27+
String createTemplate,
28+
String placeholder,
29+
String actualPlaceHolderValue,
30+
String tableName,
31+
String optionsKey) {
32+
if (createTemplate.contains(placeholder) && StringUtils.isBlank(actualPlaceHolderValue)) {
33+
throw CommonError.sqlTemplateHandledError(
34+
tableName,
35+
SaveModePlaceHolder.getDisplay(placeholder),
36+
createTemplate,
37+
placeholder,
38+
optionsKey);
39+
}
40+
}
41+
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.api.configuration.util.OptionRule;
2323
import org.apache.seatunnel.api.sink.DataSaveMode;
24+
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
2425
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2526

2627
import java.util.Map;
@@ -227,11 +228,20 @@ public interface DorisOptions {
227228
Options.key("save_mode_create_template")
228229
.stringType()
229230
.defaultValue(
230-
"CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n"
231-
+ "${rowtype_fields}\n"
231+
"CREATE TABLE IF NOT EXISTS `"
232+
+ SaveModePlaceHolder.DATABASE.getPlaceHolder()
233+
+ "`.`"
234+
+ SaveModePlaceHolder.TABLE_NAME.getPlaceHolder()
235+
+ "` (\n"
236+
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
237+
+ "\n"
232238
+ ") ENGINE=OLAP\n"
233-
+ " UNIQUE KEY (${rowtype_primary_key})\n"
234-
+ "DISTRIBUTED BY HASH (${rowtype_primary_key})\n "
239+
+ " UNIQUE KEY ("
240+
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
241+
+ ")\n"
242+
+ "DISTRIBUTED BY HASH ("
243+
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
244+
+ ")\n "
235245
+ "PROPERTIES (\n"
236246
+ "\"replication_allocation\" = \"tag.location.default: 1\",\n"
237247
+ "\"in_memory\" = \"false\",\n"

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.doris.util;
1919

20-
import org.apache.seatunnel.api.sink.SaveModeConstants;
20+
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
2121
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2222
import org.apache.seatunnel.api.table.catalog.Column;
2323
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -28,6 +28,8 @@
2828
import org.apache.seatunnel.api.table.type.DecimalType;
2929
import org.apache.seatunnel.api.table.type.LocalTimeType;
3030
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
31+
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
32+
import org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
3133

3234
import org.apache.commons.lang3.StringUtils;
3335

@@ -125,14 +127,25 @@ public static String getCreateTableStatement(
125127
.map(r -> "`" + r.getColumnName() + "`")
126128
.collect(Collectors.joining(","));
127129
}
130+
SqlTemplate.canHandledByTemplateWithPlaceholder(
131+
template,
132+
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
133+
primaryKey,
134+
tablePath.getFullName(),
135+
DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
128136
template =
129137
template.replaceAll(
130-
String.format("\\$\\{%s\\}", SaveModeConstants.ROWTYPE_PRIMARY_KEY),
138+
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
131139
primaryKey);
140+
SqlTemplate.canHandledByTemplateWithPlaceholder(
141+
template,
142+
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
143+
uniqueKey,
144+
tablePath.getFullName(),
145+
DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
132146
template =
133147
template.replaceAll(
134-
String.format("\\$\\{%s\\}", SaveModeConstants.ROWTYPE_UNIQUE_KEY),
135-
uniqueKey);
148+
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
136149
Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
137150
CreateTableParser.getColumnList(template);
138151
template = mergeColumnInTemplate(columnInTemplate, tableSchema, template);
@@ -143,14 +156,13 @@ public static String getCreateTableStatement(
143156
.map(DorisCatalogUtil::columnToDorisType)
144157
.collect(Collectors.joining(",\n"));
145158
return template.replaceAll(
146-
String.format("\\$\\{%s\\}", SaveModeConstants.DATABASE),
159+
SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(),
147160
tablePath.getDatabaseName())
148161
.replaceAll(
149-
String.format("\\$\\{%s\\}", SaveModeConstants.TABLE_NAME),
162+
SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(),
150163
tablePath.getTableName())
151164
.replaceAll(
152-
String.format("\\$\\{%s\\}", SaveModeConstants.ROWTYPE_FIELDS),
153-
rowTypeFields);
165+
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
154166
}
155167

156168
private static String mergeColumnInTemplate(

Diff for: seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java

+38
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.doris.catalog;
1919

20+
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
2021
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2122
import org.apache.seatunnel.api.table.catalog.Column;
2223
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
@@ -28,8 +29,13 @@
2829
import org.apache.seatunnel.api.table.type.BasicType;
2930
import org.apache.seatunnel.api.table.type.DecimalType;
3031
import org.apache.seatunnel.api.table.type.LocalTimeType;
32+
import org.apache.seatunnel.common.exception.CommonError;
33+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
34+
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
3135
import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
3236

37+
import org.apache.commons.lang3.StringUtils;
38+
3339
import org.junit.jupiter.api.Assertions;
3440
import org.junit.jupiter.api.Test;
3541

@@ -130,6 +136,38 @@ public void test() {
130136
+ "\"storage_format\" = \"V2\",\n"
131137
+ "\"disable_auto_compaction\" = \"false\"\n"
132138
+ ")");
139+
140+
String createTemplate = DorisOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
141+
CatalogTable catalogTable =
142+
CatalogTable.of(
143+
TableIdentifier.of("test", "test1", "test2"),
144+
TableSchema.builder()
145+
.primaryKey(
146+
PrimaryKey.of(StringUtils.EMPTY, Collections.emptyList()))
147+
.constraintKey(Collections.emptyList())
148+
.columns(columns)
149+
.build(),
150+
Collections.emptyMap(),
151+
Collections.emptyList(),
152+
"");
153+
TablePath tablePath = TablePath.of("test1.test2");
154+
SeaTunnelRuntimeException actualSeaTunnelRuntimeException =
155+
Assertions.assertThrows(
156+
SeaTunnelRuntimeException.class,
157+
() ->
158+
DorisCatalogUtil.getCreateTableStatement(
159+
createTemplate, tablePath, catalogTable));
160+
String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
161+
SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
162+
CommonError.sqlTemplateHandledError(
163+
tablePath.getFullName(),
164+
SaveModePlaceHolder.getDisplay(primaryKeyHolder),
165+
createTemplate,
166+
primaryKeyHolder,
167+
DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
168+
Assertions.assertEquals(
169+
exceptSeaTunnelRuntimeException.getMessage(),
170+
actualSeaTunnelRuntimeException.getMessage());
133171
}
134172

135173
@Test

0 commit comments

Comments
 (0)