Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/get-workflow-origin
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class DorisSinkConfig implements Serializable {
private Integer bufferCount;
private Properties streamLoadProps;
private boolean needsUnsupportedTypeCasting;
private boolean caseSensitive;

// create table option
private String createTableTemplate;
Expand Down Expand Up @@ -102,7 +104,7 @@ public static DorisSinkConfig of(ReadonlyConfig config) {
dorisSinkConfig.setBufferCount(config.get(SINK_BUFFER_COUNT));
dorisSinkConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));
dorisSinkConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING));

dorisSinkConfig.setCaseSensitive(config.get(CASE_SENSITIVE));
// create table option
dorisSinkConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ public class DorisSinkOptions extends DorisBaseOptions {
.withDescription(
"Whether to enable the unsupported type casting, such as Decimal64 to Double");

public static final Option<Boolean> CASE_SENSITIVE =
Options.key("case_sensitive")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to preserve the original case of table and column names. Default is true (case sensitive)");

// create table
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT;
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
Expand Down Expand Up @@ -78,6 +79,21 @@ public static List<DorisTableConfig> of(ReadonlyConfig connectorConfig) {
if (connectorConfig.getOptional(TABLE_LIST).isPresent()) {
tableList = connectorConfig.get(TABLE_LIST);
} else {
DorisTableConfig dorisTableConfig = new DorisTableConfig();
dorisTableConfig.setDatabase(connectorConfig.get(DATABASE));
dorisTableConfig.setTable(connectorConfig.get(TABLE));

// 处理大小写敏感
boolean caseSensitive = true;
if (connectorConfig.getOptional(CASE_SENSITIVE).isPresent()) {
caseSensitive = connectorConfig.get(CASE_SENSITIVE);
}

if (!caseSensitive) {
dorisTableConfig.setDatabase(dorisTableConfig.getDatabase().toLowerCase());
dorisTableConfig.setTable(dorisTableConfig.getTable().toLowerCase());
}

DorisTableConfig tableProperty =
DorisTableConfig.builder()
.table(connectorConfig.get(TABLE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ protected PhysicalColumn.PhysicalColumnBuilder getPhysicalColumnBuilder(
return builder;
}

protected PhysicalColumn.PhysicalColumnBuilder getPhysicalColumnBuilder(
BasicTypeDefine typeDefine, boolean caseSensitive) {
String columnName =
caseSensitive ? typeDefine.getName() : typeDefine.getName().toLowerCase();
return PhysicalColumn.builder()
.name(columnName)
.nullable(typeDefine.isNullable())
.comment(typeDefine.getComment())
.defaultValue(typeDefine.getDefaultValue());
}

protected BasicTypeDefine.BasicTypeDefineBuilder getBasicTypeDefineBuilder(Column column) {
BasicTypeDefine.BasicTypeDefineBuilder builder =
BasicTypeDefine.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public String identifier() {

@Override
public Column convert(BasicTypeDefine typeDefine) {
PhysicalColumn.PhysicalColumnBuilder builder = getPhysicalColumnBuilder(typeDefine);
return convert(typeDefine, true);
}

public Column convert(BasicTypeDefine typeDefine, boolean caseSensitive) {
PhysicalColumn.PhysicalColumnBuilder builder =
getPhysicalColumnBuilder(typeDefine, caseSensitive);
String dorisColumnType = getDorisColumnName(typeDefine);

switch (dorisColumnType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public String identifier() {

@Override
public Column convert(BasicTypeDefine typeDefine) {
PhysicalColumn.PhysicalColumnBuilder builder = getPhysicalColumnBuilder(typeDefine);
return convert(typeDefine, true);
}

public Column convert(BasicTypeDefine typeDefine, boolean caseSensitive) {
PhysicalColumn.PhysicalColumnBuilder builder =
getPhysicalColumnBuilder(typeDefine, caseSensitive);
String dorisColumnType = getDorisColumnName(typeDefine);

switch (dorisColumnType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,45 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
private final String fieldDelimiter;
private final boolean enableDelete;
private final SerializationSchema serialize;
private final boolean caseSensitive;

public SeaTunnelRowSerializer(
String type,
SeaTunnelRowType seaTunnelRowType,
String fieldDelimiter,
boolean enableDelete) {
this(type, seaTunnelRowType, fieldDelimiter, enableDelete, true);
}

public SeaTunnelRowSerializer(
String type,
SeaTunnelRowType seaTunnelRowType,
String fieldDelimiter,
boolean enableDelete,
boolean caseSensitive) {
this.type = type;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
List<Object> fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
this.caseSensitive = caseSensitive;

String[] fieldNames = seaTunnelRowType.getFieldNames();
Copy link

Copilot AI May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Add a brief comment to explain why we are converting field names based on the case_sensitive flag to aid future maintainers.

Suggested change
String[] fieldNames = seaTunnelRowType.getFieldNames();
String[] fieldNames = seaTunnelRowType.getFieldNames();
// Normalize field names based on the caseSensitive flag.
// If caseSensitive is false, convert field names to lowercase to ensure consistent handling
// in case-insensitive environments.

Copilot uses AI. Check for mistakes.
String[] processedFieldNames = new String[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
processedFieldNames[i] = caseSensitive ? fieldNames[i] : fieldNames[i].toLowerCase();
}

List<Object> fieldNamesList = new ArrayList<>(Arrays.asList(processedFieldNames));
List<SeaTunnelDataType<?>> fieldTypes =
new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));

if (enableDelete) {
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldNamesList.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);
}

this.seaTunnelRowType =
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldNamesList.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0]));

if (JSON.equals(type)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.doris.serialize;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;

public class SeaTunnelRowSerializerFactory {

/**
* Create a DorisSerializer instance
*
* @param dorisSinkConfig
* @param seaTunnelRowType
* @return DorisSerializer
*/
public static DorisSerializer createSerializer(
DorisSinkConfig dorisSinkConfig, SeaTunnelRowType seaTunnelRowType) {
return new SeaTunnelRowSerializer(
dorisSinkConfig
.getStreamLoadProps()
.getProperty(LoadConstants.FORMAT_KEY)
.toLowerCase(),
seaTunnelRowType,
dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
dorisSinkConfig.getEnableDelete(),
dorisSinkConfig.isCaseSensitive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
import org.apache.seatunnel.connectors.doris.schema.SchemaChangeManager;
import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializerFactory;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
Expand Down Expand Up @@ -266,13 +266,6 @@ public void close() throws IOException {

private DorisSerializer createSerializer(
DorisSinkConfig dorisSinkConfig, SeaTunnelRowType seaTunnelRowType) {
return new SeaTunnelRowSerializer(
dorisSinkConfig
.getStreamLoadProps()
.getProperty(LoadConstants.FORMAT_KEY)
.toLowerCase(),
seaTunnelRowType,
dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
dorisSinkConfig.getEnableDelete());
return SeaTunnelRowSerializerFactory.createSerializer(dorisSinkConfig, seaTunnelRowType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ public DorisStreamLoad(
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
this.db = tablePath.getDatabaseName();
this.table = tablePath.getTableName();
this.table =
dorisSinkConfig.isCaseSensitive()
? tablePath.getTableName()
: tablePath.getTableName().toLowerCase();
this.user = dorisSinkConfig.getUsername();
this.passwd = dorisSinkConfig.getPassword();
this.labelGenerator = labelGenerator;
Expand Down