Skip to content

Commit a0f1591

Browse files
authored
Merge branch 'main' into lakesense
2 parents c14537a + 72b10bb commit a0f1591

File tree

48 files changed

+1603
-421
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1603
-421
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public TabletServerGateway newRandomTabletServerClient() {
166166
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
167167
Set<TablePath> needUpdateTablePaths =
168168
tablePaths.stream()
169-
.filter(tablePath -> !cluster.getTable(tablePath).isPresent())
169+
.filter(tablePath -> !cluster.getTableId(tablePath).isPresent())
170170
.collect(Collectors.toSet());
171171
if (!needUpdateTablePaths.isEmpty()) {
172172
updateMetadata(needUpdateTablePaths, null, null);
@@ -188,8 +188,8 @@ public boolean checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePa
188188
}
189189

190190
/**
191-
* Check the table/partition info for the given table bucket exist in metadata cache, if not,
192-
* try to update the metadata cache.
191+
* Check the table/partition bucket info for the given table bucket exist in metadata cache, if
192+
* not, try to update the metadata cache.
193193
*/
194194
public void checkAndUpdateMetadata(TablePath tablePath, TableBucket tableBucket) {
195195
if (tableBucket.getPartitionId() == null) {

fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import org.apache.fluss.exception.StaleMetadataException;
2525
import org.apache.fluss.metadata.PhysicalTablePath;
2626
import org.apache.fluss.metadata.TableBucket;
27-
import org.apache.fluss.metadata.TableDescriptor;
28-
import org.apache.fluss.metadata.TableInfo;
2927
import org.apache.fluss.metadata.TablePath;
3028
import org.apache.fluss.rpc.GatewayClientProxy;
3129
import org.apache.fluss.rpc.RpcClient;
@@ -121,7 +119,6 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
121119
ServerNode coordinatorServer = getCoordinatorServer(response);
122120

123121
Map<TablePath, Long> newTablePathToTableId;
124-
Map<TablePath, TableInfo> newTablePathToTableInfo;
125122
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations;
126123
Map<PhysicalTablePath, Long> newPartitionIdByPath;
127124

@@ -133,24 +130,19 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
133130
// the origin cluster.
134131
newTablePathToTableId =
135132
new HashMap<>(originCluster.getTableIdByPath());
136-
newTablePathToTableInfo =
137-
new HashMap<>(originCluster.getTableInfoByPath());
138133
newBucketLocations =
139134
new HashMap<>(originCluster.getBucketLocationsByPath());
140135
newPartitionIdByPath =
141136
new HashMap<>(originCluster.getPartitionIdByPath());
142137

143138
newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId);
144-
newTablePathToTableInfo.putAll(
145-
newTableMetadata.tablePathToTableInfo);
146139
newBucketLocations.putAll(newTableMetadata.bucketLocations);
147140
newPartitionIdByPath.putAll(newTableMetadata.partitionIdByPath);
148141

149142
} else {
150143
// If full update, we will clear all tables info out ot the origin
151144
// cluster.
152145
newTablePathToTableId = newTableMetadata.tablePathToTableId;
153-
newTablePathToTableInfo = newTableMetadata.tablePathToTableInfo;
154146
newBucketLocations = newTableMetadata.bucketLocations;
155147
newPartitionIdByPath = newTableMetadata.partitionIdByPath;
156148
}
@@ -160,8 +152,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
160152
coordinatorServer,
161153
newBucketLocations,
162154
newTablePathToTableId,
163-
newPartitionIdByPath,
164-
newTablePathToTableInfo);
155+
newPartitionIdByPath);
165156
})
166157
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
167158
// RpcClient, it will let the get() block forever. So we
@@ -171,7 +162,6 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
171162
private static NewTableMetadata getTableMetadataToUpdate(
172163
Cluster cluster, MetadataResponse metadataResponse) {
173164
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
174-
Map<TablePath, TableInfo> newTablePathToTableInfo = new HashMap<>();
175165
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations = new HashMap<>();
176166
Map<PhysicalTablePath, Long> newPartitionIdByPath = new HashMap<>();
177167

@@ -187,17 +177,6 @@ private static NewTableMetadata getTableMetadataToUpdate(
187177
protoTablePath.getDatabaseName(),
188178
protoTablePath.getTableName());
189179
newTablePathToTableId.put(tablePath, tableId);
190-
TableDescriptor tableDescriptor =
191-
TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson());
192-
newTablePathToTableInfo.put(
193-
tablePath,
194-
TableInfo.of(
195-
tablePath,
196-
pbTableMetadata.getTableId(),
197-
pbTableMetadata.getSchemaId(),
198-
tableDescriptor,
199-
pbTableMetadata.getCreatedTime(),
200-
pbTableMetadata.getModifiedTime()));
201180

