Skip to content

Commit fc26b48

Browse files
committed
pick thrift message size
1 parent e8ae64e commit fc26b48

7 files changed

Lines changed: 52 additions & 8 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class BackendClient {
5454
private final int retries;
5555
private final int socketTimeout;
5656
private final int connectTimeout;
57+
private final int thriftMaxMessageSize;
5758

5859
public BackendClient(Routing routing, DorisReadOptions readOptions) {
5960
this.routing = routing;
@@ -69,11 +70,16 @@ public BackendClient(Routing routing, DorisReadOptions readOptions) {
6970
readOptions.getRequestRetries() == null
7071
? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
7172
: readOptions.getRequestRetries();
73+
this.thriftMaxMessageSize =
74+
readOptions.getRequestRetries() == null
75+
? ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT
76+
: readOptions.getThriftMaxMessageSize();
7277
logger.trace(
73-
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
78+
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
7479
this.connectTimeout,
7580
this.socketTimeout,
76-
this.retries);
81+
this.retries,
82+
this.thriftMaxMessageSize);
7783
open();
7884
}
7985

@@ -84,9 +90,11 @@ private void open() {
8490
logger.debug("Attempt {} to connect {}.", attempt, routing);
8591
try {
8692
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
93+
TConfiguration.Builder configBuilder = TConfiguration.custom();
94+
configBuilder.setMaxMessageSize(thriftMaxMessageSize);
8795
transport =
8896
new TSocket(
89-
new TConfiguration(),
97+
configBuilder.build(),
9098
routing.getHost(),
9199
routing.getPort(),
92100
socketTimeout,

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,7 @@ public interface ConfigurationOptions {
5151
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
5252
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
5353
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
54+
55+
String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
56+
Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;
5457
}

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class DorisReadOptions implements Serializable {
3737
private Integer deserializeQueueSize;
3838
private Boolean deserializeArrowAsync;
3939
private boolean useOldApi;
40+
private Integer thriftMaxMessageSize;
4041

4142
public DorisReadOptions(
4243
String readFields,
@@ -50,7 +51,8 @@ public DorisReadOptions(
5051
Long execMemLimit,
5152
Integer deserializeQueueSize,
5253
Boolean deserializeArrowAsync,
53-
boolean useOldApi) {
54+
boolean useOldApi,
55+
Integer thriftMaxMessageSize) {
5456
this.readFields = readFields;
5557
this.filterQuery = filterQuery;
5658
this.requestTabletSize = requestTabletSize;
@@ -63,6 +65,7 @@ public DorisReadOptions(
6365
this.deserializeQueueSize = deserializeQueueSize;
6466
this.deserializeArrowAsync = deserializeArrowAsync;
6567
this.useOldApi = useOldApi;
68+
this.thriftMaxMessageSize = thriftMaxMessageSize;
6669
}
6770

6871
public String getReadFields() {
@@ -113,6 +116,10 @@ public boolean getUseOldApi() {
113116
return useOldApi;
114117
}
115118

119+
public Integer getThriftMaxMessageSize() {
120+
return thriftMaxMessageSize;
121+
}
122+
116123
public void setReadFields(String readFields) {
117124
this.readFields = readFields;
118125
}
@@ -149,7 +156,8 @@ public boolean equals(Object o) {
149156
&& Objects.equals(requestBatchSize, that.requestBatchSize)
150157
&& Objects.equals(execMemLimit, that.execMemLimit)
151158
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
152-
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync);
159+
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
160+
&& Objects.equals(thriftMaxMessageSize, that.thriftMaxMessageSize);
153161
}
154162

155163
@Override
@@ -166,7 +174,8 @@ public int hashCode() {
166174
execMemLimit,
167175
deserializeQueueSize,
168176
deserializeArrowAsync,
169-
useOldApi);
177+
useOldApi,
178+
thriftMaxMessageSize);
170179
}
171180

172181
/** Builder of {@link DorisReadOptions}. */
@@ -184,6 +193,8 @@ public static class Builder {
184193
private Integer deserializeQueueSize;
185194
private Boolean deserializeArrowAsync;
186195
private Boolean useOldApi = false;
196+
private Integer thriftMaxMessageSize =
197+
ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
187198

188199
public Builder setReadFields(String readFields) {
189200
this.readFields = readFields;
@@ -245,6 +256,11 @@ public Builder setUseOldApi(boolean useOldApi) {
245256
return this;
246257
}
247258

259+
public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) {
260+
this.thriftMaxMessageSize = thriftMaxMessageSize;
261+
return this;
262+
}
263+
248264
public DorisReadOptions build() {
249265
return new DorisReadOptions(
250266
readFields,
@@ -258,7 +274,8 @@ public DorisReadOptions build() {
258274
execMemLimit,
259275
deserializeQueueSize,
260276
deserializeArrowAsync,
261-
useOldApi);
277+
useOldApi,
278+
thriftMaxMessageSize);
262279
}
263280
}
264281
}

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
3939
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
4040
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
41+
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
4142

4243
/** Options for the Doris connector. */
4344
@PublicEvolving
@@ -158,6 +159,13 @@ public class DorisConfigOptions {
158159
.withDescription(
159160
"Whether to read data using the new interface defined according to the FLIP-27 specification,default false");
160161

162+
public static final ConfigOption<Integer> DORIS_THRIFT_MAX_MESSAGE_SIZE =
163+
ConfigOptions.key("doris.thrift.max.message.size")
164+
.intType()
165+
.defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT)
166+
.withDescription(
167+
"The maximum message size for thrift protocol. The default is Integer.MAX_VALUE.");
168+
161169
// Lookup options
162170
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
163171
ConfigOptions.key("lookup.cache.max-rows")

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
5252
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
5353
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
54+
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
5455
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
5556
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
5657
import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
@@ -128,6 +129,8 @@ public Set<ConfigOption<?>> optionalOptions() {
128129
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
129130
options.add(DORIS_BATCH_SIZE);
130131
options.add(DORIS_EXEC_MEM_LIMIT);
132+
options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE);
133+
131134
options.add(LOOKUP_CACHE_MAX_ROWS);
132135
options.add(LOOKUP_CACHE_TTL);
133136
options.add(LOOKUP_MAX_RETRIES);
@@ -205,6 +208,7 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
205208
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
206209
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
207210
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
211+
.setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE))
208212
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
209213
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
210214
.setRequestQueryTimeoutS(

flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ public void testTableSourceAllOptions() throws Exception {
211211
+ " 'doris.request.tablet.size' = '2',"
212212
+ " 'doris.batch.size' = '1024',"
213213
+ " 'doris.exec.mem.limit' = '2048mb',"
214+
+ " 'doris.thrift.max.message.size' = '10485760',"
214215
+ " 'doris.deserialize.arrow.async' = 'true',"
215216
+ " 'doris.deserialize.queue.size' = '32'"
216217
+ ")",

flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
4141
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
4242
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
43+
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
4344
import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA;
4445
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
4546
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
@@ -57,6 +58,7 @@ public void testDorisSourceProperties() {
5758
properties.put("doris.exec.mem.limit", "8192mb");
5859
properties.put("doris.deserialize.arrow.async", "false");
5960
properties.put("doris.deserialize.queue.size", "64");
61+
properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE + "");
6062

6163
properties.put("lookup.cache.max-rows", "100");
6264
properties.put("lookup.cache.ttl", "20s");
@@ -98,7 +100,8 @@ public void testDorisSourceProperties() {
98100
.setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
99101
.setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
100102
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
101-
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT);
103+
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
104+
.setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
102105
DorisDynamicTableSource expected =
103106
new DorisDynamicTableSource(
104107
options,

0 commit comments

Comments
 (0)