Skip to content

Commit 1b774df

Browse files
committed
[FLINK-XXXXX][pipeline-connector-postgres] Add metadata column support for PostgreSQL Pipeline Connector
This commit adds metadata column support for the PostgreSQL Pipeline Connector, enabling users to access metadata information in their data pipelines. Changes: - Add OpTsMetadataColumn for operation timestamp - Add DatabaseNameMetadataColumn for database name - Add SchemaNameMetadataColumn for schema name - Add TableNameMetadataColumn for table name - Update PostgresDataSource to support metadata columns - Add comprehensive E2E test testAllMetadataColumns() - Update documentation (English and Chinese)
1 parent db1caee commit 1b774df

9 files changed

Lines changed: 503 additions & 4 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,10 @@ pipeline:
253253
<tr>
254254
<td>metadata.list</td>
255255
<td>optional</td>
256-
<td style="word-wrap: break-word;">false</td>
256+
<td style="word-wrap: break-word;">(none)</td>
257257
<td>String</td>
258258
<td>
259-
源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts。
259+
源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts、table_name、database_name、schema_name。详见<a href="#支持的元数据列">支持的元数据列</a>
260260
</td>
261261
</tr>
262262
<tr>
@@ -304,6 +304,74 @@ pipeline:
304304
注意:
305305
1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称, `schema` 是实际的 schema 名称, `table` 是实际的表名称。
306306

307+
## 支持的元数据列
308+
309+
PostgreSQL CDC 连接器支持从源记录中读取元数据列。这些元数据列可以在转换操作中使用或传递给下游 Sink。
310+
311+
要启用元数据列,请使用逗号分隔的元数据列名称列表配置 `metadata.list` 选项:
312+
313+
```yaml
314+
source:
315+
type: postgres
316+
# ... 其他配置
317+
metadata.list: op_ts,table_name,database_name,schema_name
318+
```
319+
320+
支持以下元数据列:
321+
322+
<div class="wy-table-responsive">
323+
<table class="colwidths-auto docutils">
324+
<thead>
325+
<tr>
326+
<th class="text-left" style="width: 20%">元数据列</th>
327+
<th class="text-left" style="width: 15%">数据类型</th>
328+
<th class="text-left" style="width: 65%">描述</th>
329+
</tr>
330+
</thead>
331+
<tbody>
332+
<tr>
333+
<td>op_ts</td>
334+
<td>BIGINT NOT NULL</td>
335+
<td>数据变更事件在数据库中发生的时间戳(自纪元以来的毫秒数)。对于快照记录,此值为 0。</td>
336+
</tr>
337+
<tr>
338+
<td>table_name</td>
339+
<td>STRING NOT NULL</td>
340+
<td>包含变更行的表名称。</td>
341+
</tr>
342+
<tr>
343+
<td>database_name</td>
344+
<td>STRING NOT NULL</td>
345+
<td>包含变更行的数据库名称。</td>
346+
</tr>
347+
<tr>
348+
<td>schema_name</td>
349+
<td>STRING NOT NULL</td>
350+
<td>包含变更行的 Schema 名称。这是 PostgreSQL 特有的。</td>
351+
</tr>
352+
</tbody>
353+
</table>
354+
</div>
355+
356+
**使用示例:**
357+
358+
```yaml
359+
source:
360+
type: postgres
361+
hostname: localhost
362+
port: 5432
363+
username: postgres
364+
password: postgres
365+
tables: mydb.public.orders
366+
slot.name: flink_slot
367+
metadata.list: op_ts,table_name,schema_name
368+
369+
transform:
370+
- source-table: mydb.public.orders
371+
projection: order_id, customer_id, op_ts, table_name, schema_name
372+
description: 在输出中包含元数据列
373+
```
374+
307375
## 数据类型映射
308376

309377
<div class="wy-table-responsive">

docs/content/docs/connectors/pipeline-connectors/postgres.md

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ pipeline:
245245
<tr>
246246
<td>metadata.list</td>
247247
<td>optional</td>
248-
<td style="word-wrap: break-word;">false</td>
248+
<td style="word-wrap: break-word;">(none)</td>
249249
<td>String</td>
250250
<td>
251-
List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
251+
List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts, table_name, database_name, schema_name. See <a href="#supported-metadata-columns">Supported Metadata Columns</a> for more details.
252252
</td>
253253
</tr>
254254
<tr>
@@ -299,6 +299,74 @@ Metrics can help understand the progress of assignments, and the following are t
299299
Notice:
300300
1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
301301