202181
// Get all buckets for the table.
203182
List<PbBucketMetadata> pbBucketMetadataList =
@@ -232,25 +211,19 @@ private static NewTableMetadata getTableMetadataToUpdate(
232211
});
233212

234213
return new NewTableMetadata(
235-
newTablePathToTableId,
236-
newTablePathToTableInfo,
237-
newBucketLocations,
238-
newPartitionIdByPath);
214+
newTablePathToTableId, newBucketLocations, newPartitionIdByPath);
239215
}
240216

241217
private static final class NewTableMetadata {
242218
private final Map<TablePath, Long> tablePathToTableId;
243-
private final Map<TablePath, TableInfo> tablePathToTableInfo;
244219
private final Map<PhysicalTablePath, List<BucketLocation>> bucketLocations;
245220
private final Map<PhysicalTablePath, Long> partitionIdByPath;
246221

247222
public NewTableMetadata(
248223
Map<TablePath, Long> tablePathToTableId,
249-
Map<TablePath, TableInfo> tablePathToTableInfo,
250224
Map<PhysicalTablePath, List<BucketLocation>> bucketLocations,
251225
Map<PhysicalTablePath, Long> partitionIdByPath) {
252226
this.tablePathToTableId = tablePathToTableId;
253-
this.tablePathToTableInfo = tablePathToTableInfo;
254227
this.bucketLocations = bucketLocations;
255228
this.partitionIdByPath = partitionIdByPath;
256229
}

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ void testCreateTableWithInvalidProperty() {
659659
.cause()
660660
.isInstanceOf(InvalidConfigException.class)
661661
.hasMessageContaining(
662-
"Currently, Primary Key Table only supports ARROW log format if kv format is COMPACTED.");
662+
"Currently, Primary Key Table supports ARROW or COMPACTED log format when kv format is COMPACTED.");
663663
}
664664

665665
@Test

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ void testUpdateWithEmptyMetadataResponse() throws Exception {
108108
null,
109109
Collections.emptyMap(),
110110
Collections.emptyMap(),
111-
Collections.emptyMap(),
112111
Collections.emptyMap());
113112

114113
metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(), newCluster);

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void setResponseLogicId(int serverId, int responseLogicId) {
145145
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
146146
Set<TablePath> needUpdateTablePaths =
147147
tablePaths.stream()
148-
.filter(tablePath -> !cluster.getTable(tablePath).isPresent())
148+
.filter(tablePath -> !cluster.getTableId(tablePath).isPresent())
149149
.collect(Collectors.toSet());
150150
if (!needUpdateTablePaths.isEmpty()) {
151151
throw new IllegalStateException(
@@ -189,7 +189,6 @@ private void initializeCluster(
189189

190190
Map<PhysicalTablePath, List<BucketLocation>> tablePathToBucketLocations = new HashMap<>();
191191
Map<TablePath, Long> tableIdByPath = new HashMap<>();
192-
Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
193192
tableInfos.forEach(
194193
(tablePath, tableInfo) -> {
195194
long tableId = tableInfo.getTableId();
@@ -216,15 +215,13 @@ private void initializeCluster(
216215
tabletServers.get(2).id(),
217216
replicas)));
218217
tableIdByPath.put(tablePath, tableId);
219-
tableInfoByPath.put(tablePath, tableInfo);
220218
});
221219
cluster =
222220
new Cluster(
223221
tabletServerMap,
224222
coordinatorServer,
225223
tablePathToBucketLocations,
226224
tableIdByPath,
227-
Collections.emptyMap(),
228-
tableInfoByPath);
225+
Collections.emptyMap());
229226
}
230227
}

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,11 @@ void testPutAndPoll(String kvFormat) throws Exception {
753753
verifyAppendOrPut(false, "ARROW", kvFormat);
754754
}
755755

756+
@Test
757+
void testPutAndPollCompacted() throws Exception {
758+
verifyAppendOrPut(false, "COMPACTED", "COMPACTED");
759+
}
760+
756761
void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
757762
throws Exception {
758763
Schema schema =
@@ -911,8 +916,9 @@ void testAppendAndProject(String format) throws Exception {
911916
}
912917
}
913918

