Skip to content

Commit 13115e4

Browse files
authored
[server] Support a consistent server cache across servers that contains table information (#927)
1 parent 2afb839 commit 13115e4

File tree

12 files changed

+832
-118
lines changed

12 files changed

+832
-118
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alibaba.fluss.server.coordinator;
1818

1919
import com.alibaba.fluss.annotation.VisibleForTesting;
20+
import com.alibaba.fluss.metadata.PhysicalTablePath;
2021
import com.alibaba.fluss.metadata.TableBucket;
2122
import com.alibaba.fluss.metadata.TableBucketReplica;
2223
import com.alibaba.fluss.metadata.TableInfo;
@@ -79,10 +80,11 @@ public class CoordinatorContext {
7980
new HashMap<>();
8081
// a map from partition_id -> partition_name
8182
private final Map<Long, String> partitionNameById = new HashMap<>();
83+
private final Map<PhysicalTablePath, Long> partitionIdByPath = new HashMap<>();
8284

8385
// a map from table_id to the table path
8486
private final Map<Long, TablePath> tablePathById = new HashMap<>();
85-
// TODO: will be used in the future metadata cache
87+
private final Map<TablePath, Long> tableIdByPath = new HashMap<>();
8688
private final Map<Long, TableInfo> tableInfoById = new HashMap<>();
8789

8890
private final Map<TableBucket, LeaderAndIsr> bucketLeaderAndIsr = new HashMap<>();
@@ -219,14 +221,16 @@ public Set<TableBucketReplica> replicasOnTabletServer(int server) {
219221

220222
public void putTablePath(long tableId, TablePath tablePath) {
221223
this.tablePathById.put(tableId, tablePath);
224+
this.tableIdByPath.put(tablePath, tableId);
222225
}
223226

224227
public void putTableInfo(TableInfo tableInfo) {
225228
this.tableInfoById.put(tableInfo.getTableId(), tableInfo);
226229
}
227230

228-
public void putPartition(long partitionId, String partitionName) {
229-
this.partitionNameById.put(partitionId, partitionName);
231+
public void putPartition(long partitionId, PhysicalTablePath physicalTablePath) {
232+
this.partitionNameById.put(partitionId, physicalTablePath.getPartitionName());
233+
this.partitionIdByPath.put(physicalTablePath, partitionId);
230234
}
231235

232236
public TableInfo getTableInfoById(long tableId) {
@@ -237,6 +241,10 @@ public TablePath getTablePathById(long tableId) {
237241
return this.tablePathById.get(tableId);
238242
}
239243

244+
public Long getTableIdByPath(TablePath tablePath) {
245+
return tableIdByPath.getOrDefault(tablePath, TableInfo.UNKNOWN_TABLE_ID);
246+
}
247+
240248
public boolean containsTableId(long tableId) {
241249
return this.tablePathById.containsKey(tableId);
242250
}
@@ -249,6 +257,18 @@ public boolean containsPartitionId(long partitionId) {
249257
return this.partitionNameById.get(partitionId);
250258
}
251259

260+
public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
261+
return Optional.ofNullable(partitionIdByPath.get(physicalTablePath));
262+
}
263+
264+
public Map<Integer, List<Integer>> getTableAssignment(long tableId) {
265+
return tableAssignments.getOrDefault(tableId, Collections.emptyMap());
266+
}
267+
268+
public Map<Integer, List<Integer>> getPartitionAssignment(TablePartition tablePartition) {
269+
return partitionAssignments.getOrDefault(tablePartition, Collections.emptyMap());
270+
}
271+
252272
public void updateBucketReplicaAssignment(
253273
TableBucket tableBucket, List<Integer> replicaAssignment) {
254274
Map<Integer, List<Integer>> assignments;
@@ -414,11 +434,6 @@ public void clearFailDeleteNumbers(Collection<TableBucketReplica> replicas) {
414434
}
415435
}
416436

417-
@VisibleForTesting
418-
protected Map<Integer, List<Integer>> getTableAssignment(long tableId) {
419-
return tableAssignments.getOrDefault(tableId, Collections.emptyMap());
420-
}
421-
422437
@VisibleForTesting
423438
protected int replicaCounts(long tableId) {
424439
return getTableAssignment(tableId).values().stream().mapToInt(List::size).sum();
@@ -429,11 +444,6 @@ protected int replicaCounts(TablePartition tablePartition) {
429444
return getPartitionAssignment(tablePartition).values().stream().mapToInt(List::size).sum();
430445
}
431446

432-
@VisibleForTesting
433-
protected Map<Integer, List<Integer>> getPartitionAssignment(TablePartition tablePartition) {
434-
return partitionAssignments.getOrDefault(tablePartition, Collections.emptyMap());
435-
}
436-
437447
public boolean isAnyReplicaInState(long tableId, ReplicaState replicaState) {
438448
return getAllReplicasForTable(tableId).stream()
439449
.anyMatch(replica -> getReplicaState(replica) == replicaState);
@@ -565,7 +575,11 @@ public void removeTable(long tableId) {
565575
.keySet()
566576
.forEach(bucket -> bucketLeaderAndIsr.remove(new TableBucket(tableId, bucket)));
567577
}
568-
tablePathById.remove(tableId);
578+
579+
TablePath tablePath = tablePathById.remove(tableId);
580+
if (tablePath != null) {
581+
tableIdByPath.remove(tablePath);
582+
}
569583
tableInfoById.remove(tableId);
570584
}
571585

@@ -584,7 +598,12 @@ public void removePartition(TablePartition tablePartition) {
584598
tablePartition.getPartitionId(),
585599
bucket)));
586600
}
587-
partitionNameById.remove(tablePartition.getPartitionId());
601+
602+
String partitionName = partitionNameById.remove(tablePartition.getPartitionId());
603+
if (partitionName != null) {
604+
TablePath tablePath = getTablePathById(tablePartition.getTableId());
605+
partitionIdByPath.remove(PhysicalTablePath.of(tablePath, partitionName));
606+
}
588607
}
589608

590609
private void clearTablesState() {
@@ -595,8 +614,10 @@ private void clearTablesState() {
595614
bucketStates.clear();
596615
replicaStates.clear();
597616
tablePathById.clear();
617+
tableIdByPath.clear();
598618
tableInfoById.clear();
599619
partitionNameById.clear();
620+
partitionIdByPath.clear();
600621
}
601622

602623
public void resetContext() {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.exception.InvalidCoordinatorException;
2828
import com.alibaba.fluss.exception.InvalidUpdateVersionException;
2929
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
30+
import com.alibaba.fluss.metadata.PhysicalTablePath;
3031
import com.alibaba.fluss.metadata.TableBucket;
3132
import com.alibaba.fluss.metadata.TableBucketReplica;
3233
import com.alibaba.fluss.metadata.TableInfo;
@@ -336,7 +337,9 @@ private void initCoordinatorContext() throws Exception {
336337
zooKeeperClient.getPartitionNameAndIds(tablePath);
337338
for (Map.Entry<String, Long> partition : partitions.entrySet()) {
338339
// put partition info to coordinator context
339-
coordinatorContext.putPartition(partition.getValue(), partition.getKey());
340+
coordinatorContext.putPartition(
341+
partition.getValue(),
342+
PhysicalTablePath.of(tableInfo.getTablePath(), partition.getKey()));
340343
}
341344
// if the table is auto partition, put the partitions info
342345
if (tableInfo

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/TableManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public void onCreateNewPartition(
134134
// put the bucket of the partition to context
135135
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
136136
coordinatorContext.updateBucketReplicaAssignment(tableBucket, replicas);
137-
coordinatorContext.putPartition(partitionId, partitionName);
137+
coordinatorContext.putPartition(
138+
partitionId, PhysicalTablePath.of(tablePath, partitionName));
138139
newTableBuckets.add(tableBucket);
139140
}
140141
onCreateNewTableBucket(tableId, newTableBuckets);

fluss-server/src/main/java/com/alibaba/fluss/server/metadata/BucketMetadata.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import javax.annotation.Nullable;
2020

2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.OptionalInt;
2324

2425
/** This entity used to describe the bucket metadata. */
@@ -54,4 +55,38 @@ public OptionalInt getLeaderEpoch() {
5455
public List<Integer> getReplicas() {
5556
return replicas;
5657
}
58+
59+
@Override
60+
public String toString() {
61+
return "BucketMetadata{"
62+
+ "bucketId="
63+
+ bucketId
64+
+ ", leaderId="
65+
+ leaderId
66+
+ ", leaderEpoch="
67+
+ leaderEpoch
68+
+ ", replicas="
69+
+ replicas
70+
+ '}';
71+
}
72+
73+
@Override
74+
public boolean equals(Object o) {
75+
if (this == o) {
76+
return true;
77+
}
78+
if (o == null || getClass() != o.getClass()) {
79+
return false;
80+
}
81+
BucketMetadata that = (BucketMetadata) o;
82+
return bucketId == that.bucketId
83+
&& Objects.equals(leaderId, that.leaderId)
84+
&& Objects.equals(leaderEpoch, that.leaderEpoch)
85+
&& replicas.equals(that.replicas);
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(bucketId, leaderId, leaderEpoch, replicas);
91+
}
5792
}

fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataSnapshot.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.cluster.ServerNode;
2121
import com.alibaba.fluss.cluster.TabletServerInfo;
2222
import com.alibaba.fluss.metadata.PhysicalTablePath;
23+
import com.alibaba.fluss.metadata.TableInfo;
2324
import com.alibaba.fluss.metadata.TablePath;
2425

2526
import javax.annotation.Nullable;
@@ -50,15 +51,24 @@ public class ServerMetadataSnapshot {
5051
private final Map<PhysicalTablePath, Long> partitionIdByPath;
5152
private final Map<Long, String> partitionNameById;
5253

53-
// TODO add detail metadata for table and partition, trace
54-
// by: https://github.com/alibaba/fluss/issues/900
54+
private final Map<Long, TableInfo> tableInfoByTableId;
55+
56+
// a map of bucket metadata of none-partition table, table_id -> <bucket, bucketMetadata>
57+
private final Map<Long, Map<Integer, BucketMetadata>> bucketMetadataMapForTables;
58+
59+
// a map of bucket metadata of partition table, partition_id -> <bucket,
60+
// bucketMetadata>
61+
private final Map<Long, Map<Integer, BucketMetadata>> bucketMetadataMapForPartitions;
5562

5663
public ServerMetadataSnapshot(
5764
@Nullable ServerInfo coordinatorServer,
5865
Map<Integer, ServerInfo> aliveTabletServers,
5966
Map<TablePath, Long> tableIdByPath,
6067
Map<Long, TablePath> pathByTableId,
61-
Map<PhysicalTablePath, Long> partitionIdByPath) {
68+
Map<PhysicalTablePath, Long> partitionIdByPath,
69+
Map<Long, TableInfo> tableInfoByTableId,
70+
Map<Long, Map<Integer, BucketMetadata>> bucketMetadataMapForTables,
71+
Map<Long, Map<Integer, BucketMetadata>> bucketMetadataMapForPartitions) {
6272
this.coordinatorServer = coordinatorServer;
6373
this.aliveTabletServers = Collections.unmodifiableMap(aliveTabletServers);
6474

@@ -72,6 +82,11 @@ public ServerMetadataSnapshot(
7282
tempPartitionNameById.put(
7383
partitionId, physicalTablePath.getPartitionName())));
7484
this.partitionNameById = Collections.unmodifiableMap(tempPartitionNameById);
85+
86+
this.tableInfoByTableId = Collections.unmodifiableMap(tableInfoByTableId);
87+
this.bucketMetadataMapForTables = Collections.unmodifiableMap(bucketMetadataMapForTables);
88+
this.bucketMetadataMapForPartitions =
89+
Collections.unmodifiableMap(bucketMetadataMapForPartitions);
7590
}
7691

7792
/** Create an empty cluster instance with no nodes and no table-buckets. */
@@ -81,6 +96,9 @@ public static ServerMetadataSnapshot empty() {
8196
Collections.emptyMap(),
8297
Collections.emptyMap(),
8398
Collections.emptyMap(),
99+
Collections.emptyMap(),
100+
Collections.emptyMap(),
101+
Collections.emptyMap(),
84102
Collections.emptyMap());
85103
}
86104

@@ -129,7 +147,6 @@ public Map<TablePath, Long> getTableIdByPath() {
129147
return tableIdByPath;
130148
}
131149

132-
/** Get the partition id for this partition. */
133150
public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
134151
return Optional.ofNullable(partitionIdByPath.get(physicalTablePath));
135152
}
@@ -138,7 +155,31 @@ public Optional<String> getPartitionName(long partitionId) {
138155
return Optional.ofNullable(partitionNameById.get(partitionId));
139156
}
140157

158+
public Optional<TableInfo> getTableInfo(long tableId) {
159+
return Optional.ofNullable(tableInfoByTableId.get(tableId));
160+
}
161+
162+
public Map<Integer, BucketMetadata> getBucketMetadataForTable(long tableId) {
163+
return bucketMetadataMapForTables.getOrDefault(tableId, Collections.emptyMap());
164+
}
165+
166+
public Map<Integer, BucketMetadata> getBucketMetadataForPartition(long partitionId) {
167+
return bucketMetadataMapForPartitions.getOrDefault(partitionId, Collections.emptyMap());
168+
}
169+
141170
public Map<PhysicalTablePath, Long> getPartitionIdByPath() {
142171
return partitionIdByPath;
143172
}
173+
174+
public Map<Long, TableInfo> getTableInfoByTableId() {
175+
return tableInfoByTableId;
176+
}
177+
178+
public Map<Long, Map<Integer, BucketMetadata>> getBucketMetadataMapForTables() {
179+
return bucketMetadataMapForTables;
180+
}
181+
182+
public Map<Long, Map<Integer, BucketMetadata>> getBucketMetadataMapForPartitions() {
183+
return bucketMetadataMapForPartitions;
184+
}
144185
}

fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.metadata.TablePath;
2121

2222
import java.util.List;
23+
import java.util.Objects;
2324

2425
/** This entity used to describe the table metadata. */
2526
public class TableMetadata {
@@ -62,4 +63,34 @@ public TableInfo getTableInfo() {
6263
public List<BucketMetadata> getBucketMetadataList() {
6364
return bucketMetadataList;
6465
}
66+
67+
@Override
68+
public String toString() {
69+
return "TableMetadata{"
70+
+ "tableInfo="
71+
+ tableInfo
72+
+ ", bucketMetadataList="
73+
+ bucketMetadataList
74+
+ '}';
75+
}
76+
77+
@Override
78+
public boolean equals(Object o) {
79+
if (this == o) {
80+
return true;
81+
}
82+
if (o == null || getClass() != o.getClass()) {
83+
return false;
84+
}
85+
TableMetadata that = (TableMetadata) o;
86+
if (!tableInfo.equals(that.tableInfo)) {
87+
return false;
88+
}
89+
return bucketMetadataList.equals(that.bucketMetadataList);
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return Objects.hash(tableInfo, bucketMetadataList);
95+
}
6596
}

0 commit comments

Comments
 (0)