302+
## Supported Metadata Columns
303+
304+
PostgreSQL CDC connector supports reading metadata columns from source records. These metadata columns can be used in transform operations or passed to downstream sinks.
305+
306+
To enable metadata columns, configure the `metadata.list` option with a comma-separated list of metadata column names:
307+
308+
```yaml
309+
source:
310+
type: postgres
311+
# ... other configurations
312+
metadata.list: op_ts,table_name,database_name,schema_name
313+
```
314+
315+
The following metadata columns are supported:
316+
317+
<div class="wy-table-responsive">
318+
<table class="colwidths-auto docutils">
319+
<thead>
320+
<tr>
321+
<th class="text-left" style="width: 20%">Metadata Column</th>
322+
<th class="text-left" style="width: 15%">Data Type</th>
323+
<th class="text-left" style="width: 65%">Description</th>
324+
</tr>
325+
</thead>
326+
<tbody>
327+
<tr>
328+
<td>op_ts</td>
329+
<td>BIGINT NOT NULL</td>
330+
<td>The timestamp (in milliseconds since epoch) when the change event occurred in the database. For snapshot records, this value is 0.</td>
331+
</tr>
332+
<tr>
333+
<td>table_name</td>
334+
<td>STRING NOT NULL</td>
335+
<td>The name of the table that contains the changed row.</td>
336+
</tr>
337+
<tr>
338+
<td>database_name</td>
339+
<td>STRING NOT NULL</td>
340+
<td>The name of the database that contains the changed row.</td>
341+
</tr>
342+
<tr>
343+
<td>schema_name</td>
344+
<td>STRING NOT NULL</td>
345+
<td>The name of the schema that contains the changed row. This is specific to PostgreSQL.</td>
346+
</tr>
347+
</tbody>
348+
</table>
349+
</div>
350+
351+
**Example Usage:**
352+
353+
```yaml
354+
source:
355+
type: postgres
356+
hostname: localhost
357+
port: 5432
358+
username: postgres
359+
password: postgres
360+
tables: mydb.public.orders
361+
slot.name: flink_slot
362+
metadata.list: op_ts,table_name,schema_name
363+
364+
transform:
365+
- source-table: mydb.public.orders
366+
projection: order_id, customer_id, op_ts, table_name, schema_name
367+
description: Include metadata columns in output
368+
```
369+
302370
## Data Type Mapping
303371

304372
<div class="wy-table-responsive">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.source;
19+
20+
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
24+
import java.util.Map;
25+
26+
/** A {@link SupportedMetadataColumn} for database_name. */
27+
public class DatabaseNameMetadataColumn implements SupportedMetadataColumn {
28+
29+
@Override
30+
public String getName() {
31+
return "database_name";
32+
}
33+
34+
@Override
35+
public DataType getType() {
36+
return DataTypes.STRING().notNull();
37+
}
38+
39+
@Override
40+
public Class<?> getJavaClass() {
41+
return String.class;
42+
}
43+
44+
@Override
45+
public Object read(Map<String, String> metadata) {
46+
if (metadata.containsKey(getName())) {
47+
return metadata.get(getName());
48+
}
49+
throw new IllegalArgumentException(
50+
"database_name doesn't exist in the metadata: " + metadata);
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.source;
19+
20+
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
24+
import java.util.Map;
25+
26+
/** A {@link SupportedMetadataColumn} for op_ts. */
27+
public class OpTsMetadataColumn implements SupportedMetadataColumn {
28+
29+
@Override
30+
public String getName() {
31+
return "op_ts";
32+
}
33+
34+
@Override
35+
public DataType getType() {
36+
return DataTypes.BIGINT().notNull();
37+
}
38+
39+
@Override
40+
public Class<?> getJavaClass() {
41+
return Long.class;
42+
}
43+
44+
@Override
45+
public Object read(Map<String, String> metadata) {
46+
if (metadata.containsKey(getName())) {
47+
return Long.parseLong(metadata.get(getName()));
48+
}
49+
throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
50+
}
51+
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.cdc.common.source.EventSourceProvider;
2525
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
2626
import org.apache.flink.cdc.common.source.MetadataAccessor;
27+
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
2728
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
2829
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
2930
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
@@ -92,6 +93,23 @@ public PostgresSourceConfig getPostgresSourceConfig() {
9293
return postgresSourceConfig;
9394
}
9495

96+
@Override
97+
public SupportedMetadataColumn[] supportedMetadataColumns() {
98+
return new SupportedMetadataColumn[] {
99+
new OpTsMetadataColumn(),
100+
new TableNameMetadataColumn(),
101+
new DatabaseNameMetadataColumn(),
102+
new SchemaNameMetadataColumn()
103+
};
104+
}
105+
106+
@Override
107+
public boolean isParallelMetadataSource() {
108+
// During incremental stage, PostgreSQL never emits schema change events on different
109+
// partitions (since it has one WAL stream only.)
110+
return false;
111+
}
112+
95113
/** The {@link JdbcIncrementalSource} implementation for Postgres. */
96114
public static class PostgresPipelineSource<T>
97115
extends PostgresSourceBuilder.PostgresIncrementalSource<T> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.source;
19+
20+
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
24+
import java.util.Map;
25+
26+
/** A {@link SupportedMetadataColumn} for schema_name. */
27+
public class SchemaNameMetadataColumn implements SupportedMetadataColumn {
28+
29+
@Override
30+
public String getName() {
31+
return "schema_name";
32+
}
33+
34+
@Override
35+
public DataType getType() {
36+
return DataTypes.STRING().notNull();
37+
}
38+
39+
@Override
40+
public Class<?> getJavaClass() {
41+
return String.class;
42+
}
43+
44+
@Override
45+
public Object read(Map<String, String> metadata) {
46+
if (metadata.containsKey(getName())) {
47+
return metadata.get(getName());
48+
}
49+
throw new IllegalArgumentException(
50+
"schema_name doesn't exist in the metadata: " + metadata);
51+
}
52+
}

0 commit comments

Comments
 (0)