diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java index 5ce51447ef4..2e13b70d95c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java @@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier { private Set enabledEventTypes = new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE)); + private transient Connection connection; + private transient Admin admin; + public FlussMetaDataApplier( Configuration flussClientConfig, Map tableProperties, @@ -89,12 +92,13 @@ public Set getSupportedSchemaEvolutionTypes() { @Override public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent); + Admin admin = getAdmin(); if (schemaChangeEvent instanceof CreateTableEvent) { CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent; - applyCreateTable(createTableEvent); + applyCreateTable(admin, createTableEvent); } else if (schemaChangeEvent instanceof DropTableEvent) { DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent; - applyDropTable(dropTableEvent); + applyDropTable(admin, dropTableEvent); } else { throw new IllegalArgumentException( "fluss metadata applier only support CreateTableEvent now but receives " @@ -102,9 +106,8 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { } } - private void applyCreateTable(CreateTableEvent event) { - try (Connection connection = ConnectionFactory.createConnection(flussClientConfig); - Admin admin = connection.getAdmin()) { + private void applyCreateTable(Admin admin, CreateTableEvent event) { + try { TableId tableId = event.tableId(); TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName()); String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName(); @@ -126,9 +129,8 @@ private void applyCreateTable(CreateTableEvent event) { } } - private void applyDropTable(DropTableEvent event) { - try (Connection connection = ConnectionFactory.createConnection(flussClientConfig); - Admin admin = connection.getAdmin()) { + private void applyDropTable(Admin admin, DropTableEvent event) { + try { TableId tableId = event.tableId(); TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName()); admin.dropTable(tablePath, true).get(); @@ -138,6 +140,24 @@ private void applyDropTable(DropTableEvent event) { } } + private Admin getAdmin() { + if (connection == null) { + connection = ConnectionFactory.createConnection(flussClientConfig); + admin = connection.getAdmin(); + } + return admin; + } + + @Override + public void close() throws Exception { + if (admin != null) { + admin.close(); + } + if (connection != null) { + connection.close(); + } + } + private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTableInfo) { List inferredPrimaryKeyColumnNames = inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()