Skip to content

Commit 33152ea

Browse files
filter
1 parent 4ccf4b1 commit 33152ea

File tree

22 files changed

+483
-11
lines changed

22 files changed

+483
-11
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,6 @@ website/versioned_docs
4242
website/versioned_sidebars
4343
website/versions.json
4444
website/pnpm-lock.yaml
45+
46+
# jenv
47+
.java-version

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/Scan.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
2222
import com.alibaba.fluss.client.table.scanner.log.LogScanner;
2323
import com.alibaba.fluss.metadata.TableBucket;
24+
import com.alibaba.fluss.predicate.Predicate;
2425

2526
import javax.annotation.Nullable;
2627

@@ -51,6 +52,8 @@ public interface Scan {
5152
*/
5253
Scan project(List<String> projectedColumnNames);
5354

55+
Scan lossyFilter(Predicate loosyFilter);
56+
5457
/**
5558
* Returns a new scan from this that will read the given limited row number.
5659
*

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/TableScan.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.fluss.exception.FlussRuntimeException;
3030
import com.alibaba.fluss.metadata.TableBucket;
3131
import com.alibaba.fluss.metadata.TableInfo;
32+
import com.alibaba.fluss.predicate.Predicate;
3233
import com.alibaba.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -43,27 +44,32 @@ public class TableScan implements Scan {
4344

4445
/** The projected fields to do projection. No projection if is null. */
4546
@Nullable private final int[] projectedColumns;
47+
48+
@Nullable private final Predicate loosyFilter;
49+
4650
/** The limited row number to read. No limit if is null. */
4751
@Nullable private final Integer limit;
4852

4953
public TableScan(FlussConnection conn, TableInfo tableInfo) {
50-
this(conn, tableInfo, null, null);
54+
this(conn, tableInfo, null, null, null);
5155
}
5256

5357
private TableScan(
5458
FlussConnection conn,
5559
TableInfo tableInfo,
5660
@Nullable int[] projectedColumns,
61+
@Nullable Predicate loosyFilter,
5762
@Nullable Integer limit) {
5863
this.conn = conn;
5964
this.tableInfo = tableInfo;
6065
this.projectedColumns = projectedColumns;
66+
this.loosyFilter = loosyFilter;
6167
this.limit = limit;
6268
}
6369

6470
@Override
6571
public Scan project(@Nullable int[] projectedColumns) {
66-
return new TableScan(conn, tableInfo, projectedColumns, limit);
72+
return new TableScan(conn, tableInfo, projectedColumns, loosyFilter, limit);
6773
}
6874

6975
@Override
@@ -78,12 +84,17 @@ public Scan project(List<String> projectedColumnNames) {
7884
}
7985
columnIndexes[i] = index;
8086
}
81-
return new TableScan(conn, tableInfo, columnIndexes, limit);
87+
return new TableScan(conn, tableInfo, columnIndexes, loosyFilter, limit);
88+
}
89+
90+
@Override
91+
public Scan lossyFilter(Predicate loosyFilter) {
92+
return new TableScan(conn, tableInfo, projectedColumns, loosyFilter, limit);
8293
}
8394

8495
@Override
8596
public Scan limit(int rowNumber) {
86-
return new TableScan(conn, tableInfo, projectedColumns, rowNumber);
97+
return new TableScan(conn, tableInfo, projectedColumns, loosyFilter, rowNumber);
8798
}
8899

89100
@Override
@@ -98,7 +109,8 @@ public LogScanner createLogScanner() {
98109
conn.getMetadataUpdater(),
99110
conn.getClientMetricGroup(),
100111
conn.getOrCreateRemoteFileDownloader(),
101-
projectedColumns);
112+
projectedColumns,
113+
loosyFilter);
102114
}
103115

