Skip to content

Commit 8b9d293

Browse files
authored
[server] Use async zk operations to improve performance of CoordinatorServer initCoordinatorContext() (#1381)
1 parent d6a01f5 commit 8b9d293

File tree

10 files changed

+632
-81
lines changed

10 files changed

+632
-81
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,12 @@ public class ConfigOptions {
463463
// ------------------------------------------------------------------------
464464
// ZooKeeper Client Settings
465465
// ------------------------------------------------------------------------
466+
public static final ConfigOption<Integer> ZOOKEEPER_MAX_INFLIGHT_REQUESTS =
467+
key("zookeeper.client.max-inflight-requests")
468+
.intType()
469+
.defaultValue(100)
470+
.withDescription(
471+
"The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.");
466472

467473
public static final ConfigOption<Duration> ZOOKEEPER_SESSION_TIMEOUT =
468474
key("zookeeper.client.session-timeout")

fluss-server/src/main/java/org/apache/fluss/server/authorizer/DefaultAuthorizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ private void updateResourceAcl(
388388

389389
} else {
390390
LOG.trace("Deleting path for {} because it had no ACLs remaining", resource);
391-
zooKeeperClient.contitionalDeleteResourceAcl(
391+
zooKeeperClient.conditionalDeleteResourceAcl(
392392
resource, currentVersionedAcls.zkVersion);
393393
}
394394
writeComplete = true;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 79 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,12 @@ private void initCoordinatorContext() throws Exception {
272272
int[] currentServers = zooKeeperClient.getSortedTabletServerList();
273273
List<ServerInfo> tabletServerInfos = new ArrayList<>();
274274
List<ServerNode> internalServerNodes = new ArrayList<>();
275+
276+
long start4loadTabletServer = System.currentTimeMillis();
277+
Map<Integer, TabletServerRegistration> tabletServerRegistrations =
278+
zooKeeperClient.getTabletServers(currentServers);
275279
for (int server : currentServers) {
276-
TabletServerRegistration registration = zooKeeperClient.getTabletServer(server).get();
280+
TabletServerRegistration registration = tabletServerRegistrations.get(server);
277281
ServerInfo serverInfo =
278282
new ServerInfo(
279283
server,
@@ -299,48 +303,74 @@ private void initCoordinatorContext() throws Exception {
299303
}
300304

301305
coordinatorContext.setLiveTabletServers(tabletServerInfos);
306+
LOG.info(
307+
"Load tablet servers success in {}ms when initializing coordinator context.",
308+
System.currentTimeMillis() - start4loadTabletServer);
309+
302310
// init tablet server channels
303311
coordinatorChannelManager.startup(internalServerNodes);
304312

305313
// load all tables
314+
long start4loadTables = System.currentTimeMillis();
306315
List<TableInfo> autoPartitionTables = new ArrayList<>();
307316
List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
317+
Set<TablePath> tablePathSet = new HashSet<>();
308318
for (String database : metadataManager.listDatabases()) {
309319
for (String tableName : metadataManager.listTables(database)) {
310-
TablePath tablePath = TablePath.of(database, tableName);
311-
TableInfo tableInfo = metadataManager.getTable(tablePath);
312-
coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
313-
coordinatorContext.putTableInfo(tableInfo);
314-
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
315-
// always set to current time,
316-
// todo: should get from the last lake snapshot
317-
lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis()));
318-
}
319-
if (tableInfo.isPartitioned()) {
320-
Map<String, Long> partitions =
321-
zooKeeperClient.getPartitionNameAndIds(tablePath);
320+
tablePathSet.add(TablePath.of(database, tableName));
321+
}
322+
}
323+
Map<TablePath, TableInfo> tablePath2TableInfoMap = metadataManager.getTables(tablePathSet);
324+
List<TablePath> partitionedTablePathList =
325+
tablePath2TableInfoMap.entrySet().stream()
326+
.filter(entry -> entry.getValue().isPartitioned())
327+
.map(Map.Entry::getKey)
328+
.collect(Collectors.toList());
329+
Map<TablePath, Map<String, Long>> tablePathMap =
330+
zooKeeperClient.getPartitionNameAndIdsForTables(partitionedTablePathList);
331+
for (TablePath tablePath : tablePathSet) {
332+
TableInfo tableInfo = tablePath2TableInfoMap.get(tablePath);
333+
coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
334+
coordinatorContext.putTableInfo(tableInfo);
335+
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
336+
// always set to current time,
337+
// todo: should get from the last lake snapshot
338+
lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis()));
339+
}
340+
if (tableInfo.isPartitioned()) {
341+
Map<String, Long> partitions = tablePathMap.get(tablePath);
342+
if (partitions != null) {
322343
for (Map.Entry<String, Long> partition : partitions.entrySet()) {
323344
// put partition info to coordinator context
324345
coordinatorContext.putPartition(
325346
partition.getValue(),
326347
PhysicalTablePath.of(tableInfo.getTablePath(), partition.getKey()));
327348
}
328-
// if the table is auto partition, put the partitions info
329-
if (tableInfo
330-
.getTableConfig()
331-
.getAutoPartitionStrategy()
332-
.isAutoPartitionEnabled()) {
333-
autoPartitionTables.add(tableInfo);
334-
}
349+
}
350+
// if the table is auto partition, put the partitions info
351+
if (tableInfo
352+
.getTableConfig()
353+
.getAutoPartitionStrategy()
354+
.isAutoPartitionEnabled()) {
355+
autoPartitionTables.add(tableInfo);
335356
}
336357
}
337358
}
359+
LOG.info(
360+
"Load tables success in {}ms when initializing coordinator context.",
361+
System.currentTimeMillis() - start4loadTables);
362+
338363
autoPartitionManager.initAutoPartitionTables(autoPartitionTables);
339364
lakeTableTieringManager.initWithLakeTables(lakeTables);
340365

341366
// load all assignment
367+
long start4loadAssignment = System.currentTimeMillis();
342368
loadTableAssignment();
343369
loadPartitionAssignment();
370+
LOG.info(
371+
"Load table and partition assignment success in {}ms when initializing coordinator context.",
372+
System.currentTimeMillis() - start4loadAssignment);
373+
344374
long end = System.currentTimeMillis();
345375
LOG.info("Current total {} tables in the cluster.", coordinatorContext.allTables().size());
346376
LOG.info(
@@ -355,17 +385,19 @@ private void initCoordinatorContext() throws Exception {
355385
private void loadTableAssignment() throws Exception {
356386
List<String> assignmentTables = zooKeeperClient.getChildren(TableIdsZNode.path());
357387
Set<Long> deletedTables = new HashSet<>();
358-
for (String tableIdStr : assignmentTables) {
359-
long tableId = Long.parseLong(tableIdStr);
388+
List<Long> tableIds =
389+
assignmentTables.stream().map(Long::parseLong).collect(Collectors.toList());
390+
Map<Long, TableAssignment> tableId2tableAssignmentMap =
391+
zooKeeperClient.getTablesAssignments(tableIds);
392+
for (Long tableId : tableIds) {
360393
// if table id not in current coordinator context,
361394
// we'll consider it as deleted
362395
if (!coordinatorContext.containsTableId(tableId)) {
363396
deletedTables.add(tableId);
364397
}
365-
Optional<TableAssignment> optAssignment = zooKeeperClient.getTableAssignment(tableId);
366-
if (optAssignment.isPresent()) {
367-
TableAssignment tableAssignment = optAssignment.get();
368-
loadAssignment(tableId, tableAssignment, null);
398+
TableAssignment assignment = tableId2tableAssignmentMap.get(tableId);
399+
if (assignment != null) {
400+
loadAssignment(tableId, assignment, null);
369401
} else {
370402
LOG.warn(
371403
"Can't get the assignment for table {} with id {}.",
@@ -378,44 +410,51 @@ private void loadTableAssignment() throws Exception {
378410

379411
private void loadPartitionAssignment() throws Exception {
380412
// load all assignment
381-
List<String> partitionAssignmentNodes =
382-
zooKeeperClient.getChildren(PartitionIdsZNode.path());
413+
List<Long> partitionAssignmentNodes =
414+
zooKeeperClient.getChildren(PartitionIdsZNode.path()).stream()
415+
.map(Long::parseLong)
416+
.collect(Collectors.toList());
383417
Set<TablePartition> deletedPartitions = new HashSet<>();
384-
for (String partitionIdStr : partitionAssignmentNodes) {
385-
long partitionId = Long.parseLong(partitionIdStr);
386-
Optional<PartitionAssignment> optAssignment =
387-
zooKeeperClient.getPartitionAssignment(partitionId);
388-
if (!optAssignment.isPresent()) {
418+
Map<Long, PartitionAssignment> partitionId2partitionAssignmentMap =
419+
zooKeeperClient.getPartitionsAssignments(partitionAssignmentNodes);
420+
for (Long partitionId : partitionAssignmentNodes) {
421+
PartitionAssignment assignment = partitionId2partitionAssignmentMap.get(partitionId);
422+
if (assignment == null) {
389423
LOG.warn("Can't get the assignment for table partition {}.", partitionId);
390424
continue;
391425
}
392-
PartitionAssignment partitionAssignment = optAssignment.get();
393-
long tableId = partitionAssignment.getTableId();
426+
long tableId = assignment.getTableId();
394427
// partition id doesn't exist in coordinator context, consider it as deleted
395428
if (!coordinatorContext.containsPartitionId(partitionId)) {
396429
deletedPartitions.add(new TablePartition(tableId, partitionId));
397430
}
398-
loadAssignment(tableId, optAssignment.get(), partitionId);
431+
loadAssignment(tableId, assignment, partitionId);
399432
}
400433
coordinatorContext.queuePartitionDeletion(deletedPartitions);
401434
}
402435

403436
private void loadAssignment(
404437
long tableId, TableAssignment tableAssignment, @Nullable Long partitionId)
405438
throws Exception {
439+
Set<TableBucket> tableBucketSet = new HashSet<>();
406440
for (Map.Entry<Integer, BucketAssignment> entry :
407441
tableAssignment.getBucketAssignments().entrySet()) {
408442
int bucketId = entry.getKey();
409443
BucketAssignment bucketAssignment = entry.getValue();
410444
// put the assignment information to context
411445
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
446+
tableBucketSet.add(tableBucket);
412447
coordinatorContext.updateBucketReplicaAssignment(
413448
tableBucket, bucketAssignment.getReplicas());
414-
Optional<LeaderAndIsr> optLeaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket);
449+
}
450+
Map<TableBucket, LeaderAndIsr> leaderAndIsrMap =
451+
zooKeeperClient.getLeaderAndIsrs(tableBucketSet);
452+
for (TableBucket tableBucket : tableBucketSet) {
453+
LeaderAndIsr leaderAndIsr = leaderAndIsrMap.get(tableBucket);
415454
// update bucket LeaderAndIsr info
416-
optLeaderAndIsr.ifPresent(
417-
leaderAndIsr ->
418-
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr));
455+
if (leaderAndIsr != null) {
456+
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
457+
}
419458
}
420459
}
421460

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555

5656
import javax.annotation.Nullable;
5757

58+
import java.util.Collection;
59+
import java.util.HashMap;
5860
import java.util.List;
5961
import java.util.Map;
6062
import java.util.Optional;
@@ -316,6 +318,29 @@ public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
316318
return tableReg.toTableInfo(tablePath, schemaInfo, defaultTableLakeOptions);
317319
}
318320

321+
public Map<TablePath, TableInfo> getTables(Collection<TablePath> tablePaths)
322+
throws TableNotExistException {
323+
Map<TablePath, TableInfo> result = new HashMap<>();
324+
try {
325+
Map<TablePath, TableRegistration> tablePath2TableRegistrations =
326+
zookeeperClient.getTables(tablePaths);
327+
for (TablePath tablePath : tablePaths) {
328+
if (!tablePath2TableRegistrations.containsKey(tablePath)) {
329+
throw new TableNotExistException("Table '" + tablePath + "' does not exist.");
330+
}
331+
TableRegistration tableReg = tablePath2TableRegistrations.get(tablePath);
332+
SchemaInfo schemaInfo = getLatestSchema(tablePath);
333+
result.put(
334+
tablePath,
335+
tableReg.toTableInfo(tablePath, schemaInfo, defaultTableLakeOptions));
336+
}
337+
} catch (Exception e) {
338+
throw new FlussRuntimeException(
339+
String.format("Failed to get tables '%s'.", tablePaths), e);
340+
}
341+
return result;
342+
}
343+
319344
public TableRegistration getTableRegistration(TablePath tablePath) {
320345
Optional<TableRegistration> optionalTable;
321346
try {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.zk;
19+
20+
/** The base class for ZooKeeper async operation request. */
21+
public abstract class ZkAsyncRequest {
22+
23+
private final String path;
24+
25+
protected ZkAsyncRequest(String path) {
26+
this.path = path;
27+
}
28+
29+
public String getPath() {
30+
return path;
31+
}
32+
33+
// -------------------------------------------------------------------------------------------
34+
35+
/** The request for ZooKeeper getData async operation. */
36+
public static class ZkGetDataRequest extends ZkAsyncRequest {
37+
protected ZkGetDataRequest(String path) {
38+
super(path);
39+
}
40+
}
41+
42+
/** The request for ZooKeeper getChildren async operation. */
43+
public static class ZkGetChildrenRequest extends ZkAsyncRequest {
44+
protected ZkGetChildrenRequest(String path) {
45+
super(path);
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)