Skip to content

Commit 1f2e546

Browse files
committed
revert unneeded changes
1 parent 8b8a33d commit 1f2e546

File tree

22 files changed

+49
-294
lines changed

22 files changed

+49
-294
lines changed

fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.fluss.exception.StaleMetadataException;
2525
import org.apache.fluss.metadata.PhysicalTablePath;
2626
import org.apache.fluss.metadata.TableBucket;
27-
import org.apache.fluss.metadata.TableInfo;
2827
import org.apache.fluss.metadata.TablePath;
2928
import org.apache.fluss.rpc.GatewayClientProxy;
3029
import org.apache.fluss.rpc.RpcClient;
@@ -126,7 +125,6 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
126125
NewTableMetadata newTableMetadata =
127126
getTableMetadataToUpdate(originCluster, response);
128127

129-
Map<TablePath, TableInfo> newTableInfoByPath;
130128
if (partialUpdate) {
131129
// If partial update, we will clear the to be updated table out ot
132130
// the origin cluster.
@@ -136,29 +134,25 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
136134
new HashMap<>(originCluster.getBucketLocationsByPath());
137135
newPartitionIdByPath =
138136
new HashMap<>(originCluster.getPartitionIdByPath());
139-
newTableInfoByPath =
140-
new HashMap<>(originCluster.getTableInfoByPath());
141137

142138
newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId);
143139
newBucketLocations.putAll(newTableMetadata.bucketLocations);
144140
newPartitionIdByPath.putAll(newTableMetadata.partitionIdByPath);
145-
newTableInfoByPath.putAll(newTableMetadata.tableInfoByPath);
141+
146142
} else {
147143
// If full update, we will clear all tables info out ot the origin
148144
// cluster.
149145
newTablePathToTableId = newTableMetadata.tablePathToTableId;
150146
newBucketLocations = newTableMetadata.bucketLocations;
151147
newPartitionIdByPath = newTableMetadata.partitionIdByPath;
152-
newTableInfoByPath = newTableMetadata.tableInfoByPath;
153148
}
154149

155150
return new Cluster(
156151
newAliveTabletServers,
157152
coordinatorServer,
158153
newBucketLocations,
159154
newTablePathToTableId,
160-
newPartitionIdByPath,
161-
newTableInfoByPath);
155+
newPartitionIdByPath);
162156
})
163157
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
164158
// RpcClient, it will let the get() block forever. So we
@@ -170,7 +164,6 @@ private static NewTableMetadata getTableMetadataToUpdate(
170164
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
171165
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations = new HashMap<>();
172166
Map<PhysicalTablePath, Long> newPartitionIdByPath = new HashMap<>();
173-
Map<TablePath, TableInfo> newTableInfoByPath = new HashMap<>();
174167

175168
// iterate all table metadata
176169
List<PbTableMetadata> pbTableMetadataList = metadataResponse.getTableMetadatasList();
@@ -218,27 +211,21 @@ private static NewTableMetadata getTableMetadataToUpdate(
218211
});
219212

220213
return new NewTableMetadata(
221-
newTablePathToTableId,
222-
newBucketLocations,
223-
newPartitionIdByPath,
224-
newTableInfoByPath);
214+
newTablePathToTableId, newBucketLocations, newPartitionIdByPath);
225215
}
226216

