Skip to content

Commit 43bbaa9

Browse files
committed
[FLINK-38514][postgres] Add support for UUID array type in PostgreSQL CDC connector
1 parent e731d67 commit 43bbaa9

5 files changed

Lines changed: 108 additions & 1 deletion

File tree

docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ Data Type Mapping
736736
CHARACTER(n)<br>
737737
VARCHAR(n)<br>
738738
CHARACTER VARYING(n)<br>
739+
UUID<br>
739740
TEXT</td>
740741
<td>STRING</td>
741742
</tr>

docs/content/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,7 @@ Since the order of processing these records cannot be guaranteed, the final valu
740740
CHARACTER(n)<br>
741741
VARCHAR(n)<br>
742742
CHARACTER VARYING(n)<br>
743+
UUID<br>
743744
TEXT</td>
744745
<td>STRING</td>
745746
</tr>

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class PostgresTypeUtils {
6161
private static final String PG_CHARACTER_VARYING = "varchar";
6262
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
6363
private static final String PG_UUID = "uuid";
64+
private static final String PG_UUID_ARRAY = "_uuid";
6465

6566
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
6667
public static DataType fromDbzColumn(Column column) {
@@ -140,6 +141,7 @@ private static DataType convertFromColumn(Column column) {
140141
case PG_UUID:
141142
return DataTypes.STRING();
142143
case PG_TEXT_ARRAY:
144+
case PG_UUID_ARRAY:
143145
return DataTypes.ARRAY(DataTypes.STRING());
144146
case PG_TIMESTAMP:
145147
return DataTypes.TIMESTAMP(scale);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,85 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception {
968968
result.getJobClient().get().cancel().get();
969969
}
970970

971+
@ParameterizedTest
972+
@ValueSource(booleans = {true, false})
973+
void testArrayTypes(boolean parallelismSnapshot) throws Throwable {
974+
setup(parallelismSnapshot);
975+
initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
976+
977+
String sourceDDL =
978+
String.format(
979+
"CREATE TABLE array_types ("
980+
+ " id INTEGER NOT NULL,"
981+
+ " text_a1 ARRAY<STRING>,"
982+
+ " int_a1 ARRAY<INT>,"
983+
+ " int_s1 ARRAY<INT>,"
984+
+ " uuid_a1 ARRAY<STRING>"
985+
+ ") WITH ("
986+
+ " 'connector' = 'postgres-cdc',"
987+
+ " 'hostname' = '%s',"
988+
+ " 'port' = '%s',"
989+
+ " 'username' = '%s',"
990+
+ " 'password' = '%s',"
991+
+ " 'database-name' = '%s',"
992+
+ " 'schema-name' = '%s',"
993+
+ " 'table-name' = '%s',"
994+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
995+
+ " 'decoding.plugin.name' = 'pgoutput', "
996+
+ " 'slot.name' = '%s'"
997+
+ ")",
998+
POSTGIS_CONTAINER.getHost(),
999+
POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
1000+
POSTGIS_CONTAINER.getUsername(),
1001+
POSTGIS_CONTAINER.getPassword(),
1002+
POSTGIS_CONTAINER.getDatabaseName(),
1003+
"inventory",
1004+
"array_types",
1005+
parallelismSnapshot,
1006+
getSlotName());
1007+
1008+
tEnv.executeSql(sourceDDL);
1009+
1010+
// async submit job
1011+
TableResult tableResult = tEnv.executeSql("SELECT * FROM array_types");
1012+
CloseableIterator<Row> iterator = tableResult.collect();
1013+
1014+
// wait for snapshot to complete
1015+
List<Row> snapshotResults = new ArrayList<>();
1016+
while (iterator.hasNext() && snapshotResults.size() < 1) {
1017+
Row row = iterator.next();
1018+
snapshotResults.add(row);
1019+
}
1020+
1021+
// verify snapshot data
1022+
Assertions.assertThat(snapshotResults).hasSize(1);
1023+
Row snapshotRow = snapshotResults.get(0);
1024+
1025+
// verify id
1026+
Assertions.assertThat(snapshotRow.getField(0)).isEqualTo(1);
1027+
1028+
// verify text_a1 array
1029+
String[] textArray = (String[]) snapshotRow.getField(1);
1030+
Assertions.assertThat(textArray).containsExactly("electronics", "gadget", "sale");
1031+
1032+
// verify int_a1 array
1033+
Integer[] intArray1 = (Integer[]) snapshotRow.getField(2);
1034+
Assertions.assertThat(intArray1).containsExactly(85, 90, 78);
1035+
1036+
// verify int_s1 array
1037+
Integer[] intArray2 = (Integer[]) snapshotRow.getField(3);
1038+
Assertions.assertThat(intArray2).containsExactly(42);
1039+
1040+
// verify uuid_a1 array - should be mapped to STRING array
1041+
String[] uuidArray = (String[]) snapshotRow.getField(4);
1042+
Assertions.assertThat(uuidArray).hasSize(2);
1043+
// UUID values should be present (format may vary)
1044+
Assertions.assertThat(uuidArray[0]).isNotEmpty();
1045+
Assertions.assertThat(uuidArray[1]).isNotEmpty();
1046+
1047+
tableResult.getJobClient().get().cancel().get();
1048+
}
1049+
9711050
@ParameterizedTest
9721051
@ValueSource(booleans = {true, false})
9731052
void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Exception {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/column_type_test.sql

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,28 @@ INSERT INTO inventory.full_types
5656
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
5757
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
5858
'2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
59-
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
59+
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
60+
61+
-- ----------------------------------------------------------------------------------------------------------------
62+
-- TABLE: array_types
63+
-- ----------------------------------------------------------------------------------------------------------------
64+
-- Test table for array types including UUID[]
65+
CREATE TABLE array_types (
66+
id SERIAL PRIMARY KEY,
67+
text_a1 TEXT[],
68+
int_a1 INTEGER[],
69+
int_s1 INTEGER[],
70+
uuid_a1 UUID[]
71+
);
72+
73+
ALTER TABLE inventory.array_types
74+
REPLICA IDENTITY FULL;
75+
76+
INSERT INTO array_types (id, text_a1, int_a1, int_s1, uuid_a1)
77+
VALUES
78+
(1,
79+
ARRAY['electronics', 'gadget', 'sale'],
80+
'{85, 90, 78}',
81+
'{42}',
82+
ARRAY['227496Ad-fdE9-CCFb-1F04-892fc505AFD5', '9d33f9E2-DfC7-fDef-9478-BCc5dBf7a6d7']::UUID[]
83+
);

0 commit comments

Comments
 (0)