Skip to content

Commit a497f3d

Browse files
LiebingYuLiebing
authored and committed
[server] Use batch operations of ZooKeeper to optimize updateLeaderAndIsr (apache#1445)
Co-authored-by: Liebing <[email protected]>
1 parent ef2c300 commit a497f3d

File tree

4 files changed

+152
-11
lines changed

4 files changed

+152
-11
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191

9292
import java.util.ArrayList;
9393
import java.util.Collections;
94+
import java.util.HashMap;
9495
import java.util.HashSet;
9596
import java.util.List;
9697
import java.util.Map;
@@ -831,6 +832,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
831832
// TODO verify leader epoch.
832833

833834
List<AdjustIsrResultForBucket> result = new ArrayList<>();
835+
Map<TableBucket, LeaderAndIsr> newLeaderAndIsrList = new HashMap<>();
834836
for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
835837
TableBucket tableBucket = entry.getKey();
836838
LeaderAndIsr tryAdjustLeaderAndIsr = entry.getValue();
@@ -863,21 +865,37 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
863865
tryAdjustLeaderAndIsr.isr(),
864866
coordinatorContext.getCoordinatorEpoch(),
865867
currentLeaderAndIsr.bucketEpoch() + 1);
866-
try {
867-
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
868-
} catch (Exception e) {
869-
LOG.error("Error when register leader and isr.", e);
870-
result.add(new AdjustIsrResultForBucket(tableBucket, ApiError.fromThrowable(e)));
868+
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
869+
}
870+
871+
try {
872+
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
873+
newLeaderAndIsrList.forEach(
874+
(tableBucket, newLeaderAndIsr) ->
875+
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
876+
} catch (Exception batchException) {
877+
LOG.error("Error when batch update leader and isr. Try one by one.", batchException);
878+
879+
for (Map.Entry<TableBucket, LeaderAndIsr> entry : newLeaderAndIsrList.entrySet()) {
880+
TableBucket tableBucket = entry.getKey();
881+
LeaderAndIsr newLeaderAndIsr = entry.getValue();
882+
try {
883+
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
884+
} catch (Exception e) {
885+
LOG.error("Error when register leader and isr.", e);
886+
result.add(
887+
new AdjustIsrResultForBucket(tableBucket, ApiError.fromThrowable(e)));
888+
}
889+
// Successful return.
890+
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr));
871891
}
892+
}
872893

873-
// update coordinator leader and isr cache.
874-
coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
894+
// update coordinator leader and isr cache.
895+
newLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
875896

876-
// TODO update metadata for all alive tablet servers.
897+
// TODO update metadata for all alive tablet servers.
877898

878-
// Successful return.
879-
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr));
880-
}
881899
return result;
882900
}
883901

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,31 @@ public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIs
301301
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
302302
}
303303

