Commit 939efdf

[connector] Fix leader not found when Fluss limit scan on partition table.

1 parent 00fff20
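As the FlussTable.java diff below shows, the root cause is ordering: the client resolved the bucket leader before the partition's metadata was guaranteed to be in its local cache, so for a partitioned table `leaderFor` could fail with a leader-not-found error. The fix refreshes metadata for the partition first. A minimal sketch of the reordering, using only calls that appear in the diff below:

    // Sketch of the core fix: for a partitioned bucket, refresh the partition's
    // metadata BEFORE asking for the bucket leader; otherwise the cached
    // metadata may not contain the partition yet and leaderFor() fails.
    if (tableBucket.getPartitionId() != null) {
        limitScanRequest.setPartitionId(tableBucket.getPartitionId());
        metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
    }
    int leader = metadataUpdater.leaderFor(tableBucket);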

3 files changed: +56 −26 lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java

Lines changed: 47 additions & 25 deletions
@@ -55,6 +55,7 @@
 import com.alibaba.fluss.record.MemoryLogRecords;
 import com.alibaba.fluss.record.ValueRecord;
 import com.alibaba.fluss.record.ValueRecordReadContext;
+import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.row.ProjectedRow;
 import com.alibaba.fluss.row.decode.RowDecoder;

@@ -211,17 +212,27 @@ private int getBucketId(byte[] keyBytes, InternalRow key) {
     @Override
     public CompletableFuture<List<ScanRecord>> limitScan(
             TableBucket tableBucket, int limit, @Nullable int[] projectedFields) {
-        // because that rocksdb is not suitable to projection, thus do it in client.
-        int leader = metadataUpdater.leaderFor(tableBucket);
+
         LimitScanRequest limitScanRequest =
                 new LimitScanRequest()
                         .setTableId(tableBucket.getTableId())
                         .setBucketId(tableBucket.getBucket())
                         .setLimit(limit);
+
         if (tableBucket.getPartitionId() != null) {
             limitScanRequest.setPartitionId(tableBucket.getPartitionId());
+            metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
         }
+
+        // because that rocksdb is not suitable to projection, thus do it in client.
+        int leader = metadataUpdater.leaderFor(tableBucket);
         TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
+        RowType rowType = tableInfo.getTableDescriptor().getSchema().toRowType();
+        InternalRow.FieldGetter[] fieldGetters =
+                new InternalRow.FieldGetter[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
+        }

         CompletableFuture<List<ScanRecord>> future = new CompletableFuture<>();
         gateway.limitScan(limitScanRequest)

@@ -233,7 +244,8 @@ public CompletableFuture<List<ScanRecord>> limitScan(
                                         limit,
                                         limitScantResponse,
                                         projectedFields,
-                                        hasPrimaryKey));
+                                        hasPrimaryKey,
+                                        fieldGetters));
                             } else {
                                 throw ApiError.fromErrorMessage(limitScantResponse).exception();
                             }

@@ -250,7 +262,8 @@ private List<ScanRecord> parseLimitScanResponse(
             int limit,
             LimitScanResponse limitScanResponse,
             @Nullable int[] projectedFields,
-            boolean hasPrimaryKey) {
+            boolean hasPrimaryKey,
+            InternalRow.FieldGetter[] fieldGetters) {
         List<ScanRecord> scanRecordList = new ArrayList<>();
         if (!limitScanResponse.hasRecords()) {
             return scanRecordList;

@@ -262,45 +275,54 @@ private List<ScanRecord> parseLimitScanResponse(
             ValueRecordReadContext readContext =
                     new ValueRecordReadContext(kvValueDecoder.getRowDecoder());
             for (ValueRecord record : valueRecords.records(readContext)) {
-                InternalRow originRow = record.getRow();
-                if (projectedFields != null) {
-                    ProjectedRow row = ProjectedRow.from(projectedFields);
-                    row.replaceRow(originRow);
-                    scanRecordList.add(new ScanRecord(row));
-                } else {
-                    scanRecordList.add(new ScanRecord(originRow));
-                }
+                addScanRecord(projectedFields, scanRecordList, record.getRow(), fieldGetters);
             }
         } else {
             LogRecordReadContext readContext =
                     LogRecordReadContext.createReadContext(tableInfo, null);
             LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
             for (LogRecordBatch logRecordBatch : records.batches()) {
-                // A batch of log record maybe little more than limit, thus we need slice the last
-                // limit number.
-                CloseableIterator<LogRecord> logRecordIterator =
-                        logRecordBatch.records(readContext);
-                while (logRecordIterator.hasNext()) {
-                    InternalRow originRow = logRecordIterator.next().getRow();
-                    if (projectedFields != null) {
-                        ProjectedRow row = ProjectedRow.from(projectedFields);
-                        row.replaceRow(originRow);
-                        scanRecordList.add(new ScanRecord(row));
-                    } else {
-                        scanRecordList.add(new ScanRecord(originRow));
+                // A batch of log record maybe little more than limit, thus we need slice the
+                // last limit number.
+                try (CloseableIterator<LogRecord> logRecordIterator =
+                        logRecordBatch.records(readContext)) {
+                    while (logRecordIterator.hasNext()) {
+                        addScanRecord(
+                                projectedFields,
+                                scanRecordList,
+                                logRecordIterator.next().getRow(),
+                                fieldGetters);
                     }
                 }
             }
-
             if (scanRecordList.size() > limit) {
                 scanRecordList =
                         scanRecordList.subList(
                                 scanRecordList.size() - limit, scanRecordList.size());
             }
         }
+
         return scanRecordList;
     }

+    private void addScanRecord(
+            @Nullable int[] projectedFields,
+            List<ScanRecord> scanRecordList,
+            InternalRow originRow,
+            InternalRow.FieldGetter[] fieldGetters) {
+        GenericRow newRow = new GenericRow(fieldGetters.length);
+        for (int i = 0; i < fieldGetters.length; i++) {
+            newRow.setField(i, fieldGetters[i].getFieldOrNull(originRow));
+        }
+        if (projectedFields != null) {
+            ProjectedRow row = ProjectedRow.from(projectedFields);
+            row.replaceRow(newRow);
+            scanRecordList.add(new ScanRecord(row));
+        } else {
+            scanRecordList.add(new ScanRecord(newRow));
+        }
+    }
+
     /**
      * Return the id of the partition the row belongs to. It'll try to update the metadata if the
      * partition doesn't exist. If the partition doesn't exist yet after update metadata, it'll
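Beyond the metadata fix, the patch routes all row handling through the new addScanRecord helper, which deep-copies each InternalRow into a GenericRow via per-field FieldGetters built once per request rather than per row. A plausible reading (not stated in the commit) is that rows handed out by the record iterators may reference reused buffers, so copying makes the returned ScanRecords safe to retain. A self-contained sketch of the pattern; the calls mirror the diff, and the package for RowType is an assumption:

    import com.alibaba.fluss.row.GenericRow;
    import com.alibaba.fluss.row.InternalRow;
    import com.alibaba.fluss.types.RowType; // assumed package for RowType

    // Build the field getters once per schema, then reuse them for every row.
    static InternalRow.FieldGetter[] fieldGettersFor(RowType rowType) {
        InternalRow.FieldGetter[] getters =
                new InternalRow.FieldGetter[rowType.getFieldCount()];
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            getters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
        }
        return getters;
    }

    // Materialize a row so it no longer references the source row's backing memory.
    static GenericRow copyRow(InternalRow source, InternalRow.FieldGetter[] getters) {
        GenericRow copy = new GenericRow(getters.length);
        for (int i = 0; i < getters.length; i++) {
            copy.setField(i, getters[i].getFieldOrNull(source));
        }
        return copy;
    }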

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java

Lines changed: 1 addition & 1 deletion
@@ -365,7 +365,7 @@ public static Collection<RowData> limitScan(
                                     .collect(Collectors.toList()));
             }

-            return limit < responseList.size() ? rowDataList.subList(0, (int) limit) : rowDataList;
+            return limit < rowDataList.size() ? rowDataList.subList(0, (int) limit) : rowDataList;
         } catch (Exception e) {
             throw new FlussRuntimeException(e);
         }
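The one-line PushdownUtils change fixes the truncation guard: limit was compared against responseList (one entry per bucket) instead of rowDataList (the flattened rows). With two bucket responses of three rows each and limit = 3, the old guard (3 < 2, false) skipped truncation and returned all six rows. A miniature of the fixed logic, with a hypothetical applyLimit helper that is not part of the commit:

    import java.util.List;

    // Hypothetical helper: truncate against the number of collected rows,
    // not the number of per-bucket responses they were flattened from.
    static <T> List<T> applyLimit(List<T> rowDataList, long limit) {
        return limit < rowDataList.size()
                ? rowDataList.subList(0, (int) limit)
                : rowDataList;
    }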

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java

Lines changed: 8 additions & 0 deletions
@@ -270,6 +270,14 @@ void testLimitLogTableScan() throws Exception {
                         "+I[5, name5]");
         assertThat(collected).isSubsetOf(expected);
         assertThat(collected).hasSize(3);
+
+        // test partition table.
+        String partitionTable = preparePartitionedLogTable();
+        query = String.format("SELECT id, name FROM %s limit 3", partitionTable);
+        iterRows = tEnv.executeSql(query).collect();
+        collected = assertAndCollectRecords(iterRows, 3);
+        assertThat(collected).isSubsetOf(expected);
+        assertThat(collected).hasSize(3);
     }

     @ParameterizedTest
