Skip to content

Commit 426fae6

Browse files
zcoowuchong
andauthored
[Server]Accelerate the speed of creating tables and partitions (#957)
--------- Co-authored-by: Jark Wu <[email protected]>
1 parent 4922bfe commit 426fae6

File tree

5 files changed

+390
-13
lines changed

5 files changed

+390
-13
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 161 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.alibaba.fluss.metadata.TableBucket;
2121
import com.alibaba.fluss.server.coordinator.CoordinatorContext;
2222
import com.alibaba.fluss.server.coordinator.CoordinatorRequestBatch;
23+
import com.alibaba.fluss.server.entity.BatchRegisterLeadAndIsr;
24+
import com.alibaba.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
2325
import com.alibaba.fluss.server.zk.ZooKeeperClient;
2426
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
2527
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
@@ -29,8 +31,10 @@
2931

3032
import javax.annotation.Nullable;
3133

34+
import java.util.ArrayList;
3235
import java.util.HashSet;
3336
import java.util.List;
37+
import java.util.Objects;
3438
import java.util.Optional;
3539
import java.util.Set;
3640
import java.util.stream.Collectors;
@@ -112,8 +116,14 @@ public void shutdown() {
112116
public void handleStateChange(Set<TableBucket> tableBuckets, BucketState targetState) {
113117
try {
114118
coordinatorRequestBatch.newBatch();
115-
for (TableBucket tableBucket : tableBuckets) {
116-
doHandleStateChange(tableBucket, targetState);
119+
120+
if (checkIfCreateTablePartitionRequest(tableBuckets, targetState)) {
121+
// batch register table bucket lead and isr
122+
batchHandleOnlineChangeAndInitLeader(tableBuckets);
123+
} else {
124+
for (TableBucket tableBucket : tableBuckets) {
125+
doHandleStateChange(tableBucket, targetState);
126+
}
117127
}
118128
coordinatorRequestBatch.sendRequestToTabletServers(
119129
coordinatorContext.getCoordinatorEpoch());
@@ -242,8 +252,139 @@ private void doHandleStateChange(TableBucket tableBucket, BucketState targetStat
242252
}
243253
}
244254

255+
private boolean checkIfCreateTablePartitionRequest(
256+
Set<TableBucket> tableBuckets, BucketState targetState) {
257+
// Check if the state is from NewBucket -> OnlineBucket
258+
// and all buckets belong to a same table (partition).
259+
// If so, we will merge the register zk requests to speed up
260+
if (targetState != BucketState.OnlineBucket) {
261+
return false;
262+
}
263+
264+
if (tableBuckets.isEmpty()) {
265+
return false;
266+
}
267+
268+
TableBucket first = tableBuckets.iterator().next();
269+
270+
for (TableBucket tableBucket : tableBuckets) {
271+
BucketState currentState = coordinatorContext.getBucketState(tableBucket);
272+
if (currentState != BucketState.NewBucket) {
273+
return false;
274+
}
275+
276+
if (tableBucket.getTableId() != first.getTableId()
277+
|| !Objects.equals(tableBucket.getPartitionId(), first.getPartitionId())) {
278+
// not belong to the same table(partition).
279+
return false;
280+
}
281+
}
282+
return true;
283+
}
284+
245285
private Optional<ElectionResult> initLeaderForTableBuckets(
246286
TableBucket tableBucket, List<Integer> assignedServers) {
287+
Optional<ElectionResult> optionalElectionResult =
288+
doInitElectionForBucket(tableBucket, assignedServers);
289+
if (optionalElectionResult.isPresent()) {
290+
ElectionResult electionResult = optionalElectionResult.get();
291+
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
292+
try {
293+
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
294+
} catch (Exception e) {
295+
LOG.error(
296+
"Fail to create state node for table bucket {} in zookeeper.",
297+
stringifyBucket(tableBucket),
298+
e);
299+
return Optional.empty();
300+
}
301+
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
302+
}
303+
return optionalElectionResult;
304+
}
305+
306+
public void batchHandleOnlineChangeAndInitLeader(Set<TableBucket> tableBuckets) {
307+
if (tableBuckets.isEmpty()) {
308+
return;
309+
}
310+
311+
TableBucket first = tableBuckets.iterator().next();
312+
BatchRegisterLeadAndIsr batchRegister =
313+
new BatchRegisterLeadAndIsr(first.getTableId(), first.getPartitionId());
314+
for (TableBucket tableBucket : tableBuckets) {
315+
// precheck partition name
316+
BucketState currentState = coordinatorContext.getBucketState(tableBucket);
317+
String partitionName = null;
318+
if (tableBucket.getPartitionId() != null) {
319+
partitionName = coordinatorContext.getPartitionName(tableBucket.getPartitionId());
320+
if (partitionName == null) {
321+
LOG.error(
322+
"Can't find partition name for partition: {}.",
323+
tableBucket.getBucket());
324+
logFailedStateChange(tableBucket, currentState, BucketState.OnlineBucket);
325+
continue;
326+
}
327+
}
328+
329+
List<Integer> assignedServers = coordinatorContext.getAssignment(tableBucket);
330+
331+
Optional<ElectionResult> optionalElectionResult =
332+
doInitElectionForBucket(tableBucket, assignedServers);
333+
if (!optionalElectionResult.isPresent()) {
334+
logFailedStateChange(tableBucket, currentState, BucketState.OnlineBucket);
335+
continue;
336+
}
337+
ElectionResult electionResult = optionalElectionResult.get();
338+
339+
batchRegister.add(
340+
tableBucket,
341+
electionResult.leaderAndIsr,
342+
partitionName,
343+
electionResult.liveReplicas);
344+
}
345+
346+
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
347+
List<RegisterTableBucketLeadAndIsrInfo> tableBucketLeadAndIsrInfos =
348+
batchRegister.getRegisterList();
349+
350+
// Register the initial leader and isr.
351+
if (!tableBucketLeadAndIsrInfos.isEmpty()) {
352+
try {
353+
zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition(
354+
tableBucketLeadAndIsrInfos);
355+
registerSuccessList.addAll(tableBucketLeadAndIsrInfos);
356+
} catch (Exception e) {
357+
LOG.error(
358+
"Fail to batch create state node for table buckets in zookeeper. The first bucket info: {}",
359+
stringifyBucket(tableBucketLeadAndIsrInfos.get(0).getTableBucket()),
360+
e);
361+
// Failed in batch mode, try to register one by one.
362+
registerSuccessList.addAll(
363+
tryRegisterLeaderAndIsrOneByOne(tableBucketLeadAndIsrInfos));
364+
}
365+
}
366+
367+
for (RegisterTableBucketLeadAndIsrInfo info : registerSuccessList) {
368+
TableBucket tableBucket = info.getTableBucket();
369+
LeaderAndIsr leaderAndIsr = info.getLeaderAndIsr();
370+
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
371+
372+
// transmit state
373+
doStateChange(tableBucket, BucketState.OnlineBucket);
374+
// then send request to the tablet servers
375+
coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
376+
new HashSet<>(info.getLiveReplicas()),
377+
PhysicalTablePath.of(
378+
coordinatorContext.getTablePathById(tableBucket.getTableId()),
379+
info.getPartitionName()),
380+
tableBucket,
381+
coordinatorContext.getAssignment(tableBucket),
382+
leaderAndIsr);
383+
}
384+
}
385+
386+
private Optional<ElectionResult> doInitElectionForBucket(
387+
TableBucket tableBucket, List<Integer> assignedServers) {
247388
// filter out the live servers
248389
List<Integer> liveServers =
249390
assignedServers.stream()
@@ -286,19 +427,27 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
286427
// Register the initial leader and isr.
287428
LeaderAndIsr leaderAndIsr =
288429
new LeaderAndIsr(leader, 0, isr, coordinatorContext.getCoordinatorEpoch(), 0);
289-
try {
290-
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
291-
} catch (Exception e) {
292-
LOG.error(
293-
"Fail to create state node for table bucket {} in zookeeper.",
294-
stringifyBucket(tableBucket),
295-
e);
296-
return Optional.empty();
297-
}
298-
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
430+
299431
return Optional.of(new ElectionResult(liveServers, leaderAndIsr));
300432
}
301433

434+
private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
435+
List<RegisterTableBucketLeadAndIsrInfo> registerList) {
436+
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
437+
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
438+
try {
439+
zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr());
440+
registerSuccessList.add(info);
441+
} catch (Exception e) {
442+
LOG.error(
443+
"Fail to create state node for table bucket {} in zookeeper.",
444+
stringifyBucket(info.getTableBucket()),
445+
e);
446+
}
447+
}
448+
return registerSuccessList;
449+
}
450+
302451
private Optional<ElectionResult> electNewLeaderForTableBuckets(TableBucket tableBucket) {
303452
LeaderAndIsr leaderAndIsr;
304453
try {
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.server.entity;
18+
19+
import com.alibaba.fluss.metadata.TableBucket;
20+
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
21+
22+
import javax.annotation.Nullable;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Objects;
27+
28+
/**
29+
* The collection of {@link com.alibaba.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo} for a
30+
* specific table or partition.
31+
*/
32+
public class BatchRegisterLeadAndIsr {
33+
private final long tableId;
34+
@Nullable private final Long partitionId;
35+
private final List<RegisterTableBucketLeadAndIsrInfo> registerList;
36+
37+
public BatchRegisterLeadAndIsr(long tableId, @Nullable Long partitionId) {
38+
this.tableId = tableId;
39+
this.partitionId = partitionId;
40+
this.registerList = new ArrayList<>();
41+
}
42+
43+
public void add(
44+
TableBucket tableBucket,
45+
LeaderAndIsr leaderAndIsr,
46+
@Nullable String partitionName,
47+
List<Integer> liveReplicas) {
48+
// check the tableBucket has the same tableId and partitionId.
49+
// add it to registerList.
50+
if (tableBucket.getTableId() == tableId
51+
&& Objects.equals(partitionId, tableBucket.getPartitionId())) {
52+
registerList.add(
53+
new RegisterTableBucketLeadAndIsrInfo(
54+
tableBucket, leaderAndIsr, partitionName, liveReplicas));
55+
} else {
56+
throw new IllegalArgumentException(
57+
"Try to add a bucket with different tableId or partitionId in collection when try to batch register to Zookeeper."
58+
+ "batch tableId="
59+
+ tableId
60+
+ " batch partitionId:"
61+
+ partitionId
62+
+ " current tableId="
63+
+ tableBucket.getTableId()
64+
+ " current partitionId="
65+
+ tableBucket.getPartitionId());
66+
}
67+
}
68+
69+
public List<RegisterTableBucketLeadAndIsrInfo> getRegisterList() {
70+
return registerList;
71+
}
72+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.server.entity;
18+
19+
import com.alibaba.fluss.metadata.TableBucket;
20+
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
21+
22+
import javax.annotation.Nullable;
23+
24+
import java.util.List;
25+
26+
/** The data for register LeaderAndIsr in zk. */
27+
public class RegisterTableBucketLeadAndIsrInfo {
28+
private final TableBucket tableBucket;
29+
private final LeaderAndIsr leaderAndIsr;
30+
@Nullable private final String partitionName;
31+
private final List<Integer> liveReplicas;
32+
33+
public RegisterTableBucketLeadAndIsrInfo(
34+
TableBucket tableBucket,
35+
LeaderAndIsr leaderAndIsr,
36+
@Nullable String partitionName,
37+
List<Integer> liveReplicas) {
38+
this.tableBucket = tableBucket;
39+
this.leaderAndIsr = leaderAndIsr;
40+
this.partitionName = partitionName;
41+
this.liveReplicas = liveReplicas;
42+
}
43+
44+
public TableBucket getTableBucket() {
45+
return tableBucket;
46+
}
47+
48+
public LeaderAndIsr getLeaderAndIsr() {
49+
return leaderAndIsr;
50+
}
51+
52+
@Nullable
53+
public String getPartitionName() {
54+
return partitionName;
55+
}
56+
57+
public List<Integer> getLiveReplicas() {
58+
return liveReplicas;
59+
}
60+
}

0 commit comments

Comments
 (0)