2020import org .apache .fluss .annotation .Internal ;
2121import org .apache .fluss .config .ConfigOptions ;
2222import org .apache .fluss .config .Configuration ;
23- import org .apache .fluss .exception .InvalidCoordinatorException ;
2423import org .apache .fluss .metadata .PhysicalTablePath ;
2524import org .apache .fluss .metadata .ResolvedPartitionSpec ;
2625import org .apache .fluss .metadata .Schema ;
7574import org .apache .fluss .server .zk .data .ZkData .TableZNode ;
7675import org .apache .fluss .server .zk .data .ZkData .TablesZNode ;
7776import org .apache .fluss .server .zk .data .ZkData .WriterIdZNode ;
77+ import org .apache .fluss .server .zk .data .ZkVersion ;
7878import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .CuratorFramework ;
7979import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .api .BackgroundCallback ;
8080import org .apache .fluss .shaded .curator5 .org .apache .curator .framework .api .CuratorEvent ;
112112
113113import static java .util .stream .Collectors .toMap ;
114114import static org .apache .fluss .metadata .ResolvedPartitionSpec .fromPartitionName ;
115+ import static org .apache .fluss .server .zk .ZooKeeperOp .multiRequest ;
115116import static org .apache .fluss .utils .Preconditions .checkNotNull ;
116117
117118/**
@@ -129,6 +130,7 @@ public class ZooKeeperClient implements AutoCloseable {
129130 private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper ;
130131
131132 private final CuratorFramework zkClient ;
133+ private final ZooKeeperOp zkOp ;
132134 private final ZkSequenceIDCounter tableIdCounter ;
133135 private final ZkSequenceIDCounter partitionIdCounter ;
134136 private final ZkSequenceIDCounter writerIdCounter ;
@@ -140,6 +142,7 @@ public ZooKeeperClient(
140142 Configuration configuration ) {
141143 this .curatorFrameworkWrapper = curatorFrameworkWrapper ;
142144 this .zkClient = curatorFrameworkWrapper .asCuratorFramework ();
145+ this .zkOp = new ZooKeeperOp (zkClient );
143146 this .tableIdCounter = new ZkSequenceIDCounter (zkClient , TableSequenceIdZNode .path ());
144147 this .partitionIdCounter =
145148 new ZkSequenceIDCounter (zkClient , PartitionSequenceIdZNode .path ());
@@ -394,13 +397,22 @@ public void deletePartitionAssignment(long partitionId) throws Exception {
394397 // --------------------------------------------------------------------------------------------
395398
396399 /** Register bucket LeaderAndIsr to ZK. */
397- public void registerLeaderAndIsr (TableBucket tableBucket , LeaderAndIsr leaderAndIsr )
400+ public void registerLeaderAndIsr (
401+ TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int expectedZkVersion )
398402 throws Exception {
403+
399404 String path = LeaderAndIsrZNode .path (tableBucket );
400- zkClient .create ()
401- .creatingParentsIfNeeded ()
402- .withMode (CreateMode .PERSISTENT )
403- .forPath (path , LeaderAndIsrZNode .encode (leaderAndIsr ));
405+ byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
406+
407+ createRecursive (path , data , expectedZkVersion , false );
408+ // List<CuratorTransactionResult> transactionResultList =
409+ // zkClient.transaction().forOperations(ops);
410+ //
411+ // String path = LeaderAndIsrZNode.path(tableBucket);
412+ // zkClient.create()
413+ // .creatingParentsIfNeeded()
414+ // .withMode(CreateMode.PERSISTENT)
415+ // .forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr));
404416 LOG .info ("Registered {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
405417 }
406418
@@ -478,25 +490,14 @@ public Map<TableBucket, LeaderAndIsr> getLeaderAndIsrs(Collection<TableBucket> t
478490 "leader and isr" );
479491 }
480492
481- public void updateLeaderAndIsr (
482- TableBucket tableBucket , LeaderAndIsr leaderAndIsr , int currentCoordinatorEpoch )
493+ public void updateLeaderAndIsr (TableBucket tableBucket , LeaderAndIsr leaderAndIsr )
483494 throws Exception {
484- // check coordinator epoch to ensure no other Coordinator leader exists.
485- if (leaderAndIsr .coordinatorEpoch () != currentCoordinatorEpoch ) {
486- throw new InvalidCoordinatorException (
487- String .format (
488- "LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
489- + "This coordinator may no longer be the leader." ,
490- leaderAndIsr .coordinatorEpoch (), currentCoordinatorEpoch , tableBucket ));
491- }
492-
493495 String path = LeaderAndIsrZNode .path (tableBucket );
494496 zkClient .setData ().forPath (path , LeaderAndIsrZNode .encode (leaderAndIsr ));
495497 LOG .info ("Updated {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
496498 }
497499
498- public void batchUpdateLeaderAndIsr (
499- Map <TableBucket , LeaderAndIsr > leaderAndIsrList , int currentCoordinatorEpoch )
500+ public void batchUpdateLeaderAndIsr (Map <TableBucket , LeaderAndIsr > leaderAndIsrList )
500501 throws Exception {
501502 if (leaderAndIsrList .isEmpty ()) {
502503 return ;
@@ -506,18 +507,6 @@ public void batchUpdateLeaderAndIsr(
506507 for (Map .Entry <TableBucket , LeaderAndIsr > entry : leaderAndIsrList .entrySet ()) {
507508 TableBucket tableBucket = entry .getKey ();
508509 LeaderAndIsr leaderAndIsr = entry .getValue ();
509-
510- // check coordinator epoch to ensure no other Coordinator leader exists.
511- if (leaderAndIsr .coordinatorEpoch () != currentCoordinatorEpoch ) {
512- throw new InvalidCoordinatorException (
513- String .format (
514- "LeaderAndIsr coordinator epoch %d does not match current coordinator epoch %d for bucket %s. "
515- + "This coordinator may no longer be the leader." ,
516- leaderAndIsr .coordinatorEpoch (),
517- currentCoordinatorEpoch ,
518- tableBucket ));
519- }
520-
521510 LOG .info ("Batch Update {} for bucket {} in Zookeeper." , leaderAndIsr , tableBucket );
522511 String path = LeaderAndIsrZNode .path (tableBucket );
523512 byte [] data = LeaderAndIsrZNode .encode (leaderAndIsr );
@@ -1609,4 +1598,48 @@ public static <K> Map<K, List<String>> processGetChildrenResponses(
16091598 }
16101599 return result ;
16111600 }
1601+
1602+ public void createRecursive (
1603+ String path , byte [] data , int expectedZkVersion , boolean throwIfPathExists )
1604+ throws Exception {
1605+ List <CuratorOp > ops = wrapCheckAndCreateRequest (path , data , expectedZkVersion );
1606+
1607+ try {
1608+ // try to directly create
1609+ zkClient .transaction ().forOperations (ops );
1610+ } catch (KeeperException .NodeExistsException e ) {
1611+ // should not exist
1612+ if (throwIfPathExists ) {
1613+ throw e ;
1614+ }
1615+ } catch (KeeperException .NoNodeException e ) {
1616+ // if parent does not exist, create parent first
1617+ int indexOfLastSlash = path .lastIndexOf ("/" );
1618+ if (indexOfLastSlash == -1 ) {
1619+ throw new IllegalArgumentException ("Invalid path {}" + path );
1620+ }
1621+ String parentPath = path .substring (0 , indexOfLastSlash );
1622+ createRecursive (parentPath , null , expectedZkVersion , throwIfPathExists );
1623+ // After creating parent, retry creating the original path
1624+ zkClient .transaction ().forOperations (ops );
1625+ }
1626+ }
1627+
1628+ public List <CuratorOp > wrapCheckAndCreateRequest (
1629+ String path , byte [] data , int expectedZkVersion ) throws Exception {
1630+ if (ZkVersion .MATCH_ANY_VERSION .getVersion () == expectedZkVersion ) {
1631+ return Collections .singletonList (zkOp .createOp (path , data , CreateMode .PERSISTENT ));
1632+ } else if (expectedZkVersion >= 0 ) {
1633+ CuratorOp checkOp =
1634+ zkOp .checkOp (ZkData .CoordinatorEpochZNode .path (), expectedZkVersion );
1635+ CuratorOp createOp = zkOp .createOp (path , data , CreateMode .PERSISTENT );
1636+ return multiRequest (checkOp , createOp );
1637+ } else {
1638+ throw new IllegalArgumentException (
1639+ "Expected coordinator epoch zkVersion "
1640+ + expectedZkVersion
1641+ + " should be non-negative or equal to "
1642+ + ZkVersion .MATCH_ANY_VERSION .getVersion ());
1643+ }
1644+ }
16121645}
0 commit comments