Skip to content

Commit 8695b1d

Browse files
committed
add small updates
1 parent e76b789 commit 8695b1d

File tree

5 files changed

+61
-43
lines changed

5 files changed

+61
-43
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,24 @@
2121
import org.apache.fluss.client.admin.Admin;
2222
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2323
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
24+
import org.apache.fluss.client.table.scanner.batch.FullScanBatchScanner;
2425
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
2526
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
26-
import org.apache.fluss.client.table.scanner.batch.FullScanBatchScanner;
2727
import org.apache.fluss.client.table.scanner.log.LogScanner;
2828
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
2929
import org.apache.fluss.config.ConfigOptions;
3030
import org.apache.fluss.exception.FlussRuntimeException;
31-
import org.apache.fluss.exception.TableNotPartitionedException;
3231
import org.apache.fluss.exception.PartitionNotExistException;
32+
import org.apache.fluss.exception.TableNotPartitionedException;
3333
import org.apache.fluss.metadata.PhysicalTablePath;
3434
import org.apache.fluss.metadata.TableBucket;
3535
import org.apache.fluss.metadata.TableInfo;
36-
import org.apache.fluss.metadata.TablePath;
3736
import org.apache.fluss.types.RowType;
38-
import java.util.concurrent.ExecutionException;
3937

4038
import javax.annotation.Nullable;
4139

4240
import java.util.List;
41+
import java.util.concurrent.ExecutionException;
4342

4443
/** API for configuring and creating {@link LogScanner} and {@link BatchScanner}. */
4544
public class TableScan implements Scan {
@@ -80,7 +79,9 @@ public Scan project(List<String> projectedColumnNames) {
8079
int index = rowType.getFieldIndex(projectedColumnNames.get(i));
8180
if (index < 0) {
8281
throw new IllegalArgumentException(
83-
String.format("Field %s not found in table schema.", projectedColumnNames.get(i)));
82+
String.format(
83+
"Field %s not found in table schema.",
84+
projectedColumnNames.get(i)));
8485
}
8586
columnIndexes[i] = index;
8687
}
@@ -205,9 +206,7 @@ public BatchScanner createBatchScanner(String partitionName) {
205206
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), pid);
206207
}
207208

