Skip to content

Commit 69dbcb3

Browse files
author
guoxuanlin
committed
[pipeline][starrocks] support sink.buffer-flush.max-rows
1 parent 97e484c commit 69dbcb3

2 files changed

Lines changed: 11 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig)
9797
.getOptional(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE)
9898
.ifPresent(
9999
config -> sinkConfig.set(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE, config));
100+
cdcConfig
101+
.getOptional(StarRocksDataSinkOptions.SINK_BATCH_MAX_ROWS)
102+
.ifPresent(
103+
config -> sinkConfig.set(StarRocksSinkOptions.SINK_BATCH_MAX_ROWS, config));
100104
cdcConfig
101105
.getOptional(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL)
102106
.ifPresent(
@@ -175,6 +179,7 @@ public Set<ConfigOption<?>> optionalOptions() {
175179
optionalOptions.add(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT);
176180
optionalOptions.add(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT);
177181
optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE);
182+
optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_MAX_ROWS);
178183
optionalOptions.add(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL);
179184
optionalOptions.add(StarRocksDataSinkOptions.SINK_SCAN_FREQUENCY);
180185
optionalOptions.add(StarRocksDataSinkOptions.SINK_IO_THREAD_COUNT);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ public class StarRocksDataSinkOptions {
9494
.defaultValue(150L * 1024 * 1024)
9595
.withDescription("Max data bytes of the flush.");
9696

97+
public static final ConfigOption<Long> SINK_BATCH_MAX_ROWS =
98+
ConfigOptions.key("sink.buffer-flush.max-rows")
99+
.longType()
100+
.defaultValue(500000L)
101+
.withDescription("Max row count of the flush.");
102+
97103
public static final ConfigOption<Long> SINK_BATCH_FLUSH_INTERVAL =
98104
ConfigOptions.key("sink.buffer-flush.interval-ms")
99105
.longType()

0 commit comments

Comments
 (0)