Skip to content

Commit b3869c0

Browse files
committed
[test][pipeline-connector/postgres] Add unit tests for PostgresTypeUtils and PostgresSchemaUtils
1 parent 2b51eb7 commit b3869c0

2 files changed

Lines changed: 436 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.utils;
19+
20+
import org.apache.flink.cdc.common.event.TableId;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
/** Tests for {@link PostgresSchemaUtils}. */
27+
class PostgresSchemaUtilsTest {
28+
29+
// --------------------------------------------------------------------------------------------
30+
// Tests for quote
31+
// --------------------------------------------------------------------------------------------
32+
33+
@Test
34+
void testQuote() {
35+
assertThat(PostgresSchemaUtils.quote("my_table")).isEqualTo("\"my_table\"");
36+
}
37+
38+
@Test
39+
void testQuoteWithSpecialCharacters() {
40+
assertThat(PostgresSchemaUtils.quote("my-table.name")).isEqualTo("\"my-table.name\"");
41+
}
42+
43+
// --------------------------------------------------------------------------------------------
44+
// Tests for toDbzTableId
45+
// --------------------------------------------------------------------------------------------
46+
47+
@Test
48+
void testToDbzTableIdWithNamespaceAndSchema() {
49+
TableId cdcTableId = TableId.tableId("my_namespace", "my_schema", "my_table");
50+
io.debezium.relational.TableId dbzTableId = PostgresSchemaUtils.toDbzTableId(cdcTableId);
51+
52+
assertThat(dbzTableId.catalog()).isEqualTo("my_namespace");
53+
assertThat(dbzTableId.schema()).isEqualTo("my_schema");
54+
assertThat(dbzTableId.table()).isEqualTo("my_table");
55+
}
56+
57+
@Test
58+
void testToDbzTableIdWithSchemaOnly() {
59+
TableId cdcTableId = TableId.tableId("my_schema", "my_table");
60+
io.debezium.relational.TableId dbzTableId = PostgresSchemaUtils.toDbzTableId(cdcTableId);
61+
62+
assertThat(dbzTableId.schema()).isEqualTo("my_schema");
63+
assertThat(dbzTableId.table()).isEqualTo("my_table");
64+
}
65+
66+
// --------------------------------------------------------------------------------------------
67+
// Tests for toCdcTableId
68+
// --------------------------------------------------------------------------------------------
69+
70+
@Test
71+
void testToCdcTableIdSimple() {
72+
io.debezium.relational.TableId dbzTableId =
73+
new io.debezium.relational.TableId(null, "public", "users");
74+
TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId);
75+
76+
assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
77+
assertThat(cdcTableId.getTableName()).isEqualTo("users");
78+
}
79+
80+
@Test
81+
void testToCdcTableIdWithNullSchema() {
82+
io.debezium.relational.TableId dbzTableId =
83+
new io.debezium.relational.TableId(null, null, "users");
84+
TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId);
85+
86+
assertThat(cdcTableId.getTableName()).isEqualTo("users");
87+
}
88+
89+
@Test
90+
void testToCdcTableIdWithDatabaseIncluded() {
91+
io.debezium.relational.TableId dbzTableId =
92+
new io.debezium.relational.TableId(null, "public", "users");
93+
TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId, "my_db", true);
94+
95+
assertThat(cdcTableId.getNamespace()).isEqualTo("my_db");
96+
assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
97+
assertThat(cdcTableId.getTableName()).isEqualTo("users");
98+
}
99+
100+
@Test
101+
void testToCdcTableIdWithDatabaseNotIncluded() {
102+
io.debezium.relational.TableId dbzTableId =
103+
new io.debezium.relational.TableId(null, "public", "users");
104+
TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId, "my_db", false);
105+
106+
// Database not included, should use schema and table only
107+
assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
108+
assertThat(cdcTableId.getTableName()).isEqualTo("users");
109+
}
110+
111+
@Test
112+
void testToCdcTableIdWithNullDatabaseName() {
113+
io.debezium.relational.TableId dbzTableId =
114+
new io.debezium.relational.TableId(null, "public", "users");
115+
TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId, null, true);
116+
117+
// Null database name, should fall back to schema + table
118+
assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
119+
assertThat(cdcTableId.getTableName()).isEqualTo("users");
120+
}
121+
122+
// --------------------------------------------------------------------------------------------
123+
// Tests for round-trip conversion
124+
// --------------------------------------------------------------------------------------------
125+
126+
@Test
127+
void testRoundTripConversion() {
128+
TableId original = TableId.tableId("my_namespace", "my_schema", "my_table");
129+
io.debezium.relational.TableId dbzId = PostgresSchemaUtils.toDbzTableId(original);
130+
TableId roundTripped = PostgresSchemaUtils.toCdcTableId(dbzId, "my_namespace", true);
131+
132+
assertThat(roundTripped).isEqualTo(original);
133+
}
134+
}

0 commit comments

Comments
 (0)