Skip to content

Commit 1254958

Browse files
committed
poc
1 parent 6278726 commit 1254958

File tree

12 files changed

+1054
-2
lines changed

12 files changed

+1054
-2
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,10 @@ public interface Scan {
8080
* #limit(int)} and only support for Primary Key Tables.
8181
*/
8282
BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId);
83+
84+
/**
85+
* Creates a LogScanner that streams KV values for the given table bucket using the
86+
* scanner-based KV scan API. This is intended for full KV value scans.
87+
*/
88+
LogScanner createKvValueScanner(TableBucket tableBucket);
8389
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) {
110110
tableInfo, tableBucket, conn.getMetadataUpdater(), projectedColumns, limit);
111111
}
112112

113+
/**
114+
* Creates a BatchScanner that streams the entire KV RocksDB for the given bucket.
115+
* This method is intended for full-table snapshot style reads of KV tables.
116+
*/
117+
public BatchScanner createKvFullBatchScanner(TableBucket tableBucket) {
118+
return new org.apache.fluss.client.table.scanner.batch.KvFullScanBatchScanner(
119+
tableInfo, tableBucket, conn.getMetadataUpdater(), projectedColumns);
120+
}
121+
113122
@Override
114123
public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) {
115124
if (limit != null) {
@@ -135,4 +144,27 @@ public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId)
135144
tableInfo.getTableConfig().getKvFormat(),
136145
conn.getOrCreateRemoteFileDownloader());
137146
}
147+
148+
/**
149+
* Creates a LogScanner that streams the entire KV RocksDB for the given bucket.
150+
* This is useful for full-table snapshot style reads with streaming semantics.
151+
*/
152+
public LogScanner createKvFullLogScanner(TableBucket tableBucket) {
153+
org.apache.fluss.client.table.scanner.log.KvFullScanLogScanner scanner =
154+
new org.apache.fluss.client.table.scanner.log.KvFullScanLogScanner(
155+
tableInfo, conn.getMetadataUpdater(), projectedColumns);
156+
if (tableBucket.getPartitionId() != null) {
157+
scanner.subscribe(tableBucket.getPartitionId(), tableBucket.getBucket(), LogScanner.EARLIEST_OFFSET);
158+
} else {
159+
scanner.subscribe(tableBucket.getBucket(), LogScanner.EARLIEST_OFFSET);
160+
}
161+
return scanner;
162+
}
163+
164+
@Override
165+
public LogScanner createKvValueScanner(TableBucket tableBucket) {
166+
return createKvFullLogScanner(tableBucket);
167+
}
138168
}
169+
170+
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvFullScanBatchScanner.java

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.batch;
19+
20+
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.exception.LeaderNotAvailableException;
22+
import org.apache.fluss.metadata.TableBucket;
23+
import org.apache.fluss.metadata.TableInfo;
24+
import org.apache.fluss.record.DefaultValueRecordBatch;
25+
import org.apache.fluss.record.ValueRecord;
26+
import org.apache.fluss.record.ValueRecordReadContext;
27+
import org.apache.fluss.row.GenericRow;
28+
import org.apache.fluss.row.InternalRow;
29+
import org.apache.fluss.row.ProjectedRow;
30+
import org.apache.fluss.row.decode.RowDecoder;
31+
import org.apache.fluss.row.encode.ValueDecoder;
32+
import org.apache.fluss.rpc.gateway.TabletServerGateway;
33+
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
34+
import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket;
35+
import org.apache.fluss.rpc.messages.PbValueList;
36+
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
37+
import org.apache.fluss.rpc.messages.PrefixLookupResponse;
38+
import org.apache.fluss.types.DataType;
39+
import org.apache.fluss.types.RowType;
40+
import org.apache.fluss.utils.CloseableIterator;
41+
42+
import javax.annotation.Nullable;
43+
44+
import java.io.IOException;
45+
import java.nio.ByteBuffer;
46+
import java.time.Duration;
47+
import java.util.ArrayList;
48+
import java.util.Collections;
49+
import java.util.List;
50+
import java.util.concurrent.CompletableFuture;
51+
import java.util.concurrent.TimeUnit;
52+
import java.util.concurrent.TimeoutException;
53+
54+
/**
55+
* A BatchScanner that streams the entire RocksDB values for a KV table bucket using
56+
* PrefixLookup with an empty prefix. This enables clients to iterate all records in chunks
57+
* without introducing a new RPC.
58+
*/
59+
public class KvFullScanBatchScanner implements BatchScanner {
60+
61+
private static final int DEFAULT_BATCH_CHUNK_SIZE = 1024;
62+
63+
private final TableInfo tableInfo;
64+
private final TableBucket tableBucket;
65+
private final MetadataUpdater metadataUpdater;
66+
@Nullable private final int[] projectedFields;
67+
private final ValueDecoder kvValueDecoder;
68+
private final InternalRow.FieldGetter[] fieldGetters;
69+
70+
private final CompletableFuture<PrefixLookupResponse> responseFuture;
71+
private List<InternalRow> allRows; // decoded once upon first poll
72+
private int nextIndex;
73+
private final int chunkSize;
74+
private boolean endOfInput;
75+
76+
public KvFullScanBatchScanner(
77+
TableInfo tableInfo,
78+
TableBucket tableBucket,
79+
MetadataUpdater metadataUpdater,
80+
@Nullable int[] projectedFields) {
81+
this(tableInfo, tableBucket, metadataUpdater, projectedFields, DEFAULT_BATCH_CHUNK_SIZE);
82+
}
83+
84+
public KvFullScanBatchScanner(
85+
TableInfo tableInfo,
86+
TableBucket tableBucket,
87+
MetadataUpdater metadataUpdater,
88+
@Nullable int[] projectedFields,
89+
int chunkSize) {
90+
this.tableInfo = tableInfo;
91+
this.tableBucket = tableBucket;
92+
this.metadataUpdater = metadataUpdater;
93+
this.projectedFields = projectedFields;
94+
this.chunkSize = Math.max(1, chunkSize);
95+
this.nextIndex = 0;
96+
this.endOfInput = false;
97+
this.allRows = null;
98+
99+
RowType rowType = tableInfo.getRowType();
100+
this.kvValueDecoder =
101+
new ValueDecoder(
102+
RowDecoder.create(
103+
tableInfo.getTableConfig().getKvFormat(),
104+
rowType.getChildren().toArray(new DataType[0])));
105+
this.fieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()];
106+
for (int i = 0; i < rowType.getFieldCount(); i++) {
107+
this.fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
108+
}
109+
110+
// Build PrefixLookupRequest with empty prefix to fetch all values for the bucket
111+
PrefixLookupRequest request = new PrefixLookupRequest().setTableId(tableBucket.getTableId());
112+
PbPrefixLookupReqForBucket pb = request.addBucketsReq().setBucketId(tableBucket.getBucket());
113+
if (tableBucket.getPartitionId() != null) {
114+
pb.setPartitionId(tableBucket.getPartitionId());
115+
// ensure leader metadata is up to date for partitioned case
116+
metadataUpdater.checkAndUpdateMetadata(tableInfo.getTablePath(), tableBucket);
117+
}
118+
// empty prefix means iterate entire DB
119+
pb.addKey(new byte[0]);
120+
121+
int leader = metadataUpdater.leaderFor(tableBucket);
122+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
123+
if (gateway == null) {
124+
throw new LeaderNotAvailableException(
125+
"Server " + leader + " is not found in metadata cache.");
126+
}
127+
this.responseFuture = gateway.prefixLookup(request);
128+
}
129+
130+
@Nullable
131+
@Override
132+
public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
133+
if (endOfInput) {
134+
return null;
135+
}
136+
137+
try {
138+
if (allRows == null) {
139+
PrefixLookupResponse response =
140+
responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
141+
this.allRows = decodeResponse(response);
142+
}
143+
} catch (TimeoutException te) {
144+
return CloseableIterator.emptyIterator();
145+
} catch (Exception e) {
146+
throw new IOException(e);
147+
}
148+
149+
if (nextIndex >= allRows.size()) {
150+
endOfInput = true;
151+
return null;
152+
}
153+
154+
int toIndex = Math.min(allRows.size(), nextIndex + chunkSize);
155+
List<InternalRow> sub = allRows.subList(nextIndex, toIndex);
156+
nextIndex = toIndex;
157+
return CloseableIterator.wrap(sub.iterator());
158+
}
159+
160+
private List<InternalRow> decodeResponse(PrefixLookupResponse response) {
161+
List<InternalRow> rows = new ArrayList<>();
162+
for (PbPrefixLookupRespForBucket pbResp : response.getBucketsRespsList()) {
163+
if (pbResp.hasErrorCode()) {
164+
// return empty on error; upper layers may handle per-bucket errors differently
165+
return Collections.emptyList();
166+
}
167+
// For our request, there is exactly one value list corresponding to empty prefix
168+
for (int i = 0; i < pbResp.getValueListsCount(); i++) {
169+
PbValueList valueList = pbResp.getValueListAt(i);
170+
// Values are encoded as DefaultValueRecordBatch payloads
171+
for (int j = 0; j < valueList.getValuesCount(); j++) {
172+
byte[] recordBytes = valueList.getValueAt(j);
173+
if (recordBytes == null) {
174+
continue;
175+
}
176+
// Each element in value list is a row-encoded payload. It may be a batch or a single row.
177+
// We support both by first trying to parse as DefaultValueRecordBatch; if that fails,
178+
// fall back to direct row decode.
179+
// Currently Kv writes use DefaultValueRecordBatch, so we handle that efficiently.
180+
ByteBuffer buf = ByteBuffer.wrap(recordBytes);
181+
DefaultValueRecordBatch valueBatch = DefaultValueRecordBatch.pointToByteBuffer(buf);
182+
ValueRecordReadContext ctx = new ValueRecordReadContext(kvValueDecoder.getRowDecoder());
183+
for (ValueRecord vr : valueBatch.records(ctx)) {
184+
rows.add(maybeProject(vr.getRow()));
185+
}
186+
}
187+
}
188+
}
189+
return rows;
190+
}
191+
192+
private InternalRow maybeProject(InternalRow originRow) {
193+
// Deep copy the row first using field getters so that we detach from underlying buffers
194+
GenericRow newRow = new GenericRow(fieldGetters.length);
195+
for (int i = 0; i < fieldGetters.length; i++) {
196+
newRow.setField(i, fieldGetters[i].getFieldOrNull(originRow));
197+
}
198+
if (projectedFields != null) {
199+
ProjectedRow projected = ProjectedRow.from(projectedFields);
200+
projected.replaceRow(newRow);
201+
return projected;
202+
}
203+
return newRow;
204+
}
205+
206+
@Override
207+
public void close() throws IOException {
208+
// Best effort cancel to release waiting resources
209+
responseFuture.cancel(true);
210+
}
211+
}

0 commit comments

Comments
 (0)