2222import org .apache .fluss .metadata .TableBucket ;
2323import org .apache .fluss .metadata .TableBucketReplica ;
2424import org .apache .fluss .metadata .TableInfo ;
25+ import org .apache .fluss .metadata .TablePartition ;
2526import org .apache .fluss .server .coordinator .event .CoordinatorEvent ;
2627import org .apache .fluss .server .coordinator .event .DeleteReplicaResponseReceivedEvent ;
2728import org .apache .fluss .server .coordinator .event .TestingEventManager ;
4748import javax .annotation .Nullable ;
4849
4950import java .io .IOException ;
51+ import java .time .Duration ;
5052import java .util .Arrays ;
5153import java .util .Collections ;
5254import java .util .HashSet ;
6466import static org .apache .fluss .server .coordinator .statemachine .BucketState .OnlineBucket ;
6567import static org .apache .fluss .server .coordinator .statemachine .ReplicaState .OnlineReplica ;
6668import static org .apache .fluss .server .coordinator .statemachine .ReplicaState .ReplicaDeletionSuccessful ;
69+ import static org .apache .fluss .testutils .common .CommonTestUtils .retry ;
6770import static org .assertj .core .api .Assertions .assertThat ;
6871
6972/** Test for {@link TableManager}. */
@@ -196,7 +199,10 @@ void testDeleteTable() throws Exception {
196199
197200 // call method resumeDeletions, should delete the assignments from zk
198201 tableManager .resumeDeletions ();
199- assertThat (zookeeperClient .getTableAssignment (tableId )).isEmpty ();
202+ // retry for async deletion of TableAssignment
203+ retry (
204+ Duration .ofSeconds (30 ),
205+ () -> assertThat (zookeeperClient .getTableAssignment (tableId )).isEmpty ());
200206 // the table will also be removed from coordinator context
201207 assertThat (coordinatorContext .getAllReplicasForTable (tableId )).isEmpty ();
202208 }
@@ -268,15 +274,11 @@ void testCreateAndDropPartition() throws Exception {
268274 PartitionAssignment partitionAssignment =
269275 new PartitionAssignment (tableId , createAssignment ().getBucketAssignments ());
270276 String partitionName = "2024" ;
277+ long partitionId = zookeeperClient .getPartitionIdAndIncrement ();
271278 zookeeperClient .registerPartitionAssignmentAndMetadata (
272- zookeeperClient .getPartitionIdAndIncrement (),
273- partitionName ,
274- partitionAssignment ,
275- DATA1_TABLE_PATH ,
276- tableId );
279+ partitionId , partitionName , partitionAssignment , DATA1_TABLE_PATH , tableId );
277280
278281 // create partition
279- long partitionId = 1L ;
280282 tableManager .onCreateNewPartition (
281283 DATA1_TABLE_PATH , tableId , partitionId , partitionName , partitionAssignment );
282284
@@ -285,8 +287,24 @@ void testCreateAndDropPartition() throws Exception {
285287
286288 // drop partition
287289 // all replicas should be deleted
290+ coordinatorContext .queuePartitionDeletion (
291+ Collections .singleton (new TablePartition (tableId , partitionId )));
288292 tableManager .onDeletePartition (tableId , partitionId );
289293 checkReplicaDelete (tableId , partitionId , partitionAssignment );
294+
295+ // mark all replica as delete
296+ for (TableBucketReplica replica : getReplicas (tableId , partitionId , partitionAssignment )) {
297+ coordinatorContext .putReplicaState (replica , ReplicaDeletionSuccessful );
298+ }
299+
300+ // call method resumeDeletions, should delete the assignments from zk
301+ tableManager .resumeDeletions ();
302+ // retry for async deletion of PartitionAssignment
303+ retry (
304+ Duration .ofSeconds (30 ),
305+ () -> assertThat (zookeeperClient .getPartitionAssignment (partitionId )).isEmpty ());
306+ // the partition will also be removed from coordinator context
307+ assertThat (coordinatorContext .getAllReplicasForPartition (tableId , partitionId )).isEmpty ();
290308 }
291309
292310 private TableAssignment createAssignment () {
@@ -350,9 +368,14 @@ private void checkReplicaDelete(
350368 }
351369
352370 private Set <TableBucketReplica > getReplicas (long tableId , TableAssignment assignment ) {
371+ return getReplicas (tableId , null , assignment );
372+ }
373+
374+ private Set <TableBucketReplica > getReplicas (
375+ long tableId , Long partitionId , TableAssignment assignment ) {
353376 Set <TableBucketReplica > tableBucketReplicas = new HashSet <>();
354377 for (int bucketId : assignment .getBuckets ()) {
355- TableBucket tableBucket = new TableBucket (tableId , bucketId );
378+ TableBucket tableBucket = new TableBucket (tableId , partitionId , bucketId );
356379 List <Integer > replicas = assignment .getBucketAssignment (bucketId ).getReplicas ();
357380 for (int replica : replicas ) {
358381 tableBucketReplicas .add (new TableBucketReplica (tableBucket , replica ));
0 commit comments