Closed
2 changes: 1 addition & 1 deletion .github/workflows/flink_cdc_base.yml
@@ -44,7 +44,7 @@ on:

jobs:
test:
- runs-on: ubuntu-latest
+ runs-on: ubuntu-22.04
timeout-minutes: 120
strategy:
max-parallel: 20
34 changes: 23 additions & 11 deletions .github/workflows/label.yml
@@ -16,26 +16,38 @@
# This workflow will triage pull requests and apply a label based on the
# paths that are modified in the pull request.
#
- # To use this workflow, you will need to set up a .github/label.yml
+ # To use this workflow, you will need to set up a .github/labeler.yml
# file with configuration. For more information, see:
# https://github.com/actions/labeler

name: Labeler

on:
pull_request:
branches:
- master
- release-*
workflow_run:
workflows: [Labeler-Trigger]
types: [requested]

permissions:
checks: write
contents: read
pull-requests: write

jobs:
label:

runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: write

steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v6
with:
persist-credentials: false
submodules: recursive
- name: "Get information about the original trigger of the run"
uses: ./.github/actions/get-workflow-origin
id: source-run-info
with:
token: ${{ secrets.GITHUB_TOKEN }}
sourceRunId: ${{ github.event.workflow_run.id }}
- uses: actions/labeler@v6
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
repo-token: "${{ secrets.GITHUB_TOKEN }}"
pr-number: ${{ steps.source-run-info.outputs.pullRequestNumber }}
28 changes: 28 additions & 0 deletions .github/workflows/label_trigger.yml
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Labeler-Trigger
on:
pull_request:
branches:
- master
- release-*

jobs:
label:
runs-on: ubuntu-latest
steps:
- name: "Do nothing. Just triggers corresponding workflow."
run: echo
57 changes: 57 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -489,6 +489,63 @@ MongoDB's `oplog.rs` collection doesn't keep the changed record's update before state,
By the way, Debezium's exploration of MongoDB change streams, mentioned in [DBZ-435](https://issues.redhat.com/browse/DBZ-435), is on the roadmap.<br>
Once it is done, we can consider integrating both kinds of source connectors for users to choose from.

### Dynamically Adding Tables

**Note:** This feature is available since Flink CDC 3.1.0.

The dynamic table addition feature lets you add new collections to monitor for a job that is already running. A newly added collection first reads its snapshot data and then automatically reads its change stream.

Imagine this scenario: at first, a Flink job monitors the collections `[product, user, address]`, but a few days later we want the job to also monitor the collections `[order, custom]`. These collections contain historical data, and the job must keep reusing its existing state. The dynamic table addition feature solves this problem gracefully.

The following steps show how to enable the feature for the scenario above. Start from an existing MongoDB CDC Source job such as:

```java
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
        .hosts("yourHostname:27017")
        .databaseList("db") // set the captured database
        .collectionList("db.product", "db.user", "db.address") // set the captured collections
        .username("yourUsername")
        .password("yourPassword")
        .scanNewlyAddedTableEnabled(true) // enable scanning of newly added tables
        .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
        .build();
// your business code
```

To add the new collections `[order, custom]` to the existing Flink job, add them to the job's `collectionList()` and restore the job from an existing savepoint.

_Step 1_: Stop the existing Flink job with a savepoint.
```shell
$ ./bin/flink stop $Existing_Flink_JOB_ID
```
```shell
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
```
_Step 2_: Update the collection list option of the existing Flink job.
1. Update the `collectionList()` value.
2. Build the updated job, for example:
```java
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
        .hosts("yourHostname:27017")
        .databaseList("db")
        .collectionList("db.product", "db.user", "db.address", "db.order", "db.custom") // set the captured collections [product, user, address, order, custom]
        .username("yourUsername")
        .password("yourPassword")
        .scanNewlyAddedTableEnabled(true)
        .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
        .build();
// your business code
```
_Step 3_: Restore the updated Flink job from the savepoint.
```shell
$ ./bin/flink run \
    --detached \
    --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
    ./FlinkCDCExample.jar
```
**Note:** See [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
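
As a conceptual illustration only, the difference between the collection list stored with the savepoint and the list in the updated job can be computed as below. This is a plain-Java sketch, not the connector's implementation: the class `NewlyAddedCollections` and the method `newlyAdded` are hypothetical names, and it assumes that only the entries missing from the previous list need a fresh snapshot.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of Flink CDC.
public class NewlyAddedCollections {

    /** Returns the entries of {@code updated} that are absent from {@code previous}, in order. */
    static List<String> newlyAdded(List<String> previous, List<String> updated) {
        Set<String> result = new LinkedHashSet<>(updated); // keeps the configured order
        result.removeAll(previous);                         // drop collections already captured
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        List<String> previous = List.of("db.product", "db.user", "db.address");
        List<String> updated =
                List.of("db.product", "db.user", "db.address", "db.order", "db.custom");
        // prints [db.order, db.custom]
        System.out.println(newlyAdded(previous, updated));
    }
}
```

In the scenario above, `db.order` and `db.custom` are the only collections that would start from a snapshot, while the other three continue from the state in the savepoint.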

### DataStream Source

The MongoDB CDC connector can also be a DataStream source. You can create a SourceFunction as follows:
61 changes: 61 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -558,6 +558,67 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapsh

The Oracle CDC source can't read in parallel, because only one task can receive change events.

### Dynamically Adding Tables

**Note:** This feature is available since Flink CDC 3.1.0.

The dynamic table addition feature lets you add new tables to monitor for a job that is already running. A newly added table first reads its snapshot data and then automatically reads its redo log.

Imagine this scenario: at first, a Flink job monitors the tables `[product, user, address]`, but a few days later we want the job to also monitor the tables `[order, custom]`. These tables contain historical data, and the job must keep reusing its existing state. The dynamic table addition feature solves this problem gracefully.

The following steps show how to enable the feature for the scenario above. Start from an existing Oracle CDC Source job such as:

```java
JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
        .hostname("yourHostname")
        .port(1521)
        .databaseList("ORCLCDB") // set the captured database
        .schemaList("INVENTORY") // set the captured schema
        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") // set the captured tables
        .username("yourUsername")
        .password("yourPassword")
        .scanNewlyAddedTableEnabled(true) // enable scanning of newly added tables
        .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
        .build();
// your business code
```

To add the new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to the existing Flink job, add them to the job's `tableList()` and restore the job from an existing savepoint.

_Step 1_: Stop the existing Flink job with a savepoint.
```shell
$ ./bin/flink stop $Existing_Flink_JOB_ID
```
```shell
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
```
_Step 2_: Update the table list option of the existing Flink job.
1. Update the `tableList()` value.
2. Build the updated job, for example:
```java
JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
        .hostname("yourHostname")
        .port(1521)
        .databaseList("ORCLCDB")
        .schemaList("INVENTORY")
        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS", "INVENTORY.ORDER", "INVENTORY.CUSTOM") // set the captured tables [PRODUCT, USER, ADDRESS, ORDER, CUSTOM]
        .username("yourUsername")
        .password("yourPassword")
        .scanNewlyAddedTableEnabled(true)
        .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
        .build();
// your business code
```
_Step 3_: Restore the updated Flink job from the savepoint.
```shell
$ ./bin/flink run \
    --detached \
    --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
    ./FlinkCDCExample.jar
```
**Note:** See [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.

### DataStream Source

The Oracle CDC connector can also be a DataStream source. There are two modes for the DataStream source:
65 changes: 65 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -510,6 +510,71 @@ The config option `scan.startup.mode` specifies the startup mode for PostgreSQL
- `committed-offset`: Skip snapshot phase and start reading events from a `confirmed_flush_lsn` offset of replication slot.
- `snapshot`: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.

### Dynamically Adding Tables

**Note:** This feature is available since Flink CDC 3.1.0.

The dynamic table addition feature lets you add new tables to monitor for a job that is already running. A newly added table first reads its snapshot data and then automatically reads its WAL (Write-Ahead Log) or replication slot changes.

Imagine this scenario: at first, a Flink job monitors the tables `[product, user, address]`, but a few days later we want the job to also monitor the tables `[order, custom]`. These tables contain historical data, and the job must keep reusing its existing state. The dynamic table addition feature solves this problem gracefully.

The following steps show how to enable the feature for the scenario above. Start from an existing PostgreSQL CDC Source job such as:

```java
JdbcIncrementalSource<String> postgresSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("yourHostname")
                .port(5432)
                .database("postgres") // set the captured database
                .schemaList("inventory") // set the captured schema
                .tableList("inventory.product", "inventory.user", "inventory.address") // set the captured tables
                .username("yourUsername")
                .password("yourPassword")
                .slotName("flink")
                .scanNewlyAddedTableEnabled(true) // enable scanning of newly added tables
                .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
                .build();
// your business code
```

To add the new tables `[inventory.order, inventory.custom]` to the existing Flink job, add them to the job's `tableList()` and restore the job from an existing savepoint.

_Step 1_: Stop the existing Flink job with a savepoint.
```shell
$ ./bin/flink stop $Existing_Flink_JOB_ID
```
```shell
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
```
_Step 2_: Update the table list option of the existing Flink job.
1. Update the `tableList()` value.
2. Build the updated job, for example:
```java
JdbcIncrementalSource<String> postgresSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("yourHostname")
                .port(5432)
                .database("postgres")
                .schemaList("inventory")
                .tableList("inventory.product", "inventory.user", "inventory.address", "inventory.order", "inventory.custom") // set the captured tables [product, user, address, order, custom]
                .username("yourUsername")
                .password("yourPassword")
                .slotName("flink")
                .scanNewlyAddedTableEnabled(true)
                .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
                .build();
// your business code
```
_Step 3_: Restore the updated Flink job from the savepoint.
```shell
$ ./bin/flink run \
    --detached \
    --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
    ./FlinkCDCExample.jar
```
**Note:** See [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.

### DataStream Source

The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:
18 changes: 15 additions & 3 deletions docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -128,15 +128,15 @@ pipeline:
<td>sink.connect.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
- <td>String</td>
+ <td>Integer</td>
<td>Timeout for establishing an HTTP connection to the FE. Valid range: [100, 60000].</td>
</tr>
<tr>
<td>sink.wait-for-continue.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
- <td>String</td>
- <td>Timeout for waiting for the HTTP 100-continue response from the FE. Valid range: [3000, 60000].</td>
+ <td>Integer</td>
+ <td>Timeout for waiting for the HTTP 100-continue response from the FE. Valid range: [3000, 600000].</td>
</tr>
<tr>
<td>sink.buffer-flush.max-bytes</td>
@@ -174,6 +174,13 @@ pipeline:
<td>Boolean</td>
<td>Whether to use transaction stream load under at-least-once.</td>
</tr>
<tr>
<td>sink.metric.histogram-window-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Window size of the histogram metrics.</td>
</tr>
<tr>
<td>sink.properties.*</td>
<td>optional</td>
@@ -297,6 +304,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIME</td>
<td>VARCHAR</td>
<td>StarRocks does not support the TIME type, so it is mapped to VARCHAR. TIME(p) values are stored as strings: the format is "HH:mm:ss" when p = 0 and "HH:mm:ss.&lt;p fractional digits&gt;" when p > 0 (for example, "HH:mm:ss.SSS" when p = 3).</td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>DATETIME</td>
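
The string shapes described for the TIME to VARCHAR mapping in the hunk above can be reproduced with the JDK's `java.time` formatter. The following is a minimal sketch that only illustrates the documented formats. It is not the StarRocks connector's serialization code, and the class `TimeAsVarchar` and its `format` method are hypothetical names.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of the StarRocks connector.
public class TimeAsVarchar {

    /** Formats a time as "HH:mm:ss" for p = 0, or with p fractional digits for p > 0. */
    static String format(LocalTime time, int precision) {
        if (precision < 0 || precision > 9) {
            throw new IllegalArgumentException("precision must be in [0, 9]: " + precision);
        }
        // Each 'S' in the pattern emits one fractional-second digit.
        String pattern = precision == 0 ? "HH:mm:ss" : "HH:mm:ss." + "S".repeat(precision);
        return time.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        LocalTime time = LocalTime.of(12, 34, 56, 789_000_000);
        System.out.println(format(time, 0)); // prints 12:34:56
        System.out.println(format(time, 3)); // prints 12:34:56.789
    }
}
```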
45 changes: 45 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
@@ -412,6 +412,51 @@ pipeline:

Note that `classpath` here must be a fully qualified class name, and the corresponding `jar` file must either be placed in the Flink `/lib` folder or be passed with the `flink-cdc.sh --jar` option.

### UDF Configuration Options

You can pass extra configuration options to a UDF by adding an `options` block. These options are available in the `open` method through `UserDefinedFunctionContext.configuration()`:

```yaml
pipeline:
  user-defined-function:
    - name: query_redis
      classpath: com.example.flink.cdc.udf.RedisQueryFunction
      options:
        hostname: localhost
        port: "6379"
        cache.enabled: "true"
```

In your UDF implementation, you can access these options by defining `ConfigOption` instances:

```java
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;

public class RedisQueryFunction implements UserDefinedFunction {
    private static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();
    private static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().defaultValue(6379);

    private String hostname;
    private int port;

    @Override
    public void open(UserDefinedFunctionContext context) throws Exception {
        hostname = context.configuration().get(HOSTNAME);
        port = context.configuration().get(PORT);
        // Initialize your connection here...
    }

    public Object eval(String key) {
        // Query Redis using hostname and port...
        return null; // placeholder: return the value fetched from Redis
    }
}
```

The `options` field is optional. If it is not specified, an empty configuration is passed to the UDF.
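
To make the typed lookup with a default value more concrete, here is a self-contained plain-Java sketch of the same idea. It does not use the Flink CDC `ConfigOption` classes: the class `UdfOptionsSketch` and the helper `read` are hypothetical, and the sketch assumes the option values arrive as strings, as the quoted YAML values above suggest.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for illustration; not the Flink CDC configuration API.
public class UdfOptionsSketch {

    /** Reads a key, parses the raw string, or falls back to the default when the key is absent. */
    static <T> T read(
            Map<String, String> options, String key, Function<String, T> parser, T defaultValue) {
        String raw = options.get(key);
        return raw == null ? defaultValue : parser.apply(raw);
    }

    public static void main(String[] args) {
        // Mirrors the `options` block of the YAML example.
        Map<String, String> options =
                Map.of("hostname", "localhost", "port", "6379", "cache.enabled", "true");

        String hostname = read(options, "hostname", Function.identity(), null);
        int port = read(options, "port", Integer::parseInt, 6379);
        boolean cacheEnabled = read(options, "cache.enabled", Boolean::parseBoolean, false);

        // prints localhost:6379 cache=true
        System.out.println(hostname + ":" + port + " cache=" + cacheEnabled);

        // An empty map models the case where no `options` block is given.
        int fallbackPort = read(Map.of(), "port", Integer::parseInt, 6379);
        System.out.println(fallbackPort); // prints 6379
    }
}
```

With an empty map every lookup returns its default, which corresponds to the empty configuration a UDF receives when `options` is omitted.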

Once registered correctly, UDFs can be used in `projection` and `filter` expressions just like built-in functions:

```yaml