Skip to content

Commit 83df9c9

Browse files
committed
[core] Unify timetravel processing to starting scanners
1 parent eac2798 commit 83df9c9

File tree

7 files changed

+74
-183
lines changed

7 files changed

+74
-183
lines changed

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 4 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@
5050
import org.apache.paimon.table.source.StreamDataTableScan;
5151
import org.apache.paimon.table.source.snapshot.SnapshotReader;
5252
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
53-
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
54-
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
5553
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
5654
import org.apache.paimon.utils.BranchManager;
5755
import org.apache.paimon.utils.CatalogBranchManager;
@@ -88,8 +86,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
8886

8987
private static final long serialVersionUID = 1L;
9088

91-
private static final String WATERMARK_PREFIX = "watermark-";
92-
9389
protected final FileIO fileIO;
9490
protected final Path path;
9591
protected final TableSchema tableSchema;
@@ -481,74 +477,12 @@ protected Runnable newExpireRunnable() {
481477

482478
private Optional<TableSchema> tryTimeTravel(Options options) {
483479
CoreOptions coreOptions = new CoreOptions(options);
484-
485-
switch (coreOptions.startupMode()) {
486-
case FROM_SNAPSHOT:
487-
case FROM_SNAPSHOT_FULL:
488-
if (coreOptions.scanVersion() != null) {
489-
return travelToVersion(coreOptions.scanVersion(), options);
490-
} else if (coreOptions.scanSnapshotId() != null) {
491-
return travelToSnapshot(coreOptions.scanSnapshotId(), options);
492-
} else if (coreOptions.scanWatermark() != null) {
493-
return travelToWatermark(coreOptions.scanWatermark(), options);
494-
} else {
495-
return travelToTag(coreOptions.scanTagName(), options);
496-
}
497-
case FROM_TIMESTAMP:
498-
Snapshot snapshot =
499-
StaticFromTimestampStartingScanner.timeTravelToTimestamp(
500-
snapshotManager(), coreOptions.scanTimestampMills());
501-
return travelToSnapshot(snapshot, options);
502-
default:
503-
return Optional.empty();
504-
}
505-
}
506-
507-
/** Tag first when travelling to a version. */
508-
private Optional<TableSchema> travelToVersion(String version, Options options) {
509-
options.remove(CoreOptions.SCAN_VERSION.key());
510-
if (tagManager().tagExists(version)) {
511-
options.set(CoreOptions.SCAN_TAG_NAME, version);
512-
return travelToTag(version, options);
513-
} else if (version.startsWith(WATERMARK_PREFIX)) {
514-
long watermark = Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
515-
options.set(CoreOptions.SCAN_WATERMARK, watermark);
516-
return travelToWatermark(watermark, options);
517-
} else if (version.chars().allMatch(Character::isDigit)) {
518-
options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
519-
return travelToSnapshot(Long.parseLong(version), options);
520-
} else {
521-
throw new RuntimeException("Cannot find a time travel version for " + version);
522-
}
523-
}
524-
525-
private Optional<TableSchema> travelToTag(String tagName, Options options) {
526-
return travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options);
527-
}
528-
529-
private Optional<TableSchema> travelToSnapshot(long snapshotId, Options options) {
530-
SnapshotManager snapshotManager = snapshotManager();
531-
if (snapshotManager.snapshotExists(snapshotId)) {
532-
return travelToSnapshot(snapshotManager.snapshot(snapshotId), options);
533-
}
534-
return Optional.empty();
535-
}
536-
537-
private Optional<TableSchema> travelToWatermark(long watermark, Options options) {
538480
Snapshot snapshot =
539-
StaticFromWatermarkStartingScanner.timeTravelToWatermark(
540-
snapshotManager(), watermark);
541-
if (snapshot != null) {
542-
return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
481+
TimeTravelUtil.resolveSnapshotFromOptions(coreOptions, snapshotManager());
482+
if (snapshot == null) {
483+
return Optional.empty();
543484
}
544-
return Optional.empty();
545-
}
546-
547-
private Optional<TableSchema> travelToSnapshot(@Nullable Snapshot snapshot, Options options) {
548-
if (snapshot != null) {
549-
return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
550-
}
551-
return Optional.empty();
485+
return Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
552486
}
553487

554488
@Override

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
package org.apache.paimon.table.source.snapshot;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
2223
import org.apache.paimon.table.source.ScanMode;
2324
import org.apache.paimon.utils.SnapshotManager;
2425

25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
26+
import java.io.FileNotFoundException;
2727

2828
import static org.apache.paimon.utils.Preconditions.checkArgument;
2929

@@ -33,9 +33,6 @@
3333
*/
3434
public class StaticFromSnapshotStartingScanner extends ReadPlanStartingScanner {
3535

36-
private static final Logger LOG =
37-
LoggerFactory.getLogger(StaticFromSnapshotStartingScanner.class);
38-
3936
public StaticFromSnapshotStartingScanner(SnapshotManager snapshotManager, long snapshotId) {
4037
super(snapshotManager);
4138
this.startingSnapshotId = snapshotId;
@@ -48,21 +45,29 @@ public ScanMode startingScanMode() {
4845

4946
@Override
5047
public SnapshotReader configure(SnapshotReader snapshotReader) {
51-
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
52-
Long latestSnapshotId = snapshotManager.latestSnapshotId();
48+
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(getSnapshot());
49+
}
5350

54-
if (earliestSnapshotId == null || latestSnapshotId == null) {
55-
throw new IllegalArgumentException("There is currently no snapshot.");
56-
}
51+
public Snapshot getSnapshot() {
52+
try {
53+
return snapshotManager.tryGetSnapshot(startingSnapshotId);
54+
} catch (FileNotFoundException e) {
55+
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
56+
Long latestSnapshotId = snapshotManager.latestSnapshotId();
5757

58-
// Checks earlier whether the specified scan snapshot id is valid.
59-
checkArgument(
60-
startingSnapshotId >= earliestSnapshotId && startingSnapshotId <= latestSnapshotId,
61-
"The specified scan snapshotId %s is out of available snapshotId range [%s, %s].",
62-
startingSnapshotId,
63-
earliestSnapshotId,
64-
latestSnapshotId);
58+
if (earliestSnapshotId == null || latestSnapshotId == null) {
59+
throw new IllegalArgumentException("There is currently no snapshot.");
60+
}
6561

66-
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
62+
// Checks earlier whether the specified scan snapshot id is valid.
63+
checkArgument(
64+
startingSnapshotId >= earliestSnapshotId
65+
&& startingSnapshotId <= latestSnapshotId,
66+
"The specified scan snapshotId %s is out of available snapshotId range [%s, %s].",
67+
startingSnapshotId,
68+
earliestSnapshotId,
69+
latestSnapshotId);
70+
throw new RuntimeException(e);
71+
}
6772
}
6873
}

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,19 @@ public StaticFromTagStartingScanner(SnapshotManager snapshotManager, String tagN
3434
this.tagName = tagName;
3535
}
3636

37+
public Snapshot getSnapshot() {
38+
TagManager tagManager =
39+
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
40+
return tagManager.getOrThrow(tagName).trimToSnapshot();
41+
}
42+
3743
@Override
3844
public ScanMode startingScanMode() {
3945
return ScanMode.ALL;
4046
}
4147

4248
@Override
4349
public SnapshotReader configure(SnapshotReader snapshotReader) {
44-
TagManager tagManager =
45-
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
46-
Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
47-
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
50+
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(getSnapshot());
4851
}
4952
}

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
import org.apache.paimon.table.source.ScanMode;
2424
import org.apache.paimon.utils.SnapshotManager;
2525

26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
28-
2926
import javax.annotation.Nullable;
3027

3128
/**
@@ -34,15 +31,11 @@
3431
*/
3532
public class StaticFromTimestampStartingScanner extends ReadPlanStartingScanner {
3633

37-
private static final Logger LOG =
38-
LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
39-
40-
private final long startupMillis;
34+
private final Snapshot snapshot;
4135

4236
public StaticFromTimestampStartingScanner(SnapshotManager snapshotManager, long startupMillis) {
4337
super(snapshotManager);
44-
this.startupMillis = startupMillis;
45-
Snapshot snapshot = timeTravelToTimestamp(snapshotManager, startupMillis);
38+
this.snapshot = timeTravelToTimestamp(snapshotManager, startupMillis);
4639
if (snapshot == null) {
4740
Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
4841
throw new IllegalArgumentException(
@@ -56,9 +49,13 @@ public StaticFromTimestampStartingScanner(SnapshotManager snapshotManager, long
5649
this.startingSnapshotId = snapshot.id();
5750
}
5851

52+
public Snapshot getSnapshot() {
53+
return snapshot;
54+
}
55+
5956
@Override
6057
public SnapshotReader configure(SnapshotReader snapshotReader) {
61-
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
58+
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
6259
}
6360

6461
@Nullable

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,26 @@ public class StaticFromWatermarkStartingScanner extends ReadPlanStartingScanner
3434
private static final Logger LOG =
3535
LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);
3636

37-
private final long watermark;
37+
private final Snapshot snapshot;
3838

3939
public StaticFromWatermarkStartingScanner(SnapshotManager snapshotManager, long watermark) {
4040
super(snapshotManager);
41-
this.watermark = watermark;
42-
Snapshot snapshot = timeTravelToWatermark(snapshotManager, watermark);
43-
if (snapshot != null) {
44-
this.startingSnapshotId = snapshot.id();
41+
this.snapshot = timeTravelToWatermark(snapshotManager, watermark);
42+
if (snapshot == null) {
43+
LOG.warn(
44+
"There is currently no snapshot later than or equal to watermark[{}]",
45+
watermark);
46+
throw new RuntimeException(
47+
String.format(
48+
"There is currently no snapshot later than or equal to "
49+
+ "watermark[%d]",
50+
watermark));
4551
}
52+
this.startingSnapshotId = snapshot.id();
53+
}
54+
55+
public Snapshot getSnapshot() {
56+
return snapshot;
4657
}
4758

4859
@Override
@@ -52,17 +63,7 @@ public ScanMode startingScanMode() {
5263

5364
@Override
5465
public SnapshotReader configure(SnapshotReader snapshotReader) {
55-
if (startingSnapshotId == null) {
56-
LOG.warn(
57-
"There is currently no snapshot later than or equal to watermark[{}]",
58-
watermark);
59-
throw new RuntimeException(
60-
String.format(
61-
"There is currently no snapshot later than or equal to "
62-
+ "watermark[%d]",
63-
watermark));
64-
}
65-
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
66+
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
6667
}
6768

6869
@Nullable

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java

Lines changed: 16 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.apache.paimon.utils.ChangelogManager;
2727
import org.apache.paimon.utils.FunctionWithException;
2828
import org.apache.paimon.utils.SnapshotManager;
29-
import org.apache.paimon.utils.SnapshotNotExistException;
30-
import org.apache.paimon.utils.TagManager;
3129

3230
import org.slf4j.Logger;
3331
import org.slf4j.LoggerFactory;
@@ -38,7 +36,6 @@
3836
import java.util.ArrayList;
3937
import java.util.List;
4038

41-
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
4239
import static org.apache.paimon.utils.Preconditions.checkArgument;
4340
import static org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
4441

@@ -54,10 +51,12 @@ public class TimeTravelUtil {
5451
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
5552
};
5653

54+
@Nullable
5755
public static Snapshot resolveSnapshot(FileStoreTable table) {
5856
return resolveSnapshotFromOptions(table.coreOptions(), table.snapshotManager());
5957
}
6058

59+
@Nullable
6160
public static Snapshot resolveSnapshotFromOptions(
6261
CoreOptions options, SnapshotManager snapshotManager) {
6362
List<String> scanHandleKey = new ArrayList<>(1);
@@ -67,7 +66,7 @@ public static Snapshot resolveSnapshotFromOptions(
6766
}
6867
}
6968

70-
if (scanHandleKey.size() == 0) {
69+
if (scanHandleKey.isEmpty()) {
7170
return snapshotManager.latestSnapshot();
7271
}
7372

@@ -83,61 +82,27 @@ public static Snapshot resolveSnapshotFromOptions(
8382
String key = scanHandleKey.get(0);
8483
Snapshot snapshot = null;
8584
if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
86-
snapshot = resolveSnapshotBySnapshotId(snapshotManager, options);
85+
snapshot =
86+
new StaticFromSnapshotStartingScanner(snapshotManager, options.scanSnapshotId())
87+
.getSnapshot();
8788
} else if (key.equals(CoreOptions.SCAN_WATERMARK.key())) {
88-
snapshot = resolveSnapshotByWatermark(snapshotManager, options);
89+
snapshot =
90+
new StaticFromWatermarkStartingScanner(snapshotManager, options.scanWatermark())
91+
.getSnapshot();
8992
} else if (key.equals(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())) {
90-
snapshot = resolveSnapshotByTimestamp(snapshotManager, options);
93+
snapshot =
94+
new StaticFromTimestampStartingScanner(
95+
snapshotManager, options.scanTimestampMills())
96+
.getSnapshot();
9197
} else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) {
92-
snapshot = resolveSnapshotByTagName(snapshotManager, options);
98+
snapshot =
99+
new StaticFromTagStartingScanner(snapshotManager, options.scanTagName())
100+
.getSnapshot();
93101
}
94102

95-
if (snapshot == null) {
96-
snapshot = snapshotManager.latestSnapshot();
97-
}
98103
return snapshot;
99104
}
100105

101-
private static Snapshot resolveSnapshotBySnapshotId(
102-
SnapshotManager snapshotManager, CoreOptions options) {
103-
Long snapshotId = options.scanSnapshotId();
104-
if (snapshotId != null) {
105-
if (!snapshotManager.snapshotExists(snapshotId)) {
106-
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
107-
Long latestSnapshotId = snapshotManager.latestSnapshotId();
108-
throw new SnapshotNotExistException(
109-
String.format(
110-
"Specified parameter %s = %s is not exist, you can set it in range from %s to %s.",
111-
SCAN_SNAPSHOT_ID.key(),
112-
snapshotId,
113-
earliestSnapshotId,
114-
latestSnapshotId));
115-
}
116-
return snapshotManager.snapshot(snapshotId);
117-
}
118-
return null;
119-
}
120-
121-
private static Snapshot resolveSnapshotByTimestamp(
122-
SnapshotManager snapshotManager, CoreOptions options) {
123-
Long timestamp = options.scanTimestampMills();
124-
return snapshotManager.earlierOrEqualTimeMills(timestamp);
125-
}
126-
127-
private static Snapshot resolveSnapshotByWatermark(
128-
SnapshotManager snapshotManager, CoreOptions options) {
129-
Long watermark = options.scanWatermark();
130-
return snapshotManager.laterOrEqualWatermark(watermark);
131-
}
132-
133-
private static Snapshot resolveSnapshotByTagName(
134-
SnapshotManager snapshotManager, CoreOptions options) {
135-
String tagName = options.scanTagName();
136-
TagManager tagManager =
137-
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
138-
return tagManager.getOrThrow(tagName).trimToSnapshot();
139-
}
140-
141106
/**
142107
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
143108
* returned if all snapshots are equal to or later than the timestamp mills.

0 commit comments

Comments
 (0)