Skip to content

Commit 72b10bb

Browse files
authored
[common] Remove TableInfo from Cluster (#2068)
1 parent b85ba5a commit 72b10bb

File tree

11 files changed

+18
-117
lines changed

11 files changed

+18
-117
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public TabletServerGateway newRandomTabletServerClient() {
166166
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
167167
Set<TablePath> needUpdateTablePaths =
168168
tablePaths.stream()
169-
.filter(tablePath -> !cluster.getTable(tablePath).isPresent())
169+
.filter(tablePath -> !cluster.getTableId(tablePath).isPresent())
170170
.collect(Collectors.toSet());
171171
if (!needUpdateTablePaths.isEmpty()) {
172172
updateMetadata(needUpdateTablePaths, null, null);
@@ -188,8 +188,8 @@ public boolean checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePa
188188
}
189189

190190
/**
191-
* Check the table/partition info for the given table bucket exist in metadata cache, if not,
192-
* try to update the metadata cache.
191+
* Check the table/partition bucket info for the given table bucket exist in metadata cache, if
192+
* not, try to update the metadata cache.
193193
*/
194194
public void checkAndUpdateMetadata(TablePath tablePath, TableBucket tableBucket) {
195195
if (tableBucket.getPartitionId() == null) {

fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import org.apache.fluss.exception.StaleMetadataException;
2525
import org.apache.fluss.metadata.PhysicalTablePath;
2626
import org.apache.fluss.metadata.TableBucket;
27-
import org.apache.fluss.metadata.TableDescriptor;
28-
import org.apache.fluss.metadata.TableInfo;
2927
import org.apache.fluss.metadata.TablePath;
3028
import org.apache.fluss.rpc.GatewayClientProxy;
3129
import org.apache.fluss.rpc.RpcClient;
@@ -121,7 +119,6 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
121119
ServerNode coordinatorServer = getCoordinatorServer(response);
122120

123121
Map<TablePath, Long> newTablePathToTableId;
124-
Map<TablePath, TableInfo> newTablePathToTableInfo;
125122
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations;
126123
Map<PhysicalTablePath, Long> newPartitionIdByPath;
127124

@@ -133,24 +130,19 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
133130
// the origin cluster.
134131
newTablePathToTableId =
135132
new HashMap<>(originCluster.getTableIdByPath());
136-
newTablePathToTableInfo =
137-
new HashMap<>(originCluster.getTableInfoByPath());
138133
newBucketLocations =
139134
new HashMap<>(originCluster.getBucketLocationsByPath());
140135
newPartitionIdByPath =
141136
new HashMap<>(originCluster.getPartitionIdByPath());
142137

143138
newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId);
144-
newTablePathToTableInfo.putAll(
145-
newTableMetadata.tablePathToTableInfo);
146139
newBucketLocations.putAll(newTableMetadata.bucketLocations);
147140
newPartitionIdByPath.putAll(newTableMetadata.partitionIdByPath);
148141

149142
} else {
150143
// If full update, we will clear all tables info out ot the origin
151144
// cluster.
152145
newTablePathToTableId = newTableMetadata.tablePathToTableId;
153-
newTablePathToTableInfo = newTableMetadata.tablePathToTableInfo;
154146
newBucketLocations = newTableMetadata.bucketLocations;
155147
newPartitionIdByPath = newTableMetadata.partitionIdByPath;
156148
}
@@ -160,8 +152,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
160152
coordinatorServer,
161153
newBucketLocations,
162154
newTablePathToTableId,
163-
newPartitionIdByPath,
164-
newTablePathToTableInfo);
155+
newPartitionIdByPath);
165156
})
166157
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
167158
// RpcClient, it will let the get() block forever. So we
@@ -171,7 +162,6 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
171162
private static NewTableMetadata getTableMetadataToUpdate(
172163
Cluster cluster, MetadataResponse metadataResponse) {
173164
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
174-
Map<TablePath, TableInfo> newTablePathToTableInfo = new HashMap<>();
175165
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations = new HashMap<>();
176166
Map<PhysicalTablePath, Long> newPartitionIdByPath = new HashMap<>();
177167

@@ -187,17 +177,6 @@ private static NewTableMetadata getTableMetadataToUpdate(
187177
protoTablePath.getDatabaseName(),
188178
protoTablePath.getTableName());
189179
newTablePathToTableId.put(tablePath, tableId);
190-
TableDescriptor tableDescriptor =
191-
TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson());
192-
newTablePathToTableInfo.put(
193-
tablePath,
194-
TableInfo.of(
195-
tablePath,
196-
pbTableMetadata.getTableId(),
197-
pbTableMetadata.getSchemaId(),
198-
tableDescriptor,
199-
pbTableMetadata.getCreatedTime(),
200-
pbTableMetadata.getModifiedTime()));
201180