104116
@Override

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.alibaba.fluss.metadata.TableInfo;
3636
import com.alibaba.fluss.metadata.TablePartition;
3737
import com.alibaba.fluss.metadata.TablePath;
38+
import com.alibaba.fluss.predicate.Predicate;
3839
import com.alibaba.fluss.record.LogRecordReadContext;
3940
import com.alibaba.fluss.record.LogRecords;
4041
import com.alibaba.fluss.record.MemoryLogRecords;
@@ -91,6 +92,7 @@ public class LogFetcher implements Closeable {
9192
// bytes from remote file.
9293
private final LogRecordReadContext remoteReadContext;
9394
@Nullable private final Projection projection;
95+
@Nullable private final Predicate loosyFilter;
9496
private final RpcClient rpcClient;
9597
private final int maxFetchBytes;
9698
private final int maxBucketFetchBytes;
@@ -114,6 +116,7 @@ public class LogFetcher implements Closeable {
114116
public LogFetcher(
115117
TableInfo tableInfo,
116118
@Nullable Projection projection,
119+
@Nullable Predicate loosyFilter,
117120
RpcClient rpcClient,
118121
LogScannerStatus logScannerStatus,
119122
Configuration conf,
@@ -126,6 +129,7 @@ public LogFetcher(
126129
this.remoteReadContext =
127130
LogRecordReadContext.createReadContext(tableInfo, true, projection);
128131
this.projection = projection;
132+
this.loosyFilter = loosyFilter;
129133
this.rpcClient = rpcClient;
130134
this.logScannerStatus = logScannerStatus;
131135
this.maxFetchBytes =

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.metadata.TableBucket;
2828
import com.alibaba.fluss.metadata.TableInfo;
2929
import com.alibaba.fluss.metadata.TablePath;
30+
import com.alibaba.fluss.predicate.Predicate;
3031
import com.alibaba.fluss.rpc.RpcClient;
3132
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
3233
import com.alibaba.fluss.types.RowType;
@@ -84,7 +85,8 @@ public LogScannerImpl(
8485
MetadataUpdater metadataUpdater,
8586
ClientMetricGroup clientMetricGroup,
8687
RemoteFileDownloader remoteFileDownloader,
87-
@Nullable int[] projectedFields) {
88+
@Nullable int[] projectedFields,
89+
@Nullable Predicate loosyFilter) {
8890
this.tablePath = tableInfo.getTablePath();
8991
this.tableId = tableInfo.getTableId();
9092
this.isPartitionedTable = tableInfo.isPartitioned();
@@ -98,6 +100,7 @@ public LogScannerImpl(
98100
new LogFetcher(
99101
tableInfo,
100102
projection,
103+
loosyFilter,
101104
rpcClient,
102105
logScannerStatus,
103106
conf,

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected void setup() throws Exception {
8181
new LogFetcher(
8282
DATA1_TABLE_INFO,
8383
null,
84+
null,
8485
rpcClient,
8586
logScannerStatus,
8687
clientConf,
@@ -158,6 +159,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
158159
new LogFetcher(
159160
DATA1_TABLE_INFO,
160161
null,
162+
null,
161163
rpcClient,
162164
logScannerStatus,
163165
clientConf,

fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import java.nio.ByteBuffer;
3636
import java.util.NoSuchElementException;
37+
import java.util.Optional;
3738

3839
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
3940
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -434,4 +435,9 @@ public void remove() {
434435
throw new UnsupportedOperationException();
435436
}
436437
}
438+
439+
@Override
440+
public Optional<LogRecordBatchStatistics> getStatistics() {
441+
return Optional.empty();
442+
}
437443
}

fluss-common/src/main/java/com/alibaba/fluss/record/FileLogInputStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.ByteOrder;
2828
import java.nio.channels.FileChannel;
2929
import java.util.Objects;
30+
import java.util.Optional;
3031

3132
import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET;
3233
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
@@ -271,5 +272,10 @@ public String toString() {
271272
+ batchSize
272273
+ ")";
273274
}
275+
276+
@Override
277+
public Optional<LogRecordBatchStatistics> getStatistics() {
278+
return Optional.empty();
279+
}
274280
}
275281
}

fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordBatch.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.alibaba.fluss.utils.CloseableIterator;
2626

2727
import java.util.Iterator;
28+
import java.util.Optional;
2829

2930
/**
3031
* A record batch is a container for {@link LogRecord LogRecords}.
@@ -52,6 +53,13 @@ public interface LogRecordBatch {
5253
*/
5354
boolean isValid();
5455

56+
/**
57+
* Get the statistics of this record batch.
58+
*
59+
* @return
60+
*/
61+
Optional<LogRecordBatchStatistics> getStatistics();
62+
5563
/** Raise an exception if the checksum is not valid. */
5664
void ensureValid();
5765

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.alibaba.fluss.record;
2+
3+
/** Statistics infomation of {@link LogRecordBatch LogRecordBatch}. */
4+
public class LogRecordBatchStatistics {}

0 commit comments

Comments
 (0)