914-
@Test
915-
void testPutAndProject() throws Exception {
919+
@ParameterizedTest
920+
@ValueSource(strings = {"ARROW", "COMPACTED"})
921+
void testPutAndProject(String changelogFormat) throws Exception {
916922
Schema schema =
917923
Schema.newBuilder()
918924
.column("a", DataTypes.INT())
@@ -921,7 +927,11 @@ void testPutAndProject() throws Exception {
921927
.column("d", DataTypes.BIGINT())
922928
.primaryKey("a")
923929
.build();
924-
TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
930+
TableDescriptor tableDescriptor =
931+
TableDescriptor.builder()
932+
.schema(schema)
933+
.property(ConfigOptions.TABLE_LOG_FORMAT.key(), changelogFormat)
934+
.build();
925935
TablePath tablePath = TablePath.of("test_db_1", "test_pk_table_1");
926936
createTable(tablePath, tableDescriptor, false);
927937

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
260260
oldCluster.getCoordinatorServer(),
261261
oldCluster.getBucketLocationsByPath(),
262262
oldCluster.getTableIdByPath(),
263-
oldCluster.getPartitionIdByPath(),
264-
oldCluster.getTableInfoByPath());
263+
oldCluster.getPartitionIdByPath());
265264
metadataUpdater = new MetadataUpdater(rpcClient, clientConf, newCluster);
266265

267266
LogScannerStatus logScannerStatus = new LogScannerStatus();

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.fluss.config.ConfigOptions;
2626
import org.apache.fluss.config.Configuration;
2727
import org.apache.fluss.config.MemorySize;
28-
import org.apache.fluss.metadata.LogFormat;
2928
import org.apache.fluss.metadata.PhysicalTablePath;
3029
import org.apache.fluss.metadata.SchemaGetter;
3130
import org.apache.fluss.metadata.SchemaInfo;
@@ -563,32 +562,12 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
563562

564563
Map<TablePath, Long> tableIdByPath = new HashMap<>();
565564
tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
566-
567-
TableInfo data1NonPkTableInfo =
568-
TableInfo.of(
569-
DATA1_TABLE_PATH,
570-
DATA1_TABLE_ID,
571-
1,
572-
TableDescriptor.builder()
573-
// use INDEXED format better memory control
574-
// to test RecordAccumulator
575-
.logFormat(LogFormat.INDEXED)
576-
.schema(DATA1_SCHEMA)
577-
.distributedBy(3)
578-
.build(),
579-
System.currentTimeMillis(),
580-
System.currentTimeMillis());
581-
Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
582-
tableInfoByPath.put(DATA1_TABLE_PATH, data1NonPkTableInfo);
583-
tableInfoByPath.put(ZSTD_TABLE_INFO.getTablePath(), ZSTD_TABLE_INFO);
584-
585565
return new Cluster(
586566
aliveTabletServersById,
587567
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
588568
bucketsByPath,
589569
tableIdByPath,
590-
Collections.emptyMap(),
591-
tableInfoByPath);
570+
Collections.emptyMap());
592571
}
593572

594573
private void delayedInterrupt(final Thread thread, final long delayMs) {

fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -671,8 +671,7 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception {
671671
oldCluster.getCoordinatorServer(),
672672
oldCluster.getBucketLocationsByPath(),
673673
oldCluster.getTableIdByPath(),
674-
oldCluster.getPartitionIdByPath(),
675-
oldCluster.getTableInfoByPath());
674+
oldCluster.getPartitionIdByPath());
676675

677676
metadataUpdater.updateCluster(newCluster);
678677

fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
2424
import org.apache.fluss.metadata.PhysicalTablePath;
25-
import org.apache.fluss.metadata.TableDescriptor;
26-
import org.apache.fluss.metadata.TableInfo;
2725
import org.apache.fluss.metadata.TablePath;
2826

2927
import org.junit.jupiter.api.Test;
@@ -39,7 +37,6 @@
3937
import java.util.concurrent.ConcurrentLinkedQueue;
4038

4139
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
42-
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
4340
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
4441
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
4542
import static org.assertj.core.api.Assertions.assertThat;
@@ -202,7 +199,6 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
202199

203200
Map<PhysicalTablePath, List<BucketLocation>> bucketsByPath = new HashMap<>();
204201
Map<TablePath, Long> tableIdByPath = new HashMap<>();
205-
Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
206202
bucketLocations.forEach(
207203
bucketLocation -> {
208204
PhysicalTablePath physicalTablePath = bucketLocation.getPhysicalTablePath();
@@ -212,26 +208,13 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
212208
tableIdByPath.put(
213209
bucketLocation.getPhysicalTablePath().getTablePath(),
214210
bucketLocation.getTableBucket().getTableId());
215-
tableInfoByPath.put(
216-
physicalTablePath.getTablePath(),
217-
TableInfo.of(
218-
physicalTablePath.getTablePath(),
219-
bucketLocation.getTableBucket().getTableId(),
220-
1,
221-
TableDescriptor.builder()
222-
.schema(DATA1_SCHEMA)
223-
.distributedBy(3)
224-
.build(),
225-
System.currentTimeMillis(),
226-
System.currentTimeMillis()));
227211
});
228212

229213
return new Cluster(
230214
aliveTabletServersById,
231215
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
232216
bucketsByPath,
233217
tableIdByPath,
234-
Collections.emptyMap(),
235-
tableInfoByPath);
218+
Collections.emptyMap());
236219
}
237220
}

0 commit comments

Comments
 (0)