Skip to content

Commit 19fa00d

Browse files
committed
make Lookup requests async
1 parent e36f533 commit 19fa00d

File tree

13 files changed

+436
-120
lines changed

13 files changed

+436
-120
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.memory.MemorySegment;
22+
import org.apache.fluss.metadata.Schema;
23+
import org.apache.fluss.metadata.SchemaGetter;
24+
import org.apache.fluss.metadata.SchemaInfo;
25+
import org.apache.fluss.metadata.TableInfo;
26+
import org.apache.fluss.row.InternalRow;
27+
import org.apache.fluss.row.decode.FixedSchemaDecoder;
28+
import org.apache.fluss.utils.CopyOnWriteMap;
29+
import org.apache.fluss.utils.concurrent.FutureUtils;
30+
31+
import java.util.ArrayList;
32+
import java.util.Collection;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Set;
38+
import java.util.concurrent.CompletableFuture;
39+
40+
import static org.apache.fluss.utils.Preconditions.checkArgument;
41+
42+
/** Abstract lookuper implementation for common methods. */
43+
abstract class AbstractLookuper implements Lookuper {
44+
protected final TableInfo tableInfo;
45+
46+
protected final MetadataUpdater metadataUpdater;
47+
48+
protected final LookupClient lookupClient;
49+
50+
protected final short targetSchemaId;
51+
52+
private final SchemaGetter schemaGetter;
53+
54+
/**
55+
* Cache for row decoders for different schema ids. Use CopyOnWriteMap for fast access, as it is
56+
* not frequently updated.
57+
*/
58+
private final CopyOnWriteMap<Short, FixedSchemaDecoder> decoders;
59+
60+
AbstractLookuper(
61+
TableInfo tableInfo,
62+
MetadataUpdater metadataUpdater,
63+
LookupClient lookupClient,
64+
SchemaGetter schemaGetter) {
65+
this.tableInfo = tableInfo;
66+
this.metadataUpdater = metadataUpdater;
67+
this.lookupClient = lookupClient;
68+
this.targetSchemaId = (short) tableInfo.getSchemaId();
69+
this.schemaGetter = schemaGetter;
70+
this.decoders = new CopyOnWriteMap<>();
71+
// initialize the decoder for the same schema
72+
this.decoders.put(
73+
targetSchemaId,
74+
new FixedSchemaDecoder(
75+
tableInfo.getTableConfig().getKvFormat(), tableInfo.getSchema()));
76+
}
77+
78+
protected void handleLookupResponse(
79+
List<byte[]> result, CompletableFuture<LookupResult> lookupFuture) {
80+
List<MemorySegment> valueList = new ArrayList<>(result.size());
81+
Set<Short> schemaIdsToRequest = new HashSet<>();
82+
boolean allTargetSchema = true;
83+
for (byte[] valueBytes : result) {
84+
if (valueBytes == null) {
85+
continue;
86+
}
87+
MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
88+
short schemaId = memorySegment.getShort(0);
89+
if (targetSchemaId != schemaId) {
90+
allTargetSchema = false;
91+
if (!decoders.containsKey(schemaId)) {
92+
schemaIdsToRequest.add(schemaId);
93+
}
94+
}
95+
valueList.add(memorySegment);
96+
}
97+
// all schema ids are the target schema id, fast path
98+
if (allTargetSchema) {
99+
lookupFuture.complete(processAllTargetSchemaRows(valueList));
100+
return;
101+
}
102+
103+
// all schema ids and decoders are cached in local
104+
if (schemaIdsToRequest.isEmpty()) {
105+
lookupFuture.complete(processSchemaMismatchedRows(valueList));
106+
return;
107+
}
108+
109+
// need to fetch schema infos for schema ids
110+
List<CompletableFuture<SchemaInfo>> schemaFutures =
111+
new ArrayList<>(schemaIdsToRequest.size());
112+
for (short schemaId : schemaIdsToRequest) {
113+
CompletableFuture<SchemaInfo> schemaFuture = schemaGetter.getSchemaInfoAsync(schemaId);
114+
schemaFutures.add(schemaFuture);
115+
}
116+
FutureUtils.ConjunctFuture<Collection<SchemaInfo>> allFutures =
117+
FutureUtils.combineAll(schemaFutures);
118+
// normal async path
119+
allFutures.whenComplete(
120+
(schemas, error) -> {
121+
if (error != null) {
122+
lookupFuture.completeExceptionally(
123+
new RuntimeException(
124+
"Failed to get schema infos for prefix lookup", error));
125+
} else {
126+
LookupResult lookupResult = processSchemaRequestedRows(schemas, valueList);
127+
lookupFuture.complete(lookupResult);
128+
}
129+
});
130+
}
131+
132+
protected LookupResult processAllTargetSchemaRows(List<MemorySegment> valueList) {
133+
FixedSchemaDecoder decoder = decoders.get(targetSchemaId);
134+
List<InternalRow> rowList = new ArrayList<>(valueList.size());
135+
for (MemorySegment value : valueList) {
136+
rowList.add(decoder.decode(value));
137+
}
138+
return new LookupResult(rowList);
139+
}
140+
141+
protected LookupResult processSchemaMismatchedRows(List<MemorySegment> valueList) {
142+
List<InternalRow> rowList = new ArrayList<>(valueList.size());
143+
for (MemorySegment value : valueList) {
144+
short schemaId = value.getShort(0);
145+
FixedSchemaDecoder decoder = decoders.get(schemaId);
146+
checkArgument(decoder != null, "Decoder for schema id %s not found", schemaId);
147+
InternalRow row = decoder.decode(value);
148+
rowList.add(row);
149+
}
150+
return new LookupResult(rowList);
151+
}
152+
153+
protected LookupResult processSchemaRequestedRows(
154+
Collection<SchemaInfo> schemaInfos, List<MemorySegment> valueList) {
155+
// build a map from schema id to the target schema decoder
156+
Map<Short, Schema> schemaMap = new HashMap<>();
157+
for (SchemaInfo schemaInfo : schemaInfos) {
158+
schemaMap.put((short) schemaInfo.getSchemaId(), schemaInfo.getSchema());
159+
}
160+
161+
// process the value list to convert to target schema
162+
List<InternalRow> rowList = new ArrayList<>(valueList.size());
163+
for (MemorySegment value : valueList) {
164+
short schemaId = value.getShort(0);
165+
FixedSchemaDecoder decoder =
166+
decoders.computeIfAbsent(
167+
schemaId,
168+
(id) -> {
169+
Schema sourceSchema = schemaMap.get(id);
170+
return new FixedSchemaDecoder(
171+
tableInfo.getTableConfig().getKvFormat(),
172+
sourceSchema,
173+
tableInfo.getSchema());
174+
});
175+
InternalRow row = decoder.decode(value);
176+
rowList.add(row);
177+
}
178+
return new LookupResult(rowList);
179+
}
180+
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,11 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25-
import org.apache.fluss.metadata.Schema;
2625
import org.apache.fluss.metadata.SchemaGetter;
2726
import org.apache.fluss.metadata.TableBucket;
2827
import org.apache.fluss.metadata.TableInfo;
2928
import org.apache.fluss.row.InternalRow;
30-
import org.apache.fluss.row.ProjectedRow;
3129
import org.apache.fluss.row.encode.KeyEncoder;
32-
import org.apache.fluss.row.encode.ValueDecoder;
3330
import org.apache.fluss.types.RowType;
3431

3532
import javax.annotation.Nullable;
@@ -49,13 +46,7 @@
4946
* of the primary key.
5047
*/
5148
@NotThreadSafe
52-
class PrefixKeyLookuper implements Lookuper {
53-
54-
private final TableInfo tableInfo;
55-
56-
private final MetadataUpdater metadataUpdater;
57-
58-
private final LookupClient lookupClient;
49+
class PrefixKeyLookuper extends AbstractLookuper {
5950

6051
/** Extract bucket key from prefix lookup key row. */
6152
private final KeyEncoder bucketKeyEncoder;
@@ -68,24 +59,16 @@ class PrefixKeyLookuper implements Lookuper {
6859
*/
6960
private @Nullable final PartitionGetter partitionGetter;
7061

71-
/** Decode the lookup bytes to result row. */
72-
private final ValueDecoder kvValueDecoder;
73-
74-
private final SchemaGetter schemaGetter;
75-
7662
public PrefixKeyLookuper(
7763
TableInfo tableInfo,
7864
SchemaGetter schemaGetter,
7965
MetadataUpdater metadataUpdater,
8066
LookupClient lookupClient,
8167
List<String> lookupColumnNames) {
68+
super(tableInfo, metadataUpdater, lookupClient, schemaGetter);
8269
// sanity check
8370
validatePrefixLookup(tableInfo, lookupColumnNames);
84-
// initialization
85-
this.tableInfo = tableInfo;
8671
this.numBuckets = tableInfo.getNumBuckets();
87-
this.metadataUpdater = metadataUpdater;
88-
this.lookupClient = lookupClient;
8972
// the row type of the input lookup row
9073
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
9174
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
@@ -96,9 +79,6 @@ public PrefixKeyLookuper(
9679
tableInfo.isPartitioned()
9780
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
9881
: null;
99-
this.kvValueDecoder =
100-
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
101-
this.schemaGetter = schemaGetter;
10282
}
10383

10484
private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
@@ -168,36 +148,22 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
168148
}
169149
}
170150

151+
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
171152
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
172-
CompletableFuture<LookupResult> future = new CompletableFuture<>();
173-
174-
CompletableFuture.runAsync(
175-
() -> {
176-
try {
177-
List<byte[]> result =
178-
lookupClient.prefixLookup(tableBucket, bucketKeyBytes).get();
179-
List<InternalRow> rowList = new ArrayList<>(result.size());
180-
for (byte[] valueBytes : result) {
181-
if (valueBytes == null) {
182-
continue;
183-
}
184-
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
185-
InternalRow row;
186-
if (value.schemaId == tableInfo.getSchemaId()) {
187-
row = value.row;
153+
lookupClient
154+
.prefixLookup(tableBucket, bucketKeyBytes)
155+
.whenComplete(
156+
(result, error) -> {
157+
if (error != null) {
158+
lookupFuture.completeExceptionally(
159+
new RuntimeException(
160+
"Failed to perform prefix lookup for table: "
161+
+ tableInfo.getTablePath(),
162+
error));
188163
} else {
189-
Schema schema = schemaGetter.getSchema(value.schemaId);
190-
row =
191-
ProjectedRow.from(schema, tableInfo.getSchema())
192-
.replaceRow(value.row);
164+
handleLookupResponse(result, lookupFuture);
193165
}
194-
rowList.add(row);
195-
}
196-
future.complete(new LookupResult(rowList));
197-
} catch (Exception e) {
198-
future.complete(new LookupResult(Collections.emptyList()));
199-
}
200-
});
201-
return future;
166+
});
167+
return lookupFuture;
202168
}
203169
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookup.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import java.util.List;
2424

2525
/**
26-
* Used to describe the operation to prefix lookup by {@link PrefixLookuper} to a primary key table.
26+
* Used to describe the operation to prefix lookup by {@link PrefixKeyLookuper} to a primary key
27+
* table.
2728
*
2829
* @since 0.6
2930
*/
@@ -37,7 +38,7 @@ public class PrefixLookup {
3738
* <p>For partitioned table, the lookupColumnNames exclude partition fields should be a prefix
3839
* of primary key exclude partition fields.
3940
*
40-
* <p>See {@link PrefixLookuper#prefixLookup(InternalRow)} for more details.
41+
* <p>See {@link PrefixKeyLookuper#lookup(InternalRow)} for more details.
4142
*/
4243
private final List<String> lookupColumnNames;
4344

0 commit comments

Comments
 (0)