Skip to content

Commit 465730f

Browse files
authored
[coordinator] CoordinatorEventProcessor shouldn't process PartitionCreated event for existing partition (#419)
1 parent 0b623e4 commit 465730f

File tree

2 files changed

+149
-44
lines changed

2 files changed

+149
-44
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,11 @@ private void updateMetrics() {
500500
}
501501

502502
private void processCreateTable(CreateTableEvent createTableEvent) {
503+
long tableId = createTableEvent.getTableInfo().getTableId();
504+
// skip the table if it already exists
505+
if (coordinatorContext.containsTableId(tableId)) {
506+
return;
507+
}
503508
TableInfo tableInfo = createTableEvent.getTableInfo();
504509
coordinatorContext.putTableInfo(tableInfo);
505510
tableManager.onCreateNewTable(
@@ -512,6 +517,11 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
512517
}
513518

514519
private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
520+
long partitionId = createPartitionEvent.getPartitionId();
521+
// skip the partition if it already exists
522+
if (coordinatorContext.containsPartitionId(partitionId)) {
523+
return;
524+
}
515525
tableManager.onCreateNewPartition(
516526
createPartitionEvent.getTablePath(),
517527
createPartitionEvent.getTableId(),

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 139 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@
3030
import com.alibaba.fluss.metadata.TablePath;
3131
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
3232
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
33-
import com.alibaba.fluss.server.coordinator.event.CoordinatorEvent;
3433
import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager;
35-
import com.alibaba.fluss.server.coordinator.event.CreatePartitionEvent;
36-
import com.alibaba.fluss.server.coordinator.event.DropPartitionEvent;
3734
import com.alibaba.fluss.server.coordinator.statemachine.BucketState;
3835
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
3936
import com.alibaba.fluss.server.entity.CommitKvSnapshotData;
@@ -56,6 +53,7 @@
5653
import com.alibaba.fluss.server.zk.data.ZkData.TableIdsZNode;
5754
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
5855
import com.alibaba.fluss.types.DataTypes;
56+
import com.alibaba.fluss.utils.types.Tuple2;
5957

6058
import org.junit.jupiter.api.AfterEach;
6159
import org.junit.jupiter.api.BeforeAll;
@@ -78,6 +76,9 @@
7876
import java.util.concurrent.CompletableFuture;
7977
import java.util.stream.Collectors;
8078

79+
import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.checkLeaderAndIsr;
80+
import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess;
81+
import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.makeSendLeaderAndStopRequestFailContext;
8182
import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyBucketForPartitionInState;
8283
import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyBucketForTableInState;
8384
import static com.alibaba.fluss.server.coordinator.CoordinatorTestUtils.verifyReplicaForPartitionInState;
@@ -173,7 +174,7 @@ void afterEach() {
173174
void testCreateAndDropTable() throws Exception {
174175
CoordinatorContext coordinatorContext = eventProcessor.getCoordinatorContext();
175176
// make sure all request to gateway should be successful
176-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
177+
makeSendLeaderAndStopRequestAlwaysSuccess(
177178
eventProcessor.getCoordinatorContext(), testCoordinatorChannelManager);
178179
// create a table,
179180
TablePath t1 = TablePath.of(defaultDatabase, "create_drop_t1");
@@ -226,7 +227,7 @@ void testCreateAndDropTable() throws Exception {
226227
autoPartitionManager,
227228
TestingMetricGroups.COORDINATOR_METRICS,
228229
new Configuration());
229-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
230+
makeSendLeaderAndStopRequestAlwaysSuccess(
230231
testCoordinatorChannelManager,
231232
Arrays.stream(zookeeperClient.getSortedTabletServerList())
232233
.boxed()
@@ -247,7 +248,7 @@ void testDropTableWithRetry() throws Exception {
247248
// make request to some server should fail, but delete will still be successful
248249
// finally with retry logic
249250
int failedServer = 0;
250-
CoordinatorTestUtils.makeSendLeaderAndStopRequestFailContext(
251+
makeSendLeaderAndStopRequestFailContext(
251252
testCoordinatorChannelManager,
252253
Arrays.stream(zookeeperClient.getSortedTabletServerList())
253254
.boxed()
@@ -283,7 +284,7 @@ void testDropTableWithRetry() throws Exception {
283284
void testServerBecomeOnlineAndOfflineLine() throws Exception {
284285
CoordinatorContext coordinatorContext = eventProcessor.getCoordinatorContext();
285286
// make sure all request to gateway should be successful
286-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
287+
makeSendLeaderAndStopRequestAlwaysSuccess(
287288
eventProcessor.getCoordinatorContext(), testCoordinatorChannelManager);
288289
// assume a new server become online;
289290
// check the server has been added into coordinator context
@@ -303,7 +304,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception {
303304
assertThat(coordinatorContext.getLiveTabletServers())
304305
.containsKey(newlyServerId));
305306

306-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
307+
makeSendLeaderAndStopRequestAlwaysSuccess(
307308
eventProcessor.getCoordinatorContext(), testCoordinatorChannelManager);
308309
verifyTabletServer(coordinatorContext, newlyServerId, tabletServerRegistration);
309310

@@ -373,7 +374,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception {
373374
// the leader for bucket2 of t1 should change since its leader failed
374375
assertThat(coordinatorContext.getBucketState(t1Bucket1)).isEqualTo(OnlineBucket);
375376
// leader should change to replica2, leader epoch should be 1
376-
CoordinatorTestUtils.checkLeaderAndIsr(zookeeperClient, t1Bucket1, 1, 2);
377+
checkLeaderAndIsr(zookeeperClient, t1Bucket1, 1, 2);
377378

378379
// the bucket with no any other available servers should be still offline,
379380
// t2 bucket0 should still be offline
@@ -420,7 +421,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception {
420421

421422
// in this test case, so make requests to gateway should always be
422423
// successful for when start up, it will send request to tablet servers
423-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
424+
makeSendLeaderAndStopRequestAlwaysSuccess(
424425
testCoordinatorChannelManager,
425426
Arrays.stream(zookeeperClient.getSortedTabletServerList())
426427
.boxed()
@@ -453,7 +454,7 @@ void testRestartTriggerReplicaToOffline() throws Exception {
453454
long table1Id = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false);
454455

455456
// let's restart
456-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
457+
makeSendLeaderAndStopRequestAlwaysSuccess(
457458
eventProcessor.getCoordinatorContext(), testCoordinatorChannelManager);
458459
eventProcessor.shutdown();
459460
eventProcessor =
@@ -466,7 +467,7 @@ void testRestartTriggerReplicaToOffline() throws Exception {
466467
new Configuration());
467468
CoordinatorContext coordinatorContext = eventProcessor.getCoordinatorContext();
468469
int failedServer = 0;
469-
CoordinatorTestUtils.makeSendLeaderAndStopRequestFailContext(
470+
makeSendLeaderAndStopRequestFailContext(
470471
testCoordinatorChannelManager,
471472
Arrays.stream(zookeeperClient.getSortedTabletServerList())
472473
.boxed()
@@ -488,7 +489,7 @@ void testRestartTriggerReplicaToOffline() throws Exception {
488489
});
489490

490491
// check the changed leader and isr info
491-
CoordinatorTestUtils.checkLeaderAndIsr(zookeeperClient, t1Bucket0, 1, 1);
492+
checkLeaderAndIsr(zookeeperClient, t1Bucket0, 1, 1);
492493
retry(
493494
Duration.ofMinutes(1),
494495
() -> {
@@ -579,44 +580,31 @@ void testAddBucketCompletedSnapshot(@TempDir Path tempDir) throws Exception {
579580

580581
@Test
581582
void testCreateAndDropPartition() throws Exception {
583+
TablePath tablePath = TablePath.of(defaultDatabase, "test_create_drop_partition");
582584
CoordinatorContext coordinatorContext = eventProcessor.getCoordinatorContext();
583585
// make sure all request to gateway should be successful
584-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
585-
eventProcessor.getCoordinatorContext(), testCoordinatorChannelManager);
586+
makeSendLeaderAndStopRequestAlwaysSuccess(
587+
coordinatorContext, testCoordinatorChannelManager);
586588
// create a partitioned table
587-
TablePath tablePath = TablePath.of(defaultDatabase, "partition_table");
588589
TableDescriptor tablePartitionTableDescriptor = getPartitionedTable();
589590
long tableId =
590591
metadataManager.createTable(tablePath, tablePartitionTableDescriptor, null, false);
591592

592-
retry(
593-
Duration.ofMinutes(1),
594-
// retry util the table has been put into context
595-
() -> assertThat(coordinatorContext.getTablePathById(tableId)).isNotNull());
596-
597-
// create partition
598-
long partition1Id = zookeeperClient.getPartitionIdAndIncrement();
599-
long partition2Id = zookeeperClient.getPartitionIdAndIncrement();
600593
int nBuckets = 3;
601594
int replicationFactor = 3;
602-
String partition1Name = "2024";
603-
String partition2Name = "2025";
604595
Map<Integer, BucketAssignment> assignments =
605596
TableAssignmentUtils.generateAssignment(
606597
nBuckets, replicationFactor, new int[] {0, 1, 2})
607598
.getBucketAssignments();
608599
PartitionAssignment partitionAssignment = new PartitionAssignment(tableId, assignments);
609-
zookeeperClient.registerPartitionAssignment(partition1Id, partitionAssignment);
610-
zookeeperClient.registerPartitionAssignment(partition2Id, partitionAssignment);
600+
Tuple2<PartitionIdName, PartitionIdName> partitionIdAndNameTuple2 =
601+
preparePartitionAssignment(
602+
tablePath, tableId, coordinatorContext, partitionAssignment);
603+
604+
long partition1Id = partitionIdAndNameTuple2.f0.partitionId;
605+
String partition1Name = partitionIdAndNameTuple2.f0.partitionName;
606+
long partition2Id = partitionIdAndNameTuple2.f1.partitionId;
611607

612-
CoordinatorEvent createPartitionEvent =
613-
new CreatePartitionEvent(
614-
tablePath, tableId, partition1Id, partition1Name, partitionAssignment);
615-
eventProcessor.process(createPartitionEvent);
616-
createPartitionEvent =
617-
new CreatePartitionEvent(
618-
tablePath, tableId, partition2Id, partition2Name, partitionAssignment);
619-
eventProcessor.process(createPartitionEvent);
620608
verifyPartitionCreated(
621609
coordinatorContext,
622610
new TablePartition(tableId, partition1Id),
@@ -644,9 +632,7 @@ void testCreateAndDropPartition() throws Exception {
644632
assertThat(completedSnapshotStoreManager.getBucketCompletedSnapshotStores()).isNotEmpty();
645633

646634
// drop the partition
647-
DropPartitionEvent dropPartitionEvent = new DropPartitionEvent(tableId, partition1Id);
648-
eventProcessor.process(dropPartitionEvent);
649-
635+
zookeeperClient.deletePartition(tablePath, partition1Name);
650636
verifyPartitionDropped(coordinatorContext, tableId, partition1Id);
651637

652638
// verify CompleteSnapshotStore has been removed when the table partition1 is dropped
@@ -666,16 +652,113 @@ void testCreateAndDropPartition() throws Exception {
666652
autoPartitionManager,
667653
TestingMetricGroups.COORDINATOR_METRICS,
668654
new Configuration());
669-
CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
655+
makeSendLeaderAndStopRequestAlwaysSuccess(
670656
testCoordinatorChannelManager,
671657
Arrays.stream(zookeeperClient.getSortedTabletServerList())
672658
.boxed()
673659
.collect(Collectors.toSet()));
674660
eventProcessor.startup();
675-
verifyPartitionDropped(coordinatorContext, tableId, partition2Id);
661+
verifyPartitionDropped(eventProcessor.getCoordinatorContext(), tableId, partition2Id);
676662
}
677663

678-
// todo: add test resume drop partition
664+
@Test
665+
void testRestartResumeDropPartition() throws Exception {
666+
TablePath tablePath = TablePath.of(defaultDatabase, "test_resume_drop_partition");
667+
CoordinatorContext coordinatorContext = eventProcessor.getCoordinatorContext();
668+
// make sure all request to gateway should be successful
669+
makeSendLeaderAndStopRequestAlwaysSuccess(
670+
coordinatorContext, testCoordinatorChannelManager);
671+
// create a partitioned table
672+
TableDescriptor tablePartitionTableDescriptor = getPartitionedTable();
673+
long tableId =
674+
metadataManager.createTable(tablePath, tablePartitionTableDescriptor, null, false);
675+
676+
int nBuckets = 3;
677+
int replicationFactor = 3;
678+
Map<Integer, BucketAssignment> assignments =
679+
TableAssignmentUtils.generateAssignment(
680+
nBuckets, replicationFactor, new int[] {0, 1, 2})
681+
.getBucketAssignments();
682+
PartitionAssignment partitionAssignment = new PartitionAssignment(tableId, assignments);
683+
Tuple2<PartitionIdName, PartitionIdName> partitionIdAndNameTuple2 =
684+
preparePartitionAssignment(
685+
tablePath, tableId, coordinatorContext, partitionAssignment);
686+
687+
long partition1Id = partitionIdAndNameTuple2.f0.partitionId;
688+
String partition2Name = partitionIdAndNameTuple2.f1.partitionName;
689+
long partition2Id = partitionIdAndNameTuple2.f1.partitionId;
690+
691+
verifyPartitionCreated(
692+
coordinatorContext,
693+
new TablePartition(tableId, partition1Id),
694+
partitionAssignment,
695+
nBuckets,
696+
replicationFactor);
697+
verifyPartitionCreated(
698+
coordinatorContext,
699+
new TablePartition(tableId, partition2Id),
700+
partitionAssignment,
701+
nBuckets,
702+
replicationFactor);
703+
704+
// now, drop partition2 and restart the coordinator event processor,
705+
// the partition2 should be dropped
706+
eventProcessor.shutdown();
707+
zookeeperClient.deletePartition(tablePath, partition2Name);
708+
709+
// start the coordinator
710+
eventProcessor =
711+
new CoordinatorEventProcessor(
712+
zookeeperClient,
713+
serverMetadataCache,
714+
testCoordinatorChannelManager,
715+
autoPartitionManager,
716+
TestingMetricGroups.COORDINATOR_METRICS,
717+
new Configuration());
718+
makeSendLeaderAndStopRequestAlwaysSuccess(
719+
testCoordinatorChannelManager,
720+
Arrays.stream(zookeeperClient.getSortedTabletServerList())
721+
.boxed()
722+
.collect(Collectors.toSet()));
723+
eventProcessor.startup();
724+
725+
CoordinatorContext newContext = eventProcessor.getCoordinatorContext();
726+
// verify partition2 is dropped
727+
verifyPartitionDropped(newContext, tableId, partition2Id);
728+
// verify the status of partition1
729+
verifyPartitionCreated(
730+
newContext,
731+
new TablePartition(tableId, partition1Id),
732+
partitionAssignment,
733+
nBuckets,
734+
replicationFactor);
735+
}
736+
737+
private Tuple2<PartitionIdName, PartitionIdName> preparePartitionAssignment(
738+
TablePath tablePath,
739+
long tableId,
740+
CoordinatorContext coordinatorContext,
741+
PartitionAssignment partitionAssignment)
742+
throws Exception {
743+
retry(
744+
Duration.ofMinutes(1),
745+
// retry until the table has been put into context
746+
() -> assertThat(coordinatorContext.getTablePathById(tableId)).isNotNull());
747+
748+
// create partition
749+
long partition1Id = zookeeperClient.getPartitionIdAndIncrement();
750+
long partition2Id = zookeeperClient.getPartitionIdAndIncrement();
751+
String partition1Name = "2024";
752+
String partition2Name = "2025";
753+
zookeeperClient.registerPartitionAssignment(partition1Id, partitionAssignment);
754+
zookeeperClient.registerPartition(tablePath, tableId, partition1Name, partition1Id);
755+
zookeeperClient.registerPartitionAssignment(partition2Id, partitionAssignment);
756+
zookeeperClient.registerPartition(tablePath, tableId, partition2Name, partition2Id);
757+
758+
return Tuple2.of(
759+
new PartitionIdName(partition1Id, partition1Name),
760+
new PartitionIdName(partition2Id, partition2Name));
761+
}
679762

680763
private void verifyTableCreated(
681764
CoordinatorContext coordinatorContext,
@@ -713,7 +796,7 @@ private void verifyTableCreated(
713796
nBuckets * replicationFactor,
714797
ReplicaState.OnlineReplica));
715798
for (TableBucket tableBucket : coordinatorContext.getAllBucketsForTable(tableId)) {
716-
CoordinatorTestUtils.checkLeaderAndIsr(
799+
checkLeaderAndIsr(
717800
zookeeperClient,
718801
tableBucket,
719802
0,
@@ -766,7 +849,7 @@ private void verifyPartitionCreated(
766849
for (TableBucket tableBucket :
767850
coordinatorContext.getAllBucketsForPartition(
768851
tablePartition.getTableId(), tablePartition.getPartitionId())) {
769-
CoordinatorTestUtils.checkLeaderAndIsr(
852+
checkLeaderAndIsr(
770853
zookeeperClient,
771854
tableBucket,
772855
0,
@@ -891,7 +974,19 @@ private TableDescriptor getPartitionedTable() {
891974
.partitionedBy("b")
892975
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(), "true")
893976
.property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key(), "DAY")
977+
// set to 0 to disable pre-create partition
978+
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 0)
894979
.build()
895980
.withReplicationFactor(REPLICATION_FACTOR);
896981
}
982+
983+
private static class PartitionIdName {
984+
private final long partitionId;
985+
private final String partitionName;
986+
987+
private PartitionIdName(long partitionId, String partitionName) {
988+
this.partitionId = partitionId;
989+
this.partitionName = partitionName;
990+
}
991+
}
897992
}

0 commit comments

Comments
 (0)