Skip to content

Commit 6abe903

Browse files
committed
Fix lookup and get schema deadlock.
1 parent 24bec29 commit 6abe903

File tree

3 files changed

+53
-42
lines changed

3 files changed

+53
-42
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -169,28 +169,35 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
169169
}
170170

171171
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
172-
return lookupClient
173-
.prefixLookup(tableBucket, bucketKeyBytes)
174-
.thenApply(
175-
result -> {
176-
List<InternalRow> rowList = new ArrayList<>(result.size());
177-
for (byte[] valueBytes : result) {
178-
if (valueBytes == null) {
179-
continue;
180-
}
181-
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
182-
InternalRow row;
183-
if (value.schemaId == tableInfo.getSchemaId()) {
184-
row = value.row;
185-
} else {
186-
Schema schema = schemaGetter.getSchema(value.schemaId);
187-
row =
188-
ProjectedRow.from(schema, tableInfo.getSchema())
189-
.replaceRow(value.row);
190-
}
191-
rowList.add(row);
172+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
173+
174+
CompletableFuture.runAsync(
175+
() -> {
176+
try {
177+
List<byte[]> result =
178+
lookupClient.prefixLookup(tableBucket, bucketKeyBytes).get();
179+
List<InternalRow> rowList = new ArrayList<>(result.size());
180+
for (byte[] valueBytes : result) {
181+
if (valueBytes == null) {
182+
continue;
192183
}
193-
return new LookupResult(rowList);
194-
});
184+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
185+
InternalRow row;
186+
if (value.schemaId == tableInfo.getSchemaId()) {
187+
row = value.row;
188+
} else {
189+
Schema schema = schemaGetter.getSchema(value.schemaId);
190+
row =
191+
ProjectedRow.from(schema, tableInfo.getSchema())
192+
.replaceRow(value.row);
193+
}
194+
rowList.add(row);
195+
}
196+
future.complete(new LookupResult(rowList));
197+
} catch (Exception e) {
198+
future.complete(new LookupResult(Collections.emptyList()));
199+
}
200+
});
201+
return future;
195202
}
196203
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -132,25 +132,30 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
132132

133133
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
134134
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
135-
lookupClient
136-
.lookup(tableBucket, pkBytes)
137-
.thenApply(
138-
valueBytes -> {
139-
InternalRow row = null;
140-
if (valueBytes != null) {
141-
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
142-
if (value.schemaId == tableInfo.getSchemaId()) {
143-
row = value.row;
144-
} else {
145-
Schema schema = schemaGetter.getSchema(value.schemaId);
146-
checkNotNull(schema, "schema is null");
147-
row =
148-
ProjectedRow.from(schema, tableInfo.getSchema())
149-
.replaceRow(value.row);
150-
}
135+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
136+
137+
CompletableFuture.runAsync(
138+
() -> {
139+
try {
140+
byte[] valueBytes = lookupClient.lookup(tableBucket, pkBytes).get();
141+
InternalRow row = null;
142+
if (valueBytes != null) {
143+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
144+
if (value.schemaId == tableInfo.getSchemaId()) {
145+
row = value.row;
146+
} else {
147+
Schema schema = schemaGetter.getSchema(value.schemaId);
148+
checkNotNull(schema, "schema is null");
149+
row =
150+
ProjectedRow.from(schema, tableInfo.getSchema())
151+
.replaceRow(value.row);
151152
}
152-
153-
return new LookupResult(row);
154-
});
153+
}
154+
future.complete(new LookupResult(row));
155+
} catch (Exception e) {
156+
future.completeExceptionally(e);
157+
}
158+
});
159+
return future;
155160
}
156161
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ void testSchemaChange() throws Exception {
195195
lookupFunction.close();
196196

197197
// start lookup job after schema change.
198-
admin.getTableSchema(tablePath, 1).get();
199198
lookupFunction =
200199
new FlinkLookupFunction(
201200
clientConf,

0 commit comments

Comments
 (0)