Skip to content

Commit 4abbb06

Browse files
committed
extend poc
1 parent d2abf2b commit 4abbb06

File tree

6 files changed

+594
-0
lines changed

6 files changed

+594
-0
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ private CompletableFuture<List<InternalRow>> executeFullScan(Optional<Long> part
295295

296296
FullScanRequest request = new FullScanRequest();
297297
request.setTableId(tableId);
298+
// bucket_id is required by the protocol, even though servers currently
299+
// ignore it for FULL_SCAN. Set a default bucket id to satisfy encoding.
300+
request.setBucketId(0);
298301

299302
if (partitionId != null) {
300303
request.setPartitionId(partitionId);

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@
2323
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
2424
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
2525
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
26+
import org.apache.fluss.client.table.scanner.batch.FullScanBatchScanner;
2627
import org.apache.fluss.client.table.scanner.log.LogScanner;
2728
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.exception.FlussRuntimeException;
31+
import org.apache.fluss.exception.TableNotPartitionedException;
32+
import org.apache.fluss.exception.PartitionNotExistException;
33+
import org.apache.fluss.metadata.PhysicalTablePath;
3034
import org.apache.fluss.metadata.TableBucket;
3135
import org.apache.fluss.metadata.TableInfo;
36+
import org.apache.fluss.metadata.TablePath;
3237
import org.apache.fluss.types.RowType;
3338

3439
import javax.annotation.Nullable;
@@ -135,4 +140,70 @@ public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId)
135140
tableInfo.getTableConfig().getKvFormat(),
136141
conn.getOrCreateRemoteFileDownloader());
137142
}
143+
144+
/**
145+
* Create a BatchScanner that performs a FULL_SCAN over the entire (non-partitioned) table.
146+
* Allows chaining: table.newScan().createBatchScanner().snapshotAll()
147+
*/
148+
public BatchScanner createBatchScanner() {
149+
if (tableInfo.isPartitioned()) {
150+
throw new TableNotPartitionedException(
151+
"Table is partitioned. Please use createBatchScanner(partitionName)");
152+
}
153+
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), null);
154+
}
155+
156+
/**
157+
* Create a BatchScanner that performs a FULL_SCAN over a specific partition of the table.
158+
* Allows chaining: table.newScan().createBatchScanner(partitionName).snapshotAll()
159+
*/
160+
public BatchScanner createBatchScanner(String partitionName) {
161+
if (!tableInfo.isPartitioned()) {
162+
throw new TableNotPartitionedException(
163+
"Table is not partitioned. Please use createBatchScanner().");
164+
}
165+
PhysicalTablePath physical = PhysicalTablePath.of(tableInfo.getTablePath(), partitionName);
166+
try {
167+
conn.getMetadataUpdater().checkAndUpdatePartitionMetadata(physical);
168+
} catch (PartitionNotExistException e) {
169+
throw e;
170+
}
171+
Long pid = conn.getMetadataUpdater().getPartitionId(physical).orElse(null);
172+
if (pid == null) {
173+
throw new IllegalStateException("Partition id not found for " + partitionName);
174+
}
175+
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), pid);
176+
}
177+
178+
/**
179+
* Create a BatchScanner that performs a FULL_SCAN over the entire (non-partitioned) table.
180+
*/
181+
public BatchScanner createFullScanBatchScanner() {
182+
if (tableInfo.isPartitioned()) {
183+
throw new TableNotPartitionedException(
184+
"Table is partitioned. Please use createFullScanBatchScanner(partitionName)");
185+
}
186+
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), null);
187+
}
188+
189+
/**
190+
* Create a BatchScanner that performs a FULL_SCAN over a specific partition of the table.
191+
*/
192+
public BatchScanner createFullScanBatchScanner(String partitionName) {
193+
if (!tableInfo.isPartitioned()) {
194+
throw new TableNotPartitionedException(
195+
"Table is not partitioned. Please use createFullScanBatchScanner().");
196+
}
197+
PhysicalTablePath physical = PhysicalTablePath.of(tableInfo.getTablePath(), partitionName);
198+
try {
199+
conn.getMetadataUpdater().checkAndUpdatePartitionMetadata(physical);
200+
} catch (PartitionNotExistException e) {
201+
throw e;
202+
}
203+
Long pid = conn.getMetadataUpdater().getPartitionId(physical).orElse(null);
204+
if (pid == null) {
205+
throw new IllegalStateException("Partition id not found for " + partitionName);
206+
}
207+
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), pid);
208+
}
138209
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanner.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.io.Closeable;
2828
import java.io.IOException;
2929
import java.time.Duration;
30+
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
3032