208-
/**
209-
* Create a BatchScanner that performs a FULL_SCAN over the entire (non-partitioned) table.
210-
*/
209+
/** Create a BatchScanner that performs a FULL_SCAN over the entire (non-partitioned) table. */
211210
public BatchScanner createFullScanBatchScanner() {
212211
if (tableInfo.isPartitioned()) {
213212
throw new TableNotPartitionedException(
@@ -216,9 +215,7 @@ public BatchScanner createFullScanBatchScanner() {
216215
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), null);
217216
}
218217

219-
/**
220-
* Create a BatchScanner that performs a FULL_SCAN over a specific partition of the table.
221-
*/
218+
/** Create a BatchScanner that performs a FULL_SCAN over a specific partition of the table. */
222219
public BatchScanner createFullScanBatchScanner(String partitionName) {
223220
if (!tableInfo.isPartitioned()) {
224221
throw new TableNotPartitionedException(
@@ -232,7 +229,8 @@ public BatchScanner createFullScanBatchScanner(String partitionName) {
232229
}
233230
Long pid = conn.getMetadataUpdater().getPartitionId(physical).orElse(null);
234231
if (pid == null) {
235-
throw new IllegalStateException(String.format("Partition id not found for %s", partitionName));
232+
throw new IllegalStateException(
233+
String.format("Partition id not found for %s", partitionName));
236234
}
237235
return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), pid);
238236
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanner.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,17 @@ public interface BatchScanner extends Closeable {
5050
CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException;
5151

5252
/**
53-
* Perform a bounded snapshot and return all rows as a single collection.
54-
* Default implementation is unsupported; only specific scanners (e.g., full-scan) support it.
53+
* Perform a bounded snapshot and return all rows as a single collection. Default implementation
54+
* is unsupported; only specific scanners (e.g., full-scan) support it.
5555
*/
5656
default CompletableFuture<List<InternalRow>> snapshotAll() {
5757
throw new UnsupportedOperationException("snapshotAll is not supported by this scanner.");
5858
}
5959

6060
/**
61-
* Perform a bounded snapshot for a specific partition and return all rows as a single collection.
62-
* Default implementation is unsupported; only specific scanners (e.g., full-scan) support it.
61+
* Perform a bounded snapshot for a specific partition and return all rows as a single
62+
* collection. Default implementation is unsupported; only specific scanners (e.g., full-scan)
63+
* support it.
6364
*/
6465
default CompletableFuture<List<InternalRow>> snapshotAllPartition(String partitionName) {
6566
throw new UnsupportedOperationException(

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/FullScanBatchScanner.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,8 @@
4848
import java.util.concurrent.TimeUnit;
4949

5050
/**
51-
* BatchScanner that performs a FULL_SCAN snapshot across all leaders of a KV table (optionally a
51+
* BatchScanner that performs a FULL_SCAN snapshot across all leaders of a KV table (or a
5252
* single partition) and exposes the result via pollBatch in a bounded fashion.
53-
*
54-
* <p>This moves the snapshotAll/snapshotAllPartition functionality previously implemented in
55-
* PrimaryKeyLookuper into a batch scanner form.
5653
*/
5754
@Internal
5855
public class FullScanBatchScanner implements BatchScanner {
@@ -104,30 +101,31 @@ private void startFullScan() {
104101
if (gateway == null) {
105102
rowsFuture.completeExceptionally(
106103
new IllegalStateException(
107-
"Leader server " + leader + " is not found in metadata cache."));
104+
"Leader server "
105+
+ leader
106+
+ " is not found in metadata cache."));
108107
return;
109108
}
110109
FullScanRequest request = new FullScanRequest();
111110
request.setTableId(tableId);
112-
// bucket_id is required by the protocol, ignored by server for FULL_SCAN
113111
request.setBucketId(0);
112+
114113
if (partitionId != null) {
115114
request.setPartitionId(partitionId);
116115
}
117116
responseFutures.add(gateway.fullScan(request));
118117
}
119118

120-
CompletableFuture
121-
.allOf(responseFutures.toArray(new CompletableFuture[0]))
122-
.thenApply(v -> decodeFullScanResponses(responseFutures))
123-
.whenComplete(
124-
(rows, err) -> {
125-
if (err != null) {
126-
rowsFuture.completeExceptionally(err);
127-
} else {
128-
rowsFuture.complete(rows);
129-
}
130-
});
119+
CompletableFuture.allOf(responseFutures.toArray(new CompletableFuture[0]))
120+
.thenApply(v -> decodeFullScanResponses(responseFutures))
121+
.whenComplete(
122+
(rows, err) -> {
123+
if (err != null) {
124+
rowsFuture.completeExceptionally(err);
125+
} else {
126+
rowsFuture.complete(rows);
127+
}
128+
});
131129
} catch (Throwable t) {
132130
rowsFuture.completeExceptionally(t);
133131
}
@@ -177,7 +175,6 @@ private List<InternalRow> decodeFullScanResponses(
177175
}
178176
throw new IOException("Failed to perform full scan", cause);
179177
} catch (Exception te) {
180-
// timeout or interruption -> return empty iterator to indicate no data yet
181178
return CloseableIterator.emptyIterator();
182179
}
183180
}
@@ -187,7 +184,6 @@ public void close() throws IOException {
187184
// nothing to close
188185
}
189186

190-
// ---- New BatchScanner API implementations ----
191187
@Override
192188
public CompletableFuture<List<InternalRow>> snapshotAll() {
193189
return rowsFuture;

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/FullScanBatchScannerITCase.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959

6060
/**
6161
* End-to-end IT cases for FullScanBatchScanner functionality.
62-
* Mirrors PrimaryKeyLookuperITCase but uses the BatchScanner API.
6362
*/
6463
class FullScanBatchScannerITCase {
6564

@@ -155,7 +154,11 @@ void testSnapshotAllNotPartitioned() throws Exception {
155154
}
156155

157156
// unhappy path: createBatchScanner with partition on non-partitioned table
158-
assertThatThrownBy(() -> ((org.apache.fluss.client.table.scanner.TableScan) table.newScan()).createBatchScanner("p1"))
157+
assertThatThrownBy(
158+
() ->
159+
((org.apache.fluss.client.table.scanner.TableScan)
160+
table.newScan())
161+
.createBatchScanner("p1"))
159162
.isInstanceOf(TableNotPartitionedException.class)
160163
.hasMessageContaining("Table is not partitioned");
161164
}
@@ -210,8 +213,16 @@ void testSnapshotAllPartitioned() throws Exception {
210213
upsert.flush();
211214

212215
// valid partition snapshots via BatchScanner
213-
List<InternalRow> p1Rows = ((org.apache.fluss.client.table.scanner.TableScan) table.newScan()).createBatchScanner("20240101").snapshotAll().get();
214-
List<InternalRow> p2Rows = ((org.apache.fluss.client.table.scanner.TableScan) table.newScan()).createBatchScanner("20240102").snapshotAll().get();
216+
List<InternalRow> p1Rows =
217+
((org.apache.fluss.client.table.scanner.TableScan) table.newScan())
218+
.createBatchScanner("20240101")
219+
.snapshotAll()
220+
.get();
221+
List<InternalRow> p2Rows =
222+
((org.apache.fluss.client.table.scanner.TableScan) table.newScan())
223+
.createBatchScanner("20240102")
224+
.snapshotAll()
225+
.get();
215226

216227
RowType rowType = schema.getRowType();
217228
Comparator<InternalRow> byKey = Comparator.comparingInt(r -> r.getInt(0));
@@ -239,12 +250,20 @@ void testSnapshotAllPartitioned() throws Exception {
239250
}
240251

241252
// unhappy path: invalid partition name
242-
assertThatThrownBy(() -> ((org.apache.fluss.client.table.scanner.TableScan) table.newScan()).createBatchScanner("p=does_not_exist"))
253+
assertThatThrownBy(
254+
() ->
255+
((org.apache.fluss.client.table.scanner.TableScan)
256+
table.newScan())
257+
.createBatchScanner("p=does_not_exist"))
243258
.isInstanceOf(PartitionNotExistException.class)
244259
.hasMessageContaining("does not exist");
245260

246261
// unhappy path: snapshotAll on partitioned table without specifying partition
247-
assertThatThrownBy(() -> ((org.apache.fluss.client.table.scanner.TableScan) table.newScan()).createBatchScanner())
262+
assertThatThrownBy(
263+
() ->
264+
((org.apache.fluss.client.table.scanner.TableScan)
265+
table.newScan())
266+
.createBatchScanner())
248267
.isInstanceOf(TableNotPartitionedException.class)
249268
.hasMessageContaining("Table is partitioned");
250269
}
@@ -281,7 +300,11 @@ void testSnapshotAllFor10KTable() throws Exception {
281300
upsert.flush();
282301

283302
// run snapshotAll via BatchScanner
284-
List<InternalRow> rows = ((org.apache.fluss.client.table.scanner.TableScan) table.newScan()).createBatchScanner().snapshotAll().get();
303+
List<InternalRow> rows =
304+
((org.apache.fluss.client.table.scanner.TableScan) table.newScan())
305+
.createBatchScanner()
306+
.snapshotAll()
307+
.get();
285308

286309
// verify size and key coverage
287310
assertThat(rows).hasSize(total);

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ message LimitScanResponse{
259259
message FullScanRequest {
260260
required int64 table_id = 1;
261261
optional int64 partition_id = 2; // omit or unset for full table when non-partitioned
262-
// required int32 bucket_id = 3;
262+
required int32 bucket_id = 3;
263263
}
264264

265265
message FullScanResponse {

0 commit comments

Comments
 (0)