Skip to content

Commit e96cee4

Browse files
committed
Optimize the connection of fluss when use FlussMetaDataApplier
1 parent 2f59f35 commit e96cee4

1 file changed

Lines changed: 25 additions & 4 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
5858
private Set<SchemaChangeEventType> enabledEventTypes =
5959
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
6060

61+
private transient Connection connection;
62+
private transient Admin admin;
63+
6164
public FlussMetaDataApplier(
6265
Configuration flussClientConfig,
6366
Map<String, String> tableProperties,
@@ -103,8 +106,8 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
103106
}
104107

105108
private void applyCreateTable(CreateTableEvent event) {
106-
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
107-
Admin admin = connection.getAdmin()) {
109+
try {
110+
Admin admin = getAdmin();
108111
TableId tableId = event.tableId();
109112
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
110113
String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
@@ -127,8 +130,8 @@ private void applyCreateTable(CreateTableEvent event) {
127130
}
128131

129132
private void applyDropTable(DropTableEvent event) {
130-
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
131-
Admin admin = connection.getAdmin()) {
133+
try {
134+
Admin admin = getAdmin();
132135
TableId tableId = event.tableId();
133136
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
134137
admin.dropTable(tablePath, true).get();
@@ -138,6 +141,24 @@ private void applyDropTable(DropTableEvent event) {
138141
}
139142
}
140143

144+
private Admin getAdmin() {
145+
if (connection == null) {
146+
connection = ConnectionFactory.createConnection(flussClientConfig);
147+
admin = connection.getAdmin();
148+
}
149+
return admin;
150+
}
151+
152+
@Override
153+
public void close() throws Exception {
154+
if (admin != null) {
155+
admin.close();
156+
}
157+
if (connection != null) {
158+
connection.close();
159+
}
160+
}
161+
141162
private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTableInfo) {
142163
List<String> inferredPrimaryKeyColumnNames =
143164
inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()

0 commit comments

Comments
 (0)