Skip to content

Commit 6d9a964

Browse files
committed
fix
1 parent f569e71 commit 6d9a964

6 files changed

Lines changed: 146 additions & 4 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
7979
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
8080
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
81+
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
82+
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;
8183

8284
/** async stream load. */
8385
public class DorisBatchStreamLoad implements Serializable {
@@ -470,7 +472,12 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
470472
.setLabel(label)
471473
.addCommonHeader()
472474
.setEntity(entity)
473-
.addHiddenColumns(executionOptions.getDeletable())
475+
.addHiddenColumns(
476+
executionOptions.getDeletable()
477+
&& !UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
478+
executionOptions
479+
.getStreamLoadProp()
480+
.getProperty(UNIQUE_KEY_UPDATE_MODE)))
474481
.addProperties(executionOptions.getStreamLoadProp());
475482

476483
if (enableGzCompress) {

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
7171
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
7272
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
73+
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
74+
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;
7375

7476
/** load data to doris. */
7577
public class DorisStreamLoad implements Serializable {
@@ -362,7 +364,10 @@ public void startLoad(String label, boolean isResume) throws IOException {
362364
.setUrl(loadUrlStr)
363365
.baseAuth(user, passwd)
364366
.addCommonHeader()
365-
.addHiddenColumns(enableDelete)
367+
.addHiddenColumns(
368+
enableDelete
369+
&& !UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
370+
streamLoadProp.getProperty(UNIQUE_KEY_UPDATE_MODE)))
366371
.setLabel(label)
367372
.setEntity(entity)
368373
.addProperties(streamLoadProp);

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class LoadConstants {
3737
public static final String ARROW = "arrow";
3838
public static final String NULL_VALUE = "\\N";
3939
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
40+
public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
41+
public static final String UPDATE_FLEXIBLE_COLUMNS = "UPDATE_FLEXIBLE_COLUMNS";
4042
public static final String READ_JSON_BY_LINE = "read_json_by_line";
4143
public static final String GROUP_COMMIT = "group_commit";
4244
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";

flink-doris-connector/flink-doris-connector-flink1/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
5050
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
5151
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
52+
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
53+
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;
5254

5355
/** DorisDynamicTableSink. */
5456
public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite {
@@ -85,10 +87,14 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
8587
@Override
8688
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
8789
Properties loadProperties = executionOptions.getStreamLoadProp();
90+
// Flexible column update mode is incompatible with both 'columns' and 'hidden_columns'
91+
boolean isFlexibleColumn =
92+
UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
93+
loadProperties.getProperty(UNIQUE_KEY_UPDATE_MODE));
8894
boolean deletable =
8995
executionOptions.getDeletable()
9096
&& RestService.isUniqueKeyType(options, readOptions, LOG);
91-
if (!loadProperties.containsKey(COLUMNS_KEY)) {
97+
if (!loadProperties.containsKey(COLUMNS_KEY) && !isFlexibleColumn) {
9298
String[] fieldNames = tableSchema.getFieldNames();
9399
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
94100
String columns =

flink-doris-connector/flink-doris-connector-flink2/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
5050
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
5151
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
52+
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
53+
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;
5254

5355
/** DorisDynamicTableSink. */
5456
public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite {
@@ -85,10 +87,14 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
8587
@Override
8688
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
8789
Properties loadProperties = executionOptions.getStreamLoadProp();
90+
// Flexible column update mode is incompatible with both 'columns' and 'hidden_columns'
91+
boolean isFlexibleColumn =
92+
UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
93+
loadProperties.getProperty(UNIQUE_KEY_UPDATE_MODE));
8894
boolean deletable =
8995
executionOptions.getDeletable()
9096
&& RestService.isUniqueKeyType(options, readOptions, LOG);
91-
if (!loadProperties.containsKey(COLUMNS_KEY)) {
97+
if (!loadProperties.containsKey(COLUMNS_KEY) && !isFlexibleColumn) {
9298
String[] fieldNames = tableSchema.getFieldNames();
9399
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
94100
String columns =

flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,21 @@
2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.common.RuntimeExecutionMode;
2222
import org.apache.flink.api.common.time.Deadline;
23+
import org.apache.flink.api.common.typeinfo.Types;
2324
import org.apache.flink.configuration.Configuration;
2425
import org.apache.flink.configuration.RestartStrategyOptions;
2526
import org.apache.flink.core.execution.JobClient;
2627
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
2728
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
29+
import org.apache.flink.streaming.api.datastream.DataStream;
2830
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31+
import org.apache.flink.table.api.DataTypes;
32+
import org.apache.flink.table.api.Schema;
2933
import org.apache.flink.table.api.TableResult;
3034
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
3135
import org.apache.flink.test.util.MiniClusterWithClientResource;
36+
import org.apache.flink.types.Row;
37+
import org.apache.flink.types.RowKind;
3238
import org.apache.flink.util.StringUtils;
3339

3440
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,6 +50,7 @@
4450
import org.apache.doris.flink.table.DorisConfigOptions;
4551
import org.apache.doris.flink.utils.MockSource;
4652
import org.junit.Assert;
53+
import org.junit.Ignore;
4754
import org.junit.Rule;
4855
import org.junit.Test;
4956
import org.junit.runner.RunWith;
@@ -80,6 +87,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
8087
static final String TABLE_CSV_JM = "tbl_csv_jm";
8188
static final String TABLE_CSV_TM = "tbl_csv_tm";
8289
static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column";
90+
static final String TABLE_FLEXIBLE_COLUMN = "tbl_flexible_column";
8391

8492
private final boolean batchMode;
8593

@@ -819,4 +827,112 @@ public void testSinkUnicodeColumn() throws Exception {
819827
"select `名称`,`年龄` from %s.%s order by 1", DATABASE, TABLE_UNICODE_COLUMN);
820828
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
821829
}
830+
831+
// todo: only test for doris3.1+
832+
@Ignore
833+
@Test
834+
public void testFlexibleColumnUpdate() throws Exception {
835+
initializeFlexibleColumnTable(TABLE_FLEXIBLE_COLUMN);
836+
837+
// pre-insert full rows with all columns into Doris
838+
ContainerUtils.executeSQLStatement(
839+
getDorisQueryConnection(),
840+
LOG,
841+
String.format(
842+
"INSERT INTO %s.%s VALUES(1,'doris',18,100),(2,'flink',20,200)",
843+
DATABASE, TABLE_FLEXIBLE_COLUMN));
844+
845+
String query =
846+
String.format(
847+
"select id,name,age,score from %s.%s order by 1",
848+
DATABASE, TABLE_FLEXIBLE_COLUMN);
849+
ContainerUtils.checkResult(
850+
getDorisQueryConnection(),
851+
LOG,
852+
Arrays.asList("1,doris,18,100", "2,flink,20,200"),
853+
query,
854+
4);
855+
856+
// Build a changelog source:
857+
// INSERT (1, 25) -> update age of id=1 to 25, name/score remain unchanged
858+
// DELETE (2, 20) -> delete the row with id=2
859+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
860+
env.setParallelism(DEFAULT_PARALLELISM);
861+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
862+
863+
DataStream<Row> changelogStream =
864+
env.fromCollection(
865+
Arrays.asList(
866+
Row.ofKind(RowKind.INSERT, 1, 25),
867+
Row.ofKind(RowKind.DELETE, 2, 20)),
868+
Types.ROW_NAMED(new String[] {"id", "age"}, Types.INT, Types.INT));
869+
870+
Schema sourceSchema =
871+
Schema.newBuilder()
872+
.column("id", DataTypes.INT().notNull())
873+
.column("age", DataTypes.INT())
874+
.primaryKey("id")
875+
.build();
876+
tEnv.createTemporaryView(
877+
"source_view", tEnv.fromChangelogStream(changelogStream, sourceSchema));
878+
879+
// Doris sink declares only 'id' and 'age'; connector will NOT auto-build columns header
880+
// and NOT add hidden_columns header because unique_key_update_mode=UPDATE_FLEXIBLE_COLUMNS
881+
String sinkDDL =
882+
String.format(
883+
"CREATE TABLE doris_flex_sink ("
884+
+ " id INT,"
885+
+ " age INT"
886+
+ ") WITH ("
887+
+ " 'connector' = '"
888+
+ DorisConfigOptions.IDENTIFIER
889+
+ "',"
890+
+ " 'fenodes' = '%s',"
891+
+ " 'table.identifier' = '%s',"
892+
+ " 'username' = '%s',"
893+
+ " 'password' = '%s',"
894+
+ " 'sink.enable.batch-mode' = '%s',"
895+
+ " 'sink.label-prefix' = '"
896+
+ UUID.randomUUID()
897+
+ "',"
898+
+ " 'sink.properties.format' = 'json',"
899+
+ " 'sink.properties.read_json_by_line' = 'true',"
900+
+ " 'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS'"
901+
+ ")",
902+
getFenodes(),
903+
DATABASE + "." + TABLE_FLEXIBLE_COLUMN,
904+
getDorisUsername(),
905+
getDorisPassword(),
906+
batchMode);
907+
tEnv.executeSql(sinkDDL);
908+
tEnv.executeSql("INSERT INTO doris_flex_sink SELECT id, age FROM source_view");
909+
910+
Thread.sleep(10000);
911+
// id=1: age updated to 25, name/score remain unchanged
912+
// id=2: deleted
913+
ContainerUtils.checkResult(
914+
getDorisQueryConnection(), LOG, Arrays.asList("1,doris,25,100"), query, 4);
915+
}
916+
917+
private void initializeFlexibleColumnTable(String table) {
918+
ContainerUtils.executeSQLStatement(
919+
getDorisQueryConnection(),
920+
LOG,
921+
String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
922+
String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
923+
String.format(
924+
"CREATE TABLE %s.%s (\n"
925+
+ " `id` int,\n"
926+
+ " `name` varchar(256),\n"
927+
+ " `age` int,\n"
928+
+ " `score` int\n"
929+
+ ") UNIQUE KEY(`id`)\n"
930+
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
931+
+ "PROPERTIES (\n"
932+
+ " \"replication_num\" = \"1\",\n"
933+
+ " \"enable_unique_key_merge_on_write\" = \"true\",\n"
934+
+ " \"enable_unique_key_skip_bitmap_column\" = \"true\"\n"
935+
+ ")",
936+
DATABASE, table));
937+
}
822938
}

0 commit comments

Comments
 (0)