Skip to content

Commit b8b7d6c

Browse files
authored
add thrfit max messahe size (#593)
1 parent 280bd66 commit b8b7d6c

7 files changed

Lines changed: 60 additions & 10 deletions

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class BackendClient {
5454
private final int retries;
5555
private final int socketTimeout;
5656
private final int connectTimeout;
57+
private final int thriftMaxMessageSize;
5758

5859
public BackendClient(Routing routing, DorisReadOptions readOptions) {
5960
this.routing = routing;
@@ -69,11 +70,16 @@ public BackendClient(Routing routing, DorisReadOptions readOptions) {
6970
readOptions.getRequestRetries() == null
7071
? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
7172
: readOptions.getRequestRetries();
73+
this.thriftMaxMessageSize =
74+
readOptions.getRequestRetries() == null
75+
? ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT
76+
: readOptions.getThriftMaxMessageSize();
7277
logger.trace(
73-
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
78+
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
7479
this.connectTimeout,
7580
this.socketTimeout,
76-
this.retries);
81+
this.retries,
82+
this.thriftMaxMessageSize);
7783
open();
7884
}
7985

@@ -84,9 +90,11 @@ private void open() {
8490
logger.debug("Attempt {} to connect {}.", attempt, routing);
8591
try {
8692
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
93+
TConfiguration.Builder configBuilder = TConfiguration.custom();
94+
configBuilder.setMaxMessageSize(thriftMaxMessageSize);
8795
transport =
8896
new TSocket(
89-
new TConfiguration(),
97+
configBuilder.build(),
9098
routing.getHost(),
9199
routing.getPort(),
92100
socketTimeout,

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public interface ConfigurationOptions {
5252
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
5353
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
5454
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
55+
String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
56+
Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;
5557

5658
String USE_FLIGHT_SQL = "source.use-flight-sql";
5759
Boolean USE_FLIGHT_SQL_DEFAULT = true;

flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class DorisReadOptions implements Serializable {
4141
private Integer flightSqlPort;
4242
// for flink sql limit push down
4343
private Long rowLimit;
44+
private Integer thriftMaxMessageSize;
4445

4546
public DorisReadOptions(
4647
String readFields,
@@ -57,7 +58,8 @@ public DorisReadOptions(
5758
boolean useOldApi,
5859
boolean useFlightSql,
5960
Integer flightSqlPort,
60-
Long rowLimit) {
61+
Long rowLimit,
62+
Integer thriftMaxMessageSize) {
6163
this.readFields = readFields;
6264
this.filterQuery = filterQuery;
6365
this.requestTabletSize = requestTabletSize;
@@ -73,6 +75,7 @@ public DorisReadOptions(
7375
this.useFlightSql = useFlightSql;
7476
this.flightSqlPort = flightSqlPort;
7577
this.rowLimit = rowLimit;
78+
this.thriftMaxMessageSize = thriftMaxMessageSize;
7679
}
7780

7881
public String getReadFields() {
@@ -151,6 +154,10 @@ public void setRowLimit(Long rowLimit) {
151154
this.rowLimit = rowLimit;
152155
}
153156

157+
public Integer getThriftMaxMessageSize() {
158+
return thriftMaxMessageSize;
159+
}
160+
154161
public static Builder builder() {
155162
return new Builder();
156163
}
@@ -182,7 +189,8 @@ public boolean equals(Object o) {
182189
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
183190
&& Objects.equals(useFlightSql, that.useFlightSql)
184191
&& Objects.equals(flightSqlPort, that.flightSqlPort)
185-
&& Objects.equals(rowLimit, that.rowLimit);
192+
&& Objects.equals(rowLimit, that.rowLimit)
193+
&& Objects.equals(thriftMaxMessageSize, that.thriftMaxMessageSize);
186194
}
187195

188196
@Override
@@ -202,7 +210,8 @@ public int hashCode() {
202210
useOldApi,
203211
useFlightSql,
204212
flightSqlPort,
205-
rowLimit);
213+
rowLimit,
214+
thriftMaxMessageSize);
206215
}
207216

208217
public DorisReadOptions copy() {
@@ -221,7 +230,8 @@ public DorisReadOptions copy() {
221230
useOldApi,
222231
useFlightSql,
223232
flightSqlPort,
224-
rowLimit);
233+
rowLimit,
234+
thriftMaxMessageSize);
225235
}
226236

227237
/** Builder of {@link DorisReadOptions}. */
@@ -247,7 +257,8 @@ public static class Builder {
247257
private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
248258
private Integer flightSqlPort = ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
249259
private Long rowLimit;
250-
260+
private Integer thriftMaxMessageSize =
261+
ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
251262
/**
252263
* Sets the readFields for doris table to push down projection, such as name,age.
253264
*
@@ -406,6 +417,17 @@ public Builder setFlightSqlPort(Integer flightSqlPort) {
406417
return this;
407418
}
408419

420+
/**
421+
* Sets the thriftMaxMessageSize for DorisReadOptions.
422+
*
423+
* @param thriftMaxMessageSize
424+
* @return
425+
*/
426+
public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) {
427+
this.thriftMaxMessageSize = thriftMaxMessageSize;
428+
return this;
429+
}
430+
409431
/**
410432
* Build the {@link DorisReadOptions}.
411433
*
@@ -427,7 +449,8 @@ public DorisReadOptions build() {
427449
useOldApi,
428450
useFlightSql,
429451
flightSqlPort,
430-
rowLimit);
452+
rowLimit,
453+
thriftMaxMessageSize);
431454
}
432455
}
433456
}

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
3939
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
4040
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
41+
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
4142

4243
/** Options for the Doris connector. */
4344
@PublicEvolving
@@ -151,6 +152,14 @@ public class DorisConfigOptions {
151152
.memoryType()
152153
.defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
153154
.withDescription("Memory limit for a single query. The default is 8192mb.");
155+
156+
public static final ConfigOption<Integer> DORIS_THRIFT_MAX_MESSAGE_SIZE =
157+
ConfigOptions.key("doris.thrift.max.message.size")
158+
.intType()
159+
.defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT)
160+
.withDescription(
161+
"The maximum message size for thrift protocol. The default is Integer.MAX_VALUE.");
162+
154163
public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
155164
ConfigOptions.key("source.use-old-api")
156165
.booleanType()

flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
5252
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
5353
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
54+
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
5455
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
5556
import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT;
5657
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
@@ -130,6 +131,8 @@ public Set<ConfigOption<?>> optionalOptions() {
130131
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
131132
options.add(DORIS_BATCH_SIZE);
132133
options.add(DORIS_EXEC_MEM_LIMIT);
134+
options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE);
135+
133136
options.add(LOOKUP_CACHE_MAX_ROWS);
134137
options.add(LOOKUP_CACHE_TTL);
135138
options.add(LOOKUP_MAX_RETRIES);
@@ -210,6 +213,7 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
210213
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
211214
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
212215
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
216+
.setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE))
213217
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
214218
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
215219
.setRequestQueryTimeoutS(

flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ public void testTableSourceAllOptions() throws Exception {
291291
+ " 'doris.batch.size' = '1024',"
292292
+ " 'doris.exec.mem.limit' = '2048mb',"
293293
+ " 'doris.deserialize.arrow.async' = 'true',"
294+
+ " 'doris.thrift.max.message.size' = '10485760',"
294295
+ " 'doris.deserialize.queue.size' = '32',"
295296
+ " 'source.use-flight-sql' = '%s'"
296297
+ ")",

flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
4141
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
4242
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
43+
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
4344
import static org.apache.doris.flink.cfg.ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
4445
import static org.apache.doris.flink.cfg.ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
4546
import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA;
@@ -59,6 +60,7 @@ public void testDorisSourceProperties() {
5960
properties.put("doris.exec.mem.limit", "8192mb");
6061
properties.put("doris.deserialize.arrow.async", "false");
6162
properties.put("doris.deserialize.queue.size", "64");
63+
properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE + "");
6264

6365
properties.put("lookup.cache.max-rows", "100");
6466
properties.put("lookup.cache.ttl", "20s");
@@ -103,7 +105,8 @@ public void testDorisSourceProperties() {
103105
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
104106
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
105107
.setUseFlightSql(USE_FLIGHT_SQL_DEFAULT)
106-
.setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT);
108+
.setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT)
109+
.setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
107110
DorisDynamicTableSource expected =
108111
new DorisDynamicTableSource(
109112
options,

0 commit comments

Comments
 (0)