4747import org .apache .fluss .server .zk .data .TableAssignment ;
4848import org .apache .fluss .server .zk .data .TableRegistration ;
4949import org .apache .fluss .shaded .zookeeper3 .org .apache .zookeeper .KeeperException ;
50+ import org .apache .fluss .utils .concurrent .Executors ;
5051import org .apache .fluss .utils .function .RunnableWithException ;
5152import org .apache .fluss .utils .function .ThrowingRunnable ;
5253
6061import java .util .Optional ;
6162import java .util .Set ;
6263import java .util .concurrent .Callable ;
64+ import java .util .concurrent .Executor ;
6365
6466import static org .apache .fluss .server .utils .TableDescriptorValidation .validateTableDescriptor ;
6567
@@ -73,17 +75,26 @@ public class MetadataManager {
7375 private final int maxPartitionNum ;
7476 private final int maxBucketNum ;
7577
78+ private final Executor ioExecutor ;
79+
80+ public MetadataManager (ZooKeeperClient zookeeperClient , Configuration conf ) {
81+ this (zookeeperClient , conf , Executors .directExecutor ());
82+ }
83+
7684 /**
7785 * Creates a new metadata manager.
7886 *
7987 * @param zookeeperClient the zookeeper client
8088 * @param conf the cluster configuration
89+ * @param ioExecutor the io executor
8190 */
82- public MetadataManager (ZooKeeperClient zookeeperClient , Configuration conf ) {
91+ public MetadataManager (
92+ ZooKeeperClient zookeeperClient , Configuration conf , Executor ioExecutor ) {
8393 this .zookeeperClient = zookeeperClient ;
8494 this .defaultTableLakeOptions = LakeStorageUtils .generateDefaultTableLakeOptions (conf );
8595 this .maxPartitionNum = conf .get (ConfigOptions .MAX_PARTITION_NUM );
8696 this .maxBucketNum = conf .get (ConfigOptions .MAX_BUCKET_NUM );
97+ this .ioExecutor = ioExecutor ;
8798 }
8899
89100 public void createDatabase (
@@ -232,13 +243,18 @@ public void completeDeleteTable(long tableId) {
232243 String .format ("Delete tablet assignment meta fail for table %s." , tableId ));
233244 }
234245
235- public void completeDeletePartition (long partitionId ) {
246+ public void completeDeletePartition (TablePartition tablePartition ) {
236247 // final step for delete a partition.
237248 // delete partition assignments node, which will also delete the bucket state node,
238249 // so that all the zk nodes related to this partition are deleted.
239- rethrowIfIsNotNoNodeException (
240- () -> zookeeperClient .deletePartitionAssignment (partitionId ),
241- String .format ("Delete tablet assignment meta fail for partition %s." , partitionId ));
250+ ioExecutor .execute (
251+ () -> {
252+ try {
253+ zookeeperClient .deletePartitionAssignment (tablePartition .getPartitionId ());
254+ } catch (Exception e ) {
255+ LOG .error ("Fail to complete partition {} deletion." , tablePartition , e );
256+ }
257+ });
242258 }
243259
244260 /**
0 commit comments