Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion docs/content.zh/docs/connectors/pipeline-connectors/fluss.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Fluss Pipeline 连接器可用作 Pipeline 的 *Data Sink*,将数据写入 [Fl
## What can the connector do?
* 自动创建不存在的表
* 数据同步
* Schema 变更同步(lenient 模式)

How to create Pipeline
----------------
Expand Down Expand Up @@ -60,6 +61,7 @@ sink:
pipeline:
name: MySQL to Fluss Pipeline
parallelism: 2
schema.change.behavior: LENIENT
```

Pipeline Connector Options
Expand Down Expand Up @@ -140,7 +142,13 @@ Pipeline Connector Options
* 桶数量由 `bucket.num` 选项控制
* 数据分布由 `bucket.key` 选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。

* 不支持 schema 变更同步。如果需要忽略 schema 变更,可使用 `schema.change.behavior: IGNORE`。
* 支持在 `lenient` 模式下进行 Schema 变更同步,通过 `schema.change.behavior: lenient` 配置。支持以下 Schema 变更事件:
* **新增列** — 新列会追加到 Fluss 表中。
* **删除列** — 在 lenient 模式下不会真正删除列,而是忽略该删除操作,后续写入时将该列的值设为 null。
* **重命名列** — 在 lenient 模式下,此操作会被转换为新增列 + 将旧列类型修改为可空的序列。
* **修改列类型** — 不支持。

要启用 Schema 变更同步,请在 pipeline 中配置 `schema.change.behavior: lenient`。如果想要忽略所有 Schema 变更,使用 `schema.change.behavior: IGNORE`。

* 关于数据同步, Pipeline 连接器使用 [Fluss Java Client](https://fluss.apache.org/docs/apis/java-client/) 向 Fluss 写入数据.

Expand Down Expand Up @@ -236,6 +244,21 @@ Data Type Mapping
<td>BYTES</td>
<td></td>
</tr>
<tr>
<td>ARRAY</td>
<td>ARRAY</td>
<td>元素类型递归映射。</td>
</tr>
<tr>
<td>MAP</td>
<td>MAP</td>
<td>键和值类型递归映射。</td>
</tr>
<tr>
<td>ROW</td>
<td>ROW</td>
<td>字段类型递归映射。</td>
</tr>
</tbody>
</table>
</div>
Expand Down
16 changes: 14 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ under the License.
# Postgres Connector

Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 Postgres CDC Pipeline 连接器。
注意:因为Postgres的wal log日志中展示没有办法解析表结构变更记录,因此Postgres CDC Pipeline Source暂时不支持同步表结构变更。

## 示例

从 Postgres 读取数据同步到 Fluss 的 Pipeline 可以定义如下:

```yaml
source:
type: posgtres
Comment thread
loserwang1024 marked this conversation as resolved.
type: postgres
name: Postgres Source
hostname: 127.0.0.1
port: 5432
Expand All @@ -45,6 +44,7 @@ source:
tables: adb.\.*.\.*
decoding.plugin.name: pgoutput
slot.name: pgtest
schema-change.enabled: true

sink:
type: fluss
Expand All @@ -59,6 +59,7 @@ sink:
pipeline:
name: Postgres to Fluss Pipeline
parallelism: 4
schema.change.behavior: lenient
```

## 连接器配置项
Expand Down Expand Up @@ -282,6 +283,17 @@ pipeline:
默认值为 false。
</td>
</tr>
<tr>
<td>schema-change.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
是否开启 Postgres 源的 Schema 变更推导。开启后,连接器会通过对比 pgoutput Relation 消息与缓存的 Schema 来推导 Schema 变更事件(新增列、删除列、重命名列、修改列类型)。<br>
需要将 <code>decoding.plugin.name</code> 设置为 <code>pgoutput</code>。<br>
默认值为 false。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Loading