Skip to content

Commit 7d3247a

Browse files
committed
[server] Remove TableInfo from server MetadataCache
1 parent a7bf007 commit 7d3247a

File tree

10 files changed

+316
-35
lines changed

10 files changed

+316
-35
lines changed

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.cluster.Cluster;
2525
import org.apache.fluss.cluster.ServerNode;
2626
import org.apache.fluss.config.Configuration;
27+
import org.apache.fluss.metadata.TableInfo;
2728
import org.apache.fluss.metadata.TablePath;
2829
import org.apache.fluss.rpc.RpcClient;
2930
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -51,6 +52,7 @@ void testRebuildClusterNTimes() throws Exception {
5152
Admin admin = conn.getAdmin();
5253
TablePath tablePath = TablePath.of("fluss", "test");
5354
admin.createTable(tablePath, DATA1_TABLE_DESCRIPTOR, true).get();
55+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
5456
admin.close();
5557
conn.close();
5658

@@ -59,6 +61,7 @@ void testRebuildClusterNTimes() throws Exception {
5961
// update metadata
6062
metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, null);
6163
Cluster cluster = metadataUpdater.getCluster();
64+
assertThat(cluster.getTable(tablePath).get()).isEqualTo(tableInfo);
6265

6366
// repeat 20K times to reproduce StackOverflowError if there is
6467
// any N levels UnmodifiableCollection

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ message UpdateMetadataRequest {
173173
repeated PbTableMetadata table_metadata = 3;
174174
repeated PbPartitionMetadata partition_metadata = 4;
175175
optional int32 coordinator_epoch = 5;
176+
repeated PbTableMetadataV2 table_metadata_v2 = 6;
176177
}
177178

178179
message UpdateMetadataResponse {
@@ -575,6 +576,19 @@ message PbTableMetadata {
575576
// trace by: https://github.com/apache/fluss/issues/981
576577
}
577578

579+
message PbTableMetadataV2 {
580+
required PbTablePath table_path = 1;
581+
required int64 table_id = 2;
582+
required int32 schema_id = 3;
583+
optional bytes table_json = 4;
584+
repeated PbBucketMetadata bucket_metadata = 5;
585+
required int64 created_time = 6;
586+
required int64 modified_time = 7;
587+
588+
// TODO add a new filed 'deleted_table' to indicate this table is deleted in UpdateMetadataRequest.
589+
// trace by: https://github.com/alibaba/fluss/issues/981
590+
}
591+
578592
message PbPartitionMetadata {
579593
required int64 table_id = 1;
580594
// the partition name and id for the partition

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,14 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
618618
(tableId, bucketMetadataList) -> {
619619
TableInfo tableInfo = getTableInfo(tableId);
620620
if (tableInfo != null) {
621-
tableMetadataList.add(new TableMetadata(tableInfo, bucketMetadataList));
621+
tableMetadataList.add(
622+
new TableMetadata(
623+
tableInfo.getTableId(),
624+
tableInfo.getSchemaId(),
625+
tableInfo.getCreatedTime(),
626+
tableInfo.getModifiedTime(),
627+
tableInfo.getTablePath(),
628+
bucketMetadataList));
622629
}
623630
});
624631

@@ -662,7 +669,13 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
662669
TableInfo tableInfo = getTableInfo(tableId);
663670
if (tableInfo != null) {
664671
tableMetadataList.add(
665-
new TableMetadata(getTableInfo(tableId), Collections.emptyList()));
672+
new TableMetadata(
673+
tableInfo.getTableId(),
674+
tableInfo.getSchemaId(),
675+
tableInfo.getCreatedTime(),
676+
tableInfo.getModifiedTime(),
677+
tableInfo.getTablePath(),
678+
Collections.emptyList()));
666679
}
667680
});
668681

fluss-server/src/main/java/org/apache/fluss/server/metadata/TableMetadata.java

Lines changed: 95 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.fluss.metadata.TableInfo;
2121
import org.apache.fluss.metadata.TablePath;
2222

23+
import javax.annotation.Nullable;
24+
2325
import java.util.List;
2426
import java.util.Objects;
2527

