Skip to content

Commit 17ae760

Browse files
committed
add updates
1 parent 1489f14 commit 17ae760

File tree

5 files changed

+81
-17
lines changed

5 files changed

+81
-17
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,20 @@ public interface Scan {
7474
LogScanner createLogScanner();
7575

7676
/**
77-
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
77+
* Creates a {@link BatchScanner} to read data for this scan over the whole table.
7878
*
79-
* <p>Notes:
80-
*
81-
* <ul>
82-
* <li>Projection configured via {@link #project} is supported client-side.
83-
* <li>For full scans (when {@link #limit(int)} is not set), the provided {@link TableBucket}
84-
* is used only to identify the table (and optional partition); the bucket id is ignored.
85-
* </ul>
79+
* <p>For full scans (when {@link #limit(int)} is not set), the returned scanner reads a single
80+
* snapshot of current values across all buckets. Use {@link BatchScanner#snapshotAll()} for
81+
* non-partitioned tables or {@link BatchScanner#snapshotAllPartition(String)} for partitioned
82+
* tables.
83+
*/
84+
BatchScanner createBatchScanner();
85+
86+
/**
87+
* [Deprecated] Creates a {@link BatchScanner} to read current data in the given table bucket
88+
* for this scan.
8689
*/
90+
@Deprecated
8791
BatchScanner createBatchScanner(TableBucket tableBucket);
8892

8993
/**

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,16 +115,27 @@ public LogScanner createLogScanner() {
115115
}
116116

117117
@Override
118-
public BatchScanner createBatchScanner(TableBucket tableBucket) {
118+
public BatchScanner createBatchScanner() {
119119
if (limit == null) {
120120
if (!tableInfo.hasPrimaryKey()) {
121121
throw new UnsupportedOperationException(
122122
"Full scan BatchScanner is only supported for primary key tables.");
123123
}
124-
// Full scan semantics: one-shot snapshot of current values across all buckets.
125124
return new DefaultBatchScanner(
126125
tableInfo, conn.getMetadataUpdater(), projectedColumns, partitionFilter);
127126
}
127+
// For limited scans, the legacy API requires a TableBucket to indicate which bucket.
128+
// Keep behavior via deprecated overloads; here we throw to signal misuse.
129+
throw new UnsupportedOperationException(
130+
"Limit scan requires bucket specification; use createBatchScanner(TableBucket) instead.");
131+
}
132+
133+
@Override
134+
public BatchScanner createBatchScanner(TableBucket tableBucket) {
135+
if (limit == null) {
136+
// delegate to whole table scanner
137+
return createBatchScanner();
138+
}
128139
return new LimitBatchScanner(
129140
tableInfo, tableBucket, conn.getMetadataUpdater(), projectedColumns, limit);
130141
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanner.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,33 @@
3838
@PublicEvolving
3939
public interface BatchScanner extends Closeable {
4040

41+
/**
42+
* Configure the scanner to read a snapshot over the whole table (non-partitioned tables) or the
43+
* whole (single) partition must be selected via {@link #snapshotAllPartition(String)} for
44+
* partitioned tables.
45+
*
46+
* <p>For partitioned tables, calling this without specifying a partition name will cause the
47+
* implementation to throw an IllegalArgumentException when polling.
48+
*
49+
* @return a scanner configured for a one-shot snapshot over the current data
50+
*/
51+
default BatchScanner snapshotAll() {
52+
return this;
53+
}
54+
55+
/**
56+
* Configure the scanner to read a snapshot from the specified partition. Only applicable to
57+
* partitioned tables.
58+
*
59+
* @param partitionName the partition to scan
60+
* @return a scanner configured for a one-shot snapshot over the given partition
61+
* @throws UnsupportedOperationException if the table is not partitioned
62+
*/
63+
default BatchScanner snapshotAllPartition(String partitionName) {
64+
throw new UnsupportedOperationException(
65+
"Partition filter is only supported for partitioned tables");
66+
}
67+
4168
/**
4269
* Poll one batch records. The method should return null when reaching the end of the input.
4370
*

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/DefaultBatchScanner.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,24 @@
6565
*/
6666
public class DefaultBatchScanner implements BatchScanner {
6767

68+
@Override
69+
public BatchScanner snapshotAll() {
70+
return this;
71+
}
72+
73+
@Override
74+
public BatchScanner snapshotAllPartition(String partitionName) {
75+
if (!tableInfo.isPartitioned()) {
76+
throw new UnsupportedOperationException(
77+
"Partition filter is only supported for partitioned tables");
78+
}
79+
return new DefaultBatchScanner(
80+
tableInfo,
81+
metadataUpdater,
82+
projectedFields,
83+
PartitionFilter.ofPartitionName(partitionName));
84+
}
85+
6886
private final TableInfo tableInfo;
6987
private final MetadataUpdater metadataUpdater;
7088
@Nullable private final int[] projectedFields;

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/DefaultBatchScannerITCase.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ void testFullScanPrimaryKeyTable_returnsAllRows() throws Exception {
7070
writer.flush();
7171

7272
// Full scan via Scan API (limit == null)
73-
BatchScanner scanner = table.newScan().createBatchScanner(new TableBucket(tableId, 0));
73+
BatchScanner scanner = table.newScan().createBatchScanner().snapshotAll();
7474
List<InternalRow> actualRows = collectRows(scanner);
7575

7676
// Assert count
@@ -104,7 +104,8 @@ void testProjectionInFullScan() throws Exception {
104104
BatchScanner scanner =
105105
table.newScan()
106106
.project(projected)
107-
.createBatchScanner(new TableBucket(tableId, 0));
107+
.createBatchScanner()
108+
.snapshotAll();
108109
List<InternalRow> actualRows = collectRows(scanner);
109110

110111
assertThat(actualRows).hasSize(5);
@@ -127,7 +128,7 @@ void testFilterOnNonPartitionedTableThrows() throws Exception {
127128
() ->
128129
table.newScan()
129130
.filter(PartitionFilter.ofPartitionName("p=1"))
130-
.createBatchScanner(new TableBucket(tableId, 0)))
131+
.createBatchScanner())
131132
.isInstanceOf(UnsupportedOperationException.class)
132133
.hasMessageContaining(
133134
"Partition filter is only supported for partitioned tables");
@@ -170,7 +171,8 @@ void testFullScanPartitionedTable_requiresPartitionFilter() throws Exception {
170171
assertThatThrownBy(
171172
() ->
172173
table.newScan()
173-
.createBatchScanner(new TableBucket(tableId, 0))
174+
.createBatchScanner()
175+
.snapshotAll()
174176
.pollBatch(java.time.Duration.ofMillis(1)))
175177
.isInstanceOf(IllegalArgumentException.class)
176178
.hasMessageContaining("requires a PartitionFilter with a partition name");
@@ -179,7 +181,8 @@ void testFullScanPartitionedTable_requiresPartitionFilter() throws Exception {
179181
BatchScanner scannerA =
180182
table.newScan()
181183
.filter(PartitionFilter.ofPartitionName("a"))
182-
.createBatchScanner(new TableBucket(tableId, 0));
184+
.createBatchScanner()
185+
.snapshotAllPartition("a");
183186
List<InternalRow> rowsA = collectRows(scannerA);
184187
assertThat(rowsA).hasSize(4);
185188
List<Object[]> valuesA = toValues(rowsA, schema.getRowType());
@@ -195,7 +198,8 @@ void testFullScanPartitionedTable_requiresPartitionFilter() throws Exception {
195198
BatchScanner scannerB =
196199
table.newScan()
197200
.filter(PartitionFilter.ofPartitionName("b"))
198-
.createBatchScanner(new TableBucket(tableId, 0));
201+
.createBatchScanner()
202+
.snapshotAllPartition("b");
199203
List<InternalRow> rowsB = collectRows(scannerB);
200204
assertThat(rowsB).hasSize(4);
201205
List<Object[]> valuesB = toValues(rowsB, schema.getRowType());
@@ -224,7 +228,7 @@ void testFullScanOnLogTableThrows() throws Exception {
224228

225229
try (Table table = conn.getTable(tablePath)) {
226230
assertThatThrownBy(
227-
() -> table.newScan().createBatchScanner(new TableBucket(tableId, 0)))
231+
() -> table.newScan().createBatchScanner())
228232
.isInstanceOf(UnsupportedOperationException.class)
229233
.hasMessageContaining(
230234
"Full scan BatchScanner is only supported for primary key tables.");

0 commit comments

Comments
 (0)