Skip to content

Commit 3f4dd56

Browse files
authored
[Doc][Improve] support chinese [docs/zh/connector-v2/sink/RocketMQ.md] (#8834)
1 parent 4b5261c commit 3f4dd56

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed

Diff for: docs/zh/connector-v2/sink/RocketMQ.md

+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# RocketMQ
2+
3+
> RocketMQ sink 连接器
4+
5+
## 支持Apache RocketMQ版本
6+
7+
- 4.9.0 (或更新版本,供参考)
8+
9+
## 支持这些引擎
10+
11+
> Spark<br/>
12+
> Flink<br/>
13+
> SeaTunnel Zeta<br/>
14+
15+
## 主要特性
16+
17+
- [x] [精确一次](../../concept/connector-v2-features.md)
18+
19+
默认情况下,我们将使用2pc来保证消息精确一次到RocketMQ。
20+
21+
## 描述
22+
23+
将数据行写入Apache RocketMQ主题
24+
25+
## Sink 参数
26+
27+
| 名称 | 类型 | 是否必填 | 默认值 | 描述 |
28+
|----------------------|---------|----------|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
29+
| topic | string || - | `RocketMQ topic` 名称. |
30+
| name.srv.addr | string || - | `RocketMQ`名称服务器集群地址。 |
31+
| acl.enabled | Boolean || false | false |
32+
| access.key | String || | 当ACL_ENABLED为true时,access key不能为空。 |
33+
| secret.key | String || | 当ACL_ENABLED为true时, secret key 不能为空。 |
34+
| producer.group | String || SeaTunnel-producer-Group | SeaTunnel-producer-Group |
35+
| tag | String || - | `RocketMQ`消息标签。 |
36+
| partition.key.fields | array || - | - |
37+
| format | String || json | 数据格式。默认格式为json。可选text格式。默认字段分隔符为“,”。如果自定义分隔符,请添加“field_delimiter”选项。 |
38+
| field.delimiter | String || , | 自定义数据格式的字段分隔符。 |
39+
| producer.send.sync | Boolean || false | 如果为 true, 则消息将同步发送。 |
40+
| common-options | config || - | Sink插件常用参数,请参考[sink common options](../Sink-common-Options.md)了解详细信息。 |
41+
42+
### partition.key.fields [array]
43+
44+
配置哪些字段用作RocketMQ消息的键。
45+
例如,如果要使用上游数据中的字段值作为键,可以为此属性指定字段名。
46+
上游数据如下:
47+
48+
| name | age | data |
49+
|------|-----|---------------|
50+
| Jack | 16 | data-example1 |
51+
| Mary | 23 | data-example2 |
52+
53+
如果name被设置为主键,那么name列的哈希值将决定消息被发送到哪个分区。
54+
55+
## 任务示例
56+
57+
### Fake 到 RocketMQ 简单示例
58+
59+
>数据是随机生成的,并异步发送到测试主题
60+
61+
```hocon
62+
env {
63+
parallelism = 1
64+
}
65+
66+
source {
67+
FakeSource {
68+
schema = {
69+
fields {
70+
c_map = "map<string, string>"
71+
c_array = "array<int>"
72+
c_string = string
73+
c_boolean = boolean
74+
c_tinyint = tinyint
75+
c_smallint = smallint
76+
c_int = int
77+
c_bigint = bigint
78+
c_float = float
79+
c_double = double
80+
c_decimal = "decimal(30, 8)"
81+
c_bytes = bytes
82+
c_date = date
83+
c_timestamp = timestamp
84+
}
85+
}
86+
}
87+
}
88+
89+
transform {
90+
#如果你想了解更多关于如何配置seatunnel的信息,并查看转换插件的完整列表,
91+
#请前往https://seatunnel.apache.org/docs/category/transform
92+
}
93+
94+
sink {
95+
Rocketmq {
96+
name.srv.addr = "localhost:9876"
97+
topic = "test_topic"
98+
}
99+
}
100+
101+
```
102+
103+
### Rocketmq 到 Rocketmq 简单示例
104+
105+
> 使用RocketMQ时,会向c_int字段写入哈希数,该哈希数表示写入不同分区的分区数量。这是默认的异步写入方式
106+
107+
```hocon
108+
env {
109+
parallelism = 1
110+
}
111+
112+
source {
113+
Rocketmq {
114+
name.srv.addr = "localhost:9876"
115+
topics = "test_topic"
116+
plugin_output = "rocketmq_table"
117+
schema = {
118+
fields {
119+
c_map = "map<string, string>"
120+
c_array = "array<int>"
121+
c_string = string
122+
c_boolean = boolean
123+
c_tinyint = tinyint
124+
c_smallint = smallint
125+
c_int = int
126+
c_bigint = bigint
127+
c_float = float
128+
c_double = double
129+
c_decimal = "decimal(30, 8)"
130+
c_bytes = bytes
131+
c_date = date
132+
c_timestamp = timestamp
133+
}
134+
}
135+
}
136+
}
137+
138+
sink {
139+
Rocketmq {
140+
name.srv.addr = "localhost:9876"
141+
topic = "test_topic_sink"
142+
partition.key.fields = ["c_int"]
143+
}
144+
}
145+
```
146+
147+
### 时间戳消费写入示例
148+
149+
>这是流消费中特定的时间戳消费,当添加新分区时,程序将定期刷新感知和消费,并写入另一个主题类型
150+
151+
```hocon
152+
153+
env {
154+
parallelism = 1
155+
job.mode = "STREAMING"
156+
}
157+
158+
source {
159+
Rocketmq {
160+
name.srv.addr = "localhost:9876"
161+
topics = "test_topic"
162+
plugin_output = "rocketmq_table"
163+
start.mode = "CONSUME_FROM_FIRST_OFFSET"
164+
batch.size = "400"
165+
consumer.group = "test_topic_group"
166+
format = "json"
167+
format = json
168+
schema = {
169+
fields {
170+
c_map = "map<string, string>"
171+
c_array = "array<int>"
172+
c_string = string
173+
c_boolean = boolean
174+
c_tinyint = tinyint
175+
c_smallint = smallint
176+
c_int = int
177+
c_bigint = bigint
178+
c_float = float
179+
c_double = double
180+
c_decimal = "decimal(30, 8)"
181+
c_bytes = bytes
182+
c_date = date
183+
c_timestamp = timestamp
184+
}
185+
}
186+
}
187+
}
188+
189+
transform {
190+
#如果你想了解更多关于如何配置seatunnel的信息,并查看转换插件的完整列表,
191+
#请前往https://seatunnel.apache.org/docs/category/transform
192+
}
193+
sink {
194+
Rocketmq {
195+
name.srv.addr = "localhost:9876"
196+
topic = "test_topic"
197+
partition.key.fields = ["c_int"]
198+
producer.send.sync = true
199+
}
200+
}
201+
```
202+

0 commit comments

Comments
 (0)