diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java index aff073821..bb602cb8a 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java @@ -55,6 +55,7 @@ public class PostgresType { private static final String VARBIT = "varbit"; private static final String UUID = "uuid"; private static final String BYTEA = "bytea"; + private static final String REGCLASS = "regclass"; private static final String JSON = "json"; private static final String JSONB = "jsonb"; private static final String _INT2 = "_int2"; @@ -127,6 +128,7 @@ public static String toDorisType(String postgresType, Integer precision, Integer case VARBIT: case UUID: case BYTEA: + case REGCLASS: return DorisType.STRING; case JSON: case JSONB: diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index c7b23c18e..23eca8048 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -80,6 +80,17 @@ public void testParserAlterDDLsAdd() { } } + @Test + public void testParserPostgresAlterDDLsAddRegclass() { + String ddl = "ALTER TABLE access_control_device ADD COLUMN relation_id regclass"; + List actualDDLs = + schemaManager.parseAlterDDLs(SourceConnector.POSTGRES, ddl, dorisTable); + + Assert.assertEquals(1, actualDDLs.size()); + Assert.assertEquals( + "ALTER TABLE `doris`.`tab` ADD COLUMN `relation_id` STRING", actualDDLs.get(0)); + } + @Test public void testParserAlterDDLsChange() { List expectDDLs = new ArrayList<>(); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index fc3f6ffbd..a5a081be9 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -386,6 +386,12 @@ public void testBuildPostgres2DorisTypeName() throws IOException { JsonNode columns = objectMapper.readTree(columnInfo); String dorisTypeName = schemaChange.buildDorisTypeName(columns); Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); + + columnInfo = + "{\"name\":\"RELATION_ID\",\"jdbcType\":1111,\"nativeType\":null,\"typeName\":\"regclass\",\"typeExpression\":\"regclass\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null}"; + columns = objectMapper.readTree(columnInfo); + dorisTypeName = schemaChange.buildDorisTypeName(columns); + Assert.assertEquals(dorisTypeName, "STRING"); } @Test diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/tools/cdc/postgres/PostgresTypeTest.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/tools/cdc/postgres/PostgresTypeTest.java new file mode 100644 index 000000000..dec2263ab --- /dev/null +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/tools/cdc/postgres/PostgresTypeTest.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.postgres; + +import org.apache.doris.flink.catalog.doris.DorisType; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PostgresTypeTest { + @Test + public void regclassTypeTest() { + assertEquals(DorisType.STRING, PostgresType.toDorisType("regclass", null, null)); + assertEquals(DorisType.STRING, PostgresType.toDorisType("REGCLASS", null, null)); + } +}