Skip to content

[Improve] maxcompute options #9163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ private Set<String> buildWhiteList() {
whiteList.add("PulsarSinkOptions");
whiteList.add("SlsSinkOptions");
whiteList.add("Neo4jSinkOptions");
whiteList.add("MaxcomputeSinkOptions");
whiteList.add("PaimonSinkOptions");
whiteList.add("TDengineSourceOptions");
whiteList.add("PulsarSourceOptions");
Expand All @@ -206,7 +205,6 @@ private Set<String> buildWhiteList() {
whiteList.add("QdrantSinkOptions");
whiteList.add("MilvusSourceOptions");
whiteList.add("RocketMqSinkOptions");
whiteList.add("MaxcomputeSourceOptions");
whiteList.add("KuduSourceOptions");
whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;

Expand All @@ -55,12 +57,6 @@
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

@Slf4j
Expand All @@ -78,7 +74,10 @@ public MaxComputeCatalog(String catalogName, ReadonlyConfig options) {

@Override
public void open() throws CatalogException {
account = new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
account =
new AliyunAccount(
readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_ID),
readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_KEY));
}

@Override
Expand All @@ -91,13 +90,13 @@ public String name() {

@Override
public String getDefaultDatabase() throws CatalogException {
return readonlyConfig.get(PROJECT);
return readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
}

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
try {
Odps odps = getOdps(readonlyConfig.get(PROJECT));
Odps odps = getOdps(readonlyConfig.get(MaxcomputeBaseOptions.PROJECT));
Projects projects = odps.projects();
return projects.exists(databaseName);
} catch (OdpsException e) {
Expand All @@ -109,7 +108,7 @@ public boolean databaseExists(String databaseName) throws CatalogException {
public List<String> listDatabases() throws CatalogException {
try {
// todo: how to get all projects
String project = readonlyConfig.get(PROJECT);
String project = readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
if (databaseExists(project)) {
return Lists.newArrayList(project);
}
Expand Down Expand Up @@ -210,7 +209,8 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
SQLTask.run(
odps,
MaxComputeCatalogUtil.getCreateTableStatement(
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
readonlyConfig.get(
MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
tablePath,
table))
.waitForSuccess();
Expand Down Expand Up @@ -250,8 +250,10 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
Odps odps = getOdps(tablePath.getDatabaseName());
Table odpsTable = odps.tables().get(tablePath.getTableName());
if (odpsTable.isPartitioned()
&& StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
&& StringUtils.isNotEmpty(
readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC))) {
PartitionSpec partitionSpec =
new PartitionSpec(readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC));
odpsTable.deletePartition(partitionSpec, ignoreIfNotExists);
odpsTable.createPartition(partitionSpec, true);
} else {
Expand Down Expand Up @@ -313,7 +315,7 @@ public PreviewResult previewAction(
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
return new SQLPreviewResult(
MaxComputeCatalogUtil.getCreateTableStatement(
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
readonlyConfig.get(MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
tablePath,
catalogTable.get()));
} else if (actionType == ActionType.DROP_TABLE) {
Expand All @@ -325,7 +327,7 @@ public PreviewResult previewAction(

private Odps getOdps(String project) {
Odps odps = new Odps(account);
odps.setEndpoint(readonlyConfig.get(ENDPOINT));
odps.setEndpoint(readonlyConfig.get(MaxcomputeBaseOptions.ENDPOINT));
odps.setDefaultProject(project);
return odps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,10 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;

@AutoService(Factory.class)
public class MaxComputeCatalogFactory implements CatalogFactory {

Expand All @@ -45,14 +37,22 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {

@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
return MaxcomputeBaseOptions.PLUGIN_NAME;
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
.optional(PARTITION_SPEC, SPLIT_ROW, ConnectorCommonOptions.SCHEMA)
.required(
MaxcomputeBaseOptions.ACCESS_ID,
MaxcomputeBaseOptions.ACCESS_KEY,
MaxcomputeBaseOptions.ENDPOINT,
MaxcomputeBaseOptions.PROJECT,
MaxcomputeBaseOptions.TABLE_NAME)
.optional(
MaxcomputeBaseOptions.PARTITION_SPEC,
MaxcomputeBaseOptions.SPLIT_ROW,
ConnectorCommonOptions.SCHEMA)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;

import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -82,7 +82,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataTy
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
throw CommonError.convertToSeaTunnelTypeError(
MaxcomputeConfig.PLUGIN_NAME, connectorDataType, field);
MaxcomputeBaseOptions.PLUGIN_NAME, connectorDataType, field);
}
}
if (connectorDataType.startsWith("STRUCT")) {
Expand Down Expand Up @@ -141,7 +141,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataTy
return BasicType.VOID_TYPE;
default:
throw CommonError.convertToSeaTunnelTypeError(
MaxcomputeConfig.PLUGIN_NAME, connectorDataType, field);
MaxcomputeBaseOptions.PLUGIN_NAME, connectorDataType, field);
}
}

Expand Down Expand Up @@ -183,6 +183,6 @@ public TypeInfo toConnectorType(

@Override
public String getIdentity() {
return MaxcomputeConfig.PLUGIN_NAME;
return MaxcomputeBaseOptions.PLUGIN_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,15 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

public class MaxcomputeConfig implements Serializable {
public class MaxcomputeBaseOptions implements Serializable {

public static final String PLUGIN_NAME = "Maxcompute";

private static final int SPLIT_ROW_DEFAULT = 10000;
public static final Option<String> ACCESS_ID =
Options.key("accessId")
.stringType()
Expand All @@ -51,72 +47,34 @@ public class MaxcomputeConfig implements Serializable {
.stringType()
.noDefaultValue()
.withDescription("Your Maxcompute endpoint start with http");

public static final Option<String> PROJECT =
Options.key("project")
.stringType()
.noDefaultValue()
.withDescription("Your Maxcompute project which is created in Alibaba Cloud");

public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("List of tables to be written to MaxCompute.");

public static final Option<String> TABLE_NAME =
Options.key("table_name")
.stringType()
.noDefaultValue()
.withDescription("Target Maxcompute table name eg: fake");

public static final Option<String> PARTITION_SPEC =
Options.key("partition_spec")
.stringType()
.noDefaultValue()
.withDescription("This spec of Maxcompute partition table.");

public static final Option<Integer> SPLIT_ROW =
Options.key("split_row")
.intType()
.defaultValue(SPLIT_ROW_DEFAULT)
.defaultValue(10000)
.withDescription("Number of rows per split. default: 10000");
public static final Option<Boolean> OVERWRITE =
Options.key("overwrite")
.booleanType()
.defaultValue(false)
.withDescription("Whether to overwrite the table or partition");

public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription("schema_save_mode");

public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");

public static final Option<String> CUSTOM_SQL =
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");

public static final Option<List<String>> READ_COLUMNS =
Options.key("read_columns")
.listType()
.noDefaultValue()
.withDescription("The read columns of the table");

public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("List of tables to be written to MaxCompute.");

// create table
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
"CREATE TABLE IF NOT EXISTS `"
+ SaveModePlaceHolder.TABLE.getPlaceHolder()
+ "` (\n"
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ "\n"
+ ") COMMENT '"
+ SaveModePlaceHolder.COMMENT.getPlaceHolder()
+ "' ;")
.withDescription(
"Create table statement template, used to create MaxCompute table");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;

public class MaxcomputeSinkOptions extends MaxcomputeBaseOptions {

public static final Option<Boolean> OVERWRITE =
Options.key("overwrite")
.booleanType()
.defaultValue(false)
.withDescription("Whether to overwrite the table or partition");

public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription("schema_save_mode");

public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");

public static final Option<String> CUSTOM_SQL =
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");

// create table
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
"CREATE TABLE IF NOT EXISTS `"
+ SaveModePlaceHolder.TABLE.getPlaceHolder()
+ "` (\n"
+ SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ "\n"
+ ") COMMENT '"
+ SaveModePlaceHolder.COMMENT.getPlaceHolder()
+ "' ;")
.withDescription(
"Create table statement template, used to create MaxCompute table");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;

public class MaxcomputeSourceOptions extends MaxcomputeBaseOptions {

public static final Option<List<String>> READ_COLUMNS =
Options.key("read_columns")
.listType()
.noDefaultValue()
.withDescription("The read columns of the table");
}
Loading
Loading