Skip to content

Commit e7b1bd0

Browse files
committed
update based on comments
1 parent d2abf2b commit e7b1bd0

File tree

4 files changed

+297
-6
lines changed

4 files changed

+297
-6
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner;
19+
20+
import javax.annotation.Nullable;
21+
22+
/**
23+
* A simple filter that narrows scans to a specific partition.
24+
*
25+
* <p>This is an initial, minimal filter abstraction to enable partition pruning for client-side
26+
* scans. For now, only equality on a concrete partition name is supported. Row-level filtering is
27+
* intentionally not supported here.
28+
*/
29+
public final class PartitionFilter {
30+
31+
@Nullable private final String partitionName;
32+
33+
private PartitionFilter(@Nullable String partitionName) {
34+
this.partitionName = partitionName;
35+
}
36+
37+
/** Creates a filter that limits the scan to the given partition name. */
38+
public static PartitionFilter ofPartitionName(String partitionName) {
39+
return new PartitionFilter(partitionName);
40+
}
41+
42+
/** Returns the partition name to filter by, or null if none. */
43+
@Nullable
44+
public String getPartitionName() {
45+
return partitionName;
46+
}
47+
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ public interface Scan {
5151
*/
5252
Scan project(List<String> projectedColumnNames);
5353

54+
/**
55+
* Returns a new scan from this refined by a partition filter.
56+
*
57+
* <p>Currently, only equality predicates on partition columns are supported. Non-partitioned
58+
* tables do not accept filters.
59+
*/
60+
Scan filter(PartitionFilter partitionFilter);
61+
5462
/**
5563
* Returns a new scan from this that will read the given limited row number.
5664
*

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.admin.Admin;
2222
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2323
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
24+
import org.apache.fluss.client.table.scanner.batch.DefaultBatchScanner;
2425
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
2526
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
2627
import org.apache.fluss.client.table.scanner.log.LogScanner;
@@ -43,27 +44,31 @@ public class TableScan implements Scan {
4344

4445
/** The projected fields to do projection. No projection if is null. */
4546
@Nullable private final int[] projectedColumns;
47+
/** Partition filter for partition pruning; null if none. */
48+
@Nullable private final PartitionFilter partitionFilter;
4649
/** The limited row number to read. No limit if is null. */
4750
@Nullable private final Integer limit;
4851

4952
public TableScan(FlussConnection conn, TableInfo tableInfo) {
50-
this(conn, tableInfo, null, null);
53+
this(conn, tableInfo, null, null, null);
5154
}
5255

5356
private TableScan(
5457
FlussConnection conn,
5558
TableInfo tableInfo,
5659
@Nullable int[] projectedColumns,
60+
@Nullable PartitionFilter partitionFilter,
5761
@Nullable Integer limit) {
5862
this.conn = conn;
5963
this.tableInfo = tableInfo;
6064
this.projectedColumns = projectedColumns;
65+
this.partitionFilter = partitionFilter;
6166
this.limit = limit;
6267
}
6368

6469
@Override
6570
public Scan project(@Nullable int[] projectedColumns) {
66-
return new TableScan(conn, tableInfo, projectedColumns, limit);
71+
return new TableScan(conn, tableInfo, projectedColumns, partitionFilter, limit);
6772
}
6873

6974
@Override
@@ -78,12 +83,21 @@ public Scan project(List<String> projectedColumnNames) {
7883
}
7984
columnIndexes[i] = index;
8085
}
81-
return new TableScan(conn, tableInfo, columnIndexes, limit);
86+
return new TableScan(conn, tableInfo, columnIndexes, partitionFilter, limit);
87+
}
88+
89+
@Override
90+
public Scan filter(PartitionFilter partitionFilter) {
91+
if (!tableInfo.isPartitioned()) {
92+
throw new UnsupportedOperationException(
93+
"Partition filter is only supported for partitioned tables.");
94+
}
95+
return new TableScan(conn, tableInfo, projectedColumns, partitionFilter, limit);
8296
}
8397

8498
@Override
8599
public Scan limit(int rowNumber) {
86-
return new TableScan(conn, tableInfo, projectedColumns, rowNumber);
100+
return new TableScan(conn, tableInfo, projectedColumns, partitionFilter, rowNumber);
87101
}
88102

