Skip to content

Commit d88b74a

Browse files
JNSimbaclaude
andcommitted
[Feature] enable gz compression by default for StreamLoad
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f569e71 commit d88b74a

2 files changed

Lines changed: 103 additions & 0 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.Objects;
2626
import java.util.Properties;
2727

28+
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
29+
import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
2830
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
2931
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
3032
import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
@@ -531,6 +533,11 @@ public DorisExecutionOptions build() {
531533
streamLoadProp.put(READ_JSON_BY_LINE, true);
532534
}
533535

536+
// Enable gz compression by default
537+
if (streamLoadProp != null && !streamLoadProp.containsKey(COMPRESS_TYPE)) {
538+
streamLoadProp.put(COMPRESS_TYPE, COMPRESS_TYPE_GZ);
539+
}
540+
534541
Preconditions.checkArgument(
535542
bufferFlushIntervalMs >= 1000,
536543
"bufferFlushIntervalMs must be greater than or equal to 1 second");

flink-doris-connector/flink-doris-connector-it/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
7777
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
7878
static final String TABLE_OVERWRITE = "tbl_overwrite";
7979
static final String TABLE_GZ_FORMAT = "tbl_gz_format";
80+
static final String TABLE_GZ_FORMAT_DEFAULT = "tbl_gz_format_default";
81+
static final String TABLE_NO_COMPRESS = "tbl_no_compress";
8082
static final String TABLE_CSV_JM = "tbl_csv_jm";
8183
static final String TABLE_CSV_TM = "tbl_csv_tm";
8284
static final String TABLE_UNICODE_COLUMN = "tbl_unicode_column";
@@ -481,6 +483,100 @@ public void testTableGzFormat() throws Exception {
481483
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
482484
}
483485

486+
@Test
487+
public void testTableDefaultGzFormat() throws Exception {
488+
initializeTable(TABLE_GZ_FORMAT_DEFAULT, DataModel.UNIQUE);
489+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
490+
env.setParallelism(DEFAULT_PARALLELISM);
491+
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
492+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
493+
494+
String sinkDDL =
495+
String.format(
496+
"CREATE TABLE doris_gz_default_sink ("
497+
+ " name STRING,"
498+
+ " age INT"
499+
+ ") WITH ("
500+
+ " 'connector' = '"
501+
+ DorisConfigOptions.IDENTIFIER
502+
+ "',"
503+
+ " 'fenodes' = '%s',"
504+
+ " 'table.identifier' = '%s',"
505+
+ " 'username' = '%s',"
506+
+ " 'password' = '%s',"
507+
+ " 'sink.enable.batch-mode' = '%s',"
508+
+ " 'sink.enable-delete' = 'false',"
509+
+ " 'sink.label-prefix' = '"
510+
+ UUID.randomUUID()
511+
+ "',"
512+
+ " 'sink.properties.column_separator' = '\\x01',"
513+
+ " 'sink.properties.line_delimiter' = '\\x02'"
514+
+ ")",
515+
getFenodes(),
516+
DATABASE + "." + TABLE_GZ_FORMAT_DEFAULT,
517+
getDorisUsername(),
518+
getDorisPassword(),
519+
batchMode);
520+
tEnv.executeSql(sinkDDL);
521+
tEnv.executeSql(
522+
"INSERT INTO doris_gz_default_sink SELECT 'doris',1 union all SELECT 'flink',2");
523+
524+
Thread.sleep(25000);
525+
List<String> expected = Arrays.asList("doris,1", "flink,2");
526+
String query =
527+
String.format(
528+
"select name,age from %s.%s order by 1",
529+
DATABASE, TABLE_GZ_FORMAT_DEFAULT);
530+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
531+
}
532+
533+
@Test
534+
public void testTableNoCompressFormat() throws Exception {
535+
initializeTable(TABLE_NO_COMPRESS, DataModel.UNIQUE);
536+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
537+
env.setParallelism(DEFAULT_PARALLELISM);
538+
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
539+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
540+
541+
String sinkDDL =
542+
String.format(
543+
"CREATE TABLE doris_no_compress_sink ("
544+
+ " name STRING,"
545+
+ " age INT"
546+
+ ") WITH ("
547+
+ " 'connector' = '"
548+
+ DorisConfigOptions.IDENTIFIER
549+
+ "',"
550+
+ " 'fenodes' = '%s',"
551+
+ " 'table.identifier' = '%s',"
552+
+ " 'username' = '%s',"
553+
+ " 'password' = '%s',"
554+
+ " 'sink.enable.batch-mode' = '%s',"
555+
+ " 'sink.enable-delete' = 'false',"
556+
+ " 'sink.label-prefix' = '"
557+
+ UUID.randomUUID()
558+
+ "',"
559+
+ " 'sink.properties.column_separator' = '\\x01',"
560+
+ " 'sink.properties.line_delimiter' = '\\x02',"
561+
+ " 'sink.properties.compress_type' = ''"
562+
+ ")",
563+
getFenodes(),
564+
DATABASE + "." + TABLE_NO_COMPRESS,
565+
getDorisUsername(),
566+
getDorisPassword(),
567+
batchMode);
568+
tEnv.executeSql(sinkDDL);
569+
tEnv.executeSql(
570+
"INSERT INTO doris_no_compress_sink SELECT 'doris',1 union all SELECT 'flink',2");
571+
572+
Thread.sleep(25000);
573+
List<String> expected = Arrays.asList("doris,1", "flink,2");
574+
String query =
575+
String.format(
576+
"select name,age from %s.%s order by 1", DATABASE, TABLE_NO_COMPRESS);
577+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
578+
}
579+
484580
@Test
485581
public void testJobManagerFailoverSink() throws Exception {
486582
LOG.info("start to test JobManagerFailoverSink.");

0 commit comments

Comments
 (0)