Skip to content

Commit 98e766c

Browse files
committed
WIP
1 parent dd3be77 commit 98e766c

File tree

4 files changed

+44
-21
lines changed

4 files changed

+44
-21
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ public CoordinatorEventProcessor(
176176
coordinatorContext,
177177
replicaStateMachine,
178178
tableBucketStateMachine,
179-
new RemoteStorageCleaner(conf, ioExecutor));
179+
new RemoteStorageCleaner(conf, ioExecutor),
180+
ioExecutor);
180181
this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager);
181182
this.tabletServerChangeWatcher =
182183
new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/RemoteStorageCleaner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ public RemoteStorageCleaner(Configuration configuration, ExecutorService ioExecu
5757
}
5858
}
5959

60-
public void deleteTableRemoteDir(TablePath tablePath, boolean isKvTable, long tableId) {
60+
public void asyncDeleteTableRemoteDir(TablePath tablePath, boolean isKvTable, long tableId) {
6161
if (isKvTable) {
6262
asyncDeleteDir(FlussPaths.remoteTableDir(remoteKvDir, tablePath, tableId));
6363
}
6464
asyncDeleteDir(FlussPaths.remoteTableDir(remoteLogDir, tablePath, tableId));
6565
}
6666

67-
public void deletePartitionRemoteDir(
67+
public void asyncDeletePartitionRemoteDir(
6868
PhysicalTablePath physicalTablePath, boolean isKvTable, TablePartition tablePartition) {
6969
if (isKvTable) {
7070
asyncDeleteDir(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.Set;
41+
import java.util.concurrent.ExecutorService;
4142

4243
/** A manager for tables. */
4344
public class TableManager {
@@ -48,18 +49,21 @@ public class TableManager {
4849
private final CoordinatorContext coordinatorContext;
4950
private final ReplicaStateMachine replicaStateMachine;
5051
private final TableBucketStateMachine tableBucketStateMachine;
52+
private final ExecutorService ioExecutor;
5153

5254
public TableManager(
5355
MetadataManager metadataManager,
5456
CoordinatorContext coordinatorContext,
5557
ReplicaStateMachine replicaStateMachine,
5658
TableBucketStateMachine tableBucketStateMachine,
57-
RemoteStorageCleaner remoteStorageCleaner) {
59+
RemoteStorageCleaner remoteStorageCleaner,
60+
ExecutorService ioExecutor) {
5861
this.metadataManager = metadataManager;
5962
this.remoteStorageCleaner = remoteStorageCleaner;
6063
this.coordinatorContext = coordinatorContext;
6164
this.replicaStateMachine = replicaStateMachine;
6265
this.tableBucketStateMachine = tableBucketStateMachine;
66+
this.ioExecutor = ioExecutor;
6367
}
6468

6569
public void startup() {
@@ -250,12 +254,8 @@ private void resumePartitionDeletions() {
250254
private void completeDeleteTable(long tableId) {
251255
Set<TableBucketReplica> replicas = coordinatorContext.getAllReplicasForTable(tableId);
252256
replicaStateMachine.handleStateChanges(replicas, ReplicaState.NonExistentReplica);
253-
deleteRemoteDirectory(tableId);
254-
try {
255-
metadataManager.completeDeleteTable(tableId);
256-
} catch (Exception e) {
257-
LOG.error("Fail to complete table deletion for table {}.", tableId, e);
258-
}
257+
asyncDeleteRemoteDirectory(tableId);
258+
asyncDeleteTableMetadata(tableId);
259259
coordinatorContext.removeTable(tableId);
260260
}
261261

@@ -264,41 +264,62 @@ private void completeDeletePartition(TablePartition tablePartition) {
264264
coordinatorContext.getAllReplicasForPartition(
265265
tablePartition.getTableId(), tablePartition.getPartitionId());
266266
replicaStateMachine.handleStateChanges(replicas, ReplicaState.NonExistentReplica);
267-
deleteRemoteDirectory(tablePartition);
268-
try {
269-
metadataManager.completeDeletePartition(tablePartition.getPartitionId());
270-
} catch (Exception e) {
271-
LOG.error("Fail to complete partition {} deletion.", tablePartition, e);
272-
}
267+
asyncDeleteRemoteDirectory(tablePartition);
268+
asyncDeletePartitionMetadata(tablePartition.getPartitionId());
273269
coordinatorContext.removePartition(tablePartition);
274270
}
275271

276-
private void deleteRemoteDirectory(long tableId) {
272+
private void asyncDeleteRemoteDirectory(long tableId) {
277273
// delete table remote dir, when restore the coordinator, the table info will be null
278274
// we can't delete the remote dir since we don't know tablePath now
279275
TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
280276
if (tableInfo != null) {
281-
remoteStorageCleaner.deleteTableRemoteDir(
277+
remoteStorageCleaner.asyncDeleteTableRemoteDir(
282278
tableInfo.getTablePath(), tableInfo.hasPrimaryKey(), tableId);
283279
}
284280
}
285281

286-
private void deleteRemoteDirectory(TablePartition tablePartition) {
282+
private void asyncDeleteRemoteDirectory(TablePartition tablePartition) {
287283
// delete partition remote dir, when restore the coordinator, the table info will be null
288284
// we can't delete the remote dir since we don't tablePath and partition name now
289285
TableInfo tableInfo = coordinatorContext.getTableInfoById(tablePartition.getTableId());
290286
if (tableInfo != null) {
291287
String partitionName =
292288
coordinatorContext.getPartitionName(tablePartition.getPartitionId());
293289
if (partitionName != null) {
294-
remoteStorageCleaner.deletePartitionRemoteDir(
290+
remoteStorageCleaner.asyncDeletePartitionRemoteDir(
295291
PhysicalTablePath.of(tableInfo.getTablePath(), partitionName),
296292
tableInfo.hasPrimaryKey(),
297293
tablePartition);
298294
}
299295
}
300296
}
301297

298+
private void asyncDeleteTableMetadata(long tableId) {
299+
ioExecutor.submit(
300+
() -> {
301+
try {
302+
metadataManager.completeDeleteTable(tableId);
303+
} catch (Exception e) {
304+
LOG.error("Fail to delete ZooKeeper metadata for table id {}.", tableId, e);
305+
}
306+
});
307+
}
308+
309+
private void asyncDeletePartitionMetadata(long partitionId) {
310+
ioExecutor.submit(
311+
() -> {
312+
try {
313+
metadataManager.completeDeletePartition(partitionId);
314+
} catch (Exception e) {
315+
LOG.error(
316+
"Fail to delete ZooKeeper metadata for partition id {}.",
317+
partitionId,
318+
e);
319+
}
320+
});
321+
}
322+
302323
private boolean isEligibleForDeletion(long tableId) {
303324
// the table is queued for deletion and
304325
// no any replica is in state deletion started

fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ private void initTableManager() {
129129
coordinatorContext,
130130
replicaStateMachine,
131131
tableBucketStateMachine,
132-
new RemoteStorageCleaner(conf, ioExecutor));
132+
new RemoteStorageCleaner(conf, ioExecutor),
133+
ioExecutor);
133134
tableManager.startup();
134135

135136
coordinatorContext.setLiveTabletServers(

0 commit comments

Comments
 (0)