Skip to content

Commit fdacbae

Browse files
liunaijieHisoka-X
andauthored
[Improve] maxcompute options (#9163)
Co-authored-by: Jia Fan <[email protected]>
1 parent ce15415 commit fdacbae

File tree

17 files changed

+254
-207
lines changed

17 files changed

+254
-207
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,13 @@ private Set<String> buildWhiteList() {
195195
whiteList.add("PulsarSinkOptions");
196196
whiteList.add("SlsSinkOptions");
197197
whiteList.add("Neo4jSinkOptions");
198-
whiteList.add("MaxcomputeSinkOptions");
199198
whiteList.add("TDengineSourceOptions");
200199
whiteList.add("PulsarSourceOptions");
201200
whiteList.add("MongodbSinkOptions");
202201
whiteList.add("SlsSourceOptions");
203202
whiteList.add("SentrySinkOptions");
204203
whiteList.add("QdrantSinkOptions");
205204
whiteList.add("RocketMqSinkOptions");
206-
whiteList.add("MaxcomputeSourceOptions");
207205
whiteList.add("SocketSinkOptions");
208206
whiteList.add("SelectDBSinkOptions");
209207
whiteList.add("RocketMqSourceOptions");

Diff for: seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java

+17-15
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
3434
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
3535
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
36+
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
37+
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
3638
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
3739
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
3840

@@ -55,12 +57,6 @@
5557
import java.util.List;
5658
import java.util.Optional;
5759

58-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
59-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
60-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
61-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
62-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
63-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
6460
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
6561

6662
@Slf4j
@@ -78,7 +74,10 @@ public MaxComputeCatalog(String catalogName, ReadonlyConfig options) {
7874

7975
@Override
8076
public void open() throws CatalogException {
81-
account = new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
77+
account =
78+
new AliyunAccount(
79+
readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_ID),
80+
readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_KEY));
8281
}
8382

8483
@Override
@@ -91,13 +90,13 @@ public String name() {
9190

9291
@Override
9392
public String getDefaultDatabase() throws CatalogException {
94-
return readonlyConfig.get(PROJECT);
93+
return readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
9594
}
9695

9796
@Override
9897
public boolean databaseExists(String databaseName) throws CatalogException {
9998
try {
100-
Odps odps = getOdps(readonlyConfig.get(PROJECT));
99+
Odps odps = getOdps(readonlyConfig.get(MaxcomputeBaseOptions.PROJECT));
101100
Projects projects = odps.projects();
102101
return projects.exists(databaseName);
103102
} catch (OdpsException e) {
@@ -109,7 +108,7 @@ public boolean databaseExists(String databaseName) throws CatalogException {
109108
public List<String> listDatabases() throws CatalogException {
110109
try {
111110
// todo: how to get all projects
112-
String project = readonlyConfig.get(PROJECT);
111+
String project = readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
113112
if (databaseExists(project)) {
114113
return Lists.newArrayList(project);
115114
}
@@ -210,7 +209,8 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
210209
SQLTask.run(
211210
odps,
212211
MaxComputeCatalogUtil.getCreateTableStatement(
213-
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
212+
readonlyConfig.get(
213+
MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
214214
tablePath,
215215
table))
216216
.waitForSuccess();
@@ -250,8 +250,10 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
250250
Odps odps = getOdps(tablePath.getDatabaseName());
251251
Table odpsTable = odps.tables().get(tablePath.getTableName());
252252
if (odpsTable.isPartitioned()
253-
&& StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
254-
PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
253+
&& StringUtils.isNotEmpty(
254+
readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC))) {
255+
PartitionSpec partitionSpec =
256+
new PartitionSpec(readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC));
255257
odpsTable.deletePartition(partitionSpec, ignoreIfNotExists);
256258
odpsTable.createPartition(partitionSpec, true);
257259
} else {
@@ -313,7 +315,7 @@ public PreviewResult previewAction(
313315
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
314316
return new SQLPreviewResult(
315317
MaxComputeCatalogUtil.getCreateTableStatement(
316-
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
318+
readonlyConfig.get(MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
317319
tablePath,
318320
catalogTable.get()));
319321
} else if (actionType == ActionType.DROP_TABLE) {
@@ -325,7 +327,7 @@ public PreviewResult previewAction(
325327

326328
private Odps getOdps(String project) {
327329
Odps odps = new Odps(account);
328-
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
330+
odps.setEndpoint(readonlyConfig.get(MaxcomputeBaseOptions.ENDPOINT));
329331
odps.setDefaultProject(project);
330332
return odps;
331333
}

Diff for: seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,10 @@
2323
import org.apache.seatunnel.api.table.catalog.Catalog;
2424
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2525
import org.apache.seatunnel.api.table.factory.Factory;
26+
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
2627

2728
import com.google.auto.service.AutoService;
2829

29-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
30-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
31-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
32-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
33-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
34-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
35-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
36-
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
37-
3830
@AutoService(Factory.class)
3931
public class MaxComputeCatalogFactory implements CatalogFactory {
4032

@@ -45,14 +37,22 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
4537

4638
@Override
4739
public String factoryIdentifier() {
48-
return PLUGIN_NAME;
40+
return MaxcomputeBaseOptions.PLUGIN_NAME;
4941
}
5042

5143
@Override
5244
public OptionRule optionRule() {
5345
return OptionRule.builder()
54-
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
55-
.optional(PARTITION_SPEC, SPLIT_ROW, ConnectorCommonOptions.SCHEMA)
46+
.required(
47+
MaxcomputeBaseOptions.ACCESS_ID,
48+
MaxcomputeBaseOptions.ACCESS_KEY,
49+
MaxcomputeBaseOptions.ENDPOINT,
50+
MaxcomputeBaseOptions.PROJECT,
51+
MaxcomputeBaseOptions.TABLE_NAME)
52+
.optional(
53+
MaxcomputeBaseOptions.PARTITION_SPEC,
54+
MaxcomputeBaseOptions.SPLIT_ROW,
55+
ConnectorCommonOptions.SCHEMA)
5656
.build();
5757
}
5858
}

Diff for: seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3030
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3131
import org.apache.seatunnel.common.exception.CommonError;
32-
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
32+
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
3333
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
3434

3535
import org.apache.commons.collections4.MapUtils;
@@ -82,7 +82,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataTy
8282
return ArrayType.DOUBLE_ARRAY_TYPE;
8383
default:
8484
throw CommonError.convertToSeaTunnelTypeError(
85-
MaxcomputeConfig.PLUGIN_NAME, connectorDataType, field);
85+
MaxcomputeBaseOptions.PLUGIN_NAME, connectorDataType, field);
8686
}
8787
}
8888
if (connectorDataType.startsWith("STRUCT")) {
@@ -141,7 +141,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataTy
141141
return BasicType.VOID_TYPE;
142142
default:
143143
throw CommonError.convertToSeaTunnelTypeError(
144-
MaxcomputeConfig.PLUGIN_NAME, connectorDataType, field);
144+
MaxcomputeBaseOptions.PLUGIN_NAME, connectorDataType, field);
145145
}
146146
}
147147

@@ -183,6 +183,6 @@ public TypeInfo toConnectorType(
183183

184184
@Override
185185
public String getIdentity() {
186-
return MaxcomputeConfig.PLUGIN_NAME;
186+
return MaxcomputeBaseOptions.PLUGIN_NAME;
187187
}
188188
}
+12-54
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,15 @@
2121

2222
import org.apache.seatunnel.api.configuration.Option;
2323
import org.apache.seatunnel.api.configuration.Options;
24-
import org.apache.seatunnel.api.sink.DataSaveMode;
25-
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
26-
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2724

2825
import java.io.Serializable;
2926
import java.util.List;
3027
import java.util.Map;
3128

32-
public class MaxcomputeConfig implements Serializable {
29+
public class MaxcomputeBaseOptions implements Serializable {
3330

3431
public static final String PLUGIN_NAME = "Maxcompute";
3532

36-
private static final int SPLIT_ROW_DEFAULT = 10000;
3733
public static final Option<String> ACCESS_ID =
3834
Options.key("accessId")
3935
.stringType()
@@ -51,72 +47,34 @@ public class MaxcomputeConfig implements Serializable {
5147
.stringType()
5248
.noDefaultValue()
5349
.withDescription("Your Maxcompute endpoint start with http");
50+
5451
public static final Option<String> PROJECT =
5552
Options.key("project")
5653
.stringType()
5754
.noDefaultValue()
5855
.withDescription("Your Maxcompute project which is created in Alibaba Cloud");
56+
57+
public static final Option<List<Map<String, Object>>> TABLE_LIST =
58+
Options.key("table_list")
59+
.type(new TypeReference<List<Map<String, Object>>>() {})
60+
.noDefaultValue()
61+
.withDescription("List of tables to be written to MaxCompute.");
62+
5963
public static final Option<String> TABLE_NAME =
6064
Options.key("table_name")
6165
.stringType()
6266
.noDefaultValue()
6367
.withDescription("Target Maxcompute table name eg: fake");
68+
6469
public static final Option<String> PARTITION_SPEC =
6570
Options.key("partition_spec")
6671
.stringType()
6772
.noDefaultValue()
6873
.withDescription("This spec of Maxcompute partition table.");
74+
6975
public static final Option<Integer> SPLIT_ROW =
7076
Options.key("split_row")
7177
.intType()
72-
.defaultValue(SPLIT_ROW_DEFAULT)
78+
.defaultValue(10000)
7379
.withDescription("Number of rows per split. default: 10000");
74-
public static final Option<Boolean> OVERWRITE =
75-
Options.key("overwrite")
76-
.booleanType()
77-
.defaultValue(false)
78-
.withDescription("Whether to overwrite the table or partition");
79-
80-
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
81-
Options.key("schema_save_mode")
82-
.enumType(SchemaSaveMode.class)
83-
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
84-
.withDescription("schema_save_mode");
85-
86-
public static final Option<DataSaveMode> DATA_SAVE_MODE =
87-
Options.key("data_save_mode")
88-
.enumType(DataSaveMode.class)
89-
.defaultValue(DataSaveMode.APPEND_DATA)
90-
.withDescription("data_save_mode");
91-
92-
public static final Option<String> CUSTOM_SQL =
93-
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
94-
95-
public static final Option<List<String>> READ_COLUMNS =
96-
Options.key("read_columns")
97-
.listType()
98-
.noDefaultValue()
99-
.withDescription("The read columns of the table");
100-
101-
public static final Option<List<Map<String, Object>>> TABLE_LIST =
102-
Options.key("table_list")
103-
.type(new TypeReference<List<Map<String, Object>>>() {})
104-
.noDefaultValue()
105-
.withDescription("List of tables to be written to MaxCompute.");
106-
107-
// create table
108-
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
109-
Options.key("save_mode_create_template")
110-
.stringType()
111-
.defaultValue(
112-
"CREATE TABLE IF NOT EXISTS `"
113-
+ SaveModePlaceHolder.TABLE.getPlaceHolder()
114-
+ "` (\n"
115-
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
116-
+ "\n"
117-
+ ") COMMENT '"
118-
+ SaveModePlaceHolder.COMMENT.getPlaceHolder()
119-
+ "' ;")
120-
.withDescription(
121-
"Create table statement template, used to create MaxCompute table");
12280
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.sink.DataSaveMode;
23+
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
24+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
25+
26+
public class MaxcomputeSinkOptions extends MaxcomputeBaseOptions {
27+
28+
public static final Option<Boolean> OVERWRITE =
29+
Options.key("overwrite")
30+
.booleanType()
31+
.defaultValue(false)
32+
.withDescription("Whether to overwrite the table or partition");
33+
34+
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
35+
Options.key("schema_save_mode")
36+
.enumType(SchemaSaveMode.class)
37+
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
38+
.withDescription("schema_save_mode");
39+
40+
public static final Option<DataSaveMode> DATA_SAVE_MODE =
41+
Options.key("data_save_mode")
42+
.enumType(DataSaveMode.class)
43+
.defaultValue(DataSaveMode.APPEND_DATA)
44+
.withDescription("data_save_mode");
45+
46+
public static final Option<String> CUSTOM_SQL =
47+
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
48+
49+
// create table
50+
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
51+
Options.key("save_mode_create_template")
52+
.stringType()
53+
.defaultValue(
54+
"CREATE TABLE IF NOT EXISTS `"
55+
+ SaveModePlaceHolder.TABLE.getPlaceHolder()
56+
+ "` (\n"
57+
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
58+
+ "\n"
59+
+ ") COMMENT '"
60+
+ SaveModePlaceHolder.COMMENT.getPlaceHolder()
61+
+ "' ;")
62+
.withDescription(
63+
"Create table statement template, used to create MaxCompute table");
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
import java.util.List;
24+
25+
public class MaxcomputeSourceOptions extends MaxcomputeBaseOptions {
26+
27+
public static final Option<List<String>> READ_COLUMNS =
28+
Options.key("read_columns")
29+
.listType()
30+
.noDefaultValue()
31+
.withDescription("The read columns of the table");
32+
}

0 commit comments

Comments
 (0)