Skip to content

Commit 57ced54

Browse files
committed
[server] Use async mode for delete partition assignment in zk
1 parent 08758e9 commit 57ced54

File tree

5 files changed

+28
-14
lines changed

5 files changed

+28
-14
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public CoordinatorEventProcessor(
168168
coordinatorEventManager,
169169
coordinatorContext),
170170
zooKeeperClient);
171-
this.metadataManager = new MetadataManager(zooKeeperClient, conf);
171+
this.metadataManager = new MetadataManager(zooKeeperClient, conf, ioExecutor);
172172

173173
this.tableManager =
174174
new TableManager(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ protected void startServices() throws Exception {
183183

184184
this.lakeTableTieringManager = new LakeTableTieringManager();
185185

186-
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
186+
MetadataManager metadataManager = new MetadataManager(zkClient, conf, ioExecutor);
187187
this.coordinatorService =
188188
new CoordinatorService(
189189
conf,
@@ -321,7 +321,7 @@ private void registerCoordinatorLeader() throws Exception {
321321
}
322322

323323
private void createDefaultDatabase() {
324-
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
324+
MetadataManager metadataManager = new MetadataManager(zkClient, conf, ioExecutor);
325325
List<String> databases = metadataManager.listDatabases();
326326
if (databases.isEmpty()) {
327327
metadataManager.createDatabase(DEFAULT_DATABASE, DatabaseDescriptor.EMPTY, true);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.fluss.server.zk.data.TableAssignment;
4848
import org.apache.fluss.server.zk.data.TableRegistration;
4949
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
50+
import org.apache.fluss.utils.concurrent.Executors;
5051
import org.apache.fluss.utils.function.RunnableWithException;
5152
import org.apache.fluss.utils.function.ThrowingRunnable;
5253

@@ -60,6 +61,7 @@
6061
import java.util.Optional;
6162
import java.util.Set;
6263
import java.util.concurrent.Callable;
64+
import java.util.concurrent.Executor;
6365

6466
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
6567

@@ -73,17 +75,26 @@ public class MetadataManager {
7375
private final int maxPartitionNum;
7476
private final int maxBucketNum;
7577

78+
private final Executor ioExecutor;
79+
80+
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
81+
this(zookeeperClient, conf, Executors.directExecutor());
82+
}
83+
7684
/**
7785
* Creates a new metadata manager.
7886
*
7987
* @param zookeeperClient the zookeeper client
8088
* @param conf the cluster configuration
89+
* @param ioExecutor the io executor
8190
*/
82-
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
91+
public MetadataManager(
92+
ZooKeeperClient zookeeperClient, Configuration conf, Executor ioExecutor) {
8393
this.zookeeperClient = zookeeperClient;
8494
this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf);
8595
this.maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM);
8696
this.maxBucketNum = conf.get(ConfigOptions.MAX_BUCKET_NUM);
97+
this.ioExecutor = ioExecutor;
8798
}
8899

89100
public void createDatabase(
@@ -232,13 +243,18 @@ public void completeDeleteTable(long tableId) {
232243
String.format("Delete tablet assignment meta fail for table %s.", tableId));
233244
}
234245

235-
public void completeDeletePartition(long partitionId) {
246+
public void completeDeletePartition(TablePartition tablePartition) {
236247
// final step for delete a partition.
237248
// delete partition assignments node, which will also delete the bucket state node,
238249
// so that all the zk nodes related to this partition are deleted.
239-
rethrowIfIsNotNoNodeException(
240-
() -> zookeeperClient.deletePartitionAssignment(partitionId),
241-
String.format("Delete tablet assignment meta fail for partition %s.", partitionId));
250+
ioExecutor.execute(
251+
() -> {
252+
try {
253+
zookeeperClient.deletePartitionAssignment(tablePartition.getPartitionId());
254+
} catch (Exception e) {
255+
LOG.error("Fail to complete partition {} deletion.", tablePartition, e);
256+
}
257+
});
242258
}
243259

244260
/**

fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,11 +265,7 @@ private void completeDeletePartition(TablePartition tablePartition) {
265265
tablePartition.getTableId(), tablePartition.getPartitionId());
266266
replicaStateMachine.handleStateChanges(replicas, ReplicaState.NonExistentReplica);
267267
deleteRemoteDirectory(tablePartition);
268-
try {
269-
metadataManager.completeDeletePartition(tablePartition.getPartitionId());
270-
} catch (Exception e) {
271-
LOG.error("Fail to complete partition {} deletion.", tablePartition, e);
272-
}
268+
metadataManager.completeDeletePartition(tablePartition);
273269
coordinatorContext.removePartition(tablePartition);
274270
}
275271

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.fluss.utils.ExceptionUtils;
5151
import org.apache.fluss.utils.clock.Clock;
5252
import org.apache.fluss.utils.clock.SystemClock;
53+
import org.apache.fluss.utils.concurrent.Executors;
5354
import org.apache.fluss.utils.concurrent.FlussScheduler;
5455
import org.apache.fluss.utils.concurrent.FutureUtils;
5556
import org.apache.fluss.utils.concurrent.Scheduler;
@@ -183,7 +184,8 @@ protected void startServices() throws Exception {
183184

184185
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
185186

186-
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
187+
MetadataManager metadataManager =
188+
new MetadataManager(zkClient, conf, Executors.directExecutor());
187189
this.metadataCache = new TabletServerMetadataCache(metadataManager, zkClient);
188190

189191
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));

0 commit comments

Comments
 (0)