Skip to content

Commit 06edd0c

Browse files
authored
[Feature] enable gz compression by default for StreamLoad (#648)
Enable gz compression by default for StreamLoad writes to reduce network transfer and improve write performance
1 parent f569e71 commit 06edd0c

5 files changed

Lines changed: 112 additions & 3 deletions

File tree

.github/workflows/checkstyle.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ jobs:
4040

4141
- name: Run java checkstyle
4242
run:
43-
cd flink-doris-connector && mvn clean checkstyle:check -Pflink1 -pl flink-doris-connector-flink1 -am
43+
cd flink-doris-connector && mvn clean install checkstyle:check -DskipTests -Pflink1 -pl flink-doris-connector-flink1 -am
4444

4545
- name: Setup java 17
4646
uses: actions/setup-java@v2
@@ -50,4 +50,4 @@ jobs:
5050

5151
- name: Run java checkstyle
5252
run:
53-
cd flink-doris-connector && mvn clean checkstyle:check -Pflink2 -pl flink-doris-connector-flink2 -am
53+
cd flink-doris-connector && mvn clean install checkstyle:check -DskipTests -Pflink2 -pl flink-doris-connector-flink2 -am

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.Objects;
2626
import java.util.Properties;
2727

28+
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
29+
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
2830
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
2931
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
3032
import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
@@ -531,6 +533,11 @@ public DorisExecutionOptions build() {
531533
streamLoadProp.put(READ_JSON_BY_LINE, true);
532534
}
533535

536+
// Enable gz compression by default
537+
if (streamLoadProp != null && !streamLoadProp.containsKey(COMPRESS_TYPE)) {
538+
streamLoadProp.put(COMPRESS_TYPE, COMPRESS_TYPE_GZ);
539+
}
540+
534541
Preconditions.checkArgument(
535542
bufferFlushIntervalMs >= 1000,
536543
"bufferFlushIntervalMs must be greater than or equal to 1 second");

flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ public void testBuild() {
3636
Properties expected = new Properties();
3737
expected.put("format", "json");
3838
expected.put("read_json_by_line", true);
39-
Assert.assertTrue(actual.size() == 2);
39+
expected.put("compress_type", "gz");
40+
Assert.assertTrue(actual.size() == 3);
4041
Assert.assertTrue(actual.get("format").equals(expected.get("format")));
4142
Assert.assertTrue(
4243
actual.get("read_json_by_line").equals(expected.get("read_json_by_line")));
44+
Assert.assertTrue(actual.get("compress_type").equals(expected.get("compress_type")));
4345
}
4446

4547
@Test

flink-doris-connector/flink-doris-connector-flink1/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ protected void setSinkConfDefaultConfig(List<String> argList) {
142142
argList.add(JDBC_URL + "=" + getDorisQueryUrl());
143143
argList.add(SINK_CONF);
144144
argList.add(SINK_LABEL_PREFIX + "=" + "label");
145+
// disable gz compression for json format, doris 2.1 does not support json compression
146+
argList.add(SINK_CONF);
147+
argList.add("sink.properties.compress_type=");
145148
}
146149

147150
public static void closeE2EContainers() {

flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
7777
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
7878
static final String TABLE_OVERWRITE = "tbl_overwrite";
7979
static final String TABLE_GZ_FORMAT = "tbl_gz_format";
80+
static final String TABLE_GZ_FORMAT_DEFAULT = "tbl_gz_format_default";
81+
static final String TABLE_NO_COMPRESS = "tbl_no_compress";
8082
static final String TABLE_CSV_JM = "tbl_csv_jm";
8183
static final String TABLE_CSV_TM = "tbl_csv_tm";
8284
static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column";
@@ -137,6 +139,7 @@ public void testSinkJsonFormat() throws Exception {
137139
Properties properties = new Properties();
138140
properties.setProperty("read_json_by_line", "true");
139141
properties.setProperty("format", "json");
142+
properties.setProperty("compress_type", "");
140143

141144
// mock data
142145
Map<String, Object> row1 = new HashMap<>();
@@ -223,6 +226,7 @@ public void testTableSinkJsonFormat() throws Exception {
223226
+ " 'sink.ignore.update-before' = 'true',"
224227
+ " 'sink.properties.format' = 'json',"
225228
+ " 'sink.properties.read_json_by_line' = 'true',"
229+
+ " 'sink.properties.compress_type' = '',"
226230
+ " 'sink.label-prefix' = 'doris_sink"
227231
+ UUID.randomUUID()
228232
+ "'"
@@ -481,6 +485,98 @@ public void testTableGzFormat() throws Exception {
481485
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
482486
}
483487

488+
@Test
489+
public void testTableDefaultGzFormat() throws Exception {
490+
initializeTable(TABLE_GZ_FORMAT_DEFAULT, DataModel.UNIQUE);
491+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
492+
env.setParallelism(DEFAULT_PARALLELISM);
493+
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
494+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
495+
496+
String sinkDDL =
497+
String.format(
498+
"CREATE TABLE doris_gz_default_sink ("
499+
+ " name STRING,"
500+
+ " age INT"
501+
+ ") WITH ("
502+
+ " 'connector' = '"
503+
+ DorisConfigOptions.IDENTIFIER
504+
+ "',"
505+
+ " 'fenodes' = '%s',"
506+
+ " 'table.identifier' = '%s',"
507+
+ " 'username' = '%s',"
508+
+ " 'password' = '%s',"
509+
+ " 'sink.enable.batch-mode' = '%s',"
510+
+ " 'sink.enable-delete' = 'false',"
511+
+ " 'sink.label-prefix' = '"
512+
+ UUID.randomUUID()
513+
+ "',"
514+
+ " 'sink.properties.column_separator' = '\\x01',"
515+
+ " 'sink.properties.line_delimiter' = '\\x02'"
516+
+ ")",
517+
getFenodes(),
518+
DATABASE + "." + TABLE_GZ_FORMAT_DEFAULT,
519+
getDorisUsername(),
520+
getDorisPassword(),
521+
batchMode);
522+
tEnv.executeSql(sinkDDL);
523+
tEnv.executeSql(
524+
"INSERT INTO doris_gz_default_sink SELECT 'doris',1 union all SELECT 'flink',2");
525+
526+
Thread.sleep(25000);
527+
List<String> expected = Arrays.asList("doris,1", "flink,2");
528+
String query =
529+
String.format(
530+
"select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT_DEFAULT);
531+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
532+
}
533+
534+
@Test
535+
public void testTableNoCompressFormat() throws Exception {
536+
initializeTable(TABLE_NO_COMPRESS, DataModel.UNIQUE);
537+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
538+
env.setParallelism(DEFAULT_PARALLELISM);
539+
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
540+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
541+
542+
String sinkDDL =
543+
String.format(
544+
"CREATE TABLE doris_no_compress_sink ("
545+
+ " name STRING,"
546+
+ " age INT"
547+
+ ") WITH ("
548+
+ " 'connector' = '"
549+
+ DorisConfigOptions.IDENTIFIER
550+
+ "',"
551+
+ " 'fenodes' = '%s',"
552+
+ " 'table.identifier' = '%s',"
553+
+ " 'username' = '%s',"
554+
+ " 'password' = '%s',"
555+
+ " 'sink.enable.batch-mode' = '%s',"
556+
+ " 'sink.enable-delete' = 'false',"
557+
+ " 'sink.label-prefix' = '"
558+
+ UUID.randomUUID()
559+
+ "',"
560+
+ " 'sink.properties.column_separator' = '\\x01',"
561+
+ " 'sink.properties.line_delimiter' = '\\x02',"
562+
+ " 'sink.properties.compress_type' = ''"
563+
+ ")",
564+
getFenodes(),
565+
DATABASE + "." + TABLE_NO_COMPRESS,
566+
getDorisUsername(),
567+
getDorisPassword(),
568+
batchMode);
569+
tEnv.executeSql(sinkDDL);
570+
tEnv.executeSql(
571+
"INSERT INTO doris_no_compress_sink SELECT 'doris',1 union all SELECT 'flink',2");
572+
573+
Thread.sleep(25000);
574+
List<String> expected = Arrays.asList("doris,1", "flink,2");
575+
String query =
576+
String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_NO_COMPRESS);
577+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
578+
}
579+
484580
@Test
485581
public void testJobManagerFailoverSink() throws Exception {
486582
LOG.info("start to test JobManagerFailoverSink.");
@@ -780,6 +876,7 @@ public void testSinkUnicodeColumn() throws Exception {
780876
Properties properties = new Properties();
781877
properties.setProperty("read_json_by_line", "true");
782878
properties.setProperty("format", "json");
879+
properties.setProperty("compress_type", "");
783880

784881
// mock data
785882
Map<String, Object> row1 = new HashMap<>();

0 commit comments

Comments
 (0)