Skip to content

Commit b542f04

Browse files
committed
poc
1 parent 69ba18e commit b542f04

File tree

13 files changed

+667
-9
lines changed

13 files changed

+667
-9
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,19 @@ public interface Scan {
6565
*/
6666
LogScanner createLogScanner();
6767

68+
/**
69+
* Creates a {@link BatchScanner} to read data for this scan over the whole table.
70+
* For partitioned tables, scope must be selected via
71+
* {@link org.apache.fluss.client.table.scanner.batch.BatchScanner#snapshotAllPartition(String)}.
72+
*/
73+
BatchScanner createBatchScanner();
74+
6875
/**
6976
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
7077
*
7178
* <p>Note: this API doesn't support a scan that was pre-configured with {@link #project}.
7279
*/
80+
@Deprecated
7381
BatchScanner createBatchScanner(TableBucket tableBucket);
7482

7583
/**

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
2424
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
2525
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
26+
import org.apache.fluss.client.table.scanner.batch.DefaultBatchScanner;
2627
import org.apache.fluss.client.table.scanner.log.LogScanner;
2728
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
2829
import org.apache.fluss.config.ConfigOptions;
@@ -100,11 +101,37 @@ public LogScanner createLogScanner() {
100101
projectedColumns);
101102
}
102103

104+
@Override
105+
public BatchScanner createBatchScanner() {
106+
if (limit == null) {
107+
if (!tableInfo.hasPrimaryKey()) {
108+
throw new UnsupportedOperationException(
109+
"Full scan BatchScanner is only supported for primary key tables.");
110+
}
111+
// Prefer server-side full scan if explicitly enabled
112+
boolean useServer = conn.getConfiguration().getBoolean(ConfigOptions.CLIENT_SCANNER_USE_SERVER_FULL_SCAN);
113+
if (useServer) {
114+
return new org.apache.fluss.client.table.scanner.batch.ServerFullScanBatchScanner(
115+
conn.getMetadataUpdater(), tableInfo, projectedColumns);
116+
}
117+
String scannerTmpDir =
118+
conn.getConfiguration().getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR);
119+
return new DefaultBatchScanner(
120+
tableInfo,
121+
conn.getAdmin(),
122+
conn.getOrCreateRemoteFileDownloader(),
123+
projectedColumns,
124+
scannerTmpDir);
125+
}
126+
throw new UnsupportedOperationException(
127+
"Limit scan requires bucket specification; use createBatchScanner(TableBucket) instead.");
128+
}
129+
103130
@Override
104131
public BatchScanner createBatchScanner(TableBucket tableBucket) {
105132
if (limit == null) {
106133
throw new UnsupportedOperationException(
107-
"Currently, BatchScanner is only available when limit is set.");
134+
"Deprecated API: use createBatchScanner() for full-table scans; bucket-specific full scan is no longer supported.");
108135
}
109136
return new LimitBatchScanner(
110137
tableInfo, tableBucket, conn.getMetadataUpdater(), projectedColumns, limit);

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanner.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,30 @@
3838
@PublicEvolving
3939
public interface BatchScanner extends Closeable {
4040

41+
/**
42+
* Configure the scanner to read a snapshot over the whole table (non-partitioned tables).
43+
* For partitioned tables, use {@link #snapshotAllPartition(String)} to select a partition.
44+
*
45+
* <p>By default, this is a no-op and returns {@code this}. Implementations may override to
46+
* return a new configured scanner instance.
47+
*/
48+
default BatchScanner snapshotAll() {
49+
return this;
50+
}
51+
52+
/**
53+
* Configure the scanner to read a snapshot from the specified partition. Only applicable to
54+
* partitioned tables.
55+
*
56+
* @param partitionName the partition to scan
57+
* @return a scanner configured for a one-shot snapshot over the given partition
58+
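* @throws UnsupportedOperationException if the table is not partitioned, or if this scanner implementation does not override this method (the default implementation always throws)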
59+
*/
60+
default BatchScanner snapshotAllPartition(String partitionName) {
61+
throw new UnsupportedOperationException(
62+
"Partition filter is only supported for partitioned tables");
63+
}
64+
4165
/**
4266
* Poll one batch records. The method should return null when reaching the end of the input.
4367
*
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.batch;
19+
20+
import org.apache.fluss.client.admin.Admin;
21+
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
22+
import org.apache.fluss.client.metadata.KvSnapshots;
23+
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
24+
import org.apache.fluss.config.ConfigOptions;
25+
import org.apache.fluss.exception.FlussRuntimeException;
26+
import org.apache.fluss.metadata.KvFormat;
27+
import org.apache.fluss.metadata.TableBucket;
28+
import org.apache.fluss.metadata.TableInfo;
29+
import org.apache.fluss.row.InternalRow;
30+
import org.apache.fluss.types.RowType;
31+
import org.apache.fluss.utils.CloseableIterator;
32+
33+
import javax.annotation.Nullable;
34+
35+
import java.io.IOException;
36+
import java.time.Duration;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.TimeUnit;
41+
42+
/**
43+
* BatchScanner that performs a full-table (or single-partition) snapshot by reading
44+
* the latest KV snapshots per bucket and merging results client-side. Designed for small tables.
45+
*/
46+
public class DefaultBatchScanner implements BatchScanner {
47+
48+
// Track underlying per-bucket scanners to ensure resources are released on early close/errors
49+
private final List<BatchScanner> activeBucketScanners = new ArrayList<>();
50+
51+
private final TableInfo tableInfo;
52+
private final Admin admin;
53+
private final RemoteFileDownloader downloader;
54+
@Nullable private final int[] projectedFields;
55+
@Nullable private final String partitionName; // when non-null, restrict to this partition
56+
57+
private final String scannerTmpDir;
58+
private final KvFormat kvFormat;
59+
private final RowType rowType;
60+
61+
private volatile boolean endOfInput = false;
62+
63+
public DefaultBatchScanner(
64+
TableInfo tableInfo,
65+
Admin admin,
66+
RemoteFileDownloader downloader,
67+
@Nullable int[] projectedFields,
68+
@Nullable String partitionName,
69+
String scannerTmpDir) {
70+
this.tableInfo = tableInfo;
71+
this.admin = admin;
72+
this.downloader = downloader;
73+
this.projectedFields = projectedFields;
74+
this.partitionName = partitionName;
75+
this.scannerTmpDir = scannerTmpDir;
76+
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
77+
this.rowType = tableInfo.getRowType();
78+
}
79+
80+
public DefaultBatchScanner(
81+
TableInfo tableInfo,
82+
Admin admin,
83+
RemoteFileDownloader downloader,
84+
@Nullable int[] projectedFields,
85+
String scannerTmpDir) {
86+
this(tableInfo, admin, downloader, projectedFields, null, scannerTmpDir);
87+
}
88+
89+
@Override
90+
public BatchScanner snapshotAll() {
91+
// no-op: scanning whole table is the default in this implementation
92+
return this;
93+
}
94+
95+
@Override
96+
public BatchScanner snapshotAllPartition(String partitionName) {
97+
if (!tableInfo.isPartitioned()) {
98+
throw new UnsupportedOperationException(
99+
"Partition filter is only supported for partitioned tables");
100+
}
101+
return new DefaultBatchScanner(
102+
tableInfo, admin, downloader, projectedFields, partitionName, scannerTmpDir);
103+
}
104+
105+
@Nullable
106+
@Override
107+
public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
108+
if (endOfInput) {
109+
return null;
110+
}
111+
112+
if (tableInfo.isPartitioned() && partitionName == null) {
113+
throw new IllegalArgumentException(
114+
"Full-table snapshot on a partitioned table requires a PartitionFilter with a partition name. "
115+
+ "Use snapshotAllPartition(partitionName).");
116+
}
117+
if (!tableInfo.hasPrimaryKey()) {
118+
throw new UnsupportedOperationException(
119+
"Full scan BatchScanner is only supported for primary key tables.");
120+
}
121+
122+
List<BatchScanner> bucketScanners = new ArrayList<>();
123+
try {
124+
// 1) Fetch latest KV snapshots for scope
125+
final KvSnapshots kvSnapshots;
126+
if (partitionName == null) {
127+
kvSnapshots = admin.getLatestKvSnapshots(tableInfo.getTablePath()).get(30, TimeUnit.SECONDS);
128+
} else {
129+
kvSnapshots = admin.getLatestKvSnapshots(tableInfo.getTablePath(), partitionName).get(30, TimeUnit.SECONDS);
130+
}
131+
132+
// 2) Build per-bucket snapshot scanners
133+
for (int bucketId : kvSnapshots.getBucketIds()) {
134+
long snapshotId;
135+
if (kvSnapshots.getSnapshotId(bucketId).isPresent()) {
136+
snapshotId = kvSnapshots.getSnapshotId(bucketId).getAsLong();
137+
} else {
138+
// No snapshot for this bucket yet; skip
139+
continue;
140+
}
141+
TableBucket tableBucket = new TableBucket(
142+
kvSnapshots.getTableId(), kvSnapshots.getPartitionId(), bucketId);
143+
144+
KvSnapshotMetadata meta = getSnapshotMetadata(tableBucket, snapshotId);
145+
KvSnapshotBatchScanner scanner =
146+
new KvSnapshotBatchScanner(
147+
rowType,
148+
tableBucket,
149+
meta.getSnapshotFiles(),
150+
projectedFields,
151+
scannerTmpDir,
152+
kvFormat,
153+
downloader);
154+
bucketScanners.add(scanner);
155+
}
156+
157+
// make them visible to close()
158+
synchronized (activeBucketScanners) {
159+
activeBucketScanners.clear();
160+
activeBucketScanners.addAll(bucketScanners);
161+
}
162+
163+
// 3) Collect all rows from bucket scanners in one shot
164+
List<InternalRow> rows = BatchScanUtils.collectAllRows(bucketScanners);
165+
166+
// collection closes the scanners; clear the active set
167+
synchronized (activeBucketScanners) {
168+
activeBucketScanners.clear();
169+
}
170+
171+
endOfInput = true;
172+
return CloseableIterator.wrap(rows.iterator());
173+
} catch (Exception e) {
174+
// close any scanners that may have been created
175+
for (BatchScanner scanner : bucketScanners) {
176+
try { scanner.close(); } catch (Exception ignore) {}
177+
}
178+
synchronized (activeBucketScanners) {
179+
activeBucketScanners.clear();
180+
}
181+
throw new IOException("Failed to execute full snapshot scan", e);
182+
}
183+
}
184+
185+
private KvSnapshotMetadata getSnapshotMetadata(TableBucket tableBucket, long snapshotId) {
186+
try {
187+
CompletableFuture<KvSnapshotMetadata> f = admin.getKvSnapshotMetadata(tableBucket, snapshotId);
188+
return f.get(30, TimeUnit.SECONDS);
189+
} catch (Exception e) {
190+
throw new FlussRuntimeException("Failed to get snapshot metadata for " + tableBucket
191+
+ ", snapshotId=" + snapshotId, e);
192+
}
193+
}
194+
195+
@Override
196+
public void close() throws IOException {
197+
endOfInput = true;
198+
// Ensure any active bucket-level scanners are closed to free resources
199+
synchronized (activeBucketScanners) {
200+
for (BatchScanner scanner : activeBucketScanners) {
201+
try {
202+
scanner.close();
203+
} catch (Exception ignore) {
204+
// swallow to ensure best-effort close
205+
}
206+
}
207+
activeBucketScanners.clear();
208+
}
209+
}
210+
}

0 commit comments

Comments
 (0)