File tree Expand file tree Collapse file tree
connectors/pipeline-connectors
get-started/quickstart-for-1.20
connectors/pipeline-connectors
get-started/quickstart-for-1.20 Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -30,6 +30,7 @@ Fluss Pipeline 连接器可用作 Pipeline 的 *Data Sink*,将数据写入 [Fl
3030## What can the connector do?
3131* 自动创建不存在的表
3232* 数据同步
33+ * Schema 变更同步(lenient 模式)
3334
3435How to create Pipeline
3536----------------
6061pipeline :
6162 name : MySQL to Fluss Pipeline
6263 parallelism : 2
64+ schema.change.behavior : LENIENT
6365` ` `
6466
6567Pipeline Connector Options
@@ -140,7 +142,13 @@ Pipeline Connector Options
140142 * 桶数量由 ` bucket.num` 选项控制
141143 * 数据分布由 `bucket.key` 选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。
142144
143- * 不支持 schema 变更同步。如果需要忽略 schema 变更,可使用 `schema.change.behavior: IGNORE`。
145+ * 支持在 `lenient` 模式下进行 Schema 变更同步,通过 `schema.change.behavior: lenient` 配置。支持以下 Schema 变更事件:
146+ * **新增列** — 新列会追加到 Fluss 表中。
147+ * **删除列** — 在 lenient 模式下不会真正删除列,而是忽略该删除操作,后续写入时将该列的值设为 null。
148+ * **重命名列** — 在 lenient 模式下,此操作会被转换为新增列 + 将旧列类型修改为可空的序列。
149+ * **修改列类型** — 不支持。
150+
151+ 要启用 Schema 变更同步,请在 pipeline 中配置 `schema.change.behavior : lenient`。如果想要忽略所有 Schema 变更,使用 `schema.change.behavior: IGNORE`。
144152
145153* 关于数据同步, Pipeline 连接器使用 [Fluss Java Client](https://fluss.apache.org/docs/apis/java-client/) 向 Fluss 写入数据.
146154
@@ -236,6 +244,21 @@ Data Type Mapping
236244 <td>BYTES</td>
237245 <td></td>
238246 </tr>
247+ <tr>
248+ <td>ARRAY</td>
249+ <td>ARRAY</td>
250+ <td>元素类型递归映射。</td>
251+ </tr>
252+ <tr>
253+ <td>MAP</td>
254+ <td>MAP</td>
255+ <td>键和值类型递归映射。</td>
256+ </tr>
257+ <tr>
258+ <td>ROW</td>
259+ <td>ROW</td>
260+ <td>字段类型递归映射。</td>
261+ </tr>
239262 </tbody>
240263</table>
241264</div>
Original file line number Diff line number Diff line change @@ -27,15 +27,14 @@ under the License.
2727# Postgres Connector
2828
2929Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 Postgres CDC Pipeline 连接器。
30- 注意:因为Postgres的wal log日志中展示没有办法解析表结构变更记录,因此Postgres CDC Pipeline Source暂时不支持同步表结构变更。
3130
3231## 示例
3332
3433从 Postgres 读取数据同步到 Fluss 的 Pipeline 可以定义如下:
3534
3635``` yaml
3736source :
38- type : posgtres
37+ type : postgres
3938 name : Postgres Source
4039 hostname : 127.0.0.1
4140 port : 5432
@@ -45,6 +44,7 @@ source:
4544 tables : adb.\.*.\.*
4645 decoding.plugin.name : pgoutput
4746 slot.name : pgtest
47+ schema-change.enabled : true
4848
4949sink :
5050 type : fluss
5959pipeline :
6060 name : Postgres to Fluss Pipeline
6161 parallelism : 4
62+ schema.change.behavior : lenient
6263` ` `
6364
6465## 连接器配置项
@@ -282,6 +283,17 @@ pipeline:
282283 默认值为 false。
283284 </td>
284285 </tr>
286+ <tr>
287+ <td>schema-change.enabled</td>
288+ <td>optional</td>
289+ <td style="word-wrap: break-word;">false</td>
290+ <td>Boolean</td>
291+ <td>
292+ 是否开启 Postgres 源的 Schema 变更推导。开启后,连接器会通过对比 pgoutput Relation 消息与缓存的 Schema 来推导 Schema 变更事件(新增列、删除列、重命名列、修改列类型)。<br>
293+ 需要将 <code>decoding.plugin.name</code> 设置为 <code>pgoutput</code>。<br>
294+ 默认值为 false。
295+ </td>
296+ </tr>
285297 </tbody>
286298</table>
287299</div>
You can’t perform that action at this time.
0 commit comments