Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;

/** async stream load. */
public class DorisBatchStreamLoad implements Serializable {
Expand Down Expand Up @@ -470,7 +472,12 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
.setLabel(label)
.addCommonHeader()
.setEntity(entity)
.addHiddenColumns(executionOptions.getDeletable())
.addHiddenColumns(
executionOptions.getDeletable()
&& !UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
executionOptions
.getStreamLoadProp()
.getProperty(UNIQUE_KEY_UPDATE_MODE)))
.addProperties(executionOptions.getStreamLoadProp());

if (enableGzCompress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;

/** load data to doris. */
public class DorisStreamLoad implements Serializable {
Expand Down Expand Up @@ -362,7 +364,10 @@ public void startLoad(String label, boolean isResume) throws IOException {
.setUrl(loadUrlStr)
.baseAuth(user, passwd)
.addCommonHeader()
.addHiddenColumns(enableDelete)
.addHiddenColumns(
enableDelete
&& !UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
streamLoadProp.getProperty(UNIQUE_KEY_UPDATE_MODE)))
.setLabel(label)
.setEntity(entity)
.addProperties(streamLoadProp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class LoadConstants {
public static final String ARROW = "arrow";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
public static final String UPDATE_FLEXIBLE_COLUMNS = "UPDATE_FLEXIBLE_COLUMNS";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
public static final String GROUP_COMMIT = "group_commit";
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;

/** DorisDynamicTableSink. */
public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite {
Expand Down Expand Up @@ -85,10 +87,14 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
// Flexible column update mode is incompatible with both 'columns' and 'hidden_columns'
boolean isFlexibleColumn =
UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
loadProperties.getProperty(UNIQUE_KEY_UPDATE_MODE));
boolean deletable =
executionOptions.getDeletable()
&& RestService.isUniqueKeyType(options, readOptions, LOG);
if (!loadProperties.containsKey(COLUMNS_KEY)) {
if (!loadProperties.containsKey(COLUMNS_KEY) && !isFlexibleColumn) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
String columns =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.UNIQUE_KEY_UPDATE_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.UPDATE_FLEXIBLE_COLUMNS;

/** DorisDynamicTableSink. */
public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite {
Expand Down Expand Up @@ -85,10 +87,14 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
// Flexible column update mode is incompatible with both 'columns' and 'hidden_columns'
boolean isFlexibleColumn =
UPDATE_FLEXIBLE_COLUMNS.equalsIgnoreCase(
loadProperties.getProperty(UNIQUE_KEY_UPDATE_MODE));
boolean deletable =
executionOptions.getDeletable()
&& RestService.isUniqueKeyType(options, readOptions, LOG);
if (!loadProperties.containsKey(COLUMNS_KEY)) {
if (!loadProperties.containsKey(COLUMNS_KEY) && !isFlexibleColumn) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
String columns =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -44,6 +50,7 @@
import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.doris.flink.utils.MockSource;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -80,6 +87,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";
static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column";
static final String TABLE_FLEXIBLE_COLUMN = "tbl_flexible_column";

private final boolean batchMode;

Expand Down Expand Up @@ -819,4 +827,112 @@ public void testSinkUnicodeColumn() throws Exception {
"select `名称`,`年龄` from %s.%s order by 1", DATABASE, TABLE_UNICODE_COLUMN);
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

// todo: only test for doris3.1+
@Ignore
@Test
public void testFlexibleColumnUpdate() throws Exception {
initializeFlexibleColumnTable(TABLE_FLEXIBLE_COLUMN);

// pre-insert full rows with all columns into Doris
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
String.format(
"INSERT INTO %s.%s VALUES(1,'doris',18,100),(2,'flink',20,200)",
DATABASE, TABLE_FLEXIBLE_COLUMN));

String query =
String.format(
"select id,name,age,score from %s.%s order by 1",
DATABASE, TABLE_FLEXIBLE_COLUMN);
ContainerUtils.checkResult(
getDorisQueryConnection(),
LOG,
Arrays.asList("1,doris,18,100", "2,flink,20,200"),
query,
4);

// Build a changelog source:
// INSERT (1, 25) -> update age of id=1 to 25, name/score remain unchanged
// DELETE (2, 20) -> delete the row with id=2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Row> changelogStream =
env.fromCollection(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, 25),
Row.ofKind(RowKind.DELETE, 2, 20)),
Types.ROW_NAMED(new String[] {"id", "age"}, Types.INT, Types.INT));

Schema sourceSchema =
Schema.newBuilder()
.column("id", DataTypes.INT().notNull())
.column("age", DataTypes.INT())
.primaryKey("id")
.build();
tEnv.createTemporaryView(
"source_view", tEnv.fromChangelogStream(changelogStream, sourceSchema));

// Doris sink declares only 'id' and 'age'; connector will NOT auto-build columns header
// and NOT add hidden_columns header because unique_key_update_mode=UPDATE_FLEXIBLE_COLUMNS
String sinkDDL =
String.format(
"CREATE TABLE doris_flex_sink ("
+ " id INT,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'sink.enable.batch-mode' = '%s',"
+ " 'sink.label-prefix' = '"
+ UUID.randomUUID()
+ "',"
+ " 'sink.properties.format' = 'json',"
+ " 'sink.properties.read_json_by_line' = 'true',"
+ " 'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_FLEXIBLE_COLUMN,
getDorisUsername(),
getDorisPassword(),
batchMode);
tEnv.executeSql(sinkDDL);
tEnv.executeSql("INSERT INTO doris_flex_sink SELECT id, age FROM source_view");

Thread.sleep(10000);
// id=1: age updated to 25, name/score remain unchanged
// id=2: deleted
ContainerUtils.checkResult(
getDorisQueryConnection(), LOG, Arrays.asList("1,doris,25,100"), query, 4);
}

private void initializeFlexibleColumnTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
String.format(
"CREATE TABLE %s.%s (\n"
+ " `id` int,\n"
+ " `name` varchar(256),\n"
+ " `age` int,\n"
+ " `score` int\n"
+ ") UNIQUE KEY(`id`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\",\n"
+ " \"enable_unique_key_merge_on_write\" = \"true\",\n"
+ " \"enable_unique_key_skip_bitmap_column\" = \"true\"\n"
+ ")",
DATABASE, table));
}
}
Loading