Skip to content

Commit 9b79d1e

Browse files
committed
support coordinator epoch1
1 parent 492bdc9 commit 9b79d1e

File tree

12 files changed

+179
-68
lines changed

12 files changed

+179
-68
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,8 +1012,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10121012
}
10131013

10141014
try {
1015-
zooKeeperClient.batchUpdateLeaderAndIsr(
1016-
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
1015+
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
10171016
newLeaderAndIsrList.forEach(
10181017
(tableBucket, newLeaderAndIsr) ->
10191018
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1024,8 +1023,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
10241023
TableBucket tableBucket = entry.getKey();
10251024
LeaderAndIsr newLeaderAndIsr = entry.getValue();
10261025
try {
1027-
zooKeeperClient.updateLeaderAndIsr(
1028-
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpoch());
1026+
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
10291027
} catch (Exception e) {
10301028
LOG.error("Error when register leader and isr.", e);
10311029
result.add(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,7 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
459459
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
460460
}
461461
try {
462-
zooKeeperClient.batchUpdateLeaderAndIsr(
463-
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpoch());
462+
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
464463
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
465464
return adjustedLeaderAndIsr;
466465
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
309309
ElectionResult electionResult = optionalElectionResult.get();
310310
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
311311
try {
312-
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
312+
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr, 0);
313313
} catch (Exception e) {
314314
LOG.error(
315315
"Fail to create state node for table bucket {} in zookeeper.",
@@ -445,7 +445,8 @@ private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
445445
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
446446
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
447447
try {
448-
zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr());
448+
zooKeeperClient.registerLeaderAndIsr(
449+
info.getTableBucket(), info.getLeaderAndIsr(), 0);
449450
registerSuccessList.add(info);
450451
} catch (Exception e) {
451452
LOG.error(
@@ -487,10 +488,7 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
487488
}
488489
ElectionResult electionResult = optionalElectionResult.get();
489490
try {
490-
zooKeeperClient.updateLeaderAndIsr(
491-
tableBucket,
492-
electionResult.leaderAndIsr,
493-
coordinatorContext.getCoordinatorEpoch());
491+
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
494492
} catch (Exception e) {
495493
LOG.error(
496494
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23-
import org.apache.fluss.exception.InvalidCoordinatorException;
2423
import org.apache.fluss.metadata.PhysicalTablePath;
2524
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2625
import org.apache.fluss.metadata.Schema;
@@ -75,6 +74,7 @@
7574
import org.apache.fluss.server.zk.data.ZkData.TableZNode;
7675
import org.apache.fluss.server.zk.data.ZkData.TablesZNode;
7776
import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode;
77+
import org.apache.fluss.server.zk.data.ZkVersion;
7878
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
7979
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.BackgroundCallback;
8080
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
@@ -112,6 +112,7 @@
112112

113113
import static java.util.stream.Collectors.toMap;
114114
import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
115+
import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest;
115116
import static org.apache.fluss.utils.Preconditions.checkNotNull;
116117

117118
/**
@@ -129,6 +130,7 @@ public class ZooKeeperClient implements AutoCloseable {
129130
private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;
130131

131132
private final CuratorFramework zkClient;
133+
private final ZooKeeperOp zkOp;
132134
private final ZkSequenceIDCounter tableIdCounter;
133135
private final ZkSequenceIDCounter partitionIdCounter;
134136
private final ZkSequenceIDCounter writerIdCounter;
@@ -140,6 +142,7 @@ public ZooKeeperClient(
140142
Configuration configuration) {
141143
this.curatorFrameworkWrapper = curatorFrameworkWrapper;
142144
this.zkClient = curatorFrameworkWrapper.asCuratorFramework();
145+
this.zkOp = new ZooKeeperOp(zkClient);
143146
this.tableIdCounter = new ZkSequenceIDCounter(zkClient, TableSequenceIdZNode.path());
144147
this.partitionIdCounter =
145148
new ZkSequenceIDCounter(zkClient, PartitionSequenceIdZNode.path());
@@ -394,13 +397,22 @@ public void deletePartitionAssignment(long partitionId) throws Exception {
394397
// --------------------------------------------------------------------------------------------
395398

396399
/** Register bucket LeaderAndIsr to ZK. */
397-
public void registerLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
400+
public void registerLeaderAndIsr(
401+
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion)
398402
throws Exception {
403+
399404
String path = LeaderAndIsrZNode.path(tableBucket);
400-
zkClient.create()
401-
.creatingParentsIfNeeded()
402-
.withMode(CreateMode.PERSISTENT)
403-
.forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
405+
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
406+
407+
createRecursive(path, data, expectedZkVersion, false);
408+
// List<CuratorTransactionResult> transactionResultList =
409+
// zkClient.transaction().forOperations(ops);
410+
//
411+
// String path = LeaderAndIsrZNode.path(tableBucket);
412+
// zkClient.create()
413+
// .creatingParentsIfNeeded()
414+
// .withMode(CreateMode.PERSISTENT)
415+
// .forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
404416
LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
405417
}
406418

@@ -478,25 +490,14 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
478490
"leader and isr");
479491
}
480492

481-
public void updateLeaderAndIsr(
482-
TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int currentCoordinatorEpoch)
493+
public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr)
483494
throws Exception {
484-
// check coordinator epoch to ensure no other Coordinator leader exists.
485-
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
486-
throw new InvalidCoordinatorException(
487-
String.format(
488-
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489-
+ "This coordinator may no longer be the leader.",
490-
leaderAndIsr.coordinatorEpoch(), currentCoordinatorEpoch, tableBucket));
491-
}
492-
493495
String path = LeaderAndIsrZNode.path(tableBucket);
494496
zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
495497
LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
496498
}
497499

498-
public void batchUpdateLeaderAndIsr(
499-
Map<TableBucket, LeaderAndIsr> leaderAndIsrList, int currentCoordinatorEpoch)
500+
public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
500501
throws Exception {
501502
if (leaderAndIsrList.isEmpty()) {
502503
return;
@@ -506,18 +507,6 @@ public void batchUpdateLeaderAndIsr(
506507
for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
507508
TableBucket tableBucket = entry.getKey();
508509
LeaderAndIsr leaderAndIsr = entry.getValue();
509-
510-
// check coordinator epoch to ensure no other Coordinator leader exists.
511-
if (leaderAndIsr.coordinatorEpoch() != currentCoordinatorEpoch) {
512-
throw new InvalidCoordinatorException(
513-
String.format(
514-
"LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515-
+ "This coordinator may no longer be the leader.",
516-
leaderAndIsr.coordinatorEpoch(),
517-
currentCoordinatorEpoch,
518-
tableBucket));
519-
}
520-
521510
LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
522511
String path = LeaderAndIsrZNode.path(tableBucket);
523512
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
@@ -1609,4 +1598,48 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16091598
}
16101599
return result;
16111600
}
1601+
1602+
public void createRecursive(
1603+
String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists)
1604+
throws Exception {
1605+
List<CuratorOp> ops = wrapCheckAndCreateRequest(path, data, expectedZkVersion);
1606+
1607+
try {
1608+
// try to directly create
1609+
zkClient.transaction().forOperations(ops);
1610+
} catch (KeeperException.NodeExistsException e) {
1611+
// should not exist
1612+
if (throwIfPathExists) {
1613+
throw e;
1614+
}
1615+
} catch (KeeperException.NoNodeException e) {
1616+
// if parent does not exist, create parent first
1617+
int indexOfLastSlash = path.lastIndexOf("/");
1618+
if (indexOfLastSlash == -1) {
1619+
throw new IllegalArgumentException("Invalid path {}" + path);
1620+
}
1621+
String parentPath = path.substring(0, indexOfLastSlash);
1622+
createRecursive(parentPath, null, expectedZkVersion, throwIfPathExists);
1623+
// After creating parent, retry creating the original path
1624+
zkClient.transaction().forOperations(ops);
1625+
}
1626+
}
1627+
1628+
public List<CuratorOp> wrapCheckAndCreateRequest(
1629+
String path, byte[] data, int expectedZkVersion) throws Exception {
1630+
if (ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) {
1631+
return Collections.singletonList(zkOp.createOp(path, data, CreateMode.PERSISTENT));
1632+
} else if (expectedZkVersion >= 0) {
1633+
CuratorOp checkOp =
1634+
zkOp.checkOp(ZkData.CoordinatorEpochZNode.path(), expectedZkVersion);
1635+
CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT);
1636+
return multiRequest(checkOp, createOp);
1637+
} else {
1638+
throw new IllegalArgumentException(
1639+
"Expected coordinator epoch zkVersion "
1640+
+ expectedZkVersion
1641+
+ " should be non-negative or equal to "
1642+
+ ZkVersion.MATCH_ANY_VERSION.getVersion());
1643+
}
1644+
}
16121645
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.zk;
19+
20+
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
21+
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp;
22+
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
27+
/** This class contains some utility methods for wrap/unwrap operations for Zookeeper. */
28+
public class ZooKeeperOp {
29+
private final CuratorFramework zkClient;
30+
31+
public ZooKeeperOp(CuratorFramework zkClient) {
32+
this.zkClient = zkClient;
33+
}
34+
35+
public CuratorOp checkOp(String path, int expectZkVersion) throws Exception {
36+
return zkClient.transactionOp().check().withVersion(expectZkVersion).forPath(path);
37+
}
38+
39+
public CuratorOp createOp(String path, byte[] data, CreateMode createMode) throws Exception {
40+
return zkClient.transactionOp().create().withMode(createMode).forPath(path, data);
41+
}
42+
43+
public static List<CuratorOp> multiRequest(CuratorOp op1, CuratorOp op2) {
44+
List<CuratorOp> ops = new ArrayList<>();
45+
ops.add(op1);
46+
ops.add(op2);
47+
return ops;
48+
}
49+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.zk.data;
20+
21+
/** Enum to represent the type of special Zookeeper version. */
22+
public enum ZkVersion {
23+
MATCH_ANY_VERSION(-1),
24+
UNKNOWN_VERSION(-2);
25+
26+
private final int version;
27+
28+
ZkVersion(int version) {
29+
this.version = version;
30+
}
31+
32+
public int getVersion() {
33+
return version;
34+
}
35+
}

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class CoordinatorServerTest extends ServerTestBase {
4040
@BeforeEach
4141
void beforeEach() throws Exception {
4242
coordinatorServer = startCoordinatorServer(createConfiguration());
43-
waitUtilCoordinatorServerElected();
43+
waitUntilCoordinatorServerElected();
4444
}
4545

4646
@AfterEach
@@ -75,7 +75,7 @@ protected void checkAfterStartServer() throws Exception {
7575
coordinatorServer.getRpcServer().getBindEndpoints());
7676
}
7777

78-
public void waitUtilCoordinatorServerElected() {
78+
public void waitUntilCoordinatorServerElected() {
7979
waitUntil(
8080
() -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(),
8181
Duration.ofSeconds(10),

fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception {
214214
}
215215
// put leader and isr
216216
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
217-
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
217+
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr, 0);
218218
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
219219
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
220220

@@ -250,7 +250,7 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception {
250250
}
251251
// put leader and isr
252252
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
253-
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
253+
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr, 0);
254254
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
255255
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
256256

fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,9 @@ void testStartup() throws Exception {
140140

141141
// create LeaderAndIsr for t10/t11 info in zk,
142142
zookeeperClient.registerLeaderAndIsr(
143-
new TableBucket(t1Id, 0), new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0));
143+
new TableBucket(t1Id, 0), new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0), 0);
144144
zookeeperClient.registerLeaderAndIsr(
145-
new TableBucket(t1Id, 1), new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0));
145+
new TableBucket(t1Id, 1), new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0), 0);
146146
// update the LeaderAndIsr to context
147147
coordinatorContext.putBucketLeaderAndIsr(
148148
t1b0, zookeeperClient.getLeaderAndIsr(new TableBucket(t1Id, 0)).get());

fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ void testGetTableMetadataFromZk() throws Exception {
109109
LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000);
110110
LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3, 4), 200, 2000);
111111

