Skip to content

Commit dd3be77

Browse files
committed
WIP
1 parent f7f324c commit dd3be77

File tree

4 files changed

+13
-25
lines changed

4 files changed

+13
-25
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public CoordinatorEventProcessor(
168168
coordinatorEventManager,
169169
coordinatorContext),
170170
zooKeeperClient);
171-
this.metadataManager = new MetadataManager(zooKeeperClient, conf, ioExecutor);
171+
this.metadataManager = new MetadataManager(zooKeeperClient, conf);
172172

173173
this.tableManager =
174174
new TableManager(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ protected void startServices() throws Exception {
183183

184184
this.lakeTableTieringManager = new LakeTableTieringManager();
185185

186-
MetadataManager metadataManager = new MetadataManager(zkClient, conf, ioExecutor);
186+
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
187187
this.coordinatorService =
188188
new CoordinatorService(
189189
conf,
@@ -321,7 +321,7 @@ private void registerCoordinatorLeader() throws Exception {
321321
}
322322

323323
private void createDefaultDatabase() {
324-
MetadataManager metadataManager = new MetadataManager(zkClient, conf, ioExecutor);
324+
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
325325
List<String> databases = metadataManager.listDatabases();
326326
if (databases.isEmpty()) {
327327
metadataManager.createDatabase(DEFAULT_DATABASE, DatabaseDescriptor.EMPTY, true);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.fluss.server.zk.data.TableAssignment;
4848
import org.apache.fluss.server.zk.data.TableRegistration;
4949
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
50-
import org.apache.fluss.utils.concurrent.Executors;
5150
import org.apache.fluss.utils.function.RunnableWithException;
5251
import org.apache.fluss.utils.function.ThrowingRunnable;
5352

@@ -61,7 +60,6 @@
6160
import java.util.Optional;
6261
import java.util.Set;
6362
import java.util.concurrent.Callable;
64-
import java.util.concurrent.Executor;
6563

6664
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
6765

@@ -75,26 +73,17 @@ public class MetadataManager {
7573
private final int maxPartitionNum;
7674
private final int maxBucketNum;
7775

78-
private final Executor ioExecutor;
79-
80-
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
81-
this(zookeeperClient, conf, Executors.directExecutor());
82-
}
83-
8476
/**
8577
* Creates a new metadata manager.
8678
*
8779
* @param zookeeperClient the zookeeper client
8880
* @param conf the cluster configuration
89-
* @param ioExecutor the io executor
9081
*/
91-
public MetadataManager(
92-
ZooKeeperClient zookeeperClient, Configuration conf, Executor ioExecutor) {
82+
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
9383
this.zookeeperClient = zookeeperClient;
9484
this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf);
9585
this.maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM);
9686
this.maxBucketNum = conf.get(ConfigOptions.MAX_BUCKET_NUM);
97-
this.ioExecutor = ioExecutor;
9887
}
9988

10089
public void createDatabase(
@@ -243,18 +232,13 @@ public void completeDeleteTable(long tableId) {
243232
String.format("Delete tablet assignment meta fail for table %s.", tableId));
244233
}
245234

246-
public void completeDeletePartition(TablePartition tablePartition) {
235+
public void completeDeletePartition(long partitionId) {
247236
// final step for delete a partition.
248237
// delete partition assignments node, which will also delete the bucket state node,
249238
// so that all the zk nodes related to this partition are deleted.
250-
ioExecutor.execute(
251-
() -> {
252-
try {
253-
zookeeperClient.deletePartitionAssignment(tablePartition.getPartitionId());
254-
} catch (Exception e) {
255-
LOG.error("Fail to complete partition {} deletion.", tablePartition, e);
256-
}
257-
});
239+
rethrowIfIsNotNoNodeException(
240+
() -> zookeeperClient.deletePartitionAssignment(partitionId),
241+
String.format("Delete tablet assignment meta fail for partition %s.", partitionId));
258242
}
259243

260244
/**

fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,11 @@ private void completeDeletePartition(TablePartition tablePartition) {
265265
tablePartition.getTableId(), tablePartition.getPartitionId());
266266
replicaStateMachine.handleStateChanges(replicas, ReplicaState.NonExistentReplica);
267267
deleteRemoteDirectory(tablePartition);
268-
metadataManager.completeDeletePartition(tablePartition);
268+
try {
269+
metadataManager.completeDeletePartition(tablePartition.getPartitionId());
270+
} catch (Exception e) {
271+
LOG.error("Fail to complete partition {} deletion.", tablePartition, e);
272+
}
269273
coordinatorContext.removePartition(tablePartition);
270274
}
271275

0 commit comments

Comments
 (0)