202181
// Get all buckets for the table.
203182
List<PbBucketMetadata> pbBucketMetadataList =
@@ -232,25 +211,19 @@ private static NewTableMetadata getTableMetadataToUpdate(
232211
});
233212

234213
return new NewTableMetadata(
235-
newTablePathToTableId,
236-
newTablePathToTableInfo,
237-
newBucketLocations,
238-
newPartitionIdByPath);
214+
newTablePathToTableId, newBucketLocations, newPartitionIdByPath);
239215
}
240216

241217
private static final class NewTableMetadata {
242218
private final Map<TablePath, Long> tablePathToTableId;
243-
private final Map<TablePath, TableInfo> tablePathToTableInfo;
244219
private final Map<PhysicalTablePath, List<BucketLocation>> bucketLocations;
245220
private final Map<PhysicalTablePath, Long> partitionIdByPath;
246221

247222
public NewTableMetadata(
248223
Map<TablePath, Long> tablePathToTableId,
249-
Map<TablePath, TableInfo> tablePathToTableInfo,
250224
Map<PhysicalTablePath, List<BucketLocation>> bucketLocations,
251225
Map<PhysicalTablePath, Long> partitionIdByPath) {
252226
this.tablePathToTableId = tablePathToTableId;
253-
this.tablePathToTableInfo = tablePathToTableInfo;
254227
this.bucketLocations = bucketLocations;
255228
this.partitionIdByPath = partitionIdByPath;
256229
}

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ void testUpdateWithEmptyMetadataResponse() throws Exception {
108108
null,
109109
Collections.emptyMap(),
110110
Collections.emptyMap(),
111-
Collections.emptyMap(),
112111
Collections.emptyMap());
113112

114113
metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(), newCluster);

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void setResponseLogicId(int serverId, int responseLogicId) {
145145
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
146146
Set<TablePath> needUpdateTablePaths =
147147
tablePaths.stream()
148-
.filter(tablePath -> !cluster.getTable(tablePath).isPresent())
148+
.filter(tablePath -> !cluster.getTableId(tablePath).isPresent())
149149
.collect(Collectors.toSet());
150150
if (!needUpdateTablePaths.isEmpty()) {
151151
throw new IllegalStateException(
@@ -189,7 +189,6 @@ private void initializeCluster(
189189

190190
Map<PhysicalTablePath, List<BucketLocation>> tablePathToBucketLocations = new HashMap<>();
191191
Map<TablePath, Long> tableIdByPath = new HashMap<>();
192-
Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
193192
tableInfos.forEach(
194193
(tablePath, tableInfo) -> {
195194
long tableId = tableInfo.getTableId();
@@ -216,15 +215,13 @@ private void initializeCluster(
216215
tabletServers.get(2).id(),
217216
replicas)));
218217
tableIdByPath.put(tablePath, tableId);
219-
tableInfoByPath.put(tablePath, tableInfo);
220218
});
221219
cluster =
222220
new Cluster(
223221
tabletServerMap,
224222
coordinatorServer,
225223
tablePathToBucketLocations,
226224
tableIdByPath,
227-
Collections.emptyMap(),
228-
tableInfoByPath);
225+
Collections.emptyMap());
229226
}
230227
}

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
260260
oldCluster.getCoordinatorServer(),
261261
oldCluster.getBucketLocationsByPath(),
262262
oldCluster.getTableIdByPath(),
263-
oldCluster.getPartitionIdByPath(),
264-
oldCluster.getTableInfoByPath());
263+
oldCluster.getPartitionIdByPath());
265264
metadataUpdater = new MetadataUpdater(rpcClient, clientConf, newCluster);
266265

