Skip to content

Commit 93e5547

Browse files
authored
[FLINK-38520][postgres] Postgres YAML CDC support array with null element (#4254)
1 parent 3a2a3bf commit 93e5547

3 files changed

Lines changed: 91 additions & 13 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,10 +910,65 @@ public void testArrayTypesUnsupportedMatrix() throws Exception {
910910
} catch (Exception e) {
911911
Assertions.assertThat(getRootCause(e))
912912
.hasMessage(
913-
"Unable convert multidimensional array value '[null, null]' to a flat array.");
913+
"Unable to convert multidimensional array value '[null, null]' to a flat array.");
914914
}
915915
}
916916

917+
@Test
918+
public void testArrayTypesWithNull() throws Exception {
919+
initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
920+
921+
PostgresSourceConfigFactory configFactory =
922+
(PostgresSourceConfigFactory)
923+
new PostgresSourceConfigFactory()
924+
.hostname(POSTGIS_CONTAINER.getHost())
925+
.port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
926+
.username(TEST_USER)
927+
.password(TEST_PASSWORD)
928+
.databaseList(POSTGRES_CONTAINER.getDatabaseName())
929+
.tableList("inventory.array_types_with_null")
930+
.startupOptions(StartupOptions.initial())
931+
.serverTimeZone("UTC");
932+
configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
933+
configFactory.slotName(slotName);
934+
configFactory.decodingPluginName("pgoutput");
935+
936+
FlinkSourceProvider sourceProvider =
937+
(FlinkSourceProvider)
938+
new PostgresDataSource(configFactory).getEventSourceProvider();
939+
940+
CloseableIterator<Event> events =
941+
env.fromSource(
942+
sourceProvider.getSource(),
943+
WatermarkStrategy.noWatermarks(),
944+
PostgresDataSourceFactory.IDENTIFIER,
945+
new EventTypeInfo())
946+
.executeAndCollect();
947+
948+
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
949+
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
950+
951+
Object[] actualSnapshotObjects = recordFields(snapshotRecord, ARRAY_TYPES_WITH_NULL);
952+
953+
Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1); // id column
954+
955+
// Test text array with null element: ARRAY['hello', NULL, 'world']
956+
ArrayData actualTextArray = (ArrayData) actualSnapshotObjects[1];
957+
Assertions.assertThat(actualTextArray.size()).isEqualTo(3);
958+
Assertions.assertThat(actualTextArray.getString(0))
959+
.isEqualTo(BinaryStringData.fromString("hello"));
960+
Assertions.assertThat(actualTextArray.isNullAt(1)).isTrue();
961+
Assertions.assertThat(actualTextArray.getString(2))
962+
.isEqualTo(BinaryStringData.fromString("world"));
963+
964+
// Test integer array with null element: ARRAY[1, NULL, 3]
965+
ArrayData actualIntArray = (ArrayData) actualSnapshotObjects[2];
966+
Assertions.assertThat(actualIntArray.size()).isEqualTo(3);
967+
Assertions.assertThat(actualIntArray.getInt(0)).isEqualTo(1);
968+
Assertions.assertThat(actualIntArray.isNullAt(1)).isTrue();
969+
Assertions.assertThat(actualIntArray.getInt(2)).isEqualTo(3);
970+
}
971+
917972
public Throwable getRootCause(Throwable throwable) {
918973
Throwable cause = throwable;
919974
while (cause.getCause() != null && cause.getCause() != cause) {
@@ -1064,6 +1119,12 @@ private Instant toInstant(String ts) {
10641119
DataTypes.ARRAY(DataTypes.INT()),
10651120
DataTypes.ARRAY(DataTypes.INT()));
10661121

1122+
private static final RowType ARRAY_TYPES_WITH_NULL =
1123+
RowType.of(
1124+
DataTypes.INT(),
1125+
DataTypes.ARRAY(DataTypes.STRING()),
1126+
DataTypes.ARRAY(DataTypes.INT()));
1127+
10671128
private static final RowType ARRAY_TYPES_MATRIX =
10681129
RowType.of(DataTypes.INT(), DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())));
10691130
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,22 @@ VALUES
173173
'{42}'
174174
);
175175

176+
CREATE TABLE array_types_with_null (
177+
id SERIAL PRIMARY KEY,
178+
text_a1 TEXT[],
179+
int_a1 INTEGER[]
180+
);
181+
182+
ALTER TABLE inventory.array_types_with_null
183+
REPLICA IDENTITY FULL;
184+
185+
INSERT INTO array_types_with_null (id, text_a1, int_a1)
186+
VALUES
187+
(1,
188+
ARRAY['hello', NULL, 'world'],
189+
ARRAY[1, NULL, 3]
190+
);
191+
176192
CREATE TABLE array_types_unsupported_matrix (
177193
id SERIAL PRIMARY KEY,
178194
matrix_a1 INTEGER[][]

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,13 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
504504
}
505505

506506
Schema elementSchema = schema.valueSchema();
507+
// Multidimensional arrays are not supported
508+
if (elementSchema.type() == Schema.Type.ARRAY) {
509+
throw new IllegalArgumentException(
510+
"Unable to convert multidimensional array value '"
511+
+ dbzObj
512+
+ "' to a flat array.");
513+
}
507514
DataType elementType = schemaDataTypeInference.infer(null, elementSchema);
508515
DeserializationRuntimeConverter elementConverter = getOrCreateConverter(elementType);
509516

@@ -513,13 +520,10 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
513520

514521
for (int i = 0; i < list.size(); i++) {
515522
Object element = list.get(i);
516-
if (element != null && elementSchema.type() != Schema.Type.ARRAY) {
517-
array[i] = elementConverter.convert(element, elementSchema);
523+
if (element == null) {
524+
array[i] = null;
518525
} else {
519-
throw new IllegalArgumentException(
520-
"Unable convert multidimensional array value '"
521-
+ dbzObj
522-
+ "' to a flat array.");
526+
array[i] = elementConverter.convert(element, elementSchema);
523527
}
524528
}
525529

@@ -529,13 +533,10 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
529533
Object[] convertedArray = new Object[inputArray.length];
530534

531535
for (int i = 0; i < inputArray.length; i++) {
532-
if (inputArray[i] != null && elementSchema.type() != Schema.Type.ARRAY) {
533-
convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
536+
if (inputArray[i] == null) {
537+
convertedArray[i] = null;
534538
} else {
535-
throw new IllegalArgumentException(
536-
"Unable convert multidimensional array value '"
537-
+ dbzObj
538-
+ "' to a flat array.");
539+
convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
539540
}
540541
}
541542

0 commit comments

Comments
 (0)