Skip to content

Commit 4bad5b3

Browse files
log record batch filter push down
1 parent d587024 commit 4bad5b3

File tree

58 files changed

+6502
-68
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+6502
-68
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,6 @@ website/versioned_docs
4242
website/versioned_sidebars
4343
website/versions.json
4444
website/pnpm-lock.yaml
45+
46+
# jenv
47+
.java-version

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/Scan.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
2222
import com.alibaba.fluss.client.table.scanner.log.LogScanner;
2323
import com.alibaba.fluss.metadata.TableBucket;
24+
import com.alibaba.fluss.predicate.Predicate;
2425

2526
import javax.annotation.Nullable;
2627

@@ -65,6 +66,13 @@ public interface Scan {
6566
*/
6667
LogScanner createLogScanner();
6768

69+
/**
70+
* Creates a {@link LogScanner} to continuously read log data for this scan with filter.
71+
*
72+
* <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
73+
*/
74+
LogScanner createLogScanner(Predicate recordBatchFilter);
75+
6876
/**
6977
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
7078
*

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/TableScan.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.fluss.exception.FlussRuntimeException;
3030
import com.alibaba.fluss.metadata.TableBucket;
3131
import com.alibaba.fluss.metadata.TableInfo;
32+
import com.alibaba.fluss.predicate.Predicate;
3233
import com.alibaba.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -43,6 +44,7 @@ public class TableScan implements Scan {
4344

4445
/** The projected fields to do projection. No projection if is null. */
4546
@Nullable private final int[] projectedColumns;
47+
4648
/** The limited row number to read. No limit if is null. */
4749
@Nullable private final Integer limit;
4850

@@ -98,7 +100,24 @@ public LogScanner createLogScanner() {
98100
conn.getMetadataUpdater(),
99101
conn.getClientMetricGroup(),
100102
conn.getOrCreateRemoteFileDownloader(),
101-
projectedColumns);
103+
projectedColumns,
104+
null);
105+
}
106+
107+
@Override
108+
public LogScanner createLogScanner(Predicate recordBatchFilter) {
109+
if (limit != null) {
110+
throw new UnsupportedOperationException("LogScanner doesn't support limit pushdown.");
111+
}
112+
return new LogScannerImpl(
113+
conn.getConfiguration(),
114+
tableInfo,
115+
conn.getRpcClient(),
116+
conn.getMetadataUpdater(),
117+
conn.getClientMetricGroup(),
118+
conn.getOrCreateRemoteFileDownloader(),
119+
projectedColumns,
120+
recordBatchFilter);
102121
}
103122

104123
@Override

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.alibaba.fluss.metadata.TableInfo;
3636
import com.alibaba.fluss.metadata.TablePartition;
3737
import com.alibaba.fluss.metadata.TablePath;
38+
import com.alibaba.fluss.predicate.Predicate;
3839
import com.alibaba.fluss.record.LogRecordReadContext;
3940
import com.alibaba.fluss.record.LogRecords;
4041
import com.alibaba.fluss.record.MemoryLogRecords;
@@ -51,6 +52,7 @@
5152
import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket;
5253
import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable;
5354
import com.alibaba.fluss.rpc.protocol.Errors;
55+
import com.alibaba.fluss.rpc.util.PredicateMessageUtils;
5456
import com.alibaba.fluss.utils.IOUtils;
5557
import com.alibaba.fluss.utils.Projection;
5658

