Commit 6af9ed0

[Feature] [Connectors-v2] Add Doris sink redirect enhancement (#10715)
1 parent 5111281 commit 6af9ed0

21 files changed

Lines changed: 1793 additions & 90 deletions

docs/en/connectors/sink/Doris.md

Lines changed: 68 additions & 0 deletions
@@ -42,6 +42,8 @@ The internal implementation of Doris sink connector is cached and imported by st
 | Name | Type | Required | Default | Description |
 |------|------|----------|---------|-------------|
 | fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` |
+| benodes | String | No | - | `Doris` BE HTTP address list used when `direct_to_be=true`, the format is `"be_ip:be_http_port, ..."` |
+| direct_to_be | bool | No | false | Whether to send Stream Load write requests directly to `benodes`. This is an opt-in mode and does not change the default FE path. |
 | query-port | int | No | 9030 | `Doris` Fenodes query_port |
 | username | String | Yes | - | `Doris` user username |
 | password | String | Yes | - | `Doris` user password |
@@ -64,6 +66,19 @@ The internal implementation of Doris sink connector is cached and imported by st
 | custom_sql | String | no | - | When `data_save_mode` is `CUSTOM_PROCESSING`, you must set `custom_sql`. It usually holds an executable SQL statement, which runs before the synchronization task. |
 | doris.config | map | yes | - | Used to support operations such as `insert`, `delete`, and `update` when SQL is generated automatically, and to specify the supported formats. |

+## Redirect Behavior
+
+By default, the Doris sink sends Stream Load requests to the FE nodes configured by `fenodes`.
+
+When `direct_to_be=true`, SeaTunnel uses `benodes` for the Stream Load data write path.
+
+If `sink.enable-2pc=true` is also set:
+
+- pre-commit data write requests use `benodes`
+- 2PC commit/abort control requests still use `fenodes`
+
+This mixed path keeps control requests on the default FE path while letting data writes bypass the FE redirect in environments where it is unstable.
+
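The routing rule described in the Redirect Behavior section can be sketched as a small, self-contained Java class. This is only an illustration under stated assumptions: `StreamLoadRouting`, `RequestKind`, `splitNodes`, and `targetNodes` are hypothetical names and are not the connector's actual classes; only the option names `fenodes`, `benodes`, and `direct_to_be` come from the commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of the documented routing rule; not connector code. */
final class StreamLoadRouting {

    /** Request kinds the sink issues (names are assumptions for this sketch). */
    enum RequestKind { DATA_WRITE, TXN_COMMIT, TXN_ABORT }

    /** Splits "host:port, host:port" into trimmed, non-empty entries. */
    static List<String> splitNodes(String nodes) {
        return Arrays.stream(nodes.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /**
     * Only data writes move to the BE nodes, and only when direct_to_be is true.
     * 2PC commit/abort requests always stay on the FE nodes.
     */
    static List<String> targetNodes(
            RequestKind kind, boolean directToBe, String fenodes, String benodes) {
        boolean useBe = directToBe && kind == RequestKind.DATA_WRITE;
        return splitNodes(useBe ? benodes : fenodes);
    }

    public static void main(String[] args) {
        String fenodes = "fe1:8030,fe2:8030";
        String benodes = "be1:8040,be2:8040";
        for (RequestKind kind : RequestKind.values()) {
            System.out.println(kind + " -> " + targetNodes(kind, true, fenodes, benodes));
        }
    }
}
```

Keeping the decision in one function makes the opt-in nature explicit: with `directToBe` set to false, every request kind resolves to the FE list, which matches the default path.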
 ### schema_save_mode [Enum]

 Before the synchronization task starts, choose how to handle the existing table structure on the target side.
@@ -175,6 +190,16 @@ This is because the total amount of data arriving at the end may not exceed the

 Otherwise, if you enable 2PC with the property `sink.enable-2pc=true`, `sink.buffer-size` has no effect, so only a checkpoint can trigger the commit.

+## Troubleshooting 307 Temporary Redirect
+
+If the job fails with `HTTP/1.1 307 Temporary Redirect`, check the following items first:
+
+1. Whether the SeaTunnel worker can reach the redirected Doris BE address
+2. Whether Doris FE is under heavy load, timing out, or under Full GC pressure
+3. Whether a proxy, SLB, ingress, or gateway rewrites or blocks the redirect path
+
+If your environment already has reachable Doris BE HTTP addresses, you can configure `benodes` and set `direct_to_be=true` to bypass the FE redirect on the data write path.
+
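The first check in the troubleshooting list (can the worker reach the BE address?) can be tested from the worker host with a plain TCP connect. The sketch below is an assumption-laden helper rather than part of the connector: `BeReachabilityCheck`, `parseNodes`, and `canConnect` are hypothetical names, and a successful TCP connect only shows that the port is open, not that Stream Load itself will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks TCP reachability of each "host:port" in a benodes-style list. */
final class BeReachabilityCheck {

    /** Parses "host:port, host:port" without resolving DNS; rejects malformed entries. */
    static List<InetSocketAddress> parseNodes(String nodes) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : nodes.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean canConnect(InetSocketAddress node, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolve the host here so DNS failures are reported as "not reachable".
            socket.connect(
                    new InetSocketAddress(node.getHostString(), node.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: BeReachabilityCheck \"be1:8040,be2:8040\"");
            return;
        }
        for (InetSocketAddress node : parseNodes(args[0])) {
            System.out.println(node.getHostString() + ":" + node.getPort()
                    + " reachable=" + canConnect(node, 2000));
        }
    }
}
```

Running it on the SeaTunnel worker with the same value you plan to put in `benodes` gives a quick signal before enabling `direct_to_be=true`.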
 ## Task Example

 ### Simple
@@ -235,6 +260,49 @@ sink {
 }
 ```

+### Direct To BE
+
+```hocon
+sink {
+  Doris {
+    fenodes = "fe1:8030,fe2:8030"
+    benodes = "be1:8040,be2:8040"
+    direct_to_be = true
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    sink.label-prefix = "test-direct-be"
+    doris.config {
+      format = "json"
+      read_json_by_line = "true"
+    }
+  }
+}
+```
+
+### Direct To BE With 2PC
+
+```hocon
+sink {
+  Doris {
+    fenodes = "fe1:8030,fe2:8030"
+    benodes = "be1:8040,be2:8040"
+    direct_to_be = true
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    sink.label-prefix = "test-direct-be-2pc"
+    sink.enable-2pc = true
+    doris.config {
+      format = "json"
+      read_json_by_line = "true"
+    }
+  }
+}
+```
+
 ### CDC(Change Data Capture) Event

 > This example defines a SeaTunnel synchronization task that generates data automatically through FakeSource and sends it to the Doris sink. FakeSource simulates CDC data with schema, score (int type). Doris needs a sink table named test.e2e_table_sink to be created for it.

docs/zh/connectors/sink/Doris.md

Lines changed: 69 additions & 1 deletion
@@ -41,6 +41,8 @@ The internal implementation of the Doris sink connector caches data and imports it in batches through stream load
 | Name | Type | Required | Default | Description |
 |------|------|----------|---------|-------------|
 | fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` |
+| benodes | String | No | - | `Doris` BE HTTP address list used when `direct_to_be=true`, the format is `"be_ip:be_http_port, ..."` |
+| direct_to_be | bool | No | false | Whether to send Stream Load data write requests directly to `benodes`. This is an explicit opt-in capability and does not change the default FE path. |
 | query-port | int | No | 9030 | `Doris` Fenodes MySQL-protocol query port |
 | username | String | Yes | - | `Doris` username |
 | password | String | Yes | - | `Doris` password |
@@ -63,6 +65,19 @@ The internal implementation of the Doris sink connector caches data and imports it in batches through stream load
 | custom_sql | String | no | - | When `data_save_mode` is `CUSTOM_PROCESSING`, the `custom_sql` parameter must be set. It usually holds one executable SQL statement, which runs before the synchronization task. |
 | doris.config | map | yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, as well as the supported formats. |

+## Redirect Behavior
+
+By default, the Doris sink first sends Stream Load requests to the FE nodes specified by `fenodes`.
+
+When `direct_to_be=true`, SeaTunnel instead uses `benodes` as the Stream Load data write path.
+
+If `sink.enable-2pc=true` is also enabled:
+
+- data write requests in the pre-commit phase go to `benodes`
+- 2PC commit/abort control requests still go to `fenodes`
+
+This mixed path, with the data plane on BE and the control plane on FE, keeps the FE control path while bypassing unstable FE redirect scenarios.
+
 ### schema_save_mode [Enum]

 Before the synchronization task starts, choose a handling scheme for the existing table structure.
@@ -172,6 +187,16 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`

 In addition, if you enable 2PC with the property `sink.enable-2pc=true`, `sink.buffer-size` has no effect and only a checkpoint can trigger a commit.

+## Troubleshooting `307 Temporary Redirect`
+
+If the job reports `HTTP/1.1 307 Temporary Redirect`, check the following items first:
+
+1. Whether the SeaTunnel worker can reach the target BE address returned by the Doris redirect
+2. Whether Doris FE is under heavy load, timing out, or in Full GC
+3. Whether a proxy, SLB, ingress, or gateway rewrites or intercepts the redirect path
+
+If your environment already has reachable Doris BE HTTP addresses, you can configure `benodes` and set `direct_to_be=true` to bypass the FE redirect on the data write path.
+
 ## Task Example

 ### Simple Example
@@ -232,6 +257,49 @@ sink {
 }
 ```

+### Direct To BE
+
+```hocon
+sink {
+  Doris {
+    fenodes = "fe1:8030,fe2:8030"
+    benodes = "be1:8040,be2:8040"
+    direct_to_be = true
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    sink.label-prefix = "test-direct-be"
+    doris.config {
+      format = "json"
+      read_json_by_line = "true"
+    }
+  }
+}
+```
+
+### Direct To BE + 2PC
+
+```hocon
+sink {
+  Doris {
+    fenodes = "fe1:8030,fe2:8030"
+    benodes = "be1:8040,be2:8040"
+    direct_to_be = true
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    sink.label-prefix = "test-direct-be-2pc"
+    sink.enable-2pc = true
+    doris.config {
+      format = "json"
+      read_json_by_line = "true"
+    }
+  }
+}
+```
+
 ### CDC (Change Data Capture) Event

 > This example defines a SeaTunnel synchronization task that generates data automatically through FakeSource and sends it to the Doris sink. FakeSource simulates CDC data using schema and score (int type). Doris needs a sink task named test.e2e_table_sink and its corresponding table to be created.
@@ -370,4 +438,4 @@ sink {

 ## Changelog

-<ChangeLog />
+<ChangeLog />

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java

Lines changed: 30 additions & 0 deletions
@@ -18,12 +18,16 @@
 package org.apache.seatunnel.connectors.doris.config;

 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;

 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;

 import java.io.Serializable;
 import java.util.Map;
@@ -36,7 +40,9 @@
 import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
 import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
 import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
+import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.BENODES;
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
+import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DIRECT_TO_BE;
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
@@ -48,13 +54,15 @@
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_LABEL_PREFIX;
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_MAX_RETRIES;

+@Slf4j
 @Setter
 @Getter
 @ToString
 public class DorisSinkConfig implements Serializable {

     // common option
     private String frontends;
+    private String backends;
     private String database;
     private String table;
     private String username;
@@ -71,6 +79,7 @@ public class DorisSinkConfig implements Serializable {
     private Integer bufferSize;
     private Integer bufferCount;
     private Properties streamLoadProps;
+    private boolean directToBe;
     private boolean needsUnsupportedTypeCasting;
     private boolean caseSensitive;

@@ -81,12 +90,18 @@ public static DorisSinkConfig of(Config pluginConfig) {
         return of(ReadonlyConfig.fromConfig(pluginConfig));
     }

+    public static void validate(ReadonlyConfig config) {
+        validateDirectWriteOptions(
+                config.get(DIRECT_TO_BE), config.getOptional(BENODES).orElse(null), false);
+    }
+
     public static DorisSinkConfig of(ReadonlyConfig config) {

         DorisSinkConfig dorisSinkConfig = new DorisSinkConfig();

         // common option
         dorisSinkConfig.setFrontends(config.get(FENODES));
+        dorisSinkConfig.setBackends(config.getOptional(BENODES).orElse(null));
         dorisSinkConfig.setUsername(config.get(USERNAME));
         dorisSinkConfig.setPassword(config.get(PASSWORD));
         dorisSinkConfig.setQueryPort(config.get(QUERY_PORT));
@@ -103,10 +118,13 @@ public static DorisSinkConfig of(ReadonlyConfig config) {
         dorisSinkConfig.setBufferSize(config.get(SINK_BUFFER_SIZE));
         dorisSinkConfig.setBufferCount(config.get(SINK_BUFFER_COUNT));
         dorisSinkConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));
+        dorisSinkConfig.setDirectToBe(config.get(DIRECT_TO_BE));
         dorisSinkConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING));
         dorisSinkConfig.setCaseSensitive(config.get(CASE_SENSITIVE));
         // create table option
         dorisSinkConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));
+        validateDirectWriteOptions(
+                dorisSinkConfig.isDirectToBe(), dorisSinkConfig.getBackends(), true);

         return dorisSinkConfig;
     }
@@ -122,4 +140,16 @@ private static Properties parseStreamLoadProperties(ReadonlyConfig config) {
         }
         return streamLoadProps;
     }
+
+    private static void validateDirectWriteOptions(
+            boolean directToBe, String backends, boolean logInactiveBenodes) {
+        if (directToBe && StringUtils.isBlank(backends)) {
+            throw new DorisConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "PluginName: Doris, Message: Option 'benodes' must be configured when 'direct_to_be=true'.");
+        }
+        if (logInactiveBenodes && !directToBe && StringUtils.isNotBlank(backends)) {
+            log.info("Option 'benodes' is configured but inactive because 'direct_to_be=false'.");
+        }
+    }
 }
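The behavior of `validateDirectWriteOptions` can be exercised in isolation with a dependency-free approximation. The sketch below is not the connector's code: `DirectWriteValidation` is a hypothetical class, `IllegalArgumentException` stands in for `DorisConnectorException`, a boolean return value stands in for the `log.info` call, and the local `isBlank` only approximates `StringUtils.isBlank` (it relies on `String.trim()`).

```java
/** Hypothetical, dependency-free approximation of the direct-write option check. */
final class DirectWriteValidation {

    /** Rough stand-in for StringUtils.isBlank: null, empty, or whitespace-only. */
    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /**
     * Throws when direct_to_be is true but benodes is blank.
     * Returns true when benodes is set but inactive because direct_to_be is false,
     * which is the case the commit reports through an info log.
     */
    static boolean validate(boolean directToBe, String benodes) {
        if (directToBe && isBlank(benodes)) {
            throw new IllegalArgumentException(
                    "Option 'benodes' must be configured when 'direct_to_be=true'.");
        }
        return !directToBe && !isBlank(benodes);
    }

    public static void main(String[] args) {
        System.out.println("inactive benodes: " + validate(false, "be1:8040"));
        System.out.println("direct write ok:  " + !validate(true, "be1:8040"));
        try {
            validate(true, "  ");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The commit calls the real check twice: once from `DorisSinkConfig.validate` at sink creation with logging disabled, and once inside `DorisSinkConfig.of` with logging enabled. The sketch folds both into a single return value for brevity.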

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java

Lines changed: 14 additions & 0 deletions
@@ -71,6 +71,20 @@ public class DorisSinkOptions extends DorisBaseOptions {
                     .defaultValue(false)
                     .withDescription("whether to enable the delete function");

+    public static final Option<String> BENODES =
+            Options.key("benodes")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "doris be http address list used when direct_to_be is enabled.");
+
+    public static final Option<Boolean> DIRECT_TO_BE =
+            Options.key("direct_to_be")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "whether to send stream load write requests directly to Doris BE.");
+
     public static final Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
             Options.key("doris.config")
                     .mapType()

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java

Lines changed: 5 additions & 0 deletions
@@ -30,6 +30,7 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
 import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
@@ -67,6 +68,8 @@ public OptionRule optionRule() {
                         DorisSinkOptions.DATABASE,
                         DorisSinkOptions.TABLE,
                         DorisSinkOptions.TABLE_IDENTIFIER,
+                        DorisSinkOptions.BENODES,
+                        DorisSinkOptions.DIRECT_TO_BE,
                         DorisSinkOptions.QUERY_PORT,
                         DorisSinkOptions.DORIS_BATCH_SIZE,
                         DorisSinkOptions.SINK_ENABLE_2PC,
@@ -83,6 +86,7 @@ public OptionRule optionRule() {
                         DorisSinkOptions.DATA_SAVE_MODE,
                         DataSaveMode.CUSTOM_PROCESSING,
                         DorisSinkOptions.CUSTOM_SQL)
+                .conditional(DorisSinkOptions.DIRECT_TO_BE, true, DorisSinkOptions.BENODES)
                 .build();
     }

@@ -95,6 +99,7 @@ public List<String> excludeTablePlaceholderReplaceKeys() {
     public TableSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo> createSink(
             TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
+        DorisSinkConfig.validate(config);
         CatalogTable catalogTable =
                 config.get(NEEDS_UNSUPPORTED_TYPE_CASTING)
                         ? UnsupportedTypeConverterUtils.convertCatalogTable(
