Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class BackendClient {
private final int retries;
private final int socketTimeout;
private final int connectTimeout;
private final int thriftMaxMessageSize;

public BackendClient(Routing routing, DorisReadOptions readOptions) {
this.routing = routing;
Expand All @@ -69,11 +70,16 @@ public BackendClient(Routing routing, DorisReadOptions readOptions) {
readOptions.getRequestRetries() == null
? ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT
: readOptions.getRequestRetries();
this.thriftMaxMessageSize =
readOptions.getRequestRetries() == null
? ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT
: readOptions.getThriftMaxMessageSize();
logger.trace(
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
"connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
this.connectTimeout,
this.socketTimeout,
this.retries);
this.retries,
this.thriftMaxMessageSize);
open();
}

Expand All @@ -84,9 +90,11 @@ private void open() {
logger.debug("Attempt {} to connect {}.", attempt, routing);
try {
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
TConfiguration.Builder configBuilder = TConfiguration.custom();
configBuilder.setMaxMessageSize(thriftMaxMessageSize);
transport =
new TSocket(
new TConfiguration(),
configBuilder.build(),
routing.getHost(),
routing.getPort(),
socketTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
Integer DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;

String USE_FLIGHT_SQL = "source.use-flight-sql";
Boolean USE_FLIGHT_SQL_DEFAULT = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class DorisReadOptions implements Serializable {
private Integer flightSqlPort;
// for flink sql limit push down
private Long rowLimit;
private Integer thriftMaxMessageSize;

public DorisReadOptions(
String readFields,
Expand All @@ -57,7 +58,8 @@ public DorisReadOptions(
boolean useOldApi,
boolean useFlightSql,
Integer flightSqlPort,
Long rowLimit) {
Long rowLimit,
Integer thriftMaxMessageSize) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
Expand All @@ -73,6 +75,7 @@ public DorisReadOptions(
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
this.rowLimit = rowLimit;
this.thriftMaxMessageSize = thriftMaxMessageSize;
}

public String getReadFields() {
Expand Down Expand Up @@ -151,6 +154,10 @@ public void setRowLimit(Long rowLimit) {
this.rowLimit = rowLimit;
}

public Integer getThriftMaxMessageSize() {
return thriftMaxMessageSize;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -182,7 +189,8 @@ public boolean equals(Object o) {
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
&& Objects.equals(flightSqlPort, that.flightSqlPort)
&& Objects.equals(rowLimit, that.rowLimit);
&& Objects.equals(rowLimit, that.rowLimit)
&& Objects.equals(thriftMaxMessageSize, that.thriftMaxMessageSize);
}

@Override
Expand All @@ -202,7 +210,8 @@ public int hashCode() {
useOldApi,
useFlightSql,
flightSqlPort,
rowLimit);
rowLimit,
thriftMaxMessageSize);
}

public DorisReadOptions copy() {
Expand All @@ -221,7 +230,8 @@ public DorisReadOptions copy() {
useOldApi,
useFlightSql,
flightSqlPort,
rowLimit);
rowLimit,
thriftMaxMessageSize);
}

/** Builder of {@link DorisReadOptions}. */
Expand All @@ -247,7 +257,8 @@ public static class Builder {
private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
private Integer flightSqlPort = ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
private Long rowLimit;

private Integer thriftMaxMessageSize =
ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
/**
* Sets the readFields for doris table to push down projection, such as name,age.
*
Expand Down Expand Up @@ -406,6 +417,17 @@ public Builder setFlightSqlPort(Integer flightSqlPort) {
return this;
}

/**
* Sets the thriftMaxMessageSize for DorisReadOptions.
*
* @param thriftMaxMessageSize
* @return
*/
public Builder setThriftMaxMessageSize(Integer thriftMaxMessageSize) {
this.thriftMaxMessageSize = thriftMaxMessageSize;
return this;
}

/**
* Build the {@link DorisReadOptions}.
*
Expand All @@ -427,7 +449,8 @@ public DorisReadOptions build() {
useOldApi,
useFlightSql,
flightSqlPort,
rowLimit);
rowLimit,
thriftMaxMessageSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;

/** Options for the Doris connector. */
@PublicEvolving
Expand Down Expand Up @@ -151,6 +152,14 @@ public class DorisConfigOptions {
.memoryType()
.defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
.withDescription("Memory limit for a single query. The default is 8192mb.");

public static final ConfigOption<Integer> DORIS_THRIFT_MAX_MESSAGE_SIZE =
ConfigOptions.key("doris.thrift.max.message.size")
.intType()
.defaultValue(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT)
.withDescription(
"The maximum message size for thrift protocol. The default is Integer.MAX_VALUE.");

public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions.key("source.use-old-api")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
Expand Down Expand Up @@ -130,6 +131,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
options.add(DORIS_THRIFT_MAX_MESSAGE_SIZE);

options.add(LOOKUP_CACHE_MAX_ROWS);
options.add(LOOKUP_CACHE_TTL);
options.add(LOOKUP_MAX_RETRIES);
Expand Down Expand Up @@ -210,6 +213,7 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
.setThriftMaxMessageSize(readableConfig.get(DORIS_THRIFT_MAX_MESSAGE_SIZE))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public void testTableSourceAllOptions() throws Exception {
+ " 'doris.batch.size' = '1024',"
+ " 'doris.exec.mem.limit' = '2048mb',"
+ " 'doris.deserialize.arrow.async' = 'true',"
+ " 'doris.thrift.max.message.size' = '10485760',"
+ " 'doris.deserialize.queue.size' = '32',"
+ " 'source.use-flight-sql' = '%s'"
+ ")",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
import static org.apache.doris.flink.utils.FactoryMocks.SCHEMA;
Expand All @@ -59,6 +60,7 @@ public void testDorisSourceProperties() {
properties.put("doris.exec.mem.limit", "8192mb");
properties.put("doris.deserialize.arrow.async", "false");
properties.put("doris.deserialize.queue.size", "64");
properties.put("doris.thrift.max.message.size", Integer.MAX_VALUE + "");

properties.put("lookup.cache.max-rows", "100");
properties.put("lookup.cache.ttl", "20s");
Expand Down Expand Up @@ -103,7 +105,8 @@ public void testDorisSourceProperties() {
.setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT)
.setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT)
.setUseFlightSql(USE_FLIGHT_SQL_DEFAULT)
.setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT);
.setFlightSqlPort(FLIGHT_SQL_PORT_DEFAULT)
.setThriftMaxMessageSize(DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
DorisDynamicTableSource expected =
new DorisDynamicTableSource(
options,
Expand Down