2020import org .apache .fluss .annotation .Internal ;
2121import org .apache .fluss .config .ConfigOptions ;
2222import org .apache .fluss .config .Configuration ;
23- import org .apache .fluss .exception .InvalidCoordinatorException ;
2423import org .apache .fluss .metadata .PhysicalTablePath ;
2524import org .apache .fluss .metadata .ResolvedPartitionSpec ;
2625import org .apache .fluss .metadata .Schema ;
7574import org .apache .fluss .server .zk .data .ZkData .TableZNode ;
7675import org .apache .fluss .server .zk .data .ZkData .TablesZNode ;
7776import org .apache .fluss .server .zk .data .ZkData .WriterIdZNode ;
77+ import org .apache .fluss .server .zk .data .ZkVersion ;
7878import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .CuratorFramework ;
7979import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .api .BackgroundCallback ;
8080import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .api .CuratorEvent ;
112112
113113import static java .util .stream .Collectors .toMap ;
114114import static org .apache .fluss .metadata .ResolvedPartitionSpec .fromPartitionName ;
115+ import static org .apache .fluss .server .zk .ZooKeeperOp .multiRequest ;
115116import static org .apache .fluss .utils .Preconditions .checkNotNull ;
116117
117118/**
@@ -129,6 +130,7 @@ public class ZooKeeperClient implements AutoCloseable {
129130 private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper ;
130131
131132 private final CuratorFramework zkClient ;
133+ private final ZooKeeperOp zkOp ;
132134 private final ZkSequenceIDCounter tableIdCounter ;
133135 private final ZkSequenceIDCounter partitionIdCounter ;
134136 private final ZkSequenceIDCounter writerIdCounter ;
@@ -140,6 +142,7 @@ public ZooKeeperClient(
140142 Configuration configuration ) {
141143 this .curatorFrameworkWrapper = curatorFrameworkWrapper ;
142144 this .zkClient = curatorFrameworkWrapper .asCuratorFramework ();
145+ this .zkOp = new ZooKeeperOp (zkClient );
143146 this .tableIdCounter = new ZkSequenceIDCounter (zkClient , TableSequenceIdZNode .path ());
144147 this .partitionIdCounter =
145148 new ZkSequenceIDCounter (zkClient , PartitionSequenceIdZNode .path ());
@@ -394,13 +397,14 @@ public void deletePartitionAssignment(long partitionId) throws Exception {
394397 // --------------------------------------------------------------------------------------------
395398
396399 /** Register bucket LeaderAndIsr to ZK. */
397- public void registerLeaderAndIsr (TableBucket tableBucket , LeaderAndIsr leaderAndIsr )
400+ public void registerLeaderAndIsr (
401+ TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int expectedZkVersion )
398402 throws Exception {
403+
399404 String path = LeaderAndIsrZNode .path (tableBucket );
400- zkClient .create ()
401- .creatingParentsIfNeeded ()
402- .withMode (CreateMode .PERSISTENT )
403- .forPath (path , LeaderAndIsrZNode .encode (leaderAndIsr ));
405+ byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
406+
407+ createRecursive (path , data , expectedZkVersion , false );
404408 LOG .info ("Registered {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
405409 }
406410
@@ -479,24 +483,19 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
479483 }
480484
481485 public void updateLeaderAndIsr (
482- TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int currentCoordinatorEpoch )
486+ TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int expectedZkVersion )
483487 throws Exception {
484- // check coordinator epoch to ensure no other Coordinator leader exists.
485- if (leaderAndIsr .coordinatorEpoch () != currentCoordinatorEpoch ) {
486- throw new InvalidCoordinatorException (
487- String .format (
488- "LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489- + "This coordinator may no longer be the leader." ,
490- leaderAndIsr .coordinatorEpoch (), currentCoordinatorEpoch , tableBucket ));
491- }
492-
493488 String path = LeaderAndIsrZNode .path (tableBucket );
494- zkClient .setData ().forPath (path , LeaderAndIsrZNode .encode (leaderAndIsr ));
489+ byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
490+
491+ CuratorOp updateOp = zkOp .updateOp (path , data );
492+ List <CuratorOp > ops = wrapRequestWithCoordinatorEpochCheck (updateOp , expectedZkVersion );
493+
494+ zkClient .transaction ().forOperations (ops );
495495 LOG .info ("Updated {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
496496 }
497497
498- public void batchUpdateLeaderAndIsr (
499- Map <TableBucket , LeaderAndIsr > leaderAndIsrList , int currentCoordinatorEpoch )
498+ public void batchUpdateLeaderAndIsr (Map <TableBucket , LeaderAndIsr > leaderAndIsrList )
500499 throws Exception {
501500 if (leaderAndIsrList .isEmpty ()) {
502501 return ;
@@ -506,18 +505,6 @@ public void batchUpdateLeaderAndIsr(
506505 for (Map .Entry <TableBucket , LeaderAndIsr > entry : leaderAndIsrList .entrySet ()) {
507506 TableBucket tableBucket = entry .getKey ();
508507 LeaderAndIsr leaderAndIsr = entry .getValue ();
509-
510- // check coordinator epoch to ensure no other Coordinator leader exists.
511- if (leaderAndIsr .coordinatorEpoch () != currentCoordinatorEpoch ) {
512- throw new InvalidCoordinatorException (
513- String .format (
514- "LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515- + "This coordinator may no longer be the leader." ,
516- leaderAndIsr .coordinatorEpoch (),
517- currentCoordinatorEpoch ,
518- tableBucket ));
519- }
520-
521508 LOG .info ("Batch Update {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
522509 String path = LeaderAndIsrZNode .path (tableBucket );
523510 byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
@@ -1609,4 +1596,48 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16091596 }
16101597 return result ;
16111598 }
1599+
1600+ public void createRecursive (
1601+ String path , byte [] data , int expectedZkVersion , boolean throwIfPathExists )
1602+ throws Exception {
1603+ CuratorOp createOp = zkOp .createOp (path , data , CreateMode .PERSISTENT );
1604+ List <CuratorOp > ops = wrapRequestWithCoordinatorEpochCheck (createOp , expectedZkVersion );
1605+
1606+ try {
1607+ // try to directly create
1608+ zkClient .transaction ().forOperations (ops );
1609+ } catch (KeeperException .NodeExistsException e ) {
1610+ // should not exist
1611+ if (throwIfPathExists ) {
1612+ throw e ;
1613+ }
1614+ } catch (KeeperException .NoNodeException e ) {
1615+ // if parent does not exist, create parent first
1616+ int indexOfLastSlash = path .lastIndexOf ("/" );
1617+ if (indexOfLastSlash == -1 ) {
1618+ throw new IllegalArgumentException ("Invalid path {}" + path );
1619+ }
1620+ String parentPath = path .substring (0 , indexOfLastSlash );
1621+ createRecursive (parentPath , null , expectedZkVersion , throwIfPathExists );
1622+ // After creating parent, retry creating the original path
1623+ zkClient .transaction ().forOperations (ops );
1624+ }
1625+ }
1626+
1627+ public List <CuratorOp > wrapRequestWithCoordinatorEpochCheck (
1628+ CuratorOp request , int expectedZkVersion ) throws Exception {
1629+ if (ZkVersion .MATCH_ANY_VERSION .getVersion () == expectedZkVersion ) {
1630+ return Collections .singletonList (request );
1631+ } else if (expectedZkVersion >= 0 ) {
1632+ CuratorOp checkOp =
1633+ zkOp .checkOp (ZkData .CoordinatorEpochZNode .path (), expectedZkVersion );
1634+ return multiRequest (checkOp , request );
1635+ } else {
1636+ throw new IllegalArgumentException (
1637+ "Expected coordinator epoch zkVersion "
1638+ + expectedZkVersion
1639+ + " should be non-negative or equal to "
1640+ + ZkVersion .MATCH_ANY_VERSION .getVersion ());
1641+ }
1642+ }
16121643}
0 commit comments