diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
index 0dac3c153e8..22bc89ce0b0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java
@@ -910,10 +910,70 @@ public void testArrayTypesUnsupportedMatrix() throws Exception {
         } catch (Exception e) {
             Assertions.assertThat(getRootCause(e))
                     .hasMessage(
-                            "Unable convert multidimensional array value '[null, null]' to a flat array.");
+                            "Unable to convert multidimensional array value '[null, null]' to a flat array.");
         }
     }
 
+    /**
+     * Verifies that one-dimensional arrays containing NULL elements are read in the snapshot
+     * phase with each NULL position preserved.
+     */
+    @Test
+    public void testArrayTypesWithNull() throws Exception {
+        initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");
+
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGIS_CONTAINER.getHost())
+                                .port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(POSTGRES_CONTAINER.getDatabaseName())
+                                .tableList("inventory.array_types_with_null")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        // Close the result iterator so the collecting job is released when the test ends.
+        try (CloseableIterator events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect()) {
+            List snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
+            RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
+
+            Object[] actualSnapshotObjects = recordFields(snapshotRecord, ARRAY_TYPES_WITH_NULL);
+
+            Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1); // id column
+
+            // Test text array with null element: ARRAY['hello', NULL, 'world']
+            ArrayData actualTextArray = (ArrayData) actualSnapshotObjects[1];
+            Assertions.assertThat(actualTextArray.size()).isEqualTo(3);
+            Assertions.assertThat(actualTextArray.getString(0))
+                    .isEqualTo(BinaryStringData.fromString("hello"));
+            Assertions.assertThat(actualTextArray.isNullAt(1)).isTrue();
+            Assertions.assertThat(actualTextArray.getString(2))
+                    .isEqualTo(BinaryStringData.fromString("world"));
+
+            // Test integer array with null element: ARRAY[1, NULL, 3]
+            ArrayData actualIntArray = (ArrayData) actualSnapshotObjects[2];
+            Assertions.assertThat(actualIntArray.size()).isEqualTo(3);
+            Assertions.assertThat(actualIntArray.getInt(0)).isEqualTo(1);
+            Assertions.assertThat(actualIntArray.isNullAt(1)).isTrue();
+            Assertions.assertThat(actualIntArray.getInt(2)).isEqualTo(3);
+        }
+    }
+
     public Throwable getRootCause(Throwable throwable) {
         Throwable cause = throwable;
         while (cause.getCause() != null && cause.getCause() != cause) {
@@ -1064,6 +1124,13 @@ private Instant toInstant(String ts) {
                     DataTypes.ARRAY(DataTypes.INT()),
                     DataTypes.ARRAY(DataTypes.INT()));
 
+    /** Column types of {@code inventory.array_types_with_null}: id, text_a1, int_a1. */
+    private static final RowType ARRAY_TYPES_WITH_NULL =
+            RowType.of(
+                    DataTypes.INT(),
+                    DataTypes.ARRAY(DataTypes.STRING()),
+                    DataTypes.ARRAY(DataTypes.INT()));
+
     private static final RowType ARRAY_TYPES_MATRIX =
             RowType.of(DataTypes.INT(), DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())));
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
index 81db1f24880..a78561ef532 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/resources/ddl/column_type_test.sql
@@ -173,6 +173,23 @@ VALUES
      '{42}'
     );
 
+-- One-dimensional arrays that contain NULL elements
+CREATE TABLE array_types_with_null (
+    id SERIAL PRIMARY KEY,
+    text_a1 TEXT[],
+    int_a1 INTEGER[]
+);
+
+ALTER TABLE inventory.array_types_with_null
+    REPLICA IDENTITY FULL;
+
+INSERT INTO array_types_with_null (id, text_a1, int_a1)
+VALUES
+    (1,
+     ARRAY['hello', NULL, 'world'],
+     ARRAY[1, NULL, 3]
+    );
+
 CREATE TABLE array_types_unsupported_matrix (
     id SERIAL PRIMARY KEY,
     matrix_a1 INTEGER[][]
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
index c484709ff58..a1eb76bcc7e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java
@@ -504,6 +504,13 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
         }
 
         Schema elementSchema = schema.valueSchema();
+        // Multidimensional arrays are not supported
+        if (elementSchema.type() == Schema.Type.ARRAY) {
+            throw new IllegalArgumentException(
+                    "Unable to convert multidimensional array value '"
+                            + dbzObj
+                            + "' to a flat array.");
+        }
         DataType elementType = schemaDataTypeInference.infer(null, elementSchema);
         DeserializationRuntimeConverter elementConverter = getOrCreateConverter(elementType);
 
@@ -513,13 +520,11 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
 
             for (int i = 0; i < list.size(); i++) {
                 Object element = list.get(i);
-                if (element != null && elementSchema.type() != Schema.Type.ARRAY) {
-                    array[i] = elementConverter.convert(element, elementSchema);
+                // A NULL element is legal in a flat array; keep it as null.
+                if (element == null) {
+                    array[i] = null;
                 } else {
-                    throw new IllegalArgumentException(
-                            "Unable convert multidimensional array value '"
-                                    + dbzObj
-                                    + "' to a flat array.");
+                    array[i] = elementConverter.convert(element, elementSchema);
                 }
             }
 
@@ -529,13 +534,11 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
             Object[] convertedArray = new Object[inputArray.length];
 
             for (int i = 0; i < inputArray.length; i++) {
-                if (inputArray[i] != null && elementSchema.type() != Schema.Type.ARRAY) {
-                    convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
+                // A NULL element is legal in a flat array; keep it as null.
+                if (inputArray[i] == null) {
+                    convertedArray[i] = null;
                 } else {
-                    throw new IllegalArgumentException(
-                            "Unable convert multidimensional array value '"
-                                    + dbzObj
-                                    + "' to a flat array.");
+                    convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
                 }
             }
 