3133
/**
3234
* The scanner that reads records from a table in a batch fashion. Compared to {@link LogScanner},
@@ -47,6 +49,23 @@ public interface BatchScanner extends Closeable {
4749
@Nullable
4850
CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException;
4951

52+
/**
53+
* Perform a bounded snapshot and return all rows as a single collection.
54+
* Default implementation is unsupported; only specific scanners (e.g., full-scan) support it.
55+
*/
56+
default CompletableFuture<List<InternalRow>> snapshotAll() {
57+
throw new UnsupportedOperationException("snapshotAll is not supported by this scanner.");
58+
}
59+
60+
/**
61+
* Perform a bounded snapshot for a specific partition and return all rows as a single collection.
62+
* Default implementation is unsupported; only specific scanners (e.g., full-scan) support it.
63+
*/
64+
default CompletableFuture<List<InternalRow>> snapshotAllPartition(String partitionName) {
65+
throw new UnsupportedOperationException(
66+
"snapshotAllPartition is not supported by this scanner.");
67+
}
68+
5069
/** Closes the scanner and should release all resources. */
5170
@Override
5271
void close() throws IOException;
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.batch;
19+
20+
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.record.DefaultValueRecordBatch;
import org.apache.fluss.record.ValueRecord;
import org.apache.fluss.record.ValueRecordReadContext;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.decode.RowDecoder;
import org.apache.fluss.row.encode.ValueDecoder;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.FullScanRequest;
import org.apache.fluss.rpc.messages.FullScanResponse;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.types.DataType;
import org.apache.fluss.utils.CloseableIterator;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
49+
50+
/**
51+
* BatchScanner that performs a FULL_SCAN snapshot across all leaders of a KV table (optionally a
52+
* single partition) and exposes the result via pollBatch in a bounded fashion.
53+
*
54+
* <p>This moves the snapshotAll/snapshotAllPartition functionality previously implemented in
55+
* PrimaryKeyLookuper into a batch scanner form.
56+
*/
57+
@Internal
58+
public class FullScanBatchScanner implements BatchScanner {
59+
60+
private final TableInfo tableInfo;
61+
private final MetadataUpdater metadataUpdater;
62+
@Nullable private final Long partitionId;
63+
64+
private final ValueDecoder kvValueDecoder;
65+
66+
private final CompletableFuture<List<InternalRow>> rowsFuture;
67+
68+
private volatile boolean emitted = false;
69+
70+
public FullScanBatchScanner(
71+
TableInfo tableInfo, MetadataUpdater metadataUpdater, @Nullable Long partitionId) {
72+
this.tableInfo = Objects.requireNonNull(tableInfo, "tableInfo");
73+
this.metadataUpdater = Objects.requireNonNull(metadataUpdater, "metadataUpdater");
74+
this.partitionId = partitionId;
75+
76+
this.kvValueDecoder =
77+
new ValueDecoder(
78+
RowDecoder.create(
79+
tableInfo.getTableConfig().getKvFormat(),
80+
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
81+
82+
this.rowsFuture = new CompletableFuture<>();
83+
// start async scan
84+
CompletableFuture.runAsync(this::startFullScan);
85+
}
86+
87+
private void startFullScan() {
88+
try {
89+
long tableId = tableInfo.getTableId();
90+
int numBuckets = tableInfo.getNumBuckets();
91+
92+
// Find leader tablet/servers for this table/partition
93+
HashSet<Integer> leaderServers = new HashSet<>();
94+
for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
95+
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
96+
metadataUpdater.checkAndUpdateMetadata(tableInfo.getTablePath(), tableBucket);
97+
int leader = metadataUpdater.leaderFor(tableBucket);
98+
leaderServers.add(leader);
99+
}
100+
101+
List<CompletableFuture<FullScanResponse>> responseFutures = new ArrayList<>();
102+
for (int leader : leaderServers) {
103+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
104+
if (gateway == null) {
105+
rowsFuture.completeExceptionally(
106+
new IllegalStateException(
107+
"Leader server " + leader + " is not found in metadata cache."));
108+
return;
109+
}
110+
FullScanRequest request = new FullScanRequest();
111+
request.setTableId(tableId);
112+
// bucket_id is required by the protocol, ignored by server for FULL_SCAN
113+
request.setBucketId(0);
114+
if (partitionId != null) {
115+
request.setPartitionId(partitionId);
116+
}
117+
responseFutures.add(gateway.fullScan(request));
118+
}
119+
120+
CompletableFuture
121+
.allOf(responseFutures.toArray(new CompletableFuture[0]))
122+
.thenApply(v -> decodeFullScanResponses(responseFutures))
123+
.whenComplete(
124+
(rows, err) -> {
125+
if (err != null) {
126+
rowsFuture.completeExceptionally(err);
127+
} else {
128+
rowsFuture.complete(rows);
129+
}
130+
});
131+
} catch (Throwable t) {
132+
rowsFuture.completeExceptionally(t);
133+
}
134+
}
135+
136+
private List<InternalRow> decodeFullScanResponses(
137+
List<CompletableFuture<FullScanResponse>> responseFutures) {
138+
List<InternalRow> out = new ArrayList<>();
139+
for (CompletableFuture<FullScanResponse> responseFuture : responseFutures) {
140+
FullScanResponse response = responseFuture.join();
141+
if (response.hasErrorCode() && response.getErrorCode() != Errors.NONE.code()) {
142+
Errors err = Errors.forCode(response.getErrorCode());
143+
throw err.exception(
144+
response.hasErrorMessage() ? response.getErrorMessage() : err.message());
145+
}
146+
if (response.hasRecords()) {
147+
ByteBuffer buffer = ByteBuffer.wrap(response.getRecords());
148+
DefaultValueRecordBatch values = DefaultValueRecordBatch.pointToByteBuffer(buffer);
149+
ValueRecordReadContext context =
150+
new ValueRecordReadContext(kvValueDecoder.getRowDecoder());
151+
for (ValueRecord record : values.records(context)) {
152+
out.add(record.getRow());
153+
}
154+
}
155+
}
156+
return out;
157+
}
158+
159+
@Override
160+
public @Nullable CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
161+
if (emitted) {
162+
return null;
163+
}
164+
try {
165+
List<InternalRow> rows;
166+
if (timeout == null || timeout.isZero() || timeout.isNegative()) {
167+
rows = rowsFuture.get();
168+
} else {
169+
rows = rowsFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
170+
}
171+
emitted = true;
172+
return CloseableIterator.wrap(rows.iterator());
173+
} catch (ExecutionException e) {
174+
Throwable cause = e.getCause() != null ? e.getCause() : e;
175+
if (cause instanceof RuntimeException) {
176+
throw (RuntimeException) cause;
177+
}
178+
throw new IOException("Failed to perform full scan", cause);
179+
} catch (Exception te) {
180+
// timeout or interruption -> return empty iterator to indicate no data yet
181+
return CloseableIterator.emptyIterator();
182+
}
183+
}
184+
185+
@Override
186+
public void close() throws IOException {
187+
// nothing to close
188+
}
189+
190+
// ---- New BatchScanner API implementations ----
191+
@Override
192+
public java.util.concurrent.CompletableFuture<java.util.List<InternalRow>> snapshotAll() {
193+
return rowsFuture;
194+
}
195+
196+
@Override
197+
public java.util.concurrent.CompletableFuture<java.util.List<InternalRow>> snapshotAllPartition(String partitionName) {
198+
throw new UnsupportedOperationException(
199+
"This scanner is already bound to a specific scope; use TableScan#createBatchScanner(partitionName) then snapshotAll().");
200+
}
201+
}

0 commit comments

Comments
 (0)