splits, int subtaskId) {
+ LOG.info("Source reader {} return splits {}.", subtaskId, splits);
+
+ int readerNum = context.totalParallelism();
+ for (DorisSourceSplit split : splits) {
+ int readerIndex = ReaderSelector.getReaderIndex(readerNum);
+ splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
+ LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
+ }
+ tryAssignSplitsToReader();
+ }
+
+ /**
+ * Intentionally a no-op: nothing is done here when a reader asks for splits.
+ * NOTE(review): splits appear to be pushed to readers through tryAssignSplitsToReader() instead — confirm.
+ *
+ * @param subtaskId id of the requesting reader subtask; not used
+ * @param requesterHostname hostname of the requester, may be null; not used
+ */
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ }
+
+ /**
+ * Returns a newly created {@link EmptyState} on every call; the checkpoint argument is ignored.
+ *
+ * @param checkpoint value passed in by the caller for this snapshot; not used
+ * @return a fresh {@code EmptyState} instance
+ */
+ @Override
+ public EmptyState snapshotState(long checkpoint) throws Exception {
+ return new EmptyState();
+ }
+
+ /** Intentionally does nothing. */
+ @Override
+ public void close() {
+ // empty: no cleanup is performed here
+ }
+
+ /**
+ * Hands out reader indexes in round-robin order, driven by a single static counter.
+ * The counter increment is not synchronized.
+ */
+ @NoArgsConstructor
+ static class ReaderSelector {
+ // Static, so the same counter is shared by every caller of getReaderIndex.
+ private static long readerIndex = 0;
+
+ /**
+ * Returns the next reader index and advances the counter by one.
+ *
+ * @param totalReaderNum number of readers to cycle through; zero causes an ArithmeticException
+ * @return the counter value modulo {@code totalReaderNum}, i.e. a value in [0, totalReaderNum) for a positive argument
+ */
+ public static int getReaderIndex(int totalReaderNum) {
+ // Take the modulo on the long counter first and narrow afterwards. The previous form,
+ // "(int) readerIndex++ % totalReaderNum", narrowed the counter to int before the modulo
+ // (a cast binds tighter than %), so once the counter passed Integer.MAX_VALUE the
+ // narrowed value, and therefore the returned index, could be negative.
+ return (int) (readerIndex++ % totalReaderNum);
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json b/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json
index d5a8cf09f..623ac5858 100644
--- a/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json
+++ b/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json
@@ -1,7 +1,8 @@
{
"name": "bitsail-connector-unified-doris",
"classes": [
- "com.bytedance.bitsail.connector.doris.sink.DorisSink"
+ "com.bytedance.bitsail.connector.doris.sink.DorisSink",
+ "com.bytedance.bitsail.connector.doris.source.DorisSource"
],
"libs": [
"bitsail-connector-doris-${version}.jar"
diff --git a/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml b/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml
index 93ba709cc..d3cd272b7 100644
--- a/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml
+++ b/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml
@@ -36,13 +36,13 @@ engine.type.to.bitsail.type.converter:
target.type: short
- source.type: int
- target.type: long
+ target.type: int
- source.type: smallint
- target.type: long
+ target.type: int
- source.type: integer
- target.type: long
+ target.type: int
- source.type: bigint
target.type: bigint
diff --git a/bitsail-dist/src/main/resources/examples/Doris_Print_Example.json b/bitsail-dist/src/main/resources/examples/Doris_Print_Example.json
new file mode 100644
index 000000000..e69de29bb
diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/DorisSinkITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/sink/DorisSinkITCase.java
similarity index 85%
rename from bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/DorisSinkITCase.java
rename to bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/sink/DorisSinkITCase.java
index 05c841ca0..6ca4b2650 100644
--- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/DorisSinkITCase.java
+++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/sink/DorisSinkITCase.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.bytedance.bitsail.test.integration.doris;
+package com.bytedance.bitsail.test.integration.doris.sink;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.option.CommonOptions;
@@ -48,12 +48,12 @@ public void test() throws Exception {
* Below codes are just example.
*/
public void addDorisInfo(BitSailConfiguration jobConf) {
- jobConf.set(DorisWriterOptions.FE_HOSTS, "127.0.0.1:1234");
- jobConf.set(DorisWriterOptions.MYSQL_HOSTS, "127.0.0.1:4321");
- jobConf.set(DorisWriterOptions.USER, "test_user");
- jobConf.set(DorisWriterOptions.PASSWORD, "password");
- jobConf.set(DorisWriterOptions.DB_NAME, "test_db");
- jobConf.set(DorisWriterOptions.TABLE_NAME, "test_table");
+ jobConf.set(DorisWriterOptions.FE_HOSTS, "127.0.0.1:8030");
+ jobConf.set(DorisWriterOptions.MYSQL_HOSTS, "127.0.0.1:9030");
+ jobConf.set(DorisWriterOptions.USER, "root");
+ jobConf.set(DorisWriterOptions.PASSWORD, "");
+ jobConf.set(DorisWriterOptions.DB_NAME, "test");
+ jobConf.set(DorisWriterOptions.TABLE_NAME, "test_bitsails");
jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_ENABLE, true);
jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_INTERVAL, 5000L);
jobConf.set(DorisWriterOptions.SINK_ENABLE_2PC, false);
diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/source/DorisSourceITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/source/DorisSourceITCase.java
new file mode 100644
index 000000000..3f6f0d716
--- /dev/null
+++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/source/DorisSourceITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.test.integration.doris.source;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.doris.option.DorisReaderOptions;
+import com.bytedance.bitsail.test.integration.AbstractIntegrationTest;
+import com.bytedance.bitsail.test.integration.utils.JobConfUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public class DorisSourceITCase extends AbstractIntegrationTest {
+
+ @Test
+ public void test() throws Exception {
+ BitSailConfiguration jobConf = JobConfUtils.fromClasspath("doris_to_print.json");
+ addDorisInfo(jobConf);
+ submitJob(jobConf);
+ }
+
+ /**
+ * Sets the Doris connection options on the given job configuration.
+ * The values below are only an example; replace them with the settings of your own Doris cluster.
+ *
+ * NOTE(review): sample query "select id, bigint_type, string_type, double_type from doris_table where id = 1" names doris_table, but TABLE_NAME below is test_bitsail — confirm.
+ */
+ public void addDorisInfo(BitSailConfiguration jobConf) {
+ jobConf.set(DorisReaderOptions.FE_HOSTS, "127.0.0.1:8030");
+ jobConf.set(DorisReaderOptions.MYSQL_HOSTS, "127.0.0.1:9030");
+ jobConf.set(DorisReaderOptions.USER, "root");
+ jobConf.set(DorisReaderOptions.PASSWORD, "");
+ jobConf.set(DorisReaderOptions.DB_NAME, "test");
+ jobConf.set(DorisReaderOptions.TABLE_NAME, "test_bitsail");
+// Optional row filter, left disabled: jobConf.set(DorisReaderOptions.SQL_FILTER, "id=1");
+ }
+
+}
diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json
new file mode 100644
index 000000000..113ca2fc1
--- /dev/null
+++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json
@@ -0,0 +1,40 @@
+{
+ "job": {
+ "common": {
+ "job_id": -31198,
+ "instance_id": -31198,
+ "job_name": "bitsail_doris_to_print_test",
+ "user_name": "test"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.doris.source.DorisSource",
+ "fe_hosts": "127.0.0.1:8030",
+ "mysql_hosts": "127.0.0.1:9030",
+ "user": "root",
+ "password": "",
+ "db_name": "test",
+ "table_name": "doris_table",
+ "columns": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "bigint_type",
+ "type": "bigint"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "double_type",
+ "type": "double"
+ }
+ ]
+ },
+ "writer": {
+ "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink"
+ }
+ }
+}
\ No newline at end of file
diff --git a/website/en/documents/connectors/doris/doris-example.md b/website/en/documents/connectors/doris/doris-example.md
index 0f1e42de3..05be8a790 100644
--- a/website/en/documents/connectors/doris/doris-example.md
+++ b/website/en/documents/connectors/doris/doris-example.md
@@ -42,6 +42,41 @@ PROPERTIES
)
```
+## Doris Reader
+Assuming a test Doris cluster is available, the following configuration can be used to read the `test_doris_table` table.
+```json
+{
+ "job": {
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.doris.source.DorisSource",
+ "fe_hosts": "127.0.0.1:1234",
+ "mysql_hosts": "127.0.0.1:4321",
+ "user": "test_user",
+ "password": "1234567",
+ "db_name": "test_db",
+ "table_name": "test_doris_table",
+ "columns": [
+ {
+ "name": "id",
+ "type": "bigint"
+ },
+ {
+ "name": "bigint_type",
+ "type": "bigint"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "double_type",
+ "type": "double"
+ }
+ ]
+ }
+ }
+}
+```
## Doris writer
diff --git a/website/en/documents/connectors/doris/doris.md b/website/en/documents/connectors/doris/doris.md
index 324a4e511..2da6a795b 100644
--- a/website/en/documents/connectors/doris/doris.md
+++ b/website/en/documents/connectors/doris/doris.md
@@ -17,6 +17,67 @@ Parent document: [Connectors](../README.md)
```
+## Doris Reader
+### Supported data types
+The Doris read connector parses data according to the field mapping and supports the following data types:
+
+- CHAR
+- VARCHAR
+- BOOLEAN
+- BINARY
+- VARBINARY
+- INT
+- TINYINT
+- SMALLINT
+- INTEGER
+- BIGINT
+- FLOAT
+- DOUBLE
+
+### Parameters
+The following parameters should be added to the `job.reader` block, for example:
+```json
+{
+ "job": {
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.doris.source.DorisSource",
+ "fe_hosts": "127.0.0.1:8030",
+ "mysql_hosts": "127.0.0.1:9030",
+ "user": "root",
+ "password": "",
+ "db_name": "test",
+ "table_name": "test_doris_table"
+ }
+ }
+}
+```
+
+#### Necessary parameters
+| Param name | Required | Default Value | Description |
+|:----------------|:---------|:--------------|:------------------------------------------------------------------------------------|
+| class | yes | -- | Doris reader class name, `com.bytedance.bitsail.connector.doris.source.DorisSource` |
+| fe_hosts | yes | -- | Doris FE address, multi addresses separated by comma |
+| mysql_hosts | yes | -- | Doris JDBC query address, multi addresses separated by comma |
+| user | yes | -- | Doris account user |
+| password | yes | -- | Doris account password, can be empty |
+| db_name | yes | -- | database to read |
+| table_name | yes | -- | table to read |
+| columns | yes | -- | The name and type of columns to read |
+
+#### Optional parameters
+| Param name | Required | Default Value | Description |
+|:------------------------------|:---------|:------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| reader_parallelism_num | no | 1 | reader parallelism |
+| sql_filter | no | -- | Column value conditions that need to be queried and filtered |
+| tablet_size | no | Integer.MAX_VALUE | The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more partitions will be generated. This will increase the parallelism on the BitSail side, but at the same time will cause greater pressure on Doris. |
+| exec_mem_limit | no | 2147483648 | Memory limit for a single query. The default is 2GB, in bytes. |
+| request_query_timeout_s | no | 3600 | Timeout for queries to Doris, in seconds. The default is 1 hour; -1 means no timeout limit |
+| request_batch_size | no | 1024 | The maximum number of rows to read from BE at one time. Increasing this value reduces the number of connections between BitSail and Doris, thereby reducing the extra time overhead caused by network delay. |
+| request_connect_timeouts | no | 30 * 1000 | Connection timeout for sending requests to Doris |
+| request_read_timeouts | no | 30 * 1000 | Read timeout for sending request to Doris |
+| request_retries | no | 3 | Number of retries to send requests to Doris |
+
+
## Doris Writer
### Supported data type
diff --git a/website/en/documents/introduce.md b/website/en/documents/introduce.md
index c289fafe6..9543cb766 100644
--- a/website/en/documents/introduce.md
+++ b/website/en/documents/introduce.md
@@ -120,7 +120,7 @@ In the Runtime layer, it supports multiple execution modes, such as yarn, local,
Doris |
- |
- |
+ ✅ |
✅ |
diff --git a/website/zh/documents/connectors/doris/doris-example.md b/website/zh/documents/connectors/doris/doris-example.md
index b4b091263..6bef28ef1 100644
--- a/website/zh/documents/connectors/doris/doris-example.md
+++ b/website/zh/documents/connectors/doris/doris-example.md
@@ -42,6 +42,41 @@ PROPERTIES
)
```
+## Doris读连接器
+假设当前有一个测试doris集群,则可以通过如下配置读取`test_doris_table`表。
+```json
+{
+ "job": {
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.doris.source.DorisSource",
+ "fe_hosts": "127.0.0.1:1234",
+ "mysql_hosts": "127.0.0.1:4321",
+ "user": "test_user",
+ "password": "1234567",
+ "db_name": "test_db",
+ "table_name": "test_doris_table",
+ "columns": [
+ {
+ "name": "id",
+ "type": "bigint"
+ },
+ {
+ "name": "bigint_type",
+ "type": "bigint"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "double_type",
+ "type": "double"
+ }
+ ]
+ }
+ }
+}
+```
## Doris写连接器
diff --git a/website/zh/documents/connectors/doris/doris.md b/website/zh/documents/connectors/doris/doris.md
index 3aa9a1055..b7178a8c2 100644
--- a/website/zh/documents/connectors/doris/doris.md
+++ b/website/zh/documents/connectors/doris/doris.md
@@ -19,6 +19,67 @@
```
+## Doris读取
+### 支持的数据类型
+Doris读连接器根据字段映射进行解析,支持以下数据类型:
+
+- CHAR
+- VARCHAR
+- BOOLEAN
+- BINARY
+- VARBINARY
+- INT
+- TINYINT
+- SMALLINT
+- INTEGER
+- BIGINT
+- FLOAT
+- DOUBLE
+
+### 主要参数
+读连接器参数在`job.reader`中配置，实际使用时请注意路径前缀。示例:
+```json
+{
+ "job": {
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.doris.source.DorisSource",
+ "fe_hosts": "127.0.0.1:8030",
+ "mysql_hosts": "127.0.0.1:9030",
+ "user": "root",
+ "password": "",
+ "db_name": "test",
+ "table_name": "test_doris_table"
+ }
+ }
+}
+```
+#### 必需参数
+| 参数名称 | 是否必填 | 默认值 | 参数含义 |
+|:------------|:-------|:----|:------------------------------------------------------------------------|
+| class | 是 | -- | Doris读连接器类型, `com.bytedance.bitsail.connector.doris.source.DorisSource` |
+| fe_hosts | 是 | -- | Doris FE地址, 多个地址用逗号分隔 |
+| mysql_hosts | 是 | -- | JDBC连接Doris的地址, 多个地址用逗号分隔 |
+| user | 是 | -- | Doris账户名 |
+| password | 是 | -- | Doris密码,可为空 |
+| db_name | 是 | -- | 要读取的Doris库 |
+| table_name | 是 | -- | 要读取的Doris表 |
+| columns | 是 | -- | 要读取的数据列的列名和类型 |
+
+#### 可选参数
+| 参数名称 | 是否必填 | 默认值 | 参数含义 |
+|:------------------------------|:-----|:------------------|:----------------------------------------------------------------------------------------------------|
+| reader_parallelism_num | 否 | 1 | 指定doris读并发 |
+| sql_filter | 否 | -- | 需要查询过滤的列值条件 |
+| tablet_size | 否 | Integer.MAX_VALUE | 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 bitSail 侧的并行度,但同时会对 Doris 造成更大的压力。 |
+| exec_mem_limit | 否 | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
+| request_query_timeout_s | 否 | 3600 | 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制 |
+| request_batch_size | 否 | 1024 | 一次从 Doris BE 读取数据的最大行数。增大此数值可减少 bitSail 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销 |
+| request_connect_timeouts | 否 | 30 * 1000 | 向 Doris 发送请求的连接超时时间 |
+| request_read_timeouts | 否 | 30 * 1000 | 向 Doris 发送请求的读取超时时间 |
+| request_retries | 否 | 3 | 向 Doris 发送请求的重试次数 |
+
+
+
## Doris写入
### 支持的数据类型
diff --git a/website/zh/documents/introduce.md b/website/zh/documents/introduce.md
index 984ab82c5..887d73476 100644
--- a/website/zh/documents/introduce.md
+++ b/website/zh/documents/introduce.md
@@ -104,7 +104,7 @@ BitSail目前已被广泛使用,并支持数百万亿的大流量场景。同时
Doris |
- |
- |
+ ✅ |
✅ |
diff --git a/website/zh/documents/start/env_setup.md b/website/zh/documents/start/env_setup.md
index c187aabf4..41651e941 100644
--- a/website/zh/documents/start/env_setup.md
+++ b/website/zh/documents/start/env_setup.md
@@ -16,6 +16,7 @@ order: 2
- maven 3.6+
- [Docker desktop](https://www.docker.com/products/docker-desktop/)
+
在安装上述必需组件后,您可以在本地的IDE上直接运行已有的集成测试。
## 从源代码编译