Skip to content

Commit c52c37c

Browse files
committed
[core] StreamScan should skip empty plan to more timely
1 parent 27b23b3 commit c52c37c

File tree

3 files changed

+53
-10
lines changed

3 files changed

+53
-10
lines changed

paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ private Plan nextPlan() {
220220
SnapshotReader.Plan overwritePlan = handleOverwriteSnapshot(snapshot);
221221
if (overwritePlan != null) {
222222
nextSnapshotId++;
223+
if (overwritePlan.splits().isEmpty()) {
224+
continue;
225+
}
223226
return overwritePlan;
224227
}
225228
}
@@ -229,6 +232,9 @@ private Plan nextPlan() {
229232
SnapshotReader.Plan plan = followUpScanner.scan(snapshot, snapshotReader);
230233
currentWatermark = plan.watermark();
231234
nextSnapshotId++;
235+
if (plan.splits().isEmpty()) {
236+
continue;
237+
}
232238
return plan;
233239
} else {
234240
nextSnapshotId++;

paimon-core/src/test/java/org/apache/paimon/table/source/StreamTableScanTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,52 @@
4545
/** Tests for {@link StreamTableScan}. */
4646
public class StreamTableScanTest extends ScannerTestBase {
4747

48+
@Test
49+
public void testIgnoreEmptyPlan() throws Exception {
50+
TableRead read = table.newRead();
51+
StreamTableWrite write = table.newWrite(commitUser);
52+
StreamTableCommit commit = table.newCommit(commitUser);
53+
54+
// only plan partition 1
55+
ReadBuilder readBuilder = table.newReadBuilder();
56+
PredicateBuilder predicateBuilder = new PredicateBuilder(readBuilder.readType());
57+
readBuilder.withFilter(predicateBuilder.lessOrEqual(0, 1));
58+
StreamTableScan scan = readBuilder.newStreamScan();
59+
60+
// first call without any snapshot, should return empty plan
61+
assertThat(scan.plan().splits()).isEmpty();
62+
63+
// write partition 1
64+
write.write(rowData(1, 10, 100L));
65+
commit.commit(0, write.prepareCommit(true, 0));
66+
67+
// first call with snapshot, should return complete records from 1 commit
68+
TableScan.Plan plan = scan.plan();
69+
assertThat(getResult(read, plan.splits()))
70+
.hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
71+
72+
// write partition 2
73+
write.write(rowData(2, 10, 100L));
74+
commit.commit(1, write.prepareCommit(true, 1));
75+
76+
// write partition 1
77+
write.write(rowData(1, 20, 200L));
78+
commit.commit(2, write.prepareCommit(true, 2));
79+
80+
// second call with snapshot, should ignore 2 commit and return complete records from 3
81+
// commit
82+
plan = scan.plan();
83+
assertThat(getResult(read, plan.splits()))
84+
.hasSameElementsAs(Collections.singletonList("+I 1|20|200"));
85+
86+
// third call without snapshot
87+
plan = scan.plan();
88+
assertThat(plan.splits()).isEmpty();
89+
90+
write.close();
91+
commit.close();
92+
}
93+
4894
@Test
4995
public void testPlan() throws Exception {
5096
TableRead read = table.newRead();

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,20 +265,11 @@ public void refresh() throws Exception {
265265
}
266266

267267
private void doRefresh() throws Exception {
268-
Long latestSnapshotId = context.table.snapshotManager().latestSnapshotId();
269-
if (latestSnapshotId == null) {
270-
return;
271-
}
272-
273268
while (true) {
274269
try (RecordReaderIterator<InternalRow> batch =
275270
new RecordReaderIterator<>(reader.nextBatch(false))) {
276271
if (!batch.hasNext()) {
277-
if (reader.nextSnapshotId() > latestSnapshotId) {
278-
return;
279-
} else {
280-
continue;
281-
}
272+
return;
282273
}
283274
refresh(batch);
284275
}

0 commit comments

Comments
 (0)