From 6d9a964b1ddac3eefb674a72885b60f163e223f9 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 19 Mar 2026 15:49:18 +0800 Subject: [PATCH] fix --- .../sink/batch/DorisBatchStreamLoad.java | 9 +- .../flink/sink/writer/DorisStreamLoad.java | 7 +- .../flink/sink/writer/LoadConstants.java | 2 + .../flink/table/DorisDynamicTableSink.java | 8 +- .../flink/table/DorisDynamicTableSink.java | 8 +- .../doris/flink/sink/DorisSinkITCase.java | 116 ++++++++++++++++++ 6 files changed, 146 insertions(+), 4 deletions(-) diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 4bc0c19dc..eccdc2b52 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -78,6 +78,8 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE; +import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS; /** async stream load. 
*/ public class DorisBatchStreamLoad implements Serializable { @@ -470,7 +472,12 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { .setLabel(label) .addCommonHeader() .setEntity(entity) - .addHiddenColumns(executionOptions.getDeletable()) + .addHiddenColumns( + executionOptions.getDeletable() + && !UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase( + executionOptions + .getStreamLoadProp() + .getProperty(UNIQUE_KEY_UPDATE_MODE))) .addProperties(executionOptions.getStreamLoadProp()); if (enableGzCompress) { diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 18a979598..e491b15bc 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -70,6 +70,8 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE; +import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS; /** load data to doris. 
*/ public class DorisStreamLoad implements Serializable { @@ -362,7 +364,10 @@ public void startLoad(String label, boolean isResume) throws IOException { .setUrl(loadUrlStr) .baseAuth(user, passwd) .addCommonHeader() - .addHiddenColumns(enableDelete) + .addHiddenColumns( + enableDelete + && !UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase( + streamLoadProp.getProperty(UNIQUE_KEY_UPDATE_MODE))) .setLabel(label) .setEntity(entity) .addProperties(streamLoadProp); diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java index 54deb4a03..86965938d 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java @@ -37,6 +37,8 @@ public class LoadConstants { public static final String ARROW = "arrow"; public static final String NULL_VALUE = "\\N"; public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; + public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode"; + public static final String UPDATE_FLEXIBLE_COLUMNS = "UPDATE_FLEXIBLE_COLUMNS"; public static final String READ_JSON_BY_LINE = "read_json_by_line"; public static final String GROUP_COMMIT = "group_commit"; public static final String GROUP_COMMIT_OFF_MODE = "off_mode"; diff --git a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index f43d49bc7..33a632fde 100644 --- a/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ 
b/flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -49,6 +49,8 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE; +import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS; /** DorisDynamicTableSink. */ public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite { @@ -85,10 +87,14 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { Properties loadProperties = executionOptions.getStreamLoadProp(); + // Flexible column update mode is incompatible with both 'columns' and 'hidden_columns' + boolean isFlexibleColumn = + UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase( + loadProperties.getProperty(UNIQUE_KEY_UPDATE_MODE)); boolean deletable = executionOptions.getDeletable() && RestService.isUniqueKeyType(options, readOptions, LOG); - if (!loadProperties.containsKey(COLUMNS_KEY)) { + if (!loadProperties.containsKey(COLUMNS_KEY) && !isFlexibleColumn) { String[] fieldNames = tableSchema.getFieldNames(); Preconditions.checkState(fieldNames != null && fieldNames.length > 0); String columns = diff --git a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index b3d75ded5..b6b372c51 100644 --- a/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ 
b/flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -49,6 +49,8 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE; +import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS; /** DorisDynamicTableSink. */ public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite { @@ -85,10 +87,14 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { Properties loadProperties = executionOptions.getStreamLoadProp(); + // Flexible column update mode is incompatible with both 'columns' and 'hidden_columns' + boolean isFlexibleColumn = + UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase( + loadProperties.getProperty(UNIQUE_KEY_UPDATE_MODE)); boolean deletable = executionOptions.getDeletable() && RestService.isUniqueKeyType(options, readOptions, LOG); - if (!loadProperties.containsKey(COLUMNS_KEY)) { + if (!loadProperties.containsKey(COLUMNS_KEY) && !isFlexibleColumn) { String[] fieldNames = tableSchema.getFieldNames(); Preconditions.checkState(fieldNames != null && fieldNames.length > 0); String columns = diff --git a/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 3d1345b88..fdac80e06 100644 --- a/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ 
b/flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -20,15 +20,21 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; @@ -44,6 +50,7 @@ import org.apache.doris.flink.table.DorisConfigOptions; import org.apache.doris.flink.utils.MockSource; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,6 +87,7 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column"; + static final String TABLE_FLEXIBLE_COLUMN = "tbl_flexible_column"; private final boolean batchMode; @@ -819,4 +827,112 @@ public void testSinkUnicodeColumn() throws Exception { "select `名称`,`年龄` from %s.%s order by 1", DATABASE, TABLE_UNICODE_COLUMN); 
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); }
+
+    /** Checks that UPDATE_FLEXIBLE_COLUMNS mode writes partial rows and applies deletes. */
+    @Ignore("TODO: only test for Doris 3.1+")
+    @Test
+    public void testFlexibleColumnUpdate() throws Exception {
+        initializeFlexibleColumnTable(TABLE_FLEXIBLE_COLUMN);
+
+        // Seed Doris with complete rows so a partial update has values to preserve.
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format(
+                        "INSERT INTO %s.%s VALUES(1,'doris',18,100),(2,'flink',20,200)",
+                        DATABASE, TABLE_FLEXIBLE_COLUMN));
+
+        String query =
+                String.format(
+                        "select id,name,age,score from %s.%s order by 1",
+                        DATABASE, TABLE_FLEXIBLE_COLUMN);
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(),
+                LOG,
+                Arrays.asList("1,doris,18,100", "2,flink,20,200"),
+                query,
+                4);
+
+        // Build a changelog source:
+        //   INSERT (1, 25) -> update age of id=1 to 25, name/score remain unchanged
+        //   DELETE (2, 20) -> delete the row with id=2
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        DataStream<Row> changelogStream =
+                env.fromCollection(
+                        Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1, 25),
+                                Row.ofKind(RowKind.DELETE, 2, 20)),
+                        Types.ROW_NAMED(new String[] {"id", "age"}, Types.INT, Types.INT));
+
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT().notNull())
+                        .column("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        tEnv.createTemporaryView(
+                "source_view", tEnv.fromChangelogStream(changelogStream, sourceSchema));
+
+        // Doris sink declares only 'id' and 'age'; connector will NOT auto-build columns header
+        // and NOT add hidden_columns header because unique_key_update_mode=UPDATE_FLEXIBLE_COLUMNS
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_flex_sink ("
+                                + " id INT,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.enable.batch-mode' = '%s',"
+                                + " 'sink.label-prefix' = '"
+                                + UUID.randomUUID()
+                                + "',"
+                                + " 'sink.properties.format' = 'json',"
+                                + " 'sink.properties.read_json_by_line' = 'true',"
+                                + " 'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_FLEXIBLE_COLUMN,
+                        getDorisUsername(),
+                        getDorisPassword(),
+                        batchMode);
+        tEnv.executeSql(sinkDDL);
+        tEnv.executeSql("INSERT INTO doris_flex_sink SELECT id, age FROM source_view");
+
+        Thread.sleep(10000); // wait for the submitted INSERT job to write to Doris
+        // Expect id=1 with age updated to 25 (name/score unchanged) and id=2 deleted.
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, Arrays.asList("1,doris,25,100"), query, 4);
+    }
+
+    /** Recreates the UNIQUE KEY table with merge-on-write and skip-bitmap column enabled. */
+    private void initializeFlexibleColumnTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s (\n"
+                                + " `id` int,\n"
+                                + " `name` varchar(256),\n"
+                                + " `age` int,\n"
+                                + " `score` int\n"
+                                + ") UNIQUE KEY(`id`)\n"
+                                + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + " \"replication_num\" = \"1\",\n"
+                                + " \"enable_unique_key_merge_on_write\" = \"true\",\n"
+                                + " \"enable_unique_key_skip_bitmap_column\" = \"true\"\n"
+                                + ")",
+                        DATABASE, table));
+    }
 }