Skip to content

Commit d235ceb

Browse files
committed
[core] Fix FileMonitorTable with Deletion Vectors Enabled
1 parent 241ac76 commit d235ceb

File tree

3 files changed

+52
-10
lines changed

3 files changed

+52
-10
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/SortedRun.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,9 @@
1919
package org.apache.paimon.mergetree;
2020

2121
import org.apache.paimon.annotation.VisibleForTesting;
22+
import org.apache.paimon.data.BinaryRow;
2223
import org.apache.paimon.data.InternalRow;
2324
import org.apache.paimon.io.DataFileMeta;
24-
import org.apache.paimon.utils.Preconditions;
2525

2626
import java.util.Collections;
2727
import java.util.Comparator;
@@ -87,9 +87,13 @@ public long totalSize() {
8787
@VisibleForTesting
8888
public void validate(Comparator<InternalRow> comparator) {
8989
for (int i = 1; i < files.size(); i++) {
90-
Preconditions.checkState(
91-
comparator.compare(files.get(i).minKey(), files.get(i - 1).maxKey()) > 0,
92-
"SortedRun is not sorted and may contain overlapping key intervals. This is a bug.");
90+
BinaryRow minKey = files.get(i).minKey();
91+
BinaryRow maxKey = files.get(i - 1).maxKey();
92+
if (comparator.compare(minKey, maxKey) <= 0) {
93+
throw new IllegalArgumentException(
94+
"SortedRun is not sorted and may contain overlapping key intervals. "
95+
+ "This is a bug.");
96+
}
9397
}
9498
}
9599

paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.table.source;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.CoreOptions.StreamScanMode;
2223
import org.apache.paimon.Snapshot;
2324
import org.apache.paimon.consumer.Consumer;
2425
import org.apache.paimon.lookup.LookupStrategy;
@@ -48,13 +49,15 @@
4849
import java.util.List;
4950

5051
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
52+
import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
5153

5254
/** {@link StreamTableScan} implementation for streaming planning. */
5355
public class DataTableStreamScan extends AbstractDataTableScan implements StreamDataTableScan {
5456

5557
private static final Logger LOG = LoggerFactory.getLogger(DataTableStreamScan.class);
5658

5759
private final CoreOptions options;
60+
private final StreamScanMode scanMode;
5861
private final SnapshotManager snapshotManager;
5962
private final boolean supportStreamingReadOverwrite;
6063
private final DefaultValueAssigner defaultValueAssigner;
@@ -80,6 +83,7 @@ public DataTableStreamScan(
8083
DefaultValueAssigner defaultValueAssigner) {
8184
super(options, snapshotReader);
8285
this.options = options;
86+
this.scanMode = options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
8387
this.snapshotManager = snapshotManager;
8488
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
8589
this.defaultValueAssigner = defaultValueAssigner;
@@ -139,7 +143,9 @@ private void initScanner() {
139143

140144
private Plan tryFirstPlan() {
141145
StartingScanner.Result result;
142-
if (options.needLookup()) {
146+
if (scanMode == FILE_MONITOR) {
147+
result = startingScanner.scan(snapshotReader);
148+
} else if (options.needLookup()) {
143149
result = startingScanner.scan(snapshotReader.withLevelFilter(level -> level > 0));
144150
snapshotReader.withLevelFilter(Filter.alwaysTrue());
145151
} else if (options.changelogProducer().equals(FULL_COMPACTION)) {
@@ -157,7 +163,9 @@ private Plan tryFirstPlan() {
157163
currentWatermark = scannedResult.currentWatermark();
158164
long currentSnapshotId = scannedResult.currentSnapshotId();
159165
LookupStrategy lookupStrategy = options.lookupStrategy();
160-
if (!lookupStrategy.produceChangelog && lookupStrategy.deletionVector) {
166+
if (scanMode == FILE_MONITOR) {
167+
nextSnapshotId = currentSnapshotId + 1;
168+
} else if (!lookupStrategy.produceChangelog && lookupStrategy.deletionVector) {
161169
// For DELETION_VECTOR_ONLY mode, we need to return the remaining data from level 0
162170
// in the subsequent plan.
163171
nextSnapshotId = currentSnapshotId;
@@ -250,9 +258,7 @@ protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
250258
}
251259

252260
protected FollowUpScanner createFollowUpScanner() {
253-
CoreOptions.StreamScanMode type =
254-
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
255-
switch (type) {
261+
switch (scanMode) {
256262
case COMPACT_BUCKET_TABLE:
257263
return new DeltaFollowUpScanner();
258264
case FILE_MONITOR:

paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java

Lines changed: 33 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1674,7 +1674,7 @@ public void testFullCompactedRead() throws Exception {
16741674
}
16751675

16761676
@Test
1677-
public void testInnerStreamScanMode() throws Exception {
1677+
public void testFileMonitorTableScan() throws Exception {
16781678
FileStoreTable table = createFileStoreTable();
16791679

16801680
FileMonitorTable monitorTable = new FileMonitorTable(table);
@@ -1751,6 +1751,38 @@ public void testInnerStreamScanMode() throws Exception {
17511751
commit.close();
17521752
}
17531753

1754+
@Test
1755+
public void testFileMonitorTableScanWithDv() throws Exception {
1756+
FileStoreTable table =
1757+
createFileStoreTable(options -> options.set(DELETION_VECTORS_ENABLED, true));
1758+
1759+
FileMonitorTable monitorTable = new FileMonitorTable(table);
1760+
ReadBuilder readBuilder = monitorTable.newReadBuilder();
1761+
StreamTableScan scan = readBuilder.newStreamScan();
1762+
TableRead read = readBuilder.newRead();
1763+
1764+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
1765+
BatchTableWrite write = writeBuilder.newWrite();
1766+
write.withIOManager(IOManager.create(tempDir.toString()));
1767+
BatchTableCommit commit = writeBuilder.newCommit();
1768+
1769+
write.write(rowData(1, 10, 100L));
1770+
write.write(rowData(1, 11, 101L));
1771+
commit.commit(write.prepareCommit());
1772+
1773+
List<InternalRow> results = new ArrayList<>();
1774+
read.createReader(scan.plan()).forEachRemaining(results::add);
1775+
read.createReader(scan.plan()).forEachRemaining(results::add);
1776+
assertThat(results).hasSize(1);
1777+
FileMonitorTable.FileChange change = FileMonitorTable.toFileChange(results.get(0));
1778+
assertThat(change.beforeFiles()).hasSize(0);
1779+
assertThat(change.dataFiles()).hasSize(1);
1780+
results.clear();
1781+
1782+
write.close();
1783+
commit.close();
1784+
}
1785+
17541786
@Test
17551787
public void testReadOptimizedTable() throws Exception {
17561788
// let max level has many files

0 commit comments

Comments
 (0)