@@ -91,6 +93,7 @@ public class LogFetcher implements Closeable {
9193
// bytes from remote file.
9294
private final LogRecordReadContext remoteReadContext;
9395
@Nullable private final Projection projection;
96+
@Nullable private final Predicate recordBatchFilter;
9497
private final RpcClient rpcClient;
9598
private final int maxFetchBytes;
9699
private final int maxBucketFetchBytes;
@@ -114,6 +117,7 @@ public class LogFetcher implements Closeable {
114117
public LogFetcher(
115118
TableInfo tableInfo,
116119
@Nullable Projection projection,
120+
@Nullable Predicate recordBatchFilter,
117121
RpcClient rpcClient,
118122
LogScannerStatus logScannerStatus,
119123
Configuration conf,
@@ -126,6 +130,7 @@ public LogFetcher(
126130
this.remoteReadContext =
127131
LogRecordReadContext.createReadContext(tableInfo, true, projection);
128132
this.projection = projection;
133+
this.recordBatchFilter = recordBatchFilter;
129134
this.rpcClient = rpcClient;
130135
this.logScannerStatus = logScannerStatus;
131136
this.maxFetchBytes =
@@ -461,6 +466,10 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
461466
} else {
462467
reqForTable.setProjectionPushdownEnabled(false);
463468
}
469+
if (null != recordBatchFilter) {
470+
reqForTable.setRecordBatchFilter(
471+
PredicateMessageUtils.toPbPredicate(recordBatchFilter));
472+
}
464473
reqForTable.addAllBucketsReqs(reqForBuckets);
465474
fetchLogRequest.addAllTablesReqs(Collections.singletonList(reqForTable));
466475
fetchLogRequests.put(leaderId, fetchLogRequest);

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.metadata.TableBucket;
2828
import com.alibaba.fluss.metadata.TableInfo;
2929
import com.alibaba.fluss.metadata.TablePath;
30+
import com.alibaba.fluss.predicate.Predicate;
3031
import com.alibaba.fluss.rpc.RpcClient;
3132
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
3233
import com.alibaba.fluss.types.RowType;
@@ -84,7 +85,8 @@ public LogScannerImpl(
8485
MetadataUpdater metadataUpdater,
8586
ClientMetricGroup clientMetricGroup,
8687
RemoteFileDownloader remoteFileDownloader,
87-
@Nullable int[] projectedFields) {
88+
@Nullable int[] projectedFields,
89+
@Nullable Predicate recordBatchFilter) {
8890
this.tablePath = tableInfo.getTablePath();
8991
this.tableId = tableInfo.getTableId();
9092
this.isPartitionedTable = tableInfo.isPartitioned();
@@ -98,6 +100,7 @@ public LogScannerImpl(
98100
new LogFetcher(
99101
tableInfo,
100102
projection,
103+
recordBatchFilter,
101104
rpcClient,
102105
logScannerStatus,
103106
conf,

fluss-client/src/main/resources/META-INF/NOTICE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ This project bundles the following dependencies under the Apache Software Licens
1010
- com.ververica:frocksdbjni:6.20.3-ververica-2.0
1111
- org.apache.commons:commons-lang3:3.12.0
1212
- org.apache.commons:commons-math3:3.6.1
13+
- org.eclipse.collections:eclipse-collections-api:11.1.0
14+
- org.eclipse.collections:eclipse-collections:11.1.0
1315
- org.lz4:lz4-java:1.8.0
1416

1517
This project bundles the following dependencies under the MIT (https://opensource.org/licenses/MIT)

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogFetcherTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected void setup() throws Exception {
8181
new LogFetcher(
8282
DATA1_TABLE_INFO,
8383
null,
84+
null,
8485
rpcClient,
8586
logScannerStatus,
8687
clientConf,
@@ -158,6 +159,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
158159
new LogFetcher(
159160
DATA1_TABLE_INFO,
160161
null,
162+
null,
161163
rpcClient,
162164
logScannerStatus,
163165
clientConf,

fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@
3333

3434
import java.nio.ByteBuffer;
3535
import java.util.NoSuchElementException;
36+
import java.util.Optional;
3637

3738
import static com.alibaba.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET;
3839
import static com.alibaba.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
3940
import static com.alibaba.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
4041
import static com.alibaba.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
4142
import static com.alibaba.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
42-
import static com.alibaba.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
4343
import static com.alibaba.fluss.record.LogRecordBatchFormat.attributeOffset;
4444
import static com.alibaba.fluss.record.LogRecordBatchFormat.batchSequenceOffset;
4545
import static com.alibaba.fluss.record.LogRecordBatchFormat.commitTimestampOffset;
@@ -282,7 +282,7 @@ private CloseableIterator<LogRecord> columnRecordIterator(
282282
if (isAppendOnly) {
283283
// append only batch, no change type vector,
284284
// the start of the arrow data is the beginning of the batch records
285-
int recordBatchHeaderSize = recordBatchHeaderSize(magic);
285+
int recordBatchHeaderSize = getActualRecordBatchHeaderSize(magic);
286286
int arrowOffset = position + recordBatchHeaderSize;
287287
int arrowLength = sizeInBytes() - recordBatchHeaderSize;
288288
ArrowReader reader =
@@ -297,12 +297,14 @@ protected ChangeType getChangeType(int rowId) {
297297
} else {
298298
// with change type, decode the change type vector first,
299299
// the arrow data starts after the change type vector
300-
int changeTypeOffset = position + arrowChangeTypeOffset(magic);
300+
int changeTypeOffset = position + getActualArrowChangeTypeOffset(magic);
301301
ChangeTypeVector changeTypeVector =
302302
new ChangeTypeVector(segment, changeTypeOffset, getRecordCount());
303303
int arrowOffset = changeTypeOffset + changeTypeVector.sizeInBytes();
304304
int arrowLength =
305-
sizeInBytes() - arrowChangeTypeOffset(magic) - changeTypeVector.sizeInBytes();
305+
sizeInBytes()
306+
- getActualArrowChangeTypeOffset(magic)
307+
- changeTypeVector.sizeInBytes();
306308
ArrowReader reader =
307309
ArrowUtils.createArrowReader(
308310
segment, arrowOffset, arrowLength, root, allocator, rowType);
@@ -410,4 +412,98 @@ public void remove() {
410412
throw new UnsupportedOperationException();
411413
}
412414
}
415+
416+
@Override
417+
public Optional<LogRecordBatchStatistics> getStatistics(ReadContext context) {
418+
if (context == null) {
419+
return Optional.empty();
420+
}
421+
422+
byte magic = magic();
423+
if (magic < LogRecordBatchFormat.LOG_MAGIC_VALUE_V2) {
424+
// Statistics are only available in V2 and later
425+
return Optional.empty();
426+
}
427+
428+
// Check if statistics flag is set
429+
byte attributes = attributes();
430+
if ((attributes & LogRecordBatchFormat.STATISTICS_FLAG_MASK) == 0) {
431+
// No statistics available
432+
return Optional.empty();
433+
}
434+
435+
try {
436+
// Get row type from context
437+
RowType rowType = context.getRowType(schemaId());
438+
if (rowType == null) {
439+
return Optional.empty();
440+
}
441+
442+
// Read statistics length
443+
int statisticsLength =
444+
segment.getInt(position + LogRecordBatchFormat.statisticsLengthOffset(magic));
445+
if (statisticsLength <= 0) {
446+
return Optional.empty();
447+
}
448+
449+
// Read statistics data
450+
int statisticsDataOffset = position + LogRecordBatchFormat.statisticsDataOffset(magic);
451+
byte[] statisticsData = new byte[statisticsLength];
452+
segment.get(statisticsDataOffset, statisticsData, 0, statisticsLength);
453+
454+
// Deserialize statistics using the row type from context
455+
LogRecordBatchStatistics statistics =
456+
LogRecordBatchStatisticsSerializer.deserialize(statisticsData, rowType);
457+
return Optional.ofNullable(statistics);
458+
} catch (Exception e) {
459+
// If reading or deserializing statistics fails, return empty
460+
return Optional.empty();
461+
}
462+
}
463+
464+
/** Get the actual record batch header size including statistics for V2 format. */
465+
private int getActualRecordBatchHeaderSize(byte magic) {
466+
if (magic < LogRecordBatchFormat.LOG_MAGIC_VALUE_V2) {
467+
return LogRecordBatchFormat.recordBatchHeaderSize(magic);
468+
}
469+
470+
// For V2, we need to add statistics length
471+
int statisticsLength = 0;
472+
byte attributes = attributes();
473+
if ((attributes & LogRecordBatchFormat.STATISTICS_FLAG_MASK) != 0) {
474+
try {
475+
statisticsLength =
476+
segment.getInt(
477+
position + LogRecordBatchFormat.statisticsLengthOffset(magic));
478+
} catch (Exception e) {
479+
// If reading statistics length fails, use 0
480+
statisticsLength = 0;
481+
}
482+
}
483+
484+
return LogRecordBatchFormat.recordBatchHeaderSizeWithStats(magic, statisticsLength);
485+
}
486+
487+
/** Get the actual arrow change type offset including statistics for V2 format. */
488+
private int getActualArrowChangeTypeOffset(byte magic) {
489+
if (magic < LogRecordBatchFormat.LOG_MAGIC_VALUE_V2) {
490+
return LogRecordBatchFormat.arrowChangeTypeOffset(magic);
491+
}
492+
493+
// For V2, we need to add statistics length
494+
int statisticsLength = 0;
495+
byte attributes = attributes();
496+
if ((attributes & LogRecordBatchFormat.STATISTICS_FLAG_MASK) != 0) {
497+
try {
498+
statisticsLength =
499+
segment.getInt(
500+
position + LogRecordBatchFormat.statisticsLengthOffset(magic));
501+
} catch (Exception e) {
502+
// If reading statistics length fails, use 0
503+
statisticsLength = 0;
504+
}
505+
}
506+
507+
return LogRecordBatchFormat.arrowChangeTypeOffsetWithStats(magic, statisticsLength);
508+
}
413509
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.record;
19+
20+
import com.alibaba.fluss.row.InternalArray;
21+
import com.alibaba.fluss.row.InternalRow;
22+
23+
/** Default implementation of {@link LogRecordBatchStatistics}. */
24+
public class DefaultLogRecordBatchStatistics implements LogRecordBatchStatistics {
25+
26+
private final InternalRow minValues;
27+
private final InternalRow maxValues;
28+
private final InternalArray nullCounts;
29+
30+
public DefaultLogRecordBatchStatistics(
31+
InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) {
32+
this.minValues = minValues;
33+
this.maxValues = maxValues;
34+
this.nullCounts = nullCounts;
35+
}
36+
37+
@Override
38+
public InternalRow getMinValues() {
39+
return minValues;
40+
}
41+
42+
@Override
43+
public InternalRow getMaxValues() {
44+
return maxValues;
45+
}
46+
47+
@Override
48+
public InternalArray getNullCounts() {
49+
return nullCounts;
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return "DefaultLogRecordBatchStatistics{"
55+
+ ", minValues="
56+
+ minValues
57+
+ ", maxValues="
58+
+ maxValues
59+
+ ", nullCounts="
60+
+ nullCounts
61+
+ '}';
62+
}
63+
}

0 commit comments

Comments
 (0)