227217
private static final class NewTableMetadata {
228218
private final Map<TablePath, Long> tablePathToTableId;
229219
private final Map<PhysicalTablePath, List<BucketLocation>> bucketLocations;
230220
private final Map<PhysicalTablePath, Long> partitionIdByPath;
231-
private final Map<TablePath, TableInfo> tableInfoByPath;
232221

233222
public NewTableMetadata(
234223
Map<TablePath, Long> tablePathToTableId,
235224
Map<PhysicalTablePath, List<BucketLocation>> bucketLocations,
236-
Map<PhysicalTablePath, Long> partitionIdByPath,
237-
Map<TablePath, TableInfo> tableInfoByPath) {
225+
Map<PhysicalTablePath, Long> partitionIdByPath) {
238226
this.tablePathToTableId = tablePathToTableId;
239227
this.bucketLocations = bucketLocations;
240228
this.partitionIdByPath = partitionIdByPath;
241-
this.tableInfoByPath = tableInfoByPath;
242229
}
243230
}
244231

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.fluss.client.table.scanner.ScanRecord;
2727
import org.apache.fluss.client.table.scanner.log.LogScanner;
2828
import org.apache.fluss.client.table.scanner.log.ScanRecords;
29-
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
3029
import org.apache.fluss.client.table.writer.UpsertWriter;
3130
import org.apache.fluss.config.ConfigOptions;
3231
import org.apache.fluss.config.Configuration;
@@ -145,13 +144,6 @@ protected static void subscribeFromBeginning(LogScanner logScanner, Table table)
145144
}
146145
}
147146

148-
protected static void subscribeFromBeginning(TypedLogScanner<?> logScanner, Table table) {
149-
int bucketCount = table.getTableInfo().getNumBuckets();
150-
for (int i = 0; i < bucketCount; i++) {
151-
logScanner.subscribeFromBeginning(i);
152-
}
153-
}
154-
155147
protected static void subscribeFromTimestamp(
156148
TablePath tablePath,
157149
@Nullable String partitionName,

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ void testUpdateWithEmptyMetadataResponse() throws Exception {
108108
null,
109109
Collections.emptyMap(),
110110
Collections.emptyMap(),
111-
Collections.emptyMap(),
112111
Collections.emptyMap());
113112

114113
metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(), newCluster);

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ private void initializeCluster(
218218
coordinatorServer,
219219
tablePathToBucketLocations,
220220
tableIdByPath,
221-
Collections.emptyMap(),
222-
tableInfos);
221+
Collections.emptyMap());
223222
}
224223
}

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,9 +1217,7 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception {
12171217
List<ScanRecord> actualLogRecords = new ArrayList<>(0);
12181218
while (actualLogRecords.size() < rows) {
12191219
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
1220-
for (ScanRecord rec : scanRecords) {
1221-
actualLogRecords.add(rec);
1222-
}
1220+
scanRecords.forEach(actualLogRecords::add);
12231221
}
12241222
logScanner.close();
12251223
assertThat(actualLogRecords).hasSize(rows);
@@ -1388,9 +1386,7 @@ void testMergeEngineWithVersion(boolean doProjection) throws Exception {
13881386
List<ScanRecord> actualLogRecords = new ArrayList<>(rows);
13891387
while (actualLogRecords.size() < rows) {
13901388
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
1391-
for (ScanRecord rec : scanRecords) {
1392-
actualLogRecords.add(rec);
1393-
}
1389+
scanRecords.forEach(actualLogRecords::add);
13941390
}
13951391
logScanner.close();
13961392

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
258258
oldCluster.getCoordinatorServer(),
259259
oldCluster.getBucketLocationsByPath(),
260260
oldCluster.getTableIdByPath(),
261-
oldCluster.getPartitionIdByPath(),
262-
oldCluster.getTableInfoByPath());
261+
oldCluster.getPartitionIdByPath());
263262
metadataUpdater = new MetadataUpdater(rpcClient, clientConf, newCluster);
264263

265264
LogScannerStatus logScannerStatus = new LogScannerStatus();

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,7 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
569569
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
570570
bucketsByPath,
571571
tableIdByPath,
572-
Collections.emptyMap(),
573-
tableInfoByPath);
572+
Collections.emptyMap());
574573
}
575574

576575
private void delayedInterrupt(final Thread thread, final long delayMs) {

fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -671,8 +671,7 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception {
671671
oldCluster.getCoordinatorServer(),
672672
oldCluster.getBucketLocationsByPath(),
673673
oldCluster.getTableIdByPath(),
674-
oldCluster.getPartitionIdByPath(),
675-
oldCluster.getTableInfoByPath());
674+
oldCluster.getPartitionIdByPath());
676675

677676
metadataUpdater.updateCluster(newCluster);
678677

fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
215215
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
216216
bucketsByPath,
217217
tableIdByPath,
218-
Collections.emptyMap(),
219218
Collections.emptyMap());
220219
}
221220
}

fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.exception.PartitionNotExistException;
2222
import org.apache.fluss.metadata.PhysicalTablePath;
2323
import org.apache.fluss.metadata.TableBucket;
24-
import org.apache.fluss.metadata.TableInfo;
2524
import org.apache.fluss.metadata.TablePath;
2625

2726
import javax.annotation.Nullable;
@@ -53,22 +52,19 @@ public final class Cluster {
5352
private final Map<Long, TablePath> pathByTableId;
5453
private final Map<PhysicalTablePath, Long> partitionsIdByPath;
5554
private final Map<Long, String> partitionNameById;
56-
private final Map<TablePath, TableInfo> tableInfoByPath;
5755

5856
public Cluster(
5957
Map<Integer, ServerNode> aliveTabletServersById,
6058
@Nullable ServerNode coordinatorServer,
6159
Map<PhysicalTablePath, List<BucketLocation>> bucketLocationsByPath,
6260
Map<TablePath, Long> tableIdByPath,
63-
Map<PhysicalTablePath, Long> partitionsIdByPath,
64-
Map<TablePath, TableInfo> tableInfoByPath) {
61+
Map<PhysicalTablePath, Long> partitionsIdByPath) {
6562
this.coordinatorServer = coordinatorServer;
6663
this.aliveTabletServersById = Collections.unmodifiableMap(aliveTabletServersById);
6764
this.aliveTabletServers =
6865
Collections.unmodifiableList(new ArrayList<>(aliveTabletServersById.values()));
6966
this.tableIdByPath = Collections.unmodifiableMap(tableIdByPath);
7067
this.partitionsIdByPath = Collections.unmodifiableMap(partitionsIdByPath);
71-
this.tableInfoByPath = Collections.unmodifiableMap(tableInfoByPath);
7268

7369
// Index the bucket locations by table path, and index bucket location by bucket.
7470
// Note that this code is performance sensitive if there are a large number of buckets,
@@ -136,8 +132,7 @@ public Cluster invalidPhysicalTableBucketMeta(Set<PhysicalTablePath> physicalTab
136132
coordinatorServer,
137133
newBucketLocationsByPath,
138134
new HashMap<>(tableIdByPath),
139-
new HashMap<>(partitionsIdByPath),
140-
new HashMap<>(tableInfoByPath));
135+
new HashMap<>(partitionsIdByPath));
141136
}
142137

143138
@Nullable
@@ -171,21 +166,6 @@ public TablePath getTablePathOrElseThrow(long tableId) {
171166
+ " in cluster"));
172167
}
173168

174-
/** Get the table info for this table path. */
175-
public Optional<TableInfo> getTableInfo(TablePath tablePath) {
176-
return Optional.ofNullable(tableInfoByPath.get(tablePath));
177-
}
178-
179-
public TableInfo getTableInfoOrElseThrow(TablePath tablePath) {
180-
return getTableInfo(tablePath)
181-
.orElseThrow(
182-
() ->
183-
new IllegalArgumentException(
184-
"table info not found for tablePath "
185-
+ tablePath
186-
+ " in cluster"));
187-
}
188-
189169
/** Get the bucket location for this table-bucket. */
190170
public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
191171
return Optional.ofNullable(availableLocationByBucket.get(tableBucket));
@@ -278,18 +258,13 @@ public Map<PhysicalTablePath, Long> getPartitionIdByPath() {
278258
return partitionsIdByPath;
279259
}
280260

281-
public Map<TablePath, TableInfo> getTableInfoByPath() {
282-
return tableInfoByPath;
283-
}
284-
285261
/** Create an empty cluster instance with no nodes and no table-buckets. */
286262
public static Cluster empty() {
287263
return new Cluster(
288264
Collections.emptyMap(),
289265
null,
290266
Collections.emptyMap(),
291267
Collections.emptyMap(),
292-
Collections.emptyMap(),
293268
Collections.emptyMap());
294269
}
295270

0 commit comments

Comments
 (0)