2020import org .apache .fluss .annotation .Internal ;
2121import org .apache .fluss .config .ConfigOptions ;
2222import org .apache .fluss .config .Configuration ;
23- import org .apache .fluss .exception .InvalidCoordinatorException ;
2423import org .apache .fluss .metadata .PhysicalTablePath ;
2524import org .apache .fluss .metadata .ResolvedPartitionSpec ;
2625import org .apache .fluss .metadata .Schema ;
7574import org .apache .fluss .server .zk .data .ZkData .TableZNode ;
7675import org .apache .fluss .server .zk .data .ZkData .TablesZNode ;
7776import org .apache .fluss .server .zk .data .ZkData .WriterIdZNode ;
77+ import org .apache .fluss .server .zk .data .ZkVersion ;
7878import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .CuratorFramework ;
7979import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .api .BackgroundCallback ;
8080import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .api .CuratorEvent ;
112112
113113import static java .util .stream .Collectors .toMap ;
114114import static org .apache .fluss .metadata .ResolvedPartitionSpec .fromPartitionName ;
115+ import static org .apache .fluss .server .zk .ZooKeeperOp .multiRequest ;
115116import static org .apache .fluss .utils .Preconditions .checkNotNull ;
116117
117118/**
@@ -129,6 +130,7 @@ public class ZooKeeperClient implements AutoCloseable {
129130 private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper ;
130131
131132 private final CuratorFramework zkClient ;
133+ private final ZooKeeperOp zkOp ;
132134 private final ZkSequenceIDCounter tableIdCounter ;
133135 private final ZkSequenceIDCounter partitionIdCounter ;
134136 private final ZkSequenceIDCounter writerIdCounter ;
@@ -140,6 +142,7 @@ public ZooKeeperClient(
140142 Configuration configuration ) {
141143 this .curatorFrameworkWrapper = curatorFrameworkWrapper ;
142144 this .zkClient = curatorFrameworkWrapper .asCuratorFramework ();
145+ this .zkOp = new ZooKeeperOp (zkClient );
143146 this .tableIdCounter = new ZkSequenceIDCounter (zkClient , TableSequenceIdZNode .path ());
144147 this .partitionIdCounter =
145148 new ZkSequenceIDCounter (zkClient , PartitionSequenceIdZNode .path ());
@@ -394,13 +397,14 @@ public void deletePartitionAssignment(long partitionId) throws Exception {
394397 // --------------------------------------------------------------------------------------------
395398
396399 /** Register bucket LeaderAndIsr to ZK. */
397- public void registerLeaderAndIsr (TableBucket tableBucket , LeaderAndIsr leaderAndIsr )
400+ public void registerLeaderAndIsr (
401+ TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int expectedZkVersion )
398402 throws Exception {
403+
399404 String path = LeaderAndIsrZNode .path (tableBucket );
400- zkClient .create ()
401- .creatingParentsIfNeeded ()
402- .withMode (CreateMode .PERSISTENT )
403- .forPath (path , LeaderAndIsrZNode .encode (leaderAndIsr ));
405+ byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
406+
407+ createRecursive (path , data , expectedZkVersion , false );
404408 LOG .info ("Registered {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
405409 }
406410
@@ -479,24 +483,20 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
479483 }
480484
481485 public void updateLeaderAndIsr (
482- TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int currentCoordinatorEpoch )
486+ TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int expectedZkVersion )
483487 throws Exception {
484- // check coordinator epoch to ensure no other Coordinator leader exists.
485- if (leaderAndIsr .coordinatorEpoch () != currentCoordinatorEpoch ) {
486- throw new InvalidCoordinatorException (
487- String .format (
488- "LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489- + "This coordinator may no longer be the leader." ,
490- leaderAndIsr .coordinatorEpoch (), currentCoordinatorEpoch , tableBucket ));
491- }
492-
493488 String path = LeaderAndIsrZNode .path (tableBucket );
494- zkClient .setData ().forPath (path , LeaderAndIsrZNode .encode (leaderAndIsr ));
489+ byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
490+
491+ CuratorOp updateOp = zkOp .updateOp (path , data );
492+ List <CuratorOp > ops = wrapRequestWithCoordinatorEpochCheck (updateOp , expectedZkVersion );
493+
494+ zkClient .transaction ().forOperations (ops );
495495 LOG .info ("Updated {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
496496 }
497497
498498 public void batchUpdateLeaderAndIsr (
499- Map <TableBucket , LeaderAndIsr > leaderAndIsrList , int currentCoordinatorEpoch )
499+ Map <TableBucket , LeaderAndIsr > leaderAndIsrList , int expectedZkVersion )
500500 throws Exception {
501501 if (leaderAndIsrList .isEmpty ()) {
502502 return ;
@@ -506,30 +506,21 @@ public void batchUpdateLeaderAndIsr(
506506 for (Map .Entry <TableBucket , LeaderAndIsr > entry : leaderAndIsrList .entrySet ()) {
507507 TableBucket tableBucket = entry .getKey ();
508508 LeaderAndIsr leaderAndIsr = entry .getValue ();
509-
510- // check coordinator epoch to ensure no other Coordinator leader exists.
511- if (leaderAndIsr .coordinatorEpoch () != currentCoordinatorEpoch ) {
512- throw new InvalidCoordinatorException (
513- String .format (
514- "LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515- + "This coordinator may no longer be the leader." ,
516- leaderAndIsr .coordinatorEpoch (),
517- currentCoordinatorEpoch ,
518- tableBucket ));
519- }
520-
521509 LOG .info ("Batch Update {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
522510 String path = LeaderAndIsrZNode .path (tableBucket );
523511 byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
524512 CuratorOp updateOp = zkClient .transactionOp ().setData ().forPath (path , data );
525513 ops .add (updateOp );
526- if (ops .size () == MAX_BATCH_SIZE ) {
527- zkClient .transaction ().forOperations (ops );
514+ if (ops .size () == MAX_BATCH_SIZE - 1 ) {
515+ List <CuratorOp > wrapOps =
516+ wrapRequestsWithCoordinatorEpochCheck (ops , expectedZkVersion );
517+ zkClient .transaction ().forOperations (wrapOps );
528518 ops .clear ();
529519 }
530520 }
531521 if (!ops .isEmpty ()) {
532- zkClient .transaction ().forOperations (ops );
522+ List <CuratorOp > wrapOps = wrapRequestsWithCoordinatorEpochCheck (ops , expectedZkVersion );
523+ zkClient .transaction ().forOperations (wrapOps );
533524 }
534525 }
535526
@@ -1609,4 +1600,54 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16091600 }
16101601 return result ;
16111602 }
1603+
1604+ public void createRecursive (
1605+ String path , byte [] data , int expectedZkVersion , boolean throwIfPathExists )
1606+ throws Exception {
1607+ CuratorOp createOp = zkOp .createOp (path , data , CreateMode .PERSISTENT );
1608+ List <CuratorOp > ops = wrapRequestWithCoordinatorEpochCheck (createOp , expectedZkVersion );
1609+
1610+ try {
1611+ // try to directly create
1612+ zkClient .transaction ().forOperations (ops );
1613+ } catch (KeeperException .NodeExistsException e ) {
1614+ // should not exist
1615+ if (throwIfPathExists ) {
1616+ throw e ;
1617+ }
1618+ } catch (KeeperException .NoNodeException e ) {
1619+ // if parent does not exist, create parent first
1620+ int indexOfLastSlash = path .lastIndexOf ("/" );
1621+ if (indexOfLastSlash == -1 ) {
1622+ throw new IllegalArgumentException ("Invalid path {}" + path );
1623+ }
1624+ String parentPath = path .substring (0 , indexOfLastSlash );
1625+ createRecursive (parentPath , null , expectedZkVersion , throwIfPathExists );
1626+ // After creating parent, retry creating the original path
1627+ zkClient .transaction ().forOperations (ops );
1628+ }
1629+ }
1630+
1631+ public List <CuratorOp > wrapRequestWithCoordinatorEpochCheck (
1632+ CuratorOp request , int expectedZkVersion ) throws Exception {
1633+ return wrapRequestsWithCoordinatorEpochCheck (
1634+ Collections .singletonList (request ), expectedZkVersion );
1635+ }
1636+
1637+ public List <CuratorOp > wrapRequestsWithCoordinatorEpochCheck (
1638+ List <CuratorOp > requestList , int expectedZkVersion ) throws Exception {
1639+ if (ZkVersion .MATCH_ANY_VERSION .getVersion () == expectedZkVersion ) {
1640+ return requestList ;
1641+ } else if (expectedZkVersion >= 0 ) {
1642+ CuratorOp checkOp =
1643+ zkOp .checkOp (ZkData .CoordinatorEpochZNode .path (), expectedZkVersion );
1644+ return multiRequest (checkOp , requestList );
1645+ } else {
1646+ throw new IllegalArgumentException (
1647+ "Expected coordinator epoch zkVersion "
1648+ + expectedZkVersion
1649+ + " should be non-negative or equal to "
1650+ + ZkVersion .MATCH_ANY_VERSION .getVersion ());
1651+ }
1652+ }
16121653}
0 commit comments