89103
@Override
@@ -103,8 +117,13 @@ public LogScanner createLogScanner() {
103117
@Override
104118
public BatchScanner createBatchScanner(TableBucket tableBucket) {
105119
if (limit == null) {
106-
throw new UnsupportedOperationException(
107-
"Currently, BatchScanner is only available when limit is set.");
120+
if (!tableInfo.hasPrimaryKey()) {
121+
throw new UnsupportedOperationException(
122+
"Full scan BatchScanner is only supported for primary key tables.");
123+
}
124+
// Full scan semantics: one-shot snapshot of current values across all buckets.
125+
return new DefaultBatchScanner(
126+
tableInfo, conn.getMetadataUpdater(), projectedColumns, partitionFilter);
108127
}
109128
return new LimitBatchScanner(
110129
tableInfo, tableBucket, conn.getMetadataUpdater(), projectedColumns, limit);
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.batch;
19+
20+
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.client.table.scanner.PartitionFilter;
22+
import org.apache.fluss.exception.LeaderNotAvailableException;
23+
import org.apache.fluss.exception.TableNotPartitionedException;
24+
import org.apache.fluss.metadata.PhysicalTablePath;
25+
import org.apache.fluss.metadata.TableBucket;
26+
import org.apache.fluss.metadata.TableInfo;
27+
import org.apache.fluss.metadata.TablePath;
28+
import org.apache.fluss.record.DefaultValueRecordBatch;
29+
import org.apache.fluss.record.ValueRecord;
30+
import org.apache.fluss.record.ValueRecordReadContext;
31+
import org.apache.fluss.row.GenericRow;
32+
import org.apache.fluss.row.InternalRow;
33+
import org.apache.fluss.row.ProjectedRow;
34+
import org.apache.fluss.row.decode.RowDecoder;
35+
import org.apache.fluss.row.encode.ValueDecoder;
36+
import org.apache.fluss.rpc.gateway.TabletServerGateway;
37+
import org.apache.fluss.rpc.messages.FullScanRequest;
38+
import org.apache.fluss.rpc.messages.FullScanResponse;
39+
import org.apache.fluss.rpc.protocol.Errors;
40+
import org.apache.fluss.types.DataType;
41+
import org.apache.fluss.types.RowType;
42+
import org.apache.fluss.utils.CloseableIterator;
43+
44+
import javax.annotation.Nullable;
45+
46+
import java.io.IOException;
47+
import java.nio.ByteBuffer;
48+
import java.time.Duration;
49+
import java.util.ArrayList;
50+
import java.util.HashSet;
51+
import java.util.List;
52+
import java.util.Objects;
53+
import java.util.concurrent.CompletableFuture;
54+
import java.util.concurrent.TimeUnit;
55+
import java.util.concurrent.TimeoutException;
56+
57+
/**
58+
* Default implementation of {@link BatchScanner} that performs a full scan against tablet servers.
59+
*
60+
* <p>This scanner issues FULL_SCAN RPCs to the leaders of all buckets and aggregates the results.
61+
* It returns all current values at a point in time for primary-key tables. The first call to
62+
* {@link #pollBatch(Duration)} returns the complete snapshot; subsequent calls return {@code null}.
63+
*
64+
* <p>Note: For partitioned tables, callers may provide a {@link PartitionFilter} with a partition
65+
* name to restrict the scan to a single partition.
66+
*/
67+
public class DefaultBatchScanner implements BatchScanner {
68+
69+
private final TableInfo tableInfo;
70+
private final MetadataUpdater metadataUpdater;
71+
@Nullable private final int[] projectedFields;
72+
@Nullable private final PartitionFilter partitionFilter;
73+
74+
private final InternalRow.FieldGetter[] fieldGetters;
75+
private final ValueDecoder kvValueDecoder;
76+
77+
private boolean endOfInput = false;
78+
79+
public DefaultBatchScanner(
80+
TableInfo tableInfo,
81+
MetadataUpdater metadataUpdater,
82+
@Nullable int[] projectedFields,
83+
@Nullable PartitionFilter partitionFilter) {
84+
this.tableInfo = Objects.requireNonNull(tableInfo, "tableInfo");
85+
this.metadataUpdater = Objects.requireNonNull(metadataUpdater, "metadataUpdater");
86+
this.projectedFields = projectedFields;
87+
this.partitionFilter = partitionFilter;
88+
89+
RowType rowType = tableInfo.getRowType();
90+
this.fieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()];
91+
for (int i = 0; i < rowType.getFieldCount(); i++) {
92+
this.fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
93+
}
94+
this.kvValueDecoder =
95+
new ValueDecoder(
96+
RowDecoder.create(
97+
tableInfo.getTableConfig().getKvFormat(),
98+
rowType.getChildren().toArray(new DataType[0])));
99+
}
100+
101+
@Nullable
102+
@Override
103+
public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
104+
if (endOfInput) {
105+
return null;
106+
}
107+
try {
108+
List<CompletableFuture<FullScanResponse>> futures = issueFullScanRequests();
109+
// wait for all responses or timeout for this poll
110+
CompletableFuture<Void> all =
111+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
112+
all.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
113+
List<InternalRow> rows = decodeFullScanResponses(futures);
114+
endOfInput = true;
115+
return CloseableIterator.wrap(rows.iterator());
116+
} catch (TimeoutException e) {
117+
// try again in next poll
118+
return CloseableIterator.emptyIterator();
119+
} catch (Exception e) {
120+
throw new IOException(e);
121+
}
122+
}
123+
124+
private List<CompletableFuture<FullScanResponse>> issueFullScanRequests() {
125+
Long partitionId = null;
126+
if (tableInfo.isPartitioned()) {
127+
if (partitionFilter == null || partitionFilter.getPartitionName() == null) {
128+
throw new TableNotPartitionedException(
129+
"Partition filter is required for partitioned table full scan.");
130+
}
131+
TablePath tablePath = tableInfo.getTablePath();
132+
PhysicalTablePath physicalTablePath =
133+
PhysicalTablePath.of(tablePath, partitionFilter.getPartitionName());
134+
metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
135+
partitionId =
136+
metadataUpdater
137+
.getPartitionId(physicalTablePath)
138+
.orElseThrow(
139+
() ->
140+
new IllegalStateException(
141+
"Partition id not found for "
142+
+ partitionFilter.getPartitionName()));
143+
}
144+
145+
long tableId = tableInfo.getTableId();
146+
int numBuckets = tableInfo.getNumBuckets();
147+
148+
// collect leaders for all buckets
149+
HashSet<Integer> leaderServers = new HashSet<>();
150+
for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
151+
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
152+
metadataUpdater.checkAndUpdateMetadata(tableInfo.getTablePath(), tableBucket);
153+
int leader = metadataUpdater.leaderFor(tableBucket);
154+
leaderServers.add(leader);
155+
}
156+
157+
List<CompletableFuture<FullScanResponse>> responseFutures = new ArrayList<>();
158+
for (int leader : leaderServers) {
159+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
160+
if (gateway == null) {
161+
throw new LeaderNotAvailableException(
162+
"Server " + leader + " is not found in metadata cache.");
163+
}
164+
FullScanRequest request = new FullScanRequest().setTableId(tableId);
165+
if (partitionId != null) {
166+
request.setPartitionId(partitionId);
167+
}
168+
// Future-proof: optionally pass bucket list when supported by server
169+
// request.setBucketId(Collections.singletonList(bucketId));
170+
responseFutures.add(gateway.fullScan(request));
171+
}
172+
return responseFutures;
173+
}
174+
175+
private List<InternalRow> decodeFullScanResponses(
176+
List<CompletableFuture<FullScanResponse>> responseFutures) {
177+
List<InternalRow> out = new ArrayList<>();
178+
for (CompletableFuture<FullScanResponse> responseFuture : responseFutures) {
179+
FullScanResponse response = responseFuture.join();
180+
if (response.hasErrorCode() && response.getErrorCode() != Errors.NONE.code()) {
181+
Errors err = Errors.forCode(response.getErrorCode());
182+
throw err.exception(
183+
response.hasErrorMessage() ? response.getErrorMessage() : err.message());
184+
}
185+
if (response.hasRecords()) {
186+
ByteBuffer buffer = ByteBuffer.wrap(response.getRecords());
187+
DefaultValueRecordBatch values = DefaultValueRecordBatch.pointToByteBuffer(buffer);
188+
ValueRecordReadContext context =
189+
new ValueRecordReadContext(kvValueDecoder.getRowDecoder());
190+
for (ValueRecord record : values.records(context)) {
191+
out.add(maybeProject(record.getRow()));
192+
}
193+
}
194+
}
195+
return out;
196+
}
197+
198+
private InternalRow maybeProject(InternalRow originRow) {
199+
// deep copy and project if requested to avoid referencing released buffers
200+
GenericRow newRow = new GenericRow(fieldGetters.length);
201+
for (int i = 0; i < fieldGetters.length; i++) {
202+
newRow.setField(i, fieldGetters[i].getFieldOrNull(originRow));
203+
}
204+
if (projectedFields != null) {
205+
ProjectedRow projectedRow = ProjectedRow.from(projectedFields);
206+
projectedRow.replaceRow(newRow);
207+
return projectedRow;
208+
} else {
209+
return newRow;
210+
}
211+
}
212+
213+
@Override
214+
public void close() {
215+
// no-op
216+
}
217+
}

0 commit comments

Comments
 (0)