Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -910,10 +910,65 @@ public void testArrayTypesUnsupportedMatrix() throws Exception {
} catch (Exception e) {
Assertions.assertThat(getRootCause(e))
.hasMessage(
"Unable convert multidimensional array value '[null, null]' to a flat array.");
"Unable to convert multidimensional array value '[null, null]' to a flat array.");
}
}

@Test
public void testArrayTypesWithNull() throws Exception {
initializePostgresTable(POSTGIS_CONTAINER, "column_type_test");

PostgresSourceConfigFactory configFactory =
(PostgresSourceConfigFactory)
new PostgresSourceConfigFactory()
.hostname(POSTGIS_CONTAINER.getHost())
.port(POSTGIS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(POSTGRES_CONTAINER.getDatabaseName())
.tableList("inventory.array_types_with_null")
.startupOptions(StartupOptions.initial())
.serverTimeZone("UTC");
configFactory.database(POSTGRES_CONTAINER.getDatabaseName());
configFactory.slotName(slotName);
configFactory.decodingPluginName("pgoutput");

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
new PostgresDataSource(configFactory).getEventSourceProvider();

CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
PostgresDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();

List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();

Object[] actualSnapshotObjects = recordFields(snapshotRecord, ARRAY_TYPES_WITH_NULL);

Assertions.assertThat(actualSnapshotObjects[0]).isEqualTo(1); // id column

// Test text array with null element: ARRAY['hello', NULL, 'world']
ArrayData actualTextArray = (ArrayData) actualSnapshotObjects[1];
Assertions.assertThat(actualTextArray.size()).isEqualTo(3);
Assertions.assertThat(actualTextArray.getString(0))
.isEqualTo(BinaryStringData.fromString("hello"));
Assertions.assertThat(actualTextArray.isNullAt(1)).isTrue();
Assertions.assertThat(actualTextArray.getString(2))
.isEqualTo(BinaryStringData.fromString("world"));

// Test integer array with null element: ARRAY[1, NULL, 3]
ArrayData actualIntArray = (ArrayData) actualSnapshotObjects[2];
Assertions.assertThat(actualIntArray.size()).isEqualTo(3);
Assertions.assertThat(actualIntArray.getInt(0)).isEqualTo(1);
Assertions.assertThat(actualIntArray.isNullAt(1)).isTrue();
Assertions.assertThat(actualIntArray.getInt(2)).isEqualTo(3);
}

public Throwable getRootCause(Throwable throwable) {
Throwable cause = throwable;
while (cause.getCause() != null && cause.getCause() != cause) {
Expand Down Expand Up @@ -1064,6 +1119,12 @@ private Instant toInstant(String ts) {
DataTypes.ARRAY(DataTypes.INT()),
DataTypes.ARRAY(DataTypes.INT()));

private static final RowType ARRAY_TYPES_WITH_NULL =
RowType.of(
DataTypes.INT(),
DataTypes.ARRAY(DataTypes.STRING()),
DataTypes.ARRAY(DataTypes.INT()));

private static final RowType ARRAY_TYPES_MATRIX =
RowType.of(DataTypes.INT(), DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())));
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ VALUES
'{42}'
);

CREATE TABLE array_types_with_null (
id SERIAL PRIMARY KEY,
text_a1 TEXT[],
int_a1 INTEGER[]
);

ALTER TABLE inventory.array_types_with_null
REPLICA IDENTITY FULL;

INSERT INTO array_types_with_null (id, text_a1, int_a1)
VALUES
(1,
ARRAY['hello', NULL, 'world'],
ARRAY[1, NULL, 3]
);

CREATE TABLE array_types_unsupported_matrix (
id SERIAL PRIMARY KEY,
matrix_a1 INTEGER[][]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,13 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
}

Schema elementSchema = schema.valueSchema();
// Multidimensional arrays are not supported
if (elementSchema.type() == Schema.Type.ARRAY) {
throw new IllegalArgumentException(
"Unable to convert multidimensional array value '"
+ dbzObj
+ "' to a flat array.");
}
DataType elementType = schemaDataTypeInference.infer(null, elementSchema);
DeserializationRuntimeConverter elementConverter = getOrCreateConverter(elementType);

Expand All @@ -513,13 +520,10 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {

for (int i = 0; i < list.size(); i++) {
Object element = list.get(i);
if (element != null && elementSchema.type() != Schema.Type.ARRAY) {
array[i] = elementConverter.convert(element, elementSchema);
if (element == null) {
array[i] = null;
} else {
throw new IllegalArgumentException(
"Unable convert multidimensional array value '"
+ dbzObj
+ "' to a flat array.");
array[i] = elementConverter.convert(element, elementSchema);
}
}

Expand All @@ -529,13 +533,10 @@ protected Object convertToArray(Object dbzObj, Schema schema) throws Exception {
Object[] convertedArray = new Object[inputArray.length];

for (int i = 0; i < inputArray.length; i++) {
if (inputArray[i] != null && elementSchema.type() != Schema.Type.ARRAY) {
convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
if (inputArray[i] == null) {
convertedArray[i] = null;
} else {
throw new IllegalArgumentException(
"Unable convert multidimensional array value '"
+ dbzObj
+ "' to a flat array.");
convertedArray[i] = elementConverter.convert(inputArray[i], elementSchema);
}
}

Expand Down
Loading