267266
LogScannerStatus logScannerStatus = new LogScannerStatus();

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.fluss.config.ConfigOptions;
2626
import org.apache.fluss.config.Configuration;
2727
import org.apache.fluss.config.MemorySize;
28-
import org.apache.fluss.metadata.LogFormat;
2928
import org.apache.fluss.metadata.PhysicalTablePath;
3029
import org.apache.fluss.metadata.SchemaGetter;
3130
import org.apache.fluss.metadata.SchemaInfo;
@@ -563,32 +562,12 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
563562

564563
Map<TablePath, Long> tableIdByPath = new HashMap<>();
565564
tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
566-
567-
TableInfo data1NonPkTableInfo =
568-
TableInfo.of(
569-
DATA1_TABLE_PATH,
570-
DATA1_TABLE_ID,
571-
1,
572-
TableDescriptor.builder()
573-
// use INDEXED format better memory control
574-
// to test RecordAccumulator
575-
.logFormat(LogFormat.INDEXED)
576-
.schema(DATA1_SCHEMA)
577-
.distributedBy(3)
578-
.build(),
579-
System.currentTimeMillis(),
580-
System.currentTimeMillis());
581-
Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
582-
tableInfoByPath.put(DATA1_TABLE_PATH, data1NonPkTableInfo);
583-
tableInfoByPath.put(ZSTD_TABLE_INFO.getTablePath(), ZSTD_TABLE_INFO);
584-
585565
return new Cluster(
586566
aliveTabletServersById,
587567
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
588568
bucketsByPath,
589569
tableIdByPath,
590-
Collections.emptyMap(),
591-
tableInfoByPath);
570+
Collections.emptyMap());
592571
}
593572

594573
private void delayedInterrupt(final Thread thread, final long delayMs) {

fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -671,8 +671,7 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception {
671671
oldCluster.getCoordinatorServer(),
672672
oldCluster.getBucketLocationsByPath(),
673673
oldCluster.getTableIdByPath(),
674-
oldCluster.getPartitionIdByPath(),
675-
oldCluster.getTableInfoByPath());
674+
oldCluster.getPartitionIdByPath());
676675

677676
metadataUpdater.updateCluster(newCluster);
678677

fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
2424
import org.apache.fluss.metadata.PhysicalTablePath;
25-
import org.apache.fluss.metadata.TableDescriptor;
26-
import org.apache.fluss.metadata.TableInfo;
2725
import org.apache.fluss.metadata.TablePath;
2826

2927
import org.junit.jupiter.api.Test;
@@ -39,7 +37,6 @@
3937
import java.util.concurrent.ConcurrentLinkedQueue;
4038

4139
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
42-
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
4340
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
4441
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
4542
import static org.assertj.core.api.Assertions.assertThat;
@@ -202,7 +199,6 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
202199

203200
Map<PhysicalTablePath, List<BucketLocation>> bucketsByPath = new HashMap<>();
204201
Map<TablePath, Long> tableIdByPath = new HashMap<>();
205-
Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
206202
bucketLocations.forEach(
207203
bucketLocation -> {
208204
PhysicalTablePath physicalTablePath = bucketLocation.getPhysicalTablePath();
@@ -212,26 +208,13 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
212208
tableIdByPath.put(
213209
bucketLocation.getPhysicalTablePath().getTablePath(),
214210
bucketLocation.getTableBucket().getTableId());
215-
tableInfoByPath.put(
216-
physicalTablePath.getTablePath(),
217-
TableInfo.of(
218-
physicalTablePath.getTablePath(),
219-
bucketLocation.getTableBucket().getTableId(),
220-
1,
221-
TableDescriptor.builder()
222-
.schema(DATA1_SCHEMA)
223-
.distributedBy(3)
224-
.build(),
225-
System.currentTimeMillis(),
226-
System.currentTimeMillis()));
227211
});
228212

