Skip to content

Commit 883cf3c

Browse files
filter
1 parent 4ccf4b1 commit 883cf3c

File tree

48 files changed

+1396
-53
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1396
-53
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,6 @@ website/versioned_docs
4242
website/versioned_sidebars
4343
website/versions.json
4444
website/pnpm-lock.yaml
45+
46+
# jenv
47+
.java-version

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/Scan.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
2222
import com.alibaba.fluss.client.table.scanner.log.LogScanner;
2323
import com.alibaba.fluss.metadata.TableBucket;
24+
import com.alibaba.fluss.predicate.Predicate;
2425

2526
import javax.annotation.Nullable;
2627

@@ -65,6 +66,13 @@ public interface Scan {
6566
*/
6667
LogScanner createLogScanner();
6768

69+
/**
70+
* Creates a {@link LogScanner} to continuously read log data for this scan with filter.
71+
*
72+
* <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
73+
*/
74+
LogScanner createLogScanner(Predicate recordBatchFilter);
75+
6876
/**
6977
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
7078
*

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/TableScan.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.fluss.exception.FlussRuntimeException;
3030
import com.alibaba.fluss.metadata.TableBucket;
3131
import com.alibaba.fluss.metadata.TableInfo;
32+
import com.alibaba.fluss.predicate.Predicate;
3233
import com.alibaba.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -43,6 +44,7 @@ public class TableScan implements Scan {
4344

4445
/** The projected fields to do projection. No projection if is null. */
4546
@Nullable private final int[] projectedColumns;
47+
4648
/** The limited row number to read. No limit if is null. */
4749
@Nullable private final Integer limit;
4850

@@ -98,7 +100,24 @@ public LogScanner createLogScanner() {
98100
conn.getMetadataUpdater(),
99101
conn.getClientMetricGroup(),
100102
conn.getOrCreateRemoteFileDownloader(),
101-
projectedColumns);
103+
projectedColumns,
104+
null);
105+
}
106+
107+
@Override
108+
public LogScanner createLogScanner(Predicate recordBatchFilter) {
109+
if (limit != null) {
110+
throw new UnsupportedOperationException("LogScanner doesn't support limit pushdown.");
111+
}
112+
return new LogScannerImpl(
113+
conn.getConfiguration(),
114+
tableInfo,
115+
conn.getRpcClient(),
116+
conn.getMetadataUpdater(),
117+
conn.getClientMetricGroup(),
118+
conn.getOrCreateRemoteFileDownloader(),
119+
projectedColumns,
120+
recordBatchFilter);
102121
}
103122

104123
@Override

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.alibaba.fluss.metadata.TableInfo;
3636
import com.alibaba.fluss.metadata.TablePartition;
3737
import com.alibaba.fluss.metadata.TablePath;
38+
import com.alibaba.fluss.predicate.Predicate;
3839
import com.alibaba.fluss.record.LogRecordReadContext;
3940
import com.alibaba.fluss.record.LogRecords;
4041
import com.alibaba.fluss.record.MemoryLogRecords;
@@ -51,6 +52,7 @@
5152
import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket;
5253
import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable;
5354
import com.alibaba.fluss.rpc.protocol.Errors;
55+
import com.alibaba.fluss.rpc.util.PredicateMessageUtils;
5456
import com.alibaba.fluss.utils.IOUtils;
5557
import com.alibaba.fluss.utils.Projection;
5658