@@ -40,7 +42,18 @@ public class TableMetadata {
4042
*/
4143
public static final Long DELETED_TABLE_ID = -2L;
4244

43-
private final TableInfo tableInfo;
45+
private final long tableId;
46+
47+
private final int schemaId;
48+
49+
private final long createdTime;
50+
51+
private final long modifiedTime;
52+
53+
private final TablePath tablePath;
54+
55+
/** Will only be set for {@link org.apache.fluss.rpc.messages.MetadataResponse}. */
56+
private final @Nullable TableInfo tableInfo;
4457

4558
/**
4659
* For partition table, this list is always empty. The detail partition metadata is stored in
@@ -53,10 +66,64 @@ public class TableMetadata {
5366
private final List<BucketMetadata> bucketMetadataList;
5467

5568
public TableMetadata(TableInfo tableInfo, List<BucketMetadata> bucketMetadataList) {
56-
this.tableInfo = tableInfo;
69+
this(
70+
tableInfo.getTableId(),
71+
tableInfo.getSchemaId(),
72+
tableInfo.getCreatedTime(),
73+
tableInfo.getModifiedTime(),
74+
tableInfo.getTablePath(),
75+
bucketMetadataList,
76+
tableInfo);
77+
}
78+
79+
public TableMetadata(
80+
long tableId,
81+
int schemaId,
82+
long createdTime,
83+
long modifiedTime,
84+
TablePath tablePath,
85+
List<BucketMetadata> bucketMetadataList) {
86+
this(tableId, schemaId, createdTime, modifiedTime, tablePath, bucketMetadataList, null);
87+
}
88+
89+
public TableMetadata(
90+
long tableId,
91+
int schemaId,
92+
long createdTime,
93+
long modifiedTime,
94+
TablePath tablePath,
95+
List<BucketMetadata> bucketMetadataList,
96+
TableInfo tableInfo) {
97+
this.tableId = tableId;
98+
this.schemaId = schemaId;
99+
this.createdTime = createdTime;
100+
this.modifiedTime = modifiedTime;
101+
this.tablePath = tablePath;
57102
this.bucketMetadataList = bucketMetadataList;
103+
this.tableInfo = tableInfo;
58104
}
59105

106+
public long getTableId() {
107+
return tableId;
108+
}
109+
110+
public int getSchemaId() {
111+
return schemaId;
112+
}
113+
114+
public long getCreatedTime() {
115+
return createdTime;
116+
}
117+
118+
public long getModifiedTime() {
119+
return modifiedTime;
120+
}
121+
122+
public TablePath getTablePath() {
123+
return tablePath;
124+
}
125+
126+
@Nullable
60127
public TableInfo getTableInfo() {
61128
return tableInfo;
62129
}
@@ -68,7 +135,17 @@ public List<BucketMetadata> getBucketMetadataList() {
68135
@Override
69136
public String toString() {
70137
return "TableMetadata{"
71-
+ "tableInfo="
138+
+ "tableId="
139+
+ tableId
140+
+ ", schemaId="
141+
+ schemaId
142+
+ ", createdTime="
143+
+ createdTime
144+
+ ", modifiedTime="
145+
+ modifiedTime
146+
+ ", tablePath="
147+
+ tablePath
148+
+ ", tableInfo="
72149
+ tableInfo
73150
+ ", bucketMetadataList="
74151
+ bucketMetadataList
@@ -77,21 +154,28 @@ public String toString() {
77154

78155
@Override
79156
public boolean equals(Object o) {
80-
if (this == o) {
81-
return true;
82-
}
83157
if (o == null || getClass() != o.getClass()) {
84158
return false;
85159
}
86160
TableMetadata that = (TableMetadata) o;
87-
if (!tableInfo.equals(that.tableInfo)) {
88-
return false;
89-
}
90-
return bucketMetadataList.equals(that.bucketMetadataList);
161+
return tableId == that.tableId
162+
&& schemaId == that.schemaId
163+
&& createdTime == that.createdTime
164+
&& modifiedTime == that.modifiedTime
165+
&& Objects.equals(tablePath, that.tablePath)
166+
&& Objects.equals(tableInfo, that.tableInfo)
167+
&& Objects.equals(bucketMetadataList, that.bucketMetadataList);
91168
}
92169

93170
@Override
94171
public int hashCode() {
95-
return Objects.hash(tableInfo, bucketMetadataList);
172+
return Objects.hash(
173+
tableId,
174+
schemaId,
175+
createdTime,
176+
modifiedTime,
177+
tablePath,
178+
tableInfo,
179+
bucketMetadataList);
96180
}
97181
}

fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ public Optional<PhysicalTablePath> getPhysicalTablePath(long partitionId) {
115115
return serverMetadataSnapshot.getPhysicalTablePath(partitionId);
116116
}
117117

118+
public Map<Integer, BucketMetadata> getBucketMetadataForTable(long tableId) {
119+
return serverMetadataSnapshot.getBucketMetadataForTable(tableId);
120+
}
121+
118122
public TableMetadata getTableMetadata(TablePath tablePath) {
119123
// always get table info from zk.
120124
TableInfo tableInfo = metadataManager.getTable(tablePath);
@@ -187,9 +191,8 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
187191
new HashMap<>(serverMetadataSnapshot.getBucketMetadataMapForTables());
188192

189193
for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
190-
TableInfo tableInfo = tableMetadata.getTableInfo();
191-
TablePath tablePath = tableInfo.getTablePath();
192-
long tableId = tableInfo.getTableId();
194+
TablePath tablePath = tableMetadata.getTablePath();
195+
long tableId = tableMetadata.getTableId();
193196
if (tableId == DELETED_TABLE_ID) {
194197
Long removedTableId = tableIdByPath.remove(tablePath);
195198
if (removedTableId != null) {

0 commit comments

Comments
 (0)