From fc26b4820ea81c0df4ed7001fcc822504ad5b7ea Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 8 May 2025 14:55:59 +0800 Subject: [PATCH] pick thrift message size --- .../doris/flink/backend/BackendClient.java | 14 ++++++++--- .../doris/flink/cfg/ConfigurationOptions.java | 3 +++ .../doris/flink/cfg/DorisReadOptions.java | 25 ++++++++++++++++--- .../doris/flink/table/DorisConfigOptions.java | 8 ++++++ .../flink/table/DorisDynamicTableFactory.java | 4 +++ .../doris/flink/source/DorisSourceITCase.java | 1 + .../table/DorisDynamicTableFactoryTest.java | 5 +++- 7 files changed, 52 insertions(+), 8 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 9881ec5f0..7834bc81f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -54,6 +54,7 @@ public class BackendClient { private final int retries; private final int socketTimeout; private final int connectTimeout; + private final int thriftMaxMessageSize; public BackendClient(Routing routing, DorisReadOptions readOptions) { this.routing = routing; @@ -69,11 +70,16 @@ public BackendClient(Routing routing, DorisReadOptions readOptions) { readOptions.getRequestRetries() == null ? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT : readOptions.getRequestRetries(); + this.thriftMaxMessageSize = + readOptions.getRequestRetries() == null + ? ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT + : readOptions.getThriftMaxMessageSize(); logger.trace( - "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", + "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'", this.connectTimeout, this.socketTimeout, - this.retries); + this.retries, + this.thriftMaxMessageSize); open(); } @@ -84,9 +90,11 @@ private void open() { logger.debug("Attempt {} to connect {}.", attempt, routing); try { TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); + TConfiguration.Builder configBuilder = TConfiguration.custom(); + configBuilder.setMaxMessageSize(thriftMaxMessageSize); transport = new TSocket( - new TConfiguration(), + configBuilder.build(), routing.getHost(), routing.getPort(), socketTimeout, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index 4a3f70b80..2981ae2f4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -51,4 +51,7 @@ public interface ConfigurationOptions { Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size"; Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; + + String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size"; + Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 3669e740a..99d487f61 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -37,6 +37,7 @@ public class DorisReadOptions implements Serializable { private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; private boolean useOldApi; + private Integer thriftMaxMessageSize; public DorisReadOptions( String readFields, @@ -50,7 +51,8 @@ public DorisReadOptions( Long execMemLimit, Integer deserializeQueueSize, Boolean deserializeArrowAsync, - boolean useOldApi) { + boolean useOldApi, + Integer thriftMaxMessageSize) { this.readFields = readFields; this.filterQuery = filterQuery; this.requestTabletSize = requestTabletSize; @@ -63,6 +65,7 @@ public DorisReadOptions( this.deserializeQueueSize = deserializeQueueSize; this.deserializeArrowAsync = deserializeArrowAsync; this.useOldApi = useOldApi; + this.thriftMaxMessageSize = thriftMaxMessageSize; } public String getReadFields() { @@ -113,6 +116,10 @@ public boolean getUseOldApi() { return useOldApi; } + public Integer getThriftMaxMessageSize() { + return thriftMaxMessageSize; + } + public void setReadFields(String readFields) { this.readFields = readFields; } @@ -149,7 +156,8 @@ public boolean equals(Object o) { && Objects.equals(requestBatchSize, that.requestBatchSize) && Objects.equals(execMemLimit, that.execMemLimit) && Objects.equals(deserializeQueueSize, that.deserializeQueueSize) - && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync); + && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync) + && Objects.equals(thriftMaxMessageSize, that.thriftMaxMessageSize); } @Override @@ -166,7 +174,8 @@ public int hashCode() { execMemLimit, deserializeQueueSize, deserializeArrowAsync, - useOldApi); + useOldApi, + thriftMaxMessageSize); } /** Builder of {@link DorisReadOptions}. */ @@ -184,6 +193,8 @@ public static class Builder { private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; private Boolean useOldApi = false; + private Integer thriftMaxMessageSize = + ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT; public Builder setReadFields(String readFields) { this.readFields = readFields; @@ -245,6 +256,11 @@ public Builder setUseOldApi(boolean useOldApi) { return this; } + public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) { + this.thriftMaxMessageSize = thriftMaxMessageSize; + return this; + } + public DorisReadOptions build() { return new DorisReadOptions( readFields, @@ -258,7 +274,8 @@ public DorisReadOptions build() { execMemLimit, deserializeQueueSize, deserializeArrowAsync, - useOldApi); + useOldApi, + thriftMaxMessageSize); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 4b0b56c4e..b009b2868 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -38,6 +38,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT; /** Options for the Doris connector. */ @PublicEvolving @@ -158,6 +159,13 @@ public class DorisConfigOptions { .withDescription( "Whether to read data using the new interface defined according to the FLIP-27 specification,default false"); + public static final ConfigOption DORIS_THRIFT_MAX_MESSAGE_SIZE = + ConfigOptions.key("doris.thrift.max.message.size") + .intType() + .defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT) + .withDescription( + "The maximum message size for thrift protocol. The default is Integer.MAX_VALUE."); + // Lookup options public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows") diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 5edefe6be..0bd71322e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -51,6 +51,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE; import static org.apache.doris.flink.table.DorisConfigOptions.FENODES; import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER; import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL; @@ -128,6 +129,8 @@ public Set> optionalOptions() { options.add(DORIS_DESERIALIZE_QUEUE_SIZE); options.add(DORIS_BATCH_SIZE); options.add(DORIS_EXEC_MEM_LIMIT); + options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE); + options.add(LOOKUP_CACHE_MAX_ROWS); options.add(LOOKUP_CACHE_TTL); options.add(LOOKUP_MAX_RETRIES); @@ -205,6 +208,7 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)) .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes()) + .setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE)) .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) .setReadFields(readableConfig.get(DORIS_READ_FIELD)) .setRequestQueryTimeoutS( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index e13eeb365..5e6da5754 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -211,6 +211,7 @@ public void testTableSourceAllOptions() throws Exception { + " 'doris.request.tablet.size' = '2'," + " 'doris.batch.size' = '1024'," + " 'doris.exec.mem.limit' = '2048mb'," + + " 'doris.thrift.max.message.size' = '10485760'," + " 'doris.deserialize.arrow.async' = 'true'," + " 'doris.deserialize.queue.size' = '32'" + ")", diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 0004af05c..69f9b3a50 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -40,6 +40,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT; import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; @@ -57,6 +58,7 @@ public void testDorisSourceProperties() { properties.put("doris.exec.mem.limit", "8192mb"); properties.put("doris.deserialize.arrow.async", "false"); properties.put("doris.deserialize.queue.size", "64"); + properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE + ""); properties.put("lookup.cache.max-rows", "100"); properties.put("lookup.cache.ttl", "20s"); @@ -98,7 +100,8 @@ public void testDorisSourceProperties() { .setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT) - .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT); + .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT) + .setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); DorisDynamicTableSource expected = new DorisDynamicTableSource( options,