 import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.PartitionSpec;
+import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
+import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePartition;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
 import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
 import com.alibaba.fluss.rpc.messages.DropTableResponse;
 import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatRequest;
 import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatResponse;
+import com.alibaba.fluss.rpc.messages.MetadataRequest;
+import com.alibaba.fluss.rpc.messages.MetadataResponse;
 import com.alibaba.fluss.rpc.messages.PbHeartbeatReqForTable;
 import com.alibaba.fluss.rpc.messages.PbHeartbeatRespForTable;
+import com.alibaba.fluss.rpc.netty.server.Session;
 import com.alibaba.fluss.rpc.protocol.ApiError;
 import com.alibaba.fluss.security.acl.AclBinding;
 import com.alibaba.fluss.security.acl.AclBindingFilter;
 import com.alibaba.fluss.server.authorizer.AclCreateResult;
 import com.alibaba.fluss.server.authorizer.AclDeleteResult;
 import com.alibaba.fluss.server.authorizer.Authorizer;
+import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
 import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
 import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
 import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
 import com.alibaba.fluss.server.entity.LakeTieringTableInfo;
 import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
 import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
+import com.alibaba.fluss.server.metadata.BucketMetadata;
+import com.alibaba.fluss.server.metadata.PartitionMetadata;
 import com.alibaba.fluss.server.metadata.ServerMetadataCache;
+import com.alibaba.fluss.server.metadata.TableMetadata;
 import com.alibaba.fluss.server.zk.ZooKeeperClient;
 import com.alibaba.fluss.server.zk.data.BucketAssignment;
+import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
 import com.alibaba.fluss.server.zk.data.PartitionAssignment;
 import com.alibaba.fluss.server.zk.data.TableAssignment;
 import com.alibaba.fluss.server.zk.data.TableRegistration;
 import javax.annotation.Nullable;

 import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;

@@ -121,6 +135,7 @@ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway {
     private final int defaultReplicationFactor;
     private final Supplier<EventManager> eventManagerSupplier;
     private final Supplier<Integer> coordinatorEpochSupplier;
+    private final ServerMetadataCache metadataCache;

     // null if the cluster hasn't configured datalake format
     private final @Nullable DataLakeFormat dataLakeFormat;
@@ -137,13 +152,7 @@ public CoordinatorService(
             @Nullable Authorizer authorizer,
             @Nullable LakeCatalog lakeCatalog,
             LakeTableTieringManager lakeTableTieringManager) {
-        super(
-                remoteFileSystem,
-                ServerType.COORDINATOR,
-                zkClient,
-                metadataCache,
-                metadataManager,
-                authorizer);
+        super(remoteFileSystem, ServerType.COORDINATOR, zkClient, metadataManager, authorizer);
         this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
         this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
         this.eventManagerSupplier =
@@ -153,6 +162,7 @@ public CoordinatorService(
         this.dataLakeFormat = conf.getOptional(ConfigOptions.DATALAKE_FORMAT).orElse(null);
         this.lakeCatalog = lakeCatalog;
         this.lakeTableTieringManager = lakeTableTieringManager;
+        this.metadataCache = metadataCache;
         checkState(
                 (dataLakeFormat == null) == (lakeCatalog == null),
                 "dataLakeFormat and lakeCatalog must both be null or both non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
@@ -406,6 +416,28 @@ public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionRequest request) {
         return CompletableFuture.completedFuture(response);
     }

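+    // Serves a cluster MetadataRequest on the coordinator. The lookup is wrapped in an
+    // AccessContextEvent and handed to the EventManager, so it can safely read the
+    // CoordinatorContext from the event-processing thread.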
+    @Override
+    public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
+        String listenerName = currentListenerName();
+        Session session = currentSession();
+
+        AccessContextEvent<MetadataResponse> metadataResponseAccessContextEvent =
+                new AccessContextEvent<>(
+                        ctx ->
+                                makeMetadataResponse(
+                                        request,
+                                        listenerName,
+                                        session,
+                                        authorizer,
+                                        metadataCache,
+                                        (tablePath) -> getTableMetadata(ctx, tablePath),
+                                        ctx::getPhysicalTablePath,
+                                        (physicalTablePath) ->
+                                                getPartitionMetadata(ctx, physicalTablePath)));
+        eventManagerSupplier.get().put(metadataResponseAccessContextEvent);
+        return metadataResponseAccessContextEvent.getResultFuture();
+    }
+
     public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
         CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
         eventManagerSupplier
@@ -545,4 +577,76 @@ private void validateHeartbeatRequest(
                             heartbeatReqForTable.getTableId()));
         }
     }
+
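+    // Builds the TableMetadata for a single table. The TableInfo is always read from
+    // ZooKeeper, while bucket assignments are served from the in-memory CoordinatorContext
+    // whenever the context already tracks the table.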
+    private TableMetadata getTableMetadata(CoordinatorContext ctx, TablePath tablePath) {
+        // always get table info from zk.
+        TableInfo tableInfo = metadataManager.getTable(tablePath);
+        long tableId = ctx.getTableIdByPath(tablePath);
+        List<BucketMetadata> bucketMetadataList;
+        if (tableId == TableInfo.UNKNOWN_TABLE_ID) {
+            // TODO no need to get assignment from zk if refactor client metadata cache. Trace by
+            // https://github.com/alibaba/fluss/issues/483
+            // get table assignment from zk.
+            bucketMetadataList =
+                    getTableMetadataFromZk(
+                            zkClient, tablePath, tableInfo.getTableId(), tableInfo.isPartitioned());
+        } else {
+            // get table assignment from coordinatorContext.
+            bucketMetadataList =
+                    getBucketMetadataFromContext(
+                            ctx, tableId, null, ctx.getTableAssignment(tableId));
+        }
+        return new TableMetadata(tableInfo, bucketMetadataList);
+    }
+
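+    // Resolves the metadata of a single partition, preferring the in-memory
+    // CoordinatorContext and falling back to ZooKeeper when the table or partition
+    // is not (yet) tracked there.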
+    private PartitionMetadata getPartitionMetadata(
+            CoordinatorContext ctx, PhysicalTablePath partitionPath) {
+        TablePath tablePath =
+                new TablePath(partitionPath.getDatabaseName(), partitionPath.getTableName());
+        String partitionName = partitionPath.getPartitionName();
+        long tableId = ctx.getTableIdByPath(tablePath);
+        if (tableId == TableInfo.UNKNOWN_TABLE_ID) {
+            // TODO no need to get assignment from zk if refactor client metadata cache. Trace by
+            // https://github.com/alibaba/fluss/issues/483
+            return getPartitionMetadataFromZk(partitionPath, zkClient);
+        } else {
+            Optional<Long> partitionIdOpt = ctx.getPartitionId(partitionPath);
+            if (partitionIdOpt.isPresent()) {
+                long partitionId = partitionIdOpt.get();
+                List<BucketMetadata> bucketMetadataList =
+                        getBucketMetadataFromContext(
+                                ctx,
+                                tableId,
+                                partitionId,
+                                ctx.getPartitionAssignment(
+                                        new TablePartition(tableId, partitionId)));
+                return new PartitionMetadata(
+                        tableId, partitionName, partitionId, bucketMetadataList);
+            } else {
+                return getPartitionMetadataFromZk(partitionPath, zkClient);
+            }
+        }
+    }
+
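+    // Converts a bucketId -> replica-server-ids assignment into BucketMetadata entries,
+    // attaching the current leader and leader epoch from the CoordinatorContext when known.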
+    private static List<BucketMetadata> getBucketMetadataFromContext(
+            CoordinatorContext ctx,
+            long tableId,
+            @Nullable Long partitionId,
+            Map<Integer, List<Integer>> tableAssignment) {
+        List<BucketMetadata> bucketMetadataList = new ArrayList<>();
+        tableAssignment.forEach(
+                (bucketId, serverIds) -> {
+                    TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
+                    Optional<LeaderAndIsr> optLeaderAndIsr = ctx.getBucketLeaderAndIsr(tableBucket);
+                    Integer leader = optLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null);
+                    BucketMetadata bucketMetadata =
+                            new BucketMetadata(
+                                    bucketId,
+                                    leader,
+                                    ctx.getBucketLeaderEpoch(tableBucket),
+                                    serverIds);
+                    bucketMetadataList.add(bucketMetadata);
+                });
+        return bucketMetadataList;
+    }
 }