Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class BackendClient {
private final int retries;
private final int socketTimeout;
private final int connectTimeout;
private final int thriftMaxMessageSize;

public BackendClient(Routing routing, DorisReadOptions readOptions) {
this.routing = routing;
Expand All @@ -69,11 +70,16 @@ public BackendClient(Routing routing, DorisReadOptions readOptions) {
readOptions.getRequestRetries() == null
? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
: readOptions.getRequestRetries();
this.thriftMaxMessageSize =
readOptions.getRequestRetries() == null
? ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT
: readOptions.getThriftMaxMessageSize();
logger.trace(
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
this.connectTimeout,
this.socketTimeout,
this.retries);
this.retries,
this.thriftMaxMessageSize);
open();
}

Expand All @@ -84,9 +90,11 @@ private void open() {
logger.debug("Attempt {} to connect {}.", attempt, routing);
try {
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
TConfiguration.Builder configBuilder = TConfiguration.custom();
configBuilder.setMaxMessageSize(thriftMaxMessageSize);
transport =
new TSocket(
new TConfiguration(),
configBuilder.build(),
routing.getHost(),
routing.getPort(),
socketTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;

String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DorisReadOptions implements Serializable {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private boolean useOldApi;
private Integer thriftMaxMessageSize;

public DorisReadOptions(
String readFields,
Expand All @@ -50,7 +51,8 @@ public DorisReadOptions(
Long execMemLimit,
Integer deserializeQueueSize,
Boolean deserializeArrowAsync,
boolean useOldApi) {
boolean useOldApi,
Integer thriftMaxMessageSize) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
Expand All @@ -63,6 +65,7 @@ public DorisReadOptions(
this.deserializeQueueSize = deserializeQueueSize;
this.deserializeArrowAsync = deserializeArrowAsync;
this.useOldApi = useOldApi;
this.thriftMaxMessageSize = thriftMaxMessageSize;
}

public String getReadFields() {
Expand Down Expand Up @@ -113,6 +116,10 @@ public boolean getUseOldApi() {
return useOldApi;
}

public Integer getThriftMaxMessageSize() {
return thriftMaxMessageSize;
}

public void setReadFields(String readFields) {
this.readFields = readFields;
}
Expand Down Expand Up @@ -149,7 +156,8 @@ public boolean equals(Object o) {
&& Objects.equals(requestBatchSize, that.requestBatchSize)
&& Objects.equals(execMemLimit, that.execMemLimit)
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync);
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
&& Objects.equals(thriftMaxMessageSize, that.thriftMaxMessageSize);
}

@Override
Expand All @@ -166,7 +174,8 @@ public int hashCode() {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
useOldApi,
thriftMaxMessageSize);
}

/** Builder of {@link DorisReadOptions}. */
Expand All @@ -184,6 +193,8 @@ public static class Builder {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private Boolean useOldApi = false;
private Integer thriftMaxMessageSize =
ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;

public Builder setReadFields(String readFields) {
this.readFields = readFields;
Expand Down Expand Up @@ -245,6 +256,11 @@ public Builder setUseOldApi(boolean useOldApi) {
return this;
}

public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) {
this.thriftMaxMessageSize = thriftMaxMessageSize;
return this;
}

public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
Expand All @@ -258,7 +274,8 @@ public DorisReadOptions build() {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
useOldApi,
thriftMaxMessageSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;

/** Options for the Doris connector. */
@PublicEvolving
Expand Down Expand Up @@ -158,6 +159,13 @@ public class DorisConfigOptions {
.withDescription(
"Whether to read data using the new interface defined according to the FLIP-27 specification,default false");

public static final ConfigOption<Integer> DORIS_THRIFT_MAX_MESSAGE_SIZE =
ConfigOptions.key("doris.thrift.max.message.size")
.intType()
.defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT)
.withDescription(
"The maximum message size for thrift protocol. The default is Integer.MAX_VALUE.");

// Lookup options
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
Expand Down Expand Up @@ -128,6 +129,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE);

options.add(LOOKUP_CACHE_MAX_ROWS);
options.add(LOOKUP_CACHE_TTL);
options.add(LOOKUP_MAX_RETRIES);
Expand Down Expand Up @@ -205,6 +208,7 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
.setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public void testTableSourceAllOptions() throws Exception {
+ " 'doris.request.tablet.size' = '2',"
+ " 'doris.batch.size' = '1024',"
+ " 'doris.exec.mem.limit' = '2048mb',"
+ " 'doris.thrift.max.message.size' = '10485760',"
+ " 'doris.deserialize.arrow.async' = 'true',"
+ " 'doris.deserialize.queue.size' = '32'"
+ ")",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
Expand All @@ -57,6 +58,7 @@ public void testDorisSourceProperties() {
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE + "");

properties.put("lookup.cache.max-rows", "100");
properties.put("lookup.cache.ttl", "20s");
Expand Down Expand Up @@ -98,7 +100,8 @@ public void testDorisSourceProperties() {
.setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT);
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
.setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
DorisDynamicTableSource expected =
new DorisDynamicTableSource(
options,
Expand Down
Loading