112-
zookeeperClient.registerLeaderAndIsr(tableBucket0, leaderAndIsr0);
113-
zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1);
112+
zookeeperClient.registerLeaderAndIsr(tableBucket0, leaderAndIsr0, 0);
113+
zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1, 0);
114114

115115
List<TableMetadata> tablesMetadataFromZK =
116116
metadataProvider.getTablesMetadataFromZK(
@@ -170,8 +170,8 @@ void testGetPartitionMetadataFromZk() throws Exception {
170170
LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000);
171171
LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000);
172172

173-
zookeeperClient.registerLeaderAndIsr(partitionBucket0, leaderAndIsr0);
174-
zookeeperClient.registerLeaderAndIsr(partitionBucket1, leaderAndIsr1);
173+
zookeeperClient.registerLeaderAndIsr(partitionBucket0, leaderAndIsr0, 0);
174+
zookeeperClient.registerLeaderAndIsr(partitionBucket1, leaderAndIsr1, 0);
175175

176176
// Test getPartitionMetadataFromZkAsync
177177
PhysicalTablePath partitionPath = PhysicalTablePath.of(tablePath, partitionName);
@@ -254,11 +254,11 @@ void testBatchGetPartitionMetadataFromZkAsync() throws Exception {
254254
TableBucket bucket3 = new TableBucket(tableId2, partitionId3, 0);
255255

256256
zookeeperClient.registerLeaderAndIsr(
257-
bucket1, new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000));
257+
bucket1, new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000), 0);
258258
zookeeperClient.registerLeaderAndIsr(
259-
bucket2, new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000));
259+
bucket2, new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000), 0);
260260
zookeeperClient.registerLeaderAndIsr(
261-
bucket3, new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000));
261+
bucket3, new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000), 0);
262262

263263
// Test getPartitionsMetadataFromZK
264264
List<PhysicalTablePath> partitionPaths =

0 commit comments

Comments
 (0)