Commit 29a7be6

tchivsMrart
authored and committed
[FLINK-38844][pipeline-connector][postgres] Add metadata column support (apache#4202)
1 parent 47db41a commit 29a7be6

9 files changed

Lines changed: 511 additions & 4 deletions

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 74 additions & 2 deletions
@@ -253,10 +253,10 @@ pipeline:
     <tr>
       <td>metadata.list</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
-        List of readable metadata in the source record, passed downstream and usable in the transform module, with fields separated by commas. Available readable metadata: op_ts.
+        List of readable metadata in the source record, passed downstream and usable in the transform module, with fields separated by commas. Available readable metadata: op_ts, table_name, database_name, schema_name. See <a href="#支持的元数据列">Supported Metadata Columns</a> for details.
       </td>
     </tr>
     <tr>
@@ -316,6 +316,78 @@ pipeline:
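 Note:
 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
 
+## Supported Metadata Columns
+
+The PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
+
+**Note:** Some metadata is also available through Transform expressions (for example, `__namespace_name__`, `__schema_name__`, `__table_name__`). The main differences are:
+- **`op_ts`**: Available only via `metadata.list`; provides the actual operation timestamp from the database.
+- **`table_name`, `database_name`, `schema_name`**: Available via either `metadata.list` or Transform expressions. Using `metadata.list` passes these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases.
+
+To enable metadata columns, set the `metadata.list` option to a comma-separated list of metadata column names:
+
+```yaml
+source:
+  type: postgres
+  # ... other configurations
+  metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+The following metadata columns are supported:
+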
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Metadata Column</th>
+        <th class="text-left" style="width: 15%">Data Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>op_ts</td>
+      <td>BIGINT NOT NULL</td>
+      <td>The timestamp (in milliseconds since the epoch) at which the change event occurred in the database. For snapshot records, this value is 0.</td>
+    </tr>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>schema_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
+    </tr>
+    </tbody>
+</table>
+</div>
+
+**Usage example:**
+
+```yaml
+source:
+  type: postgres
+  hostname: localhost
+  port: 5432
+  username: postgres
+  password: postgres
+  tables: mydb.public.orders
+  slot.name: flink_slot
+  metadata.list: op_ts,table_name,schema_name
+
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, customer_id, op_ts, table_name, schema_name
+    description: Include metadata columns in the output
+```
+
 ## Data Type Mapping
 
 <div class="wy-table-responsive">

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 74 additions & 2 deletions
@@ -245,10 +245,10 @@ pipeline:
     <tr>
       <td>metadata.list</td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
+      <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
-        List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
+        List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts, table_name, database_name, schema_name. See <a href="#supported-metadata-columns">Supported Metadata Columns</a> for more details.
       </td>
     </tr>
     <tr>
@@ -311,6 +311,78 @@ Metrics can help understand the progress of assignments, and the following are t
 Notice:
 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
 
+## Supported Metadata Columns
+
+PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
+
+**Note:** Some metadata information is also available through Transform expressions (e.g., `__namespace_name__`, `__schema_name__`, `__table_name__`). The key differences are:
+- **`op_ts`**: Only available via `metadata.list` - provides the actual operation timestamp from the database.
+- **`table_name`, `database_name`, `schema_name`**: Can be obtained via either `metadata.list` or Transform expressions. Using `metadata.list` allows you to pass these values directly to downstream sinks without writing transform rules, which is simpler for basic use cases.
+
+To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names:
+
+```yaml
+source:
+  type: postgres
+  # ... other configurations
+  metadata.list: op_ts,table_name,database_name,schema_name
+```
+
+The following metadata columns are supported:
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Metadata Column</th>
+        <th class="text-left" style="width: 15%">Data Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>op_ts</td>
+      <td>BIGINT NOT NULL</td>
+      <td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
+    </tr>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the table that contains the changed row. Alternative: use <code>__table_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the database that contains the changed row. Alternative: use <code>__namespace_name__</code> in Transform expressions.</td>
+    </tr>
+    <tr>
+      <td>schema_name</td>
+      <td>STRING NOT NULL</td>
+      <td>The name of the schema that contains the changed row. This is specific to PostgreSQL. Alternative: use <code>__schema_name__</code> in Transform expressions.</td>
+    </tr>
+    </tbody>
+</table>
+</div>
+
+**Example Usage:**
+
+```yaml
+source:
+  type: postgres
+  hostname: localhost
+  port: 5432
+  username: postgres
+  password: postgres
+  tables: mydb.public.orders
+  slot.name: flink_slot
+  metadata.list: op_ts,table_name,schema_name
+
+transform:
+  - source-table: mydb.public.orders
+    projection: order_id, customer_id, op_ts, table_name, schema_name
+    description: Include metadata columns in output
+```
+
 ## Data Type Mapping
 
 <div class="wy-table-responsive">
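The documented `op_ts` semantics (milliseconds since epoch, `0` for snapshot records) suggest that a consumer should probably treat zero as "no change timestamp" rather than as 1970-01-01. A minimal Java sketch of that handling, assuming the value reaches the consumer as a `long`; the class and method names (`OpTsExample`, `changeTime`) are hypothetical and not part of Flink CDC:

```java
import java.time.Instant;
import java.util.Optional;

public class OpTsExample {

    /**
     * Converts an op_ts value (milliseconds since epoch) to an Instant.
     * Returns an empty Optional for snapshot records, which carry op_ts == 0.
     */
    public static Optional<Instant> changeTime(long opTsMillis) {
        if (opTsMillis == 0L) {
            return Optional.empty();
        }
        return Optional.of(Instant.ofEpochMilli(opTsMillis));
    }

    public static void main(String[] args) {
        // Snapshot record: no meaningful change timestamp.
        System.out.println(changeTime(0L).isPresent());
        // Streaming record: a real commit time in epoch milliseconds.
        System.out.println(changeTime(1700000000000L).get());
    }
}
```

Returning `Optional` keeps the snapshot case explicit at the call site; a sink that needs a non-null timestamp would have to pick its own fallback.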
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for database_name. */
+public class DatabaseNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "database_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException(
+                "database_name doesn't exist in the metadata: " + metadata);
+    }
+}
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for op_ts. */
+public class OpTsMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "op_ts";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.BIGINT().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return Long.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return Long.parseLong(metadata.get(getName()));
+        }
+        throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
+    }
+}
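The `read` contract in these column classes can be exercised in isolation. The sketch below copies the `op_ts` logic onto a local stand-in interface so that it compiles without Flink CDC on the classpath; `MetadataReadSketch`, `Column`, and `rejects` are hypothetical names introduced for illustration, and the real `SupportedMetadataColumn` interface has more methods than the two mirrored here.

```java
import java.util.Collections;
import java.util.Map;

public class MetadataReadSketch {

    /** Local stand-in for the two methods exercised here; not the real Flink CDC interface. */
    public interface Column {
        String getName();

        Object read(Map<String, String> metadata);
    }

    /** Mirrors the op_ts logic from the diff: parse the string value as a long. */
    public static final Column OP_TS =
            new Column() {
                @Override
                public String getName() {
                    return "op_ts";
                }

                @Override
                public Object read(Map<String, String> metadata) {
                    if (metadata.containsKey(getName())) {
                        return Long.parseLong(metadata.get(getName()));
                    }
                    throw new IllegalArgumentException(
                            "op_ts doesn't exist in the metadata: " + metadata);
                }
            };

    /** Returns true if reading op_ts from the given map throws IllegalArgumentException. */
    public static boolean rejects(Map<String, String> metadata) {
        try {
            OP_TS.read(metadata);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object value = OP_TS.read(Collections.singletonMap("op_ts", "1700000000000"));
        System.out.println(value instanceof Long);
        System.out.println(rejects(Collections.emptyMap()));
        System.out.println(rejects(Collections.singletonMap("op_ts", "not-a-number")));
    }
}
```

Because `NumberFormatException` is a subclass of `IllegalArgumentException`, a present but non-numeric `op_ts` value surfaces through the same exception type as a missing key, only with a different message.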

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java

Lines changed: 18 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.apache.flink.cdc.common.source.EventSourceProvider;
 import org.apache.flink.cdc.common.source.FlinkSourceProvider;
 import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
@@ -98,6 +99,23 @@ public PostgresSourceConfig getPostgresSourceConfig() {
         return postgresSourceConfig;
     }
 
+    @Override
+    public SupportedMetadataColumn[] supportedMetadataColumns() {
+        return new SupportedMetadataColumn[] {
+            new OpTsMetadataColumn(),
+            new TableNameMetadataColumn(),
+            new DatabaseNameMetadataColumn(),
+            new SchemaNameMetadataColumn()
+        };
+    }
+
+    @Override
+    public boolean isParallelMetadataSource() {
+        // During incremental stage, PostgreSQL never emits schema change events on different
+        // partitions (since it has one WAL stream only.)
+        return false;
+    }
+
     /** The {@link JdbcIncrementalSource} implementation for Postgres. */
     public static class PostgresPipelineSource<T>
             extends PostgresSourceBuilder.PostgresIncrementalSource<T> {
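The four columns registered in `supportedMetadataColumns()` are the same names the docs list for `metadata.list`. How Flink CDC itself parses and validates that option is not shown in this commit, so the following is only an illustrative sketch of one way a comma-separated value could be checked against the supported names; `MetadataListSketch`, `SUPPORTED`, and `parse` are hypothetical and do not exist in Flink CDC.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class MetadataListSketch {

    /** Column names taken from the docs in this commit. */
    public static final Set<String> SUPPORTED =
            new LinkedHashSet<>(
                    Arrays.asList("op_ts", "table_name", "database_name", "schema_name"));

    /**
     * Splits a metadata.list value on commas, trims each entry, skips empty entries,
     * and rejects names that are not in SUPPORTED.
     */
    public static List<String> parse(String metadataList) {
        List<String> result = new ArrayList<>();
        if (metadataList == null || metadataList.trim().isEmpty()) {
            return result;
        }
        for (String raw : metadataList.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue;
            }
            if (!SUPPORTED.contains(name)) {
                throw new IllegalArgumentException(
                        "Unsupported metadata column: " + name + ", supported: " + SUPPORTED);
            }
            result.add(name);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("op_ts, table_name,schema_name"));
    }
}
```

Failing fast on an unknown name is a design choice of this sketch; the actual connector may handle unknown entries differently.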
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for schema_name. */
+public class SchemaNameMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "schema_name";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return metadata.get(getName());
+        }
+        throw new IllegalArgumentException(
+                "schema_name doesn't exist in the metadata: " + metadata);
+    }
+}