229213
return new Cluster(
230214
aliveTabletServersById,
231215
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
232216
bucketsByPath,
233217
tableIdByPath,
234-
Collections.emptyMap(),
235-
tableInfoByPath);
218+
Collections.emptyMap());
236219
}
237220
}

fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.exception.PartitionNotExistException;
2222
import org.apache.fluss.metadata.PhysicalTablePath;
2323
import org.apache.fluss.metadata.TableBucket;
24-
import org.apache.fluss.metadata.TableInfo;
2524
import org.apache.fluss.metadata.TablePath;
2625

2726
import javax.annotation.Nullable;
@@ -54,22 +53,17 @@ public final class Cluster {
5453
private final Map<PhysicalTablePath, Long> partitionsIdByPath;
5554
private final Map<Long, String> partitionNameById;
5655

57-
/** Only latest schema of table will be put in it. */
58-
private final Map<TablePath, TableInfo> tableInfoByPath;
59-
6056
public Cluster(
6157
Map<Integer, ServerNode> aliveTabletServersById,
6258
@Nullable ServerNode coordinatorServer,
6359
Map<PhysicalTablePath, List<BucketLocation>> bucketLocationsByPath,
6460
Map<TablePath, Long> tableIdByPath,
65-
Map<PhysicalTablePath, Long> partitionsIdByPath,
66-
Map<TablePath, TableInfo> tableInfoByPath) {
61+
Map<PhysicalTablePath, Long> partitionsIdByPath) {
6762
this.coordinatorServer = coordinatorServer;
6863
this.aliveTabletServersById = Collections.unmodifiableMap(aliveTabletServersById);
6964
this.aliveTabletServers =
7065
Collections.unmodifiableList(new ArrayList<>(aliveTabletServersById.values()));
7166
this.tableIdByPath = Collections.unmodifiableMap(tableIdByPath);
72-
this.tableInfoByPath = Collections.unmodifiableMap(tableInfoByPath);
7367
this.partitionsIdByPath = Collections.unmodifiableMap(partitionsIdByPath);
7468

7569
// Index the bucket locations by table path, and index bucket location by bucket.
@@ -138,8 +132,7 @@ public Cluster invalidPhysicalTableBucketMeta(Set<PhysicalTablePath> physicalTab
138132
coordinatorServer,
139133
newBucketLocationsByPath,
140134
new HashMap<>(tableIdByPath),
141-
new HashMap<>(partitionsIdByPath),
142-
new HashMap<>(tableInfoByPath));
135+
new HashMap<>(partitionsIdByPath));
143136
}
144137

145138
@Nullable
@@ -208,15 +201,6 @@ public List<BucketLocation> getAvailableBucketsForPhysicalTablePath(
208201
return availableLocationsByPath.getOrDefault(physicalTablePath, Collections.emptyList());
209202
}
210203

211-
/**
212-
* Get the table info for this table.
213-
*
214-
* <p>TODO this method need to be remove, use Admin getTableInfo instead.
215-
*/
216-
public Optional<TableInfo> getTable(TablePath tablePath) {
217-
return Optional.ofNullable(tableInfoByPath.get(tablePath));
218-
}
219-
220204
public Optional<Long> getTableId(TablePath tablePath) {
221205
return Optional.ofNullable(tableIdByPath.get(tablePath));
222206
}
@@ -265,11 +249,6 @@ public Map<TablePath, Long> getTableIdByPath() {
265249
return tableIdByPath;
266250
}
267251

268-
/** Get the table info by table. */
269-
public Map<TablePath, TableInfo> getTableInfoByPath() {
270-
return tableInfoByPath;
271-
}
272-
273252
/** Get the bucket by a physical table path. */
274253
public Map<PhysicalTablePath, List<BucketLocation>> getBucketLocationsByPath() {
275254
return availableLocationsByPath;
@@ -286,7 +265,6 @@ public static Cluster empty() {
286265
null,
287266
Collections.emptyMap(),
288267
Collections.emptyMap(),
289-
Collections.emptyMap(),
290268
Collections.emptyMap());
291269
}
292270

0 commit comments

Comments
 (0)