@@ -91,6 +93,7 @@ public class LogFetcher implements Closeable {
9193
// bytes from remote file.
9294
private final LogRecordReadContext remoteReadContext;
9395
@Nullable private final Projection projection;
96+
@Nullable private final Predicate recordBatchFilter;
9497
private final RpcClient rpcClient;
9598
private final int maxFetchBytes;
9699
private final int maxBucketFetchBytes;
@@ -114,6 +117,7 @@ public class LogFetcher implements Closeable {
114117
public LogFetcher(
115118
TableInfo tableInfo,
116119
@Nullable Projection projection,
120+
@Nullable Predicate recordBatchFilter,
117121
RpcClient rpcClient,
118122
LogScannerStatus logScannerStatus,
119123
Configuration conf,
@@ -126,6 +130,7 @@ public LogFetcher(
126130
this.remoteReadContext =
127131
LogRecordReadContext.createReadContext(tableInfo, true, projection);
128132
this.projection = projection;
133+
this.recordBatchFilter = recordBatchFilter;
129134
this.rpcClient = rpcClient;
130135
this.logScannerStatus = logScannerStatus;
131136
this.maxFetchBytes =
@@ -461,6 +466,10 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
461466
} else {
462467
reqForTable.setProjectionPushdownEnabled(false);
463468
}
469+
if (null != recordBatchFilter) {
470+
reqForTable.setRecordBatchFilter(
471+
PredicateMessageUtils.toPbPredicate(recordBatchFilter));
472+
}
464473
reqForTable.addAllBucketsReqs(reqForBuckets);
465474
fetchLogRequest.addAllTablesReqs(Collections.singletonList(reqForTable));
466475
fetchLogRequests.put(leaderId, fetchLogRequest);

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.metadata.TableBucket;
2828
import com.alibaba.fluss.metadata.TableInfo;
2929
import com.alibaba.fluss.metadata.TablePath;
30+
import com.alibaba.fluss.predicate.Predicate;
3031
import com.alibaba.fluss.rpc.RpcClient;
3132
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
3233
import com.alibaba.fluss.types.RowType;
@@ -84,7 +85,8 @@ public LogScannerImpl(
8485
MetadataUpdater metadataUpdater,
8586
ClientMetricGroup clientMetricGroup,
8687
RemoteFileDownloader remoteFileDownloader,
87-
@Nullable int[] projectedFields) {
88+
@Nullable int[] projectedFields,
89+
@Nullable Predicate recordBatchFilter) {
8890
this.tablePath = tableInfo.getTablePath();
8991
this.tableId = tableInfo.getTableId();
9092
this.isPartitionedTable = tableInfo.isPartitioned();
@@ -98,6 +100,7 @@ public LogScannerImpl(
98100
new LogFetcher(
99101
tableInfo,
100102
projection,
103+
recordBatchFilter,
101104
rpcClient,
102105
logScannerStatus,
103106
conf,

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected void setup() throws Exception {
8181
new LogFetcher(
8282
DATA1_TABLE_INFO,
8383
null,
84+
null,
8485
rpcClient,
8586
logScannerStatus,
8687
clientConf,
@@ -158,6 +159,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
158159
new LogFetcher(
159160
DATA1_TABLE_INFO,
160161
null,
162+
null,
161163
rpcClient,
162164
logScannerStatus,
163165
clientConf,

fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20+
import com.alibaba.fluss.row.InternalArray;
2021
import com.alibaba.fluss.row.InternalRow;
2122

2223
import java.util.ArrayList;
@@ -46,6 +47,21 @@ public boolean test(InternalRow row, List<Predicate> children) {
4647
return true;
4748
}
4849

50+
@Override
51+
public boolean test(
52+
long rowCount,
53+
InternalRow minValues,
54+
InternalRow maxValues,
55+
InternalArray nullCounts,
56+
List<Predicate> children) {
57+
for (Predicate child : children) {
58+
if (!child.test(rowCount, minValues, maxValues, nullCounts)) {
59+
return false;
60+
}
61+
}
62+
return true;
63+
}
64+
4965
@Override
5066
public Optional<Predicate> negate(List<Predicate> children) {
5167
List<Predicate> negatedChildren = new ArrayList<>();

fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20+
import com.alibaba.fluss.row.InternalArray;
2021
import com.alibaba.fluss.row.InternalRow;
2122

2223
import java.io.Serializable;
@@ -55,6 +56,12 @@ public boolean test(InternalRow row) {
5556
return function.test(row, children);
5657
}
5758

59+
@Override
60+
public boolean test(
61+
long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) {
62+
return function.test(rowCount, minValues, maxValues, nullCounts, children);
63+
}
64+
5865
@Override
5966
public Optional<Predicate> negate() {
6067
return function.negate(children);
@@ -89,6 +96,13 @@ public abstract static class Function implements Serializable {
8996

9097
public abstract boolean test(InternalRow row, List<Predicate> children);
9198

99+
public abstract boolean test(
100+
long rowCount,
101+
InternalRow minValues,
102+
InternalRow maxValues,
103+
InternalArray nullCounts,
104+
List<Predicate> children);
105+
92106
public abstract Optional<Predicate> negate(List<Predicate> children);
93107

94108
public abstract <T> T visit(FunctionVisitor<T> visitor, List<T> children);

fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20+
import com.alibaba.fluss.row.InternalArray;
2021
import com.alibaba.fluss.row.InternalRow;
2122
import com.alibaba.fluss.types.DataType;
2223
import com.alibaba.fluss.types.DecimalType;
@@ -89,6 +90,23 @@ public boolean test(InternalRow row) {
8990
return function.test(type, get(row, fieldIndex, type), literals);
9091
}
9192

93+
@Override
94+
public boolean test(
95+
long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) {
96+
Object min = get(minValues, fieldIndex, type);
97+
Object max = get(maxValues, fieldIndex, type);
98+
Long nullCount = nullCounts.isNullAt(fieldIndex) ? null : nullCounts.getLong(fieldIndex);
99+
if (nullCount == null || rowCount != nullCount) {
100+
// not all null
101+
// min or max is null
102+
// unknown stats
103+
if (min == null || max == null) {
104+
return true;
105+
}
106+
}
107+
return function.test(type, rowCount, min, max, nullCount, literals);
108+
}
109+
92110
@Override
93111
public Optional<Predicate> negate() {
94112
return function.negate()

fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20+
import com.alibaba.fluss.row.InternalArray;
2021
import com.alibaba.fluss.row.InternalRow;
2122

2223
import java.util.ArrayList;
@@ -46,6 +47,21 @@ public boolean test(InternalRow row, List<Predicate> children) {
4647
return false;
4748
}
4849

50+
@Override
51+
public boolean test(
52+
long rowCount,
53+
InternalRow minValues,
54+
InternalRow maxValues,
55+
InternalArray nullCounts,
56+
List<Predicate> children) {
57+
for (Predicate child : children) {
58+
if (child.test(rowCount, minValues, maxValues, nullCounts)) {
59+
return true;
60+
}
61+
}
62+
return false;
63+
}
64+
4965
@Override
5066
public Optional<Predicate> negate(List<Predicate> children) {
5167
List<Predicate> negatedChildren = new ArrayList<>();

0 commit comments

Comments
 (0)