@@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
5858 private Set <SchemaChangeEventType > enabledEventTypes =
5959 new HashSet <>(Arrays .asList (CREATE_TABLE , DROP_TABLE ));
6060
61+ private transient Connection connection ;
62+ private transient Admin admin ;
63+
6164 public FlussMetaDataApplier (
6265 Configuration flussClientConfig ,
6366 Map <String , String > tableProperties ,
@@ -89,22 +92,22 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
8992 @ Override
9093 public void applySchemaChange (SchemaChangeEvent schemaChangeEvent ) {
9194 LOG .info ("fluss metadata applier receive schemaChangeEvent {}" , schemaChangeEvent );
95+ Admin admin = getAdmin ();
9296 if (schemaChangeEvent instanceof CreateTableEvent ) {
9397 CreateTableEvent createTableEvent = (CreateTableEvent ) schemaChangeEvent ;
94- applyCreateTable (createTableEvent );
98+ applyCreateTable (admin , createTableEvent );
9599 } else if (schemaChangeEvent instanceof DropTableEvent ) {
96100 DropTableEvent dropTableEvent = (DropTableEvent ) schemaChangeEvent ;
97- applyDropTable (dropTableEvent );
101+ applyDropTable (admin , dropTableEvent );
98102 } else {
99103 throw new IllegalArgumentException (
100104 "fluss metadata applier only support CreateTableEvent now but receives "
101105 + schemaChangeEvent );
102106 }
103107 }
104108
105- private void applyCreateTable (CreateTableEvent event ) {
106- try (Connection connection = ConnectionFactory .createConnection (flussClientConfig );
107- Admin admin = connection .getAdmin ()) {
109+ private void applyCreateTable (Admin admin , CreateTableEvent event ) {
110+ try {
108111 TableId tableId = event .tableId ();
109112 TablePath tablePath = new TablePath (tableId .getSchemaName (), tableId .getTableName ());
110113 String tableIdentifier = tablePath .getDatabaseName () + "." + tablePath .getTableName ();
@@ -126,9 +129,8 @@ private void applyCreateTable(CreateTableEvent event) {
126129 }
127130 }
128131
129- private void applyDropTable (DropTableEvent event ) {
130- try (Connection connection = ConnectionFactory .createConnection (flussClientConfig );
131- Admin admin = connection .getAdmin ()) {
132+ private void applyDropTable (Admin admin , DropTableEvent event ) {
133+ try {
132134 TableId tableId = event .tableId ();
133135 TablePath tablePath = new TablePath (tableId .getSchemaName (), tableId .getTableName ());
134136 admin .dropTable (tablePath , true ).get ();
@@ -138,6 +140,24 @@ private void applyDropTable(DropTableEvent event) {
138140 }
139141 }
140142
143+ private Admin getAdmin () {
144+ if (connection == null ) {
145+ connection = ConnectionFactory .createConnection (flussClientConfig );
146+ admin = connection .getAdmin ();
147+ }
148+ return admin ;
149+ }
150+
151+ @ Override
152+ public void close () throws Exception {
153+ if (admin != null ) {
154+ admin .close ();
155+ }
156+ if (connection != null ) {
157+ connection .close ();
158+ }
159+ }
160+
141161 private void sanityCheck (TableDescriptor inferredFlussTable , TableInfo currentTableInfo ) {
142162 List <String > inferredPrimaryKeyColumnNames =
143163 inferredFlussTable .getSchema ().getPrimaryKeyColumnNames ().stream ()
0 commit comments