304+
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
305+
throws Exception {
306+
if (leaderAndIsrList.isEmpty()) {
307+
return;
308+
}
309+
310+
List<CuratorOp> ops = new ArrayList<>(leaderAndIsrList.size());
311+
for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
312+
TableBucket tableBucket = entry.getKey();
313+
LeaderAndIsr leaderAndIsr = entry.getValue();
314+
315+
String path = LeaderAndIsrZNode.path(tableBucket);
316+
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
317+
CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
318+
ops.add(updateOp);
319+
if (ops.size() == MAX_BATCH_SIZE) {
320+
zkClient.transaction().forOperations(ops);
321+
ops.clear();
322+
}
323+
}
324+
if (!ops.isEmpty()) {
325+
zkClient.transaction().forOperations(ops);
326+
}
327+
}
328+
304329
public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception {
305330
String path = LeaderAndIsrZNode.path(tableBucket);
306331
zkClient.delete().forPath(path);

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,19 @@
3131
import com.alibaba.fluss.metadata.TableDescriptor;
3232
import com.alibaba.fluss.metadata.TablePartition;
3333
import com.alibaba.fluss.metadata.TablePath;
34+
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
3435
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
3536
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
3637
import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
3738
import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
3839
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
40+
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
3941
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4042
import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
4143
import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager;
4244
import com.alibaba.fluss.server.coordinator.statemachine.BucketState;
4345
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
46+
import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
4447
import com.alibaba.fluss.server.entity.CommitKvSnapshotData;
4548
import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
4649
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -78,6 +81,7 @@
7881
import java.time.Duration;
7982
import java.util.Arrays;
8083
import java.util.Collections;
84+
import java.util.HashMap;
8185
import java.util.List;
8286
import java.util.Map;
8387
import java.util.Optional;
@@ -99,6 +103,7 @@
99103
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
100104
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
101105
import static com.alibaba.fluss.server.testutils.KvTestUtils.mockCompletedSnapshot;
106+
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrResponseData;
102107
import static com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
103108
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
104109
import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
@@ -761,6 +766,59 @@ void testNotifyOffsetsWithShrinkISR(@TempDir Path tempDir) throws Exception {
761766
verifyReceiveRequestExceptFor(3, leader, NotifyKvSnapshotOffsetRequest.class);
762767
}
763768

769+
@Test
770+
void testProcessAdjustIsr() throws Exception {
771+
// make sure all request to gateway should be successful
772+
initCoordinatorChannel();
773+
// create a table,
774+
TablePath t1 = TablePath.of(defaultDatabase, "create_process_adjust_isr");
775+
int nBuckets = 3;
776+
int replicationFactor = 3;
777+
TableAssignment tableAssignment =
778+
generateAssignment(
779+
nBuckets,
780+
replicationFactor,
781+
new TabletServerInfo[] {
782+
new TabletServerInfo(0, "rack0"),
783+
new TabletServerInfo(1, "rack1"),
784+
new TabletServerInfo(2, "rack2")
785+
});
786+
long t1Id = metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false);
787+
verifyTableCreated(t1Id, tableAssignment, nBuckets, replicationFactor);
788+
789+
// get the origin bucket leaderAndIsr
790+
Map<TableBucket, LeaderAndIsr> bucketLeaderAndIsrMap =
791+
new HashMap<>(
792+
waitValue(
793+
() -> fromCtx((ctx) -> Optional.of(ctx.bucketLeaderAndIsr())),
794+
Duration.ofMinutes(1),
795+
"leader not elected"));
796+
797+
// verify AdjustIsrReceivedEvent
798+
CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
799+
eventProcessor
800+
.getCoordinatorEventManager()
801+
.put(new AdjustIsrReceivedEvent(bucketLeaderAndIsrMap, response));
802+
803+
retryVerifyContext(
804+
ctx ->
805+
bucketLeaderAndIsrMap.forEach(
806+
(tableBucket, leaderAndIsr) ->
807+
assertThat(ctx.getBucketLeaderAndIsr(tableBucket))
808+
.contains(
809+
leaderAndIsr.newLeaderAndIsr(
810+
leaderAndIsr.leader(),
811+
leaderAndIsr.isr()))));
812+
813+
// verify the response
814+
AdjustIsrResponse adjustIsrResponse = response.get();
815+
Map<TableBucket, AdjustIsrResultForBucket> resultForBucketMap =
816+
getAdjustIsrResponseData(adjustIsrResponse);
817+
assertThat(resultForBucketMap.keySet())
818+
.containsAnyElementsOf(bucketLeaderAndIsrMap.keySet());
819+
assertThat(resultForBucketMap.values()).allMatch(AdjustIsrResultForBucket::succeeded);
820+
}
821+
764822
private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
765823
return new CoordinatorEventProcessor(
766824
zookeeperClient,

fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.Map;
5959
import java.util.Optional;
6060
import java.util.Set;
61+
import java.util.stream.Collectors;
6162

6263
import static com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
6364
import static org.assertj.core.api.Assertions.assertThat;
@@ -220,6 +221,45 @@ void testBatchCreateLeaderAndIsr() throws Exception {
220221
}
221222
}
222223

224+
@Test
225+
void testBatchUpdateLeaderAndIsr() throws Exception {
226+
int totalCount = 100;
227+
228+
// try to register bucket leaderAndIsr
229+
Map<TableBucket, LeaderAndIsr> leaderAndIsrList = new HashMap<>();
230+
for (int i = 0; i < totalCount; i++) {
231+
TableBucket tableBucket = new TableBucket(1, i);
232+
LeaderAndIsr leaderAndIsr =
233+
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
234+
leaderAndIsrList.put(tableBucket, leaderAndIsr);
235+
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
236+
}
237+
238+
// try to batch update
239+
Map<TableBucket, LeaderAndIsr> updateLeaderAndIsrList =
240+
leaderAndIsrList.entrySet().stream()
241+
.collect(
242+
Collectors.toMap(
243+
Map.Entry::getKey,
244+
entry -> {
245+
LeaderAndIsr old = entry.getValue();
246+
return new LeaderAndIsr(
247+
old.leader() + 1,
248+
old.leaderEpoch() + 1,
249+
old.isr(),
250+
old.coordinatorEpoch() + 1,
251+
old.bucketEpoch() + 1);
252+
}));
253+
zookeeperClient.batchUpdateLeaderAndIsr(updateLeaderAndIsrList);
254+
for (Map.Entry<TableBucket, LeaderAndIsr> entry : updateLeaderAndIsrList.entrySet()) {
255+
TableBucket tableBucket = entry.getKey();
256+
LeaderAndIsr leaderAndIsr = entry.getValue();
257+
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(leaderAndIsr);
258+
zookeeperClient.deleteLeaderAndIsr(tableBucket);
259+
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty();
260+
}
261+
}
262+
223263
@Test
224264
void testTable() throws Exception {
225265
TablePath tablePath = TablePath.of("db", "tb");

0 commit comments

Comments
 (0)