Skip to content

Commit 64ff69d

Browse files
committed
[FLINK-39143][pipeline-connecotr][Fluss]Optimize the connection of fluss when use FlussMetaDataApplier (apache#4282)
Co-authored-by: Thorne <syyfffy@163.com>
1 parent 894134c commit 64ff69d

1 file changed

Lines changed: 28 additions & 8 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
5858
private Set<SchemaChangeEventType> enabledEventTypes =
5959
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
6060

61+
private transient Connection connection;
62+
private transient Admin admin;
63+
6164
public FlussMetaDataApplier(
6265
Configuration flussClientConfig,
6366
Map<String, String> tableProperties,
@@ -89,22 +92,22 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
8992
@Override
9093
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
9194
LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent);
95+
Admin admin = getAdmin();
9296
if (schemaChangeEvent instanceof CreateTableEvent) {
9397
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
94-
applyCreateTable(createTableEvent);
98+
applyCreateTable(admin, createTableEvent);
9599
} else if (schemaChangeEvent instanceof DropTableEvent) {
96100
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
97-
applyDropTable(dropTableEvent);
101+
applyDropTable(admin, dropTableEvent);
98102
} else {
99103
throw new IllegalArgumentException(
100104
"fluss metadata applier only support CreateTableEvent now but receives "
101105
+ schemaChangeEvent);
102106
}
103107
}
104108

105-
private void applyCreateTable(CreateTableEvent event) {
106-
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
107-
Admin admin = connection.getAdmin()) {
109+
private void applyCreateTable(Admin admin, CreateTableEvent event) {
110+
try {
108111
TableId tableId = event.tableId();
109112
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
110113
String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
@@ -126,9 +129,8 @@ private void applyCreateTable(CreateTableEvent event) {
126129
}
127130
}
128131

129-
private void applyDropTable(DropTableEvent event) {
130-
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
131-
Admin admin = connection.getAdmin()) {
132+
private void applyDropTable(Admin admin, DropTableEvent event) {
133+
try {
132134
TableId tableId = event.tableId();
133135
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
134136
admin.dropTable(tablePath, true).get();
@@ -138,6 +140,24 @@ private void applyDropTable(DropTableEvent event) {
138140
}
139141
}
140142

143+
private Admin getAdmin() {
144+
if (connection == null) {
145+
connection = ConnectionFactory.createConnection(flussClientConfig);
146+
admin = connection.getAdmin();
147+
}
148+
return admin;
149+
}
150+
151+
@Override
152+
public void close() throws Exception {
153+
if (admin != null) {
154+
admin.close();
155+
}
156+
if (connection != null) {
157+
connection.close();
158+
}
159+
}
160+
141161
private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTableInfo) {
142162
List<String> inferredPrimaryKeyColumnNames =
143163
inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()

0 commit comments

Comments
 (0)