Skip to content

Commit 1489f14

Browse files
committed
add tests
1 parent e7b1bd0 commit 1489f14

File tree

9 files changed

+631
-370
lines changed

9 files changed

+631
-370
lines changed

.idea/vcs.xml

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.row.InternalRow;
2222

23-
import java.util.List;
2423
import java.util.concurrent.CompletableFuture;
2524

2625
/**
27-
* The lookuper is used to fetch rows from a table either by primary key or by a key prefix,
28-
* depending on how it is created.
26+
* Lookuper performs point lookups by primary key or key prefix.
2927
*
3028
* <p>Instances are created via {@link Lookup} builders (e.g., {@code table.newLookup()}) and can
31-
* target primary-key lookups (exact match) or prefix-key lookups. Some operations are only
32-
* supported for primary-key tables and will throw {@link UnsupportedOperationException} otherwise.
29+
* target primary-key lookups (exact match) or prefix-key lookups.
30+
*
31+
* <p>Note: Full scans are not part of the Lookuper API. Use the Scan API instead: {@code
32+
* table.newScan().createBatchScanner(...)}.
3333
*
3434
* @since 0.6
3535
*/
@@ -53,36 +53,4 @@ public interface Lookuper {
5353
* empty if not found; the future may complete exceptionally on RPC or server errors
5454
*/
5555
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
56-
57-
/**
58-
* Returns all current values of a primary-key KV table using a point-in-time snapshot. This is
59-
* not a streaming/cursor operation; it materializes the full result as a list.
60-
*
61-
* <p>For partitioned tables, prefer {@link #snapshotAllPartition(String)}.
62-
*
63-
* @return a future completing with an unordered list of {@link InternalRow} representing the
64-
* current values; the future may complete exceptionally if unsupported by the lookuper or
65-
* on RPC/server errors
66-
* @throws UnsupportedOperationException if the implementation does not support snapshot reads
67-
*/
68-
default CompletableFuture<List<InternalRow>> snapshotAll() {
69-
throw new UnsupportedOperationException(
70-
"snapshotAll() is only supported for primary key lookuper.");
71-
}
72-
73-
/**
74-
* Returns all current values for the specified partition of a partitioned primary-key table,
75-
* using a point-in-time snapshot.
76-
*
77-
* @param partitionName the partition identifier (e.g., a date-based partition value)
78-
* @return a future completing with an unordered list of {@link InternalRow} for that partition;
79-
* the future may complete exceptionally if unsupported by the lookuper or on RPC/server
80-
* errors
81-
* @throws UnsupportedOperationException if the implementation does not support partition
82-
* snapshot reads
83-
*/
84-
default CompletableFuture<List<InternalRow>> snapshotAllPartition(String partitionName) {
85-
throw new UnsupportedOperationException(
86-
"snapshotAllPartition() is not supported by this lookuper.");
87-
}
8856
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 2 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,20 @@
2020
import org.apache.fluss.bucketing.BucketingFunction;
2121
import org.apache.fluss.client.metadata.MetadataUpdater;
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
23-
import org.apache.fluss.exception.LeaderNotAvailableException;
2423
import org.apache.fluss.exception.PartitionNotExistException;
25-
import org.apache.fluss.exception.TableNotPartitionedException;
2624
import org.apache.fluss.metadata.DataLakeFormat;
27-
import org.apache.fluss.metadata.PhysicalTablePath;
2825
import org.apache.fluss.metadata.TableBucket;
2926
import org.apache.fluss.metadata.TableInfo;
30-
import org.apache.fluss.metadata.TablePath;
31-
import org.apache.fluss.record.DefaultValueRecordBatch;
32-
import org.apache.fluss.record.ValueRecord;
33-
import org.apache.fluss.record.ValueRecordReadContext;
3427
import org.apache.fluss.row.InternalRow;
3528
import org.apache.fluss.row.decode.RowDecoder;
3629
import org.apache.fluss.row.encode.KeyEncoder;
3730
import org.apache.fluss.row.encode.ValueDecoder;
38-
import org.apache.fluss.rpc.gateway.TabletServerGateway;
39-
import org.apache.fluss.rpc.messages.FullScanRequest;
40-
import org.apache.fluss.rpc.messages.FullScanResponse;
41-
import org.apache.fluss.rpc.protocol.Errors;
4231
import org.apache.fluss.types.DataType;
4332
import org.apache.fluss.types.RowType;
4433

4534
import javax.annotation.Nullable;
4635

47-
import java.nio.ByteBuffer;
48-
import java.util.ArrayList;
4936
import java.util.Collections;
50-
import java.util.HashSet;
51-
import java.util.List;
52-
import java.util.Optional;
5337
import java.util.concurrent.CompletableFuture;
5438

5539
import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
@@ -58,10 +42,8 @@
5842
/**
5943
* Client-side lookuper implementation for primary-key tables.
6044
*
61-
* <p>This class supports single-key lookups as well as bounded, point-in-time snapshot reads that
62-
* return all current values of a KV table (and per-partition for partitioned tables). Snapshot
63-
* reads are executed via the FULL_SCAN RPC and aggregate results across the leaders of all buckets
64-
* on each target server.
45+
* <p>This class supports single-key lookups. For any scan semantics (full scan, limited scan, or
46+
* snapshot-by-id), use the Scan API instead.
6547
*/
6648
class PrimaryKeyLookuper implements Lookuper {
6749

@@ -185,124 +167,4 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
185167
*
186168
* @return a future with an unordered list of current rows
187169
*/
188-
@Override
189-
public CompletableFuture<List<InternalRow>> snapshotAll() {
190-
if (tableInfo.isPartitioned()) {
191-
CompletableFuture<List<InternalRow>> result = new CompletableFuture<>();
192-
result.completeExceptionally(
193-
new TableNotPartitionedException(
194-
"Table is partitioned. Please use snapshotAllPartition(partitionName)."));
195-
196-
return result;
197-
}
198-
199-
return executeFullScan(Optional.empty());
200-
}
201-
202-
/**
203-
* Returns all current values for the specified partition of a partitioned primary-key table
204-
* using a point-in-time snapshot.
205-
*
206-
* @param partitionName the partition identifier (for example, a date string)
207-
* @return a future with an unordered list of current rows in the partition
208-
*/
209-
@Override
210-
public CompletableFuture<List<InternalRow>> snapshotAllPartition(String partitionName) {
211-
if (!tableInfo.isPartitioned()) {
212-
CompletableFuture<List<InternalRow>> result = new CompletableFuture<>();
213-
result.completeExceptionally(
214-
new TableNotPartitionedException(
215-
"Table is not partitioned. Please use snapshotAll()."));
216-
return result;
217-
}
218-
219-
// Resolve partition id from name
220-
TablePath tablePath = tableInfo.getTablePath();
221-
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
222-
223-
try {
224-
metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
225-
} catch (PartitionNotExistException e) {
226-
CompletableFuture<List<InternalRow>> result = new CompletableFuture<>();
227-
result.completeExceptionally(e);
228-
return result;
229-
}
230-
231-
Optional<Long> partitionId = metadataUpdater.getPartitionId(physicalTablePath);
232-
return executeFullScan(partitionId);
233-
}
234-
235-
/**
236-
* Decodes and aggregates FULL_SCAN RPC responses into a list of rows.
237-
*
238-
* <p>Throws a mapped client exception if any response contains an error.
239-
*
240-
* @param responseFutures futures of server responses per target leader node
241-
* @return aggregated, unordered rows
242-
*/
243-
private List<InternalRow> decodeFullScanResponses(
244-
List<CompletableFuture<FullScanResponse>> responseFutures) {
245-
List<InternalRow> out = new ArrayList<>();
246-
for (CompletableFuture<FullScanResponse> responseFuture : responseFutures) {
247-
FullScanResponse response = responseFuture.join();
248-
249-
if (response.hasErrorCode() && response.getErrorCode() != Errors.NONE.code()) {
250-
Errors err = Errors.forCode(response.getErrorCode());
251-
throw err.exception(
252-
response.hasErrorMessage() ? response.getErrorMessage() : err.message());
253-
}
254-
255-
if (response.hasRecords()) {
256-
ByteBuffer buffer = ByteBuffer.wrap(response.getRecords());
257-
DefaultValueRecordBatch values = DefaultValueRecordBatch.pointToByteBuffer(buffer);
258-
259-
ValueRecordReadContext context =
260-
new ValueRecordReadContext(kvValueDecoder.getRowDecoder());
261-
for (ValueRecord record : values.records(context)) {
262-
out.add(record.getRow());
263-
}
264-
}
265-
}
266-
return out;
267-
}
268-
269-
private CompletableFuture<List<InternalRow>> executeFullScan(Optional<Long> partitionIdOpt) {
270-
Long partitionId = partitionIdOpt.orElse(null);
271-
long tableId = tableInfo.getTableId();
272-
int numBuckets = tableInfo.getNumBuckets();
273-
274-
// Find leader tablet/servers for this table/partition
275-
HashSet<Integer> leaderServers = new HashSet<>();
276-
for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
277-
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
278-
279-
// ensure metadata for this bucket is present
280-
metadataUpdater.checkAndUpdateMetadata(tableInfo.getTablePath(), tableBucket);
281-
int leader = metadataUpdater.leaderFor(tableBucket);
282-
leaderServers.add(leader);
283-
}
284-
285-
List<CompletableFuture<FullScanResponse>> responseFutures = new ArrayList<>();
286-
for (int leader : leaderServers) {
287-
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
288-
if (gateway == null) {
289-
CompletableFuture<List<InternalRow>> result = new CompletableFuture<>();
290-
result.completeExceptionally(
291-
new LeaderNotAvailableException(
292-
"Leader server " + leader + " is not found in metadata cache."));
293-
return result;
294-
}
295-
296-
FullScanRequest request = new FullScanRequest();
297-
request.setTableId(tableId);
298-
299-
if (partitionId != null) {
300-
request.setPartitionId(partitionId);
301-
}
302-
responseFutures.add(gateway.fullScan(request));
303-
}
304-
// Decode all responses and aggregate rows
305-
return CompletableFuture.allOf(responseFutures.toArray(new CompletableFuture[0]))
306-
.thenApply(v -> decodeFullScanResponses(responseFutures));
307-
}
308170
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,27 @@ public interface Scan {
7676
/**
7777
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
7878
*
79-
* <p>Note: this API doesn't support pre-configured with {@link #project}.
79+
* <p>Notes:
80+
*
81+
* <ul>
82+
* <li>Projection configured via {@link #project} is supported client-side.
83+
* <li>For full scans (when {@link #limit(int)} is not set), the provided {@link TableBucket}
84+
* is used only to identify the table (and optional partition); the bucket id is ignored.
85+
* </ul>
8086
*/
8187
BatchScanner createBatchScanner(TableBucket tableBucket);
8288

8389
/**
8490
* Creates a {@link BatchScanner} to read given snapshot data in the given table bucket for this
8591
* scan.
8692
*
87-
* <p>Note: this API doesn't support pre-configured with {@link #project} and {@link
88-
* #limit(int)} and only support for Primary Key Tables.
93+
* <p>Notes:
94+
*
95+
* <ul>
96+
* <li>Projection configured via {@link #project} is supported client-side.
97+
* <li>{@link #limit(int)} is not supported on snapshot scans.
98+
* <li>Only supported for primary-key tables.
99+
* </ul>
89100
*/
90101
BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId);
91102
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/DefaultBatchScanner.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.client.metadata.MetadataUpdater;
2121
import org.apache.fluss.client.table.scanner.PartitionFilter;
2222
import org.apache.fluss.exception.LeaderNotAvailableException;
23-
import org.apache.fluss.exception.TableNotPartitionedException;
2423
import org.apache.fluss.metadata.PhysicalTablePath;
2524
import org.apache.fluss.metadata.TableBucket;
2625
import org.apache.fluss.metadata.TableInfo;
@@ -58,8 +57,8 @@
5857
* Default implementation of {@link BatchScanner} that performs a full scan against tablet servers.
5958
*
6059
* <p>This scanner issues FULL_SCAN RPCs to the leaders of all buckets and aggregates the results.
61-
* It returns all current values at a point in time for primary-key tables. The first call to
62-
* {@link #pollBatch(Duration)} returns the complete snapshot; subsequent calls return {@code null}.
60+
* It returns all current values at a point in time for primary-key tables. The first call to {@link
61+
* #pollBatch(Duration)} returns the complete snapshot; subsequent calls return {@code null}.
6362
*
6463
* <p>Note: For partitioned tables, callers may provide a {@link PartitionFilter} with a partition
6564
* name to restrict the scan to a single partition.
@@ -116,7 +115,11 @@ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOExcep
116115
} catch (TimeoutException e) {
117116
// try again in next poll
118117
return CloseableIterator.emptyIterator();
118+
} catch (RuntimeException re) {
119+
// propagate runtime exceptions (e.g., IllegalArgumentException) without wrapping
120+
throw re;
119121
} catch (Exception e) {
122+
// wrap checked exceptions into IOException as the API declares
120123
throw new IOException(e);
121124
}
122125
}
@@ -125,8 +128,8 @@ private List<CompletableFuture<FullScanResponse>> issueFullScanRequests() {
125128
Long partitionId = null;
126129
if (tableInfo.isPartitioned()) {
127130
if (partitionFilter == null || partitionFilter.getPartitionName() == null) {
128-
throw new TableNotPartitionedException(
129-
"Partition filter is required for partitioned table full scan.");
131+
throw new IllegalArgumentException(
132+
"Full scan on a partitioned table requires a PartitionFilter with a partition name.");
130133
}
131134
TablePath tablePath = tableInfo.getTablePath();
132135
PhysicalTablePath physicalTablePath =

0 commit comments

Comments
 (0)