Skip to content

Commit 07721fe

Browse files
committed
[server][client] Improve the design and performance of ADD COLUMN operation
1. Make lookup requests async. 2. Introduce ServerProjectionCache to share ProjectionInfo in TabletServer level. 3. Revert FlinkAsFlussRow changes, and introduce PaddingRow for the padding null columns for schema changes. 4. Make PutKv zero-copy to persist kv records. 5. Rename SchemaMetadataManager to ServerSchemaCache and use TableId instead of TablePath to track schemas which is safer. 6. Revert source related changes to still include the projectedFieldIndexes parameter.
1 parent ea2d61c commit 07721fe

File tree

136 files changed

+1820
-1224
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

136 files changed

+1820
-1224
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,8 @@
8888
import java.util.List;
8989
import java.util.Map;
9090
import java.util.concurrent.CompletableFuture;
91-
import java.util.stream.Collectors;
9291

93-
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.addPbAlterSchemas;
92+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9493
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9594
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9695
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
@@ -249,23 +248,8 @@ public CompletableFuture<Void> createTable(
249248
public CompletableFuture<Void> alterTable(
250249
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
251250
tablePath.validate();
252-
AlterTableRequest request = new AlterTableRequest();
253-
List<PbAlterConfig> pbFlussTableChanges =
254-
tableChanges.stream()
255-
.filter(tableChange -> !(tableChange instanceof TableChange.SchemaChange))
256-
.map(ClientRpcMessageUtils::toPbAlterConfigs)
257-
.collect(Collectors.toList());
258-
259-
List<TableChange> schemaChanges =
260-
tableChanges.stream()
261-
.filter(tableChange -> tableChange instanceof TableChange.SchemaChange)
262-
.collect(Collectors.toList());
263-
addPbAlterSchemas(request, schemaChanges);
264-
request.addAllConfigChanges(pbFlussTableChanges)
265-
.setIgnoreIfNotExists(ignoreIfNotExists)
266-
.setTablePath()
267-
.setDatabaseName(tablePath.getDatabaseName())
268-
.setTableName(tablePath.getTableName());
251+
AlterTableRequest request =
252+
makeAlterTableRequest(tablePath, tableChanges, ignoreIfNotExists);
269253
return gateway.alterTable(request).thenApply(r -> null);
270254
}
271255

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.memory.MemorySegment;
22+
import org.apache.fluss.metadata.Schema;
23+
import org.apache.fluss.metadata.SchemaGetter;
24+
import org.apache.fluss.metadata.SchemaInfo;
25+
import org.apache.fluss.metadata.TableInfo;
26+
import org.apache.fluss.row.InternalRow;
27+
import org.apache.fluss.row.decode.FixedSchemaDecoder;
28+
import org.apache.fluss.utils.CopyOnWriteMap;
29+
import org.apache.fluss.utils.concurrent.FutureUtils;
30+
31+
import java.util.ArrayList;
32+
import java.util.Collection;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Set;
38+
import java.util.concurrent.CompletableFuture;
39+
40+
import static org.apache.fluss.utils.Preconditions.checkArgument;
41+
42+
/** Abstract lookuper implementation for common methods. */
43+
abstract class AbstractLookuper implements Lookuper {
44+
protected final TableInfo tableInfo;
45+
46+
protected final MetadataUpdater metadataUpdater;
47+
48+
protected final LookupClient lookupClient;
49+
50+
protected final short targetSchemaId;
51+
52+
private final SchemaGetter schemaGetter;
53+
54+
/**
55+
* Cache for row decoders for different schema ids. Use CopyOnWriteMap for fast access, as it is
56+
* not frequently updated.
57+
*/
58+
private final CopyOnWriteMap<Short, FixedSchemaDecoder> decoders;
59+
60+
AbstractLookuper(
61+
TableInfo tableInfo,
62+
MetadataUpdater metadataUpdater,
63+
LookupClient lookupClient,
64+
SchemaGetter schemaGetter) {
65+
this.tableInfo = tableInfo;
66+
this.metadataUpdater = metadataUpdater;
67+
this.lookupClient = lookupClient;
68+
this.targetSchemaId = (short) tableInfo.getSchemaId();
69+
this.schemaGetter = schemaGetter;
70+
this.decoders = new CopyOnWriteMap<>();
71+
// initialize the decoder for the same schema
72+
this.decoders.put(
73+
targetSchemaId,
74+
new FixedSchemaDecoder(
75+
tableInfo.getTableConfig().getKvFormat(), tableInfo.getSchema()));
76+
}
77+
78+
protected void handleLookupResponse(
79+
List<byte[]> result, CompletableFuture<LookupResult> lookupFuture) {
80+
List<MemorySegment> valueList = new ArrayList<>(result.size());
81+
Set<Short> schemaIdsToRequest = new HashSet<>();
82+
boolean allTargetSchema = true;
83+
for (byte[] valueBytes : result) {
84+
if (valueBytes == null) {
85+
continue;
86+
}
87+
MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
88+
short schemaId = memorySegment.getShort(0);
89+
if (targetSchemaId != schemaId) {
90+
allTargetSchema = false;
91+
if (!decoders.containsKey(schemaId)) {
92+
schemaIdsToRequest.add(schemaId);
93+
}
94+
}
95+
valueList.add(memorySegment);
96+
}
97+
// all schema ids are the target schema id, fast path
98+
if (allTargetSchema) {
99+
lookupFuture.complete(processAllTargetSchemaRows(valueList));
100+
return;
101+
}
102+
103+
// all schema ids and decoders are cached in local
104+
if (schemaIdsToRequest.isEmpty()) {
105+
lookupFuture.complete(processSchemaMismatchedRows(valueList));
106+
return;
107+
}
108+
109+
// need to fetch schema infos for schema ids
110+
List<CompletableFuture<SchemaInfo>> schemaFutures =
111+
new ArrayList<>(schemaIdsToRequest.size());
112+
for (short schemaId : schemaIdsToRequest) {
113+
CompletableFuture<SchemaInfo> schemaFuture = schemaGetter.getSchemaInfoAsync(schemaId);
114+
schemaFutures.add(schemaFuture);
115+
}
116+
FutureUtils.ConjunctFuture<Collection<SchemaInfo>> allFutures =
117+
FutureUtils.combineAll(schemaFutures);
118+
// normal async path
119+
allFutures.whenComplete(
120+
(schemas, error) -> {
121+
if (error != null) {
122+
lookupFuture.completeExceptionally(
123+
new RuntimeException(
124+
"Failed to get schema infos for lookup", error));
125+
} else {
126+
LookupResult lookupResult = processSchemaRequestedRows(schemas, valueList);
127+
lookupFuture.complete(lookupResult);
128+
}
129+
});
130+
}
131+
132+
protected LookupResult processAllTargetSchemaRows(List<MemorySegment> valueList) {
133+
FixedSchemaDecoder decoder = decoders.get(targetSchemaId);
134+
List<InternalRow> rowList = new ArrayList<>(valueList.size());
135+
for (MemorySegment value : valueList) {
136+
rowList.add(decoder.decode(value));
137+
}
138+
return new LookupResult(rowList);
139+
}
140+
141+
protected LookupResult processSchemaMismatchedRows(List<MemorySegment> valueList) {
142+
List<InternalRow> rowList = new ArrayList<>(valueList.size());
143+
for (MemorySegment value : valueList) {
144+
short schemaId = value.getShort(0);
145+
FixedSchemaDecoder decoder = decoders.get(schemaId);
146+
checkArgument(decoder != null, "Decoder for schema id %s not found", schemaId);
147+
InternalRow row = decoder.decode(value);
148+
rowList.add(row);
149+
}
150+
return new LookupResult(rowList);
151+
}
152+
153+
protected LookupResult processSchemaRequestedRows(
154+
Collection<SchemaInfo> schemaInfos, List<MemorySegment> valueList) {
155+
// build a map from schema id to the target schema decoder
156+
Map<Short, Schema> schemaMap = new HashMap<>();
157+
for (SchemaInfo schemaInfo : schemaInfos) {
158+
schemaMap.put((short) schemaInfo.getSchemaId(), schemaInfo.getSchema());
159+
}
160+
161+
// process the value list to convert to target schema
162+
List<InternalRow> rowList = new ArrayList<>(valueList.size());
163+
for (MemorySegment value : valueList) {
164+
short schemaId = value.getShort(0);
165+
FixedSchemaDecoder decoder =
166+
decoders.computeIfAbsent(
167+
schemaId,
168+
(id) -> {
169+
Schema sourceSchema = schemaMap.get(id);
170+
return new FixedSchemaDecoder(
171+
tableInfo.getTableConfig().getKvFormat(),
172+
sourceSchema,
173+
tableInfo.getSchema());
174+
});
175+
InternalRow row = decoder.decode(value);
176+
rowList.add(row);
177+
}
178+
return new LookupResult(rowList);
179+
}
180+
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,11 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25-
import org.apache.fluss.metadata.Schema;
2625
import org.apache.fluss.metadata.SchemaGetter;
2726
import org.apache.fluss.metadata.TableBucket;
2827
import org.apache.fluss.metadata.TableInfo;
2928
import org.apache.fluss.row.InternalRow;
30-
import org.apache.fluss.row.ProjectedRow;
3129
import org.apache.fluss.row.encode.KeyEncoder;
32-
import org.apache.fluss.row.encode.ValueDecoder;
3330
import org.apache.fluss.types.RowType;
3431

3532
import javax.annotation.Nullable;
@@ -49,13 +46,7 @@
4946
* of the primary key.
5047
*/
5148
@NotThreadSafe
52-
class PrefixKeyLookuper implements Lookuper {
53-
54-
private final TableInfo tableInfo;
55-
56-
private final MetadataUpdater metadataUpdater;
57-
58-
private final LookupClient lookupClient;
49+
class PrefixKeyLookuper extends AbstractLookuper {
5950

6051
/** Extract bucket key from prefix lookup key row. */
6152
private final KeyEncoder bucketKeyEncoder;
@@ -68,24 +59,16 @@ class PrefixKeyLookuper implements Lookuper {
6859
*/
6960
private @Nullable final PartitionGetter partitionGetter;
7061

71-
/** Decode the lookup bytes to result row. */
72-
private final ValueDecoder kvValueDecoder;
73-
74-
private final SchemaGetter schemaGetter;
75-
7662
public PrefixKeyLookuper(
7763
TableInfo tableInfo,
7864
SchemaGetter schemaGetter,
7965
MetadataUpdater metadataUpdater,
8066
LookupClient lookupClient,
8167
List<String> lookupColumnNames) {
68+
super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
8269
// sanity check
8370
validatePrefixLookup(tableInfo, lookupColumnNames);
84-
// initialization
85-
this.tableInfo = tableInfo;
8671
this.numBuckets = tableInfo.getNumBuckets();
87-
this.metadataUpdater = metadataUpdater;
88-
this.lookupClient = lookupClient;
8972
// the row type of the input lookup row
9073
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
9174
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
@@ -96,9 +79,6 @@ public PrefixKeyLookuper(
9679
tableInfo.isPartitioned()
9780
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
9881
: null;
99-
this.kvValueDecoder =
100-
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
101-
this.schemaGetter = schemaGetter;
10282
}
10383

10484
private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
@@ -168,36 +148,22 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
168148
}
169149
}
170150

151+
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
171152
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
172-
CompletableFuture<LookupResult> future = new CompletableFuture<>();
173-
174-
CompletableFuture.runAsync(
175-
() -> {
176-
try {
177-
List<byte[]> result =
178-
lookupClient.prefixLookup(tableBucket, bucketKeyBytes).get();
179-
List<InternalRow> rowList = new ArrayList<>(result.size());
180-
for (byte[] valueBytes : result) {
181-
if (valueBytes == null) {
182-
continue;
183-
}
184-
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
185-
InternalRow row;
186-
if (value.schemaId == tableInfo.getSchemaId()) {
187-
row = value.row;
153+
lookupClient
154+
.prefixLookup(tableBucket, bucketKeyBytes)
155+
.whenComplete(
156+
(result, error) -> {
157+
if (error != null) {
158+
lookupFuture.completeExceptionally(
159+
new RuntimeException(
160+
"Failed to perform prefix lookup for table: "
161+
+ tableInfo.getTablePath(),
162+
error));
188163
} else {
189-
Schema schema = schemaGetter.getSchema(value.schemaId);
190-
row =
191-
ProjectedRow.from(schema, tableInfo.getSchema())
192-
.replaceRow(value.row);
164+
handleLookupResponse(result, lookupFuture);
193165
}
194-
rowList.add(row);
195-
}
196-
future.complete(new LookupResult(rowList));
197-
} catch (Exception e) {
198-
future.complete(new LookupResult(Collections.emptyList()));
199-
}
200-
});
201-
return future;
166+
});
167+
return lookupFuture;
202168
}
203169
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookup.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import java.util.List;
2424

2525
/**
26-
* Used to describe the operation to prefix lookup by {@link PrefixLookuper} to a primary key table.
26+
* Used to describe the operation to prefix lookup by {@link PrefixKeyLookuper} to a primary key
27+
* table.
2728
*
2829
* @since 0.6
2930
*/
@@ -37,7 +38,7 @@ public class PrefixLookup {
3738
* <p>For partitioned table, the lookupColumnNames exclude partition fields should be a prefix
3839
* of primary key exclude partition fields.
3940
*
40-
* <p>See {@link PrefixLookuper#prefixLookup(InternalRow)} for more details.
41+
* <p>See {@link PrefixKeyLookuper#lookup(InternalRow)} for more details.
4142
*/
4243
private final List<String> lookupColumnNames;
4344

0 commit comments

Comments
 (0)